about summary refs log tree commit homepage
path: root/lib
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2021-09-19 12:50:20 +0000
committerEric Wong <e@80x24.org>2021-09-19 19:52:43 +0000
commit6082492b14ee2a24b0131ce0a99b26ee316a4d88 (patch)
tree8a69104e1f7732336536720df232ea9fe08e028a /lib
parented8cffcbe04d29a047fb3eb655f50b40e2eb5462 (diff)
downloadpublic-inbox-6082492b14ee2a24b0131ce0a99b26ee316a4d88.tar.gz
This brings the wq_* SOCK_SEQPACKET API functionality
on par with the ipc_do (pipe-based) API.
Diffstat (limited to 'lib')
-rw-r--r--lib/PublicInbox/IPC.pm36
1 files changed, 32 insertions, 4 deletions
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 9efe551b..d5e37719 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -182,6 +182,13 @@ sub ipc_lock_init {
         $self->{-ipc_lock} //= bless { lock_path => $f }, 'PublicInbox::Lock'
 }
 
+sub _wait_return ($$) {
+        my ($r_res, $sub) = @_;
+        my $ret = _get_rec($r_res) // die "no response on $sub";
+        die $$ret if ref($ret) eq 'PublicInbox::IPC::Die';
+        wantarray ? @$ret : $$ret;
+}
+
 # call $self->$sub(@args), on a worker if ipc_worker_spawn was used
 sub ipc_do {
         my ($self, $sub, @args) = @_;
@@ -191,9 +198,7 @@ sub ipc_do {
                 if (defined(wantarray)) {
                         my $r_res = $self->{-ipc_res} or die 'no ipc_res';
                         _send_rec($w_req, [ wantarray, $sub, @args ]);
-                        my $ret = _get_rec($r_res) // die "no response on $sub";
-                        die $$ret if ref($ret) eq 'PublicInbox::IPC::Die';
-                        wantarray ? @$ret : $$ret;
+                        _wait_return($r_res, $sub);
                 } else { # likely, fire-and-forget into pipe
                         _send_rec($w_req, [ undef , $sub, @args ]);
                 }
@@ -298,7 +303,7 @@ sub wq_io_do { # always async
                         $!{ETOOMANYREFS} and
                                 croak "sendmsg: $! (check RLIMIT_NOFILE)";
                         $!{EMSGSIZE} ? stream_in_full($s1, $fds, $buf) :
-                        croak("sendmsg: $!");
+                                croak("sendmsg: $!");
                 }
         } else {
                 @$self{0..$#$ios} = @$ios;
@@ -308,6 +313,29 @@ sub wq_io_do { # always async
         }
 }
 
+sub wq_sync_run {
+        my ($self, $wantarray, $sub, @args) = @_;
+        if ($wantarray) {
+                my @ret = eval { $self->$sub(@args) };
+                ipc_return($self->{0}, \@ret, $@);
+        } else { # '' => wantscalar
+                my $ret = eval { $self->$sub(@args) };
+                ipc_return($self->{0}, \$ret, $@);
+        }
+}
+
+sub wq_do {
+        my ($self, $sub, @args) = @_;
+        if (defined(wantarray)) {
+                pipe(my ($r, $w)) or die "pipe: $!";
+                wq_io_do($self, 'wq_sync_run', [ $w ], wantarray, $sub, @args);
+                undef $w;
+                _wait_return($r, $sub);
+        } else {
+                wq_io_do($self, $sub, [], @args);
+        }
+}
+
 sub _wq_worker_start ($$$) {
         my ($self, $oldset, $fields) = @_;
         my ($bcast1, $bcast2);