diff options
Diffstat (limited to 'lib/PublicInbox/IPC.pm')
-rw-r--r-- | lib/PublicInbox/IPC.pm | 106 |
1 files changed, 102 insertions, 4 deletions
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm index 5082f110..27ea90de 100644 --- a/lib/PublicInbox/IPC.pm +++ b/lib/PublicInbox/IPC.pm @@ -1,14 +1,16 @@ # Copyright (C) 2020-2021 all contributors <meta@public-inbox.org> # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> -# base class for remote IPC calls, requires Storable -# TODO: this ought to be usable in SearchIdxShard +# base class for remote IPC calls and workqueues, requires Storable or Sereal package PublicInbox::IPC; use strict; use v5.10.1; use Carp qw(confess croak); -use PublicInbox::Sigfd; +use PublicInbox::DS qw(dwaitpid); +use PublicInbox::Spawn; use POSIX (); +use Socket qw(AF_UNIX MSG_EOR); +my $SEQPACKET = eval { Socket::SOCK_SEQPACKET() }; # portable enough? use constant PIPE_BUF => $^O eq 'linux' ? 4096 : POSIX::_POSIX_PIPE_BUF(); my ($enc, $dec); # ->imports at BEGIN turns sereal_*_with_object into custom ops on 5.14+ @@ -34,6 +36,17 @@ if ($enc && $dec) { # should be custom ops } // warn("Storable (part of Perl) missing: $@\n"); } +my $recv_cmd = PublicInbox::Spawn->can('recv_cmd4'); +my $send_cmd = PublicInbox::Spawn->can('send_cmd4') // do { + require PublicInbox::CmdIPC4; + $recv_cmd //= PublicInbox::CmdIPC4->can('recv_cmd4'); + PublicInbox::CmdIPC4->can('send_cmd4'); +} // do { + require PublicInbox::CmdIPC1; + $recv_cmd = PublicInbox::CmdIPC1->can('recv_cmd1'); + PublicInbox::CmdIPC1->can('send_cmd1'); +}; + sub _get_rec ($) { my ($r) = @_; defined(my $len = <$r>) or return; @@ -144,7 +157,7 @@ sub ipc_worker_stop { # allow any sibling to send ipc_worker_exit, but siblings can't wait return if $$ != $ppid; - PublicInbox::DS::dwaitpid($pid, \&ipc_worker_reap, $self); + dwaitpid($pid, \&ipc_worker_reap, $self); } # use this if we have multiple readers reading curl or "pigz -dc" @@ -224,4 +237,89 @@ sub ipc_sibling_atfork_child { $pid == $$ and die "BUG: $$ ipc_atfork_child called on itself"; } +sub wq_worker_loop ($$) { + my ($self, $s2) = @_; + my $buf; + my $len = $self->{wq_req_len} // (4096 * 33); + my ($rec, $sub, @args); + while (1) { + my (@fds) = $recv_cmd->($s2, $buf, $len) or return; # EOF + my $i = 0; + my @m = @{$self->{wq_open_modes} // [qw( +<&= >&= >&= )]}; + for my $fd (@fds) { + my $mode = shift(@m); + if (open(my $fh, $mode, $fd)) { + $self->{$i++} = $fh; + } else { + die "$$ open($mode$fd) (FD:$i): $!"; + } + } + # Sereal dies, Storable returns undef + $rec = thaw($buf) // + die "thaw error on buffer of size:".length($buf); + ($sub, @args) = @$rec; + eval { $self->$sub(@args) }; + warn "$$ wq_worker: $@" if $@; + delete @$self{0, 1, 2}; + } +} + +sub wq_do { # always async + my ($self, $sub, $in, $out, $err, @args) = @_; + if (my $s1 = $self->{-wq_seq}) { # run in worker + $_ = fileno($_) for ($in, $out, $err); + $send_cmd->($s1, $in, $out, $err, + freeze([$sub, @args]), MSG_EOR); + } else { + @$self{0, 1, 2} = ($in, $out, $err); + eval { $self->$sub(@args) }; + warn "wq_do: $@" if $@; + delete @$self{0, 1, 2}; + } +} + +# starts workqueue workers if Sereal or Storable is installed +sub wq_workers_start { + my ($self, $ident, $nr_workers, $oldset) = @_; + ($enc && $send_cmd && $recv_cmd && defined($SEQPACKET)) or return; + return if $self->{-wq_seq}; # idempotent + my ($s1, $s2); + socketpair($s1, $s2, AF_UNIX, $SEQPACKET, 0) or die "socketpair: $!"; + my $sigset = $oldset // PublicInbox::DS::block_signals(); + $self->ipc_atfork_parent; + $nr_workers //= 4; + $self->{-wq_workers} = {}; + for my $i (0..($nr_workers - 1)) { + defined(my $pid = fork) or die "fork: $!"; + if ($pid == 0) { + eval { PublicInbox::DS->Reset }; + $s1 = undef; + $SIG{$_} = 'IGNORE' for (qw(TERM INT QUIT)); + local $0 = $ident."[$i]"; + PublicInbox::DS::sig_setmask($oldset); + my $on_destroy = $self->ipc_atfork_child; + eval { wq_worker_loop($self, $s2) }; + die "worker $ident PID:$$ died: $@\n" if $@; + exit; + } else { + $self->{-wq_workers}->{$pid} = $i; + } + } + PublicInbox::DS::sig_setmask($sigset) unless $oldset; + $s2 = undef; + $self->{-wq_seq} = $s1; + $self->{-wq_ppid} = $$; +} + +sub wq_close { + my ($self) = @_; + delete $self->{-wq_seq} or return; + my $ppid = delete $self->{-wq_ppid} // die 'BUG: no wq_ppid'; + my $workers = delete $self->{-wq_workers} // die 'BUG: no wq_workers'; + return if $ppid != $$; # can't reap siblings or parents + for my $pid (keys %$workers) { + dwaitpid($pid, \&ipc_worker_reap, $self); + } +} + 1; |