about summary refs log tree commit homepage
path: root/lib/PublicInbox/IPC.pm
diff options
context:
space:
mode:
Diffstat (limited to 'lib/PublicInbox/IPC.pm')
-rw-r--r--lib/PublicInbox/IPC.pm17
1 files changed, 11 insertions, 6 deletions
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index d2ff038d..479c4377 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -14,6 +14,7 @@ use Carp qw(confess croak);
 use PublicInbox::DS qw(dwaitpid);
 use PublicInbox::Spawn;
 use PublicInbox::OnDestroy;
+use PublicInbox::WQWorker;
 use Socket qw(AF_UNIX MSG_EOR SOCK_STREAM);
 use Errno qw(EMSGSIZE);
 my $SEQPACKET = eval { Socket::SOCK_SEQPACKET() }; # portable enough?
@@ -151,6 +152,8 @@ sub wq_wait_old {
 # for base class, override in sub classes
 sub ipc_atfork_prepare {}
 
+sub wq_atexit_child {}
+
 sub ipc_atfork_child {
         my ($self) = @_;
         my $io = delete($self->{-ipc_atfork_child_close}) or return;
@@ -251,10 +254,11 @@ sub ipc_sibling_atfork_child {
         $pid == $$ and die "BUG: $$ ipc_atfork_child called on itself";
 }
 
-sub _recv_and_run {
+sub recv_and_run {
         my ($self, $s2, $len, $full_stream) = @_;
         my @fds = $recv_cmd->($s2, my $buf, $len);
-        my $n = length($buf // '') or return;
+        return if scalar(@fds) && !defined($fds[0]);
+        my $n = length($buf) or return 0;
         my $nfd = 0;
         for my $fd (@fds) {
                 if (open(my $cmdfh, '+<&=', $fd)) {
@@ -281,14 +285,15 @@ sub _recv_and_run {
 
 sub wq_worker_loop ($) {
         my ($self) = @_;
-        my $len = $self->{wq_req_len} // (4096 * 33);
-        my $s2 = $self->{-wq_s2} // die 'BUG: no -wq_s2';
-        1 while (_recv_and_run($self, $s2, $len));
+        my $wqw = PublicInbox::WQWorker->new($self);
+        PublicInbox::DS->SetPostLoopCallback(sub { $wqw->{sock} });
+        PublicInbox::DS->EventLoop;
+        PublicInbox::DS->Reset;
 }
 
 sub do_sock_stream { # via wq_do, for big requests
         my ($self, $len) = @_;
-        _recv_and_run($self, delete $self->{0}, $len, 1);
+        recv_and_run($self, delete $self->{0}, $len, 1);
 }
 
 sub wq_do { # always async