From cafbd77b3c82167d7dd6958c45373a0e92a0e2c5 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Wed, 22 Sep 2021 02:24:34 +0000 Subject: lei up: avoid excessively parallel --all We shouldn't dispatch all outputs right away since they can be expensive CPU-wise. Instead, rely on DESTROY to trigger further redispatches. This also fixes a circular reference bug for the single-output case that could lead to a leftover script/lei after MUA exit. I'm not sure how --jobs/-j should work when the actual xsearch and lei2mail has it's own parallelism ("--jobs=$X,$M"), but it's better than having thousands of subtasks running. Fixes: b34a267efff7b831 ("lei up: fix --mua with single output") --- lib/PublicInbox/LeiUp.pm | 86 +++++++++++++++++++++++++++++++----------------- 1 file changed, 55 insertions(+), 31 deletions(-) (limited to 'lib/PublicInbox/LeiUp.pm') diff --git a/lib/PublicInbox/LeiUp.pm b/lib/PublicInbox/LeiUp.pm index 89cf0112..377a720e 100644 --- a/lib/PublicInbox/LeiUp.pm +++ b/lib/PublicInbox/LeiUp.pm @@ -36,7 +36,7 @@ sub up1 ($$) { $lei->{opt}->{$k} //= $v; } my $o = $lei->{opt}->{output} // ''; - return $lei->fail("lei.q.output unset in $f") if $o eq ''; + return $lei->fail("lei.q.output unset in $f (out=$out)") if $o eq ''; $lss->translate_dedupe($lei) or return; $lei->{lss} = $lss; # for LeiOverview->new and query_remote_mboxrd my $lxs = $lei->lxs_prepare or return; @@ -44,39 +44,30 @@ sub up1 ($$) { $lei->_start_query; } -sub up1_redispatch { - my ($lei, $out, $op_p) = @_; - my $l; - if (defined($lei->{opt}->{mua})) { # single output - $l = $lei; - } else { # multiple outputs - $l = bless { %$lei }, ref($lei); - $l->{opt} = { %{$l->{opt}} }; # deep copy - delete $l->{opt}->{all}; - delete $l->{sock}; # do not close - # make close($l->{1}) happy in lei->dclose - open my $fh, '>&', $l->{1} or - return $l->child_error(0, "dup: $!"); - $l->{1} = $fh; - $l->qerr("# updating $out"); - } - $l->{''} = $op_p; # daemon only ($l => $lei => script/lei) - eval { $l->dispatch('up', $out) }; - $lei->child_error(0, $@) if $@ || $l->{failed}; # lei->fail() -} - sub redispatch_all ($$) { my ($self, $lei) = @_; + my $upq = [ (@{$self->{local} // []}, @{$self->{remote} // []}) ]; + return up1($lei, $upq->[0]) if @$upq == 1; # just one, may start MUA + + # FIXME: this is also used per-query, see lei->_start_query + my $j = $lei->{opt}->{jobs} || do { + my $n = $self->detect_nproc // 1; + $n > 4 ? 4 : $n; + }; + $j = ($j =~ /\A([0-9]+)/) ? $1 + 0 : 1; # may be --jobs=$x,$m on CLI # re-dispatch into our event loop w/o creating an extra fork-level + # $upq will be drained via DESTROY as each query finishes $lei->{fmsg} = PublicInbox::LeiFinmsg->new($lei); my ($op_c, $op_p) = PublicInbox::PktOp->pair; - for my $o (@{$self->{local} // []}, @{$self->{remote} // []}) { - PublicInbox::DS::requeue(sub { - up1_redispatch($lei, $o, $op_p); - }); + # call lei->dclose when upq is done processing: + $op_c->{ops} = { '' => [ $lei->can('dclose'), $lei ] }; + my @first_batch = splice(@$upq, 0, $j); # initial parallelism + $lei->{-upq} = $upq; + $lei->event_step_init; # wait for client disconnects + for my $out (@first_batch) { + PublicInbox::DS::requeue( + PublicInbox::LeiUp1::nxt($lei, $out, $op_p)); } - $lei->event_step_init; - $lei->pkt_ops($op_c->{ops} = { '' => [$lei->can('dclose'), $lei] }); } sub lei_up { @@ -98,7 +89,7 @@ sub lei_up { } else { $lei->fail("only --all=$all not understood"); } - } elsif ($lei->{lse}) { + } elsif ($lei->{lse}) { # redispatched scalar(@outs) == 1 or die "BUG: lse set w/ >1 out[@outs]"; return up1($lei, $outs[0]); } else { @@ -131,16 +122,49 @@ sub net_merge_all_done { my ($self, $lei) = @_; $lei->{net} = delete($self->{-net_new}) if $self->{-net_new}; $self->wq_close(1); - redispatch_all($self, $lei); + eval { redispatch_all($self, $lei) }; + warn "E: $@" if $@; } -sub _complete_up { +sub _complete_up { # lei__complete hook my ($lei, @argv) = @_; my $match_cb = $lei->complete_url_prepare(\@argv); map { $match_cb->($_) } PublicInbox::LeiSavedSearch::list($lei); } +sub _wq_done_wait { # dwaitpid callback + my ($arg, $pid) = @_; + my ($wq, $lei) = @$arg; + $lei->child_error($?, 'auth failure') if $? +} + no warnings 'once'; *ipc_atfork_child = \&PublicInbox::LeiInput::input_only_atfork_child; +package PublicInbox::LeiUp1; # for redispatch_all +use strict; +use v5.10.1; + +sub nxt ($$$) { + my ($lei, $out, $op_p) = @_; + bless { lei => $lei, out => $out, op_p => $op_p }, __PACKAGE__; +} + +sub event_step { # runs via PublicInbox::DS::requeue + my ($self) = @_; + my $lei = $self->{lei}; # the original, from lei_up + my $l = bless { %$lei }, ref($lei); # per-output copy + delete($l->{sock}) or return; # client disconnected if {sock} is gone + $l->{opt} = { %{$l->{opt}} }; # deep copy + delete $l->{opt}->{all}; + $l->qerr("# updating $self->{out}"); + $l->{up_op_p} = $self->{op_p}; # ($l => $lei => script/lei) + eval { $l->dispatch('up', $self->{out}) }; + $lei->child_error(0, $@) if $@ || $l->{failed}; # lei->fail() + + # onto the next: + my $out = shift(@{$lei->{-upq}}) or return; + PublicInbox::DS::requeue(nxt($lei, $out, $self->{op_p})); +} + 1; -- cgit v1.2.3-24-ge0c7