about summary refs log tree commit homepage
path: root/lib
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2021-09-22 02:24:34 +0000
committerEric Wong <e@80x24.org>2021-09-22 05:21:22 +0000
commitcafbd77b3c82167d7dd6958c45373a0e92a0e2c5 (patch)
tree4e97001ef3d330cc6ebb62ed61db67205dc866e1 /lib
parent5c86be0c56d7f250a91c061c00243546d81146ae (diff)
downloadpublic-inbox-cafbd77b3c82167d7dd6958c45373a0e92a0e2c5.tar.gz
We shouldn't dispatch all outputs right away since they
can be expensive CPU-wise.  Instead, rely on DESTROY to
trigger further redispatches.

This also fixes a circular reference bug for the single-output
case that could lead to a leftover script/lei after MUA exit.

I'm not sure how --jobs/-j should work when the actual xsearch
and lei2mail have their own parallelism ("--jobs=$X,$M"), but
it's better than having thousands of subtasks running.

Fixes: b34a267efff7b831 ("lei up: fix --mua with single output")
Diffstat (limited to 'lib')
-rw-r--r--lib/PublicInbox/LEI.pm2
-rw-r--r--lib/PublicInbox/LeiUp.pm86
2 files changed, 56 insertions, 32 deletions
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index a1cab55a..1305dfb8 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -1384,7 +1384,7 @@ sub fchdir {
 sub wq_eof { # EOF callback for main daemon
         my ($lei) = @_;
         my $wq1 = delete $lei->{wq1} // return $lei->fail; # already failed
-        $wq1->wq_wait_old(\&wq_done_wait, $lei);
+        $wq1->wq_wait_old($wq1->can('_wq_done_wait') // \&wq_done_wait, $lei); # $wq1's own _wq_done_wait if it has one, else the default
 }
 
 sub watch_state_ok ($) {
diff --git a/lib/PublicInbox/LeiUp.pm b/lib/PublicInbox/LeiUp.pm
index 89cf0112..377a720e 100644
--- a/lib/PublicInbox/LeiUp.pm
+++ b/lib/PublicInbox/LeiUp.pm
@@ -36,7 +36,7 @@ sub up1 ($$) {
                 $lei->{opt}->{$k} //= $v;
         }
         my $o = $lei->{opt}->{output} // '';
-        return $lei->fail("lei.q.output unset in $f") if $o eq '';
+        return $lei->fail("lei.q.output unset in $f (out=$out)") if $o eq '';
         $lss->translate_dedupe($lei) or return;
         $lei->{lss} = $lss; # for LeiOverview->new and query_remote_mboxrd
         my $lxs = $lei->lxs_prepare or return;
@@ -44,39 +44,30 @@ sub up1 ($$) {
         $lei->_start_query;
 }
 
-sub up1_redispatch {
-        my ($lei, $out, $op_p) = @_;
-        my $l;
-        if (defined($lei->{opt}->{mua})) { # single output
-                $l = $lei;
-        } else { # multiple outputs
-                $l = bless { %$lei }, ref($lei);
-                $l->{opt} = { %{$l->{opt}} }; # deep copy
-                delete $l->{opt}->{all};
-                delete $l->{sock}; # do not close
-                # make close($l->{1}) happy in lei->dclose
-                open my $fh, '>&', $l->{1} or
-                        return $l->child_error(0, "dup: $!");
-                $l->{1} = $fh;
-                $l->qerr("# updating $out");
-        }
-        $l->{''} = $op_p; # daemon only ($l => $lei => script/lei)
-        eval { $l->dispatch('up', $out) };
-        $lei->child_error(0, $@) if $@ || $l->{failed}; # lei->fail()
-}
-
 sub redispatch_all ($$) {
         my ($self, $lei) = @_;
+        my $upq = [ (@{$self->{local} // []}, @{$self->{remote} // []}) ]; # queue: local outputs first, then remote
+        return up1($lei, $upq->[0]) if @$upq == 1; # just one, may start MUA
+
+        # FIXME: this is also used per-query, see lei->_start_query
+        my $j = $lei->{opt}->{jobs} || do { # no (or false) jobs option: fall back to detected CPU count
+                my $n = $self->detect_nproc // 1;
+                $n > 4 ? 4 : $n; # default parallelism is capped at 4
+        };
+        $j = ($j =~ /\A([0-9]+)/) ? $1 + 0 : 1; # may be --jobs=$x,$m on CLI; keep the leading integer, else 1
         # re-dispatch into our event loop w/o creating an extra fork-level
+        # the rest of $upq is consumed later; NOTE(review): in view only LeiUp1::event_step shifts it, no DESTROY
         $lei->{fmsg} = PublicInbox::LeiFinmsg->new($lei);
         my ($op_c, $op_p) = PublicInbox::PktOp->pair;
-        for my $o (@{$self->{local} // []}, @{$self->{remote} // []}) {
-                PublicInbox::DS::requeue(sub {
-                        up1_redispatch($lei, $o, $op_p);
-                });
+        # call lei->dclose when upq is done processing:
+        $op_c->{ops} = { '' => [ $lei->can('dclose'), $lei ] };
+        my @first_batch = splice(@$upq, 0, $j); # initial parallelism
+        $lei->{-upq} = $upq; # leftovers after the first batch, read back by LeiUp1::event_step
+        $lei->event_step_init; # wait for client disconnects
+        for my $out (@first_batch) {
+                PublicInbox::DS::requeue(
+                        PublicInbox::LeiUp1::nxt($lei, $out, $op_p)); # one LeiUp1 task object per output
         }
-        $lei->event_step_init;
-        $lei->pkt_ops($op_c->{ops} = { '' => [$lei->can('dclose'), $lei] });
 }
 
 sub lei_up {
@@ -98,7 +89,7 @@ sub lei_up {
                 } else {
                         $lei->fail("only --all=$all not understood");
                 }
-        } elsif ($lei->{lse}) {
+        } elsif ($lei->{lse}) { # redispatched
                 scalar(@outs) == 1 or die "BUG: lse set w/ >1 out[@outs]";
                 return up1($lei, $outs[0]);
         } else {
@@ -131,16 +122,49 @@ sub net_merge_all_done {
         my ($self, $lei) = @_;
         $lei->{net} = delete($self->{-net_new}) if $self->{-net_new};
         $self->wq_close(1);
-        redispatch_all($self, $lei);
+        eval { redispatch_all($self, $lei) };
+        warn "E: $@" if $@;
 }
 
-sub _complete_up {
+sub _complete_up { # lei__complete hook; runs $match_cb over each LeiSavedSearch::list entry
         my ($lei, @argv) = @_;
         my $match_cb = $lei->complete_url_prepare(\@argv);
         map { $match_cb->($_) } PublicInbox::LeiSavedSearch::list($lei);
 }
 
+sub _wq_done_wait { # dwaitpid callback; LEI::wq_eof looks this name up via ->can('_wq_done_wait')
+        my ($arg, $pid) = @_; # $pid is not used in this callback
+        my ($wq, $lei) = @$arg; # $arg is an array ref holding ($wq, $lei)
+        $lei->child_error($?, 'auth failure') if $? # any non-zero $? is reported as 'auth failure'
+}
+
 no warnings 'once';
 *ipc_atfork_child = \&PublicInbox::LeiInput::input_only_atfork_child;
 
+package PublicInbox::LeiUp1; # for redispatch_all
+use strict;
+use v5.10.1;
+
+sub nxt ($$$) { # build a task object for one output; callers pass the result to PublicInbox::DS::requeue
+        my ($lei, $out, $op_p) = @_; # $lei: original lei object, $out: output to update, $op_p: PktOp end kept with the task
+        bless { lei => $lei, out => $out, op_p => $op_p }, __PACKAGE__; # implicit return of the blessed hash
+}
+
+sub event_step { # runs via PublicInbox::DS::requeue
+        my ($self) = @_;
+        my $lei = $self->{lei}; # the original, from lei_up
+        my $l = bless { %$lei }, ref($lei); # shallow per-output copy, same class as $lei
+        delete($l->{sock}) or return; # client disconnected if {sock} is gone (removed from the copy only)
+        $l->{opt} = { %{$l->{opt}} }; # one-level copy of {opt} so the delete below leaves $lei->{opt} intact
+        delete $l->{opt}->{all}; # presumably so the re-dispatched 'up' handles only this one output
+        $l->qerr("# updating $self->{out}");
+        $l->{up_op_p} = $self->{op_p}; # ($l => $lei => script/lei)
+        eval { $l->dispatch('up', $self->{out}) }; # exceptions are trapped here and reported on the next line
+        $lei->child_error(0, $@) if $@ || $l->{failed}; # lei->fail()
+
+        # onto the next:
+        my $out = shift(@{$lei->{-upq}}) or return; # nothing left in the queue: this chain of tasks ends
+        PublicInbox::DS::requeue(nxt($lei, $out, $self->{op_p})); # schedule the next output with the same op_p
+}
+
 1;