about summary refs log tree commit homepage
path: root/lib
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2021-01-10 12:15:04 +0000
committerEric Wong <e@80x24.org>2021-01-12 03:51:42 +0000
commit862d18680dccc30ef6cc8044da925ec5085911b2 (patch)
tree8d89826248c1d917752444bd88344bbffafcdc01 /lib
parent9fd84b74b80eb98855d57f438e88ba9285b3d9a6 (diff)
downloadpublic-inbox-862d18680dccc30ef6cc8044da925ec5085911b2.tar.gz
This will allow any number of younger sibling processes to
communicate with older siblings directly without relying on a
mediator process.  This is intended to be useful for
distributing search work across multiple workers without caring
which worker hits it (we only care about shard members).

And any request sent with this will be able to hit any worker
without locking on our part.

Unix stream sockets with a listener were also considered;
binding to a file on the FS may confuse users given there's
already a socket path for lei(1).  Linux-only Abstract or
autobind sockets are rejected due to lack of portability.

SOCK_SEQPACKET via socketpair(2) was chosen since it's POSIX
2008 and available on FreeBSD 9+ in addition to Linux, and
doesn't require filesystem access.
Diffstat (limited to 'lib')
-rw-r--r--lib/PublicInbox/IPC.pm106
1 files changed, 102 insertions, 4 deletions
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 5082f110..27ea90de 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -1,14 +1,16 @@
 # Copyright (C) 2020-2021 all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
 
-# base class for remote IPC calls, requires Storable
-# TODO: this ought to be usable in SearchIdxShard
+# base class for remote IPC calls and workqueues, requires Storable or Sereal
 package PublicInbox::IPC;
 use strict;
 use v5.10.1;
 use Carp qw(confess croak);
-use PublicInbox::Sigfd;
+use PublicInbox::DS qw(dwaitpid);
+use PublicInbox::Spawn;
 use POSIX ();
+use Socket qw(AF_UNIX MSG_EOR);
+my $SEQPACKET = eval { Socket::SOCK_SEQPACKET() }; # portable enough?
 use constant PIPE_BUF => $^O eq 'linux' ? 4096 : POSIX::_POSIX_PIPE_BUF();
 my ($enc, $dec);
 # ->imports at BEGIN turns sereal_*_with_object into custom ops on 5.14+
@@ -34,6 +36,17 @@ if ($enc && $dec) { # should be custom ops
         } // warn("Storable (part of Perl) missing: $@\n");
 }
 
+my $recv_cmd = PublicInbox::Spawn->can('recv_cmd4');
+my $send_cmd = PublicInbox::Spawn->can('send_cmd4') // do {
+        require PublicInbox::CmdIPC4;
+        $recv_cmd //= PublicInbox::CmdIPC4->can('recv_cmd4');
+        PublicInbox::CmdIPC4->can('send_cmd4');
+} // do {
+        require PublicInbox::CmdIPC1;
+        $recv_cmd = PublicInbox::CmdIPC1->can('recv_cmd1');
+        PublicInbox::CmdIPC1->can('send_cmd1');
+};
+
 sub _get_rec ($) {
         my ($r) = @_;
         defined(my $len = <$r>) or return;
@@ -144,7 +157,7 @@ sub ipc_worker_stop {
 
         # allow any sibling to send ipc_worker_exit, but siblings can't wait
         return if $$ != $ppid;
-        PublicInbox::DS::dwaitpid($pid, \&ipc_worker_reap, $self);
+        dwaitpid($pid, \&ipc_worker_reap, $self);
 }
 
 # use this if we have multiple readers reading curl or "pigz -dc"
@@ -224,4 +237,89 @@ sub ipc_sibling_atfork_child {
         $pid == $$ and die "BUG: $$ ipc_atfork_child called on itself";
 }
 
+sub wq_worker_loop ($$) {
+        my ($self, $s2) = @_;
+        my $buf;
+        my $len = $self->{wq_req_len} // (4096 * 33);
+        my ($rec, $sub, @args);
+        while (1) {
+                my (@fds) = $recv_cmd->($s2, $buf, $len) or return; # EOF
+                my $i = 0;
+                my @m = @{$self->{wq_open_modes} // [qw( +<&= >&= >&= )]};
+                for my $fd (@fds) {
+                        my $mode = shift(@m);
+                        if (open(my $fh, $mode, $fd)) {
+                                $self->{$i++} = $fh;
+                        } else {
+                                die "$$ open($mode$fd) (FD:$i): $!";
+                        }
+                }
+                # Sereal dies, Storable returns undef
+                $rec = thaw($buf) //
+                        die "thaw error on buffer of size:".length($buf);
+                ($sub, @args) = @$rec;
+                eval { $self->$sub(@args) };
+                warn "$$ wq_worker: $@" if $@;
+                delete @$self{0, 1, 2};
+        }
+}
+
+sub wq_do { # always async
+        my ($self, $sub, $in, $out, $err, @args) = @_;
+        if (my $s1 = $self->{-wq_seq}) { # run in worker
+                $_ = fileno($_) for ($in, $out, $err);
+                $send_cmd->($s1, $in, $out, $err,
+                                freeze([$sub, @args]), MSG_EOR);
+        } else {
+                @$self{0, 1, 2} = ($in, $out, $err);
+                eval { $self->$sub(@args) };
+                warn "wq_do: $@" if $@;
+                delete @$self{0, 1, 2};
+        }
+}
+
+# starts workqueue workers if Sereal or Storable is installed
+sub wq_workers_start {
+        my ($self, $ident, $nr_workers, $oldset) = @_;
+        ($enc && $send_cmd && $recv_cmd && defined($SEQPACKET)) or return;
+        return if $self->{-wq_seq}; # idempotent
+        my ($s1, $s2);
+        socketpair($s1, $s2, AF_UNIX, $SEQPACKET, 0) or die "socketpair: $!";
+        my $sigset = $oldset // PublicInbox::DS::block_signals();
+        $self->ipc_atfork_parent;
+        $nr_workers //= 4;
+        $self->{-wq_workers} = {};
+        for my $i (0..($nr_workers - 1)) {
+                defined(my $pid = fork) or die "fork: $!";
+                if ($pid == 0) {
+                        eval { PublicInbox::DS->Reset };
+                        $s1 = undef;
+                        $SIG{$_} = 'IGNORE' for (qw(TERM INT QUIT));
+                        local $0 = $ident."[$i]";
+                        PublicInbox::DS::sig_setmask($oldset);
+                        my $on_destroy = $self->ipc_atfork_child;
+                        eval { wq_worker_loop($self, $s2) };
+                        die "worker $ident PID:$$ died: $@\n" if $@;
+                        exit;
+                } else {
+                        $self->{-wq_workers}->{$pid} = $i;
+                }
+        }
+        PublicInbox::DS::sig_setmask($sigset) unless $oldset;
+        $s2 = undef;
+        $self->{-wq_seq} = $s1;
+        $self->{-wq_ppid} = $$;
+}
+
+sub wq_close {
+        my ($self) = @_;
+        delete $self->{-wq_seq} or return;
+        my $ppid = delete $self->{-wq_ppid} // die 'BUG: no wq_ppid';
+        my $workers = delete $self->{-wq_workers} // die 'BUG: no wq_workers';
+        return if $ppid != $$; # can't reap siblings or parents
+        for my $pid (keys %$workers) {
+                dwaitpid($pid, \&ipc_worker_reap, $self);
+        }
+}
+
 1;