From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.6 (2021-04-09) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.2 required=3.0 tests=ALL_TRUSTED,BAYES_00, DKIM_SIGNED,DKIM_VALID,DKIM_VALID_AU,DKIM_VALID_EF shortcircuit=no autolearn=ham autolearn_force=no version=3.4.6 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 043631FA2D for ; Tue, 17 Jan 2023 07:19:12 +0000 (UTC) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d=80x24.org; s=selector1; t=1673939952; bh=RzLRq57DcE0/Nc1YXqqBc8j/pIdIT6ge9vm9zh44dc0=; h=From:To:Subject:Date:In-Reply-To:References:From; b=EfRUMvOg6Zg9PyFt2CbW9lat0vNmhri5EiMogUdHAb3wPDxHRN7+y5U6tBKLsGlFw y0vamlSMtfFcyi5wmAIrFp10Q+rxXQb+YaX/L2qINhKix1dgCKdvKrkDdRkovfpJQR MYS0CL8pPVRAZwM7GZ0yUmVa6ilwMgLjTwD9zaJs= From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 04/12] ds: introduce awaitpid, switch ProcessPipe users Date: Tue, 17 Jan 2023 07:19:03 +0000 Message-Id: <20230117071911.1577890-5-e@80x24.org> In-Reply-To: <20230117071911.1577890-1-e@80x24.org> References: <20230117071911.1577890-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: awaitpid is the new API which will eventually replace dwaitpid. It enables early registration of callback handlers. Eventually (once dwaitpid is gone) it'll be able to use fewer waitpid calls. The avoidance of waitpid(-1) in our earlier days was driven by the belief that threads may eventually become relevant for Perl 5, but that's extremely unlikely at this stage. I will still introduce optional threads via C, but they definitely won't be spawning/reaping processes. Argument order to callbacks is swapped (PID first) to allow passing multiple flattened arguments more naturally. The previous API (allowing only a single argument, as influenced by pthread_create(3)) was more tedious as it involved packing multiple arguments into yet another array. 
--- lib/PublicInbox/DS.pm | 43 +++++++++++++++++++++--- lib/PublicInbox/LeiToMail.pm | 4 +-- lib/PublicInbox/ProcessPipe.pm | 42 ++++++++++++------------ lib/PublicInbox/Qspawn.pm | 60 ++++++++++++++++++---------------- lib/PublicInbox/Spawn.pm | 6 ++-- t/spawn.t | 12 ++++--- 6 files changed, 104 insertions(+), 63 deletions(-) diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index e4629e97..9563a1cb 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -32,11 +32,12 @@ use PublicInbox::Syscall qw(:epoll); use PublicInbox::Tmpfile; use Errno qw(EAGAIN EINVAL); use Carp qw(carp croak); -our @EXPORT_OK = qw(now msg_more dwaitpid add_timer add_uniq_timer); +our @EXPORT_OK = qw(now msg_more dwaitpid awaitpid add_timer add_uniq_timer); my %Stack; my $nextq; # queue for next_tick my $wait_pids; # list of [ pid, callback, callback_arg ] +my $AWAIT_PIDS; # pid => [ $callback, @args ] my $reap_armed; my $ToClose; # sockets to close when event loop is done our ( @@ -74,11 +75,11 @@ sub Reset { # we may be iterating inside one of these on our stack my @q = delete @Stack{keys %Stack}; for my $q (@q) { @$q = () } - $wait_pids = $nextq = $ToClose = undef; + $AWAIT_PIDS = $wait_pids = $nextq = $ToClose = undef; $ep_io = undef; # closes real $Epoll FD $Epoll = undef; # may call DSKQXS::DESTROY } while (@Timers || keys(%Stack) || $nextq || $wait_pids || - $ToClose || keys(%DescriptorMap) || + $ToClose || keys(%DescriptorMap) || $AWAIT_PIDS || $PostLoopCallback || keys(%UniqTimer)); $reap_armed = undef; @@ -201,6 +202,13 @@ sub block_signals () { $oldset; } +sub await_cb ($;@) { + my ($pid, @cb_args) = @_; + my $cb = shift @cb_args or return; + eval { $cb->($pid, @cb_args) }; + warn "E: awaitpid($pid): $@" if $@; +} + # We can't use waitpid(-1) safely here since it can hit ``, system(), # and other things. So we scan the $wait_pids list, which is hopefully # not too big. 
We keep $wait_pids small by not calling dwaitpid() @@ -208,10 +216,12 @@ sub block_signals () { sub reap_pids { $reap_armed = undef; - my $tmp = $wait_pids or return; + my $tmp = $wait_pids // []; $wait_pids = undef; $Stack{reap_runq} = $tmp; my $oldset = block_signals(); + + # old API foreach my $ary (@$tmp) { my ($pid, $cb, $arg) = @$ary; my $ret = waitpid($pid, WNOHANG); @@ -226,6 +236,14 @@ sub reap_pids { warn "waitpid($pid, WNOHANG) = $ret, \$!=$!, \$?=$?"; } } + + # new API TODO: convert to waitpid(-1) in the future as long + # as we don't use threads + for my $pid (keys %$AWAIT_PIDS) { + my $wpid = waitpid($pid, WNOHANG) // next; + my $cb_args = delete $AWAIT_PIDS->{$wpid} or next; + await_cb($pid, @$cb_args); + } sig_setmask($oldset); delete $Stack{reap_runq}; } @@ -720,6 +738,23 @@ sub dwaitpid ($;$$) { } } +sub awaitpid { + my ($pid, @cb_args) = @_; + $AWAIT_PIDS->{$pid} //= @cb_args ? \@cb_args : 0; + # provide synchronous API + if (defined(wantarray) || (!$in_loop && !@cb_args)) { + my $ret = waitpid($pid, 0) // -2; + if ($ret == $pid) { + my $cb_args = delete $AWAIT_PIDS->{$pid}; + @cb_args = @$cb_args if !@cb_args && $cb_args; + await_cb($pid, @cb_args); + return $ret; + } + } + # We could've just missed our SIGCHLD, cover it, here: + enqueue_reap() if $in_loop; +} + 1; =head1 AUTHORS (Danga::Socket) diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm index b58e2652..1528165a 100644 --- a/lib/PublicInbox/LeiToMail.pm +++ b/lib/PublicInbox/LeiToMail.pm @@ -150,8 +150,8 @@ sub git_to_mail { # git->cat_async callback $self->{lei}->fail("$@ (oid=$oid)") if $@; } -sub reap_compress { # dwaitpid callback - my ($lei, $pid) = @_; +sub reap_compress { # awaitpid callback + my ($pid, $lei) = @_; my $cmd = delete $lei->{"pid.$pid"}; return if $? == 0; $lei->fail("@$cmd failed", $? 
>> 8); diff --git a/lib/PublicInbox/ProcessPipe.pm b/lib/PublicInbox/ProcessPipe.pm index 97e9c268..068631c6 100644 --- a/lib/PublicInbox/ProcessPipe.pm +++ b/lib/PublicInbox/ProcessPipe.pm @@ -1,16 +1,25 @@ -# Copyright (C) 2016-2021 all contributors +# Copyright (C) all contributors # License: AGPL-3.0+ # a tied handle for auto reaping of children tied to a pipe, see perltie(1) package PublicInbox::ProcessPipe; -use strict; -use v5.10.1; +use v5.12; use Carp qw(carp); +use PublicInbox::DS qw(awaitpid); + +sub waitcb { # awaitpid callback + my ($pid, $err_ref, $cb, @args) = @_; + $$err_ref = $?; # sets >{pp_chld_err} for _close + $cb->($pid, @args) if $cb; +} sub TIEHANDLE { - my ($class, $pid, $fh, $cb, $arg) = @_; - bless { pid => $pid, fh => $fh, ppid => $$, cb => $cb, arg => $arg }, - $class; + my ($cls, $pid, $fh, @cb_arg) = @_; + my $self = bless { pid => $pid, fh => $fh, ppid => $$ }, $cls; + # we share $err (and not $self) with awaitpid to avoid a ref cycle + $self->{pp_chld_err} = \(my $err); + awaitpid($pid, \&waitcb, \$err, @cb_arg); + $self; } sub BINMODE { binmode(shift->{fh}) } # for IO::Uncompress::Gunzip @@ -33,24 +42,15 @@ sub FILENO { fileno($_[0]->{fh}) } sub _close ($;$) { my ($self, $wait) = @_; - my $fh = delete $self->{fh}; + my ($fh, $pid) = delete(@$self{qw(fh pid)}); my $ret = defined($fh) ? 
close($fh) : ''; - my ($pid, $cb, $arg) = delete @$self{qw(pid cb arg)}; return $ret unless defined($pid) && $self->{ppid} == $$; if ($wait) { # caller cares about the exit status: - my $wp = waitpid($pid, 0); - if ($wp == $pid) { - $ret = '' if $?; - if ($cb) { - eval { $cb->($arg, $pid) }; - carp "E: cb(arg, $pid): $@" if $@; - } - } else { - carp "waitpid($pid, 0) = $wp, \$!=$!, \$?=$?"; - } - } else { # caller just undef-ed it, let event loop deal with it - require PublicInbox::DS; - PublicInbox::DS::dwaitpid($pid, $cb, $arg); + # synchronous wait via defined(wantarray) on awaitpid: + defined(${$self->{pp_chld_err}}) or $wait = awaitpid($pid); + ($? = ${$self->{pp_chld_err}}) and $ret = ''; + } else { + awaitpid($pid); # depends on $in_loop or not } $ret; } diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm index 779b703a..02357dbf 100644 --- a/lib/PublicInbox/Qspawn.pm +++ b/lib/PublicInbox/Qspawn.pm @@ -28,6 +28,7 @@ package PublicInbox::Qspawn; use v5.12; use PublicInbox::Spawn qw(popen_rd); use PublicInbox::GzipFilter; +use PublicInbox::DS qw(awaitpid); use Scalar::Util qw(blessed); # n.b.: we get EAGAIN with public-inbox-httpd, and EINTR on other PSGI servers @@ -57,35 +58,21 @@ sub _do_spawn { } } $self->{cmd} = $o{quiet} ? undef : $cmd; + $o{cb_arg} = [ \&waitpid_err, $self ]; eval { # popen_rd may die on EMFILE, ENFILE - $self->{rpipe} = popen_rd($cmd, $cmd_env, \%o); - - die "E: $!" unless defined($self->{rpipe}); - + $self->{rpipe} = popen_rd($cmd, $cmd_env, \%o) // die "E: $!"; $limiter->{running}++; $start_cb->($self); # EPOLL_CTL_ADD may ENOSPC/ENOMEM }; finish($self, $@) if $@; } -sub child_err ($) { - my ($child_error) = @_; # typically $? 
- my $exitstatus = ($child_error >> 8) or return; - my $sig = $child_error & 127; - my $msg = "exit status=$exitstatus"; - $msg .= " signal=$sig" if $sig; - $msg; -} - -sub finalize ($$) { - my ($self, $err) = @_; - - my ($env, $qx_cb, $qx_arg, $qx_buf) = - delete @$self{qw(psgi_env qx_cb qx_arg qx_buf)}; +sub finalize ($) { + my ($self) = @_; - # done, spawn whatever's in the queue - my $limiter = $self->{limiter}; + # process is done, spawn whatever's in the queue + my $limiter = delete $self->{limiter} or return; my $running = --$limiter->{running}; if ($running < $limiter->{max}) { @@ -93,14 +80,16 @@ sub finalize ($$) { _do_spawn(@$next, $limiter); } } - - if ($err) { + if (my $err = $self->{_err}) { # set by finish or waitpid_err utf8::decode($err); if (my $dst = $self->{qsp_err}) { $$dst .= $$dst ? " $err" : "; $err"; } warn "@{$self->{cmd}}: $err" if $self->{cmd}; } + + my ($env, $qx_cb, $qx_arg, $qx_buf) = + delete @$self{qw(psgi_env qx_cb qx_arg qx_buf)}; if ($qx_cb) { eval { $qx_cb->($qx_buf, $qx_arg) }; return unless $@; @@ -115,14 +104,28 @@ sub finalize ($$) { } } -# callback for dwaitpid or ProcessPipe -sub waitpid_err { finalize($_[0], child_err($?)) } +sub waitpid_err { # callback for awaitpid + my (undef, $self) = @_; # $_[0]: pid + $self->{_err} = ''; # for defined check in ->finish + if ($?) { + my $status = $? >> 8; + my $sig = $? & 127; + $self->{_err} .= "exit status=$status"; + $self->{_err} .= " signal=$sig" if $sig; + } + finalize($self) if !$self->{rpipe}; +} sub finish ($;$) { my ($self, $err) = @_; - my $tied_pp = delete($self->{rpipe}) or return finalize($self, $err); - my PublicInbox::ProcessPipe $pp = tied *$tied_pp; - @$pp{qw(cb arg)} = (\&waitpid_err, $self); # for ->DESTROY + $self->{_err} //= $err; # only for $@ + + # we can safely finalize if pipe was closed before, or if + # {_err} is defined by waitpid_err. 
Deleting {rpipe} will + # trigger PublicInbox::ProcessPipe::DESTROY -> waitpid_err, + # but it may not fire right away if inside the event loop. + my $closed_before = !delete($self->{rpipe}); + finalize($self) if $closed_before || defined($self->{_err}); } sub start ($$$) { @@ -247,10 +250,9 @@ sub psgi_return_init_cb { # this may be PublicInbox::HTTPD::Async {cb} if (ref($r) ne 'ARRAY' || scalar(@$r) == 3) { # error if ($async) { # calls rpipe->close && ->event_step $async->close; # PublicInbox::HTTPD::Async::close - } else { # generic PSGI: + } else { # generic PSGI, use PublicInbox::ProcessPipe::CLOSE delete($self->{rpipe})->close; event_step($self); - waitpid_err($self); } if (ref($r) eq 'ARRAY') { # error $wcb->($r) diff --git a/lib/PublicInbox/Spawn.pm b/lib/PublicInbox/Spawn.pm index 7f61d8db..826ee508 100644 --- a/lib/PublicInbox/Spawn.pm +++ b/lib/PublicInbox/Spawn.pm @@ -365,9 +365,9 @@ sub popen_rd { $opt->{1} = fileno($w); my $pid = spawn($cmd, $env, $opt); return ($r, $pid) if wantarray; - my $ret = gensym; - tie *$ret, 'PublicInbox::ProcessPipe', $pid, $r, @$opt{qw(cb arg)}; - $ret; + my $s = gensym; + tie *$s, 'PublicInbox::ProcessPipe', $pid, $r, @{$opt->{cb_arg} // []}; + $s; } sub run_die ($;$$) { diff --git a/t/spawn.t b/t/spawn.t index 5fc99a2a..c22cfcfc 100644 --- a/t/spawn.t +++ b/t/spawn.t @@ -140,13 +140,13 @@ EOF { # ->CLOSE vs ->DESTROY waitpid caller distinction my @c; - my $fh = popen_rd(['true'], undef, { cb => sub { @c = caller } }); + my $fh = popen_rd(['true'], undef, { cb_arg => [sub { @c = caller }] }); ok(close($fh), '->CLOSE fired and successful'); ok(scalar(@c), 'callback fired by ->CLOSE'); ok(grep(!m[/PublicInbox/DS\.pm\z], @c), 'callback not invoked by DS'); @c = (); - $fh = popen_rd(['true'], undef, { cb => sub { @c = caller } }); + $fh = popen_rd(['true'], undef, { cb_arg => [sub { @c = caller }] }); undef $fh; # ->DESTROY ok(scalar(@c), 'callback fired by ->DESTROY'); ok(grep(!m[/PublicInbox/ProcessPipe\.pm\z], @c), @@ 
-156,8 +156,9 @@ EOF { # children don't wait on siblings use POSIX qw(_exit); pipe(my ($r, $w)) or BAIL_OUT $!; - my $cb = sub { warn "x=$$\n" }; - my $fh = popen_rd(['cat'], undef, { 0 => $r, cb => $cb }); + my @arg; + my $cb = [ sub { @arg = @_; warn "x=$$\n" }, 'hi' ]; + my $fh = popen_rd(['cat'], undef, { 0 => $r, cb_arg => $cb }); my $pp = tied *$fh; my $pid = fork // BAIL_OUT $!; local $SIG{__WARN__} = sub { _exit(1) }; @@ -173,6 +174,9 @@ EOF close $w; close $fh; is($?, 0, 'cat exited'); + is(scalar(@arg), 2, 'callback got args'); + is($arg[1], 'hi', 'passed arg'); + like($arg[0], qr/\A\d+\z/, 'PID'); is_deeply(\@w, [ "x=$$\n" ], 'callback fired from owner'); }