From mboxrd@z Thu Jan 1 00:00:00 1970
Return-Path:
X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net
X-Spam-Level:
X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00
	shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2
Received: from localhost (dcvr.yhbt.net [127.0.0.1])
	by dcvr.yhbt.net (Postfix) with ESMTP id 4C38C1F934
	for ; Tue, 19 Jan 2021 09:34:35 +0000 (UTC)
From: Eric Wong
To: meta@public-inbox.org
Subject: [PATCH 1/9] lei q: start ->mset while query_prepare runs
Date: Tue, 19 Jan 2021 09:34:27 +0000
Message-Id: <20210119093435.17955-2-e@80x24.org>
In-Reply-To: <20210119093435.17955-1-e@80x24.org>
References: <20210119093435.17955-1-e@80x24.org>
MIME-Version: 1.0
Content-Transfer-Encoding: 8bit
List-Id:

We don't need the result of query_prepare (for augmenting or
mass unlinking) until we're ready to deduplicate and write
results to the filesystem.

Starting ->mset while query_prepare is still running ought to
let us hide some of the cost of extremely expensive Xapian
searches on multi-device/multi-core systems.
--- lib/PublicInbox/LEI.pm | 2 +- lib/PublicInbox/LeiToMail.pm | 3 +- lib/PublicInbox/LeiXSearch.pm | 54 ++++++++++++++++++++--------------- lib/PublicInbox/Spawn.pm | 2 +- 4 files changed, 35 insertions(+), 26 deletions(-) diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index 6b6ee0f5..4b1dc673 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -293,7 +293,7 @@ sub atfork_child_wq { my ($sock, $l2m_wq_s1); (@$self{qw(0 1 2)}, $sock, $l2m_wq_s1) = delete(@$wq{0..4}); $self->{sock} = $sock if -S $sock; - $self->{l2m}->{-wq_s1} = $l2m_wq_s1 if $l2m_wq_s1; + $self->{l2m}->{-wq_s1} = $l2m_wq_s1 if $l2m_wq_s1 && -S $l2m_wq_s1; %PATH2CFG = (); $quit = \&CORE::exit; @TO_CLOSE_ATFORK_CHILD = (); diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm index dcf6d8a3..a1dce550 100644 --- a/lib/PublicInbox/LeiToMail.pm +++ b/lib/PublicInbox/LeiToMail.pm @@ -440,6 +440,7 @@ sub lock_free { sub write_mail { # via ->wq_do my ($self, $git_dir, $oid, $lei, $kw) = @_; + my $not_done = delete $self->{4}; # write end of {each_smsg_done} my $wcb = $self->{wcb} //= do { # first message my %sig = $lei->atfork_child_wq($self); @SIG{keys %sig} = values %sig; # not local @@ -447,7 +448,7 @@ sub write_mail { # via ->wq_do $self->write_cb($lei); }; my $git = $self->{"$$\0$git_dir"} //= PublicInbox::Git->new($git_dir); - $git->cat_async($oid, \&git_to_mail, [ $wcb, $kw ]); + $git->cat_async($oid, \&git_to_mail, [ $wcb, $kw, $not_done ]); } sub ipc_atfork_prepare { diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index dc5cf3b6..73fd17f4 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -94,8 +94,17 @@ sub _mset_more ($$) { $size && (($mo->{offset} += $size) < ($mo->{limit} // 10000)); } +# $startq will EOF when query_prepare is done augmenting and allow +# query_mset and query_thread_mset to proceed. 
+sub wait_startq ($) { + my ($startq) = @_; + $_[0] = undef; + read($startq, my $query_prepare_done, 1); +} + sub query_thread_mset { # for --thread my ($self, $lei, $ibxish) = @_; + my $startq = delete $self->{5}; my %sig = $lei->atfork_child_wq($self); local @SIG{keys %sig} = values %sig; @@ -119,6 +128,7 @@ sub query_thread_mset { # for --thread while ($over->expand_thread($ctx)) { for my $n (@{$ctx->{xids}}) { my $smsg = $over->get_art($n) or next; + wait_startq($startq) if $startq; next if $dedupe->is_smsg_dup($smsg); my $mitem = delete $n2item{$smsg->{num}}; $each_smsg->($smsg, $mitem); @@ -132,6 +142,7 @@ sub query_thread_mset { # for --thread sub query_mset { # non-parallel for non-"--thread" users my ($self, $lei, $srcs) = @_; + my $startq = delete $self->{5}; my %sig = $lei->atfork_child_wq($self); local @SIG{keys %sig} = values %sig; my $mo = { %{$lei->{mset_opt}} }; @@ -144,6 +155,7 @@ sub query_mset { # non-parallel for non-"--thread" users $mset = $self->mset($mo->{qstr}, $mo); for my $it ($mset->items) { my $smsg = smsg_for($self, $it) or next; + wait_startq($startq) if $startq; next if $dedupe->is_smsg_dup($smsg); $each_smsg->($smsg, $it); } @@ -207,47 +219,42 @@ sub start_query { # always runs in main (lei-daemon) process @$io = (); } -sub query_prepare { # wq_do +sub query_prepare { # for wq_do, my ($self, $lei) = @_; my %sig = $lei->atfork_child_wq($self); local @SIG{keys %sig} = values %sig; - if (my $l2m = $lei->{l2m}) { - eval { $l2m->do_augment($lei) }; - return $lei->fail($@) if $@; - } - # trigger PublicInbox::OpPipe->event_step - my $qry_status_wr = $lei->{0} or - return $lei->fail('BUG: qry_status_wr missing'); - $qry_status_wr->autoflush(1); - print $qry_status_wr '.' or # this should never fail... - return $lei->fail("BUG? 
print qry_status_wr: $!"); + eval { $lei->{l2m}->do_augment($lei) }; + $lei->fail($@) if $@; } sub do_query { my ($self, $lei_orig, $srcs) = @_; my ($lei, @io) = $lei_orig->atfork_parent_wq($self); $io[0] = undef; - pipe(my $qry_status_rd, $io[0]) or die "pipe $!"; + pipe(my $done, $io[0]) or die "pipe $!"; $lei_orig->event_step_init; # wait for shutdowns - my $op_map = { '' => [ \&query_done, $self, $lei_orig ] }; + my $done_op = { '' => [ \&query_done, $self, $lei_orig ] }; my $in_loop = exists $lei_orig->{sock}; - my $opp = PublicInbox::OpPipe->new($qry_status_rd, $op_map, $in_loop); + $done = PublicInbox::OpPipe->new($done, $done_op, $in_loop); my $l2m = $lei->{l2m}; if ($l2m) { $l2m->pre_augment($lei_orig); # may redirect $lei->{1} for mbox $io[1] = $lei_orig->{1}; - $op_map->{'.'} = [ \&start_query, $self, \@io, $lei, $srcs ]; - $self->wq_do('query_prepare', \@io, $lei); - $opp->event_step if !$in_loop; - } else { - start_query($self, \@io, $lei, $srcs); + my @l2m_io = (undef, @io[1..$#io]); + pipe(my $startq, $l2m_io[0]) or die "pipe: $!"; + $self->wq_do('query_prepare', \@l2m_io, $lei); + $io[4] //= *STDERR{GLOB}; + die "BUG: unexpected \$io[5]: $io[5]" if $io[5]; + fcntl($startq, 1031, 4096) if $^O eq 'linux'; # F_SETPIPE_SZ + $io[5] = $startq; } + start_query($self, \@io, $lei, $srcs); unless ($in_loop) { my @pids = $self->wq_close; # for the $lei->atfork_child_wq PIPE handler: - $op_map->{'!'} = [ \&CORE::kill, 'TERM', @pids ]; - $opp->event_step; + $done_op->{'!'} = [ \&CORE::kill, 'TERM', @pids ]; + $done->event_step; my $ipc_worker_reap = $self->can('ipc_worker_reap'); if (my $l2m_pids = delete $self->{l2m_pids}) { dwaitpid($_, $ipc_worker_reap, $l2m) for @$l2m_pids; @@ -258,8 +265,9 @@ sub do_query { sub ipc_atfork_prepare { my ($self) = @_; - # (qry_status_wr, stdout|mbox, stderr, 3: sock, 4: $l2m->{-wq_s1}) - $self->wq_set_recv_modes(qw[+<&= >&= >&= +<&= +<&=]); + # (0: qry_status_wr, 1: stdout|mbox, 2: stderr, + # 3: sock, 4: $l2m->{-wq_s1}, 5: 
$startq) + $self->wq_set_recv_modes(qw[+<&= >&= >&= +<&= +<&= <&=]); $self->SUPER::ipc_atfork_prepare; # PublicInbox::IPC } diff --git a/lib/PublicInbox/Spawn.pm b/lib/PublicInbox/Spawn.pm index b03f2d59..376d2190 100644 --- a/lib/PublicInbox/Spawn.pm +++ b/lib/PublicInbox/Spawn.pm @@ -209,7 +209,7 @@ my $fdpass = <<'FDPASS'; #include #if defined(CMSG_SPACE) && defined(CMSG_LEN) -#define SEND_FD_CAPA 5 +#define SEND_FD_CAPA 6 #define SEND_FD_SPACE (SEND_FD_CAPA * sizeof(int)) union my_cmsg { struct cmsghdr hdr;