user/dev discussion of public-inbox itself
 help / color / mirror / code / Atom feed
Search results ordered by [date|relevance]  view[summary|nested|Atom feed]
thread overview below | download mbox.gz: |
* [PATCH 6/7] lei up: avoid excessively parallel --all
  @ 2021-09-22  2:24  5% ` Eric Wong
  0 siblings, 0 replies; 3+ results
From: Eric Wong @ 2021-09-22  2:24 UTC (permalink / raw)
  To: meta

We shouldn't dispatch all outputs right away since they
can be expensive CPU-wise.  Instead, rely on DESTROY to
trigger further redispatches.

This also fixes a circular reference bug for the single-output
case that could lead to a leftover script/lei after MUA exit.

I'm not sure how --jobs/-j should work when the actual xsearch
and lei2mail have their own parallelism ("--jobs=$X,$M"), but
this is better than having thousands of subtasks running.

Fixes: b34a267efff7b831 ("lei up: fix --mua with single output")
---
 lib/PublicInbox/LEI.pm   |  2 +-
 lib/PublicInbox/LeiUp.pm | 86 +++++++++++++++++++++++++---------------
 2 files changed, 56 insertions(+), 32 deletions(-)

diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index a1cab55a..1305dfb8 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -1384,7 +1384,7 @@ sub fchdir {
 sub wq_eof { # EOF callback for main daemon
 	my ($lei) = @_;
 	my $wq1 = delete $lei->{wq1} // return $lei->fail; # already failed
-	$wq1->wq_wait_old(\&wq_done_wait, $lei);
+	$wq1->wq_wait_old($wq1->can('_wq_done_wait') // \&wq_done_wait, $lei);
 }
 
 sub watch_state_ok ($) {
diff --git a/lib/PublicInbox/LeiUp.pm b/lib/PublicInbox/LeiUp.pm
index 89cf0112..377a720e 100644
--- a/lib/PublicInbox/LeiUp.pm
+++ b/lib/PublicInbox/LeiUp.pm
@@ -36,7 +36,7 @@ sub up1 ($$) {
 		$lei->{opt}->{$k} //= $v;
 	}
 	my $o = $lei->{opt}->{output} // '';
-	return $lei->fail("lei.q.output unset in $f") if $o eq '';
+	return $lei->fail("lei.q.output unset in $f (out=$out)") if $o eq '';
 	$lss->translate_dedupe($lei) or return;
 	$lei->{lss} = $lss; # for LeiOverview->new and query_remote_mboxrd
 	my $lxs = $lei->lxs_prepare or return;
@@ -44,39 +44,30 @@ sub up1 ($$) {
 	$lei->_start_query;
 }
 
-sub up1_redispatch {
-	my ($lei, $out, $op_p) = @_;
-	my $l;
-	if (defined($lei->{opt}->{mua})) { # single output
-		$l = $lei;
-	} else { # multiple outputs
-		$l = bless { %$lei }, ref($lei);
-		$l->{opt} = { %{$l->{opt}} }; # deep copy
-		delete $l->{opt}->{all};
-		delete $l->{sock}; # do not close
-		# make close($l->{1}) happy in lei->dclose
-		open my $fh, '>&', $l->{1} or
-			return $l->child_error(0, "dup: $!");
-		$l->{1} = $fh;
-		$l->qerr("# updating $out");
-	}
-	$l->{''} = $op_p; # daemon only ($l => $lei => script/lei)
-	eval { $l->dispatch('up', $out) };
-	$lei->child_error(0, $@) if $@ || $l->{failed}; # lei->fail()
-}
-
 sub redispatch_all ($$) {
 	my ($self, $lei) = @_;
+	my $upq = [ (@{$self->{local} // []}, @{$self->{remote} // []}) ];
+	return up1($lei, $upq->[0]) if @$upq == 1; # just one, may start MUA
+
+	# FIXME: this is also used per-query, see lei->_start_query
+	my $j = $lei->{opt}->{jobs} || do {
+		my $n = $self->detect_nproc // 1;
+		$n > 4 ? 4 : $n;
+	};
+	$j = ($j =~ /\A([0-9]+)/) ? $1 + 0 : 1; # may be --jobs=$x,$m on CLI
 	# re-dispatch into our event loop w/o creating an extra fork-level
+	# $upq will be drained via DESTROY as each query finishes
 	$lei->{fmsg} = PublicInbox::LeiFinmsg->new($lei);
 	my ($op_c, $op_p) = PublicInbox::PktOp->pair;
-	for my $o (@{$self->{local} // []}, @{$self->{remote} // []}) {
-		PublicInbox::DS::requeue(sub {
-			up1_redispatch($lei, $o, $op_p);
-		});
+	# call lei->dclose when upq is done processing:
+	$op_c->{ops} = { '' => [ $lei->can('dclose'), $lei ] };
+	my @first_batch = splice(@$upq, 0, $j); # initial parallelism
+	$lei->{-upq} = $upq;
+	$lei->event_step_init; # wait for client disconnects
+	for my $out (@first_batch) {
+		PublicInbox::DS::requeue(
+			PublicInbox::LeiUp1::nxt($lei, $out, $op_p));
 	}
-	$lei->event_step_init;
-	$lei->pkt_ops($op_c->{ops} = { '' => [$lei->can('dclose'), $lei] });
 }
 
 sub lei_up {
@@ -98,7 +89,7 @@ sub lei_up {
 		} else {
 			$lei->fail("only --all=$all not understood");
 		}
-	} elsif ($lei->{lse}) {
+	} elsif ($lei->{lse}) { # redispatched
 		scalar(@outs) == 1 or die "BUG: lse set w/ >1 out[@outs]";
 		return up1($lei, $outs[0]);
 	} else {
@@ -131,16 +122,49 @@ sub net_merge_all_done {
 	my ($self, $lei) = @_;
 	$lei->{net} = delete($self->{-net_new}) if $self->{-net_new};
 	$self->wq_close(1);
-	redispatch_all($self, $lei);
+	eval { redispatch_all($self, $lei) };
+	warn "E: $@" if $@;
 }
 
-sub _complete_up {
+sub _complete_up { # lei__complete hook
 	my ($lei, @argv) = @_;
 	my $match_cb = $lei->complete_url_prepare(\@argv);
 	map { $match_cb->($_) } PublicInbox::LeiSavedSearch::list($lei);
 }
 
+sub _wq_done_wait { # dwaitpid callback
+	my ($arg, $pid) = @_;
+	my ($wq, $lei) = @$arg;
+	$lei->child_error($?, 'auth failure') if $?
+}
+
 no warnings 'once';
 *ipc_atfork_child = \&PublicInbox::LeiInput::input_only_atfork_child;
 
+package PublicInbox::LeiUp1; # for redispatch_all
+use strict;
+use v5.10.1;
+
+sub nxt ($$$) {
+	my ($lei, $out, $op_p) = @_;
+	bless { lei => $lei, out => $out, op_p => $op_p }, __PACKAGE__;
+}
+
+sub event_step { # runs via PublicInbox::DS::requeue
+	my ($self) = @_;
+	my $lei = $self->{lei}; # the original, from lei_up
+	my $l = bless { %$lei }, ref($lei); # per-output copy
+	delete($l->{sock}) or return; # client disconnected if {sock} is gone
+	$l->{opt} = { %{$l->{opt}} }; # deep copy
+	delete $l->{opt}->{all};
+	$l->qerr("# updating $self->{out}");
+	$l->{up_op_p} = $self->{op_p}; # ($l => $lei => script/lei)
+	eval { $l->dispatch('up', $self->{out}) };
+	$lei->child_error(0, $@) if $@ || $l->{failed}; # lei->fail()
+
+	# onto the next:
+	my $out = shift(@{$lei->{-upq}}) or return;
+	PublicInbox::DS::requeue(nxt($lei, $out, $self->{op_p}));
+}
+
 1;

^ permalink raw reply related	[relevance 5%]

* [PATCH 6/6] lei up: fix --mua with single output
  2021-09-13 20:53  6% [PATCH 0/6] a batch of misc fixes Eric Wong
@ 2021-09-13 20:53  7% ` Eric Wong
  0 siblings, 0 replies; 3+ results
