From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id E61AF1FA12 for ; Sat, 16 Jan 2021 11:36:24 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 2/4] ipc: children don't kill on DESTROY, reduce FD sharing Date: Fri, 15 Jan 2021 23:36:22 -1200 Message-Id: <20210116113624.19930-3-e@80x24.org> In-Reply-To: <20210116113624.19930-1-e@80x24.org> References: <20210116113624.19930-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: Children should not be blindly killing siblings on ->DESTROY since they're typically shorter-lived than parents. We'll also be more careful about on-stack variables and now we can rely exclusively on delete ops to close FDs. We also need to fix our SIGPIPE handling for the oneshot case while fixing a typo for delete, so we write "!" to the EOF pipe to ensure the parent oneshot process exits on the first worker that hits SIGPIPE, rather than waiting for the last worker to hit SIGPIPE. --- lib/PublicInbox/IPC.pm | 21 +++++++++++---------- lib/PublicInbox/LEI.pm | 1 + lib/PublicInbox/LeiXSearch.pm | 11 +++++++++-- 3 files changed, 21 insertions(+), 12 deletions(-) diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm index fbc91f6f..78cb8400 100644 --- a/lib/PublicInbox/IPC.pm +++ b/lib/PublicInbox/IPC.pm @@ -104,11 +104,11 @@ sub ipc_worker_spawn { pipe(my ($r_req, $w_req)) or die "pipe: $!"; pipe(my ($r_res, $w_res)) or die "pipe: $!"; my $sigset = $oldset // PublicInbox::DS::block_signals(); - my $parent = $$; $self->ipc_atfork_prepare; defined(my $pid = fork) or die "fork: $!"; if ($pid == 0) { eval { PublicInbox::DS->Reset }; + delete @$self{qw(-wq_s1 -wq_workers -wq_ppid)}; $w_req = $r_res = undef; $w_res->autoflush(1); $SIG{$_} = 'IGNORE' for (qw(TERM INT QUIT)); @@ -283,8 +283,7 @@ sub _wq_worker_start ($$) { my $pid = fork // die "fork: $!"; if ($pid == 0) { eval { PublicInbox::DS->Reset }; - close(delete $self->{-wq_s1}); - delete $self->{qw(-wq_workers -wq_ppid)}; + delete @$self{qw(-wq_s1 -wq_workers -wq_ppid)}; $SIG{$_} = 'IGNORE' for (qw(PIPE TTOU TTIN)); $SIG{$_} = 'DEFAULT' for (qw(TERM QUIT INT CHLD)); local $0 = $self->{-wq_ident}; @@ -306,16 +305,15 @@ sub wq_workers_start { my ($self, $ident, $nr_workers, $oldset) = @_; ($enc && $send_cmd && $recv_cmd && defined($SEQPACKET)) or return; return if $self->{-wq_s1}; # idempotent - my ($s1, $s2); - socketpair($s1, $s2, AF_UNIX, $SEQPACKET, 0) or die "socketpair: $!"; + $self->{-wq_s1} = $self->{-wq_s2} = undef; + socketpair($self->{-wq_s1}, $self->{-wq_s2}, AF_UNIX, $SEQPACKET, 0) or + die "socketpair: $!"; $self->ipc_atfork_prepare; $nr_workers //= 4; $nr_workers = $WQ_MAX_WORKERS if $nr_workers > $WQ_MAX_WORKERS; my $sigset = $oldset // PublicInbox::DS::block_signals(); $self->{-wq_workers} = {}; $self->{-wq_ident} = $ident; - $self->{-wq_s1} = $s1; - $self->{-wq_s2} = $s2; _wq_worker_start($self, $sigset) for (1..$nr_workers); PublicInbox::DS::sig_setmask($sigset) unless $oldset; $self->{-wq_ppid} = $$; @@ -377,6 +375,7 @@ sub wq_close { my $ppid = delete $self->{-wq_ppid} or return; my $workers = delete $self->{-wq_workers} // die 'BUG: no wq_workers'; return if $ppid != $$; # can't reap siblings or parents + return (keys %$workers) if wantarray; # caller will reap for my $pid (keys %$workers) { dwaitpid($pid, \&ipc_worker_reap, $self); } @@ -391,9 +390,11 @@ sub wq_kill { sub WQ_MAX_WORKERS { $WQ_MAX_WORKERS } sub DESTROY { - wq_kill($_[0]); - wq_close($_[0]); - ipc_worker_stop($_[0]); + my ($self) = @_; + my $ppid = $self->{-wq_ppid}; + wq_kill($self) if $ppid && $ppid == $$; + wq_close($self); + ipc_worker_stop($self); } 1; diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index 1f4a3082..5568904d 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -287,6 +287,7 @@ sub atfork_child_wq { $self->x_it(13); # SIGPIPE = 13 # we need to close explicitly to avoid Perl warning on SIGPIPE close($_) for (delete @$self{1..2}); + syswrite($self->{0}, '!') unless $self->{sock}; # for eof_wait die bless(\"$_[0]", 'PublicInbox::SIGPIPE'), }); } diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index 25ded544..8b70167c 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -8,6 +8,7 @@ package PublicInbox::LeiXSearch; use strict; use v5.10.1; use parent qw(PublicInbox::LeiSearch PublicInbox::IPC); +use PublicInbox::DS qw(dwaitpid); sub new { my ($class) = @_; @@ -181,8 +182,14 @@ sub do_query { $lei_orig->{lxs} = $self; $lei_orig->event_step_init; } else { - $self->wq_close; - read($eof_wait, my $buf, 1); # wait for close($lei->{0}) + my @pids = $self->wq_close; + # wait for close($lei->{0}) + if (read($eof_wait, my $buf, 1)) { + # if we get a SIGPIPE from one, kill the rest + kill('TERM', @pids) if $buf eq '!'; + } + my $ipc_worker_reap = $self->can('ipc_worker_reap'); + dwaitpid($_, $ipc_worker_reap, $self) for @pids; query_done($lei_orig); # may SIGPIPE } }