From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id AB5421FA13 for ; Fri, 29 Jan 2021 07:43:00 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 3/7] ipc: more consistent behavior between worker types Date: Fri, 29 Jan 2021 12:42:56 +0500 Message-Id: <20210129074300.14475-4-e@80x24.org> In-Reply-To: <20210129074300.14475-1-e@80x24.org> References: <20210129074300.14475-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: Localize signals inside the respective worker loops in case there's circular references. We'll also rely on OnDestroy to trigger exits from the ipc_worker_loop like we do with wq_worker_loop. And also add some more developer documentation to help future developers. The default signals remain different, for now. Cleanup some unnecessary "use" statements while we're loading OnDestroy. --- lib/PublicInbox/IPC.pm | 29 +++++++++++++++++++++-------- 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm index 838f9530..ece0e8b8 100644 --- a/lib/PublicInbox/IPC.pm +++ b/lib/PublicInbox/IPC.pm @@ -2,16 +2,20 @@ # License: AGPL-3.0+ # base class for remote IPC calls and workqueues, requires Storable or Sereal +# - ipc_do and ipc_worker_* is for a single worker/producer and uses pipes +# - wq_do and wq_worker* is for a single producer and multiple workers, +# using SOCK_SEQPACKET for work distribution +# use ipc_do when you need work done on a certain process +# use wq_do when your work can be done on any idle worker package PublicInbox::IPC; use strict; use v5.10.1; use Carp qw(confess croak); use PublicInbox::DS qw(dwaitpid); use PublicInbox::Spawn; -use POSIX qw(mkfifo WNOHANG); +use PublicInbox::OnDestroy; use Socket qw(AF_UNIX MSG_EOR SOCK_STREAM); use Errno qw(EMSGSIZE); -use File::Temp 0.19 (); # 0.19 for ->newdir my $SEQPACKET = eval { Socket::SOCK_SEQPACKET() }; # portable enough? use constant PIPE_BUF => $^O eq 'linux' ? 4096 : POSIX::_POSIX_PIPE_BUF(); my $WQ_MAX_WORKERS = 4096; @@ -107,16 +111,22 @@ sub ipc_worker_spawn { if ($pid == 0) { srand($seed); eval { PublicInbox::DS->Reset }; - delete @$self{qw(-wq_s1 -wq_workers -wq_ppid)}; + delete @$self{qw(-wq_s1 -wq_s2 -wq_workers -wq_ppid)}; $w_req = $r_res = undef; $w_res->autoflush(1); $SIG{$_} = 'IGNORE' for (qw(TERM INT QUIT)); local $0 = $ident; PublicInbox::DS::sig_setmask($sigset); + # ensure we properly exit even if warn() dies: + my $end = PublicInbox::OnDestroy->new($$, sub { exit(!!$@) }); my $on_destroy = $self->ipc_atfork_child; - eval { ipc_worker_loop($self, $r_req, $w_res) }; + eval { + local %SIG = %SIG; + ipc_worker_loop($self, $r_req, $w_res); + }; die "worker $ident PID:$$ died: $@\n" if $@; - exit; + undef $on_destroy; + undef $end; # trigger exit } PublicInbox::DS::sig_setmask($sigset) unless $oldset; $r_req = $w_res = undef; @@ -320,14 +330,17 @@ sub _wq_worker_start ($$$) { eval { PublicInbox::DS->Reset }; delete @$self{qw(-wq_s1 -wq_workers -wq_ppid)}; @$self{keys %$fields} = values(%$fields) if $fields; - $SIG{$_} = 'IGNORE' for (qw(PIPE TTOU TTIN)); - $SIG{$_} = 'DEFAULT' for (qw(TERM QUIT INT CHLD)); + $SIG{$_} = 'IGNORE' for (qw(PIPE)); + $SIG{$_} = 'DEFAULT' for (qw(TTOU TTIN TERM QUIT INT CHLD)); local $0 = $self->{-wq_ident}; PublicInbox::DS::sig_setmask($oldset); # ensure we properly exit even if warn() dies: my $end = PublicInbox::OnDestroy->new($$, sub { exit(!!$@) }); my $on_destroy = $self->ipc_atfork_child; - eval { wq_worker_loop($self) }; + eval { + local %SIG = %SIG; + wq_worker_loop($self); + }; warn "worker $self->{-wq_ident} PID:$$ died: $@" if $@; undef $on_destroy; undef $end; # trigger exit