about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2020-12-31 13:51:39 +0000
committerEric Wong <e@80x24.org>2021-01-01 05:00:39 +0000
commit5bbcd1686c10985168b98046d3edc6cf0818df8c (patch)
treec9336e7eb61cd5995efd7b3569543ac41ce07a48
parentcb2b5984109b2caad941e3a2c952219890079acc (diff)
downloadpublic-inbox-5bbcd1686c10985168b98046d3edc6cf0818df8c.tar.gz
shutdown(2) on a socket can be preferable if there's multiple
forked processes writing to a single worker and we really want
to shut things down ASAP.

It may also be good to provide an ipc_worker_exit method which
subclasses can override if needed for graceful shutdown.  But we
won't need equivalents to atexit(3) since we can rely on DESTROY
handlers given this is Perl5.
-rw-r--r--lib/PublicInbox/IPC.pm49
1 files changed, 33 insertions, 16 deletions
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 0baa218c..ed10cf44 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -56,8 +56,6 @@ sub ipc_return ($$$) {
 
 sub ipc_worker_loop ($$) {
         my ($self, $s2) = @_;
-        $self->ipc_atfork_child if $self->can('ipc_atfork_child');
-        $s2->autoflush(1);
         while (my $rec = _get_rec($s2)) {
                 my ($wantarray, $sub, @args) = @$rec;
                 if (!defined($wantarray)) { # no waiting if client doesn't care
@@ -73,7 +71,7 @@ sub ipc_worker_loop ($$) {
         }
 }
 
-sub ipc_worker_spawn ($$$) {
+sub ipc_worker_spawn {
         my ($self, $ident, $oldset) = @_;
         return unless $enc;
         my $pid = $self->{-ipc_worker_pid};
@@ -82,43 +80,62 @@ sub ipc_worker_spawn ($$$) {
         my ($s1, $s2);
         socketpair($s1, $s2, AF_UNIX, SOCK_STREAM, 0) or die "socketpair: $!";
         my $sigset = $oldset // PublicInbox::Sigfd::block_signals();
+        my $parent = $$;
+        $self->ipc_atfork_parent;
         defined($pid = fork) or die "fork: $!";
         if ($pid == 0) {
-                undef $s1;
-                local $0 = $ident;
+                eval { PublicInbox::DS->Reset };
+                $self->{-ipc_parent_pid} = $parent;
+                close $s1 or die "close(\$s1): $!";
+                $s2->autoflush(1);
                 $SIG{$_} = 'IGNORE' for (qw(TERM INT QUIT));
+                local $0 = $ident;
                 PublicInbox::Sigfd::sig_setmask($oldset);
+                $self->ipc_atfork_child;
                 eval { ipc_worker_loop($self, $s2) };
-                die "worker $ident died: $@\n" if $@;
-                $self->ipc_at_worker_exit if $self->can('ipc_at_worker_exit');
+                die "worker $ident PID:$$ died: $@\n" if $@;
                 exit;
         }
         PublicInbox::Sigfd::sig_setmask($sigset) unless $oldset;
+        close $s2 or die "close(\$s2): $!";
         $s1->autoflush(1);
         $self->{-ipc_sock} = $s1;
         $self->{-ipc_worker_pid} = $pid;
 }
 
-sub ipc_reap_worker { # dwaitpid callback
+sub ipc_worker_reap { # dwaitpid callback
         my ($self, $pid) = @_;
         warn "PID:$pid died with \$?=$?\n" if $?;
 }
 
+# for base class, override in superclasses
+sub ipc_atfork_parent {}
+sub ipc_atfork_child {}
+
+sub ipc_worker_exit {
+        my (undef, $code) = @_;
+        exit($code);
+}
+
 sub ipc_worker_stop {
         my ($self) = @_;
         my $pid;
-        if (delete $self->{-ipc_sock}) {
-                $pid = delete $self->{-ipc_worker_pid} or die "no PID?";
-        } else {
+        my $s1 = delete $self->{-ipc_sock} or do {
                 $pid = delete $self->{-ipc_worker_pid} and
-                        die "unexpected PID:$pid";
-        }
-        return unless $pid;
-        eval { PublicInbox::DS::dwaitpid($pid, \&ipc_reap_worker, $self) };
+                        die "unexpected PID:$pid without ipc_sock";
+                return;
+        };
+        $pid = delete $self->{-ipc_worker_pid} or die "no PID?";
+        _send_rec($s1, [ undef, 'ipc_worker_exit', 0 ]);
+        shutdown($s1, 2) or die "shutdown(\$s1) for PID:$pid";
+        eval {
+                my $reap = $self->can('ipc_worker_reap');
+                PublicInbox::DS::dwaitpid($pid, $reap, $self);
+        };
         if ($@) {
                 my $wp = waitpid($pid, 0);
                 $pid == $wp or die "waitpid($pid) returned $wp: \$?=$?";
-                ipc_reap_worker($self, $pid);
+                $self->ipc_worker_reap($pid);
         }
 }