about summary refs log tree commit homepage
diff options
context:
space:
mode:
author Eric Wong <e@80x24.org> 2020-12-31 13:51:32 +0000
committer Eric Wong <e@80x24.org> 2021-01-01 05:00:39 +0000
commit 3ae5275eb187ed00bb83061a4d37a161bc8eb3e7 (patch)
tree 651a0108d3ce80fb75f7075c111293ca9ac01e1e
parent abd0a85b212ce1467ddc94d523152d9a65028960 (diff)
download public-inbox-3ae5275eb187ed00bb83061a4d37a161bc8eb3e7.tar.gz
I intend to use this with LeiStore when importing from multiple
slow sources at once (e.g. curl, IMAP, etc).  This is because
over.sqlite3 can only have a single writer, and we'll have
several slow readers running in parallel.

Watch and SearchIdxShard should also be able to use this code
in the future, but this will be proven with LeiStore, first.
-rw-r--r-- MANIFEST 2
-rw-r--r-- lib/PublicInbox/IPC.pm 129
-rw-r--r-- lib/PublicInbox/LeiStore.pm 2
-rw-r--r-- t/ipc.t 67
-rw-r--r-- t/lei_store.t 5
5 files changed, 204 insertions, 1 deletions
diff --git a/MANIFEST b/MANIFEST
index 7ce2075e..96ad52bf 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -153,6 +153,7 @@ lib/PublicInbox/IMAPD.pm
 lib/PublicInbox/IMAPTracker.pm
 lib/PublicInbox/IMAPdeflate.pm
 lib/PublicInbox/IMAPsearchqp.pm
+lib/PublicInbox/IPC.pm
 lib/PublicInbox/IdxStack.pm
 lib/PublicInbox/Import.pm
 lib/PublicInbox/In2Tie.pm
@@ -327,6 +328,7 @@ t/index-git-times.t
 t/indexlevels-mirror-v1.t
 t/indexlevels-mirror.t
 t/init.t
