From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-Status: No, score=-3.9 required=3.0 tests=ALL_TRUSTED,AWL,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 7D3F51FC0C for ; Sun, 7 Feb 2021 08:52:02 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 10/19] Revert "ipc: add support for asynchronous callbacks" Date: Sun, 7 Feb 2021 08:51:52 +0000 Message-Id: <20210207085201.13871-11-e@80x24.org> In-Reply-To: <20210207085201.13871-1-e@80x24.org> References: <20210207085201.13871-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: This reverts commit a7e6a8cd68fb6d700337d8dbc7ee2c65ff3d2fc1. It turns out to be unworkable in the face of multiple producer processes, since the lock we make has no effect when calculating pipe capacity. --- lib/PublicInbox/IPC.pm | 52 +++--------------------------------------- t/ipc.t | 25 -------------------- 2 files changed, 3 insertions(+), 74 deletions(-) diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm index 3713b56b..7e5a0b16 100644 --- a/lib/PublicInbox/IPC.pm +++ b/lib/PublicInbox/IPC.pm @@ -18,7 +18,6 @@ use PublicInbox::OnDestroy; use PublicInbox::WQWorker; use Socket qw(AF_UNIX MSG_EOR SOCK_STREAM); my $SEQPACKET = eval { Socket::SOCK_SEQPACKET() }; # portable enough? -use constant PIPE_BUF => $^O eq 'linux' ? 4096 : POSIX::_POSIX_PIPE_BUF(); our @EXPORT_OK = qw(ipc_freeze ipc_thaw); my $WQ_MAX_WORKERS = 4096; my ($enc, $dec); @@ -59,15 +58,10 @@ sub _get_rec ($) { ipc_thaw($buf); } -sub _pack_rec ($) { - my ($ref) = @_; - my $buf = ipc_freeze($ref); - length($buf) . "\n" . $buf; -} - sub _send_rec ($$) { my ($w, $ref) = @_; - print $w _pack_rec($ref) or croak "print: $!"; + my $buf = ipc_freeze($ref); + print $w length($buf), "\n", $buf or croak "print: $!"; } sub ipc_return ($$$) { @@ -188,21 +182,6 @@ sub ipc_lock_init { $self->{-ipc_lock} //= bless { lock_path => $f }, 'PublicInbox::Lock' } -sub ipc_async_wait ($$) { - my ($self, $max) = @_; # max == -1 to wait for all - my $aif = $self->{-async_inflight} or return; - my $r_res = $self->{-ipc_res} or die 'BUG: no ipc_res'; - while (my ($sub, $bytes, $cb, $cb_arg) = splice(@$aif, 0, 4)) { - my $ret = _get_rec($r_res) // - die "no response on $sub (req.size=$bytes)"; - $self->{-async_inflight_bytes} -= $bytes; - - eval { $cb->($cb_arg, $ret) }; - warn "E: $sub callback error: $@\n" if $@; - return if --$max == 0; - } -} - # call $self->$sub(@args), on a worker if ipc_worker_spawn was used sub ipc_do { my ($self, $sub, @args) = @_; @@ -210,8 +189,7 @@ sub ipc_do { my $ipc_lock = $self->{-ipc_lock}; my $lock = $ipc_lock ? $ipc_lock->lock_for_scope : undef; if (defined(wantarray)) { - my $r_res = $self->{-ipc_res} or die 'BUG: no ipc_res'; - ipc_async_wait($self, -1); + my $r_res = $self->{-ipc_res} or die 'no ipc_res'; _send_rec($w_req, [ wantarray, $sub, @args ]); my $ret = _get_rec($r_res) // die "no response on $sub"; die $$ret if ref($ret) eq 'PublicInbox::IPC::Die'; @@ -224,30 +202,6 @@ sub ipc_do { } } -sub ipc_async { - my ($self, $sub, $sub_args, $cb, $cb_arg) = @_; - if (my $w_req = $self->{-ipc_req}) { # run in worker - my $rec = _pack_rec([ 1, $sub, @$sub_args ]); - my $cur_bytes = \($self->{-async_inflight_bytes} //= 0); - while (($$cur_bytes + length($rec)) > PIPE_BUF) { - ipc_async_wait($self, 1); - } - my $ipc_lock = $self->{-ipc_lock}; - my $lock = $ipc_lock ? $ipc_lock->lock_for_scope : undef; - print $w_req $rec or croak "print: $!"; - $$cur_bytes += length($rec); - push @{$self->{-async_inflight}}, - $sub, length($rec), $cb, $cb_arg; - } else { - my $ret = [ eval { $self->$sub(@$sub_args) } ]; - if (my $exc = $@) { - $ret = ( bless(\$exc, 'PublicInbox::IPC::Die') ); - } - eval { $cb->($cb_arg, $ret) }; - warn "E: $sub callback error: $@\n" if $@; - } -} - # needed when there's multiple IPC workers and the parent forking # causes newer siblings to inherit older siblings sockets sub ipc_sibling_atfork_child { diff --git a/t/ipc.t b/t/ipc.t index 5801c760..face5726 100644 --- a/t/ipc.t +++ b/t/ipc.t @@ -37,7 +37,6 @@ my $ipc = bless {}, 'PublicInbox::IPC'; my @t = qw(array scalar scalarref undef); my $test = sub { my $x = shift; - my @res; for my $type (@t) { my $m = "test_$type"; my @ret = $ipc->ipc_do($m); @@ -46,34 +45,10 @@ my $test = sub { $ipc->ipc_do($m); - $ipc->ipc_async($m, [], sub { push @res, \@_ }, \$m); - my $ret = $ipc->ipc_do($m); my $exp = $ipc->$m; is_deeply($ret, $exp, "!wantarray $m $x"); - - is_deeply(\@res, [ [ \$m, \@exp ] ], "async $m $x"); - @res = (); } - $ipc->ipc_async_wait(-1); - is_deeply(\@res, [], 'no leftover results'); - $ipc->ipc_async('test_die', ['die test'], - sub { push @res, \@_ }, 'die arg'); - $ipc->ipc_async_wait(1); - is(scalar(@res), 1, 'only one result'); - is(scalar(@{$res[0]}), 2, 'result has 2-element array'); - is($res[0]->[0], 'die arg', 'got async die arg '.$x); - is(ref($res[0]->[1]), 'PublicInbox::IPC::Die', - "exception type $x"); - { - my $nr = PublicInbox::IPC::PIPE_BUF(); - my $count = 0; - my $cb = sub { ++$count }; - $ipc->ipc_async('test_undef', [], $cb) for (1..$nr); - $ipc->ipc_async_wait(-1); - is($count, $nr, "$x async runs w/o deadlock"); - } - my $ret = eval { $ipc->test_die('phail') }; my $exp = $@; $ret = eval { $ipc->ipc_do('test_die', 'phail') };