From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id E2A721F9F4 for ; Sun, 19 Sep 2021 12:50:35 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 03/16] lei/store: use SOCK_SEQPACKET rather than pipe Date: Sun, 19 Sep 2021 12:50:22 +0000 Message-Id: <20210919125035.6331-4-e@80x24.org> In-Reply-To: <20210919125035.6331-1-e@80x24.org> References: <20210919125035.6331-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: This has several advantages: * no need to use ipc.lock to protect a pipe for non-atomic writes * ability to pass FDs. In another commit, this will let us simplify lei->sto_done_request and pass newly-created sockets to lei/store directly. Disadvantages: - an extra pipe is required for rare messages over several hundred KB; this is probably a non-issue, though. The performance delta is unknown, but I expect shards (which remain pipes) to be the primary bottleneck IPC-wise for lei/store. 
--- lib/PublicInbox/LEI.pm | 4 ++-- lib/PublicInbox/LeiImport.pm | 2 +- lib/PublicInbox/LeiImportKw.pm | 2 +- lib/PublicInbox/LeiIndex.pm | 2 +- lib/PublicInbox/LeiInput.pm | 2 +- lib/PublicInbox/LeiNoteEvent.pm | 8 ++++---- lib/PublicInbox/LeiRemote.pm | 4 ++-- lib/PublicInbox/LeiRm.pm | 2 +- lib/PublicInbox/LeiStore.pm | 10 ++++++++-- lib/PublicInbox/LeiTag.pm | 2 +- lib/PublicInbox/LeiToMail.pm | 22 ++++++++++++---------- lib/PublicInbox/LeiXSearch.pm | 6 +++--- 12 files changed, 37 insertions(+), 29 deletions(-) diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index 8b0614f2..549b855b 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -1501,9 +1501,9 @@ sub sto_done_request { # only call this from lei-daemon process (not workers) eval { if ($sock //= $lei->{sock}) { # issue, async wait $LIVE_SOCK{"$sock"} = $sock; - $lei->{sto}->ipc_do('done', "$sock"); + $lei->{sto}->wq_do('done', "$sock"); } else { # forcibly wait - my $wait = $lei->{sto}->ipc_do('done'); + my $wait = $lei->{sto}->wq_do('done'); } }; $lei->err($@) if $@; diff --git a/lib/PublicInbox/LeiImport.pm b/lib/PublicInbox/LeiImport.pm index 9084d771..40530914 100644 --- a/lib/PublicInbox/LeiImport.pm +++ b/lib/PublicInbox/LeiImport.pm @@ -16,7 +16,7 @@ sub input_eml_cb { # used by PublicInbox::LeiInput::input_fh if (my $all_vmd = $self->{all_vmd}) { @$vmd{keys %$all_vmd} = values %$all_vmd; } - $self->{lei}->{sto}->ipc_do('set_eml', $eml, $vmd, $xoids); + $self->{lei}->{sto}->wq_do('set_eml', $eml, $vmd, $xoids); } sub input_mbox_cb { # MboxReader callback diff --git a/lib/PublicInbox/LeiImportKw.pm b/lib/PublicInbox/LeiImportKw.pm index 402125cf..2863d17f 100644 --- a/lib/PublicInbox/LeiImportKw.pm +++ b/lib/PublicInbox/LeiImportKw.pm @@ -37,7 +37,7 @@ sub ck_update_kw { # via wq_io_do $self->{lse}->kw_changed(undef, $kw, \@docids) or return; $self->{verbose} and $self->{lei}->qerr('# '.unpack('H*', $oidbin)." 
=> @$kw\n"); - $self->{sto}->ipc_do('set_eml_vmd', undef, { kw => $kw }, \@docids); + $self->{sto}->wq_do('set_eml_vmd', undef, { kw => $kw }, \@docids); } sub ikw_done_wait { diff --git a/lib/PublicInbox/LeiIndex.pm b/lib/PublicInbox/LeiIndex.pm index 1b327a2c..b3f3e1a0 100644 --- a/lib/PublicInbox/LeiIndex.pm +++ b/lib/PublicInbox/LeiIndex.pm @@ -16,7 +16,7 @@ sub input_eml_cb { # used by input_maildir_cb and input_net_cb if (my $all_vmd = $self->{all_vmd}) { @$vmd{keys %$all_vmd} = values %$all_vmd; } - $self->{lei}->{sto}->ipc_do('index_eml_only', $eml, $vmd, $xoids); + $self->{lei}->{sto}->wq_do('index_eml_only', $eml, $vmd, $xoids); } sub input_fh { # overrides PublicInbox::LeiInput::input_fh diff --git a/lib/PublicInbox/LeiInput.pm b/lib/PublicInbox/LeiInput.pm index fe736981..22bedba6 100644 --- a/lib/PublicInbox/LeiInput.pm +++ b/lib/PublicInbox/LeiInput.pm @@ -378,7 +378,7 @@ sub process_inputs { } # always commit first, even on error partial work is acceptable for # lei - my $wait = $self->{lei}->{sto}->ipc_do('done') if $self->{lei}->{sto}; + my $wait = $self->{lei}->{sto}->wq_do('done') if $self->{lei}->{sto}; $self->{lei}->fail($err) if $err; } diff --git a/lib/PublicInbox/LeiNoteEvent.pm b/lib/PublicInbox/LeiNoteEvent.pm index 18313359..5f692e75 100644 --- a/lib/PublicInbox/LeiNoteEvent.pm +++ b/lib/PublicInbox/LeiNoteEvent.pm @@ -36,18 +36,18 @@ sub eml_event ($$$$) { my ($self, $eml, $vmd, $state) = @_; my $sto = $self->{lei}->{sto}; if ($state =~ /\Aimport-(?:rw|ro)\z/) { - $sto->ipc_do('set_eml', $eml, $vmd); + $sto->wq_do('set_eml', $eml, $vmd); } elsif ($state =~ /\Aindex-(?:rw|ro)\z/) { my $xoids = $self->{lei}->ale->xoids_for($eml); - $sto->ipc_do('index_eml_only', $eml, $vmd, $xoids); + $sto->wq_do('index_eml_only', $eml, $vmd, $xoids); } elsif ($state =~ /\Atag-(?:rw|ro)\z/) { my $docids = []; my $c = $self->{lse}->kw_changed($eml, $vmd->{kw}, $docids); if (scalar @$docids) { # already in lei/store - $sto->ipc_do('set_eml_vmd', undef, $vmd, 
$docids) if $c; + $sto->wq_do('set_eml_vmd', undef, $vmd, $docids) if $c; } elsif (my $xoids = $self->{lei}->ale->xoids_for($eml)) { # it's in an external, only set kw, here - $sto->ipc_do('set_xvmd', $xoids, $eml, $vmd); + $sto->wq_do('set_xvmd', $xoids, $eml, $vmd); } # else { totally unknown: ignore } else { warn "unknown state: $state (in $self->{lei}->{cfg}->{'-f'})\n"; diff --git a/lib/PublicInbox/LeiRemote.pm b/lib/PublicInbox/LeiRemote.pm index 8d4ffed0..346aa6a4 100644 --- a/lib/PublicInbox/LeiRemote.pm +++ b/lib/PublicInbox/LeiRemote.pm @@ -28,7 +28,7 @@ sub _each_mboxrd_eml { # callback for MboxReader->mboxrd my $xoids = $lei->{ale}->xoids_for($eml, 1); my $smsg = bless {}, 'PublicInbox::Smsg'; if ($lei->{sto} && !$xoids) { # memoize locally - my $res = $lei->{sto}->ipc_do('add_eml', $eml); + my $res = $lei->{sto}->wq_do('add_eml', $eml); $smsg = $res if ref($res) eq ref($smsg); } $smsg->{blob} //= $xoids ? (keys(%$xoids))[0] @@ -56,7 +56,7 @@ sub mset { my $err = waitpid($pid, 0) == $pid ? 
undef : "BUG: waitpid($cmd): $!"; @$reap = (); # cancel OnDestroy - my $wait = $self->{lei}->{sto}->ipc_do('done'); + my $wait = $self->{lei}->{sto}->wq_do('done'); die $err if $err; $self; # we are the mset (and $ibx, and $self) } diff --git a/lib/PublicInbox/LeiRm.pm b/lib/PublicInbox/LeiRm.pm index 3371f3ed..97b1c5c1 100644 --- a/lib/PublicInbox/LeiRm.pm +++ b/lib/PublicInbox/LeiRm.pm @@ -10,7 +10,7 @@ use parent qw(PublicInbox::IPC PublicInbox::LeiInput); sub input_eml_cb { # used by PublicInbox::LeiInput::input_fh my ($self, $eml) = @_; - $self->{lei}->{sto}->ipc_do('remove_eml', $eml); + $self->{lei}->{sto}->wq_do('remove_eml', $eml); } sub input_mbox_cb { # MboxReader callback diff --git a/lib/PublicInbox/LeiStore.pm b/lib/PublicInbox/LeiStore.pm index 08add8f5..4ec63699 100644 --- a/lib/PublicInbox/LeiStore.pm +++ b/lib/PublicInbox/LeiStore.pm @@ -552,6 +552,12 @@ sub ipc_atfork_child { $self->SUPER::ipc_atfork_child; } +sub recv_and_run { + my ($self, @args) = @_; + local $PublicInbox::DS::in_loop = 0; # waitpid synchronously + $self->SUPER::recv_and_run(@args); +} + sub write_prepare { my ($self, $lei) = @_; $lei // die 'BUG: $lei not passed'; @@ -560,14 +566,14 @@ sub write_prepare { require PublicInbox::PktOp; my ($s2d_op_c, $s2d_op_p) = PublicInbox::PktOp->pair; my $dir = $lei->store_path; - $self->ipc_lock_init("$dir/ipc.lock"); substr($dir, -length('/lei/store'), 10, ''); pipe(my ($r, $w)) or die "pipe: $!"; $w->autoflush(1); # Mail we import into lei are private, so headers filtered out # by -mda for public mail are not appropriate local @PublicInbox::MDA::BAD_HEADERS = (); - $self->ipc_worker_spawn("lei/store $dir", $lei->oldset, { + $self->{-wq_no_bcast} = 1; + $self->wq_workers_start("lei/store $dir", 1, $lei->oldset, { lei => $lei, -err_wr => $w, to_close => [ $r, $s2d_op_c->{sock} ], diff --git a/lib/PublicInbox/LeiTag.pm b/lib/PublicInbox/LeiTag.pm index c4f5ecff..9bbf0d79 100644 --- a/lib/PublicInbox/LeiTag.pm +++ b/lib/PublicInbox/LeiTag.pm 
@@ -12,7 +12,7 @@ sub input_eml_cb { # used by PublicInbox::LeiInput::input_fh my ($self, $eml) = @_; if (my $xoids = $self->{lse}->xoids_for($eml) // # tries LeiMailSync $self->{lei}->{ale}->xoids_for($eml)) { - $self->{lei}->{sto}->ipc_do('update_xvmd', $xoids, $eml, + $self->{lei}->{sto}->wq_do('update_xvmd', $xoids, $eml, $self->{vmd_mod}); } else { ++$self->{unimported}; diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm index 9f7171fb..a419b83f 100644 --- a/lib/PublicInbox/LeiToMail.pm +++ b/lib/PublicInbox/LeiToMail.pm @@ -215,14 +215,14 @@ sub update_kw_maybe ($$$$) { my $c = $lse->kw_changed($eml, $kw, my $docids = []); my $vmd = { kw => $kw }; if (scalar @$docids) { # already in lei/store - $lei->{sto}->ipc_do('set_eml_vmd', undef, $vmd, $docids) if $c; + $lei->{sto}->wq_do('set_eml_vmd', undef, $vmd, $docids) if $c; } elsif (my $xoids = $lei->{ale}->xoids_for($eml)) { # it's in an external, only set kw, here - $lei->{sto}->ipc_do('set_xvmd', $xoids, $eml, $vmd); + $lei->{sto}->wq_do('set_xvmd', $xoids, $eml, $vmd); } else { # never-before-seen, import the whole thing # XXX this is critical in protecting against accidental # data loss without --augment - $lei->{sto}->ipc_do('set_eml', $eml, $vmd); + $lei->{sto}->wq_do('set_eml', $eml, $vmd); } } @@ -296,7 +296,7 @@ sub _maildir_write_cb ($$) { $lse->xsmsg_vmd($smsg) if $lse; my $n = _buf2maildir($dst, $bref // \($eml->as_string), $smsg, $dir); - $sto->ipc_do('set_sync_info', $smsg->{blob}, $out, $n) if $sto; + $sto->wq_do('set_sync_info', $smsg->{blob}, $out, $n) if $sto; ++$lei->{-nr_write}; } } @@ -326,7 +326,7 @@ sub _imap_write_cb ($$) { } # imap_append returns UID if IMAP server has UIDPLUS extension ($sto && $uid =~ /\A[0-9]+\z/) and - $sto->ipc_do('set_sync_info', + $sto->wq_do('set_sync_info', $smsg->{blob}, $$uri, $uid + 0); ++$lei->{-nr_write}; } @@ -360,7 +360,7 @@ sub _v2_write_cb ($$) { my ($bref, $smsg, $eml) = @_; $eml //= PublicInbox::Eml->new($bref); return if 
$dedupe && $dedupe->is_dup($eml, $smsg); - $lei->{v2w}->ipc_do('add', $eml); # V2Writable->add + $lei->{v2w}->wq_do('add', $eml); # V2Writable->add ++$lei->{-nr_write}; } } @@ -658,9 +658,10 @@ sub _pre_augment_v2 { } PublicInbox::InboxWritable->new($ibx, @creat); $ibx->init_inbox if @creat; - my $v2w = $lei->{v2w} = $ibx->importer; - $v2w->ipc_lock_init("$dir/ipc.lock"); - $v2w->ipc_worker_spawn("lei/v2w $dir", $lei->oldset, { lei => $lei }); + my $v2w = $ibx->importer; + $v2w->{-wq_no_bcast} = 1; + $v2w->wq_workers_start("lei/v2w $dir", 1, $lei->oldset, {lei => $lei}); + $lei->{v2w} = $v2w; return if !$lei->{opt}->{shared}; my $d = "$lei->{ale}->{git}->{git_dir}/objects"; my $al = "$dir/git/0.git/objects/info/alternates"; @@ -689,7 +690,7 @@ sub do_augment { # slow, runs in wq worker sub post_augment { my ($self, $lei, @args) = @_; my $wait = $lei->{opt}->{'import-before'} ? - $lei->{sto}->ipc_do('checkpoint', 1) : 0; + $lei->{sto}->wq_do('checkpoint', 1) : 0; # _post_augment_mbox my $m = $self->can("_post_augment_$self->{base_type}") or return; $m->($self, $lei, @args); @@ -774,6 +775,7 @@ sub write_mail { # via ->wq_io_do sub wq_atexit_child { my ($self) = @_; + local $PublicInbox::DS::in_loop = 0; # waitpid synchronously my $lei = $self->{lei}; delete $self->{wcb}; $lei->{ale}->git->async_wait_all; diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index 1d49da3d..4583b067 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -269,7 +269,7 @@ sub each_remote_eml { # callback for MboxReader->mboxrd my $xoids = $lei->{ale}->xoids_for($eml, 1); my $smsg = bless {}, 'PublicInbox::Smsg'; if ($self->{import_sto} && !$xoids) { - my $res = $self->{import_sto}->ipc_do('add_eml', $eml); + my $res = $self->{import_sto}->wq_do('add_eml', $eml); if (ref($res) eq ref($smsg)) { # totally new message $smsg = $res; $smsg->{kw} = []; # short-circuit xsmsg_vmd @@ -369,7 +369,7 @@ sub query_remote_mboxrd { @$reap_curl = (); # 
cancel OnDestroy die $err if $err; my $nr = $lei->{-nr_remote_eml}; - my $wait = $lei->{sto}->ipc_do('done') if $nr && $lei->{sto}; + my $wait = $lei->{sto}->wq_do('done') if $nr && $lei->{sto}; if ($? == 0) { # don't update if no results, maybe MTA is down $key && $nr and @@ -413,7 +413,7 @@ sub query_done { # EOF callback for main daemon warn "BUG: {sto} missing with --mail-sync"; } $lei->sto_done_request if $lei->{sto}; - my $wait = $lei->{v2w} ? $lei->{v2w}->ipc_do('done') : undef; + my $wait = $lei->{v2w} ? $lei->{v2w}->wq_do('done') : undef; $lei->{ovv}->ovv_end($lei); my $start_mua; if ($l2m) { # close() calls LeiToMail reap_compress