about summary refs log tree commit homepage
diff options
context:
space:
mode:
-rw-r--r--lib/PublicInbox/IPC.pm21
-rw-r--r--lib/PublicInbox/LEI.pm1
-rw-r--r--lib/PublicInbox/LeiXSearch.pm11
3 files changed, 21 insertions, 12 deletions
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index fbc91f6f..78cb8400 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -104,11 +104,11 @@ sub ipc_worker_spawn {
         pipe(my ($r_req, $w_req)) or die "pipe: $!";
         pipe(my ($r_res, $w_res)) or die "pipe: $!";
         my $sigset = $oldset // PublicInbox::DS::block_signals();
-        my $parent = $$;
         $self->ipc_atfork_prepare;
         defined(my $pid = fork) or die "fork: $!";
         if ($pid == 0) {
                 eval { PublicInbox::DS->Reset };
+                delete @$self{qw(-wq_s1 -wq_workers -wq_ppid)};
                 $w_req = $r_res = undef;
                 $w_res->autoflush(1);
                 $SIG{$_} = 'IGNORE' for (qw(TERM INT QUIT));
@@ -283,8 +283,7 @@ sub _wq_worker_start ($$) {
         my $pid = fork // die "fork: $!";
         if ($pid == 0) {
                 eval { PublicInbox::DS->Reset };
-                close(delete $self->{-wq_s1});
-                delete $self->{qw(-wq_workers -wq_ppid)};
+                delete @$self{qw(-wq_s1 -wq_workers -wq_ppid)};
                 $SIG{$_} = 'IGNORE' for (qw(PIPE TTOU TTIN));
                 $SIG{$_} = 'DEFAULT' for (qw(TERM QUIT INT CHLD));
                 local $0 = $self->{-wq_ident};
@@ -306,16 +305,15 @@ sub wq_workers_start {
         my ($self, $ident, $nr_workers, $oldset) = @_;
         ($enc && $send_cmd && $recv_cmd && defined($SEQPACKET)) or return;
         return if $self->{-wq_s1}; # idempotent
-        my ($s1, $s2);
-        socketpair($s1, $s2, AF_UNIX, $SEQPACKET, 0) or die "socketpair: $!";
+        $self->{-wq_s1} = $self->{-wq_s2} = undef;
+        socketpair($self->{-wq_s1}, $self->{-wq_s2}, AF_UNIX, $SEQPACKET, 0) or
+                die "socketpair: $!";
         $self->ipc_atfork_prepare;
         $nr_workers //= 4;
         $nr_workers = $WQ_MAX_WORKERS if $nr_workers > $WQ_MAX_WORKERS;
         my $sigset = $oldset // PublicInbox::DS::block_signals();
         $self->{-wq_workers} = {};
         $self->{-wq_ident} = $ident;
-        $self->{-wq_s1} = $s1;
-        $self->{-wq_s2} = $s2;
         _wq_worker_start($self, $sigset) for (1..$nr_workers);
         PublicInbox::DS::sig_setmask($sigset) unless $oldset;
         $self->{-wq_ppid} = $$;
@@ -377,6 +375,7 @@ sub wq_close {
         my $ppid = delete $self->{-wq_ppid} or return;
         my $workers = delete $self->{-wq_workers} // die 'BUG: no wq_workers';
         return if $ppid != $$; # can't reap siblings or parents
+        return (keys %$workers) if wantarray; # caller will reap
         for my $pid (keys %$workers) {
                 dwaitpid($pid, \&ipc_worker_reap, $self);
         }
@@ -391,9 +390,11 @@ sub wq_kill {
 sub WQ_MAX_WORKERS { $WQ_MAX_WORKERS }
 
 sub DESTROY {
-        wq_kill($_[0]);
-        wq_close($_[0]);
-        ipc_worker_stop($_[0]);
+        my ($self) = @_;
+        my $ppid = $self->{-wq_ppid};
+        wq_kill($self) if $ppid && $ppid == $$;
+        wq_close($self);
+        ipc_worker_stop($self);
 }
 
 1;
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 2784ca6b..5e6eb0af 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -287,6 +287,7 @@ sub atfork_child_wq {
                 $self->x_it(13); # SIGPIPE = 13
                 # we need to close explicitly to avoid Perl warning on SIGPIPE
                 close($_) for (delete @$self{1..2});
+                syswrite($self->{0}, '!') unless $self->{sock}; # for eof_wait
                 die bless(\"$_[0]", 'PublicInbox::SIGPIPE'),
         });
 }
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index 25ded544..8b70167c 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -8,6 +8,7 @@ package PublicInbox::LeiXSearch;
 use strict;
 use v5.10.1;
 use parent qw(PublicInbox::LeiSearch PublicInbox::IPC);
+use PublicInbox::DS qw(dwaitpid);
 
 sub new {
         my ($class) = @_;
@@ -181,8 +182,14 @@ sub do_query {
                 $lei_orig->{lxs} = $self;
                 $lei_orig->event_step_init;
         } else {
-                $self->wq_close;
-                read($eof_wait, my $buf, 1); # wait for close($lei->{0})
+                my @pids = $self->wq_close;
+                # wait for close($lei->{0})
+                if (read($eof_wait, my $buf, 1)) {
+                        # if we get a SIGPIPE from one, kill the rest
+                        kill('TERM', @pids) if $buf eq '!';
+                }
+                my $ipc_worker_reap = $self->can('ipc_worker_reap');
+                dwaitpid($_, $ipc_worker_reap, $self) for @pids;
                 query_done($lei_orig); # may SIGPIPE
         }
 }