diff options
Diffstat (limited to 'lib/PublicInbox/LeiXSearch.pm')
-rw-r--r-- | lib/PublicInbox/LeiXSearch.pm | 27 |
1 files changed, 17 insertions, 10 deletions
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index 91864cd0..dc5cf3b6 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -126,6 +126,7 @@ sub query_thread_mset { # for --thread @{$ctx->{xids}} = (); } } while (_mset_more($mset, $mo)); + undef $each_smsg; # drops @io for l2m->{each_smsg_done} $lei->{ovv}->ovv_atexit_child($lei); } @@ -147,6 +148,7 @@ sub query_mset { # non-parallel for non-"--thread" users $each_smsg->($smsg, $it); } } while (_mset_more($mset, $mo)); + undef $each_smsg; # drops @io for l2m->{each_smsg_done} $lei->{ovv}->ovv_atexit_child($lei); } @@ -170,11 +172,14 @@ sub git { } sub query_done { # EOF callback - my ($lei) = @_; - $lei->{ovv}->ovv_end($lei); - if (my $l2m = $lei->{l2m}) { - $lei->start_mua unless $l2m->lock_free; + my ($self, $lei) = @_; + my $l2m = delete $lei->{l2m}; + if (my $pids = delete $self->{l2m_pids}) { + my $ipc_worker_reap = $self->can('ipc_worker_reap'); + dwaitpid($_, $ipc_worker_reap, $l2m) for @$pids; } + $lei->{ovv}->ovv_end($lei); + $lei->start_mua if $l2m && !$l2m->lock_free; $lei->dclose; } @@ -188,12 +193,10 @@ sub start_query { # always runs in main (lei-daemon) process } my $remotes = $self->{remotes} // []; if ($lei->{opt}->{thread}) { - $lei->{-parallel} = scalar(@$remotes) + scalar(@$srcs) - 1; for my $ibxish (@$srcs) { $self->wq_do('query_thread_mset', $io, $lei, $ibxish); } } else { - $lei->{-parallel} = scalar(@$remotes); $self->wq_do('query_mset', $io, $lei, $srcs); } # TODO @@ -226,12 +229,12 @@ sub do_query { $io[0] = undef; pipe(my $qry_status_rd, $io[0]) or die "pipe $!"; - $lei_orig->{lxs} = $self; $lei_orig->event_step_init; # wait for shutdowns - my $op_map = { '' => [ \&query_done, $lei_orig ] }; + my $op_map = { '' => [ \&query_done, $self, $lei_orig ] }; my $in_loop = exists $lei_orig->{sock}; my $opp = PublicInbox::OpPipe->new($qry_status_rd, $op_map, $in_loop); - if (my $l2m = $lei->{l2m}) { + my $l2m = $lei->{l2m}; + if ($l2m) { $l2m->pre_augment($lei_orig); # may redirect $lei->{1} for mbox $io[1] = $lei_orig->{1}; $op_map->{'.'} = [ \&start_query, $self, \@io, $lei, $srcs ]; @@ -246,13 +249,17 @@ sub do_query { $op_map->{'!'} = [ \&CORE::kill, 'TERM', @pids ]; $opp->event_step; my $ipc_worker_reap = $self->can('ipc_worker_reap'); + if (my $l2m_pids = delete $self->{l2m_pids}) { + dwaitpid($_, $ipc_worker_reap, $l2m) for @$l2m_pids; + } dwaitpid($_, $ipc_worker_reap, $self) for @pids; } } sub ipc_atfork_prepare { my ($self) = @_; - $self->wq_set_recv_modes(qw[+<&= >&= >&= +<&=]); + # (qry_status_wr, stdout|mbox, stderr, 3: sock, 4: $l2m->{-wq_s1}) + $self->wq_set_recv_modes(qw[+<&= >&= >&= +<&= +<&=]); $self->SUPER::ipc_atfork_prepare; # PublicInbox::IPC } |