diff options
-rw-r--r-- | lib/PublicInbox/LeiXSearch.pm | 63 |
1 files changed, 37 insertions, 26 deletions
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index 692d5e54..9d367977 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -111,7 +111,7 @@ sub _mset_more ($$) { } # $startq will EOF when do_augment is done augmenting and allow -# query_mset and query_thread_mset to proceed. +# query_combined_mset and query_thread_mset to proceed. sub wait_startq ($) { my ($lei) = @_; my $startq = delete $lei->{startq} or return; @@ -144,9 +144,9 @@ sub mset_progress { } } -sub query_thread_mset { # for --threads +sub query_one_mset { # for --threads and l2m w/o sort my ($self, $ibxish) = @_; - local $0 = "$0 query_thread_mset"; + local $0 = "$0 query_one_mset"; my $lei = $self->{lei}; my ($srch, $over) = ($ibxish->search, $ibxish->over); my $desc = $ibxish->{inboxdir} // $ibxish->{topdir}; @@ -155,41 +155,51 @@ sub query_thread_mset { # for --threads my $mset; my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei); my $can_kw = !!$ibxish->can('msg_keywords'); - my $fl = $lei->{opt}->{threads} > 1 ? 1 : undef; + my $threads = $lei->{opt}->{threads} // 0; + my $fl = $threads > 1 ? 1 : undef; do { $mset = $srch->mset($mo->{qstr}, $mo); mset_progress($lei, $desc, $mset->size, $mset->get_matches_estimated); wait_startq($lei); # wait for keyword updates my $ids = $srch->mset_to_artnums($mset, $mo); - my $ctx = { ids => $ids }; my $i = 0; - my %n2item = map { ($ids->[$i++], $_) } $mset->items; - while ($over->expand_thread($ctx)) { - for my $n (@{$ctx->{xids}}) { - my $smsg = $over->get_art($n) or next; - my $mitem = delete $n2item{$smsg->{num}}; - next if $smsg->{bytes} == 0; - if ($mitem) { - if ($can_kw) { + if ($threads) { + my $ctx = { ids => $ids }; + my %n2item = map { ($ids->[$i++], $_) } $mset->items; + while ($over->expand_thread($ctx)) { + for my $n (@{$ctx->{xids}}) { + my $smsg = $over->get_art($n) or next; + my $mitem = delete $n2item{$n}; + next if $smsg->{bytes} == 0; + if ($mitem && $can_kw) { mitem_kw($smsg, $mitem, $fl); - } elsif ($fl) { + } elsif ($mitem && $fl) { # call ->xsmsg_vmd, later $smsg->{lei_q_tt_flagged} = 1; } + $each_smsg->($smsg, $mitem); } + @{$ctx->{xids}} = (); + } + } else { + my @items = $mset->items; + for my $n (@$ids) { + my $mitem = $items[$i++]; + my $smsg = $over->get_art($n) or next; + next if $smsg->{bytes} == 0; + mitem_kw($smsg, $mitem, $fl) if $can_kw; $each_smsg->($smsg, $mitem); } - @{$ctx->{xids}} = (); } } while (_mset_more($mset, $mo)); - undef $each_smsg; # drops @io for l2m->{each_smsg_done} + undef $each_smsg; # may commit $lei->{ovv}->ovv_atexit_child($lei); } -sub query_mset { # non-parallel for non-"--threads" users +sub query_combined_mset { # non-parallel for non-"--threads" users my ($self) = @_; - local $0 = "$0 query_mset"; + local $0 = "$0 query_combined_mset"; my $lei = $self->{lei}; my $mo = { %{$lei->{mset_opt}} }; my $mset; @@ -207,7 +217,7 @@ sub query_mset { # non-parallel for non-"--threads" users $each_smsg->($smsg, $mitem); } } while (_mset_more($mset, $mo)); - undef $each_smsg; # drops @io for l2m->{each_smsg_done} + undef $each_smsg; # may commit $lei->{ovv}->ovv_atexit_child($lei); } @@ -379,14 +389,14 @@ sub concurrency { $nl + $nr; } -sub start_query { # always runs in main (lei-daemon) process - my ($self) = @_; - if ($self->{threads}) { +sub start_query ($;$) { # always runs in main (lei-daemon) process + my ($self, $l2m) = @_; + if ($self->{opt_threads} || ($l2m && !$self->{opt_sort})) { for my $ibxish (locals($self)) { - $self->wq_io_do('query_thread_mset', [], $ibxish); + $self->wq_io_do('query_one_mset', [], $ibxish); } } elsif (locals($self)) { - $self->wq_io_do('query_mset', []); + $self->wq_io_do('query_combined_mset', []); } my $i = 0; my $q = []; @@ -402,7 +412,7 @@ sub start_query { # always runs in main (lei-daemon) process sub incr_start_query { # called whenever an l2m shard starts do_post_auth my ($self, $l2m) = @_; return if ++$self->{nr_start_query} != $l2m->{-wq_nr_workers}; - start_query($self); + start_query($self, $l2m); } sub ipc_atfork_child { @@ -448,7 +458,8 @@ sub do_query { my $op_c = delete $lei->{pkt_op_c}; delete $lei->{pkt_op_p}; @$end = (); - $self->{threads} = $lei->{opt}->{threads}; + $self->{opt_threads} = $lei->{opt}->{threads}; + $self->{opt_sort} = $lei->{opt}->{'sort'}; if ($l2m) { $l2m->net_merge_complete unless $lei->{auth}; } else { |