diff options
-rw-r--r-- | MANIFEST | 2 | ||||
-rw-r--r-- | lib/PublicInbox/IPC.pm | 129 | ||||
-rw-r--r-- | lib/PublicInbox/LeiStore.pm | 2 | ||||
-rw-r--r-- | t/ipc.t | 67 | ||||
-rw-r--r-- | t/lei_store.t | 5 |
5 files changed, 204 insertions, 1 deletions
@@ -153,6 +153,7 @@ lib/PublicInbox/IMAPD.pm lib/PublicInbox/IMAPTracker.pm lib/PublicInbox/IMAPdeflate.pm lib/PublicInbox/IMAPsearchqp.pm +lib/PublicInbox/IPC.pm lib/PublicInbox/IdxStack.pm lib/PublicInbox/Import.pm lib/PublicInbox/In2Tie.pm @@ -327,6 +328,7 @@ t/index-git-times.t t/indexlevels-mirror-v1.t t/indexlevels-mirror.t t/init.t +t/ipc.t t/iso-2202-jp.eml t/kqnotify.t t/lei-oneshot.t diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm new file mode 100644 index 00000000..c04140ae --- /dev/null +++ b/lib/PublicInbox/IPC.pm @@ -0,0 +1,129 @@ +# Copyright (C) 2020 all contributors <meta@public-inbox.org> +# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> + +# base class for remote IPC calls, requires Storable +# TODO: this ought to be usable in SearchIdxShard +package PublicInbox::IPC; +use strict; +use v5.10.1; +use Socket qw(AF_UNIX SOCK_STREAM); +use Carp qw(confess croak); +use PublicInbox::Sigfd; + +sub _get_rec ($) { + my ($sock) = @_; + local $/ = "\n"; + defined(my $len = <$sock>) or return; + chop($len) eq "\n" or croak "no LF byte in $len"; + defined(my $r = read($sock, my $buf, $len)) or croak "read error: $!"; + $r == $len or croak "short read: $r != $len"; + thaw($buf); +} + +sub _send_rec ($$) { + my ($sock, $ref) = @_; + my $buf = freeze($ref); + print $sock length($buf), "\n", $buf or croak "print: $!"; +} + +sub ipc_return ($$$) { + my ($s2, $ret, $exc) = @_; + _send_rec($s2, $exc ? bless(\$exc, 'PublicInbox::IPC::Die') : $ret); +} + +sub ipc_worker_loop ($$) { + my ($self, $s2) = @_; + $self->ipc_atfork_child if $self->can('ipc_atfork_child'); + $s2->autoflush(1); + while (my $rec = _get_rec($s2)) { + my ($wantarray, $sub, @args) = @$rec; + if (!defined($wantarray)) { # no waiting if client doesn't care + eval { $self->$sub(@args) }; + eval { warn "die: $@ (from nowait $sub)\n" } if $@; + } elsif ($wantarray) { + my @ret = eval { $self->$sub(@args) }; + ipc_return($s2, \@ret, $@); + } else { + my $ret = eval { $self->$sub(@args) }; + ipc_return($s2, \$ret, $@); + } + } +} + +sub ipc_worker_spawn ($$$) { + my ($self, $ident, $oldset) = @_; + eval { require Storable; Storable->import(qw(freeze thaw)); }; + if ($@) { + state $w //= warn "Storable (part of Perl) missing: $@\n"; + return; + } + my $pid = $self->{-ipc_worker_pid}; + confess "BUG: already spawned PID:$pid" if $pid; + confess "BUG: already have worker socket" if $self->{-ipc_sock}; + my ($s1, $s2); + socketpair($s1, $s2, AF_UNIX, SOCK_STREAM, 0) or die "socketpair: $!"; + my $sigset = $oldset // PublicInbox::Sigfd::block_signals(); + defined($pid = fork) or die "fork: $!"; + if ($pid == 0) { + undef $s1; + local $0 = $ident; + $SIG{$_} = 'IGNORE' for (qw(TERM INT QUIT)); + PublicInbox::Sigfd::sig_setmask($oldset); + eval { ipc_worker_loop($self, $s2) }; + die "worker $ident died: $@\n" if $@; + $self->ipc_at_worker_exit if $self->can('ipc_at_worker_exit'); + exit; + } + PublicInbox::Sigfd::sig_setmask($sigset) unless $oldset; + $s1->autoflush(1); + $self->{-ipc_sock} = $s1; + $self->{-ipc_worker_pid} = $pid; +} + +sub ipc_reap_worker { # dwaitpid callback + my ($self, $pid) = @_; + warn "PID:$pid died with \$?=$?\n" if $?; +} + +sub ipc_worker_stop { + my ($self) = @_; + my $pid; + if (delete $self->{-ipc_sock}) { + $pid = delete $self->{-ipc_worker_pid} or die "no PID?"; + } else { + $pid = delete $self->{-ipc_worker_pid} and + die "unexpected PID:$pid"; + } + return unless $pid; + eval { PublicInbox::DS::dwaitpid($pid, \&ipc_reap_worker, $self) }; + if ($@) { + my $wp = waitpid($pid, 0); + $pid == $wp or die "waitpid($pid) returned $wp: \$?=$?"; + ipc_reap_worker($self, $pid); + } +} + +# use this if we have multiple readers reading curl or "pigz -dc" +# and writing to the same store +sub ipc_lock_init { + my ($self, $f) = @_; + require PublicInbox::Lock; + $self->{-ipc_lock} //= bless { lock_path => $f }, 'PublicInbox::Lock' +} + +sub ipc_do { + my ($self, $sub, @args) = @_; + if (my $s1 = $self->{-ipc_sock}) { + my $ipc_lock = $self->{-ipc_lock}; + my $lock = $ipc_lock ? $ipc_lock->lock_for_scope : undef; + _send_rec($s1, [ wantarray, $sub, @args ]); + return unless defined(wantarray); + my $ret = _get_rec($s1) // die "no response on $sub"; + die $$ret if ref($ret) eq 'PublicInbox::IPC::Die'; + wantarray ? @$ret : $$ret; + } else { + $self->$sub(@args); + } +} + +1; diff --git a/lib/PublicInbox/LeiStore.pm b/lib/PublicInbox/LeiStore.pm index f8383d5e..2745c560 100644 --- a/lib/PublicInbox/LeiStore.pm +++ b/lib/PublicInbox/LeiStore.pm @@ -9,7 +9,7 @@ package PublicInbox::LeiStore; use strict; use v5.10.1; -use parent qw(PublicInbox::Lock); +use parent qw(PublicInbox::Lock PublicInbox::IPC); use PublicInbox::SearchIdx qw(crlf_adjust); use PublicInbox::ExtSearchIdx; use PublicInbox::Import; diff --git a/t/ipc.t b/t/ipc.t new file mode 100644 index 00000000..f9c4024b --- /dev/null +++ b/t/ipc.t @@ -0,0 +1,67 @@ +#!perl -w +# Copyright (C) 2020 all contributors <meta@public-inbox.org> +# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> +use strict; +use v5.10.1; +use Test::More; +use PublicInbox::TestCommon; +require_ok 'PublicInbox::IPC'; +state $once = eval <<''; +package PublicInbox::IPC; +use strict; +sub test_array { qw(test array) } +sub test_scalar { 'scalar' } +sub test_scalarref { \'scalarref' } +sub test_undef { undef } +sub test_die { shift; die @_; 'unreachable' } +sub test_pid { $$ } +1; + +my $ipc = bless {}, 'PublicInbox::IPC'; +my @t = qw(array scalar scalarref undef); +my $test = sub { + my $x = shift; + for my $type (@t) { + my $m = "test_$type"; + my @ret = $ipc->ipc_do($m); + my @exp = $ipc->$m; + is_deeply(\@ret, \@exp, "wantarray $m $x"); + + $ipc->ipc_do($m); + + my $ret = $ipc->ipc_do($m); + my $exp = $ipc->$m; + is_deeply($ret, $exp, "!wantarray $m $x"); + } + my $ret = eval { $ipc->test_die('phail') }; + my $exp = $@; + $ret = eval { $ipc->ipc_do('test_die', 'phail') }; + my $err = $@; + my %lines; + for ($err, $exp) { + s/ line (\d+).*//s and $lines{$1}++; + } + is(scalar keys %lines, 1, 'line numbers match'); + is((values %lines)[0], 2, '2 hits on same line number'); + is($err, $exp, "$x die matches"); + is($ret, undef, "$x die did not return"); +}; +$test->('local'); + +SKIP: { + require_mods(qw(Storable), 16); + my $pid = $ipc->ipc_worker_spawn('test worker'); + ok($pid > 0 && kill(0, $pid), 'worker spawned and running'); + defined($pid) or BAIL_OUT 'no spawn, no test'; + is($ipc->ipc_do('test_pid'), $pid, 'worker pid returned'); + $test->('worker'); + { + my ($tmp, $for_destroy) = tmpdir(); + $ipc->ipc_lock_init("$tmp/lock"); + is($ipc->ipc_do('test_pid'), $pid, 'worker pid returned'); + } + $ipc->ipc_worker_stop; + ok(!kill(0, $pid) && $!{ESRCH}, 'worker stopped'); +} +$ipc->ipc_worker_stop; # idempotent +done_testing; diff --git a/t/lei_store.t b/t/lei_store.t index 03ab5af6..a189f897 100644 --- a/t/lei_store.t +++ b/t/lei_store.t @@ -85,4 +85,9 @@ for my $parallel (0, 1) { is_deeply(\@kw, [], 'set clobbers all'); } +SKIP: { + require_mods(qw(Storable), 1); + ok($lst->can('ipc_do'), 'ipc_do works if we have Storable'); +} + done_testing; |