about summary refs log tree commit homepage
path: root/lib
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2021-02-07 08:51:54 +0000
committerEric Wong <e@80x24.org>2021-02-07 22:57:11 +0000
commit0727032b33a7dc3b5ac4c63e267a12df244ea650 (patch)
tree566d3615c4f5e3380a9b6e4e74383e320b144bf2 /lib
parent757652fd1ad6843c984610263a2a0b336c974111 (diff)
downloadpublic-inbox-0727032b33a7dc3b5ac4c63e267a12df244ea650.tar.gz
We're able to propagate $? from wq_workers in a consistent
manner, now.
Diffstat (limited to 'lib')
-rw-r--r--lib/PublicInbox/IPC.pm22
-rw-r--r--lib/PublicInbox/LEI.pm6
-rw-r--r--lib/PublicInbox/LeiImport.pm14
-rw-r--r--lib/PublicInbox/LeiXSearch.pm12
4 files changed, 33 insertions, 21 deletions
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 728f726c..c8673e26 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -140,10 +140,9 @@ sub ipc_worker_reap { # dwaitpid callback
 }
 
 sub wq_wait_old {
-        my ($self, @args) = @_;
-        my $cb = ref($args[0]) eq 'CODE' ? shift(@args) : \&ipc_worker_reap;
+        my ($self, $cb, @args) = @_;
         my $pids = delete $self->{"-wq_old_pids.$$"} or return;
-        dwaitpid($_, $cb, [$self, @args]) for @$pids;
+        dwaitpid($_, $cb // \&ipc_worker_reap, [$self, @args]) for @$pids;
 }
 
 # for base class, override in sub classes
@@ -348,13 +347,12 @@ sub wq_exit { # wakes up wq_worker_decr_wait
 sub wq_worker_decr { # SIGTTOU handler, kills first idle worker
         my ($self) = @_;
         return unless wq_workers($self);
-        my $s2 = $self->{-wq_s2} // die 'BUG: no wq_s2';
-        $self->wq_io_do('wq_exit', [ $s2, $s2, $s2 ]);
+        $self->wq_io_do('wq_exit');
         # caller must call wq_worker_decr_wait in main loop
 }
 
 sub wq_worker_decr_wait {
-        my ($self, $timeout) = @_;
+        my ($self, $timeout, $cb, @args) = @_;
         return if $self->{-wq_ppid} != $$; # can't reap siblings or parents
         my $s1 = $self->{-wq_s1} // croak 'BUG: no wq_s1';
         vec(my $rin = '', fileno($s1), 1) = 1;
@@ -363,17 +361,17 @@ sub wq_worker_decr_wait {
         recv($s1, my $pid, 64, 0) // croak "recv: $!";
         my $workers = $self->{-wq_workers} // croak 'BUG: no wq_workers';
         delete $workers->{$pid} // croak "BUG: PID:$pid invalid";
-        dwaitpid($pid, \&ipc_worker_reap, $self);
+        dwaitpid($pid, $cb // \&ipc_worker_reap, [ $self, @args ]);
 }
 
 # set or retrieve number of workers
 sub wq_workers {
-        my ($self, $nr) = @_;
+        my ($self, $nr, $cb, @args) = @_;
         my $cur = $self->{-wq_workers} or return;
         if (defined $nr) {
                 while (scalar(keys(%$cur)) > $nr) {
                         $self->wq_worker_decr;
-                        $self->wq_worker_decr_wait;
+                        $self->wq_worker_decr_wait(undef, $cb, @args);
                 }
                 $self->wq_worker_incr while scalar(keys(%$cur)) < $nr;
         }
@@ -381,7 +379,7 @@ sub wq_workers {
 }
 
 sub wq_close {
-        my ($self, $nohang) = @_;
+        my ($self, $nohang, $cb, @args) = @_;
         delete @$self{qw(-wq_s1 -wq_s2)} or return;
         my $ppid = delete $self->{-wq_ppid} or return;
         my $workers = delete $self->{-wq_workers} // die 'BUG: no wq_workers';
@@ -390,7 +388,9 @@ sub wq_close {
         if ($nohang) {
                 push @{$self->{"-wq_old_pids.$$"}}, @pids;
         } else {
-                dwaitpid($_, \&ipc_worker_reap, $self) for @pids;
+                $cb //= \&ipc_worker_reap;
+                unshift @args, $self;
+                dwaitpid($_, $cb, \@args) for @pids;
         }
 }
 
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 515bc2a3..21862488 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -360,7 +360,7 @@ sub fail_handler ($;$$) {
         my ($lei, $code, $io) = @_;
         for my $f (@WQ_KEYS) {
                 my $wq = delete $lei->{$f} or next;
-                $wq->wq_wait_old($lei) if $wq->wq_kill_old; # lei-daemon
+                $wq->wq_wait_old(undef, $lei) if $wq->wq_kill_old; # lei-daemon
         }
         close($io) if $io; # needed to avoid warnings on SIGPIPE
         $lei->x_it($code // (1 >> 8));
@@ -827,9 +827,9 @@ sub dclose {
         for my $f (@WQ_KEYS) {
                 my $wq = delete $self->{$f} or next;
                 if ($wq->wq_kill) {
-                        $wq->wq_close
+                        $wq->wq_close(0, undef, $self);
                 } elsif ($wq->wq_kill_old) {
-                        $wq->wq_wait_old($self);
+                        $wq->wq_wait_old(undef, $self);
                 }
         }
         close(delete $self->{1}) if $self->{1}; # may reap_compress
diff --git a/lib/PublicInbox/LeiImport.pm b/lib/PublicInbox/LeiImport.pm
index 3a99570e..2b2dc2f7 100644
--- a/lib/PublicInbox/LeiImport.pm
+++ b/lib/PublicInbox/LeiImport.pm
@@ -14,12 +14,18 @@ sub _import_eml { # MboxReader callback
         $sto->ipc_do('set_eml', $eml, $set_kw ? $sto->mbox_keywords($eml) : ());
 }
 
+sub import_done_wait { # dwaitpid callback
+        my ($arg, $pid) = @_;
+        my ($imp, $lei) = @$arg;
+        $lei->child_error($?, 'non-fatal errors during import') if $?;
+        my $ign = $lei->{sto}->ipc_do('done'); # PublicInbox::LeiStore::done
+        $lei->dclose;
+}
+
 sub import_done { # EOF callback for main daemon
         my ($lei) = @_;
-        my $imp = delete $lei->{imp};
-        $imp->wq_wait_old($lei) if $imp;
-        my $wait = $lei->{sto}->ipc_do('done');
-        $lei->dclose;
+        my $imp = delete $lei->{imp} or return;
+        $imp->wq_wait_old(\&import_done_wait, $lei);
 }
 
 sub call { # the main "lei import" method
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index 1ba767c1..1024b020 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -279,12 +279,18 @@ sub git_tmp ($) {
         $git;
 }
 
+sub xsearch_done_wait { # dwaitpid callback
+        my ($arg, $pid) = @_;
+        my ($wq, $lei) = @$arg;
+        $lei->child_error($?, 'non-fatal error from '.ref($wq)) if $?;
+}
+
 sub query_done { # EOF callback for main daemon
         my ($lei) = @_;
         my $l2m = delete $lei->{l2m};
-        $l2m->wq_wait_old($lei) if $l2m;
+        $l2m->wq_wait_old(\&xsearch_done_wait, $lei) if $l2m;
         if (my $lxs = delete $lei->{lxs}) {
-                $lxs->wq_wait_old($lei);
+                $lxs->wq_wait_old(\&xsearch_done_wait, $lei);
         }
         $lei->{ovv}->ovv_end($lei);
         if ($l2m) { # close() calls LeiToMail reap_compress
@@ -309,7 +315,7 @@ sub do_post_augment {
         if (my $err = $@) {
                 if (my $lxs = delete $lei->{lxs}) {
                         $lxs->wq_kill;
-                        $lxs->wq_close;
+                        $lxs->wq_close(0, undef, $lei);
                 }
                 $lei->fail("$err");
         }