about summary refs log tree commit homepage
path: root/lib/PublicInbox/LEI.pm
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2021-10-15 13:30:55 +0000
committerEric Wong <e@80x24.org>2021-10-15 15:58:24 +0000
commita1733d3406dfbde52d1468e671edd1d76893f546 (patch)
tree118763410e5c6f32776671a435595792f0555ee4 /lib/PublicInbox/LEI.pm
parent2ca12a7fd78d7c9c27fec4f50fdcb58f9c838003 (diff)
downloadpublic-inbox-a1733d3406dfbde52d1468e671edd1d76893f546.tar.gz
Simplify our APIs and force dwaitpid() to work in async mode for
all lei workers.  This avoids having lingering zombies for
parallel searches if one worker finishes soon before another.

The old distinction between "old" and "new" workers was
needlessly complex, error-prone, and embarrasingly bad.

We also never handled v2:// writers properly before on
Ctrl-C/Ctrl-Z (SIGINT/SIGTSTP), so add them to @WQ_KEYS
to ensure they get handled by $lei when appropropriate.
Diffstat (limited to 'lib/PublicInbox/LEI.pm')
-rw-r--r--lib/PublicInbox/LEI.pm16
1 files changed, 6 insertions, 10 deletions
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index b6338377..83534878 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -455,16 +455,12 @@ my %CONFIG_KEYS = (
         'leistore.dir' => 'top-level storage location',
 );
 
-my @WQ_KEYS = qw(lxs l2m ikw pmd wq1 lne); # internal workers
+my @WQ_KEYS = qw(lxs l2m ikw pmd wq1 lne v2w); # internal workers
 
 sub _drop_wq {
         my ($self) = @_;
         for my $wq (grep(defined, delete(@$self{@WQ_KEYS}))) {
-                if ($wq->wq_kill('-TERM')) {
-                        $wq->wq_close(0, undef, $self);
-                } elsif ($wq->wq_kill_old('-TERM')) {
-                        $wq->wq_wait_old(undef, $self);
-                }
+                $wq->wq_kill('-TERM');
                 $wq->DESTROY;
         }
 }
@@ -644,6 +640,7 @@ sub workers_start {
         my $op_c = delete $lei->{pkt_op_c};
         @$end = ();
         $lei->event_step_init;
+        $wq->wq_wait_async($wq->can('_wq_done_wait') // \&wq_done_wait, $lei);
         ($op_c, $ops);
 }
 
@@ -651,7 +648,7 @@ sub workers_start {
 sub wait_wq_events {
         my ($lei, $op_c, $ops) = @_;
         for my $wq (grep(defined, @$lei{qw(ikw pmd)})) { # auxiliary WQs
-                $wq->wq_close(1);
+                $wq->wq_close;
         }
         $op_c->{ops} = $ops;
 }
@@ -1150,7 +1147,7 @@ sub event_step {
                 if ($buf =~ /\A(?:STOP|CONT|TERM)\z/) {
                         my $sig = "-$buf";
                         for my $wq (grep(defined, @$self{@WQ_KEYS})) {
-                                $wq->wq_kill($sig) or $wq->wq_kill_old($sig);
+                                $wq->wq_kill($sig);
                         }
                 } else {
                         die "unrecognized client signal: $buf";
@@ -1393,8 +1390,7 @@ sub fchdir {
 sub wq_eof { # EOF callback for main daemon
         my ($lei) = @_;
         local $current_lei = $lei;
-        my $wq1 = delete $lei->{wq1} // return $lei->fail; # already failed
-        $wq1->wq_wait_old($wq1->can('_wq_done_wait') // \&wq_done_wait, $lei);
+        delete $lei->{wq1} // return $lei->fail; # already failed
 }
 
 sub watch_state_ok ($) {