about summary refs log tree commit homepage
path: root/lib/PublicInbox/LeiXSearch.pm
diff options
context:
space:
mode:
author Eric Wong <e@80x24.org> 2021-01-18 04:30:31 -0600
committer Eric Wong <e@80x24.org> 2021-01-18 21:20:25 +0000
commit 51191d611e918ff3ef6e9ce8ee52ba7b2cd2144c (patch)
tree c787b82356d3f7e42d003a478d8c3a3ce0ee956e /lib/PublicInbox/LeiXSearch.pm
parent 21671ed82f8d1a7b6de593e073079e29c5675aa8 (diff)
download public-inbox-51191d611e918ff3ef6e9ce8ee52ba7b2cd2144c.tar.gz
With 4 dedicated workers, this seems to provide a 100-120%
speedup on a 4-core machine when writing thousands of search
results to a Maildir or mbox.  This also sets us up for
high-latency IMAP destinations in the future.

This opens the door to more speedup opportunities such
as optimizing dedupe locking and other ways to reduce
contention.

This change is fairly complex and convoluted, unfortunately.
Further work may allow us to simplify it and even improve
performance.
Diffstat (limited to 'lib/PublicInbox/LeiXSearch.pm')
-rw-r--r-- lib/PublicInbox/LeiXSearch.pm 27
1 file changed, 17 insertions, 10 deletions
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index 91864cd0..dc5cf3b6 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -126,6 +126,7 @@ sub query_thread_mset { # for --thread
                         @{$ctx->{xids}} = ();
                 }
         } while (_mset_more($mset, $mo));
+        undef $each_smsg; # drops @io for l2m->{each_smsg_done}
         $lei->{ovv}->ovv_atexit_child($lei);
 }
 
@@ -147,6 +148,7 @@ sub query_mset { # non-parallel for non-"--thread" users
                         $each_smsg->($smsg, $it);
                 }
         } while (_mset_more($mset, $mo));
+        undef $each_smsg; # drops @io for l2m->{each_smsg_done}
         $lei->{ovv}->ovv_atexit_child($lei);
 }
 
@@ -170,11 +172,14 @@ sub git {
 }
 
 sub query_done { # EOF callback
-        my ($lei) = @_;
-        $lei->{ovv}->ovv_end($lei);
-        if (my $l2m = $lei->{l2m}) {
-                $lei->start_mua unless $l2m->lock_free;
+        my ($self, $lei) = @_;
+        my $l2m = delete $lei->{l2m};
+        if (my $pids = delete $self->{l2m_pids}) {
+                my $ipc_worker_reap = $self->can('ipc_worker_reap');
+                dwaitpid($_, $ipc_worker_reap, $l2m) for @$pids;
         }
+        $lei->{ovv}->ovv_end($lei);
+        $lei->start_mua if $l2m && !$l2m->lock_free;
         $lei->dclose;
 }
 
@@ -188,12 +193,10 @@ sub start_query { # always runs in main (lei-daemon) process
         }
         my $remotes = $self->{remotes} // [];
         if ($lei->{opt}->{thread}) {
-                $lei->{-parallel} = scalar(@$remotes) + scalar(@$srcs) - 1;
                 for my $ibxish (@$srcs) {
                         $self->wq_do('query_thread_mset', $io, $lei, $ibxish);
                 }
         } else {
-                $lei->{-parallel} = scalar(@$remotes);
                 $self->wq_do('query_mset', $io, $lei, $srcs);
         }
         # TODO
@@ -226,12 +229,12 @@ sub do_query {
         $io[0] = undef;
         pipe(my $qry_status_rd, $io[0]) or die "pipe $!";
 
-        $lei_orig->{lxs} = $self;
         $lei_orig->event_step_init; # wait for shutdowns
-        my $op_map = { '' => [ \&query_done, $lei_orig ] };
+        my $op_map = { '' => [ \&query_done, $self, $lei_orig ] };
         my $in_loop = exists $lei_orig->{sock};
         my $opp = PublicInbox::OpPipe->new($qry_status_rd, $op_map, $in_loop);
-        if (my $l2m = $lei->{l2m}) {
+        my $l2m = $lei->{l2m};
+        if ($l2m) {
                 $l2m->pre_augment($lei_orig); # may redirect $lei->{1} for mbox
                 $io[1] = $lei_orig->{1};
                 $op_map->{'.'} = [ \&start_query, $self, \@io, $lei, $srcs ];
@@ -246,13 +249,17 @@ sub do_query {
                 $op_map->{'!'} = [ \&CORE::kill, 'TERM', @pids ];
                 $opp->event_step;
                 my $ipc_worker_reap = $self->can('ipc_worker_reap');
+                if (my $l2m_pids = delete $self->{l2m_pids}) {
+                        dwaitpid($_, $ipc_worker_reap, $l2m) for @$l2m_pids;
+                }
                 dwaitpid($_, $ipc_worker_reap, $self) for @pids;
         }
 }
 
 sub ipc_atfork_prepare {
         my ($self) = @_;
-        $self->wq_set_recv_modes(qw[+<&= >&= >&= +<&=]);
+        # (qry_status_wr, stdout|mbox, stderr, 3: sock, 4: $l2m->{-wq_s1})
+        $self->wq_set_recv_modes(qw[+<&= >&= >&= +<&= +<&=]);
         $self->SUPER::ipc_atfork_prepare; # PublicInbox::IPC
 }