* [PATCH] lei_store: avoid redundant work on no-op worker spawn
@ 2023-03-25 11:11 7% Eric Wong
0 siblings, 0 replies; 3+ results
From: Eric Wong @ 2023-03-25 11:11 UTC
To: meta
While ->wq_workers_start is idempotent, the pipe creation for
PublicInbox::LeiStoreErr was not, and it required several extra
syscalls and FD allocations on each no-op spawn. Check the field
used by SOCK_SEQPACKET workers rather than the one used by
pipe-based workers.
Fixes: cbc2890cb89b81cb ("lei/store: use SOCK_SEQPACKET rather than pipe")
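
The guard described above can be illustrated with a minimal, self-contained
sketch. Everything in it is hypothetical: Demo::Store and _workers_start are
stand-ins invented for illustration, not PublicInbox::LeiStore code. The only
assumption carried over from the patch is that the guard must test a field
which the worker-start routine actually sets (here `-wq_s1`), otherwise the
pipe setup is repeated on every call.

```perl
use strict;
use warnings;

# Hypothetical stand-in for a worker-owning object (not the real LeiStore).
package Demo::Store;

sub new { bless { pipes_made => 0 }, shift }

sub _workers_start { # idempotent: marks the object as started only once
	my ($self) = @_;
	$self->{-wq_s1} //= 1; # placeholder for a real worker socket
}

sub write_prepare {
	my ($self) = @_;
	# Guard on the field that _workers_start sets.  Guarding on a
	# field nothing sets would redo the pipe() on every call.
	unless ($self->{-wq_s1}) {
		pipe(my ($r, $w)) or die "pipe: $!";
		$self->{err_pipe} = [ $r, $w ];
		++$self->{pipes_made};
		$self->_workers_start;
	}
	$self;
}

package main;

my $sto = Demo::Store->new;
$sto->write_prepare for 1..3; # the 2nd and 3rd calls are no-ops
print "pipes created: $sto->{pipes_made}\n";
```

With the guard keyed on the right field, repeated calls create a single pipe;
keyed on an unset field, the same loop would create three.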
---
lib/PublicInbox/LeiStore.pm | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/lib/PublicInbox/LeiStore.pm b/lib/PublicInbox/LeiStore.pm
index fce15a72..cf5a03a0 100644
--- a/lib/PublicInbox/LeiStore.pm
+++ b/lib/PublicInbox/LeiStore.pm
@@ -612,7 +612,7 @@ sub _sto_atexit { # awaitpid cb
sub write_prepare {
my ($self, $lei) = @_;
$lei // die 'BUG: $lei not passed';
- unless ($self->{-ipc_req}) {
+ unless ($self->{-wq_s1}) {
my $dir = $lei->store_path;
substr($dir, -length('/lei/store'), 10, '');
pipe(my ($r, $w)) or die "pipe: $!";
* [PATCH 00/16] lei IPC overhaul, NNTP fixes
@ 2021-09-19 12:50 5% Eric Wong
2021-09-19 12:50 4% ` [PATCH 03/16] lei/store: use SOCK_SEQPACKET rather than pipe Eric Wong
0 siblings, 1 reply; 3+ results
From: Eric Wong @ 2021-09-19 12:50 UTC
To: meta
11/16 is a bit worrying for saved search dedupe over HTTP(S),
and I can't seem to reproduce it reliably, either.
ls-mail-source and import use is far nicer, as it provides a
good avenue for doing partial fetches.
lei/store IPC got a massive overhaul, and the sto_done_request
simplification is nice. This will probably simplify automatic
export-kw support to IMAP folders.
I also noticed "lei config --edit" was wonky, so
I made it share code with "lei edit-search".
Starting to document config knobs, too.
Eric Wong (16):
ipc: wq_do: support synchronous waits and responses
ipc: allow disabling broadcast for wq_workers
lei/store: use SOCK_SEQPACKET rather than pipe
lei: simplify sto_done_request
lei_xsearch: drop Data::Dumper use
ipc: drop dynamic WQ process counts
lei: clamp internal worker processes to 4
lei ls-mail-source: use "high"/"low" for NNTP
lei ls-mail-source: pretty JSON support
net_reader: fix single NNTP article fetch, test ranges
xt: add fsck script over over.sqlite3
watch: use net_reader->mic_new wrapper for SOCKS+TLS
net_reader: no STARTTLS for IMAP localhost or onions
lei config --edit: use controlling terminal
net_reader: disallow imap.fetchBatchSize=0
doc: lei-config: document various knobs
Documentation/lei-config.pod | 91 +++++++++++++++++++-
MANIFEST | 2 +
lib/PublicInbox/IPC.pm | 117 +++++++++++---------------
lib/PublicInbox/LEI.pm | 32 +++----
lib/PublicInbox/LeiConfig.pm | 42 +++++++++
lib/PublicInbox/LeiEditSearch.pm | 60 +++++--------
lib/PublicInbox/LeiExternal.pm | 2 +-
lib/PublicInbox/LeiImport.pm | 2 +-
lib/PublicInbox/LeiImportKw.pm | 6 +-
lib/PublicInbox/LeiIndex.pm | 2 +-
lib/PublicInbox/LeiInit.pm | 4 +-
lib/PublicInbox/LeiInput.pm | 2 +-
lib/PublicInbox/LeiLsMailSource.pm | 25 +++---
lib/PublicInbox/LeiNoteEvent.pm | 11 +--
lib/PublicInbox/LeiRefreshMailSync.pm | 2 +-
lib/PublicInbox/LeiRemote.pm | 4 +-
lib/PublicInbox/LeiRm.pm | 2 +-
lib/PublicInbox/LeiSavedSearch.pm | 16 +---
lib/PublicInbox/LeiStore.pm | 22 ++---
lib/PublicInbox/LeiTag.pm | 2 +-
lib/PublicInbox/LeiToMail.pm | 22 ++---
lib/PublicInbox/LeiXSearch.pm | 9 +-
lib/PublicInbox/NetReader.pm | 39 +++++----
lib/PublicInbox/WQWorker.pm | 9 +-
lib/PublicInbox/Watch.pm | 3 +-
t/imapd-tls.t | 11 ++-
t/ipc.t | 19 ++---
t/lei-import-nntp.t | 26 ++++++
t/lei.t | 3 +
t/nntpd-tls.t | 8 ++
t/uri_nntps.t | 3 +
xt/over-fsck.perl | 44 ++++++++++
32 files changed, 403 insertions(+), 239 deletions(-)
create mode 100644 lib/PublicInbox/LeiConfig.pm
create mode 100644 xt/over-fsck.perl
* [PATCH 03/16] lei/store: use SOCK_SEQPACKET rather than pipe
2021-09-19 12:50 5% [PATCH 00/16] lei IPC overhaul, NNTP fixes Eric Wong
@ 2021-09-19 12:50 4% ` Eric Wong
0 siblings, 0 replies; 3+ results
From: Eric Wong @ 2021-09-19 12:50 UTC
To: meta
This has several advantages:
* no need to use ipc.lock to protect a pipe for non-atomic writes
* ability to pass FDs. In another commit, this will let us
simplify lei->sto_done_request and pass newly-created
sockets to lei/store directly.
Disadvantages:
- an extra pipe is required for rare messages over several
hundred KB; this is probably a non-issue, though
The performance delta is unknown, but I expect shards
(which remain pipes) to be the primary bottleneck IPC-wise
for lei/store.
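
The first advantage can be seen in a small standalone sketch, which is not
taken from the patch. It assumes a platform where AF_UNIX supports
SOCK_SEQPACKET (Linux does; not every system does). Each send() on such a
socket is delivered as one whole record, so concurrent writers need no lock
to keep their messages from interleaving, whereas POSIX only guarantees
atomic pipe writes up to PIPE_BUF bytes. The variable names and message
contents below are made up for illustration.

```perl
use strict;
use warnings;
use Socket qw(AF_UNIX SOCK_SEQPACKET);

# A connected pair of sequenced-packet sockets; protocol 0 = default.
socketpair(my $s1, my $s2, AF_UNIX, SOCK_SEQPACKET, 0)
	or die "socketpair: $!";

# Two separate requests, sent back to back with no framing of our own.
defined(send($s1, 'first request', 0)) or die "send: $!";
defined(send($s1, 'second', 0)) or die "send: $!";

# Each recv() returns exactly one record, even though the buffer
# is large enough to hold both messages.
defined(recv($s2, my $m1, 65536, 0)) or die "recv: $!";
defined(recv($s2, my $m2, 65536, 0)) or die "recv: $!";

print "1: $m1\n2: $m2\n";
```

A pipe or SOCK_STREAM socket would be free to return both messages from a
single read, so the reader would need its own length prefix or delimiter.
Passing file descriptors (the second advantage) needs sendmsg() with
SCM_RIGHTS, which core Perl does not expose, so it is not shown here.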
---
lib/PublicInbox/LEI.pm | 4 ++--
lib/PublicInbox/LeiImport.pm | 2 +-
lib/PublicInbox/LeiImportKw.pm | 2 +-
lib/PublicInbox/LeiIndex.pm | 2 +-
lib/PublicInbox/LeiInput.pm | 2 +-
lib/PublicInbox/LeiNoteEvent.pm | 8 ++++----
lib/PublicInbox/LeiRemote.pm | 4 ++--
lib/PublicInbox/LeiRm.pm | 2 +-
lib/PublicInbox/LeiStore.pm | 10 ++++++++--
lib/PublicInbox/LeiTag.pm | 2 +-
lib/PublicInbox/LeiToMail.pm | 22 ++++++++++++----------
lib/PublicInbox/LeiXSearch.pm | 6 +++---
12 files changed, 37 insertions(+), 29 deletions(-)
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 8b0614f2..549b855b 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -1501,9 +1501,9 @@ sub sto_done_request { # only call this from lei-daemon process (not workers)
eval {
if ($sock //= $lei->{sock}) { # issue, async wait
$LIVE_SOCK{"$sock"} = $sock;
- $lei->{sto}->ipc_do('done', "$sock");
+ $lei->{sto}->wq_do('done', "$sock");
} else { # forcibly wait
- my $wait = $lei->{sto}->ipc_do('done');
+ my $wait = $lei->{sto}->wq_do('done');
}
};
$lei->err($@) if $@;
diff --git a/lib/PublicInbox/LeiImport.pm b/lib/PublicInbox/LeiImport.pm
index 9084d771..40530914 100644
--- a/lib/PublicInbox/LeiImport.pm
+++ b/lib/PublicInbox/LeiImport.pm
@@ -16,7 +16,7 @@ sub input_eml_cb { # used by PublicInbox::LeiInput::input_fh
if (my $all_vmd = $self->{all_vmd}) {
@$vmd{keys %$all_vmd} = values %$all_vmd;
}
- $self->{lei}->{sto}->ipc_do('set_eml', $eml, $vmd, $xoids);
+ $self->{lei}->{sto}->wq_do('set_eml', $eml, $vmd, $xoids);
}
sub input_mbox_cb { # MboxReader callback
diff --git a/lib/PublicInbox/LeiImportKw.pm b/lib/PublicInbox/LeiImportKw.pm
index 402125cf..2863d17f 100644
--- a/lib/PublicInbox/LeiImportKw.pm
+++ b/lib/PublicInbox/LeiImportKw.pm
@@ -37,7 +37,7 @@ sub ck_update_kw { # via wq_io_do
$self->{lse}->kw_changed(undef, $kw, \@docids) or return;
$self->{verbose} and
$self->{lei}->qerr('# '.unpack('H*', $oidbin)." => @$kw\n");
- $self->{sto}->ipc_do('set_eml_vmd', undef, { kw => $kw }, \@docids);
+ $self->{sto}->wq_do('set_eml_vmd', undef, { kw => $kw }, \@docids);
}
sub ikw_done_wait {
diff --git a/lib/PublicInbox/LeiIndex.pm b/lib/PublicInbox/LeiIndex.pm
index 1b327a2c..b3f3e1a0 100644
--- a/lib/PublicInbox/LeiIndex.pm
+++ b/lib/PublicInbox/LeiIndex.pm
@@ -16,7 +16,7 @@ sub input_eml_cb { # used by input_maildir_cb and input_net_cb
if (my $all_vmd = $self->{all_vmd}) {
@$vmd{keys %$all_vmd} = values %$all_vmd;
}
- $self->{lei}->{sto}->ipc_do('index_eml_only', $eml, $vmd, $xoids);
+ $self->{lei}->{sto}->wq_do('index_eml_only', $eml, $vmd, $xoids);
}
sub input_fh { # overrides PublicInbox::LeiInput::input_fh
diff --git a/lib/PublicInbox/LeiInput.pm b/lib/PublicInbox/LeiInput.pm
index fe736981..22bedba6 100644
--- a/lib/PublicInbox/LeiInput.pm
+++ b/lib/PublicInbox/LeiInput.pm
@@ -378,7 +378,7 @@ sub process_inputs {
}
# always commit first, even on error partial work is acceptable for
# lei <import|tag|convert>
- my $wait = $self->{lei}->{sto}->ipc_do('done') if $self->{lei}->{sto};
+ my $wait = $self->{lei}->{sto}->wq_do('done') if $self->{lei}->{sto};
$self->{lei}->fail($err) if $err;
}
diff --git a/lib/PublicInbox/LeiNoteEvent.pm b/lib/PublicInbox/LeiNoteEvent.pm
index 18313359..5f692e75 100644
--- a/lib/PublicInbox/LeiNoteEvent.pm
+++ b/lib/PublicInbox/LeiNoteEvent.pm
@@ -36,18 +36,18 @@ sub eml_event ($$$$) {
my ($self, $eml, $vmd, $state) = @_;
my $sto = $self->{lei}->{sto};
if ($state =~ /\Aimport-(?:rw|ro)\z/) {
- $sto->ipc_do('set_eml', $eml, $vmd);
+ $sto->wq_do('set_eml', $eml, $vmd);
} elsif ($state =~ /\Aindex-(?:rw|ro)\z/) {
my $xoids = $self->{lei}->ale->xoids_for($eml);
- $sto->ipc_do('index_eml_only', $eml, $vmd, $xoids);
+ $sto->wq_do('index_eml_only', $eml, $vmd, $xoids);
} elsif ($state =~ /\Atag-(?:rw|ro)\z/) {
my $docids = [];
my $c = $self->{lse}->kw_changed($eml, $vmd->{kw}, $docids);
if (scalar @$docids) { # already in lei/store
- $sto->ipc_do('set_eml_vmd', undef, $vmd, $docids) if $c;
+ $sto->wq_do('set_eml_vmd', undef, $vmd, $docids) if $c;
} elsif (my $xoids = $self->{lei}->ale->xoids_for($eml)) {
# it's in an external, only set kw, here
- $sto->ipc_do('set_xvmd', $xoids, $eml, $vmd);
+ $sto->wq_do('set_xvmd', $xoids, $eml, $vmd);
} # else { totally unknown: ignore
} else {
warn "unknown state: $state (in $self->{lei}->{cfg}->{'-f'})\n";
diff --git a/lib/PublicInbox/LeiRemote.pm b/lib/PublicInbox/LeiRemote.pm
index 8d4ffed0..346aa6a4 100644
--- a/lib/PublicInbox/LeiRemote.pm
+++ b/lib/PublicInbox/LeiRemote.pm
@@ -28,7 +28,7 @@ sub _each_mboxrd_eml { # callback for MboxReader->mboxrd
my $xoids = $lei->{ale}->xoids_for($eml, 1);
my $smsg = bless {}, 'PublicInbox::Smsg';
if ($lei->{sto} && !$xoids) { # memoize locally
- my $res = $lei->{sto}->ipc_do('add_eml', $eml);
+ my $res = $lei->{sto}->wq_do('add_eml', $eml);
$smsg = $res if ref($res) eq ref($smsg);
}
$smsg->{blob} //= $xoids ? (keys(%$xoids))[0]
@@ -56,7 +56,7 @@ sub mset {
my $err = waitpid($pid, 0) == $pid ? undef
: "BUG: waitpid($cmd): $!";
@$reap = (); # cancel OnDestroy
- my $wait = $self->{lei}->{sto}->ipc_do('done');
+ my $wait = $self->{lei}->{sto}->wq_do('done');
die $err if $err;
$self; # we are the mset (and $ibx, and $self)
}
diff --git a/lib/PublicInbox/LeiRm.pm b/lib/PublicInbox/LeiRm.pm
index 3371f3ed..97b1c5c1 100644
--- a/lib/PublicInbox/LeiRm.pm
+++ b/lib/PublicInbox/LeiRm.pm
@@ -10,7 +10,7 @@ use parent qw(PublicInbox::IPC PublicInbox::LeiInput);
sub input_eml_cb { # used by PublicInbox::LeiInput::input_fh
my ($self, $eml) = @_;
- $self->{lei}->{sto}->ipc_do('remove_eml', $eml);
+ $self->{lei}->{sto}->wq_do('remove_eml', $eml);
}
sub input_mbox_cb { # MboxReader callback
diff --git a/lib/PublicInbox/LeiStore.pm b/lib/PublicInbox/LeiStore.pm
index 08add8f5..4ec63699 100644
--- a/lib/PublicInbox/LeiStore.pm
+++ b/lib/PublicInbox/LeiStore.pm
@@ -552,6 +552,12 @@ sub ipc_atfork_child {
$self->SUPER::ipc_atfork_child;
}
+sub recv_and_run {
+ my ($self, @args) = @_;
+ local $PublicInbox::DS::in_loop = 0; # waitpid synchronously
+ $self->SUPER::recv_and_run(@args);
+}
+
sub write_prepare {
my ($self, $lei) = @_;
$lei // die 'BUG: $lei not passed';
@@ -560,14 +566,14 @@ sub write_prepare {
require PublicInbox::PktOp;
my ($s2d_op_c, $s2d_op_p) = PublicInbox::PktOp->pair;
my $dir = $lei->store_path;
- $self->ipc_lock_init("$dir/ipc.lock");
substr($dir, -length('/lei/store'), 10, '');
pipe(my ($r, $w)) or die "pipe: $!";
$w->autoflush(1);
# Mail we import into lei are private, so headers filtered out
# by -mda for public mail are not appropriate
local @PublicInbox::MDA::BAD_HEADERS = ();
- $self->ipc_worker_spawn("lei/store $dir", $lei->oldset, {
+ $self->{-wq_no_bcast} = 1;
+ $self->wq_workers_start("lei/store $dir", 1, $lei->oldset, {
lei => $lei,
-err_wr => $w,
to_close => [ $r, $s2d_op_c->{sock} ],
diff --git a/lib/PublicInbox/LeiTag.pm b/lib/PublicInbox/LeiTag.pm
index c4f5ecff..9bbf0d79 100644
--- a/lib/PublicInbox/LeiTag.pm
+++ b/lib/PublicInbox/LeiTag.pm
@@ -12,7 +12,7 @@ sub input_eml_cb { # used by PublicInbox::LeiInput::input_fh
my ($self, $eml) = @_;
if (my $xoids = $self->{lse}->xoids_for($eml) // # tries LeiMailSync
$self->{lei}->{ale}->xoids_for($eml)) {
- $self->{lei}->{sto}->ipc_do('update_xvmd', $xoids, $eml,
+ $self->{lei}->{sto}->wq_do('update_xvmd', $xoids, $eml,
$self->{vmd_mod});
} else {
++$self->{unimported};
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index 9f7171fb..a419b83f 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -215,14 +215,14 @@ sub update_kw_maybe ($$$$) {
my $c = $lse->kw_changed($eml, $kw, my $docids = []);
my $vmd = { kw => $kw };
if (scalar @$docids) { # already in lei/store
- $lei->{sto}->ipc_do('set_eml_vmd', undef, $vmd, $docids) if $c;
+ $lei->{sto}->wq_do('set_eml_vmd', undef, $vmd, $docids) if $c;
} elsif (my $xoids = $lei->{ale}->xoids_for($eml)) {
# it's in an external, only set kw, here
- $lei->{sto}->ipc_do('set_xvmd', $xoids, $eml, $vmd);
+ $lei->{sto}->wq_do('set_xvmd', $xoids, $eml, $vmd);
} else { # never-before-seen, import the whole thing
# XXX this is critical in protecting against accidental
# data loss without --augment
- $lei->{sto}->ipc_do('set_eml', $eml, $vmd);
+ $lei->{sto}->wq_do('set_eml', $eml, $vmd);
}
}
@@ -296,7 +296,7 @@ sub _maildir_write_cb ($$) {
$lse->xsmsg_vmd($smsg) if $lse;
my $n = _buf2maildir($dst, $bref // \($eml->as_string),
$smsg, $dir);
- $sto->ipc_do('set_sync_info', $smsg->{blob}, $out, $n) if $sto;
+ $sto->wq_do('set_sync_info', $smsg->{blob}, $out, $n) if $sto;
++$lei->{-nr_write};
}
}
@@ -326,7 +326,7 @@ sub _imap_write_cb ($$) {
}
# imap_append returns UID if IMAP server has UIDPLUS extension
($sto && $uid =~ /\A[0-9]+\z/) and
- $sto->ipc_do('set_sync_info',
+ $sto->wq_do('set_sync_info',
$smsg->{blob}, $$uri, $uid + 0);
++$lei->{-nr_write};
}
@@ -360,7 +360,7 @@ sub _v2_write_cb ($$) {
my ($bref, $smsg, $eml) = @_;
$eml //= PublicInbox::Eml->new($bref);
return if $dedupe && $dedupe->is_dup($eml, $smsg);
- $lei->{v2w}->ipc_do('add', $eml); # V2Writable->add
+ $lei->{v2w}->wq_do('add', $eml); # V2Writable->add
++$lei->{-nr_write};
}
}
@@ -658,9 +658,10 @@ sub _pre_augment_v2 {
}
PublicInbox::InboxWritable->new($ibx, @creat);
$ibx->init_inbox if @creat;
- my $v2w = $lei->{v2w} = $ibx->importer;
- $v2w->ipc_lock_init("$dir/ipc.lock");
- $v2w->ipc_worker_spawn("lei/v2w $dir", $lei->oldset, { lei => $lei });
+ my $v2w = $ibx->importer;
+ $v2w->{-wq_no_bcast} = 1;
+ $v2w->wq_workers_start("lei/v2w $dir", 1, $lei->oldset, {lei => $lei});
+ $lei->{v2w} = $v2w;
return if !$lei->{opt}->{shared};
my $d = "$lei->{ale}->{git}->{git_dir}/objects";
my $al = "$dir/git/0.git/objects/info/alternates";
@@ -689,7 +690,7 @@ sub do_augment { # slow, runs in wq worker
sub post_augment {
my ($self, $lei, @args) = @_;
my $wait = $lei->{opt}->{'import-before'} ?
- $lei->{sto}->ipc_do('checkpoint', 1) : 0;
+ $lei->{sto}->wq_do('checkpoint', 1) : 0;
# _post_augment_mbox
my $m = $self->can("_post_augment_$self->{base_type}") or return;
$m->($self, $lei, @args);
@@ -774,6 +775,7 @@ sub write_mail { # via ->wq_io_do
sub wq_atexit_child {
my ($self) = @_;
+ local $PublicInbox::DS::in_loop = 0; # waitpid synchronously
my $lei = $self->{lei};
delete $self->{wcb};
$lei->{ale}->git->async_wait_all;
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index 1d49da3d..4583b067 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -269,7 +269,7 @@ sub each_remote_eml { # callback for MboxReader->mboxrd
my $xoids = $lei->{ale}->xoids_for($eml, 1);
my $smsg = bless {}, 'PublicInbox::Smsg';
if ($self->{import_sto} && !$xoids) {
- my $res = $self->{import_sto}->ipc_do('add_eml', $eml);
+ my $res = $self->{import_sto}->wq_do('add_eml', $eml);
if (ref($res) eq ref($smsg)) { # totally new message
$smsg = $res;
$smsg->{kw} = []; # short-circuit xsmsg_vmd
@@ -369,7 +369,7 @@ sub query_remote_mboxrd {
@$reap_curl = (); # cancel OnDestroy
die $err if $err;
my $nr = $lei->{-nr_remote_eml};
- my $wait = $lei->{sto}->ipc_do('done') if $nr && $lei->{sto};
+ my $wait = $lei->{sto}->wq_do('done') if $nr && $lei->{sto};
if ($? == 0) {
# don't update if no results, maybe MTA is down
$key && $nr and
@@ -413,7 +413,7 @@ sub query_done { # EOF callback for main daemon
warn "BUG: {sto} missing with --mail-sync";
}
$lei->sto_done_request if $lei->{sto};
- my $wait = $lei->{v2w} ? $lei->{v2w}->ipc_do('done') : undef;
+ my $wait = $lei->{v2w} ? $lei->{v2w}->wq_do('done') : undef;
$lei->{ovv}->ovv_end($lei);
my $start_mua;
if ($l2m) { # close() calls LeiToMail reap_compress
Code repositories for project(s) associated with this public inbox
https://80x24.org/public-inbox.git