about summary refs log tree commit homepage
path: root/lib/PublicInbox
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2021-04-27 11:07:51 +0000
committerEric Wong <e@80x24.org>2021-04-27 21:28:56 -0400
commit787cbc523c0beac69f6df8b8a689684864a6594e (patch)
tree34663f6d5488083e4f380b343db9772e2fbd4982 /lib/PublicInbox
parent98cd36cdf25fb27e006bd49a3d5bd479be44ce50 (diff)
downloadpublic-inbox-787cbc523c0beac69f6df8b8a689684864a6594e.tar.gz
Simplify our internals a little bit.
Diffstat (limited to 'lib/PublicInbox')
-rw-r--r--lib/PublicInbox/LEI.pm2
-rw-r--r--lib/PublicInbox/LeiBlob.pm5
-rw-r--r--lib/PublicInbox/LeiConvert.pm2
-rw-r--r--lib/PublicInbox/LeiImport.pm6
-rw-r--r--lib/PublicInbox/LeiMirror.pm6
-rw-r--r--lib/PublicInbox/LeiP2q.pm2
-rw-r--r--lib/PublicInbox/LeiTag.pm8
7 files changed, 14 insertions, 17 deletions
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index c170572b..effc905a 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -531,7 +531,7 @@ sub workers_start {
                 'child_error' => [ \&child_error, $lei ],
                 ($ops ? %$ops : ()),
         };
-        $ops->{''} //= [ \&dclose, $lei ];
+        $ops->{''} //= [ $wq->can('_lei_wq_eof') || \&dclose, $lei ];
         my $end = $lei->pkt_op_pair;
         $wq->wq_workers_start($ident, $jobs, $lei->oldset, { lei => $lei });
         delete $lei->{pkt_op_p};
diff --git a/lib/PublicInbox/LeiBlob.pm b/lib/PublicInbox/LeiBlob.pm
index 4e52c8a5..0b96bd04 100644
--- a/lib/PublicInbox/LeiBlob.pm
+++ b/lib/PublicInbox/LeiBlob.pm
@@ -10,7 +10,7 @@ use parent qw(PublicInbox::IPC);
 use PublicInbox::Spawn qw(spawn popen_rd which);
 use PublicInbox::DS;
 
