user/dev discussion of public-inbox itself
 help / color / Atom feed
* [PATCH] ds: use WNOHANG with waitpid if inside event loop
@ 2019-07-08  5:23 Eric Wong
  2019-07-08  7:01 ` [PATCH v2] " Eric Wong
  0 siblings, 1 reply; 2+ messages in thread
From: Eric Wong @ 2019-07-08  5:23 UTC (permalink / raw)
  To: meta

While we're usually not stuck waiting on waitpid after
seeing a pipe EOF or even triggering SIGPIPE in the process
(e.g. git-http-backend) we're reading from, it MAY happen
and we should be careful to never hang the daemon process
on waitpid calls.
---
 lib/PublicInbox/DS.pm          | 47 ++++++++++++++++++++-
 lib/PublicInbox/Git.pm         |  5 ++-
 lib/PublicInbox/ProcessPipe.pm |  9 ++++-
 lib/PublicInbox/Qspawn.pm      | 74 +++++++++++++++++++++++++---------
 4 files changed, 111 insertions(+), 24 deletions(-)

diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index 586c47cd..f5c58bdb 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -16,7 +16,7 @@
 package PublicInbox::DS;
 use strict;
 use bytes;
-use POSIX ();
+use POSIX qw(WNOHANG);
 use IO::Handle qw();
 use Fcntl qw(SEEK_SET :DEFAULT);
 use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC);
@@ -38,6 +38,8 @@ use Carp   qw(croak confess carp);
 require File::Spec;
 
 my $nextq = []; # queue for next_tick
+my $WaitPids = [];               # list of [ pid, callback, callback_arg ]
+my $reap_timer;
 our (
      %DescriptorMap,             # fd (num) -> PublicInbox::DS object
      $Epoll,                     # Global epoll fd (or DSKQXS ref)
@@ -64,6 +66,8 @@ Reset all state
 =cut
 sub Reset {
     %DescriptorMap = ();
+    $WaitPids = [];
+    $reap_timer = undef;
     @ToClose = ();
     $LoopTimeout = -1;  # no timeout by default
     @Timers = ();
@@ -215,7 +219,33 @@ sub RunTimers {
     return $timeout;
 }
 
+# We can't use waitpid(-1) safely here since it can hit ``, system(),
+# and other things.  So we scan the $WaitPids list, which is hopefully
+# not too big.
+sub reap_pids {
+    my $tmp = $WaitPids;
+    $WaitPids = [];
+    $reap_timer = undef;
+    foreach my $ary (@$tmp) {
+        my ($pid, $cb, $arg) = @$ary;
+        my $ret = waitpid($pid, WNOHANG);
+        if ($ret == 0) {
+            push @$WaitPids, $ary;
+        } elsif ($cb) {
+            eval { $cb->($arg, $pid) };
+        }
+    }
+    if (@$WaitPids) {
+        # we may not be done, and we may have missed our SIGCHLD; re-check via timer
+        $reap_timer = AddTimer(undef, 1, \&reap_pids);
+    }
+}
+
+# reentrant SIGCHLD handler (since reap_pids is not reentrant)
+sub enqueue_reap ($) { push @$nextq, \&reap_pids };
+
 sub EpollEventLoop {
+    local $SIG{CHLD} = \&enqueue_reap;
     while (1) {
         my @events;
         my $i;
@@ -595,6 +625,21 @@ sub shutdn ($) {
 	$self->close;
     }
 }
+
+# must be called with eval, PublicInbox::DS may not be loaded (see t/qspawn.t)
+sub dwaitpid ($$$) {
+    my ($pid, $cb, $arg) = @_;
+    my $chld = $SIG{CHLD};
+    if (defined($chld) && $chld == \&enqueue_reap) {
+        push @$WaitPids, [ $pid, $cb, $arg ];
+
+        # We could've just missed our SIGCHLD, cover it, here:
+        requeue(\&reap_pids);
+    } else {
+        die "Not in EventLoop\n";
+    }
+}
+
 package PublicInbox::DS::Timer;
 # [$abs_float_firetime, $coderef];
 sub cancel {
diff --git a/lib/PublicInbox/Git.pm b/lib/PublicInbox/Git.pm
index f5c7a95c..2f1a61f9 100644
--- a/lib/PublicInbox/Git.pm
+++ b/lib/PublicInbox/Git.pm
@@ -198,7 +198,10 @@ sub _destroy {
 	my $p = delete $self->{$pid} or return;
 	delete @$self{($in, $out)};
 	delete $self->{$err} if $err; # `err_c'
