From f478afc01d2137baa215b112f7cbd33c29f28ab7 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Sun, 10 Jan 2021 12:15:16 +0000 Subject: lei: fork + FD cleanup Do a better job of closing FDs that we don't want shared with the work queue workers. We'll also fix naming and use "atfork_prepare" instead of "atfork_parent" to match pthread_atfork(3) naming. --- lib/PublicInbox/IPC.pm | 57 +++++++++++++++++++++++++++++++++----------------- 1 file changed, 38 insertions(+), 19 deletions(-) (limited to 'lib/PublicInbox/IPC.pm') diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm index 4db4b8ea..88f81e47 100644 --- a/lib/PublicInbox/IPC.pm +++ b/lib/PublicInbox/IPC.pm @@ -126,7 +126,7 @@ sub ipc_worker_spawn { pipe(my ($r_res, $w_res)) or die "pipe: $!"; my $sigset = $oldset // PublicInbox::DS::block_signals(); my $parent = $$; - $self->ipc_atfork_parent; + $self->ipc_atfork_prepare; defined(my $pid = fork) or die "fork: $!"; if ($pid == 0) { eval { PublicInbox::DS->Reset }; @@ -155,8 +155,14 @@ sub ipc_worker_reap { # dwaitpid callback } # for base class, override in sub classes -sub ipc_atfork_parent {} -sub ipc_atfork_child {} +sub ipc_atfork_prepare {} + +sub ipc_atfork_child { + my ($self) = @_; + my $io = delete($self->{-ipc_atfork_child_close}) or return; + close($_) for @$io; + undef; +} # idempotent, can be called regardless of whether worker is active or not sub ipc_worker_stop { @@ -251,14 +257,21 @@ sub ipc_sibling_atfork_child { $pid == $$ and die "BUG: $$ ipc_atfork_child called on itself"; } +sub _close_recvd ($) { + my ($self) = @_; + close($_) for (grep { defined } (delete @$self{0..2})); +} + sub wq_worker_loop ($) { my ($self) = @_; my $buf; my $len = $self->{wq_req_len} // (4096 * 33); - my ($rec, $sub, @args); + my ($sub, $args); my $s2 = $self->{-wq_s2} // die 'BUG: no -wq_s2'; local $SIG{PIPE} = sub { - die(bless(\"$_[0]", __PACKAGE__.'::PIPE')) if $sub; + my $cur_sub = $sub; + _close_recvd($self); + die(bless(\$cur_sub, __PACKAGE__.'::PIPE')) if $cur_sub; }; my $rcv = $self->{-wq_recv_cmd} // $recv_cmd; while (1) { @@ -267,22 +280,25 @@ sub wq_worker_loop ($) { my @m = @{$self->{wq_open_modes} // [qw( +<&= >&= >&= )]}; for my $fd (@fds) { my $mode = shift(@m); - if (open(my $fh, $mode, $fd)) { - $self->{$i++} = $fh; - $fh->autoflush(1); + if (open(my $cmdfh, $mode, $fd)) { + $self->{$i++} = $cmdfh; + $cmdfh->autoflush(1); } else { die "$$ open($mode$fd) (FD:$i): $!"; } } - # Sereal dies, Storable returns undef - $rec = thaw($buf) // + # Sereal dies on truncated data, Storable returns undef + $args = thaw($buf) // die "thaw error on buffer of size:".length($buf); - ($sub, @args) = @$rec; - eval { $self->$sub(@args) }; + eval { + $sub = shift @$args; + eval { $self->$sub(@$args) }; + undef $sub; # quiet SIG{PIPE} handler + die $@ if $@; + }; warn "$$ wq_worker: $@" if $@ && ref $@ ne __PACKAGE__.'::PIPE'; - undef $sub; # quiet SIG{PIPE} handler # need to close explicitly to avoid warnings after SIGPIPE - close($_) for (delete(@$self{0..2})); + _close_recvd($self); } } @@ -306,14 +322,17 @@ sub _wq_worker_start ($$) { eval { PublicInbox::DS->Reset }; close(delete $self->{-wq_s1}); delete $self->{qw(-wq_workers -wq_ppid)}; - $SIG{$_} = 'IGNORE' for (qw(TTOU TTIN)); - $SIG{$_} = 'DEFAULT' for (qw(TERM QUIT INT)); + $SIG{$_} = 'IGNORE' for (qw(PIPE TTOU TTIN)); + $SIG{$_} = 'DEFAULT' for (qw(TERM QUIT INT CHLD)); local $0 = $self->{-wq_ident}; PublicInbox::DS::sig_setmask($oldset); + # ensure we properly exit even if warn() dies: + my $end = PublicInbox::OnDestroy->new($$, sub { exit(!!$@) }); my $on_destroy = $self->ipc_atfork_child; eval { wq_worker_loop($self) }; warn "worker $self->{-wq_ident} PID:$$ died: $@" if $@; - exit($@ ? 1 : 0); + undef $on_destroy; + undef $end; # trigger exit } else { $self->{-wq_workers}->{$pid} = \undef; } @@ -326,7 +345,7 @@ sub wq_workers_start { return if $self->{-wq_s1}; # idempotent my ($s1, $s2); socketpair($s1, $s2, AF_UNIX, $SEQPACKET, 0) or die "socketpair: $!"; - $self->ipc_atfork_parent; + $self->ipc_atfork_prepare; $nr_workers //= 4; $nr_workers = $WQ_MAX_WORKERS if $nr_workers > $WQ_MAX_WORKERS; my $sigset = $oldset // PublicInbox::DS::block_signals(); @@ -343,7 +362,7 @@ sub wq_worker_incr { # SIGTTIN handler my ($self, $oldset) = @_; $self->{-wq_s2} or return; return if wq_workers($self) >= $WQ_MAX_WORKERS; - $self->ipc_atfork_parent; + $self->ipc_atfork_prepare; my $sigset = $oldset // PublicInbox::DS::block_signals(); _wq_worker_start($self, $sigset); PublicInbox::DS::sig_setmask($sigset) unless $oldset; -- cgit v1.2.3-24-ge0c7