-sub sol_done { # EOF callback for main daemon
+sub _lei_wq_eof { # EOF callback for main daemon
         my ($lei) = @_;
         my $sol = delete $lei->{sol} // return $lei->dclose; # already failed
         $sol->wq_wait_old($lei->can('wq_done_wait'), $lei);
@@ -157,8 +157,7 @@ EOM
         }
         require PublicInbox::SolverGit;
         my $self = bless { lxs => $lxs, oid_b => $blob }, __PACKAGE__;
-        my ($op_c, $ops) = $lei->workers_start($self, 'lei_solve', 1,
-                { '' => [ \&sol_done, $lei ] });
+        my ($op_c, $ops) = $lei->workers_start($self, 'lei-blob', 1);
         $lei->{sol} = $self;
         $self->wq_io_do('do_solve_blob', []);
         $self->wq_close(1);
diff --git a/lib/PublicInbox/LeiConvert.pm b/lib/PublicInbox/LeiConvert.pm
index 0ce49ea9..0c324169 100644
--- a/lib/PublicInbox/LeiConvert.pm
+++ b/lib/PublicInbox/LeiConvert.pm
@@ -52,7 +52,7 @@ sub lei_convert { # the main "lei convert" method
         my $devfd = $lei->path_to_fd($ovv->{dst}) // return;
         $lei->{opt}->{augment} = 1 if $devfd < 0;
         $self->prepare_inputs($lei, \@inputs) or return;
-        my ($op_c, $ops) = $lei->workers_start($self, 'lei_convert', 1);
+        my ($op_c, $ops) = $lei->workers_start($self, 'lei-convert', 1);
         $lei->{cnv} = $self;
         $self->wq_io_do('process_inputs', []);
         $self->wq_close(1);
diff --git a/lib/PublicInbox/LeiImport.pm b/lib/PublicInbox/LeiImport.pm
index daaa6753..e0d899cc 100644
--- a/lib/PublicInbox/LeiImport.pm
+++ b/lib/PublicInbox/LeiImport.pm
@@ -53,7 +53,7 @@ sub input_nntp_cb { # nntp_each
         input_eml_cb($self, $eml, $self->{-import_kw} ? { kw => $kw } : undef);
 }
 
-sub import_done { # EOF callback for main daemon
+sub _lei_wq_eof { # EOF callback for main daemon
         my ($lei) = @_;
         my $imp = delete $lei->{imp} // return $lei->fail('BUG: {imp} gone');
         $imp->wq_wait_old($lei->can('wq_done_wait'), $lei, 'non-fatal');
@@ -90,10 +90,10 @@ sub lei_import { # the main "lei import" method
                 my $nproc = $self->detect_nproc;
                 $j = $nproc if $j > $nproc;
         }
-        my $ops = { '' => [ \&import_done, $lei ] };
+        my $ops = {};
         $lei->{auth}->op_merge($ops, $self) if $lei->{auth};
         $self->{-wq_nr_workers} = $j // 1; # locked
-        (my $op_c, $ops) = $lei->workers_start($self, 'lei_import', $j, $ops);
+        (my $op_c, $ops) = $lei->workers_start($self, 'lei-import', $j, $ops);
         $lei->{imp} = $self;
         net_merge_complete($self) unless $lei->{auth};
         $op_c->op_wait_event($ops);
diff --git a/lib/PublicInbox/LeiMirror.pm b/lib/PublicInbox/LeiMirror.pm
index 15adb71b..50ab4c85 100644
--- a/lib/PublicInbox/LeiMirror.pm
+++ b/lib/PublicInbox/LeiMirror.pm
@@ -24,7 +24,7 @@ sub do_finish_mirror { # dwaitpid callback
         $lei->dclose;
 }
 
-sub mirror_done { # EOF callback for main daemon
+sub _lei_wq_eof { # EOF callback for main daemon
         my ($lei) = @_;
         my $mrr = delete $lei->{mrr} or return;
         $mrr->wq_wait_old(\&do_finish_mirror, $lei);
@@ -282,9 +282,7 @@ sub start {
         require PublicInbox::Inbox;
         require PublicInbox::Admin;
         require PublicInbox::InboxWritable;
-        my ($op, $ops) = $lei->workers_start($self, 'lei_mirror', 1, {
-                '' => [ \&mirror_done, $lei ]
-        });
+        my ($op, $ops) = $lei->workers_start($self, 'lei_mirror', 1);
         $lei->{mrr} = $self;
         $self->wq_io_do('do_mirror', []);
         $self->wq_close(1);
diff --git a/lib/PublicInbox/LeiP2q.pm b/lib/PublicInbox/LeiP2q.pm
index cb2309c7..3248afd7 100644
--- a/lib/PublicInbox/LeiP2q.pm
+++ b/lib/PublicInbox/LeiP2q.pm
@@ -188,7 +188,7 @@ sub lei_p2q { # the "lei patch-to-query" entry point
         } else {
                 $self->{input} = $input;
         }
-        my ($op, $ops) = $lei->workers_start($self, 'lei_p2q', 1);
+        my ($op, $ops) = $lei->workers_start($self, 'lei-p2q', 1);
         $lei->{p2q} = $self;
         $self->wq_io_do('do_p2q', []);
         $self->wq_close(1);
diff --git a/lib/PublicInbox/LeiTag.pm b/lib/PublicInbox/LeiTag.pm
index f5791947..3cda2eca 100644
--- a/lib/PublicInbox/LeiTag.pm
+++ b/lib/PublicInbox/LeiTag.pm
@@ -19,9 +19,9 @@ sub input_eml_cb { # used by PublicInbox::LeiInput::input_fh
 
 sub input_mbox_cb { input_eml_cb($_[1], $_[0]) }
 
-sub tag_done { # EOF callback for main daemon
+sub _lei_wq_eof { # EOF callback for main daemon
         my ($lei) = @_;
-        my $tag = delete $lei->{tag} or return;
+        my $tag = delete $lei->{tag} // return $lei->dclose;
         $tag->wq_wait_old($lei->can('wq_done_wait'), $lei, 'non-fatal');
 }
 
@@ -52,11 +52,11 @@ sub lei_tag { # the "lei tag" method
         $self->prepare_inputs($lei, \@argv) or return;
         grep(defined, @$vmd_mod{qw(+kw +L -L -kw)}) or
                 return $lei->fail('no keywords or labels specified');
-        my $ops = { '' => [ \&tag_done, $lei ] };
+        my $ops = {};
         $lei->{auth}->op_merge($ops, $self) if $lei->{auth};
         $self->{vmd_mod} = $vmd_mod;
         my $j = $self->{-wq_nr_workers} = 1; # locked for now
-        (my $op_c, $ops) = $lei->workers_start($self, 'lei_tag', $j, $ops);
+        (my $op_c, $ops) = $lei->workers_start($self, 'lei-tag', $j, $ops);
         $lei->{tag} = $self;
         net_merge_complete($self) unless $lei->{auth};
         $op_c->op_wait_event($ops);