diff options
Diffstat (limited to 't')
-rw-r--r-- | t/ipc.t | 66 |
1 files changed, 66 insertions, 0 deletions
@@ -5,6 +5,7 @@ use strict; use v5.10.1; use Test::More; use PublicInbox::TestCommon; +use Fcntl qw(SEEK_SET); require_ok 'PublicInbox::IPC'; state $once = eval <<''; package PublicInbox::IPC; @@ -15,6 +16,13 @@ sub test_scalarref { \'scalarref' } sub test_undef { undef } sub test_die { shift; die @_; 'unreachable' } sub test_pid { $$ } +sub test_write_each_fd { + my ($self, @args) = @_; + for my $fd (0..2) { + print { $self->{$fd} } "i=$fd $$ ", @args, "\n"; + $self->{$fd}->flush; + } +} 1; my $ipc = bless {}, 'PublicInbox::IPC'; @@ -102,4 +110,62 @@ SKIP: { ok(!kill(0, $pid) && $!{ESRCH}, 'worker stopped'); } $ipc->ipc_worker_stop; # idempotent + +# work queues +$ipc->{wq_open_modes} = [qw( >&= >&= >&= )]; +pipe(my ($ra, $wa)) or BAIL_OUT $!; +pipe(my ($rb, $wb)) or BAIL_OUT $!; +pipe(my ($rc, $wc)) or BAIL_OUT $!; +open my $warn, '+>', undef or BAIL_OUT; +$warn->autoflush(0); +local $SIG{__WARN__} = sub { print $warn "PID:$$ ", @_ }; +my @ppids; +for my $t ('local', 'worker', 'worker again') { + $ipc->wq_do('test_write_each_fd', $wa, $wb, $wc, 'hello world'); + my $i = 0; + for my $fh ($ra, $rb, $rc) { + my $buf = readline($fh); + is(chop($buf), "\n", "trailing CR ($t)"); + like($buf, qr/\Ai=$i \d+ hello world\z/, "got expected ($t)"); + $i++; + } + $ipc->wq_do('test_die', $wa, $wb, $wc); + my $ppid = $ipc->wq_workers_start('wq', 1); + push(@ppids, $ppid); +} + +# wq_do works across fork (siblings can feed) +SKIP: { + skip 'Socket::MsgHdr, IO::FDPass, Inline::C missing', 7 if !$ppids[0]; + is_deeply(\@ppids, [$$, undef, undef], + 'parent pid returned in wq_workers_start'); + my $pid = fork // BAIL_OUT $!; + if ($pid == 0) { + use POSIX qw(_exit); + $ipc->wq_do('test_write_each_fd', $wa, $wb, $wc, $$); + _exit(0); + } else { + my $i = 0; + my ($wpid, @rest) = keys %{$ipc->{-wq_workers}}; + is(scalar(@rest), 0, 'only one worker'); + for my $fh ($ra, $rb, $rc) { + my $buf = readline($fh); + is(chop($buf), "\n", "trailing CR #$i"); + like($buf, qr/^i=$i $wpid $pid\z/, + 'got expected from sibling'); + $i++; + } + is(waitpid($pid, 0), $pid, 'waitpid complete'); + is($?, 0, 'child wq producer exited'); + } +} + +$ipc->wq_close; +seek($warn, 0, SEEK_SET) or BAIL_OUT; +my @warn = <$warn>; +is(scalar(@warn), 3, 'warned 3 times'); +like($warn[0], qr/ wq_do: /, '1st warned from wq_do'); +like($warn[1], qr/ wq_worker: /, '2nd warned from wq_worker'); +is($warn[2], $warn[1], 'worker did not die'); + done_testing; |