From a7e6a8cd68fb6d700337d8dbc7ee2c65ff3d2fc1 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Sun, 10 Jan 2021 12:15:01 +0000 Subject: ipc: add support for asynchronous callbacks Similar to git->cat_async, this will let us deal with responses asynchronously, as well as being able to mix synchronous and asynchronous code transparently (though perhaps not optimally). --- lib/PublicInbox/IPC.pm | 53 +++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 50 insertions(+), 3 deletions(-) (limited to 'lib/PublicInbox/IPC.pm') diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm index 81623fc0..7dc8ec6a 100644 --- a/lib/PublicInbox/IPC.pm +++ b/lib/PublicInbox/IPC.pm @@ -8,6 +8,8 @@ use strict; use v5.10.1; use Carp qw(confess croak); use PublicInbox::Sigfd; +use POSIX (); +use constant PIPE_BUF => $^O eq 'linux' ? 4096 : POSIX::_POSIX_PIPE_BUF(); my ($enc, $dec); # ->imports at BEGIN turns sereal_*_with_object into custom ops on 5.14+ # and eliminate method call overhead @@ -41,10 +43,15 @@ sub _get_rec ($) { thaw($buf); } +sub _pack_rec ($) { + my ($ref) = @_; + my $buf = freeze($ref); + length($buf) . "\n" . $buf; +} + sub _send_rec ($$) { my ($w, $ref) = @_; - my $buf = freeze($ref); - print $w length($buf), "\n", $buf or croak "print: $!"; + print $w _pack_rec($ref) or croak "print: $!"; } sub ipc_return ($$$) { @@ -156,6 +163,21 @@ sub ipc_lock_init { $self->{-ipc_lock} //= bless { lock_path => $f }, 'PublicInbox::Lock' } +sub ipc_async_wait ($$) { + my ($self, $max) = @_; # max == -1 to wait for all + my $aif = $self->{-async_inflight} or return; + my $r_res = $self->{-ipc_res} or die 'BUG: no ipc_res'; + while (my ($sub, $bytes, $cb, $cb_arg) = splice(@$aif, 0, 4)) { + my $ret = _get_rec($r_res) // + die "no response on $sub (req.size=$bytes)"; + $self->{-async_inflight_bytes} -= $bytes; + + eval { $cb->($cb_arg, $ret) }; + warn "E: $sub callback error: $@\n" if $@; + return if --$max == 0; + } +} + # call $self->$sub(@args), on a worker if ipc_worker_spawn was used sub ipc_do { my ($self, $sub, @args) = @_; @@ -163,7 +185,8 @@ sub ipc_do { my $ipc_lock = $self->{-ipc_lock}; my $lock = $ipc_lock ? $ipc_lock->lock_for_scope : undef; if (defined(wantarray)) { - my $r_res = $self->{-ipc_res} or die 'no ipc_res'; + my $r_res = $self->{-ipc_res} or die 'BUG: no ipc_res'; + ipc_async_wait($self, -1); _send_rec($w_req, [ wantarray, $sub, @args ]); my $ret = _get_rec($r_res) // die "no response on $sub"; die $$ret if ref($ret) eq 'PublicInbox::IPC::Die'; @@ -176,6 +199,30 @@ sub ipc_do { } } +sub ipc_async { + my ($self, $sub, $sub_args, $cb, $cb_arg) = @_; + if (my $w_req = $self->{-ipc_req}) { # run in worker + my $rec = _pack_rec([ 1, $sub, @$sub_args ]); + my $cur_bytes = \($self->{-async_inflight_bytes} //= 0); + while (($$cur_bytes + length($rec)) > PIPE_BUF) { + ipc_async_wait($self, 1); + } + my $ipc_lock = $self->{-ipc_lock}; + my $lock = $ipc_lock ? $ipc_lock->lock_for_scope : undef; + print $w_req $rec or croak "print: $!"; + $$cur_bytes += length($rec); + push @{$self->{-async_inflight}}, + $sub, length($rec), $cb, $cb_arg; + } else { + my $ret = [ eval { $self->$sub(@$sub_args) } ]; + if (my $exc = $@) { + $ret = ( bless(\$exc, 'PublicInbox::IPC::Die') ); + } + eval { $cb->($cb_arg, $ret) }; + warn "E: $sub callback error: $@\n" if $@; + } +} + # needed when there's multiple IPC workers and the parent forking # causes newer siblings to inherit older siblings sockets sub ipc_sibling_atfork_child { -- cgit v1.2.3-24-ge0c7