+t/ipc.t
 t/iso-2202-jp.eml
 t/kqnotify.t
 t/lei-oneshot.t
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
new file mode 100644
index 00000000..c04140ae
--- /dev/null
+++ b/lib/PublicInbox/IPC.pm
@@ -0,0 +1,129 @@
+# Copyright (C) 2020 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# base class for remote IPC calls, requires Storable
+# TODO: this ought to be usable in SearchIdxShard
+package PublicInbox::IPC;
+use strict;
+use v5.10.1;
+use Socket qw(AF_UNIX SOCK_STREAM);
+use Carp qw(confess croak);
+use PublicInbox::Sigfd;
+
+sub _get_rec ($) { # read one "<length>\n<frozen bytes>" record, return it thawed
+        my ($sock) = @_;
+        local $/ = "\n";
+        defined(my $len = <$sock>) or return; # readline gave undef (EOF or error)
+        chop($len) eq "\n" or croak "no LF byte in $len";
+        defined(my $r = read($sock, my $buf, $len)) or croak "read error: $!";
+        $r == $len or croak "short read: $r != $len";
+        thaw($buf); # thaw is imported from Storable inside ipc_worker_spawn
+}
+
+sub _send_rec ($$) { # write $ref to $sock as "<length>\n<frozen bytes>"
+        my ($sock, $ref) = @_;
+        my $buf = freeze($ref); # freeze is imported from Storable inside ipc_worker_spawn
+        print $sock length($buf), "\n", $buf or croak "print: $!";
+}
+
+sub ipc_return ($$$) { # reply on $s2: $ret, or a true $exc blessed as ...::Die
+        my ($s2, $ret, $exc) = @_;
+        _send_rec($s2, $exc ? bless(\$exc, 'PublicInbox::IPC::Die') : $ret);
+}
+
+sub ipc_worker_loop ($$) { # serve requests from $s2 until _get_rec returns false
+        my ($self, $s2) = @_;
+        $self->ipc_atfork_child if $self->can('ipc_atfork_child');
+        $s2->autoflush(1);
+        while (my $rec = _get_rec($s2)) {
+                my ($wantarray, $sub, @args) = @$rec; # same layout ipc_do sends
+                if (!defined($wantarray)) { # no waiting if client doesn't care
+                        eval { $self->$sub(@args) };
+                        eval { warn "die: $@ (from nowait $sub)\n" } if $@; # no reply is sent
+                } elsif ($wantarray) { # list context: reply with an array ref
+                        my @ret = eval { $self->$sub(@args) };
+                        ipc_return($s2, \@ret, $@);
+                } else { # scalar context: reply with a scalar ref
+                        my $ret = eval { $self->$sub(@args) };
+                        ipc_return($s2, \$ret, $@);
+                }
+        }
+}
+
+sub ipc_worker_spawn ($$$) { # fork a worker for $self; the parent gets its PID back
+        my ($self, $ident, $oldset) = @_;
+        eval { require Storable; Storable->import(qw(freeze thaw)); };
+        if ($@) {
+                state $w //= warn "Storable (part of Perl) missing: $@\n";
+                return; # no worker: ipc_do falls back to calling methods in-process
+        }
+        my $pid = $self->{-ipc_worker_pid};
+        confess "BUG: already spawned PID:$pid" if $pid;
+        confess "BUG: already have worker socket" if $self->{-ipc_sock};
+        my ($s1, $s2);
+        socketpair($s1, $s2, AF_UNIX, SOCK_STREAM, 0) or die "socketpair: $!";
+        my $sigset = $oldset // PublicInbox::Sigfd::block_signals();
+        defined($pid = fork) or die "fork: $!";
+        if ($pid == 0) { # child: becomes the worker and never returns
+                undef $s1;
+                local $0 = $ident;
+                $SIG{$_} = 'IGNORE' for (qw(TERM INT QUIT));
+                PublicInbox::Sigfd::sig_setmask($sigset); # not $oldset: that may be undef
+                eval { ipc_worker_loop($self, $s2) };
+                die "worker $ident died: $@\n" if $@;
+                $self->ipc_at_worker_exit if $self->can('ipc_at_worker_exit');
+                exit;
+        }
+        PublicInbox::Sigfd::sig_setmask($sigset) unless $oldset; # undo our own block
+        $s1->autoflush(1);
+        $self->{-ipc_sock} = $s1;
+        $self->{-ipc_worker_pid} = $pid;
+}
+
+sub ipc_reap_worker { # dwaitpid callback
+        my ($self, $pid) = @_;
+        warn "PID:$pid died with \$?=$?\n" if $?; # nonzero $? is reported, not fatal
+}
+
+sub ipc_worker_stop { # drop the worker socket and reap the worker, if there is one
+        my ($self) = @_;
+        my $pid;
+        if (delete $self->{-ipc_sock}) { # presumably the worker sees EOF once this is freed
+                $pid = delete $self->{-ipc_worker_pid} or die "no PID?";
+        } else { # no socket, so there must be no PID either
+                $pid = delete $self->{-ipc_worker_pid} and
+                        die "unexpected PID:$pid";
+        }
+        return unless $pid; # nothing spawned, so repeated calls are harmless
+        eval { PublicInbox::DS::dwaitpid($pid, \&ipc_reap_worker, $self) };
+        if ($@) { # dwaitpid died: fall back to a blocking waitpid
+                my $wp = waitpid($pid, 0);
+                $pid == $wp or die "waitpid($pid) returned $wp: \$?=$?";
+                ipc_reap_worker($self, $pid);
+        }
+}
+
+# use this if we have multiple readers reading curl or "pigz -dc"
+# and writing to the same store
+sub ipc_lock_init { # attach a PublicInbox::Lock on path $f; ipc_do takes it per call
+        my ($self, $f) = @_;
+        require PublicInbox::Lock;
+        $self->{-ipc_lock} //= bless { lock_path => $f }, 'PublicInbox::Lock' # set once
+}
+
+sub ipc_do { # call $self->$sub(@args) in the worker if spawned, else in this process
+        my ($self, $sub, @args) = @_;
+        if (my $s1 = $self->{-ipc_sock}) { # a worker socket exists: call remotely
+                my $ipc_lock = $self->{-ipc_lock};
+                my $lock = $ipc_lock ? $ipc_lock->lock_for_scope : undef; # only if ipc_lock_init ran
+                _send_rec($s1, [ wantarray, $sub, @args ]); # forwards our caller's context
+                return unless defined(wantarray); # void context: worker sends no reply
+                my $ret = _get_rec($s1) // die "no response on $sub";
+                die $$ret if ref($ret) eq 'PublicInbox::IPC::Die'; # re-throw worker's exception
+                wantarray ? @$ret : $$ret;
+        } else { # no worker: plain method call, context passes through
+                $self->$sub(@args);
+        }
+}
+
+1;
diff --git a/lib/PublicInbox/LeiStore.pm b/lib/PublicInbox/LeiStore.pm
index f8383d5e..2745c560 100644
--- a/lib/PublicInbox/LeiStore.pm
+++ b/lib/PublicInbox/LeiStore.pm
@@ -9,7 +9,7 @@
 package PublicInbox::LeiStore;
 use strict;
 use v5.10.1;
