about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2023-10-07 21:24:05 +0000
committerEric Wong <e@80x24.org>2023-10-08 18:54:44 +0000
commitfd799cf3d3f48f0329b125324db173c9d5e4bddf (patch)
treebac1d9c30f92417a89e08e2470b0cf3f97ab9f21
parentc8b757d945040426f8c4cd909c2f05e34be6ccb1 (diff)
downloadpublic-inbox-fd799cf3d3f48f0329b125324db173c9d5e4bddf.tar.gz
None of the lei internals works properly without forking and
sockets.  The fallback code increases the potential to accidentally
call subs in the wrong process during the teardown phase.

We'll still support ipc_do w/o forking for now since it
forking doesn't benefit small indexing runs from -mda and
such.
-rw-r--r--lib/PublicInbox/IPC.pm43
-rw-r--r--t/ipc.t19
2 files changed, 24 insertions, 38 deletions
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 068c5623..ba8b5739 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -256,16 +256,12 @@ sub do_sock_stream { # via wq_io_do, for big requests
 
 sub wq_broadcast {
         my ($self, $sub, @args) = @_;
-        if (my $wkr = $self->{-wq_workers}) {
-                my $buf = ipc_freeze([$sub, @args]);
-                for my $bcast1 (values %$wkr) {
-                        my $sock = $bcast1 // $self->{-wq_s1} // next;
-                        send($sock, $buf, 0) // croak "send: $!";
-                        # XXX shouldn't have to deal with EMSGSIZE here...
-                }
-        } else {
-                eval { $self->$sub(@args) };
-                warn "wq_broadcast: $@" if $@;
+        my $wkr = $self->{-wq_workers} or Carp::confess('no -wq_workers');
+        my $buf = ipc_freeze([$sub, @args]);
+        for my $bcast1 (values %$wkr) {
+                my $sock = $bcast1 // $self->{-wq_s1} // next;
+                send($sock, $buf, 0) // croak "send: $!";
+                # XXX shouldn't have to deal with EMSGSIZE here...
         }
 }
 
@@ -291,24 +287,17 @@ sub stream_in_full ($$$) {
 
 sub wq_io_do { # always async
         my ($self, $sub, $ios, @args) = @_;
-        if (my $s1 = $self->{-wq_s1}) { # run in worker
-                my $fds = [ map { fileno($_) } @$ios ];
-                my $buf = ipc_freeze([$sub, @args]);
-                if (length($buf) > $MY_MAX_ARG_STRLEN) {
-                        stream_in_full($s1, $fds, $buf);
-                } else {
-                        my $n = $send_cmd->($s1, $fds, $buf, 0);
-                        return if defined($n); # likely
-                        $!{ETOOMANYREFS} and
-                                croak "sendmsg: $! (check RLIMIT_NOFILE)";
-                        $!{EMSGSIZE} ? stream_in_full($s1, $fds, $buf) :
-                                croak("sendmsg: $!");
-                }
+        my $s1 = $self->{-wq_s1} or Carp::confess('no -wq_s1');
+        my $fds = [ map { fileno($_) } @$ios ];
+        my $buf = ipc_freeze([$sub, @args]);
+        if (length($buf) > $MY_MAX_ARG_STRLEN) {
+                stream_in_full($s1, $fds, $buf);
         } else {
-                @$self{0..$#$ios} = @$ios;
-                eval { $self->$sub(@args) };
-                warn "wq_io_do: $@" if $@;
-                delete @$self{0..$#$ios}; # don't close
+                my $n = $send_cmd->($s1, $fds, $buf, 0);
+                return if defined($n); # likely
+                $!{ETOOMANYREFS} and croak "sendmsg: $! (check RLIMIT_NOFILE)";
+                $!{EMSGSIZE} ? stream_in_full($s1, $fds, $buf) :
+                        croak("sendmsg: $!");
         }
 }
 
diff --git a/t/ipc.t b/t/ipc.t
index 7bdf2218..519ef089 100644
--- a/t/ipc.t
+++ b/t/ipc.t
@@ -1,9 +1,7 @@
 #!perl -w
 # Copyright (C) all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
-use strict;
-use v5.10.1;
-use Test::More;
+use v5.12;
 use PublicInbox::TestCommon;
 use Fcntl qw(SEEK_SET);
 use PublicInbox::SHA qw(sha1_hex);
@@ -108,7 +106,9 @@ open my $agpl, '<', 'COPYING' or BAIL_OUT "AGPL-3 missing: $!";
 my $big = do { local $/; <$agpl> } // BAIL_OUT "read: $!";
 close $agpl or BAIL_OUT "close: $!";
 
-for my $t ('local', 'worker', 'worker again') {
+for my $t ('worker', 'worker again') {
+        my $ppid = $ipc->wq_workers_start('wq', 1);
+        push(@ppids, $ppid);
         $ipc->wq_io_do('test_write_each_fd', [ $wa, $wb, $wc ], 'hello world');
         my $i = 0;
         for my $fh ($ra, $rb, $rc) {
@@ -132,14 +132,12 @@ for my $t ('local', 'worker', 'worker again') {
                 $exp = sha1_hex($bigger)."\n";
                 is(readline($rb), $exp, "SHA WQWorker limit ($t)");
         }
-        my $ppid = $ipc->wq_workers_start('wq', 1);
-        push(@ppids, $ppid);
 }
 
 # wq_io_do works across fork (siblings can feed)
 SKIP: {
         skip 'Socket::MsgHdr or Inline::C missing', 3 if !$ppids[0];
-        is_deeply(\@ppids, [$$, undef, undef],
+        is_xdeeply(\@ppids, [$$, undef],
                 'parent pid returned in wq_workers_start');
         my $pid = fork // BAIL_OUT $!;
         if ($pid == 0) {
@@ -173,10 +171,9 @@ SKIP: {
         skip 'Socket::MsgHdr or Inline::C missing', 11 if !$ppids[0];
         seek($warn, 0, SEEK_SET) or BAIL_OUT;
         my @warn = <$warn>;
-        is(scalar(@warn), 3, 'warned 3 times');
-        like($warn[0], qr/ wq_io_do: /, '1st warned from wq_do');
-        like($warn[1], qr/ wq_worker: /, '2nd warned from wq_worker');
-        is($warn[2], $warn[1], 'worker did not die');
+        is(scalar(@warn), 2, 'warned 3 times');
+        like($warn[0], qr/ wq_worker: /, '2nd warned from wq_worker');
+        is($warn[0], $warn[1], 'worker did not die');
 
         $SIG{__WARN__} = 'DEFAULT';
         is($ipc->wq_workers_start('wq', 2), $$, 'workers started again');