From 5bbcd1686c10985168b98046d3edc6cf0818df8c Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Thu, 31 Dec 2020 13:51:39 +0000 Subject: ipc: use shutdown(2), base atfork* callback shutdown(2) on a socket can be preferable if there's multiple forked processes writing to a single worker and we really want to shut things down ASAP. It may also be good to provide an ipc_worker_exit method which subclasses can override if needed for graceful shutdown. But we won't need equivalents to atexit(3) since we can rely on DESTROY handlers given this is Perl5. --- lib/PublicInbox/IPC.pm | 49 +++++++++++++++++++++++++++++++++---------------- 1 file changed, 33 insertions(+), 16 deletions(-) diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm index 0baa218c..ed10cf44 100644 --- a/lib/PublicInbox/IPC.pm +++ b/lib/PublicInbox/IPC.pm @@ -56,8 +56,6 @@ sub ipc_return ($$$) { sub ipc_worker_loop ($$) { my ($self, $s2) = @_; - $self->ipc_atfork_child if $self->can('ipc_atfork_child'); - $s2->autoflush(1); while (my $rec = _get_rec($s2)) { my ($wantarray, $sub, @args) = @$rec; if (!defined($wantarray)) { # no waiting if client doesn't care @@ -73,7 +71,7 @@ sub ipc_worker_loop ($$) { } } -sub ipc_worker_spawn ($$$) { +sub ipc_worker_spawn { my ($self, $ident, $oldset) = @_; return unless $enc; my $pid = $self->{-ipc_worker_pid}; @@ -82,43 +80,62 @@ sub ipc_worker_spawn ($$$) { my ($s1, $s2); socketpair($s1, $s2, AF_UNIX, SOCK_STREAM, 0) or die "socketpair: $!"; my $sigset = $oldset // PublicInbox::Sigfd::block_signals(); + my $parent = $$; + $self->ipc_atfork_parent; defined($pid = fork) or die "fork: $!"; if ($pid == 0) { - undef $s1; - local $0 = $ident; + eval { PublicInbox::DS->Reset }; + $self->{-ipc_parent_pid} = $parent; + close $s1 or die "close(\$s1): $!"; + $s2->autoflush(1); $SIG{$_} = 'IGNORE' for (qw(TERM INT QUIT)); + local $0 = $ident; PublicInbox::Sigfd::sig_setmask($oldset); + $self->ipc_atfork_child; eval { ipc_worker_loop($self, $s2) }; - die "worker $ident died: $@\n" if $@; - $self->ipc_at_worker_exit if $self->can('ipc_at_worker_exit'); + die "worker $ident PID:$$ died: $@\n" if $@; exit; } PublicInbox::Sigfd::sig_setmask($sigset) unless $oldset; + close $s2 or die "close(\$s2): $!"; $s1->autoflush(1); $self->{-ipc_sock} = $s1; $self->{-ipc_worker_pid} = $pid; } -sub ipc_reap_worker { # dwaitpid callback +sub ipc_worker_reap { # dwaitpid callback my ($self, $pid) = @_; warn "PID:$pid died with \$?=$?\n" if $?; } +# for base class, override in superclasses +sub ipc_atfork_parent {} +sub ipc_atfork_child {} + +sub ipc_worker_exit { + my (undef, $code) = @_; + exit($code); +} + sub ipc_worker_stop { my ($self) = @_; my $pid; - if (delete $self->{-ipc_sock}) { - $pid = delete $self->{-ipc_worker_pid} or die "no PID?"; - } else { + my $s1 = delete $self->{-ipc_sock} or do { $pid = delete $self->{-ipc_worker_pid} and - die "unexpected PID:$pid"; - } - return unless $pid; - eval { PublicInbox::DS::dwaitpid($pid, \&ipc_reap_worker, $self) }; + die "unexpected PID:$pid without ipc_sock"; + return; + }; + $pid = delete $self->{-ipc_worker_pid} or die "no PID?"; + _send_rec($s1, [ undef, 'ipc_worker_exit', 0 ]); + shutdown($s1, 2) or die "shutdown(\$s1) for PID:$pid"; + eval { + my $reap = $self->can('ipc_worker_reap'); + PublicInbox::DS::dwaitpid($pid, $reap, $self); + }; if ($@) { my $wp = waitpid($pid, 0); $pid == $wp or die "waitpid($pid) returned $wp: \$?=$?"; - ipc_reap_worker($self, $pid); + $self->ipc_worker_reap($pid); } } -- cgit v1.2.3-24-ge0c7