diff options
author | Eric Wong <e@80x24.org> | 2021-01-29 12:42:56 +0500 |
---|---|---|
committer | Eric Wong <e@80x24.org> | 2021-01-30 01:08:18 +0000 |
commit | 89c34c8ea543ade16e5a68bf1c2b83bf885c46ea (patch) | |
tree | 3286898dfc782e2102f322b803992e63aa8d8bce /lib | |
parent | cc2eca91049da7e9bd4ddc8c19e85dd47913eb79 (diff) | |
download | public-inbox-89c34c8ea543ade16e5a68bf1c2b83bf885c46ea.tar.gz |
Localize signals inside the respective worker loops in case there's circular references. We'll also rely on OnDestroy to trigger exits from the ipc_worker_loop like we do with wq_worker_loop. And also add some more developer documentation to help future developers. The default signals remain different, for now. Cleanup some unnecessary "use" statements while we're loading OnDestroy.
Diffstat (limited to 'lib')
-rw-r--r-- | lib/PublicInbox/IPC.pm | 29 |
1 files changed, 21 insertions, 8 deletions
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm index 838f9530..ece0e8b8 100644 --- a/lib/PublicInbox/IPC.pm +++ b/lib/PublicInbox/IPC.pm @@ -2,16 +2,20 @@ # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> # base class for remote IPC calls and workqueues, requires Storable or Sereal +# - ipc_do and ipc_worker_* is for a single worker/producer and uses pipes +# - wq_do and wq_worker* is for a single producer and multiple workers, +# using SOCK_SEQPACKET for work distribution +# use ipc_do when you need work done on a certain process +# use wq_do when your work can be done on any idle worker package PublicInbox::IPC; use strict; use v5.10.1; use Carp qw(confess croak); use PublicInbox::DS qw(dwaitpid); use PublicInbox::Spawn; -use POSIX qw(mkfifo WNOHANG); +use PublicInbox::OnDestroy; use Socket qw(AF_UNIX MSG_EOR SOCK_STREAM); use Errno qw(EMSGSIZE); -use File::Temp 0.19 (); # 0.19 for ->newdir my $SEQPACKET = eval { Socket::SOCK_SEQPACKET() }; # portable enough? use constant PIPE_BUF => $^O eq 'linux' ? 4096 : POSIX::_POSIX_PIPE_BUF(); my $WQ_MAX_WORKERS = 4096; @@ -107,16 +111,22 @@ sub ipc_worker_spawn { if ($pid == 0) { srand($seed); eval { PublicInbox::DS->Reset }; - delete @$self{qw(-wq_s1 -wq_workers -wq_ppid)}; + delete @$self{qw(-wq_s1 -wq_s2 -wq_workers -wq_ppid)}; $w_req = $r_res = undef; $w_res->autoflush(1); $SIG{$_} = 'IGNORE' for (qw(TERM INT QUIT)); local $0 = $ident; PublicInbox::DS::sig_setmask($sigset); + # ensure we properly exit even if warn() dies: + my $end = PublicInbox::OnDestroy->new($$, sub { exit(!!$@) }); my $on_destroy = $self->ipc_atfork_child; - eval { ipc_worker_loop($self, $r_req, $w_res) }; + eval { + local %SIG = %SIG; + ipc_worker_loop($self, $r_req, $w_res); + }; die "worker $ident PID:$$ died: $@\n" if $@; - exit; + undef $on_destroy; + undef $end; # trigger exit } PublicInbox::DS::sig_setmask($sigset) unless $oldset; $r_req = $w_res = undef; @@ -320,14 +330,17 @@ sub _wq_worker_start ($$$) { eval { PublicInbox::DS->Reset }; delete @$self{qw(-wq_s1 -wq_workers -wq_ppid)}; @$self{keys %$fields} = values(%$fields) if $fields; - $SIG{$_} = 'IGNORE' for (qw(PIPE TTOU TTIN)); - $SIG{$_} = 'DEFAULT' for (qw(TERM QUIT INT CHLD)); + $SIG{$_} = 'IGNORE' for (qw(PIPE)); + $SIG{$_} = 'DEFAULT' for (qw(TTOU TTIN TERM QUIT INT CHLD)); local $0 = $self->{-wq_ident}; PublicInbox::DS::sig_setmask($oldset); # ensure we properly exit even if warn() dies: my $end = PublicInbox::OnDestroy->new($$, sub { exit(!!$@) }); my $on_destroy = $self->ipc_atfork_child; - eval { wq_worker_loop($self) }; + eval { + local %SIG = %SIG; + wq_worker_loop($self); + }; warn "worker $self->{-wq_ident} PID:$$ died: $@" if $@; undef $on_destroy; undef $end; # trigger exit |