diff options
-rw-r--r-- | MANIFEST | 1 | ||||
-rw-r--r-- | lib/PublicInbox/IPC.pm | 17 | ||||
-rw-r--r-- | lib/PublicInbox/LeiToMail.pm | 26 | ||||
-rw-r--r-- | lib/PublicInbox/WQWorker.pm | 34 |
4 files changed, 57 insertions, 21 deletions
@@ -238,6 +238,7 @@ lib/PublicInbox/V2Writable.pm lib/PublicInbox/View.pm lib/PublicInbox/ViewDiff.pm lib/PublicInbox/ViewVCS.pm +lib/PublicInbox/WQWorker.pm lib/PublicInbox/WWW.pm lib/PublicInbox/WWW.pod lib/PublicInbox/Watch.pm diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm index d2ff038d..479c4377 100644 --- a/lib/PublicInbox/IPC.pm +++ b/lib/PublicInbox/IPC.pm @@ -14,6 +14,7 @@ use Carp qw(confess croak); use PublicInbox::DS qw(dwaitpid); use PublicInbox::Spawn; use PublicInbox::OnDestroy; +use PublicInbox::WQWorker; use Socket qw(AF_UNIX MSG_EOR SOCK_STREAM); use Errno qw(EMSGSIZE); my $SEQPACKET = eval { Socket::SOCK_SEQPACKET() }; # portable enough? @@ -151,6 +152,8 @@ sub wq_wait_old { # for base class, override in sub classes sub ipc_atfork_prepare {} +sub wq_atexit_child {} + sub ipc_atfork_child { my ($self) = @_; my $io = delete($self->{-ipc_atfork_child_close}) or return; @@ -251,10 +254,11 @@ sub ipc_sibling_atfork_child { $pid == $$ and die "BUG: $$ ipc_atfork_child called on itself"; } -sub _recv_and_run { +sub recv_and_run { my ($self, $s2, $len, $full_stream) = @_; my @fds = $recv_cmd->($s2, my $buf, $len); - my $n = length($buf // '') or return; + return if scalar(@fds) && !defined($fds[0]); + my $n = length($buf) or return 0; my $nfd = 0; for my $fd (@fds) { if (open(my $cmdfh, '+<&=', $fd)) { @@ -281,14 +285,15 @@ sub _recv_and_run { sub wq_worker_loop ($) { my ($self) = @_; - my $len = $self->{wq_req_len} // (4096 * 33); - my $s2 = $self->{-wq_s2} // die 'BUG: no -wq_s2'; - 1 while (_recv_and_run($self, $s2, $len)); + my $wqw = PublicInbox::WQWorker->new($self); + PublicInbox::DS->SetPostLoopCallback(sub { $wqw->{sock} }); + PublicInbox::DS->EventLoop; + PublicInbox::DS->Reset; } sub do_sock_stream { # via wq_do, for big requests my ($self, $len) = @_; - _recv_and_run($self, delete $self->{0}, $len, 1); + recv_and_run($self, delete $self->{0}, $len, 1); } sub wq_do { # always async diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm index 244bfb67..1f6c2a3b 100644 --- a/lib/PublicInbox/LeiToMail.pm +++ b/lib/PublicInbox/LeiToMail.pm @@ -12,11 +12,16 @@ use PublicInbox::ProcessPipe; use PublicInbox::Spawn qw(which spawn popen_rd); use PublicInbox::LeiDedupe; use PublicInbox::OnDestroy; +use PublicInbox::Git; +use PublicInbox::GitAsyncCat; use Symbol qw(gensym); use IO::Handle; # ->autoflush use Fcntl qw(SEEK_SET SEEK_END O_CREAT O_EXCL O_WRONLY); use Errno qw(EEXIST ESPIPE ENOENT); -use PublicInbox::Git; + +# struggles with short-lived repos, Gcf2Client makes little sense with lei; +# but we may use in-process libgit2 in the future. +$PublicInbox::GitAsyncCat::GCF2C = 0; my %kw2char = ( # Maildir characters draft => 'D', @@ -467,27 +472,18 @@ sub write_mail { # via ->wq_do $self->write_cb($lei); }; my $git = $self->{"$$\0$git_dir"} //= PublicInbox::Git->new($git_dir); - $git->cat_async($smsg->{blob}, \&git_to_mail, [$wcb, $smsg, $not_done]); + git_async_cat($git, $smsg->{blob}, \&git_to_mail, + [$wcb, $smsg, $not_done]); } -# We rely on OnDestroy to run this before ->DESTROY, since ->DESTROY -# ordering is unstable at worker exit and may cause segfaults -sub reap_gits { +sub wq_atexit_child { my ($self) = @_; delete $self->{wcb}; for my $git (delete @$self{grep(/\A$$\0/, keys %$self)}) { $git->async_wait_all; } -} - -sub DESTROY { delete $_[0]->{wcb} } - -sub ipc_atfork_child { # runs after IPC::wq_worker_loop - my ($self) = @_; - $self->SUPER::ipc_atfork_child; - # reap_gits needs to run before $self->DESTROY, - # IPC.pm will ensure that. - PublicInbox::OnDestroy->new($$, \&reap_gits, $self); + $SIG{__WARN__} = 'DEFAULT'; + $SIG{PIPE} = 'DEFAULT'; } 1; diff --git a/lib/PublicInbox/WQWorker.pm b/lib/PublicInbox/WQWorker.pm new file mode 100644 index 00000000..25a5e4fb --- /dev/null +++ b/lib/PublicInbox/WQWorker.pm @@ -0,0 +1,34 @@ +# Copyright (C) 2021 all contributors <meta@public-inbox.org> +# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> + +# for PublicInbox::IPC wq_* (work queue) workers +package PublicInbox::WQWorker; +use strict; +use v5.10.1; +use parent qw(PublicInbox::DS); +use PublicInbox::Syscall qw(EPOLLIN EPOLLEXCLUSIVE EPOLLET); +use Errno qw(EAGAIN ECONNRESET); +use IO::Handle (); # blocking + +sub new { + my (undef, $wq) = @_; + my $s2 = $wq->{-wq_s2} // die 'BUG: no -wq_s2'; + $s2->blocking(0); + my $self = bless { sock => $s2, wq => $wq }, __PACKAGE__; + $self->SUPER::new($s2, EPOLLEXCLUSIVE|EPOLLIN|EPOLLET); + $self; +} + +sub event_step { + my ($self) = @_; + my $n; + do { + $n = $self->{wq}->recv_and_run($self->{sock}, 4096 * 33); + } while ($n); + return if !defined($n) && $! == EAGAIN; # likely + warn "wq worker error: $!\n" if !defined($n) && $! != ECONNRESET; + $self->{wq}->wq_atexit_child; + $self->close; # PublicInbox::DS::close +} + +1; |