about summary refs log tree commit homepage
diff options
context:
space:
mode:
-rw-r--r--MANIFEST1
-rw-r--r--lib/PublicInbox/IPC.pm17
-rw-r--r--lib/PublicInbox/LeiToMail.pm26
-rw-r--r--lib/PublicInbox/WQWorker.pm34
4 files changed, 57 insertions, 21 deletions
diff --git a/MANIFEST b/MANIFEST
index 56fde540..d6902076 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -238,6 +238,7 @@ lib/PublicInbox/V2Writable.pm
 lib/PublicInbox/View.pm
 lib/PublicInbox/ViewDiff.pm
 lib/PublicInbox/ViewVCS.pm
+lib/PublicInbox/WQWorker.pm
 lib/PublicInbox/WWW.pm
 lib/PublicInbox/WWW.pod
 lib/PublicInbox/Watch.pm
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index d2ff038d..479c4377 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -14,6 +14,7 @@ use Carp qw(confess croak);
 use PublicInbox::DS qw(dwaitpid);
 use PublicInbox::Spawn;
 use PublicInbox::OnDestroy;
+use PublicInbox::WQWorker;
 use Socket qw(AF_UNIX MSG_EOR SOCK_STREAM);
 use Errno qw(EMSGSIZE);
 my $SEQPACKET = eval { Socket::SOCK_SEQPACKET() }; # portable enough?
@@ -151,6 +152,8 @@ sub wq_wait_old {
 # for base class, override in sub classes
 sub ipc_atfork_prepare {}
 
+sub wq_atexit_child {}
+
 sub ipc_atfork_child {
         my ($self) = @_;
         my $io = delete($self->{-ipc_atfork_child_close}) or return;
@@ -251,10 +254,11 @@ sub ipc_sibling_atfork_child {
         $pid == $$ and die "BUG: $$ ipc_atfork_child called on itself";
 }
 
-sub _recv_and_run {
+sub recv_and_run {
         my ($self, $s2, $len, $full_stream) = @_;
         my @fds = $recv_cmd->($s2, my $buf, $len);
-        my $n = length($buf // '') or return;
+        return if scalar(@fds) && !defined($fds[0]);
+        my $n = length($buf) or return 0;
         my $nfd = 0;
         for my $fd (@fds) {
                 if (open(my $cmdfh, '+<&=', $fd)) {
@@ -281,14 +285,15 @@ sub _recv_and_run {
 
 sub wq_worker_loop ($) {
         my ($self) = @_;
-        my $len = $self->{wq_req_len} // (4096 * 33);
-        my $s2 = $self->{-wq_s2} // die 'BUG: no -wq_s2';
-        1 while (_recv_and_run($self, $s2, $len));
+        my $wqw = PublicInbox::WQWorker->new($self);
+        PublicInbox::DS->SetPostLoopCallback(sub { $wqw->{sock} });
+        PublicInbox::DS->EventLoop;
+        PublicInbox::DS->Reset;
 }
 
 sub do_sock_stream { # via wq_do, for big requests
         my ($self, $len) = @_;
-        _recv_and_run($self, delete $self->{0}, $len, 1);
+        recv_and_run($self, delete $self->{0}, $len, 1);
 }
 
 sub wq_do { # always async
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index 244bfb67..1f6c2a3b 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -12,11 +12,16 @@ use PublicInbox::ProcessPipe;
 use PublicInbox::Spawn qw(which spawn popen_rd);
 use PublicInbox::LeiDedupe;
 use PublicInbox::OnDestroy;
+use PublicInbox::Git;
+use PublicInbox::GitAsyncCat;
 use Symbol qw(gensym);
 use IO::Handle; # ->autoflush
 use Fcntl qw(SEEK_SET SEEK_END O_CREAT O_EXCL O_WRONLY);
 use Errno qw(EEXIST ESPIPE ENOENT);
-use PublicInbox::Git;
+
+# struggles with short-lived repos, Gcf2Client makes little sense with lei;
+# but we may use in-process libgit2 in the future.
+$PublicInbox::GitAsyncCat::GCF2C = 0;
 
 my %kw2char = ( # Maildir characters
         draft => 'D',
@@ -467,27 +472,18 @@ sub write_mail { # via ->wq_do
                 $self->write_cb($lei);
         };
         my $git = $self->{"$$\0$git_dir"} //= PublicInbox::Git->new($git_dir);
-        $git->cat_async($smsg->{blob}, \&git_to_mail, [$wcb, $smsg, $not_done]);
+        git_async_cat($git, $smsg->{blob}, \&git_to_mail,
+                                [$wcb, $smsg, $not_done]);
 }
 
-# We rely on OnDestroy to run this before ->DESTROY, since ->DESTROY
-# ordering is unstable at worker exit and may cause segfaults
-sub reap_gits {
+sub wq_atexit_child {
         my ($self) = @_;
         delete $self->{wcb};
         for my $git (delete @$self{grep(/\A$$\0/, keys %$self)}) {
                 $git->async_wait_all;
         }
-}
-
-sub DESTROY { delete $_[0]->{wcb} }
-
-sub ipc_atfork_child { # runs after IPC::wq_worker_loop
-        my ($self) = @_;
-        $self->SUPER::ipc_atfork_child;
-        # reap_gits needs to run before $self->DESTROY,
-        # IPC.pm will ensure that.
-        PublicInbox::OnDestroy->new($$, \&reap_gits, $self);
+        $SIG{__WARN__} = 'DEFAULT';
+        $SIG{PIPE} = 'DEFAULT';
 }
 
 1;
diff --git a/lib/PublicInbox/WQWorker.pm b/lib/PublicInbox/WQWorker.pm
new file mode 100644
index 00000000..25a5e4fb
--- /dev/null
+++ b/lib/PublicInbox/WQWorker.pm
@@ -0,0 +1,34 @@
+# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# for PublicInbox::IPC wq_* (work queue) workers
+package PublicInbox::WQWorker;
+use strict;
+use v5.10.1;
+use parent qw(PublicInbox::DS);
+use PublicInbox::Syscall qw(EPOLLIN EPOLLEXCLUSIVE EPOLLET);
+use Errno qw(EAGAIN ECONNRESET);
+use IO::Handle (); # blocking
+
+sub new {
+        my (undef, $wq) = @_;
+        my $s2 = $wq->{-wq_s2} // die 'BUG: no -wq_s2';
+        $s2->blocking(0);
+        my $self = bless { sock => $s2, wq => $wq }, __PACKAGE__;
+        $self->SUPER::new($s2, EPOLLEXCLUSIVE|EPOLLIN|EPOLLET);
+        $self;
+}
+
+sub event_step {
+        my ($self) = @_;
+        my $n;
+        do {
+                $n = $self->{wq}->recv_and_run($self->{sock}, 4096 * 33);
+        } while ($n);
+        return if !defined($n) && $! == EAGAIN; # likely
+        warn "wq worker error: $!\n" if !defined($n) && $! != ECONNRESET;
+        $self->{wq}->wq_atexit_child;
+        $self->close; # PublicInbox::DS::close
+}
+
+1;