about summary refs log tree commit homepage
diff options
context:
space:
mode:
author Eric Wong <e@80x24.org> 2021-04-13 10:54:42 +0000
committer Eric Wong <e@80x24.org> 2021-04-13 15:04:41 -0400
commit a1a309418475aaa99d63cda8191ef3ce534343cd (patch)
tree c46c9a5a3f53a882fa1dd2964d99d71d18b750e6
parent 775e160375fb5c15f3b5ade3a1165f7ce5df9eb4 (diff)
download public-inbox-a1a309418475aaa99d63cda8191ef3ce534343cd.tar.gz
We only need the combined mset query when we care about sort
order.  When writing to --output destinations intended for MUA
consumption, sort order is irrelevant as MUAs are expected to
offer their own sorting, so run queries to each external in
parallel.

This prepares us for docid-sort-based saved search support.

It will also become faster than the combined mset query for
users with many externals, since current Xapian performs poorly
with many shards (the same reason -extindex exists).
-rw-r--r-- lib/PublicInbox/LeiXSearch.pm 63
1 files changed, 37 insertions, 26 deletions
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index 692d5e54..9d367977 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -111,7 +111,7 @@ sub _mset_more ($$) {
 }
 
 # $startq will EOF when do_augment is done augmenting and allow
-# query_mset and query_thread_mset to proceed.
+# query_combined_mset and query_thread_mset to proceed.
 sub wait_startq ($) {
         my ($lei) = @_;
         my $startq = delete $lei->{startq} or return;
@@ -144,9 +144,9 @@ sub mset_progress {
         }
 }
 
-sub query_thread_mset { # for --threads
+sub query_one_mset { # for --threads and l2m w/o sort
         my ($self, $ibxish) = @_;
-        local $0 = "$0 query_thread_mset";
+        local $0 = "$0 query_one_mset";
         my $lei = $self->{lei};
         my ($srch, $over) = ($ibxish->search, $ibxish->over);
         my $desc = $ibxish->{inboxdir} // $ibxish->{topdir};
@@ -155,41 +155,51 @@ sub query_thread_mset { # for --threads
         my $mset;
         my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei);
         my $can_kw = !!$ibxish->can('msg_keywords');
-        my $fl = $lei->{opt}->{threads} > 1 ? 1 : undef;
+        my $threads = $lei->{opt}->{threads} // 0;
+        my $fl = $threads > 1 ? 1 : undef;
         do {
                 $mset = $srch->mset($mo->{qstr}, $mo);
                 mset_progress($lei, $desc, $mset->size,
                                 $mset->get_matches_estimated);
                 wait_startq($lei); # wait for keyword updates
                 my $ids = $srch->mset_to_artnums($mset, $mo);
-                my $ctx = { ids => $ids };
                 my $i = 0;
-                my %n2item = map { ($ids->[$i++], $_) } $mset->items;
-                while ($over->expand_thread($ctx)) {
-                        for my $n (@{$ctx->{xids}}) {
-                                my $smsg = $over->get_art($n) or next;
-                                my $mitem = delete $n2item{$smsg->{num}};
-                                next if $smsg->{bytes} == 0;
-                                if ($mitem) {
-                                        if ($can_kw) {
+                if ($threads) {
+                        my $ctx = { ids => $ids };
+                        my %n2item = map { ($ids->[$i++], $_) } $mset->items;
+                        while ($over->expand_thread($ctx)) {
+                                for my $n (@{$ctx->{xids}}) {
+                                        my $smsg = $over->get_art($n) or next;
+                                        my $mitem = delete $n2item{$n};
+                                        next if $smsg->{bytes} == 0;
+                                        if ($mitem && $can_kw) {
                                                 mitem_kw($smsg, $mitem, $fl);
-                                        } elsif ($fl) {
+                                        } elsif ($mitem && $fl) {
                                                 # call ->xsmsg_vmd, later
                                                 $smsg->{lei_q_tt_flagged} = 1;
                                         }
+                                        $each_smsg->($smsg, $mitem);
                                 }
+                                @{$ctx->{xids}} = ();
+                        }
+                } else {
+                        my @items = $mset->items;
+                        for my $n (@$ids) {
+                                my $mitem = $items[$i++];
+                                my $smsg = $over->get_art($n) or next;
+                                next if $smsg->{bytes} == 0;
+                                mitem_kw($smsg, $mitem, $fl) if $can_kw;
                                 $each_smsg->($smsg, $mitem);
                         }
-                        @{$ctx->{xids}} = ();
                 }
         } while (_mset_more($mset, $mo));
-        undef $each_smsg; # drops @io for l2m->{each_smsg_done}
+        undef $each_smsg; # may commit
         $lei->{ovv}->ovv_atexit_child($lei);
 }
 
-sub query_mset { # non-parallel for non-"--threads" users
+sub query_combined_mset { # non-parallel for non-"--threads" users
         my ($self) = @_;
-        local $0 = "$0 query_mset";
+        local $0 = "$0 query_combined_mset";
         my $lei = $self->{lei};
         my $mo = { %{$lei->{mset_opt}} };
         my $mset;
@@ -207,7 +217,7 @@ sub query_mset { # non-parallel for non-"--threads" users
                         $each_smsg->($smsg, $mitem);
                 }
         } while (_mset_more($mset, $mo));
-        undef $each_smsg; # drops @io for l2m->{each_smsg_done}
+        undef $each_smsg; # may commit
         $lei->{ovv}->ovv_atexit_child($lei);
 }
 
@@ -379,14 +389,14 @@ sub concurrency {
         $nl + $nr;
 }
 
-sub start_query { # always runs in main (lei-daemon) process
-        my ($self) = @_;
-        if ($self->{threads}) {
+sub start_query ($;$) { # always runs in main (lei-daemon) process
+        my ($self, $l2m) = @_;
+        if ($self->{opt_threads} || ($l2m && !$self->{opt_sort})) {
                 for my $ibxish (locals($self)) {
-                        $self->wq_io_do('query_thread_mset', [], $ibxish);
+                        $self->wq_io_do('query_one_mset', [], $ibxish);
                 }
         } elsif (locals($self)) {
-                $self->wq_io_do('query_mset', []);
+                $self->wq_io_do('query_combined_mset', []);
         }
         my $i = 0;
         my $q = [];
@@ -402,7 +412,7 @@ sub start_query { # always runs in main (lei-daemon) process
 sub incr_start_query { # called whenever an l2m shard starts do_post_auth
         my ($self, $l2m) = @_;
         return if ++$self->{nr_start_query} != $l2m->{-wq_nr_workers};
-        start_query($self);
+        start_query($self, $l2m);
 }
 
 sub ipc_atfork_child {
@@ -448,7 +458,8 @@ sub do_query {
         my $op_c = delete $lei->{pkt_op_c};
         delete $lei->{pkt_op_p};
         @$end = ();
-        $self->{threads} = $lei->{opt}->{threads};
+        $self->{opt_threads} = $lei->{opt}->{threads};
+        $self->{opt_sort} = $lei->{opt}->{'sort'};
         if ($l2m) {
                 $l2m->net_merge_complete unless $lei->{auth};
         } else {