about summary refs log tree commit homepage
diff options
context:
space:
mode:
-rw-r--r--lib/PublicInbox/LeiXSearch.pm63
1 files changed, 37 insertions, 26 deletions
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index 692d5e54..9d367977 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -111,7 +111,7 @@ sub _mset_more ($$) {
 }
 
 # $startq will EOF when do_augment is done augmenting and allow
-# query_mset and query_thread_mset to proceed.
+# query_combined_mset and query_thread_mset to proceed.
 sub wait_startq ($) {
         my ($lei) = @_;
         my $startq = delete $lei->{startq} or return;
@@ -144,9 +144,9 @@ sub mset_progress {
         }
 }
 
-sub query_thread_mset { # for --threads
+sub query_one_mset { # for --threads and l2m w/o sort
         my ($self, $ibxish) = @_;
-        local $0 = "$0 query_thread_mset";
+        local $0 = "$0 query_one_mset";
         my $lei = $self->{lei};
         my ($srch, $over) = ($ibxish->search, $ibxish->over);
         my $desc = $ibxish->{inboxdir} // $ibxish->{topdir};
@@ -155,41 +155,51 @@ sub query_thread_mset { # for --threads
         my $mset;
         my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei);
         my $can_kw = !!$ibxish->can('msg_keywords');
-        my $fl = $lei->{opt}->{threads} > 1 ? 1 : undef;
+        my $threads = $lei->{opt}->{threads} // 0;
+        my $fl = $threads > 1 ? 1 : undef;
         do {
                 $mset = $srch->mset($mo->{qstr}, $mo);
                 mset_progress($lei, $desc, $mset->size,
                                 $mset->get_matches_estimated);
                 wait_startq($lei); # wait for keyword updates
                 my $ids = $srch->mset_to_artnums($mset, $mo);
-                my $ctx = { ids => $ids };
                 my $i = 0;
-                my %n2item = map { ($ids->[$i++], $_) } $mset->items;
-                while ($over->expand_thread($ctx)) {
-                        for my $n (@{$ctx->{xids}}) {
-                                my $smsg = $over->get_art($n) or next;
-                                my $mitem = delete $n2item{$smsg->{num}};
-                                next if $smsg->{bytes} == 0;
-                                if ($mitem) {
-                                        if ($can_kw) {
+                if ($threads) {
+                        my $ctx = { ids => $ids };
+                        my %n2item = map { ($ids->[$i++], $_) } $mset->items;
+                        while ($over->expand_thread($ctx)) {
+                                for my $n (@{$ctx->{xids}}) {
+                                        my $smsg = $over->get_art($n) or next;
+                                        my $mitem = delete $n2item{$n};
+                                        next if $smsg->{bytes} == 0;
+                                        if ($mitem && $can_kw) {
                                                 mitem_kw($smsg, $mitem, $fl);
-                                        } elsif ($fl) {
+                                        } elsif ($mitem && $fl) {
                                                 # call ->xsmsg_vmd, later
                                                 $smsg->{lei_q_tt_flagged} = 1;
                                         }
+                                        $each_smsg->($smsg, $mitem);
                                 }
+                                @{$ctx->{xids}} = ();
+                        }
+                } else {
+                        my @items = $mset->items;
+                        for my $n (@$ids) {
+                                my $mitem = $items[$i++];
+                                my $smsg = $over->get_art($n) or next;
+                                next if $smsg->{bytes} == 0;
+                                mitem_kw($smsg, $mitem, $fl) if $can_kw;
                                 $each_smsg->($smsg, $mitem);
                         }
-                        @{$ctx->{xids}} = ();
                 }
         } while (_mset_more($mset, $mo));
-        undef $each_smsg; # drops @io for l2m->{each_smsg_done}
+        undef $each_smsg; # may commit
         $lei->{ovv}->ovv_atexit_child($lei);
 }
 
-sub query_mset { # non-parallel for non-"--threads" users
+sub query_combined_mset { # non-parallel for non-"--threads" users
         my ($self) = @_;
-        local $0 = "$0 query_mset";
+        local $0 = "$0 query_combined_mset";
         my $lei = $self->{lei};
         my $mo = { %{$lei->{mset_opt}} };
         my $mset;
@@ -207,7 +217,7 @@ sub query_mset { # non-parallel for non-"--threads" users
                         $each_smsg->($smsg, $mitem);
                 }
         } while (_mset_more($mset, $mo));
-        undef $each_smsg; # drops @io for l2m->{each_smsg_done}
+        undef $each_smsg; # may commit
         $lei->{ovv}->ovv_atexit_child($lei);
 }
 
@@ -379,14 +389,14 @@ sub concurrency {
         $nl + $nr;
 }
 
-sub start_query { # always runs in main (lei-daemon) process
-        my ($self) = @_;
-        if ($self->{threads}) {
+sub start_query ($;$) { # always runs in main (lei-daemon) process
+        my ($self, $l2m) = @_;
+        if ($self->{opt_threads} || ($l2m && !$self->{opt_sort})) {
                 for my $ibxish (locals($self)) {
-                        $self->wq_io_do('query_thread_mset', [], $ibxish);
+                        $self->wq_io_do('query_one_mset', [], $ibxish);
                 }
         } elsif (locals($self)) {
-                $self->wq_io_do('query_mset', []);
+                $self->wq_io_do('query_combined_mset', []);
         }
         my $i = 0;
         my $q = [];
@@ -402,7 +412,7 @@ sub start_query { # always runs in main (lei-daemon) process
 sub incr_start_query { # called whenever an l2m shard starts do_post_auth
         my ($self, $l2m) = @_;
         return if ++$self->{nr_start_query} != $l2m->{-wq_nr_workers};
-        start_query($self);
+        start_query($self, $l2m);
 }
 
 sub ipc_atfork_child {
@@ -448,7 +458,8 @@ sub do_query {
         my $op_c = delete $lei->{pkt_op_c};
         delete $lei->{pkt_op_p};
         @$end = ();
-        $self->{threads} = $lei->{opt}->{threads};
+        $self->{opt_threads} = $lei->{opt}->{threads};
+        $self->{opt_sort} = $lei->{opt}->{'sort'};
         if ($l2m) {
                 $l2m->net_merge_complete unless $lei->{auth};
         } else {