From 2d610f0e645fd2b01bf9108b2d06022ab730815c Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Sun, 7 Feb 2021 08:51:52 +0000 Subject: Revert "ipc: add support for asynchronous callbacks" This reverts commit a7e6a8cd68fb6d700337d8dbc7ee2c65ff3d2fc1. It turns out to be unworkable in the face of multiple producer processes, since the lock we make has no effect when calculating pipe capacity. --- lib/PublicInbox/IPC.pm | 52 +++----------------------------------------------- 1 file changed, 3 insertions(+), 49 deletions(-) (limited to 'lib') diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm index 3713b56b..7e5a0b16 100644 --- a/lib/PublicInbox/IPC.pm +++ b/lib/PublicInbox/IPC.pm @@ -18,7 +18,6 @@ use PublicInbox::OnDestroy; use PublicInbox::WQWorker; use Socket qw(AF_UNIX MSG_EOR SOCK_STREAM); my $SEQPACKET = eval { Socket::SOCK_SEQPACKET() }; # portable enough? -use constant PIPE_BUF => $^O eq 'linux' ? 4096 : POSIX::_POSIX_PIPE_BUF(); our @EXPORT_OK = qw(ipc_freeze ipc_thaw); my $WQ_MAX_WORKERS = 4096; my ($enc, $dec); @@ -59,15 +58,10 @@ sub _get_rec ($) { ipc_thaw($buf); } -sub _pack_rec ($) { - my ($ref) = @_; - my $buf = ipc_freeze($ref); - length($buf) . "\n" . $buf; -} - sub _send_rec ($$) { my ($w, $ref) = @_; - print $w _pack_rec($ref) or croak "print: $!"; + my $buf = ipc_freeze($ref); + print $w length($buf), "\n", $buf or croak "print: $!"; } sub ipc_return ($$$) { @@ -188,21 +182,6 @@ sub ipc_lock_init { $self->{-ipc_lock} //= bless { lock_path => $f }, 'PublicInbox::Lock' } -sub ipc_async_wait ($$) { - my ($self, $max) = @_; # max == -1 to wait for all - my $aif = $self->{-async_inflight} or return; - my $r_res = $self->{-ipc_res} or die 'BUG: no ipc_res'; - while (my ($sub, $bytes, $cb, $cb_arg) = splice(@$aif, 0, 4)) { - my $ret = _get_rec($r_res) // - die "no response on $sub (req.size=$bytes)"; - $self->{-async_inflight_bytes} -= $bytes; - - eval { $cb->($cb_arg, $ret) }; - warn "E: $sub callback error: $@\n" if $@; - return if --$max == 0; - } -} - # call $self->$sub(@args), on a worker if ipc_worker_spawn was used sub ipc_do { my ($self, $sub, @args) = @_; @@ -210,8 +189,7 @@ sub ipc_do { my $ipc_lock = $self->{-ipc_lock}; my $lock = $ipc_lock ? $ipc_lock->lock_for_scope : undef; if (defined(wantarray)) { - my $r_res = $self->{-ipc_res} or die 'BUG: no ipc_res'; - ipc_async_wait($self, -1); + my $r_res = $self->{-ipc_res} or die 'no ipc_res'; _send_rec($w_req, [ wantarray, $sub, @args ]); my $ret = _get_rec($r_res) // die "no response on $sub"; die $$ret if ref($ret) eq 'PublicInbox::IPC::Die'; @@ -224,30 +202,6 @@ sub ipc_do { } } -sub ipc_async { - my ($self, $sub, $sub_args, $cb, $cb_arg) = @_; - if (my $w_req = $self->{-ipc_req}) { # run in worker - my $rec = _pack_rec([ 1, $sub, @$sub_args ]); - my $cur_bytes = \($self->{-async_inflight_bytes} //= 0); - while (($$cur_bytes + length($rec)) > PIPE_BUF) { - ipc_async_wait($self, 1); - } - my $ipc_lock = $self->{-ipc_lock}; - my $lock = $ipc_lock ? $ipc_lock->lock_for_scope : undef; - print $w_req $rec or croak "print: $!"; - $$cur_bytes += length($rec); - push @{$self->{-async_inflight}}, - $sub, length($rec), $cb, $cb_arg; - } else { - my $ret = [ eval { $self->$sub(@$sub_args) } ]; - if (my $exc = $@) { - $ret = ( bless(\$exc, 'PublicInbox::IPC::Die') ); - } - eval { $cb->($cb_arg, $ret) }; - warn "E: $sub callback error: $@\n" if $@; - } -} - # needed when there's multiple IPC workers and the parent forking # causes newer siblings to inherit older siblings sockets sub ipc_sibling_atfork_child { -- cgit v1.2.3-24-ge0c7