diff options
-rw-r--r-- | lib/PublicInbox/IPC.pm | 106 | ||||
-rw-r--r-- | t/ipc.t | 66 |
2 files changed, 168 insertions, 4 deletions
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm index 5082f110..27ea90de 100644 --- a/lib/PublicInbox/IPC.pm +++ b/lib/PublicInbox/IPC.pm @@ -1,14 +1,16 @@ # Copyright (C) 2020-2021 all contributors <meta@public-inbox.org> # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> -# base class for remote IPC calls, requires Storable -# TODO: this ought to be usable in SearchIdxShard +# base class for remote IPC calls and workqueues, requires Storable or Sereal package PublicInbox::IPC; use strict; use v5.10.1; use Carp qw(confess croak); -use PublicInbox::Sigfd; +use PublicInbox::DS qw(dwaitpid); +use PublicInbox::Spawn; use POSIX (); +use Socket qw(AF_UNIX MSG_EOR); +my $SEQPACKET = eval { Socket::SOCK_SEQPACKET() }; # portable enough? use constant PIPE_BUF => $^O eq 'linux' ? 4096 : POSIX::_POSIX_PIPE_BUF(); my ($enc, $dec); # ->imports at BEGIN turns sereal_*_with_object into custom ops on 5.14+ @@ -34,6 +36,17 @@ if ($enc && $dec) { # should be custom ops } // warn("Storable (part of Perl) missing: $@\n"); } +my $recv_cmd = PublicInbox::Spawn->can('recv_cmd4'); +my $send_cmd = PublicInbox::Spawn->can('send_cmd4') // do { + require PublicInbox::CmdIPC4; + $recv_cmd //= PublicInbox::CmdIPC4->can('recv_cmd4'); + PublicInbox::CmdIPC4->can('send_cmd4'); +} // do { + require PublicInbox::CmdIPC1; + $recv_cmd = PublicInbox::CmdIPC1->can('recv_cmd1'); + PublicInbox::CmdIPC1->can('send_cmd1'); +}; + sub _get_rec ($) { my ($r) = @_; defined(my $len = <$r>) or return; @@ -144,7 +157,7 @@ sub ipc_worker_stop { # allow any sibling to send ipc_worker_exit, but siblings can't wait return if $$ != $ppid; - PublicInbox::DS::dwaitpid($pid, \&ipc_worker_reap, $self); + dwaitpid($pid, \&ipc_worker_reap, $self); } # use this if we have multiple readers reading curl or "pigz -dc" @@ -224,4 +237,89 @@ sub ipc_sibling_atfork_child { $pid == $$ and die "BUG: $$ ipc_atfork_child called on itself"; } +sub wq_worker_loop ($$) { + my ($self, $s2) = @_; + my $buf; + my $len = $self->{wq_req_len} // (4096 * 33); + my ($rec, $sub, @args); + while (1) { + my (@fds) = $recv_cmd->($s2, $buf, $len) or return; # EOF + my $i = 0; + my @m = @{$self->{wq_open_modes} // [qw( +<&= >&= >&= )]}; + for my $fd (@fds) { + my $mode = shift(@m); + if (open(my $fh, $mode, $fd)) { + $self->{$i++} = $fh; + } else { + die "$$ open($mode$fd) (FD:$i): $!"; + } + } + # Sereal dies, Storable returns undef + $rec = thaw($buf) // + die "thaw error on buffer of size:".length($buf); + ($sub, @args) = @$rec; + eval { $self->$sub(@args) }; + warn "$$ wq_worker: $@" if $@; + delete @$self{0, 1, 2}; + } +} + +sub wq_do { # always async + my ($self, $sub, $in, $out, $err, @args) = @_; + if (my $s1 = $self->{-wq_seq}) { # run in worker + $_ = fileno($_) for ($in, $out, $err); + $send_cmd->($s1, $in, $out, $err, + freeze([$sub, @args]), MSG_EOR); + } else { + @$self{0, 1, 2} = ($in, $out, $err); + eval { $self->$sub(@args) }; + warn "wq_do: $@" if $@; + delete @$self{0, 1, 2}; + } +} + +# starts workqueue workers if Sereal or Storable is installed +sub wq_workers_start { + my ($self, $ident, $nr_workers, $oldset) = @_; + ($enc && $send_cmd && $recv_cmd && defined($SEQPACKET)) or return; + return if $self->{-wq_seq}; # idempotent + my ($s1, $s2); + socketpair($s1, $s2, AF_UNIX, $SEQPACKET, 0) or die "socketpair: $!"; + my $sigset = $oldset // PublicInbox::DS::block_signals(); + $self->ipc_atfork_parent; + $nr_workers //= 4; + $self->{-wq_workers} = {}; + for my $i (0..($nr_workers - 1)) { + defined(my $pid = fork) or die "fork: $!"; + if ($pid == 0) { + eval { PublicInbox::DS->Reset }; + $s1 = undef; + $SIG{$_} = 'IGNORE' for (qw(TERM INT QUIT)); + local $0 = $ident."[$i]"; + PublicInbox::DS::sig_setmask($oldset); + my $on_destroy = $self->ipc_atfork_child; + eval { wq_worker_loop($self, $s2) }; + die "worker $ident PID:$$ died: $@\n" if $@; + exit; + } else { + $self->{-wq_workers}->{$pid} = $i; + } + } + PublicInbox::DS::sig_setmask($sigset) unless $oldset; + $s2 = undef; + $self->{-wq_seq} = $s1; + $self->{-wq_ppid} = $$; +} + +sub wq_close { + my ($self) = @_; + delete $self->{-wq_seq} or return; + my $ppid = delete $self->{-wq_ppid} // die 'BUG: no wq_ppid'; + my $workers = delete $self->{-wq_workers} // die 'BUG: no wq_workers'; + return if $ppid != $$; # can't reap siblings or parents + for my $pid (keys %$workers) { + dwaitpid($pid, \&ipc_worker_reap, $self); + } +} + 1; @@ -5,6 +5,7 @@ use strict; use v5.10.1; use Test::More; use PublicInbox::TestCommon; +use Fcntl qw(SEEK_SET); require_ok 'PublicInbox::IPC'; state $once = eval <<''; package PublicInbox::IPC; @@ -15,6 +16,13 @@ sub test_scalarref { \'scalarref' } sub test_undef { undef } sub test_die { shift; die @_; 'unreachable' } sub test_pid { $$ } +sub test_write_each_fd { + my ($self, @args) = @_; + for my $fd (0..2) { + print { $self->{$fd} } "i=$fd $$ ", @args, "\n"; + $self->{$fd}->flush; + } +} 1; my $ipc = bless {}, 'PublicInbox::IPC'; @@ -102,4 +110,62 @@ SKIP: { ok(!kill(0, $pid) && $!{ESRCH}, 'worker stopped'); } $ipc->ipc_worker_stop; # idempotent + +# work queues +$ipc->{wq_open_modes} = [qw( >&= >&= >&= )]; +pipe(my ($ra, $wa)) or BAIL_OUT $!; +pipe(my ($rb, $wb)) or BAIL_OUT $!; +pipe(my ($rc, $wc)) or BAIL_OUT $!; +open my $warn, '+>', undef or BAIL_OUT; +$warn->autoflush(0); +local $SIG{__WARN__} = sub { print $warn "PID:$$ ", @_ }; +my @ppids; +for my $t ('local', 'worker', 'worker again') { + $ipc->wq_do('test_write_each_fd', $wa, $wb, $wc, 'hello world'); + my $i = 0; + for my $fh ($ra, $rb, $rc) { + my $buf = readline($fh); + is(chop($buf), "\n", "trailing CR ($t)"); + like($buf, qr/\Ai=$i \d+ hello world\z/, "got expected ($t)"); + $i++; + } + $ipc->wq_do('test_die', $wa, $wb, $wc); + my $ppid = $ipc->wq_workers_start('wq', 1); + push(@ppids, $ppid); +} + +# wq_do works across fork (siblings can feed) +SKIP: { + skip 'Socket::MsgHdr, IO::FDPass, Inline::C missing', 7 if !$ppids[0]; + is_deeply(\@ppids, [$$, undef, undef], + 'parent pid returned in wq_workers_start'); + my $pid = fork // BAIL_OUT $!; + if ($pid == 0) { + use POSIX qw(_exit); + $ipc->wq_do('test_write_each_fd', $wa, $wb, $wc, $$); + _exit(0); + } else { + my $i = 0; + my ($wpid, @rest) = keys %{$ipc->{-wq_workers}}; + is(scalar(@rest), 0, 'only one worker'); + for my $fh ($ra, $rb, $rc) { + my $buf = readline($fh); + is(chop($buf), "\n", "trailing CR #$i"); + like($buf, qr/^i=$i $wpid $pid\z/, + 'got expected from sibling'); + $i++; + } + is(waitpid($pid, 0), $pid, 'waitpid complete'); + is($?, 0, 'child wq producer exited'); + } +} + +$ipc->wq_close; +seek($warn, 0, SEEK_SET) or BAIL_OUT; +my @warn = <$warn>; +is(scalar(@warn), 3, 'warned 3 times'); +like($warn[0], qr/ wq_do: /, '1st warned from wq_do'); +like($warn[1], qr/ wq_worker: /, '2nd warned from wq_worker'); +is($warn[2], $warn[1], 'worker did not die'); + done_testing; |