From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-Status: No, score=-3.9 required=3.0 tests=ALL_TRUSTED,AWL,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id B6FC11FFA1 for ; Sun, 10 Jan 2021 12:15:20 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 16/22] ipc: fix IO::FDPass use with a worker limit of 1 Date: Sun, 10 Jan 2021 12:15:13 +0000 Message-Id: <20210110121519.17044-17-e@80x24.org> In-Reply-To: <20210110121519.17044-1-e@80x24.org> References: <20210110121519.17044-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: IO::FDPass is our last choice for implementing the workqueue because its lack of atomicity makes it impossible to guarantee all requests of a single group hit a single worker out of many. So the only way to use IO::FDPass for workqueues is to only have a single worker. A single worker still buys us a small amount of parallelism because of the parent process. --- lib/PublicInbox/IPC.pm | 34 +++++++++++++++++++++++++++++++--- t/ipc.t | 18 +++++++++++------- 2 files changed, 42 insertions(+), 10 deletions(-) diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm index b0a0bfb5..e6a1082c 100644 --- a/lib/PublicInbox/IPC.pm +++ b/lib/PublicInbox/IPC.pm @@ -12,6 +12,7 @@ use POSIX qw(WNOHANG); use Socket qw(AF_UNIX MSG_EOR); my $SEQPACKET = eval { Socket::SOCK_SEQPACKET() }; # portable enough? use constant PIPE_BUF => $^O eq 'linux' ? 
4096 : POSIX::_POSIX_PIPE_BUF(); +my $WQ_MAX_WORKERS = 4096; my ($enc, $dec); # ->imports at BEGIN turns sereal_*_with_object into custom ops on 5.14+ # and eliminate method call overhead @@ -36,17 +37,39 @@ if ($enc && $dec) { # should be custom ops } // warn("Storable (part of Perl) missing: $@\n"); } +my $recv_cmd1; # PublicInbox::CmdIPC1::recv_cmd1; my $recv_cmd = PublicInbox::Spawn->can('recv_cmd4'); my $send_cmd = PublicInbox::Spawn->can('send_cmd4') // do { require PublicInbox::CmdIPC4; $recv_cmd //= PublicInbox::CmdIPC4->can('recv_cmd4'); PublicInbox::CmdIPC4->can('send_cmd4'); } // do { + # IO::FDPass only allows sending a single FD at-a-time, which + # means we can't guarantee all packets end up on the same worker, + # so we cap WQ_MAX_WORKERS require PublicInbox::CmdIPC1; - $recv_cmd = PublicInbox::CmdIPC1->can('recv_cmd1'); + $recv_cmd1 = PublicInbox::CmdIPC1->can('recv_cmd1'); + $WQ_MAX_WORKERS = 1 if $recv_cmd1; + wq_set_recv_fds(3); PublicInbox::CmdIPC1->can('send_cmd1'); }; +# needed to tell recv_cmd1 how many times to loop IO::FDPass::recv +sub wq_set_recv_fds { + return unless $recv_cmd1; + my $nfds = pop; + my $sub = sub { + my ($sock, $fds, undef, $flags) = @_; + $recv_cmd1->($sock, $fds, $_[2], $flags, $nfds); + }; + my $self = pop; + if (ref $self) { + $self->{-wq_recv_cmd} = $sub; + } else { + $recv_cmd = $sub; + } +} + sub _get_rec ($) { my ($r) = @_; defined(my $len = <$r>) or return; @@ -237,8 +260,9 @@ sub wq_worker_loop ($) { local $SIG{PIPE} = sub { die(bless(\"$_[0]", __PACKAGE__.'::PIPE')) if $sub; }; + my $rcv = $self->{-wq_recv_cmd} // $recv_cmd; until ($self->{-wq_quit}) { - my (@fds) = $recv_cmd->($s2, $buf, $len) or return; # EOF + my (@fds) = $rcv->($s2, $buf, $len) or return; # EOF my $i = 0; my @m = @{$self->{wq_open_modes} // [qw( +<&= >&= >&= )]}; for my $fd (@fds) { @@ -305,6 +329,7 @@ sub wq_workers_start { socketpair($s1, $s2, AF_UNIX, $SEQPACKET, 0) or die "socketpair: $!"; $self->ipc_atfork_parent; $nr_workers //= 4; + 
$nr_workers = $WQ_MAX_WORKERS if $nr_workers > $WQ_MAX_WORKERS; my $sigset = $oldset // PublicInbox::DS::block_signals(); $self->{-wq_workers} = {}; $self->{-wq_ident} = $ident; @@ -318,6 +343,7 @@ sub wq_workers_start { sub wq_worker_incr { # SIGTTIN handler my ($self, $oldset) = @_; $self->{-wq_s2} or return; + return if wq_workers($self) >= $WQ_MAX_WORKERS; $self->ipc_atfork_parent; my $sigset = $oldset // PublicInbox::DS::block_signals(); _wq_worker_start($self, $sigset); @@ -331,7 +357,7 @@ sub wq_exit { # wakes up wq_worker_decr_wait sub wq_worker_decr { # SIGTTOU handler, kills first idle worker my ($self) = @_; - my $workers = $self->{-wq_workers} or return; + return unless wq_workers($self); my $s2 = $self->{-wq_s2} // die 'BUG: no wq_s2'; $self->wq_do('wq_exit', [ $s2, $s2, $s2 ]); $self->{-wq_exit_pending}++; @@ -377,6 +403,8 @@ sub wq_close { } } +sub WQ_MAX_WORKERS { $WQ_MAX_WORKERS } + sub DESTROY { wq_close($_[0]); ipc_worker_stop($_[0]); diff --git a/t/ipc.t b/t/ipc.t index d2b6ad4f..fd290809 100644 --- a/t/ipc.t +++ b/t/ipc.t @@ -171,13 +171,17 @@ is($warn[2], $warn[1], 'worker did not die'); $SIG{__WARN__} = 'DEFAULT'; is($ipc->wq_workers_start('wq', 1), $$, 'workers started again'); is($ipc->wq_workers, 1, '1 worker started'); -$ipc->wq_worker_incr; -is($ipc->wq_workers, 2, 'worker count bumped'); -$ipc->wq_worker_decr; -$ipc->wq_worker_decr_wait(10); -is($ipc->wq_workers, 1, 'worker count lowered'); -is($ipc->wq_workers(2), 2, 'worker count set'); -is($ipc->wq_workers, 2, 'worker count stayed set'); +SKIP: { + $ipc->WQ_MAX_WORKERS > 1 or + skip 'Inline::C or Socket::MsgHdr not available', 4; + $ipc->wq_worker_incr; + is($ipc->wq_workers, 2, 'worker count bumped'); + $ipc->wq_worker_decr; + $ipc->wq_worker_decr_wait(10); + is($ipc->wq_workers, 1, 'worker count lowered'); + is($ipc->wq_workers(2), 2, 'worker count set'); + is($ipc->wq_workers, 2, 'worker count stayed set'); +} $ipc->wq_close; is($ipc->wq_workers, undef, 'workers undef after 
close');