about summary refs log tree commit homepage
path: root/lib/PublicInbox/LeiXSearch.pm
diff options
context:
space:
mode:
Diffstat (limited to 'lib/PublicInbox/LeiXSearch.pm')
-rw-r--r--lib/PublicInbox/LeiXSearch.pm27
1 files changed, 17 insertions, 10 deletions
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index 91864cd0..dc5cf3b6 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -126,6 +126,7 @@ sub query_thread_mset { # for --thread
                         @{$ctx->{xids}} = ();
                 }
         } while (_mset_more($mset, $mo));
+        undef $each_smsg; # drops @io for l2m->{each_smsg_done}
         $lei->{ovv}->ovv_atexit_child($lei);
 }
 
@@ -147,6 +148,7 @@ sub query_mset { # non-parallel for non-"--thread" users
                         $each_smsg->($smsg, $it);
                 }
         } while (_mset_more($mset, $mo));
+        undef $each_smsg; # drops @io for l2m->{each_smsg_done}
         $lei->{ovv}->ovv_atexit_child($lei);
 }
 
@@ -170,11 +172,14 @@ sub git {
 }
 
 sub query_done { # EOF callback
-        my ($lei) = @_;
-        $lei->{ovv}->ovv_end($lei);
-        if (my $l2m = $lei->{l2m}) {
-                $lei->start_mua unless $l2m->lock_free;
+        my ($self, $lei) = @_;
+        my $l2m = delete $lei->{l2m};
+        if (my $pids = delete $self->{l2m_pids}) {
+                my $ipc_worker_reap = $self->can('ipc_worker_reap');
+                dwaitpid($_, $ipc_worker_reap, $l2m) for @$pids;
         }
+        $lei->{ovv}->ovv_end($lei);
+        $lei->start_mua if $l2m && !$l2m->lock_free;
         $lei->dclose;
 }
 
@@ -188,12 +193,10 @@ sub start_query { # always runs in main (lei-daemon) process
         }
         my $remotes = $self->{remotes} // [];
         if ($lei->{opt}->{thread}) {
-                $lei->{-parallel} = scalar(@$remotes) + scalar(@$srcs) - 1;
                 for my $ibxish (@$srcs) {
                         $self->wq_do('query_thread_mset', $io, $lei, $ibxish);
                 }
         } else {
-                $lei->{-parallel} = scalar(@$remotes);
                 $self->wq_do('query_mset', $io, $lei, $srcs);
         }
         # TODO
@@ -226,12 +229,12 @@ sub do_query {
         $io[0] = undef;
         pipe(my $qry_status_rd, $io[0]) or die "pipe $!";
 
-        $lei_orig->{lxs} = $self;
         $lei_orig->event_step_init; # wait for shutdowns
-        my $op_map = { '' => [ \&query_done, $lei_orig ] };
+        my $op_map = { '' => [ \&query_done, $self, $lei_orig ] };
         my $in_loop = exists $lei_orig->{sock};
         my $opp = PublicInbox::OpPipe->new($qry_status_rd, $op_map, $in_loop);
-        if (my $l2m = $lei->{l2m}) {
+        my $l2m = $lei->{l2m};
+        if ($l2m) {
                 $l2m->pre_augment($lei_orig); # may redirect $lei->{1} for mbox
                 $io[1] = $lei_orig->{1};
                 $op_map->{'.'} = [ \&start_query, $self, \@io, $lei, $srcs ];
@@ -246,13 +249,17 @@ sub do_query {
                 $op_map->{'!'} = [ \&CORE::kill, 'TERM', @pids ];
                 $opp->event_step;
                 my $ipc_worker_reap = $self->can('ipc_worker_reap');
+                if (my $l2m_pids = delete $self->{l2m_pids}) {
+                        dwaitpid($_, $ipc_worker_reap, $l2m) for @$l2m_pids;
+                }
                 dwaitpid($_, $ipc_worker_reap, $self) for @pids;
         }
 }
 
 sub ipc_atfork_prepare {
         my ($self) = @_;
-        $self->wq_set_recv_modes(qw[+<&= >&= >&= +<&=]);
+        # (qry_status_wr, stdout|mbox, stderr, 3: sock, 4: $l2m->{-wq_s1})
+        $self->wq_set_recv_modes(qw[+<&= >&= >&= +<&= +<&=]);
         $self->SUPER::ipc_atfork_prepare; # PublicInbox::IPC
 }