about summary refs log tree commit homepage
path: root/lib
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2021-01-29 12:42:56 +0500
committerEric Wong <e@80x24.org>2021-01-30 01:08:18 +0000
commit89c34c8ea543ade16e5a68bf1c2b83bf885c46ea (patch)
tree3286898dfc782e2102f322b803992e63aa8d8bce /lib
parentcc2eca91049da7e9bd4ddc8c19e85dd47913eb79 (diff)
downloadpublic-inbox-89c34c8ea543ade16e5a68bf1c2b83bf885c46ea.tar.gz
Localize signals inside the respective worker loops
in case there's circular references.

We'll also rely on OnDestroy to trigger exits from the
ipc_worker_loop like we do with wq_worker_loop.  And
also add some more developer documentation to help future
developers.

The default signals remain different, for now.
Cleanup some unnecessary "use" statements while we're
loading OnDestroy.
Diffstat (limited to 'lib')
-rw-r--r--lib/PublicInbox/IPC.pm29
1 files changed, 21 insertions, 8 deletions
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 838f9530..ece0e8b8 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -2,16 +2,20 @@
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
 
 # base class for remote IPC calls and workqueues, requires Storable or Sereal
+# - ipc_do and ipc_worker_* is for a single worker/producer and uses pipes
+# - wq_do and wq_worker* is for a single producer and multiple workers,
+#   using SOCK_SEQPACKET for work distribution
+# use ipc_do when you need work done on a certain process
+# use wq_do when your work can be done on any idle worker
 package PublicInbox::IPC;
 use strict;
 use v5.10.1;
 use Carp qw(confess croak);
 use PublicInbox::DS qw(dwaitpid);
 use PublicInbox::Spawn;
-use POSIX qw(mkfifo WNOHANG);
+use PublicInbox::OnDestroy;
 use Socket qw(AF_UNIX MSG_EOR SOCK_STREAM);
 use Errno qw(EMSGSIZE);
-use File::Temp 0.19 (); # 0.19 for ->newdir
 my $SEQPACKET = eval { Socket::SOCK_SEQPACKET() }; # portable enough?
 use constant PIPE_BUF => $^O eq 'linux' ? 4096 : POSIX::_POSIX_PIPE_BUF();
 my $WQ_MAX_WORKERS = 4096;
@@ -107,16 +111,22 @@ sub ipc_worker_spawn {
         if ($pid == 0) {
                 srand($seed);
                 eval { PublicInbox::DS->Reset };
-                delete @$self{qw(-wq_s1 -wq_workers -wq_ppid)};
+                delete @$self{qw(-wq_s1 -wq_s2 -wq_workers -wq_ppid)};
                 $w_req = $r_res = undef;
                 $w_res->autoflush(1);
                 $SIG{$_} = 'IGNORE' for (qw(TERM INT QUIT));
                 local $0 = $ident;
                 PublicInbox::DS::sig_setmask($sigset);
+                # ensure we properly exit even if warn() dies:
+                my $end = PublicInbox::OnDestroy->new($$, sub { exit(!!$@) });
                 my $on_destroy = $self->ipc_atfork_child;
-                eval { ipc_worker_loop($self, $r_req, $w_res) };
+                eval {
+                        local %SIG = %SIG;
+                        ipc_worker_loop($self, $r_req, $w_res);
+                };
                 die "worker $ident PID:$$ died: $@\n" if $@;
-                exit;
+                undef $on_destroy;
+                undef $end; # trigger exit
         }
         PublicInbox::DS::sig_setmask($sigset) unless $oldset;
         $r_req = $w_res = undef;
@@ -320,14 +330,17 @@ sub _wq_worker_start ($$$) {
                 eval { PublicInbox::DS->Reset };
                 delete @$self{qw(-wq_s1 -wq_workers -wq_ppid)};
                 @$self{keys %$fields} = values(%$fields) if $fields;
-                $SIG{$_} = 'IGNORE' for (qw(PIPE TTOU TTIN));
-                $SIG{$_} = 'DEFAULT' for (qw(TERM QUIT INT CHLD));
+                $SIG{$_} = 'IGNORE' for (qw(PIPE));
+                $SIG{$_} = 'DEFAULT' for (qw(TTOU TTIN TERM QUIT INT CHLD));
                 local $0 = $self->{-wq_ident};
                 PublicInbox::DS::sig_setmask($oldset);
                 # ensure we properly exit even if warn() dies:
                 my $end = PublicInbox::OnDestroy->new($$, sub { exit(!!$@) });
                 my $on_destroy = $self->ipc_atfork_child;
-                eval { wq_worker_loop($self) };
+                eval {
+                        local %SIG = %SIG;
+                        wq_worker_loop($self);
+                };
                 warn "worker $self->{-wq_ident} PID:$$ died: $@" if $@;
                 undef $on_destroy;
                 undef $end; # trigger exit