From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 237F31FFA7 for ; Mon, 1 Feb 2021 08:28:34 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 02/21] ipc: switch wq to use the event loop Date: Sun, 31 Jan 2021 22:28:14 -1000 Message-Id: <20210201082833.3293-3-e@80x24.org> In-Reply-To: <20210201082833.3293-1-e@80x24.org> References: <20210201082833.3293-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: This will let us maximize the capability of our asynchronous git API. It also lets us avoid relying on EOF to notify lei2mail workers, thus giving us the option of running fewer lei_xsearch worker processes in parallel than there are local sources. I tried using a synchronous git API, and even running libgit2 in the same process (to avoid the IPC cost) failed to match the throughput afforded by this change. This is because libgit2 is built (at least on Debian) with the SHA-1 collision detection code enabled, and ubc_check was dominating my profiles. 
--- MANIFEST | 1 + lib/PublicInbox/IPC.pm | 17 +++++++++++------ lib/PublicInbox/LeiToMail.pm | 26 +++++++++++--------------- lib/PublicInbox/WQWorker.pm | 34 ++++++++++++++++++++++++++++++++++ 4 files changed, 57 insertions(+), 21 deletions(-) create mode 100644 lib/PublicInbox/WQWorker.pm diff --git a/MANIFEST b/MANIFEST index 2077ab12..c10775e4 100644 --- a/MANIFEST +++ b/MANIFEST @@ -228,6 +228,7 @@ lib/PublicInbox/V2Writable.pm lib/PublicInbox/View.pm lib/PublicInbox/ViewDiff.pm lib/PublicInbox/ViewVCS.pm +lib/PublicInbox/WQWorker.pm lib/PublicInbox/WWW.pm lib/PublicInbox/WWW.pod lib/PublicInbox/Watch.pm diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm index d2ff038d..479c4377 100644 --- a/lib/PublicInbox/IPC.pm +++ b/lib/PublicInbox/IPC.pm @@ -14,6 +14,7 @@ use Carp qw(confess croak); use PublicInbox::DS qw(dwaitpid); use PublicInbox::Spawn; use PublicInbox::OnDestroy; +use PublicInbox::WQWorker; use Socket qw(AF_UNIX MSG_EOR SOCK_STREAM); use Errno qw(EMSGSIZE); my $SEQPACKET = eval { Socket::SOCK_SEQPACKET() }; # portable enough? 
@@ -151,6 +152,8 @@ sub wq_wait_old { # for base class, override in sub classes sub ipc_atfork_prepare {} +sub wq_atexit_child {} + sub ipc_atfork_child { my ($self) = @_; my $io = delete($self->{-ipc_atfork_child_close}) or return; @@ -251,10 +254,11 @@ sub ipc_sibling_atfork_child { $pid == $$ and die "BUG: $$ ipc_atfork_child called on itself"; } -sub _recv_and_run { +sub recv_and_run { my ($self, $s2, $len, $full_stream) = @_; my @fds = $recv_cmd->($s2, my $buf, $len); - my $n = length($buf // '') or return; + return if scalar(@fds) && !defined($fds[0]); + my $n = length($buf) or return 0; my $nfd = 0; for my $fd (@fds) { if (open(my $cmdfh, '+<&=', $fd)) { @@ -281,14 +285,15 @@ sub _recv_and_run { sub wq_worker_loop ($) { my ($self) = @_; - my $len = $self->{wq_req_len} // (4096 * 33); - my $s2 = $self->{-wq_s2} // die 'BUG: no -wq_s2'; - 1 while (_recv_and_run($self, $s2, $len)); + my $wqw = PublicInbox::WQWorker->new($self); + PublicInbox::DS->SetPostLoopCallback(sub { $wqw->{sock} }); + PublicInbox::DS->EventLoop; + PublicInbox::DS->Reset; } sub do_sock_stream { # via wq_do, for big requests my ($self, $len) = @_; - _recv_and_run($self, delete $self->{0}, $len, 1); + recv_and_run($self, delete $self->{0}, $len, 1); } sub wq_do { # always async diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm index 244bfb67..1f6c2a3b 100644 --- a/lib/PublicInbox/LeiToMail.pm +++ b/lib/PublicInbox/LeiToMail.pm @@ -12,11 +12,16 @@ use PublicInbox::ProcessPipe; use PublicInbox::Spawn qw(which spawn popen_rd); use PublicInbox::LeiDedupe; use PublicInbox::OnDestroy; +use PublicInbox::Git; +use PublicInbox::GitAsyncCat; use Symbol qw(gensym); use IO::Handle; # ->autoflush use Fcntl qw(SEEK_SET SEEK_END O_CREAT O_EXCL O_WRONLY); use Errno qw(EEXIST ESPIPE ENOENT); -use PublicInbox::Git; + +# struggles with short-lived repos, Gcf2Client makes little sense with lei; +# but we may use in-process libgit2 in the future. 
+$PublicInbox::GitAsyncCat::GCF2C = 0; my %kw2char = ( # Maildir characters draft => 'D', @@ -467,27 +472,18 @@ sub write_mail { # via ->wq_do $self->write_cb($lei); }; my $git = $self->{"$$\0$git_dir"} //= PublicInbox::Git->new($git_dir); - $git->cat_async($smsg->{blob}, \&git_to_mail, [$wcb, $smsg, $not_done]); + git_async_cat($git, $smsg->{blob}, \&git_to_mail, + [$wcb, $smsg, $not_done]); } -# We rely on OnDestroy to run this before ->DESTROY, since ->DESTROY -# ordering is unstable at worker exit and may cause segfaults -sub reap_gits { +sub wq_atexit_child { my ($self) = @_; delete $self->{wcb}; for my $git (delete @$self{grep(/\A$$\0/, keys %$self)}) { $git->async_wait_all; } -} - -sub DESTROY { delete $_[0]->{wcb} } - -sub ipc_atfork_child { # runs after IPC::wq_worker_loop - my ($self) = @_; - $self->SUPER::ipc_atfork_child; - # reap_gits needs to run before $self->DESTROY, - # IPC.pm will ensure that. - PublicInbox::OnDestroy->new($$, \&reap_gits, $self); + $SIG{__WARN__} = 'DEFAULT'; + $SIG{PIPE} = 'DEFAULT'; } 1; diff --git a/lib/PublicInbox/WQWorker.pm b/lib/PublicInbox/WQWorker.pm new file mode 100644 index 00000000..25a5e4fb --- /dev/null +++ b/lib/PublicInbox/WQWorker.pm @@ -0,0 +1,34 @@ +# Copyright (C) 2021 all contributors +# License: AGPL-3.0+ + +# for PublicInbox::IPC wq_* (work queue) workers +package PublicInbox::WQWorker; +use strict; +use v5.10.1; +use parent qw(PublicInbox::DS); +use PublicInbox::Syscall qw(EPOLLIN EPOLLEXCLUSIVE EPOLLET); +use Errno qw(EAGAIN ECONNRESET); +use IO::Handle (); # blocking + +sub new { + my (undef, $wq) = @_; + my $s2 = $wq->{-wq_s2} // die 'BUG: no -wq_s2'; + $s2->blocking(0); + my $self = bless { sock => $s2, wq => $wq }, __PACKAGE__; + $self->SUPER::new($s2, EPOLLEXCLUSIVE|EPOLLIN|EPOLLET); + $self; +} + +sub event_step { + my ($self) = @_; + my $n; + do { + $n = $self->{wq}->recv_and_run($self->{sock}, 4096 * 33); + } while ($n); + return if !defined($n) && $! 
== EAGAIN; # likely + warn "wq worker error: $!\n" if !defined($n) && $! != ECONNRESET; + $self->{wq}->wq_atexit_child; + $self->close; # PublicInbox::DS::close +} + +1;