diff options
Diffstat (limited to 'lib/PublicInbox/IPC.pm')
-rw-r--r-- | lib/PublicInbox/IPC.pm | 17 |
1 files changed, 11 insertions, 6 deletions
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm index d2ff038d..479c4377 100644 --- a/lib/PublicInbox/IPC.pm +++ b/lib/PublicInbox/IPC.pm @@ -14,6 +14,7 @@ use Carp qw(confess croak); use PublicInbox::DS qw(dwaitpid); use PublicInbox::Spawn; use PublicInbox::OnDestroy; +use PublicInbox::WQWorker; use Socket qw(AF_UNIX MSG_EOR SOCK_STREAM); use Errno qw(EMSGSIZE); my $SEQPACKET = eval { Socket::SOCK_SEQPACKET() }; # portable enough? @@ -151,6 +152,8 @@ sub wq_wait_old { # for base class, override in sub classes sub ipc_atfork_prepare {} +sub wq_atexit_child {} + sub ipc_atfork_child { my ($self) = @_; my $io = delete($self->{-ipc_atfork_child_close}) or return; @@ -251,10 +254,11 @@ sub ipc_sibling_atfork_child { $pid == $$ and die "BUG: $$ ipc_atfork_child called on itself"; } -sub _recv_and_run { +sub recv_and_run { my ($self, $s2, $len, $full_stream) = @_; my @fds = $recv_cmd->($s2, my $buf, $len); - my $n = length($buf // '') or return; + return if scalar(@fds) && !defined($fds[0]); + my $n = length($buf) or return 0; my $nfd = 0; for my $fd (@fds) { if (open(my $cmdfh, '+<&=', $fd)) { @@ -281,14 +285,15 @@ sub _recv_and_run { sub wq_worker_loop ($) { my ($self) = @_; - my $len = $self->{wq_req_len} // (4096 * 33); - my $s2 = $self->{-wq_s2} // die 'BUG: no -wq_s2'; - 1 while (_recv_and_run($self, $s2, $len)); + my $wqw = PublicInbox::WQWorker->new($self); + PublicInbox::DS->SetPostLoopCallback(sub { $wqw->{sock} }); + PublicInbox::DS->EventLoop; + PublicInbox::DS->Reset; } sub do_sock_stream { # via wq_do, for big requests my ($self, $len) = @_; - _recv_and_run($self, delete $self->{0}, $len, 1); + recv_and_run($self, delete $self->{0}, $len, 1); } sub wq_do { # always async |