about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2021-10-14 13:16:09 +0000
committerEric Wong <e@80x24.org>2021-10-15 02:23:27 +0000
commitf912df6e26aac8c20b4fa56dd69bc09c88f5403d (patch)
tree7cd929f9b5f72d7b32057100184d97cf0e011cfc
parente482a9fb713db1c14b52d6db05049842605764e2 (diff)
downloadpublic-inbox-f912df6e26aac8c20b4fa56dd69bc09c88f5403d.tar.gz
The redispatch mechanism wasn't routing signals and messages
between redispatched workers and script/lei properly.  We now
rely on PktOp to do bidirectional message forwarding and
carefully avoiding circular references by using PktOp.
-rw-r--r--lib/PublicInbox/LEI.pm7
-rw-r--r--lib/PublicInbox/LeiUp.pm13
2 files changed, 18 insertions, 2 deletions
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index d0905562..b6338377 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -573,6 +573,7 @@ sub _lei_atfork_child {
                 POSIX::setpgid(0, $$) // die "setpgid(0, $$): $!";
         }
         close($_) for (grep(defined, delete @$self{qw(3 old_1 au_done)}));
+        delete $self->{-socks};
         if (my $op_c = delete $self->{pkt_op_c}) {
                 close(delete $op_c->{sock});
         }
@@ -1144,7 +1145,9 @@ sub event_step {
                 if ($buf eq '') {
                         _drop_wq($self); # EOF, client disconnected
                         dclose($self);
-                } elsif ($buf =~ /\A(?:STOP|CONT)\z/) {
+                        $buf = 'TERM';
+                }
+                if ($buf =~ /\A(?:STOP|CONT|TERM)\z/) {
                         my $sig = "-$buf";
                         for my $wq (grep(defined, @$self{@WQ_KEYS})) {
                                 $wq->wq_kill($sig) or $wq->wq_kill_old($sig);
@@ -1152,6 +1155,8 @@ sub event_step {
                 } else {
                         die "unrecognized client signal: $buf";
                 }
+                my $s = $self->{-socks} // []; # lei up --all
+                @$s = grep { send($_, $buf, MSG_EOR) } @$s;
         };
         if (my $err = $@) {
                 eval { $self->fail($err) };
diff --git a/lib/PublicInbox/LeiUp.pm b/lib/PublicInbox/LeiUp.pm
index 719736e8..df65cb9b 100644
--- a/lib/PublicInbox/LeiUp.pm
+++ b/lib/PublicInbox/LeiUp.pm
@@ -166,7 +166,15 @@ sub event_step { # runs via PublicInbox::DS::requeue
                 push(@m, $o) if !@m || $m[-1] !~ s/\n\z/$o\n/;
                 $cb->(@m);
         };
-        $l->{-up1} = $self;
+        $l->{-up1} = $self; # for LeiUp1->DESTROY
+        delete @$l{qw(-socks -event_init_done)};
+        my ($op_c, $op_p) = PublicInbox::PktOp->pair;
+        $self->{unref_on_destroy} = $op_c->{sock}; # to cleanup $lei->{-socks}
+        $lei->pkt_ops($op_c->{ops} //= {}); # errors from $l -> script/lei
+        push @{$lei->{-socks}}, $op_c->{sock}; # script/lei signals to $l
+        $l->{sock} = $op_p->{op_p}; # receive signals from op_c->{sock}
+        $op_c = $op_p = undef;
+
         eval { $l->dispatch('up', $self->{out}) };
         $lei->child_error(0, $@) if $@ || $l->{failed}; # lei->fail()
 }
@@ -175,6 +183,9 @@ sub DESTROY {
         my ($self) = @_;
         my $lei = $self->{lei}; # the original, from lei_up
         return if $lei->{daemon_pid} != $$;
+        my $sock = delete $self->{unref_on_destroy};
+        my $s = $lei->{-socks} // [];
+        @$s = grep { $_ != $sock } @$s;
         my $out = shift(@{$lei->{-upq}}) or return;
         PublicInbox::DS::requeue(nxt($lei, $out, $self->{op_p}));
 }