From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 59E951FA00 for ; Sun, 21 Feb 2021 07:41:35 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 3/7] ipc: add wq_broadcast Date: Sun, 21 Feb 2021 07:41:30 +0000 Message-Id: <20210221074134.15084-4-e@80x24.org> In-Reply-To: <20210221074134.15084-1-e@80x24.org> References: <20210221074134.15084-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: We'll give workqueues a broadcast mechanism to ensure all workers see a certain message. We'll also tag each worker with {-wq_worker_nr} in preparation for work distribution. This is intended to avoid extra connection and fork() costs from LeiAuth in a future commit. --- lib/PublicInbox/IPC.pm | 30 ++++++++++++++++++++++++---- lib/PublicInbox/WQWorker.pm | 8 ++++---- t/ipc.t | 39 ++++++++++++++++++++++--------------- 3 files changed, 53 insertions(+), 24 deletions(-) diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm index efac4c4d..2aeb6462 100644 --- a/lib/PublicInbox/IPC.pm +++ b/lib/PublicInbox/IPC.pm @@ -240,8 +240,9 @@ sub recv_and_run { } sub wq_worker_loop ($) { - my ($self) = @_; + my ($self, $bcast_a) = @_; my $wqw = PublicInbox::WQWorker->new($self); + PublicInbox::WQWorker->new($self, '-wq_bcast2'); PublicInbox::DS->SetPostLoopCallback(sub { $wqw->{sock} }); PublicInbox::DS->EventLoop; PublicInbox::DS->Reset; @@ -252,6 +253,20 @@ sub do_sock_stream { # via wq_io_do, for big requests recv_and_run($self, delete $self->{0}, $len, 1); } +sub wq_broadcast { + my ($self, $sub, @args) = @_; + if (my $wkr = $self->{-wq_workers}) { + for my $bcast1 (values %$wkr) { + my $buf = ipc_freeze([$sub, @args]); + send($bcast1, $buf, MSG_EOR) // croak "send: $!"; + # XXX shouldn't have to deal with EMSGSIZE here... + } + } else { + eval { $self->$sub(@args) }; + warn "wq_broadcast: $@" if $@; + } +} + sub wq_io_do { # always async my ($self, $sub, $ios, @args) = @_; if (my $s1 = $self->{-wq_s1}) { # run in worker @@ -284,15 +299,21 @@ sub wq_io_do { # always async sub _wq_worker_start ($$$) { my ($self, $oldset, $fields) = @_; + my ($bcast1, $bcast2); + socketpair($bcast1, $bcast2, AF_UNIX, $SEQPACKET, 0) or + die "socketpair: $!"; my $seed = rand(0xffffffff); my $pid = fork // die "fork: $!"; if ($pid == 0) { srand($seed); + undef $bcast1; eval { PublicInbox::DS->Reset }; - delete @$self{qw(-wq_s1 -wq_workers -wq_ppid)}; + delete @$self{qw(-wq_s1 -wq_ppid)}; + $self->{-wq_worker_nr} = + keys %{delete($self->{-wq_workers}) // {}}; $SIG{$_} = 'IGNORE' for (qw(PIPE)); $SIG{$_} = 'DEFAULT' for (qw(TTOU TTIN TERM QUIT INT CHLD)); - local $0 = $self->{-wq_ident}; + local $0 = "$self->{-wq_ident} $self->{-wq_worker_nr}"; # ensure we properly exit even if warn() dies: my $end = PublicInbox::OnDestroy->new($$, sub { exit(!!$@) }); eval { @@ -301,12 +322,13 @@ sub _wq_worker_start ($$$) { my $on_destroy = $self->ipc_atfork_child; local %SIG = %SIG; PublicInbox::DS::sig_setmask($oldset); + $self->{-wq_bcast2} = $bcast2; wq_worker_loop($self); }; warn "worker $self->{-wq_ident} PID:$$ died: $@" if $@; undef $end; # trigger exit } else { - $self->{-wq_workers}->{$pid} = \undef; + $self->{-wq_workers}->{$pid} = $bcast1; } } diff --git a/lib/PublicInbox/WQWorker.pm b/lib/PublicInbox/WQWorker.pm index 25a5e4fb..3636321e 100644 --- a/lib/PublicInbox/WQWorker.pm +++ b/lib/PublicInbox/WQWorker.pm @@ -11,10 +11,10 @@ use Errno qw(EAGAIN ECONNRESET); use IO::Handle (); # blocking sub new { - my (undef, $wq) = @_; - my $s2 = $wq->{-wq_s2} // die 'BUG: no -wq_s2'; + my ($cls, $wq, $field) = @_; + my $s2 = $wq->{$field // '-wq_s2'} // die "BUG: no {$field}"; $s2->blocking(0); - my $self = bless { sock => $s2, wq => $wq }, __PACKAGE__; + my $self = bless { sock => $s2, wq => $wq }, $cls; $self->SUPER::new($s2, EPOLLEXCLUSIVE|EPOLLIN|EPOLLET); $self; } @@ -27,7 +27,7 @@ sub event_step { } while ($n); return if !defined($n) && $! == EAGAIN; # likely warn "wq worker error: $!\n" if !defined($n) && $! != ECONNRESET; - $self->{wq}->wq_atexit_child; + $self->{wq}->wq_atexit_child if $self->{sock} == $self->{wq}->{-wq_s2}; $self->close; # PublicInbox::DS::close } diff --git a/t/ipc.t b/t/ipc.t index 345024bd..ca88eb59 100644 --- a/t/ipc.t +++ b/t/ipc.t @@ -9,6 +9,7 @@ use Fcntl qw(SEEK_SET); use Digest::SHA qw(sha1_hex); require_mods(qw(Storable||Sereal)); require_ok 'PublicInbox::IPC'; +my ($tmpdir, $for_destroy) = tmpdir(); state $once = eval <<''; package PublicInbox::IPC; use strict; @@ -31,6 +32,12 @@ sub test_sha { print { $self->{1} } sha1_hex($buf), "\n"; $self->{1}->flush; } +sub test_append_pid { + my ($self, $file) = @_; + open my $fh, '>>', $file or die "open: $!"; + $fh->autoflush(1); + print $fh "$$\n" or die "print: $!"; +} 1; my $ipc = bless {}, 'PublicInbox::IPC'; @@ -83,11 +90,8 @@ $test->('local'); defined($pid) or BAIL_OUT 'no spawn, no test'; is($ipc->ipc_do('test_pid'), $pid, 'worker pid returned'); $test->('worker'); - { - my ($tmp, $for_destroy) = tmpdir(); - $ipc->ipc_lock_init("$tmp/lock"); - is($ipc->ipc_do('test_pid'), $pid, 'worker pid returned'); - } + $ipc->ipc_lock_init("$tmpdir/lock"); + is($ipc->ipc_do('test_pid'), $pid, 'worker pid returned'); $ipc->ipc_worker_stop; ok(!kill(0, $pid) && $!{ESRCH}, 'worker stopped'); } @@ -167,18 +171,21 @@ SKIP: { $SIG{__WARN__} = 'DEFAULT'; is($ipc->wq_workers_start('wq', 1), $$, 'workers started again'); is($ipc->wq_workers, 1, '1 worker started'); - SKIP: { - $ipc->WQ_MAX_WORKERS > 1 or - skip 'Inline::C or Socket::MsgHdr not available', 4; - $ipc->wq_worker_incr; - is($ipc->wq_workers, 2, 'worker count bumped'); - $ipc->wq_worker_decr; - $ipc->wq_worker_decr_wait(10); - is($ipc->wq_workers, 1, 'worker count lowered'); - is($ipc->wq_workers(2), 2, 'worker count set'); - is($ipc->wq_workers, 2, 'worker count stayed set'); - } + + $ipc->wq_worker_incr; + is($ipc->wq_workers, 2, 'worker count bumped'); + $ipc->wq_worker_decr; + $ipc->wq_worker_decr_wait(10); + is($ipc->wq_workers, 1, 'worker count lowered'); + is($ipc->wq_workers(2), 2, 'worker count set'); + is($ipc->wq_workers, 2, 'worker count stayed set'); + + $ipc->wq_broadcast('test_append_pid', "$tmpdir/append_pid"); $ipc->wq_close; + open my $fh, '<', "$tmpdir/append_pid" or BAIL_OUT "open: $!"; + chomp(my @pids = <$fh>); + my %pids = map { $_ => 1 } grep(/\A[0-9]+\z/, @pids); + is(scalar keys %pids, 2, 'broadcast hit both PIDs'); is($ipc->wq_workers, undef, 'workers undef after close'); }