From: Eric Wong <e@80x24.org>
To: meta@public-inbox.org
Subject: [PATCH 6/7] lei up: avoid excessively parallel --all
Date: Wed, 22 Sep 2021 02:24:34 +0000 [thread overview]
Message-ID: <20210922022435.17835-7-e@80x24.org> (raw)
In-Reply-To: <20210922022435.17835-1-e@80x24.org>
We shouldn't dispatch all outputs right away since they
can be expensive CPU-wise. Instead, rely on DESTROY to
trigger further redispatches.
This also fixes a circular reference bug for the single-output
case that could lead to a leftover script/lei after MUA exit.
I'm not sure how --jobs/-j should work when the actual xsearch
and lei2mail have their own parallelism ("--jobs=$X,$M"), but
it's better than having thousands of subtasks running.
Fixes: b34a267efff7b831 ("lei up: fix --mua with single output")
---
lib/PublicInbox/LEI.pm | 2 +-
lib/PublicInbox/LeiUp.pm | 86 +++++++++++++++++++++++++---------------
2 files changed, 56 insertions(+), 32 deletions(-)
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index a1cab55a..1305dfb8 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -1384,7 +1384,7 @@ sub fchdir {
sub wq_eof { # EOF callback for main daemon
my ($lei) = @_;
my $wq1 = delete $lei->{wq1} // return $lei->fail; # already failed
- $wq1->wq_wait_old(\&wq_done_wait, $lei);
+ $wq1->wq_wait_old($wq1->can('_wq_done_wait') // \&wq_done_wait, $lei); # prefer the WQ class's own _wq_done_wait (e.g. LeiUp's), else the generic wq_done_wait
}
sub watch_state_ok ($) {
diff --git a/lib/PublicInbox/LeiUp.pm b/lib/PublicInbox/LeiUp.pm
index 89cf0112..377a720e 100644
--- a/lib/PublicInbox/LeiUp.pm
+++ b/lib/PublicInbox/LeiUp.pm
@@ -36,7 +36,7 @@ sub up1 ($$) {
$lei->{opt}->{$k} //= $v;
}
my $o = $lei->{opt}->{output} // '';
- return $lei->fail("lei.q.output unset in $f") if $o eq '';
+ return $lei->fail("lei.q.output unset in $f (out=$out)") if $o eq '';
$lss->translate_dedupe($lei) or return;
$lei->{lss} = $lss; # for LeiOverview->new and query_remote_mboxrd
my $lxs = $lei->lxs_prepare or return;
@@ -44,39 +44,30 @@ sub up1 ($$) {
$lei->_start_query;
}
-sub up1_redispatch {
- my ($lei, $out, $op_p) = @_;
- my $l;
- if (defined($lei->{opt}->{mua})) { # single output
- $l = $lei;
- } else { # multiple outputs
- $l = bless { %$lei }, ref($lei);
- $l->{opt} = { %{$l->{opt}} }; # deep copy
- delete $l->{opt}->{all};
- delete $l->{sock}; # do not close
- # make close($l->{1}) happy in lei->dclose
- open my $fh, '>&', $l->{1} or
- return $l->child_error(0, "dup: $!");
- $l->{1} = $fh;
- $l->qerr("# updating $out");
- }
- $l->{''} = $op_p; # daemon only ($l => $lei => script/lei)
- eval { $l->dispatch('up', $out) };
- $lei->child_error(0, $@) if $@ || $l->{failed}; # lei->fail()
-}
-
sub redispatch_all ($$) {
my ($self, $lei) = @_;
+ my $upq = [ (@{$self->{local} // []}, @{$self->{remote} // []}) ]; # every output still to update
+ return up1($lei, $upq->[0]) if @$upq == 1; # just one, may start MUA
+
+ # FIXME: this is also used per-query, see lei->_start_query
+ my $j = $lei->{opt}->{jobs} || do {
+ my $n = $self->detect_nproc // 1;
+ $n > 4 ? 4 : $n;
+ };
+ $j = ($j =~ /\A([0-9]+)/ && $1 > 0) ? $1 + 0 : 1; # may be --jobs=$x,$m on CLI; "0,$m" must not dispatch nothing
# re-dispatch into our event loop w/o creating an extra fork-level
+ # $upq will be drained via DESTROY as each query finishes
$lei->{fmsg} = PublicInbox::LeiFinmsg->new($lei);
my ($op_c, $op_p) = PublicInbox::PktOp->pair;
- for my $o (@{$self->{local} // []}, @{$self->{remote} // []}) {
- PublicInbox::DS::requeue(sub {
- up1_redispatch($lei, $o, $op_p);
- });
+ # call lei->dclose when upq is done processing:
+ $op_c->{ops} = { '' => [ $lei->can('dclose'), $lei ] };
+ my @first_batch = splice(@$upq, 0, $j); # initial parallelism
+ $lei->{-upq} = $upq; # the remainder, consumed by PublicInbox::LeiUp1
+ $lei->event_step_init; # wait for client disconnects
+ for my $out (@first_batch) {
+ PublicInbox::DS::requeue(
+ PublicInbox::LeiUp1::nxt($lei, $out, $op_p));
}
- $lei->event_step_init;
- $lei->pkt_ops($op_c->{ops} = { '' => [$lei->can('dclose'), $lei] });
}
sub lei_up {
@@ -98,7 +89,7 @@ sub lei_up {
} else {
$lei->fail("only --all=$all not understood");
}
- } elsif ($lei->{lse}) {
+ } elsif ($lei->{lse}) { # redispatched
scalar(@outs) == 1 or die "BUG: lse set w/ >1 out[@outs]";
return up1($lei, $outs[0]);
} else {
@@ -131,16 +122,53 @@ sub net_merge_all_done {
my ($self, $lei) = @_;
$lei->{net} = delete($self->{-net_new}) if $self->{-net_new};
$self->wq_close(1);
- redispatch_all($self, $lei);
+ eval { redispatch_all($self, $lei) };
+ warn "E: $@" if $@; # report, but do not propagate, redispatch errors
}
-sub _complete_up {
+sub _complete_up { # lei__complete hook
my ($lei, @argv) = @_;
my $match_cb = $lei->complete_url_prepare(\@argv);
map { $match_cb->($_) } PublicInbox::LeiSavedSearch::list($lei);
}
+sub _wq_done_wait { # dwaitpid callback
+ my ($arg, $pid) = @_; # $pid is unused
+ my ($wq, $lei) = @$arg; # presumably packed by wq_wait_old; $wq is unused
+ $lei->child_error($?, 'auth failure') if $?; # $? is the reaped worker's wait status
+}
+
no warnings 'once';
*ipc_atfork_child = \&PublicInbox::LeiInput::input_only_atfork_child;
+package PublicInbox::LeiUp1; # for redispatch_all
+use strict;
+use v5.10.1;
+
+sub nxt ($$$) { # one pending output, run via PublicInbox::DS::requeue
+ my ($lei, $out, $op_p) = @_;
+ bless { lei => $lei, out => $out, op_p => $op_p }, __PACKAGE__;
+}
+
+sub event_step { # runs via PublicInbox::DS::requeue
+ my ($self) = @_;
+ my $lei = $self->{lei}; # the original, from lei_up
+ my $l = bless { %$lei }, ref($lei); # per-output copy
+ delete($l->{sock}) or return; # client disconnected if {sock} is gone
+ $l->{opt} = { %{$l->{opt}} }; # deep copy
+ delete $l->{opt}->{all};
+ $l->qerr("# updating $self->{out}");
+ $l->{up_op_p} = $self->{op_p}; # ($l => $lei => script/lei)
+ $l->{-up1} = $self; # our DESTROY fires only once this per-output $l is freed
+ eval { $l->dispatch('up', $self->{out}) };
+ $lei->child_error(0, $@) if $@ || $l->{failed}; # lei->fail()
+}
+
+sub DESTROY { # onto the next: start one queued output per finished one
+ my ($self) = @_;
+ my $lei = $self->{lei}; # the original, from lei_up
+ my $out = shift(@{$lei->{-upq}}) or return;
+ PublicInbox::DS::requeue(nxt($lei, $out, $self->{op_p}));
+}
+
1;
next prev parent reply other threads:[~2021-09-22 2:24 UTC|newest]
Thread overview: 8+ messages / expand[flat|nested] mbox.gz Atom feed top
2021-09-22 2:24 [PATCH 0/7] lei bugfixes and other fixes Eric Wong
2021-09-22 2:24 ` [PATCH 1/7] ipc: do not add "0" to $0 of solo workers Eric Wong
2021-09-22 2:24 ` [PATCH 2/7] treewide: fix %SIG localization, harder Eric Wong
2021-09-22 2:24 ` [PATCH 3/7] script/lei: describe purpose of sleep loop Eric Wong
2021-09-22 2:24 ` [PATCH 4/7] lei: dclose: do not close unnecessarily Eric Wong
2021-09-22 2:24 ` [PATCH 5/7] inbox: do not waste hash slot on httpbackend_limiter Eric Wong
2021-09-22 2:24 ` Eric Wong [this message]
2021-09-22 2:24 ` [PATCH 7/7] lei: drop redundant WQ EOF callbacks Eric Wong
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
List information: https://public-inbox.org/README
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20210922022435.17835-7-e@80x24.org \
--to=e@80x24.org \
--cc=meta@public-inbox.org \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
Code repositories for project(s) associated with this public inbox
https://80x24.org/public-inbox.git
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).