From 6e9397d12635eae55c9114ed9689413154fed8ce Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Tue, 17 Jan 2023 07:19:03 +0000 Subject: ds: introduce awaitpid, switch ProcessPipe users awaitpid is the new API which will eventually replace dwaitpid. It enables early registration of callback handlers. Eventually (once dwaitpid is gone) it'll be able to use fewer waitpid calls. The avoidance of waitpid(-1) in our earlier days was driven by the belief that threads may eventually become relevant for Perl 5, but that's extremely unlikely at this stage. I will still introduce optional threads via C, but they definitely won't be spawning/reaping processes. Argument order to callbacks is swapped (PID first) to allow flattened multiple arguments more natrually. The previous API (allowing only a single argument, as influenced by pthread_create(3)) was more tedious as it involved packing multiple arguments into yet another array. --- lib/PublicInbox/DS.pm | 43 +++++++++++++++++++++++++++++++++++++++---- 1 file changed, 39 insertions(+), 4 deletions(-) (limited to 'lib/PublicInbox/DS.pm') diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index e4629e97..9563a1cb 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -32,11 +32,12 @@ use PublicInbox::Syscall qw(:epoll); use PublicInbox::Tmpfile; use Errno qw(EAGAIN EINVAL); use Carp qw(carp croak); -our @EXPORT_OK = qw(now msg_more dwaitpid add_timer add_uniq_timer); +our @EXPORT_OK = qw(now msg_more dwaitpid awaitpid add_timer add_uniq_timer); my %Stack; my $nextq; # queue for next_tick my $wait_pids; # list of [ pid, callback, callback_arg ] +my $AWAIT_PIDS; # pid => [ $callback, @args ] my $reap_armed; my $ToClose; # sockets to close when event loop is done our ( @@ -74,11 +75,11 @@ sub Reset { # we may be iterating inside one of these on our stack my @q = delete @Stack{keys %Stack}; for my $q (@q) { @$q = () } - $wait_pids = $nextq = $ToClose = undef; + $AWAIT_PIDS = $wait_pids = $nextq = $ToClose = undef; $ep_io = undef; # closes real $Epoll FD $Epoll = undef; # may call DSKQXS::DESTROY } while (@Timers || keys(%Stack) || $nextq || $wait_pids || - $ToClose || keys(%DescriptorMap) || + $ToClose || keys(%DescriptorMap) || $AWAIT_PIDS || $PostLoopCallback || keys(%UniqTimer)); $reap_armed = undef; @@ -201,6 +202,13 @@ sub block_signals () { $oldset; } +sub await_cb ($;@) { + my ($pid, @cb_args) = @_; + my $cb = shift @cb_args or return; + eval { $cb->($pid, @cb_args) }; + warn "E: awaitpid($pid): $@" if $@; +} + # We can't use waitpid(-1) safely here since it can hit ``, system(), # and other things. So we scan the $wait_pids list, which is hopefully # not too big. We keep $wait_pids small by not calling dwaitpid() @@ -208,10 +216,12 @@ sub block_signals () { sub reap_pids { $reap_armed = undef; - my $tmp = $wait_pids or return; + my $tmp = $wait_pids // []; $wait_pids = undef; $Stack{reap_runq} = $tmp; my $oldset = block_signals(); + + # old API foreach my $ary (@$tmp) { my ($pid, $cb, $arg) = @$ary; my $ret = waitpid($pid, WNOHANG); @@ -226,6 +236,14 @@ sub reap_pids { warn "waitpid($pid, WNOHANG) = $ret, \$!=$!, \$?=$?"; } } + + # new API TODO: convert to waitpid(-1) in the future as long + # as we don't use threads + for my $pid (keys %$AWAIT_PIDS) { + my $wpid = waitpid($pid, WNOHANG) // next; + my $cb_args = delete $AWAIT_PIDS->{$wpid} or next; + await_cb($pid, @$cb_args); + } sig_setmask($oldset); delete $Stack{reap_runq}; } @@ -720,6 +738,23 @@ sub dwaitpid ($;$$) { } } +sub awaitpid { + my ($pid, @cb_args) = @_; + $AWAIT_PIDS->{$pid} //= @cb_args ? \@cb_args : 0; + # provide synchronous API + if (defined(wantarray) || (!$in_loop && !@cb_args)) { + my $ret = waitpid($pid, 0) // -2; + if ($ret == $pid) { + my $cb_args = delete $AWAIT_PIDS->{$pid}; + @cb_args = @$cb_args if !@cb_args && $cb_args; + await_cb($pid, @cb_args); + return $ret; + } + } + # We could've just missed our SIGCHLD, cover it, here: + enqueue_reap() if $in_loop; +} + 1; =head1 AUTHORS (Danga::Socket) -- cgit v1.2.3-24-ge0c7