diff options
-rw-r--r-- | lib/PublicInbox/IPC.pm | 21 | ||||
-rw-r--r-- | lib/PublicInbox/LEI.pm | 1 | ||||
-rw-r--r-- | lib/PublicInbox/LeiXSearch.pm | 11 |
3 files changed, 21 insertions, 12 deletions
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm index fbc91f6f..78cb8400 100644 --- a/lib/PublicInbox/IPC.pm +++ b/lib/PublicInbox/IPC.pm @@ -104,11 +104,11 @@ sub ipc_worker_spawn { pipe(my ($r_req, $w_req)) or die "pipe: $!"; pipe(my ($r_res, $w_res)) or die "pipe: $!"; my $sigset = $oldset // PublicInbox::DS::block_signals(); - my $parent = $$; $self->ipc_atfork_prepare; defined(my $pid = fork) or die "fork: $!"; if ($pid == 0) { eval { PublicInbox::DS->Reset }; + delete @$self{qw(-wq_s1 -wq_workers -wq_ppid)}; $w_req = $r_res = undef; $w_res->autoflush(1); $SIG{$_} = 'IGNORE' for (qw(TERM INT QUIT)); @@ -283,8 +283,7 @@ sub _wq_worker_start ($$) { my $pid = fork // die "fork: $!"; if ($pid == 0) { eval { PublicInbox::DS->Reset }; - close(delete $self->{-wq_s1}); - delete $self->{qw(-wq_workers -wq_ppid)}; + delete @$self{qw(-wq_s1 -wq_workers -wq_ppid)}; $SIG{$_} = 'IGNORE' for (qw(PIPE TTOU TTIN)); $SIG{$_} = 'DEFAULT' for (qw(TERM QUIT INT CHLD)); local $0 = $self->{-wq_ident}; @@ -306,16 +305,15 @@ sub wq_workers_start { my ($self, $ident, $nr_workers, $oldset) = @_; ($enc && $send_cmd && $recv_cmd && defined($SEQPACKET)) or return; return if $self->{-wq_s1}; # idempotent - my ($s1, $s2); - socketpair($s1, $s2, AF_UNIX, $SEQPACKET, 0) or die "socketpair: $!"; + $self->{-wq_s1} = $self->{-wq_s2} = undef; + socketpair($self->{-wq_s1}, $self->{-wq_s2}, AF_UNIX, $SEQPACKET, 0) or + die "socketpair: $!"; $self->ipc_atfork_prepare; $nr_workers //= 4; $nr_workers = $WQ_MAX_WORKERS if $nr_workers > $WQ_MAX_WORKERS; my $sigset = $oldset // PublicInbox::DS::block_signals(); $self->{-wq_workers} = {}; $self->{-wq_ident} = $ident; - $self->{-wq_s1} = $s1; - $self->{-wq_s2} = $s2; _wq_worker_start($self, $sigset) for (1..$nr_workers); PublicInbox::DS::sig_setmask($sigset) unless $oldset; $self->{-wq_ppid} = $$; @@ -377,6 +375,7 @@ sub wq_close { my $ppid = delete $self->{-wq_ppid} or return; my $workers = delete $self->{-wq_workers} // die 'BUG: no wq_workers'; return if $ppid != $$; # can't reap siblings or parents + return (keys %$workers) if wantarray; # caller will reap for my $pid (keys %$workers) { dwaitpid($pid, \&ipc_worker_reap, $self); } @@ -391,9 +390,11 @@ sub wq_kill { sub WQ_MAX_WORKERS { $WQ_MAX_WORKERS } sub DESTROY { - wq_kill($_[0]); - wq_close($_[0]); - ipc_worker_stop($_[0]); + my ($self) = @_; + my $ppid = $self->{-wq_ppid}; + wq_kill($self) if $ppid && $ppid == $$; + wq_close($self); + ipc_worker_stop($self); } 1; diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index 2784ca6b..5e6eb0af 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -287,6 +287,7 @@ sub atfork_child_wq { $self->x_it(13); # SIGPIPE = 13 # we need to close explicitly to avoid Perl warning on SIGPIPE close($_) for (delete @$self{1..2}); + syswrite($self->{0}, '!') unless $self->{sock}; # for eof_wait die bless(\"$_[0]", 'PublicInbox::SIGPIPE'), }); } diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index 25ded544..8b70167c 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -8,6 +8,7 @@ package PublicInbox::LeiXSearch; use strict; use v5.10.1; use parent qw(PublicInbox::LeiSearch PublicInbox::IPC); +use PublicInbox::DS qw(dwaitpid); sub new { my ($class) = @_; @@ -181,8 +182,14 @@ sub do_query { $lei_orig->{lxs} = $self; $lei_orig->event_step_init; } else { - $self->wq_close; - read($eof_wait, my $buf, 1); # wait for close($lei->{0}) + my @pids = $self->wq_close; + # wait for close($lei->{0}) + if (read($eof_wait, my $buf, 1)) { + # if we get a SIGPIPE from one, kill the rest + kill('TERM', @pids) if $buf eq '!'; + } + my $ipc_worker_reap = $self->can('ipc_worker_reap'); + dwaitpid($_, $ipc_worker_reap, $self) for @pids; query_done($lei_orig); # may SIGPIPE } } |