about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2021-01-19 09:34:27 +0000
committerEric Wong <e@80x24.org>2021-01-21 03:29:06 +0000
commit90de75619d2ec8a59c265fd8f2c6cc1d4a7daba2 (patch)
tree85b0430774b3dd7f90aefb4d7c74238af22de2ff
parent935f837e759f03ed48d369ab97517b8b03662ba3 (diff)
downloadpublic-inbox-90de75619d2ec8a59c265fd8f2c6cc1d4a7daba2.tar.gz
lei q: start ->mset while query_prepare runs
We don't need the result of query_prepare (for augmenting or
mass unlinking) until we're ready to deduplicate and write
results to the filesystem.  This ought to let us hide some of
the cost of Xapian searches on multi-device/core systems for
extremely expensive searches.
-rw-r--r--lib/PublicInbox/LEI.pm2
-rw-r--r--lib/PublicInbox/LeiToMail.pm3
-rw-r--r--lib/PublicInbox/LeiXSearch.pm54
-rw-r--r--lib/PublicInbox/Spawn.pm2
4 files changed, 35 insertions, 26 deletions
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 6b6ee0f5..4b1dc673 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -293,7 +293,7 @@ sub atfork_child_wq {
         my ($sock, $l2m_wq_s1);
         (@$self{qw(0 1 2)}, $sock, $l2m_wq_s1) = delete(@$wq{0..4});
         $self->{sock} = $sock if -S $sock;
-        $self->{l2m}->{-wq_s1} = $l2m_wq_s1 if $l2m_wq_s1;
+        $self->{l2m}->{-wq_s1} = $l2m_wq_s1 if $l2m_wq_s1 && -S $l2m_wq_s1;
         %PATH2CFG = ();
         $quit = \&CORE::exit;
         @TO_CLOSE_ATFORK_CHILD = ();
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index dcf6d8a3..a1dce550 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -440,6 +440,7 @@ sub lock_free {
 
 sub write_mail { # via ->wq_do
         my ($self, $git_dir, $oid, $lei, $kw) = @_;
+        my $not_done = delete $self->{4}; # write end of {each_smsg_done}
         my $wcb = $self->{wcb} //= do { # first message
                 my %sig = $lei->atfork_child_wq($self);
                 @SIG{keys %sig} = values %sig; # not local
@@ -447,7 +448,7 @@ sub write_mail { # via ->wq_do
                 $self->write_cb($lei);
         };
         my $git = $self->{"$$\0$git_dir"} //= PublicInbox::Git->new($git_dir);
-        $git->cat_async($oid, \&git_to_mail, [ $wcb, $kw ]);
+        $git->cat_async($oid, \&git_to_mail, [ $wcb, $kw, $not_done ]);
 }
 
 sub ipc_atfork_prepare {
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index dc5cf3b6..73fd17f4 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -94,8 +94,17 @@ sub _mset_more ($$) {
         $size && (($mo->{offset} += $size) < ($mo->{limit} // 10000));
 }
 
+# $startq will EOF when query_prepare is done augmenting and allow
+# query_mset and query_thread_mset to proceed.
+sub wait_startq ($) {
+        my ($startq) = @_;
+        $_[0] = undef;
+        read($startq, my $query_prepare_done, 1);
+}
+
 sub query_thread_mset { # for --thread
         my ($self, $lei, $ibxish) = @_;
+        my $startq = delete $self->{5};
         my %sig = $lei->atfork_child_wq($self);
         local @SIG{keys %sig} = values %sig;
 
@@ -119,6 +128,7 @@ sub query_thread_mset { # for --thread
                 while ($over->expand_thread($ctx)) {
                         for my $n (@{$ctx->{xids}}) {
                                 my $smsg = $over->get_art($n) or next;
+                                wait_startq($startq) if $startq;
                                 next if $dedupe->is_smsg_dup($smsg);
                                 my $mitem = delete $n2item{$smsg->{num}};
                                 $each_smsg->($smsg, $mitem);
@@ -132,6 +142,7 @@ sub query_thread_mset { # for --thread
 
 sub query_mset { # non-parallel for non-"--thread" users
         my ($self, $lei, $srcs) = @_;
+        my $startq = delete $self->{5};
         my %sig = $lei->atfork_child_wq($self);
         local @SIG{keys %sig} = values %sig;
         my $mo = { %{$lei->{mset_opt}} };
@@ -144,6 +155,7 @@ sub query_mset { # non-parallel for non-"--thread" users
                 $mset = $self->mset($mo->{qstr}, $mo);
                 for my $it ($mset->items) {
                         my $smsg = smsg_for($self, $it) or next;
+                        wait_startq($startq) if $startq;
                         next if $dedupe->is_smsg_dup($smsg);
                         $each_smsg->($smsg, $it);
                 }
@@ -207,47 +219,42 @@ sub start_query { # always runs in main (lei-daemon) process
         @$io = ();
 }
 
-sub query_prepare { # wq_do
+sub query_prepare { # for wq_do,
         my ($self, $lei) = @_;
         my %sig = $lei->atfork_child_wq($self);
         local @SIG{keys %sig} = values %sig;
-        if (my $l2m = $lei->{l2m}) {
-                eval { $l2m->do_augment($lei) };
-                return $lei->fail($@) if $@;
-        }
-        # trigger PublicInbox::OpPipe->event_step
-        my $qry_status_wr = $lei->{0} or
-                return $lei->fail('BUG: qry_status_wr missing');
-        $qry_status_wr->autoflush(1);
-        print $qry_status_wr '.' or # this should never fail...
-                return $lei->fail("BUG? print qry_status_wr: $!");
+        eval { $lei->{l2m}->do_augment($lei) };
+        $lei->fail($@) if $@;
 }
 
 sub do_query {
         my ($self, $lei_orig, $srcs) = @_;
         my ($lei, @io) = $lei_orig->atfork_parent_wq($self);
         $io[0] = undef;
-        pipe(my $qry_status_rd, $io[0]) or die "pipe $!";
+        pipe(my $done, $io[0]) or die "pipe $!";
 
         $lei_orig->event_step_init; # wait for shutdowns
-        my $op_map = { '' => [ \&query_done, $self, $lei_orig ] };
+        my $done_op = { '' => [ \&query_done, $self, $lei_orig ] };
         my $in_loop = exists $lei_orig->{sock};
-        my $opp = PublicInbox::OpPipe->new($qry_status_rd, $op_map, $in_loop);
+        $done = PublicInbox::OpPipe->new($done, $done_op, $in_loop);
         my $l2m = $lei->{l2m};
         if ($l2m) {
                 $l2m->pre_augment($lei_orig); # may redirect $lei->{1} for mbox
                 $io[1] = $lei_orig->{1};
-                $op_map->{'.'} = [ \&start_query, $self, \@io, $lei, $srcs ];
-                $self->wq_do('query_prepare', \@io, $lei);
-                $opp->event_step if !$in_loop;
-        } else {
-                start_query($self, \@io, $lei, $srcs);
+                my @l2m_io = (undef, @io[1..$#io]);
+                pipe(my $startq, $l2m_io[0]) or die "pipe: $!";
+                $self->wq_do('query_prepare', \@l2m_io, $lei);
+                $io[4] //= *STDERR{GLOB};
+                die "BUG: unexpected \$io[5]: $io[5]" if $io[5];
+                fcntl($startq, 1031, 4096) if $^O eq 'linux'; # F_SETPIPE_SZ
+                $io[5] = $startq;
         }
+        start_query($self, \@io, $lei, $srcs);
         unless ($in_loop) {
                 my @pids = $self->wq_close;
                 # for the $lei->atfork_child_wq PIPE handler:
-                $op_map->{'!'} = [ \&CORE::kill, 'TERM', @pids ];
-                $opp->event_step;
+                $done_op->{'!'} = [ \&CORE::kill, 'TERM', @pids ];
+                $done->event_step;
                 my $ipc_worker_reap = $self->can('ipc_worker_reap');
                 if (my $l2m_pids = delete $self->{l2m_pids}) {
                         dwaitpid($_, $ipc_worker_reap, $l2m) for @$l2m_pids;
@@ -258,8 +265,9 @@ sub do_query {
 
 sub ipc_atfork_prepare {
         my ($self) = @_;
-        # (qry_status_wr, stdout|mbox, stderr, 3: sock, 4: $l2m->{-wq_s1})
-        $self->wq_set_recv_modes(qw[+<&= >&= >&= +<&= +<&=]);
+        # (0: qry_status_wr, 1: stdout|mbox, 2: stderr,
+        #  3: sock, 4: $l2m->{-wq_s1}, 5: $startq)
+        $self->wq_set_recv_modes(qw[+<&= >&= >&= +<&= +<&= <&=]);
         $self->SUPER::ipc_atfork_prepare; # PublicInbox::IPC
 }
 
diff --git a/lib/PublicInbox/Spawn.pm b/lib/PublicInbox/Spawn.pm
index b03f2d59..376d2190 100644
--- a/lib/PublicInbox/Spawn.pm
+++ b/lib/PublicInbox/Spawn.pm
@@ -209,7 +209,7 @@ my $fdpass = <<'FDPASS';
 #include <sys/socket.h>
 
 #if defined(CMSG_SPACE) && defined(CMSG_LEN)
-#define SEND_FD_CAPA 5
+#define SEND_FD_CAPA 6
 #define SEND_FD_SPACE (SEND_FD_CAPA * sizeof(int))
 union my_cmsg {
         struct cmsghdr hdr;