about summary refs log tree commit homepage
path: root/lib/PublicInbox/DS.pm
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2023-01-17 07:19:03 +0000
committerEric Wong <e@80x24.org>2023-01-18 23:25:48 +0000
commit6e9397d12635eae55c9114ed9689413154fed8ce (patch)
tree4ffa8148f76b2fd4c71651df0f2e682f16dfe8d0 /lib/PublicInbox/DS.pm
parenta1ee3e0d84fedc4a2dd4e16e7054ee6fdfbe111a (diff)
downloadpublic-inbox-6e9397d12635eae55c9114ed9689413154fed8ce.tar.gz
awaitpid is the new API which will eventually replace dwaitpid.
It enables early registration of callback handlers.  Eventually
(once dwaitpid is gone) it'll be able to use fewer waitpid
calls.

The avoidance of waitpid(-1) in our earlier days was driven by
the belief that threads may eventually become relevant for Perl 5,
but that's extremely unlikely at this stage.  I will still
introduce optional threads via C, but they definitely won't be
spawning/reaping processes.

Argument order to callbacks is swapped (PID first) to allow
flattened multiple arguments more natrually.  The previous API
(allowing only a single argument, as influenced by
pthread_create(3)) was more tedious as it involved packing
multiple arguments into yet another array.
Diffstat (limited to 'lib/PublicInbox/DS.pm')
-rw-r--r--lib/PublicInbox/DS.pm43
1 files changed, 39 insertions, 4 deletions
diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index e4629e97..9563a1cb 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -32,11 +32,12 @@ use PublicInbox::Syscall qw(:epoll);
 use PublicInbox::Tmpfile;
 use Errno qw(EAGAIN EINVAL);
 use Carp qw(carp croak);
-our @EXPORT_OK = qw(now msg_more dwaitpid add_timer add_uniq_timer);
+our @EXPORT_OK = qw(now msg_more dwaitpid awaitpid add_timer add_uniq_timer);
 
 my %Stack;
 my $nextq; # queue for next_tick
 my $wait_pids; # list of [ pid, callback, callback_arg ]
+my $AWAIT_PIDS; # pid => [ $callback, @args ]
 my $reap_armed;
 my $ToClose; # sockets to close when event loop is done
 our (
@@ -74,11 +75,11 @@ sub Reset {
                 # we may be iterating inside one of these on our stack
                 my @q = delete @Stack{keys %Stack};
                 for my $q (@q) { @$q = () }
-                $wait_pids = $nextq = $ToClose = undef;
+                $AWAIT_PIDS = $wait_pids = $nextq = $ToClose = undef;
                 $ep_io = undef; # closes real $Epoll FD
                 $Epoll = undef; # may call DSKQXS::DESTROY
         } while (@Timers || keys(%Stack) || $nextq || $wait_pids ||
-                $ToClose || keys(%DescriptorMap) ||
+                $ToClose || keys(%DescriptorMap) || $AWAIT_PIDS ||
                 $PostLoopCallback || keys(%UniqTimer));
 
         $reap_armed = undef;
@@ -201,6 +202,13 @@ sub block_signals () {
         $oldset;
 }
 
+sub await_cb ($;@) {
+        my ($pid, @cb_args) = @_;
+        my $cb = shift @cb_args or return;
+        eval { $cb->($pid, @cb_args) };
+        warn "E: awaitpid($pid): $@" if $@;
+}
+
 # We can't use waitpid(-1) safely here since it can hit ``, system(),
 # and other things.  So we scan the $wait_pids list, which is hopefully
 # not too big.  We keep $wait_pids small by not calling dwaitpid()
@@ -208,10 +216,12 @@ sub block_signals () {
 
 sub reap_pids {
         $reap_armed = undef;
-        my $tmp = $wait_pids or return;
+        my $tmp = $wait_pids // [];
         $wait_pids = undef;
         $Stack{reap_runq} = $tmp;
         my $oldset = block_signals();
+
+        # old API
         foreach my $ary (@$tmp) {
                 my ($pid, $cb, $arg) = @$ary;
                 my $ret = waitpid($pid, WNOHANG);
@@ -226,6 +236,14 @@ sub reap_pids {
                         warn "waitpid($pid, WNOHANG) = $ret, \$!=$!, \$?=$?";
                 }
         }
+
+        # new API TODO: convert to waitpid(-1) in the future as long
+        # as we don't use threads
+        for my $pid (keys %$AWAIT_PIDS) {
+                my $wpid = waitpid($pid, WNOHANG) // next;
+                my $cb_args = delete $AWAIT_PIDS->{$wpid} or next;
+                await_cb($pid, @$cb_args);
+        }
         sig_setmask($oldset);
         delete $Stack{reap_runq};
 }
@@ -720,6 +738,23 @@ sub dwaitpid ($;$$) {
         }
 }
 
+sub awaitpid {
+        my ($pid, @cb_args) = @_;
+        $AWAIT_PIDS->{$pid} //= @cb_args ? \@cb_args : 0;
+        # provide synchronous API
+        if (defined(wantarray) || (!$in_loop && !@cb_args)) {
+                my $ret = waitpid($pid, 0) // -2;
+                if ($ret == $pid) {
+                        my $cb_args = delete $AWAIT_PIDS->{$pid};
+                        @cb_args = @$cb_args if !@cb_args && $cb_args;
+                        await_cb($pid, @cb_args);
+                        return $ret;
+                }
+        }
+        # We could've just missed our SIGCHLD, cover it, here:
+        enqueue_reap() if $in_loop;
+}
+
 1;
 
 =head1 AUTHORS (Danga::Socket)