about summary refs log tree commit homepage
diff options
context:
space:
mode:
-rw-r--r--lib/PublicInbox/IPC.pm46
-rw-r--r--lib/PublicInbox/LEI.pm16
-rw-r--r--lib/PublicInbox/LeiBlob.pm2
-rw-r--r--lib/PublicInbox/LeiConvert.pm2
-rw-r--r--lib/PublicInbox/LeiImportKw.pm1
-rw-r--r--lib/PublicInbox/LeiInput.pm2
-rw-r--r--lib/PublicInbox/LeiInspect.pm2
-rw-r--r--lib/PublicInbox/LeiLsSearch.pm2
-rw-r--r--lib/PublicInbox/LeiMirror.pm2
-rw-r--r--lib/PublicInbox/LeiNoteEvent.pm5
-rw-r--r--lib/PublicInbox/LeiP2q.pm2
-rw-r--r--lib/PublicInbox/LeiPmdir.pm1
-rw-r--r--lib/PublicInbox/LeiStore.pm1
-rw-r--r--lib/PublicInbox/LeiToMail.pm10
-rw-r--r--lib/PublicInbox/LeiUp.pm2
-rw-r--r--lib/PublicInbox/LeiXSearch.pm16
16 files changed, 51 insertions, 61 deletions
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 6c189b64..3e299448 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -134,16 +134,22 @@ sub ipc_worker_spawn {
 
 sub ipc_worker_reap { # dwaitpid callback
         my ($args, $pid) = @_;
+        my ($self, @uargs) = @$args;
+        delete $self->{-wq_workers}->{$pid};
+        return $self->{-reap_do}->($args, $pid) if $self->{-reap_do};
         return if !$?;
-        # TERM(15) is our default exit signal, PIPE(13) is likely w/ pager
         my $s = $? & 127;
-        warn "PID:$pid died with \$?=$?\n" if $s != 15 && $s != 13;
+        # TERM(15) is our default exit signal, PIPE(13) is likely w/ pager
+        warn "$self->{-wq_ident} PID:$pid died \$?=$?\n" if $s != 15 && $s != 13
 }
 
-sub wq_wait_old {
-        my ($self, $cb, @args) = @_;
-        my $pids = delete $self->{"-wq_old_pids.$$"} or return;
-        dwaitpid($_, $cb // \&ipc_worker_reap, [$self, @args]) for @$pids;
+sub wq_wait_async {
+        my ($self, $cb, @uargs) = @_;
+        local $PublicInbox::DS::in_loop = 1;
+        $self->{-reap_async} = 1;
+        $self->{-reap_do} = $cb;
+        my @pids = keys %{$self->{-wq_workers}};
+        dwaitpid($_, \&ipc_worker_reap, [ $self, @uargs ]) for @pids;
 }
 
 # for base class, override in sub classes
@@ -394,42 +400,24 @@ sub wq_workers_start {
 }
 
 sub wq_close {
-        my ($self, $nohang, $cb, @args) = @_;
+        my ($self) = @_;
         delete @$self{qw(-wq_s1 -wq_s2)} or return;
-        my $ppid = delete $self->{-wq_ppid} or return;
-        my $workers = delete $self->{-wq_workers} // die 'BUG: no wq_workers';
-        return if $ppid != $$; # can't reap siblings or parents
-        my @pids = map { $_ + 0 } keys %$workers;
-        if ($nohang) {
-                push @{$self->{"-wq_old_pids.$$"}}, @pids;
-        } else {
-                $cb //= \&ipc_worker_reap;
-                unshift @args, $self;
-                dwaitpid($_, $cb, \@args) for @pids;
-        }
-}
-
-sub wq_kill_old {
-        my ($self, $sig) = @_;
-        my $pids = $self->{"-wq_old_pids.$$"} or return;
-        kill($sig // 'TERM', @$pids);
+        return if $self->{-reap_async};
+        my @pids = keys %{$self->{-wq_workers}};
+        dwaitpid($_, \&ipc_worker_reap, [ $self ]) for @pids;
 }
 
 sub wq_kill {
         my ($self, $sig) = @_;
-        my $workers = $self->{-wq_workers} or return;
-        kill($sig // 'TERM', keys %$workers);
+        kill($sig // 'TERM', keys %{$self->{-wq_workers}});
 }
 
 sub DESTROY {
         my ($self) = @_;
         my $ppid = $self->{-wq_ppid};
         wq_kill($self) if $ppid && $ppid == $$;
-        my $err = $?;
         wq_close($self);
-        wq_wait_old($self);
         ipc_worker_stop($self);
-        $? = $err if $err;
 }
 
 sub detect_nproc () {
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index b6338377..83534878 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -455,16 +455,12 @@ my %CONFIG_KEYS = (
         'leistore.dir' => 'top-level storage location',
 );
 
-my @WQ_KEYS = qw(lxs l2m ikw pmd wq1 lne); # internal workers
+my @WQ_KEYS = qw(lxs l2m ikw pmd wq1 lne v2w); # internal workers
 
 sub _drop_wq {
         my ($self) = @_;
         for my $wq (grep(defined, delete(@$self{@WQ_KEYS}))) {
-                if ($wq->wq_kill('-TERM')) {
-                        $wq->wq_close(0, undef, $self);
-                } elsif ($wq->wq_kill_old('-TERM')) {
-                        $wq->wq_wait_old(undef, $self);
-                }
+                $wq->wq_kill('-TERM');
                 $wq->DESTROY;
         }
 }
@@ -644,6 +640,7 @@ sub workers_start {
         my $op_c = delete $lei->{pkt_op_c};
         @$end = ();
         $lei->event_step_init;
+        $wq->wq_wait_async($wq->can('_wq_done_wait') // \&wq_done_wait, $lei);
         ($op_c, $ops);
 }
 
@@ -651,7 +648,7 @@ sub workers_start {
 sub wait_wq_events {
         my ($lei, $op_c, $ops) = @_;
         for my $wq (grep(defined, @$lei{qw(ikw pmd)})) { # auxiliary WQs
-                $wq->wq_close(1);
+                $wq->wq_close;
         }
         $op_c->{ops} = $ops;
 }
@@ -1150,7 +1147,7 @@ sub event_step {
                 if ($buf =~ /\A(?:STOP|CONT|TERM)\z/) {
                         my $sig = "-$buf";
                         for my $wq (grep(defined, @$self{@WQ_KEYS})) {
-                                $wq->wq_kill($sig) or $wq->wq_kill_old($sig);
+                                $wq->wq_kill($sig);
                         }
                 } else {
                         die "unrecognized client signal: $buf";
@@ -1393,8 +1390,7 @@ sub fchdir {
 sub wq_eof { # EOF callback for main daemon
         my ($lei) = @_;
         local $current_lei = $lei;
-        my $wq1 = delete $lei->{wq1} // return $lei->fail; # already failed
-        $wq1->wq_wait_old($wq1->can('_wq_done_wait') // \&wq_done_wait, $lei);
+        delete $lei->{wq1} // return $lei->fail; # already failed
 }
 
 sub watch_state_ok ($) {
diff --git a/lib/PublicInbox/LeiBlob.pm b/lib/PublicInbox/LeiBlob.pm
index b6a62d24..004b156c 100644
--- a/lib/PublicInbox/LeiBlob.pm
+++ b/lib/PublicInbox/LeiBlob.pm
@@ -166,7 +166,7 @@ sub lei_blob {
         my ($op_c, $ops) = $lei->workers_start($self, 1);
         $lei->{wq1} = $self;
         $self->wq_io_do('do_solve_blob', []);
-        $self->wq_close(1);
+        $self->wq_close;
         $lei->wait_wq_events($op_c, $ops);
 }
 
diff --git a/lib/PublicInbox/LeiConvert.pm b/lib/PublicInbox/LeiConvert.pm
index 9e98edc3..68fc7c0b 100644
--- a/lib/PublicInbox/LeiConvert.pm
+++ b/lib/PublicInbox/LeiConvert.pm
@@ -58,7 +58,7 @@ sub lei_convert { # the main "lei convert" method
         my ($op_c, $ops) = $lei->workers_start($self, 1);
         $lei->{wq1} = $self;
         $self->wq_io_do('process_inputs', []);
-        $self->wq_close(1);
+        $self->wq_close;
         $lei->wait_wq_events($op_c, $ops);
 }
 
diff --git a/lib/PublicInbox/LeiImportKw.pm b/lib/PublicInbox/LeiImportKw.pm
index 8359f338..54454511 100644
--- a/lib/PublicInbox/LeiImportKw.pm
+++ b/lib/PublicInbox/LeiImportKw.pm
@@ -50,7 +50,6 @@ sub _lei_wq_eof { # EOF callback for main lei daemon
         my ($lei) = @_;
         my $ikw = delete $lei->{ikw} or return $lei->fail;
         $lei->sto_done_request($ikw->{lei_sock});
-        $ikw->wq_wait_old($lei->can('wq_done_wait'), $lei);
 }
 
 1;
diff --git a/lib/PublicInbox/LeiInput.pm b/lib/PublicInbox/LeiInput.pm
index dd40d838..2621fc1f 100644
--- a/lib/PublicInbox/LeiInput.pm
+++ b/lib/PublicInbox/LeiInput.pm
@@ -402,7 +402,7 @@ sub input_only_atfork_child {
 sub input_only_net_merge_all_done {
         my ($self) = @_;
         $self->wq_io_do('process_inputs');
-        $self->wq_close(1);
+        $self->wq_close;
 }
 
 # like Getopt::Long, but for +kw:FOO and -kw:FOO to prepare
diff --git a/lib/PublicInbox/LeiInspect.pm b/lib/PublicInbox/LeiInspect.pm
index 5ba96056..05b6e21d 100644
--- a/lib/PublicInbox/LeiInspect.pm
+++ b/lib/PublicInbox/LeiInspect.pm
@@ -242,7 +242,7 @@ sub inspect_start ($$) {
         $lei->{wq1} = $self;
         $lei->wait_wq_events($op_c, $ops);
         $self->wq_do('inspect_argv');
-        $self->wq_close(1);
+        $self->wq_close;
 }
 
 sub ins_add { # InputPipe->consume callback
diff --git a/lib/PublicInbox/LeiLsSearch.pm b/lib/PublicInbox/LeiLsSearch.pm
index aebf0184..0193e590 100644
--- a/lib/PublicInbox/LeiLsSearch.pm
+++ b/lib/PublicInbox/LeiLsSearch.pm
@@ -75,7 +75,7 @@ sub bg_worker ($$$) {
         my ($op_c, $ops) = $lei->workers_start($self, 1);
         $lei->{wq1} = $self;
         $self->wq_io_do('do_ls_search_long', [], $pfx);
-        $self->wq_close(1);
+        $self->wq_close;
         $lei->wait_wq_events($op_c, $ops);
 }
 
diff --git a/lib/PublicInbox/LeiMirror.pm b/lib/PublicInbox/LeiMirror.pm
index a75c99c4..e20d30b4 100644
--- a/lib/PublicInbox/LeiMirror.pm
+++ b/lib/PublicInbox/LeiMirror.pm
@@ -451,7 +451,7 @@ sub start {
         my ($op_c, $ops) = $lei->workers_start($self, 1);
         $lei->{wq1} = $self;
         $self->wq_io_do('do_mirror', []);
-        $self->wq_close(1);
+        $self->wq_close;
         $lei->wait_wq_events($op_c, $ops);
 }
 
diff --git a/lib/PublicInbox/LeiNoteEvent.pm b/lib/PublicInbox/LeiNoteEvent.pm
index 1b714dae..ba4dfd49 100644
--- a/lib/PublicInbox/LeiNoteEvent.pm
+++ b/lib/PublicInbox/LeiNoteEvent.pm
@@ -14,7 +14,7 @@ our $to_flush; # { cfgpath => $lei }
 sub flush_lei ($) {
         my ($lei) = @_;
         my $lne = delete $lei->{cfg}->{-lei_note_event};
-        $lne->wq_close(1, undef, $lei) if $lne; # runs _lei_wq_eof;
+        $lne->wq_close if $lne; # runs _lei_wq_eof;
 }
 
 # we batch up writes and flush every 5s (matching Linux default
@@ -111,9 +111,8 @@ sub ipc_atfork_child {
 
 sub _lei_wq_eof { # EOF callback for main lei daemon
         my ($lei) = @_;
-        my $lne = delete $lei->{lne} or return $lei->fail;
+        delete $lei->{lne} or return $lei->fail;
         $lei->sto_done_request;
-        $lne->wq_wait_old($lei->can('wq_done_wait'), $lei);
 }
 
 1;
diff --git a/lib/PublicInbox/LeiP2q.pm b/lib/PublicInbox/LeiP2q.pm
index 5c2ce0a1..08ec81c5 100644
--- a/lib/PublicInbox/LeiP2q.pm
+++ b/lib/PublicInbox/LeiP2q.pm
@@ -191,7 +191,7 @@ sub lei_p2q { # the "lei patch-to-query" entry point
         my ($op_c, $ops) = $lei->workers_start($self, 1);
         $lei->{wq1} = $self;
         $self->wq_io_do('do_p2q', []);
-        $self->wq_close(1);
+        $self->wq_close;
         $lei->wait_wq_events($op_c, $ops);
 }
 
diff --git a/lib/PublicInbox/LeiPmdir.pm b/lib/PublicInbox/LeiPmdir.pm
index 2d3b9755..f9b68fc2 100644
--- a/lib/PublicInbox/LeiPmdir.pm
+++ b/lib/PublicInbox/LeiPmdir.pm
@@ -51,7 +51,6 @@ sub _lei_wq_eof { # EOF callback for main lei daemon
         my ($lei) = @_;
         my $pmd = delete $lei->{pmd} or return $lei->fail;
         $lei->sto_done_request($pmd->{lei_sock});
-        $pmd->wq_wait_old($lei->can('wq_done_wait'), $lei);
 }
 
 1;
diff --git a/lib/PublicInbox/LeiStore.pm b/lib/PublicInbox/LeiStore.pm
index c45380d1..82104570 100644
--- a/lib/PublicInbox/LeiStore.pm
+++ b/lib/PublicInbox/LeiStore.pm
@@ -587,6 +587,7 @@ sub write_prepare {
                                         -err_wr => $w,
                                         to_close => [ $r ],
                                 });
+                $self->wq_wait_async; # outlives $lei
                 require PublicInbox::LeiStoreErr;
                 PublicInbox::LeiStoreErr->new($r, $lei);
         }
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index 9c748dea..76e103c7 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -637,6 +637,12 @@ sub _do_augment_mbox {
         $dedupe->pause_dedupe if $dedupe;
 }
 
+sub v2w_done_wait { # dwaitpid callback
+        my ($arg, $pid) = @_;
+        my ($v2w, $lei) = @$arg;
+        $lei->child_error($?, "error for $v2w->{ibx}->{inboxdir}") if $?;
+}
+
 sub _pre_augment_v2 {
         my ($self, $lei) = @_;
         my $dir = $self->{dst};
@@ -659,8 +665,8 @@ sub _pre_augment_v2 {
         PublicInbox::InboxWritable->new($ibx, @creat);
         $ibx->init_inbox if @creat;
         my $v2w = $ibx->importer;
-        $v2w->{-wq_no_bcast} = 1;
         $v2w->wq_workers_start("lei/v2w $dir", 1, $lei->oldset, {lei => $lei});
+        $v2w->wq_wait_async(\&v2w_done_wait, $lei);
         $lei->{v2w} = $v2w;
         return if !$lei->{opt}->{shared};
         my $d = "$lei->{ale}->{git}->{git_dir}/objects";
@@ -811,7 +817,7 @@ sub net_merge_all_done {
                                 $self->{dst}, \$self->{-au_noted});
         }
         $self->wq_broadcast('do_post_auth');
-        $self->wq_close(1);
+        $self->wq_close;
 }
 
 1;
diff --git a/lib/PublicInbox/LeiUp.pm b/lib/PublicInbox/LeiUp.pm
index df65cb9b..39604177 100644
--- a/lib/PublicInbox/LeiUp.pm
+++ b/lib/PublicInbox/LeiUp.pm
@@ -122,7 +122,7 @@ EOM
 sub net_merge_all_done {
         my ($self, $lei) = @_;
         $lei->{net} = delete($self->{-net_new}) if $self->{-net_new};
-        $self->wq_close(1);
+        $self->wq_close;
         eval { redispatch_all($self, $lei) };
         warn "E: $@" if $@;
 }
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index 3ec75528..fd2c8a37 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -413,14 +413,14 @@ sub query_done { # EOF callback for main daemon
         my ($lei) = @_;
         local $PublicInbox::LEI::current_lei = $lei;
         my $l2m = delete $lei->{l2m};
-        $l2m->wq_wait_old(\&xsearch_done_wait, $lei) if $l2m;
-        if (my $lxs = delete $lei->{lxs}) {
-                $lxs->wq_wait_old(\&xsearch_done_wait, $lei);
-        }
+        delete $lei->{lxs};
         ($lei->{opt}->{'mail-sync'} && !$lei->{sto}) and
                 warn "BUG: {sto} missing with --mail-sync";
         $lei->sto_done_request if $lei->{sto};
-        my $wait = $lei->{v2w} ? $lei->{v2w}->wq_do('done') : undef;
+        if (my $v2w = delete $lei->{v2w}) {
+                $v2w->wq_do('done');
+                $v2w->wq_close;
+        }
         $lei->{ovv}->ovv_end($lei);
         my $start_mua;
         if ($l2m) { # close() calls LeiToMail reap_compress
@@ -466,7 +466,7 @@ sub do_post_augment {
         if ($err) {
                 if (my $lxs = delete $lei->{lxs}) {
                         $lxs->wq_kill('-TERM');
-                        $lxs->wq_close(0, undef, $lei);
+                        $lxs->wq_close;
                 }
                 $lei->fail("$err");
         }
@@ -514,7 +514,7 @@ sub start_query ($$) { # always runs in main (lei-daemon) process
         if ($self->{-do_lcat}) {
                 $self->wq_io_do('lcat_dump', []);
         }
-        $self->wq_close(1); # lei_xsearch workers stop when done
+        $self->wq_close; # lei_xsearch workers stop when done
 }
 
 sub incr_start_query { # called whenever an l2m shard starts do_post_auth
@@ -569,12 +569,14 @@ sub do_query {
                 }
                 $l2m->wq_workers_start('lei2mail', undef,
                                         $lei->oldset, { lei => $lei });
+                $l2m->wq_wait_async(\&xsearch_done_wait, $lei);
                 pipe($lei->{startq}, $lei->{au_done}) or die "pipe: $!";
                 fcntl($lei->{startq}, $F_SETPIPE_SZ, 4096) if $F_SETPIPE_SZ;
                 delete $l2m->{au_peers};
         }
         $self->wq_workers_start('lei_xsearch', undef,
                                 $lei->oldset, { lei => $lei });
+        $self->wq_wait_async(\&xsearch_done_wait, $lei);
         my $op_c = delete $lei->{pkt_op_c};
         delete $lei->{pkt_op_p};
         @$end = ();