-	waitpid $p, 0;
+
+	# PublicInbox::DS may not be loaded
+	eval { PublicInbox::DS::dwaitpid($p, undef, undef) };
+	waitpid($p, 0) if $@; # wait synchronously if not in event loop
 }
 
 sub fail {
diff --git a/lib/PublicInbox/ProcessPipe.pm b/lib/PublicInbox/ProcessPipe.pm
index 2769e064..4f5fc751 100644
--- a/lib/PublicInbox/ProcessPipe.pm
+++ b/lib/PublicInbox/ProcessPipe.pm
@@ -20,8 +20,13 @@ sub CLOSE {
 	my $ret = defined $fh ? close($fh) : '';
 	my $pid = delete $_[0]->{pid};
 	if (defined $pid) {
-		waitpid($pid, 0);
-		$ret = '' if $?;
+		# PublicInbox::DS may not be loaded
+		eval { PublicInbox::DS::dwaitpid($pid, undef, undef) };
+
+		if ($@) { # ok, not in the event loop, work synchronously
+			waitpid($pid, 0);
+			$ret = '' if $?;
+		}
 	}
 	$ret;
 }
diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm
index fb48585c..f2e91ab6 100644
--- a/lib/PublicInbox/Qspawn.pm
+++ b/lib/PublicInbox/Qspawn.pm
@@ -27,6 +27,7 @@ package PublicInbox::Qspawn;
 use strict;
 use warnings;
 use PublicInbox::Spawn qw(popen_rd);
+use POSIX qw(WNOHANG);
 require Plack::Util;
 
 # n.b.: we get EAGAIN with public-inbox-httpd, and EINTR on other PSGI servers
@@ -73,24 +74,66 @@ sub child_err ($) {
 	$msg;
 }
 
-sub finish ($) {
-	my ($self) = @_;
+# callback for dwaitpid
+sub waitpid_err ($$) {
+	my ($self, $pid) = @_;
+	my $xpid = delete $self->{pid};
+	my $err;
+	if ($pid > 0) { # success!
+		$err = child_err($?);
+	} elsif ($pid < 0) { # ??? does this happen in our case?
+		$err = "W: waitpid($xpid, 0) => $pid: $!";
+	} # else should not be called with pid == 0
+
+	# done, spawn whatever's in the queue
 	my $limiter = $self->{limiter};
-	my $running;
+	my $running = --$limiter->{running};
+
+	# limiter->{max} may change dynamically
+	if (($running || $limiter->{running}) < $limiter->{max}) {
+		if (my $next = shift @{$limiter->{run_queue}}) {
+			_do_spawn(@$next);
+		}
+	}
+
+	return unless $err;
+	$self->{err} = $err;
+	my $env = $self->{env} or return;
+	if (!$env->{'qspawn.quiet'}) {
+		$err = join(' ', @{$self->{args}->[0]}).": $err\n";
+		$env->{'psgi.errors'}->print($err);
+	}
+}
+
+sub do_waitpid ($;$) {
+	my ($self, $env) = @_;
+	my $pid = $self->{pid};
+	eval { # PublicInbox::DS may not be loaded
+		PublicInbox::DS::dwaitpid($pid, \&waitpid_err, $self);
+		$self->{env} = $env;
+	};
+	# done if we're running in PublicInbox::DS::EventLoop
+	if ($@) {
+		# non public-inbox-{httpd,nntpd} callers may block:
+		my $ret = waitpid($pid, 0);
+		waitpid_err($self, $ret);
+	}
+}
+
+sub finish ($;$) {
+	my ($self, $env) = @_;
 	if (delete $self->{rpipe}) {
-		my $pid = delete $self->{pid};
-		$self->{err} = $pid == waitpid($pid, 0) ? child_err($?) :
-				"PID:$pid still running?";
-		$running = --$limiter->{running};
+		do_waitpid($self, $env);
 	}
 
 	# limiter->{max} may change dynamically
-	if (($running || $limiter->{running}) < $limiter->{max}) {
+	my $limiter = $self->{limiter};
+	if ($limiter->{running} < $limiter->{max}) {
 		if (my $next = shift @{$limiter->{run_queue}}) {
 			_do_spawn(@$next);
 		}
 	}
-	$self->{err};
+	$self->{err}; # may be meaningless if non-blocking
 }
 
 sub start {
@@ -104,15 +147,6 @@ sub start {
 	}
 }
 
-sub _psgi_finish ($$) {
-	my ($self, $env) = @_;
-	my $err = $self->finish;
-	if ($err && !$env->{'qspawn.quiet'}) {
-		$err = join(' ', @{$self->{args}->[0]}).": $err\n";
-		$env->{'psgi.errors'}->print($err);
-	}
-}
-
 # Similar to `backtick` or "qx" ("perldoc -f qx"), it calls $qx_cb with
 # the stdout of the given command when done; but respects the given limiter
 # $env is the PSGI env.  As with ``/qx; only use this when output is small
@@ -121,7 +155,7 @@ sub psgi_qx {
 	my ($self, $env, $limiter, $qx_cb) = @_;
 	my $qx = PublicInbox::Qspawn::Qx->new;
 	my $end = sub {
-		_psgi_finish($self, $env);
+		finish($self, $env);
 		eval { $qx_cb->($qx) };
 		$qx = undef;
 	};
@@ -189,7 +223,7 @@ sub psgi_return {
 	my ($self, $env, $limiter, $parse_hdr) = @_;
 	my ($fh, $rpipe);
 	my $end = sub {
-		_psgi_finish($self, $env);
+		finish($self, $env);
 		$fh->close if $fh; # async-only
 	};
 
-- 
EW


^ permalink raw reply	[flat|nested] 2+ messages in thread

* [PATCH v2] ds: use WNOHANG with waitpid if inside event loop
  2019-07-08  5:23 [PATCH] ds: use WNOHANG with waitpid if inside event loop Eric Wong
@ 2019-07-08  7:01 ` " Eric Wong
  0 siblings, 0 replies; 2+ messages in thread
From: Eric Wong @ 2019-07-08  7:01 UTC (permalink / raw)
  To: meta

While we're usually not stuck waiting on waitpid after
seeing a pipe EOF or even triggering SIGPIPE in the process
(e.g. git-http-backend) we're reading from, it MAY happen
and we should be careful to never hang the daemon process
on waitpid calls.

v2: use "eq" for string comparison against 'DEFAULT'
---
Interdiff:
  diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
  index f5c58bdb..51515bf6 100644
  --- a/lib/PublicInbox/DS.pm
  +++ b/lib/PublicInbox/DS.pm
  @@ -630,7 +630,7 @@ sub shutdn ($) {
   sub dwaitpid ($$$) {
       my ($pid, $cb, $arg) = @_;
       my $chld = $SIG{CHLD};
  -    if (defined($chld) && $chld == \&enqueue_reap) {
  +    if (defined($chld) && $chld eq \&enqueue_reap) {
           push @$WaitPids, [ $pid, $cb, $arg ];
   
           # We could've just missed our SIGCHLD, cover it, here:

 lib/PublicInbox/DS.pm          | 47 ++++++++++++++++++++-
 lib/PublicInbox/Git.pm         |  5 ++-
 lib/PublicInbox/ProcessPipe.pm |  9 ++++-
 lib/PublicInbox/Qspawn.pm      | 74 +++++++++++++++++++++++++---------
 4 files changed, 111 insertions(+), 24 deletions(-)

diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index 586c47cd..51515bf6 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -16,7 +16,7 @@
 package PublicInbox::DS;
 use strict;
 use bytes;
-use POSIX ();
+use POSIX qw(WNOHANG);
 use IO::Handle qw();
 use Fcntl qw(SEEK_SET :DEFAULT);
 use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC);
@@ -38,6 +38,8 @@ use Carp   qw(croak confess carp);
 require File::Spec;
 
 my $nextq = []; # queue for next_tick
+my $WaitPids = [];               # list of [ pid, callback, callback_arg ]
+my $reap_timer;
 our (
      %DescriptorMap,             # fd (num) -> PublicInbox::DS object
      $Epoll,                     # Global epoll fd (or DSKQXS ref)
@@ -64,6 +66,8 @@ Reset all state
 =cut
 sub Reset {
     %DescriptorMap = ();
+    $WaitPids = [];
+    $reap_timer = undef;
     @ToClose = ();
     $LoopTimeout = -1;  # no timeout by default
     @Timers = ();
@@ -215,7 +219,33 @@ sub RunTimers {
     return $timeout;
 }
 
+# We can't use waitpid(-1) safely here since it can hit ``, system(),
+# and other things.  So we scan the $WaitPids list, which is hopefully
+# not too big.
+sub reap_pids {
+    my $tmp = $WaitPids;
+    $WaitPids = [];
+    $reap_timer = undef;
+    foreach my $ary (@$tmp) {
+        my ($pid, $cb, $arg) = @$ary;
+        my $ret = waitpid($pid, WNOHANG);
+        if ($ret == 0) {
+            push @$WaitPids, $ary;
+        } elsif ($cb) {
+            eval { $cb->($arg, $pid) };
+        }
+    }
+    if (@$WaitPids) {
+        # we may not be done, and we may have missed our SIGCHLD; re-check via timer
+        $reap_timer = AddTimer(undef, 1, \&reap_pids);
+    }
+}
+
+# reentrant SIGCHLD handler (since reap_pids is not reentrant)
+sub enqueue_reap ($) { push @$nextq, \&reap_pids };
+
 sub EpollEventLoop {
+    local $SIG{CHLD} = \&enqueue_reap;
     while (1) {
         my @events;
         my $i;
@@ -595,6 +625,21 @@ sub shutdn ($) {
 	$self->close;
     }
 }
+
+# must be called with eval, PublicInbox::DS may not be loaded (see t/qspawn.t)
+sub dwaitpid ($$$) {
+    my ($pid, $cb, $arg) = @_;
+    my $chld = $SIG{CHLD};
+    if (defined($chld) && $chld eq \&enqueue_reap) {
+        push @$WaitPids, [ $pid, $cb, $arg ];
+
+        # We could've just missed our SIGCHLD, cover it, here:
+        requeue(\&reap_pids);
+    } else {
+        die "Not in EventLoop\n";
+    }
+}
+
 package PublicInbox::DS::Timer;
 # [$abs_float_firetime, $coderef];
 sub cancel {
diff --git a/lib/PublicInbox/Git.pm b/lib/PublicInbox/Git.pm
index f5c7a95c..2f1a61f9 100644
--- a/lib/PublicInbox/Git.pm
+++ b/lib/PublicInbox/Git.pm
@@ -198,7 +198,10 @@ sub _destroy {
 	my $p = delete $self->{$pid} or return;
 	delete @$self{($in, $out)};
 	delete $self->{$err} if $err; # `err_c'
-	waitpid $p, 0;
+
+	# PublicInbox::DS may not be loaded
+	eval { PublicInbox::DS::dwaitpid($p, undef, undef) };
+	waitpid($p, 0) if $@; # wait synchronously if not in event loop
 }
 
 sub fail {
diff --git a/lib/PublicInbox/ProcessPipe.pm b/lib/PublicInbox/ProcessPipe.pm
index 2769e064..4f5fc751 100644
--- a/lib/PublicInbox/ProcessPipe.pm
+++ b/lib/PublicInbox/ProcessPipe.pm
@@ -20,8 +20,13 @@ sub CLOSE {
 	my $ret = defined $fh ? close($fh) : '';
 	my $pid = delete $_[0]->{pid};
 	if (defined $pid) {
-		waitpid($pid, 0);
-		$ret = '' if $?;
+		# PublicInbox::DS may not be loaded
+		eval { PublicInbox::DS::dwaitpid($pid, undef, undef) };
+
+		if ($@) { # ok, not in the event loop, work synchronously
+			waitpid($pid, 0);
+			$ret = '' if $?;
+		}
 	}
 	$ret;
 }
diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm
index fb48585c..f2e91ab6 100644
--- a/lib/PublicInbox/Qspawn.pm
+++ b/lib/PublicInbox/Qspawn.pm
@@ -27,6 +27,7 @@ package PublicInbox::Qspawn;
 use strict;
 use warnings;
 use PublicInbox::Spawn qw(popen_rd);
+use POSIX qw(WNOHANG);
 require Plack::Util;
 
 # n.b.: we get EAGAIN with public-inbox-httpd, and EINTR on other PSGI servers
@@ -73,24 +74,66 @@ sub child_err ($) {
 	$msg;
 }
 
-sub finish ($) {
-	my ($self) = @_;
+# callback for dwaitpid
+sub waitpid_err ($$) {
+	my ($self, $pid) = @_;
+	my $xpid = delete $self->{pid};
+	my $err;
+	if ($pid > 0) { # success!
+		$err = child_err($?);
+	} elsif ($pid < 0) { # ??? does this happen in our case?
+		$err = "W: waitpid($xpid, 0) => $pid: $!";
+	} # else should not be called with pid == 0
+
+	# done, spawn whatever's in the queue
 	my $limiter = $self->{limiter};
-	my $running;
+	my $running = --$limiter->{running};
+
+	# limiter->{max} may change dynamically
+	if (($running || $limiter->{running}) < $limiter->{max}) {
+		if (my $next = shift @{$limiter->{run_queue}}) {
+			_do_spawn(@$next);
+		}
+	}
+
+	return unless $err;
+	$self->{err} = $err;
+	my $env = $self->{env} or return;
+	if (!$env->{'qspawn.quiet'}) {
+		$err = join(' ', @{$self->{args}->[0]}).": $err\n";
+		$env->{'psgi.errors'}->print($err);
+	}
+}
+
+sub do_waitpid ($;$) {
+	my ($self, $env) = @_;
+	my $pid = $self->{pid};
+	eval { # PublicInbox::DS may not be loaded
+		PublicInbox::DS::dwaitpid($pid, \&waitpid_err, $self);
+		$self->{env} = $env;
+	};
+	# done if we're running in PublicInbox::DS::EventLoop
+	if ($@) {
+		# non public-inbox-{httpd,nntpd} callers may block:
+		my $ret = waitpid($pid, 0);
+		waitpid_err($self, $ret);
+	}
+}
+
+sub finish ($;$) {
+	my ($self, $env) = @_;
 	if (delete $self->{rpipe}) {
-		my $pid = delete $self->{pid};
-		$self->{err} = $pid == waitpid($pid, 0) ? child_err($?) :
-				"PID:$pid still running?";
-		$running = --$limiter->{running};
+		do_waitpid($self, $env);
 	}
 
 	# limiter->{max} may change dynamically
-	if (($running || $limiter->{running}) < $limiter->{max}) {
+	my $limiter = $self->{limiter};
+	if ($limiter->{running} < $limiter->{max}) {
 		if (my $next = shift @{$limiter->{run_queue}}) {
 			_do_spawn(@$next);
 		}
 	}
-	$self->{err};
+	$self->{err}; # may be meaningless if non-blocking
 }
 
 sub start {
@@ -104,15 +147,6 @@ sub start {
 	}
 }
 
-sub _psgi_finish ($$) {
-	my ($self, $env) = @_;
-	my $err = $self->finish;
-	if ($err && !$env->{'qspawn.quiet'}) {
-		$err = join(' ', @{$self->{args}->[0]}).": $err\n";
-		$env->{'psgi.errors'}->print($err);
-	}
-}
-
 # Similar to `backtick` or "qx" ("perldoc -f qx"), it calls $qx_cb with
 # the stdout of the given command when done; but respects the given limiter
 # $env is the PSGI env.  As with ``/qx; only use this when output is small
@@ -121,7 +155,7 @@ sub psgi_qx {
 	my ($self, $env, $limiter, $qx_cb) = @_;
 	my $qx = PublicInbox::Qspawn::Qx->new;
 	my $end = sub {
-		_psgi_finish($self, $env);
+		finish($self, $env);
 		eval { $qx_cb->($qx) };
 		$qx = undef;
 	};
@@ -189,7 +223,7 @@ sub psgi_return {
 	my ($self, $env, $limiter, $parse_hdr) = @_;
 	my ($fh, $rpipe);
 	my $end = sub {
-		_psgi_finish($self, $env);
+		finish($self, $env);
 		$fh->close if $fh; # async-only
 	};
 
-- 
EW

^ permalink raw reply	[flat|nested] 2+ messages in thread

end of thread, back to index

Thread overview: 2+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2019-07-08  5:23 [PATCH] ds: use WNOHANG with waitpid if inside event loop Eric Wong
2019-07-08  7:01 ` [PATCH v2] " Eric Wong

user/dev discussion of public-inbox itself

Archives are clonable:
	git clone --mirror https://public-inbox.org/meta
	git clone --mirror http://czquwvybam4bgbro.onion/meta
	git clone --mirror http://hjrcffqmbrq6wope.onion/meta
	git clone --mirror http://ou63pmih66umazou.onion/meta

Newsgroups are available over NNTP:
	nntp://news.public-inbox.org/inbox.comp.mail.public-inbox.meta
	nntp://ou63pmih66umazou.onion/inbox.comp.mail.public-inbox.meta
	nntp://czquwvybam4bgbro.onion/inbox.comp.mail.public-inbox.meta
	nntp://hjrcffqmbrq6wope.onion/inbox.comp.mail.public-inbox.meta
	nntp://news.gmane.org/gmane.mail.public-inbox.general

 note: .onion URLs require Tor: https://www.torproject.org/

AGPL code for this site: git clone https://public-inbox.org/ public-inbox