From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id A336E1F461; Mon, 8 Jul 2019 07:01:59 +0000 (UTC) Date: Mon, 8 Jul 2019 07:01:59 +0000 From: Eric Wong To: meta@public-inbox.org Subject: [PATCH v2] ds: use WNOHANG with waitpid if inside event loop Message-ID: <20190708070159.tsvq3xu6ecuueu3w@whir> References: <20190708052335.8487-1-e@80x24.org> MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Disposition: inline In-Reply-To: <20190708052335.8487-1-e@80x24.org> List-Id: While we're usually not stuck waiting on waitpid after seeing a pipe EOF or even triggering SIGPIPE in the process (e.g. git-http-backend) we're reading from, it MAY happen and we should be careful to never hang the daemon process on waitpid calls. 
v2: use "eq" for string comparison against 'DEFAULT' --- Interdiff: diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index f5c58bdb..51515bf6 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -630,7 +630,7 @@ sub shutdn ($) { sub dwaitpid ($$$) { my ($pid, $cb, $arg) = @_; my $chld = $SIG{CHLD}; - if (defined($chld) && $chld == \&enqueue_reap) { + if (defined($chld) && $chld eq \&enqueue_reap) { push @$WaitPids, [ $pid, $cb, $arg ]; # We could've just missed our SIGCHLD, cover it, here: lib/PublicInbox/DS.pm | 47 ++++++++++++++++++++- lib/PublicInbox/Git.pm | 5 ++- lib/PublicInbox/ProcessPipe.pm | 9 ++++- lib/PublicInbox/Qspawn.pm | 74 +++++++++++++++++++++++++--------- 4 files changed, 111 insertions(+), 24 deletions(-) diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index 586c47cd..51515bf6 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -16,7 +16,7 @@ package PublicInbox::DS; use strict; use bytes; -use POSIX (); +use POSIX qw(WNOHANG); use IO::Handle qw(); use Fcntl qw(SEEK_SET :DEFAULT); use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC); @@ -38,6 +38,8 @@ use Carp qw(croak confess carp); require File::Spec; my $nextq = []; # queue for next_tick +my $WaitPids = []; # list of [ pid, callback, callback_arg ] +my $reap_timer; our ( %DescriptorMap, # fd (num) -> PublicInbox::DS object $Epoll, # Global epoll fd (or DSKQXS ref) @@ -64,6 +66,8 @@ Reset all state =cut sub Reset { %DescriptorMap = (); + $WaitPids = []; + $reap_timer = undef; @ToClose = (); $LoopTimeout = -1; # no timeout by default @Timers = (); @@ -215,7 +219,33 @@ sub RunTimers { return $timeout; } +# We can't use waitpid(-1) safely here since it can hit ``, system(), +# and other things. So we scan the $WaitPids list, which is hopefully +# not too big. 
+sub reap_pids { + my $tmp = $WaitPids; + $WaitPids = []; + $reap_timer = undef; + foreach my $ary (@$tmp) { + my ($pid, $cb, $arg) = @$ary; + my $ret = waitpid($pid, WNOHANG); + if ($ret == 0) { + push @$WaitPids, $ary; + } elsif ($cb) { + eval { $cb->($arg, $pid) }; + } + } + if (@$WaitPids) { + # we may not be done, yet, and we may miss our SIGCHLD, so retry on a timer + $reap_timer = AddTimer(undef, 1, \&reap_pids); + } +} + +# reentrant SIGCHLD handler (since reap_pids is not reentrant) +sub enqueue_reap ($) { push @$nextq, \&reap_pids }; + sub EpollEventLoop { + local $SIG{CHLD} = \&enqueue_reap; while (1) { my @events; my $i; @@ -595,6 +625,21 @@ sub shutdn ($) { $self->close; } } + +# must be called with eval, PublicInbox::DS may not be loaded (see t/qspawn.t) +sub dwaitpid ($$$) { + my ($pid, $cb, $arg) = @_; + my $chld = $SIG{CHLD}; + if (defined($chld) && $chld eq \&enqueue_reap) { + push @$WaitPids, [ $pid, $cb, $arg ]; + + # We could've just missed our SIGCHLD, cover it, here: + requeue(\&reap_pids); + } else { + die "Not in EventLoop\n"; + } +} + package PublicInbox::DS::Timer; # [$abs_float_firetime, $coderef]; sub cancel { diff --git a/lib/PublicInbox/Git.pm b/lib/PublicInbox/Git.pm index f5c7a95c..2f1a61f9 100644 --- a/lib/PublicInbox/Git.pm +++ b/lib/PublicInbox/Git.pm @@ -198,7 +198,10 @@ sub _destroy { my $p = delete $self->{$pid} or return; delete @$self{($in, $out)}; delete $self->{$err} if $err; # `err_c' - waitpid $p, 0; + + # PublicInbox::DS may not be loaded + eval { PublicInbox::DS::dwaitpid($p, undef, undef) }; + waitpid($p, 0) if $@; # wait synchronously if not in event loop } sub fail { diff --git a/lib/PublicInbox/ProcessPipe.pm b/lib/PublicInbox/ProcessPipe.pm index 2769e064..4f5fc751 100644 --- a/lib/PublicInbox/ProcessPipe.pm +++ b/lib/PublicInbox/ProcessPipe.pm @@ -20,8 +20,13 @@ sub CLOSE { my $ret = defined $fh ?
close($fh) : ''; my $pid = delete $_[0]->{pid}; if (defined $pid) { - waitpid($pid, 0); - $ret = '' if $?; + # PublicInbox::DS may not be loaded + eval { PublicInbox::DS::dwaitpid($pid, undef, undef) }; + + if ($@) { # ok, not in the event loop, work synchronously + waitpid($pid, 0); + $ret = '' if $?; + } } $ret; } diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm index fb48585c..f2e91ab6 100644 --- a/lib/PublicInbox/Qspawn.pm +++ b/lib/PublicInbox/Qspawn.pm @@ -27,6 +27,7 @@ package PublicInbox::Qspawn; use strict; use warnings; use PublicInbox::Spawn qw(popen_rd); +use POSIX qw(WNOHANG); require Plack::Util; # n.b.: we get EAGAIN with public-inbox-httpd, and EINTR on other PSGI servers @@ -73,24 +74,66 @@ sub child_err ($) { $msg; } -sub finish ($) { - my ($self) = @_; +# callback for dwaitpid +sub waitpid_err ($$) { + my ($self, $pid) = @_; + my $xpid = delete $self->{pid}; + my $err; + if ($pid > 0) { # success! + $err = child_err($?); + } elsif ($pid < 0) { # ??? does this happen in our case? 
+ $err = "W: waitpid($xpid, 0) => $pid: $!"; + } # else should not be called with pid == 0 + + # done, spawn whatever's in the queue my $limiter = $self->{limiter}; - my $running; + my $running = --$limiter->{running}; + + # limiter->{max} may change dynamically + if (($running || $limiter->{running}) < $limiter->{max}) { + if (my $next = shift @{$limiter->{run_queue}}) { + _do_spawn(@$next); + } + } + + return unless $err; + $self->{err} = $err; + my $env = $self->{env} or return; + if (!$env->{'qspawn.quiet'}) { + $err = join(' ', @{$self->{args}->[0]}).": $err\n"; + $env->{'psgi.errors'}->print($err); + } +} + +sub do_waitpid ($;$) { + my ($self, $env) = @_; + my $pid = $self->{pid}; + eval { # PublicInbox::DS may not be loaded + PublicInbox::DS::dwaitpid($pid, \&waitpid_err, $self); + $self->{env} = $env; + }; + # done if we're running in PublicInbox::DS::EventLoop + if ($@) { + # non public-inbox-{httpd,nntpd} callers may block: + my $ret = waitpid($pid, 0); + waitpid_err($self, $ret); + } +} + +sub finish ($;$) { + my ($self, $env) = @_; if (delete $self->{rpipe}) { - my $pid = delete $self->{pid}; - $self->{err} = $pid == waitpid($pid, 0) ? child_err($?) 
: - "PID:$pid still running?"; - $running = --$limiter->{running}; + do_waitpid($self, $env); } # limiter->{max} may change dynamically - if (($running || $limiter->{running}) < $limiter->{max}) { + my $limiter = $self->{limiter}; + if ($limiter->{running} < $limiter->{max}) { if (my $next = shift @{$limiter->{run_queue}}) { _do_spawn(@$next); } } - $self->{err}; + $self->{err}; # may be meaningless if non-blocking } sub start { @@ -104,15 +147,6 @@ sub start { } } -sub _psgi_finish ($$) { - my ($self, $env) = @_; - my $err = $self->finish; - if ($err && !$env->{'qspawn.quiet'}) { - $err = join(' ', @{$self->{args}->[0]}).": $err\n"; - $env->{'psgi.errors'}->print($err); - } -} - # Similar to `backtick` or "qx" ("perldoc -f qx"), it calls $qx_cb with # the stdout of the given command when done; but respects the given limiter # $env is the PSGI env. As with ``/qx; only use this when output is small @@ -121,7 +155,7 @@ sub psgi_qx { my ($self, $env, $limiter, $qx_cb) = @_; my $qx = PublicInbox::Qspawn::Qx->new; my $end = sub { - _psgi_finish($self, $env); + finish($self, $env); eval { $qx_cb->($qx) }; $qx = undef; }; @@ -189,7 +223,7 @@ sub psgi_return { my ($self, $env, $limiter, $parse_hdr) = @_; my ($fh, $rpipe); my $end = sub { - _psgi_finish($self, $env); + finish($self, $env); $fh->close if $fh; # async-only }; -- EW