From 6082492b14ee2a24b0131ce0a99b26ee316a4d88 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Sun, 19 Sep 2021 12:50:20 +0000 Subject: ipc: wq_do: support synchronous waits and responses This brings the wq_* SOCK_SEQPACKET API functionality on par with the ipc_do (pipe-based) API. --- lib/PublicInbox/IPC.pm | 36 ++++++++++++++++++++++++++++++++---- 1 file changed, 32 insertions(+), 4 deletions(-) (limited to 'lib') diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm index 9efe551b..d5e37719 100644 --- a/lib/PublicInbox/IPC.pm +++ b/lib/PublicInbox/IPC.pm @@ -182,6 +182,13 @@ sub ipc_lock_init { $self->{-ipc_lock} //= bless { lock_path => $f }, 'PublicInbox::Lock' } +sub _wait_return ($$) { + my ($r_res, $sub) = @_; + my $ret = _get_rec($r_res) // die "no response on $sub"; + die $$ret if ref($ret) eq 'PublicInbox::IPC::Die'; + wantarray ? @$ret : $$ret; +} + # call $self->$sub(@args), on a worker if ipc_worker_spawn was used sub ipc_do { my ($self, $sub, @args) = @_; @@ -191,9 +198,7 @@ sub ipc_do { if (defined(wantarray)) { my $r_res = $self->{-ipc_res} or die 'no ipc_res'; _send_rec($w_req, [ wantarray, $sub, @args ]); - my $ret = _get_rec($r_res) // die "no response on $sub"; - die $$ret if ref($ret) eq 'PublicInbox::IPC::Die'; - wantarray ? @$ret : $$ret; + _wait_return($r_res, $sub); } else { # likely, fire-and-forget into pipe _send_rec($w_req, [ undef , $sub, @args ]); } @@ -298,7 +303,7 @@ sub wq_io_do { # always async $!{ETOOMANYREFS} and croak "sendmsg: $! (check RLIMIT_NOFILE)"; $!{EMSGSIZE} ? stream_in_full($s1, $fds, $buf) : - croak("sendmsg: $!"); + croak("sendmsg: $!"); } } else { @$self{0..$#$ios} = @$ios; @@ -308,6 +313,29 @@ sub wq_io_do { # always async } } +sub wq_sync_run { + my ($self, $wantarray, $sub, @args) = @_; + if ($wantarray) { + my @ret = eval { $self->$sub(@args) }; + ipc_return($self->{0}, \@ret, $@); + } else { # '' => wantscalar + my $ret = eval { $self->$sub(@args) }; + ipc_return($self->{0}, \$ret, $@); + } +} + +sub wq_do { + my ($self, $sub, @args) = @_; + if (defined(wantarray)) { + pipe(my ($r, $w)) or die "pipe: $!"; + wq_io_do($self, 'wq_sync_run', [ $w ], wantarray, $sub, @args); + undef $w; + _wait_return($r, $sub); + } else { + wq_io_do($self, $sub, [], @args); + } +} + sub _wq_worker_start ($$$) { my ($self, $oldset, $fields) = @_; my ($bcast1, $bcast2); -- cgit v1.2.3-24-ge0c7