From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 7F8151F934 for ; Tue, 13 Apr 2021 10:54:46 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 1/5] lei_xsearch: use per-external queries when not sorting Date: Tue, 13 Apr 2021 10:54:42 +0000 Message-Id: <20210413105446.7245-2-e@80x24.org> In-Reply-To: <20210413105446.7245-1-e@80x24.org> References: <20210413105446.7245-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: We only need the combined mset query when we care about sort order. When writing to --output destinations intended for MUA consumption, sort order is irrelevant as MUAs are expected to offer their own sorting, so run queries to each external in parallel. This prepares us for docid-sort-based saved search support. It will also become faster than the combined mset query for users with many externals due to current Xapian exhibiting poor performance with many shards (the same reason -extindex exists). --- lib/PublicInbox/LeiXSearch.pm | 63 ++++++++++++++++++++--------------- 1 file changed, 37 insertions(+), 26 deletions(-) diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index 692d5e54..9d367977 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -111,7 +111,7 @@ sub _mset_more ($$) { } # $startq will EOF when do_augment is done augmenting and allow -# query_mset and query_thread_mset to proceed. +# query_combined_mset and query_one_mset to proceed. 
sub wait_startq ($) { my ($lei) = @_; my $startq = delete $lei->{startq} or return; @@ -144,9 +144,9 @@ sub mset_progress { } } -sub query_thread_mset { # for --threads +sub query_one_mset { # for --threads and l2m w/o sort my ($self, $ibxish) = @_; - local $0 = "$0 query_thread_mset"; + local $0 = "$0 query_one_mset"; my $lei = $self->{lei}; my ($srch, $over) = ($ibxish->search, $ibxish->over); my $desc = $ibxish->{inboxdir} // $ibxish->{topdir}; @@ -155,41 +155,51 @@ sub query_thread_mset { # for --threads my $mset; my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei); my $can_kw = !!$ibxish->can('msg_keywords'); - my $fl = $lei->{opt}->{threads} > 1 ? 1 : undef; + my $threads = $lei->{opt}->{threads} // 0; + my $fl = $threads > 1 ? 1 : undef; do { $mset = $srch->mset($mo->{qstr}, $mo); mset_progress($lei, $desc, $mset->size, $mset->get_matches_estimated); wait_startq($lei); # wait for keyword updates my $ids = $srch->mset_to_artnums($mset, $mo); - my $ctx = { ids => $ids }; my $i = 0; - my %n2item = map { ($ids->[$i++], $_) } $mset->items; - while ($over->expand_thread($ctx)) { - for my $n (@{$ctx->{xids}}) { - my $smsg = $over->get_art($n) or next; - my $mitem = delete $n2item{$smsg->{num}}; - next if $smsg->{bytes} == 0; - if ($mitem) { - if ($can_kw) { + if ($threads) { + my $ctx = { ids => $ids }; + my %n2item = map { ($ids->[$i++], $_) } $mset->items; + while ($over->expand_thread($ctx)) { + for my $n (@{$ctx->{xids}}) { + my $smsg = $over->get_art($n) or next; + my $mitem = delete $n2item{$n}; + next if $smsg->{bytes} == 0; + if ($mitem && $can_kw) { mitem_kw($smsg, $mitem, $fl); - } elsif ($fl) { + } elsif ($mitem && $fl) { # call ->xsmsg_vmd, later $smsg->{lei_q_tt_flagged} = 1; } + $each_smsg->($smsg, $mitem); } + @{$ctx->{xids}} = (); + } + } else { + my @items = $mset->items; + for my $n (@$ids) { + my $mitem = $items[$i++]; + my $smsg = $over->get_art($n) or next; + next if $smsg->{bytes} == 0; + mitem_kw($smsg, $mitem, $fl) if $can_kw; 
$each_smsg->($smsg, $mitem); } - @{$ctx->{xids}} = (); } } while (_mset_more($mset, $mo)); - undef $each_smsg; # drops @io for l2m->{each_smsg_done} + undef $each_smsg; # may commit $lei->{ovv}->ovv_atexit_child($lei); } -sub query_mset { # non-parallel for non-"--threads" users +sub query_combined_mset { # non-parallel for non-"--threads" users my ($self) = @_; - local $0 = "$0 query_mset"; + local $0 = "$0 query_combined_mset"; my $lei = $self->{lei}; my $mo = { %{$lei->{mset_opt}} }; my $mset; @@ -207,7 +217,7 @@ sub query_mset { # non-parallel for non-"--threads" users $each_smsg->($smsg, $mitem); } } while (_mset_more($mset, $mo)); - undef $each_smsg; # drops @io for l2m->{each_smsg_done} + undef $each_smsg; # may commit $lei->{ovv}->ovv_atexit_child($lei); } @@ -379,14 +389,14 @@ sub concurrency { $nl + $nr; } -sub start_query { # always runs in main (lei-daemon) process - my ($self) = @_; - if ($self->{threads}) { +sub start_query ($;$) { # always runs in main (lei-daemon) process + my ($self, $l2m) = @_; + if ($self->{opt_threads} || ($l2m && !$self->{opt_sort})) { for my $ibxish (locals($self)) { - $self->wq_io_do('query_thread_mset', [], $ibxish); + $self->wq_io_do('query_one_mset', [], $ibxish); } } elsif (locals($self)) { - $self->wq_io_do('query_mset', []); + $self->wq_io_do('query_combined_mset', []); } my $i = 0; my $q = []; @@ -402,7 +412,7 @@ sub start_query { # always runs in main (lei-daemon) process sub incr_start_query { # called whenever an l2m shard starts do_post_auth my ($self, $l2m) = @_; return if ++$self->{nr_start_query} != $l2m->{-wq_nr_workers}; - start_query($self); + start_query($self, $l2m); } sub ipc_atfork_child { @@ -448,7 +458,8 @@ sub do_query { my $op_c = delete $lei->{pkt_op_c}; delete $lei->{pkt_op_p}; @$end = (); - $self->{threads} = $lei->{opt}->{threads}; + $self->{opt_threads} = $lei->{opt}->{threads}; + $self->{opt_sort} = $lei->{opt}->{'sort'}; if ($l2m) { $l2m->net_merge_complete unless $lei->{auth}; } else {