about summary refs log tree commit homepage
path: root/t
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2021-01-10 12:15:04 +0000
committerEric Wong <e@80x24.org>2021-01-12 03:51:42 +0000
commit862d18680dccc30ef6cc8044da925ec5085911b2 (patch)
tree8d89826248c1d917752444bd88344bbffafcdc01 /t
parent9fd84b74b80eb98855d57f438e88ba9285b3d9a6 (diff)
downloadpublic-inbox-862d18680dccc30ef6cc8044da925ec5085911b2.tar.gz
This will allow any number of younger sibling processes to
communicate with older siblings directly without relying on a
mediator process.  This is intended to be useful for
distributing search work across multiple workers without caring
which worker hits it (we only care about shard members).

And any request sent with this will be able to hit any worker
without locking on our part.

Unix stream sockets with a listener were also considered;
binding to a file on the FS may confuse users given there's
already a socket path for lei(1).  Linux-only Abstract or
autobind sockets are rejected due to lack of portability.

SOCK_SEQPACKET via socketpair(2) was chosen since it's POSIX
2008 and available on FreeBSD 9+ in addition to Linux, and
doesn't require filesystem access.
Diffstat (limited to 't')
-rw-r--r--t/ipc.t66
1 files changed, 66 insertions, 0 deletions
diff --git a/t/ipc.t b/t/ipc.t
index 400fb768..f09f76ef 100644
--- a/t/ipc.t
+++ b/t/ipc.t
@@ -5,6 +5,7 @@ use strict;
 use v5.10.1;
 use Test::More;
 use PublicInbox::TestCommon;
+use Fcntl qw(SEEK_SET);
 require_ok 'PublicInbox::IPC';
 state $once = eval <<'';
 package PublicInbox::IPC;
@@ -15,6 +16,13 @@ sub test_scalarref { \'scalarref' }
 sub test_undef { undef }
 sub test_die { shift; die @_; 'unreachable' }
 sub test_pid { $$ }
+sub test_write_each_fd {
+        my ($self, @args) = @_;
+        for my $fd (0..2) {
+                print { $self->{$fd} } "i=$fd $$ ", @args, "\n";
+                $self->{$fd}->flush;
+        }
+}
 1;
 
 my $ipc = bless {}, 'PublicInbox::IPC';
@@ -102,4 +110,62 @@ SKIP: {
         ok(!kill(0, $pid) && $!{ESRCH}, 'worker stopped');
 }
 $ipc->ipc_worker_stop; # idempotent
+
+# work queues
+$ipc->{wq_open_modes} = [qw( >&= >&= >&= )];
+pipe(my ($ra, $wa)) or BAIL_OUT $!;
+pipe(my ($rb, $wb)) or BAIL_OUT $!;
+pipe(my ($rc, $wc)) or BAIL_OUT $!;
+open my $warn, '+>', undef or BAIL_OUT;
+$warn->autoflush(0);
+local $SIG{__WARN__} = sub { print $warn "PID:$$ ", @_ };
+my @ppids;
+for my $t ('local', 'worker', 'worker again') {
+        $ipc->wq_do('test_write_each_fd', $wa, $wb, $wc, 'hello world');
+        my $i = 0;
+        for my $fh ($ra, $rb, $rc) {
+                my $buf = readline($fh);
+                is(chop($buf), "\n", "trailing CR ($t)");
+                like($buf, qr/\Ai=$i \d+ hello world\z/, "got expected ($t)");
+                $i++;
+        }
+        $ipc->wq_do('test_die', $wa, $wb, $wc);
+        my $ppid = $ipc->wq_workers_start('wq', 1);
+        push(@ppids, $ppid);
+}
+
+# wq_do works across fork (siblings can feed)
+SKIP: {
+        skip 'Socket::MsgHdr, IO::FDPass, Inline::C missing', 7 if !$ppids[0];
+        is_deeply(\@ppids, [$$, undef, undef],
+                'parent pid returned in wq_workers_start');
+        my $pid = fork // BAIL_OUT $!;
+        if ($pid == 0) {
+                use POSIX qw(_exit);
+                $ipc->wq_do('test_write_each_fd', $wa, $wb, $wc, $$);
+                _exit(0);
+        } else {
+                my $i = 0;
+                my ($wpid, @rest) = keys %{$ipc->{-wq_workers}};
+                is(scalar(@rest), 0, 'only one worker');
+                for my $fh ($ra, $rb, $rc) {
+                        my $buf = readline($fh);
+                        is(chop($buf), "\n", "trailing CR #$i");
+                        like($buf, qr/^i=$i $wpid $pid\z/,
+                                'got expected from sibling');
+                        $i++;
+                }
+                is(waitpid($pid, 0), $pid, 'waitpid complete');
+                is($?, 0, 'child wq producer exited');
+        }
+}
+
+$ipc->wq_close;
+seek($warn, 0, SEEK_SET) or BAIL_OUT;
+my @warn = <$warn>;
+is(scalar(@warn), 3, 'warned 3 times');
+like($warn[0], qr/ wq_do: /, '1st warned from wq_do');
+like($warn[1], qr/ wq_worker: /, '2nd warned from wq_worker');
+is($warn[2], $warn[1], 'worker did not die');
+
 done_testing;