From: Eric Wong @ 2021-09-13 20:53 UTC (permalink / raw)
  To: meta

Oops :x

Fixes: b584a53f053a7629 ("lei up: support --all for IMAP folders")
---
 lib/PublicInbox/LeiUp.pm | 25 +++++++++++++++----------
 t/lei-up.t               |  4 ++++
 2 files changed, 19 insertions(+), 10 deletions(-)

diff --git a/lib/PublicInbox/LeiUp.pm b/lib/PublicInbox/LeiUp.pm
index a16117c9..53f06dbc 100644
--- a/lib/PublicInbox/LeiUp.pm
+++ b/lib/PublicInbox/LeiUp.pm
@@ -54,16 +54,21 @@ sub up1 ($$) {
 
 sub up1_redispatch {
 	my ($lei, $out, $op_p) = @_;
-	my $l = bless { %$lei }, ref($lei);
-	$l->{opt} = { %{$l->{opt}} }; # deep copy
-	delete $l->{sock}; # do not close
-	$l->{''} = $op_p; # daemon only ($l => $lei => script/lei)
-
-	# make close($l->{1}) happy in lei->dclose
-	open my $fh, '>&', $l->{1} or return $l->child_error(0, "dup: $!");
+	my $l;
+	if (defined($lei->{opt}->{mua})) { # single output
+		$l = $lei;
+	} else { # multiple outputs
+		$l = bless { %$lei }, ref($lei);
+		$l->{opt} = { %{$l->{opt}} }; # deep copy
+		delete $l->{sock}; # do not close
+		# make close($l->{1}) happy in lei->dclose
+		open my $fh, '>&', $l->{1} or
+			return $l->child_error(0, "dup: $!");
+		$l->{1} = $fh;
+	}
 	local $PublicInbox::LEI::current_lei = $l;
 	local %ENV = %{$l->{env}};
-	$l->{1} = $fh;
+	$l->{''} = $op_p; # daemon only ($l => $lei => script/lei)
 	eval {
 		$l->qerr("# updating $out");
 		up1($l, $out);
@@ -92,7 +97,7 @@ sub lei_up {
 	$lei->{lse} = $lei->_lei_store(1)->write_prepare($lei)->search;
 	if (defined(my $all = $opt->{all})) {
 		return $lei->fail("--all and @outs incompatible") if @outs;
-		length($opt->{mua}//'') and return
+		defined($opt->{mua}) and return
 			$lei->fail('--all and --mua= are incompatible');
 		@outs = PublicInbox::LeiSavedSearch::list($lei);
 		if ($all eq 'local') {
@@ -110,7 +115,7 @@ sub lei_up {
 		$self->{local} = [ grep(!/$REMOTE_RE/, @outs) ];
 	}
 	((@{$self->{local} // []} + @{$self->{remote} // []}) > 1 &&
-		length($opt->{mua} // '')) and return $lei->fail(<<EOM);
+		defined($opt->{mua})) and return $lei->fail(<<EOM);
 multiple outputs and --mua= are incompatible
 EOM
 	if ($self->{remote}) { # setup lei->{auth}
diff --git a/t/lei-up.t b/t/lei-up.t
index 6b34774d..8937cfb1 100644
--- a/t/lei-up.t
+++ b/t/lei-up.t
@@ -34,6 +34,10 @@ test_lei(sub {
 	open $fh, "$ENV{HOME}/b" or xbail "open: $!";
 	$uc = do { local $/; <$fh> };
 	is($uc, $exp, 'uncompressed both match');
+
+	lei_ok [ 'up', "$ENV{HOME}/b", "--mua=touch $ENV{HOME}/c" ],
+		undef, { run_mode => 0 };
+	ok(-f "$ENV{HOME}/c", '--mua works with single output');
 });
 
 done_testing;

^ permalink raw reply related	[relevance 7%]

* [PATCH 0/6] a batch of misc fixes
@ 2021-09-13 20:53  6% Eric Wong
  2021-09-13 20:53  7% ` [PATCH 6/6] lei up: fix --mua with single output Eric Wong
  0 siblings, 1 reply; 3+ results
From: Eric Wong @ 2021-09-13 20:53 UTC (permalink / raw)
  To: meta

Eric Wong (6):
  tests: add require_cmd, require curl when needed
  lei: stop_pager: restore stdout when done
  use POSIX::PIPE_BUF (instead of _POSIX_PIPE_BUF)
  lei up: localize %ENV in redispatch
  lei_xsearch: sensible errors for missing/broken externals
  lei up: fix --mua with single output

 lib/PublicInbox/Git.pm        |  4 +---
 lib/PublicInbox/LEI.pm        | 12 +++++-------
 lib/PublicInbox/LeiUp.pm      | 26 ++++++++++++++++----------
 lib/PublicInbox/LeiXSearch.pm |  5 ++++-
 lib/PublicInbox/TestCommon.pm | 20 +++++++++++++-------
 t/ds-leak.t                   |  9 +++------
 t/hl_mod.t                    |  9 ++-------
 t/httpd-corner.t              | 11 ++++-------
 t/imapd.t                     |  1 -
 t/lei-daemon.t                |  1 -
 t/lei-externals.t             |  8 ++++----
 t/lei-import-http.t           |  3 +--
 t/lei-mirror.t                |  1 +
 t/lei-q-remote-import.t       |  1 +
 t/lei-up.t                    |  4 ++++
 t/lei.t                       |  4 +---
 t/nntpd.t                     |  7 ++-----
 t/nodatacow.t                 | 10 ++++------
 t/run.perl                    |  4 ++--
 t/solver_git.t                |  4 ++--
 t/v2mirror.t                  |  6 ++----
 t/v2reindex.t                 |  8 ++------
 t/www_altid.t                 |  8 +++-----
 t/www_listing.t               |  8 ++------
 24 files changed, 79 insertions(+), 95 deletions(-)

^ permalink raw reply	[relevance 6%]

Results 1-3 of 3 | reverse | options above
-- pct% links below jump to the message on this page, permalinks otherwise --
2021-09-13 20:53  6% [PATCH 0/6] a batch of misc fixes Eric Wong
2021-09-13 20:53  7% ` [PATCH 6/6] lei up: fix --mua with single output Eric Wong
2021-09-22  2:24     [PATCH 0/7] lei bugfixes and other fixes Eric Wong
2021-09-22  2:24  5% ` [PATCH 6/7] lei up: avoid excessively parallel --all Eric Wong

Code repositories for project(s) associated with this public inbox

	https://80x24.org/public-inbox.git

This is a public inbox; see the mirroring instructions
for how to clone and mirror all data and code used for this inbox,
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).