-use parent qw(PublicInbox::Lock);
+use parent qw(PublicInbox::Lock PublicInbox::IPC);
 use PublicInbox::SearchIdx qw(crlf_adjust);
 use PublicInbox::ExtSearchIdx;
 use PublicInbox::Import;
diff --git a/t/ipc.t b/t/ipc.t
new file mode 100644
index 00000000..f9c4024b
--- /dev/null
+++ b/t/ipc.t
@@ -0,0 +1,67 @@
+#!perl -w
+# Copyright (C) 2020 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+use strict;
+use v5.10.1;
+use Test::More;
+use PublicInbox::TestCommon;
+require_ok 'PublicInbox::IPC';
+state $once = eval <<''; # adds test_* methods to PublicInbox::IPC; ends at the empty line
+package PublicInbox::IPC;
+use strict;
+sub test_array { qw(test array) }
+sub test_scalar { 'scalar' }
+sub test_scalarref { \'scalarref' }
+sub test_undef { undef }
+sub test_die { shift; die @_; 'unreachable' }
+sub test_pid { $$ }
+1;
+
+my $ipc = bless {}, 'PublicInbox::IPC';
+my @t = qw(array scalar scalarref undef);
+my $test = sub { # compares ipc_do results with direct calls; $x labels the run
+        my $x = shift;
+        for my $type (@t) {
+                my $m = "test_$type";
+                my @ret = $ipc->ipc_do($m); # list context
+                my @exp = $ipc->$m;
+                is_deeply(\@ret, \@exp, "wantarray $m $x");
+
+                $ipc->ipc_do($m); # void context, nothing to compare
+
+                my $ret = $ipc->ipc_do($m); # scalar context
+                my $exp = $ipc->$m;
+                is_deeply($ret, $exp, "!wantarray $m $x");
+        }
+        my $ret = eval { $ipc->test_die('phail') }; # direct call gives the expected error
+        my $exp = $@;
+        $ret = eval { $ipc->ipc_do('test_die', 'phail') };
+        my $err = $@;
+        my %lines;
+        for ($err, $exp) {
+                s/ line (\d+).*//s and $lines{$1}++; # cut from " line N" onward, tally N
+        }
+        is(scalar keys %lines, 1, 'line numbers match');
+        is((values %lines)[0], 2, '2 hits on same line number');
+        is($err, $exp, "$x die matches");
+        is($ret, undef, "$x die did not return");
+};
+$test->('local'); # no worker spawned yet, so ipc_do calls in-process
+
+SKIP: {
+        require_mods(qw(Storable), 16); # presumably skips 16 tests if Storable is missing
+        my $pid = $ipc->ipc_worker_spawn('test worker');
+        ok($pid > 0 && kill(0, $pid), 'worker spawned and running'); # kill 0 only probes
+        defined($pid) or BAIL_OUT 'no spawn, no test';
+        is($ipc->ipc_do('test_pid'), $pid, 'worker pid returned');
+        $test->('worker'); # same checks, now routed through the worker
+        {
+                my ($tmp, $for_destroy) = tmpdir();
+                $ipc->ipc_lock_init("$tmp/lock"); # the next ipc_do takes this lock
+                is($ipc->ipc_do('test_pid'), $pid, 'worker pid returned');
+        }
+        $ipc->ipc_worker_stop;
+        ok(!kill(0, $pid) && $!{ESRCH}, 'worker stopped'); # ESRCH: no such process
+}
+$ipc->ipc_worker_stop; # idempotent
+done_testing;
diff --git a/t/lei_store.t b/t/lei_store.t
index 03ab5af6..a189f897 100644
--- a/t/lei_store.t
+++ b/t/lei_store.t
@@ -85,4 +85,9 @@ for my $parallel (0, 1) {
         is_deeply(\@kw, [], 'set clobbers all');
 }
 
+SKIP: {
+        require_mods(qw(Storable), 1); # presumably skips this 1 test without Storable
+        ok($lst->can('ipc_do'), 'ipc_do works if we have Storable'); # resolves, not called
+}
+
 done_testing;