From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id CF3411FA17 for ; Sun, 10 Jan 2021 12:15:19 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 04/22] ipc: add support for asynchronous callbacks Date: Sun, 10 Jan 2021 12:15:01 +0000 Message-Id: <20210110121519.17044-5-e@80x24.org> In-Reply-To: <20210110121519.17044-1-e@80x24.org> References: <20210110121519.17044-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: Similar to git->cat_async, this will let us deal with responses asynchronously, as well as being able to mix synchronous and asynchronous code transparently (though perhaps not optimally). --- lib/PublicInbox/IPC.pm | 53 +++++++++++++++++++++++++++++++++++++++--- t/ipc.t | 25 ++++++++++++++++++++ 2 files changed, 75 insertions(+), 3 deletions(-) diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm index 81623fc0..7dc8ec6a 100644 --- a/lib/PublicInbox/IPC.pm +++ b/lib/PublicInbox/IPC.pm @@ -8,6 +8,8 @@ use strict; use v5.10.1; use Carp qw(confess croak); use PublicInbox::Sigfd; +use POSIX (); +use constant PIPE_BUF => $^O eq 'linux' ? 4096 : POSIX::_POSIX_PIPE_BUF(); my ($enc, $dec); # ->imports at BEGIN turns sereal_*_with_object into custom ops on 5.14+ # and eliminate method call overhead @@ -41,10 +43,15 @@ sub _get_rec ($) { thaw($buf); } +sub _pack_rec ($) { + my ($ref) = @_; + my $buf = freeze($ref); + length($buf) . "\n" . $buf; +} + sub _send_rec ($$) { my ($w, $ref) = @_; - my $buf = freeze($ref); - print $w length($buf), "\n", $buf or croak "print: $!"; + print $w _pack_rec($ref) or croak "print: $!"; } sub ipc_return ($$$) { @@ -156,6 +163,21 @@ sub ipc_lock_init { $self->{-ipc_lock} //= bless { lock_path => $f }, 'PublicInbox::Lock' } +sub ipc_async_wait ($$) { + my ($self, $max) = @_; # max == -1 to wait for all + my $aif = $self->{-async_inflight} or return; + my $r_res = $self->{-ipc_res} or die 'BUG: no ipc_res'; + while (my ($sub, $bytes, $cb, $cb_arg) = splice(@$aif, 0, 4)) { + my $ret = _get_rec($r_res) // + die "no response on $sub (req.size=$bytes)"; + $self->{-async_inflight_bytes} -= $bytes; + + eval { $cb->($cb_arg, $ret) }; + warn "E: $sub callback error: $@\n" if $@; + return if --$max == 0; + } +} + # call $self->$sub(@args), on a worker if ipc_worker_spawn was used sub ipc_do { my ($self, $sub, @args) = @_; @@ -163,7 +185,8 @@ sub ipc_do { my $ipc_lock = $self->{-ipc_lock}; my $lock = $ipc_lock ? $ipc_lock->lock_for_scope : undef; if (defined(wantarray)) { - my $r_res = $self->{-ipc_res} or die 'no ipc_res'; + my $r_res = $self->{-ipc_res} or die 'BUG: no ipc_res'; + ipc_async_wait($self, -1); _send_rec($w_req, [ wantarray, $sub, @args ]); my $ret = _get_rec($r_res) // die "no response on $sub"; die $$ret if ref($ret) eq 'PublicInbox::IPC::Die'; @@ -176,6 +199,30 @@ sub ipc_do { } } +sub ipc_async { + my ($self, $sub, $sub_args, $cb, $cb_arg) = @_; + if (my $w_req = $self->{-ipc_req}) { # run in worker + my $rec = _pack_rec([ 1, $sub, @$sub_args ]); + my $cur_bytes = \($self->{-async_inflight_bytes} //= 0); + while (($$cur_bytes + length($rec)) > PIPE_BUF) { + ipc_async_wait($self, 1); + } + my $ipc_lock = $self->{-ipc_lock}; + my $lock = $ipc_lock ? $ipc_lock->lock_for_scope : undef; + print $w_req $rec or croak "print: $!"; + $$cur_bytes += length($rec); + push @{$self->{-async_inflight}}, + $sub, length($rec), $cb, $cb_arg; + } else { + my $ret = [ eval { $self->$sub(@$sub_args) } ]; + if (my $exc = $@) { + $ret = ( bless(\$exc, 'PublicInbox::IPC::Die') ); + } + eval { $cb->($cb_arg, $ret) }; + warn "E: $sub callback error: $@\n" if $@; + } +} + # needed when there's multiple IPC workers and the parent forking # causes newer siblings to inherit older siblings sockets sub ipc_sibling_atfork_child { diff --git a/t/ipc.t b/t/ipc.t index 0efc5394..400fb768 100644 --- a/t/ipc.t +++ b/t/ipc.t @@ -21,6 +21,7 @@ my $ipc = bless {}, 'PublicInbox::IPC'; my @t = qw(array scalar scalarref undef); my $test = sub { my $x = shift; + my @res; for my $type (@t) { my $m = "test_$type"; my @ret = $ipc->ipc_do($m); @@ -29,10 +30,34 @@ my $test = sub { $ipc->ipc_do($m); + $ipc->ipc_async($m, [], sub { push @res, \@_ }, \$m); + my $ret = $ipc->ipc_do($m); my $exp = $ipc->$m; is_deeply($ret, $exp, "!wantarray $m $x"); + + is_deeply(\@res, [ [ \$m, \@exp ] ], "async $m $x"); + @res = (); } + $ipc->ipc_async_wait(-1); + is_deeply(\@res, [], 'no leftover results'); + $ipc->ipc_async('test_die', ['die test'], + sub { push @res, \@_ }, 'die arg'); + $ipc->ipc_async_wait(1); + is(scalar(@res), 1, 'only one result'); + is(scalar(@{$res[0]}), 2, 'result has 2-element array'); + is($res[0]->[0], 'die arg', 'got async die arg '.$x); + is(ref($res[0]->[1]), 'PublicInbox::IPC::Die', + "exception type $x"); + { + my $nr = PublicInbox::IPC::PIPE_BUF(); + my $count = 0; + my $cb = sub { ++$count }; + $ipc->ipc_async('test_undef', [], $cb) for (1..$nr); + $ipc->ipc_async_wait(-1); + is($count, $nr, "$x async runs w/o deadlock"); + } + my $ret = eval { $ipc->test_die('phail') }; my $exp = $@; $ret = eval { $ipc->ipc_do('test_die', 'phail') };