diff options
Diffstat (limited to 't/ipc.t')
-rw-r--r-- | t/ipc.t | 195 |
1 files changed, 195 insertions, 0 deletions
diff --git a/t/ipc.t b/t/ipc.t new file mode 100644 index 00000000..23ae2e7b --- /dev/null +++ b/t/ipc.t @@ -0,0 +1,195 @@ +#!perl -w +# Copyright (C) all contributors <meta@public-inbox.org> +# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> +use v5.12; +use PublicInbox::TestCommon; +use Fcntl qw(SEEK_SET); +use PublicInbox::SHA qw(sha1_hex); +require_mods(qw(Storable||Sereal)); +require_ok 'PublicInbox::IPC'; +my ($tmpdir, $for_destroy) = tmpdir(); +state $once = eval <<''; +package PublicInbox::IPC; +use strict; +use PublicInbox::SHA qw(sha1_hex); +sub test_array { qw(test array) } +sub test_scalar { 'scalar' } +sub test_scalarref { \'scalarref' } +sub test_undef { undef } +sub test_die { shift; die @_; 'unreachable' } +sub test_pid { $$ } +sub test_write_each_fd { + my ($self, @args) = @_; + for my $fd (0..2) { + print { $self->{$fd} } "i=$fd $$ ", @args, "\n"; + $self->{$fd}->flush; + } +} +sub test_sha { + my ($self, $buf) = @_; + print { $self->{1} } sha1_hex($buf), "\n"; + $self->{1}->flush; +} +sub test_append_pid { + my ($self, $file) = @_; + open my $fh, '>>', $file or die "open: $!"; + $fh->autoflush(1); + print $fh "$$\n" or die "print: $!"; +} +1; + +my $ipc = bless {}, 'PublicInbox::IPC'; +my @t = qw(array scalar scalarref undef); +my $test = sub { + my $x = shift; + for my $type (@t) { + my $m = "test_$type"; + my @ret = $ipc->ipc_do($m); + my @exp = $ipc->$m; + is_deeply(\@ret, \@exp, "wantarray $m $x"); + + $ipc->ipc_do($m); + + my $ret = $ipc->ipc_do($m); + my $exp = $ipc->$m; + is_deeply($ret, $exp, "!wantarray $m $x"); + } + my $ret = eval { $ipc->test_die('phail') }; + my $exp = $@; + $ret = eval { $ipc->ipc_do('test_die', 'phail') }; + my $err = $@; + my %lines; + for ($err, $exp) { + s/ line (\d+).*//s and $lines{$1}++; + } + is(scalar keys %lines, 1, 'line numbers match'); + is((values %lines)[0], 2, '2 hits on same line number'); + is($err, $exp, "$x die matches"); + is($ret, undef, "$x die did not return"); + + eval { $ipc->test_die(['arrayref']) }; + $exp = $@; + $ret = eval { $ipc->ipc_do('test_die', ['arrayref']) }; + $err = $@; + is_deeply($err, $exp, 'die with unblessed ref'); + is(ref($err), 'ARRAY', 'got an array ref'); + + $exp = bless ['blessed'], 'PublicInbox::WTF'; + $ret = eval { $ipc->ipc_do('test_die', $exp) }; + $err = $@; + is_deeply($err, $exp, 'die with blessed ref'); + is(ref($err), 'PublicInbox::WTF', 'got blessed ref'); +}; +$test->('local'); + +{ + my $pid = $ipc->ipc_worker_spawn('test worker'); + ok($pid > 0 && kill(0, $pid), 'worker spawned and running'); + defined($pid) or BAIL_OUT 'no spawn, no test'; + is($ipc->ipc_do('test_pid'), $pid, 'worker pid returned'); + $test->('worker'); + is($ipc->ipc_do('test_pid'), $pid, 'worker pid returned'); + $ipc->ipc_worker_stop; + ok(!kill(0, $pid) && $!{ESRCH}, 'worker stopped'); +} +$ipc->ipc_worker_stop; # idempotent + +# work queues +pipe(my ($ra, $wa)) or BAIL_OUT $!; +pipe(my ($rb, $wb)) or BAIL_OUT $!; +pipe(my ($rc, $wc)) or BAIL_OUT $!; +open my $warn, '+>', undef or BAIL_OUT; +$warn->autoflush(0); +local $SIG{__WARN__} = sub { print $warn "PID:$$ ", @_ }; +my @ppids; +open my $agpl, '<', 'COPYING' or BAIL_OUT "AGPL-3 missing: $!"; +my $big = do { local $/; <$agpl> } // BAIL_OUT "read: $!"; +close $agpl or BAIL_OUT "close: $!"; + +for my $t ('worker', 'worker again') { + my $ppid = $ipc->wq_workers_start('wq', 1); + push(@ppids, $ppid); + $ipc->wq_io_do('test_write_each_fd', [ $wa, $wb, $wc ], 'hello world'); + my $i = 0; + for my $fh ($ra, $rb, $rc) { + my $buf = readline($fh); + is(chop($buf), "\n", "trailing CR ($t)"); + like($buf, qr/\Ai=$i \d+ hello world\z/, "got expected ($t)"); + $i++; + } + $ipc->wq_io_do('test_die', [ $wa, $wb, $wc ]); + $ipc->wq_io_do('test_sha', [ $wa, $wb ], 'hello world'); + is(readline($rb), sha1_hex('hello world')."\n", "SHA small ($t)"); + { + my $bigger = $big x 10; # to hit EMSGSIZE + $ipc->wq_io_do('test_sha', [ $wa, $wb ], $bigger); + my $exp = sha1_hex($bigger)."\n"; + is(readline($rb), $exp, "SHA big for EMSGSIZE ($t)"); + + # to hit the WQWorker recv_and_run length + substr($bigger, my $MY_MAX_ARG_STRLEN = 4096 * 33, -1) = ''; + $ipc->wq_io_do('test_sha', [ $wa, $wb ], $bigger); + $exp = sha1_hex($bigger)."\n"; + is(readline($rb), $exp, "SHA WQWorker limit ($t)"); + } + SKIP: { + $ENV{TEST_EXPENSIVE} or skip 'TEST_EXPENSIVE not set', 1; + my $bigger = $big x 75000; # over 2G to trigger partial sendmsg + $ipc->wq_io_do('test_sha', [ $wa, $wb ], $bigger); + my $exp = sha1_hex($bigger)."\n"; + is(readline($rb), $exp, "SHA WQWorker sendmsg limit ($t)"); + } +} + +# wq_io_do works across fork (siblings can feed) +SKIP: { + skip 'Socket::MsgHdr or Inline::C missing', 3 if !$ppids[0]; + is_xdeeply(\@ppids, [$$, undef], + 'parent pid returned in wq_workers_start'); + my $pid = fork // BAIL_OUT $!; + if ($pid == 0) { + use POSIX qw(_exit); + $ipc->wq_io_do('test_write_each_fd', [ $wa, $wb, $wc ], $$); + _exit(0); + } else { + my $i = 0; + my ($wpid, @rest) = keys %{$ipc->{-wq_workers}}; + is(scalar(@rest), 0, 'only one worker'); + for my $fh ($ra, $rb, $rc) { + my $buf = readline($fh); + is(chop($buf), "\n", "trailing CR #$i"); + like($buf, qr/^i=$i $wpid $pid\z/, + 'got expected from sibling'); + $i++; + } + is(waitpid($pid, 0), $pid, 'waitpid complete'); + is($?, 0, 'child wq producer exited'); + } + my @ary = $ipc->wq_do('test_array'); + is_deeply(\@ary, [ qw(test array) ], 'wq_do wantarray'); + is(my $s = $ipc->wq_do('test_scalar'), 'scalar', 'defined wantarray'); + my $exp = bless ['blessed'], 'PublicInbox::WTF'; + my $ret = eval { $ipc->wq_do('test_die', $exp) }; + is_deeply($@, $exp, 'die with blessed ref'); +} + +$ipc->wq_close; +SKIP: { + skip 'Socket::MsgHdr or Inline::C missing', 11 if !$ppids[0]; + seek($warn, 0, SEEK_SET) or BAIL_OUT; + my @warn = <$warn>; + is(scalar(@warn), 2, 'warned 3 times'); + like($warn[0], qr/ wq_worker: /, '2nd warned from wq_worker'); + is($warn[0], $warn[1], 'worker did not die'); + + $SIG{__WARN__} = 'DEFAULT'; + is($ipc->wq_workers_start('wq', 2), $$, 'workers started again'); + $ipc->wq_broadcast('test_append_pid', "$tmpdir/append_pid"); + $ipc->wq_close; + open my $fh, '<', "$tmpdir/append_pid" or BAIL_OUT "open: $!"; + chomp(my @pids = <$fh>); + my %pids = map { $_ => 1 } grep(/\A[0-9]+\z/, @pids); + is(scalar keys %pids, 2, 'broadcast hit both PIDs'); +} + +done_testing; |