user/dev discussion of public-inbox itself
From: Eric Wong <e@80x24.org>
To: meta@public-inbox.org
Subject: [PATCH 04/12] ds: introduce awaitpid, switch ProcessPipe users
Date: Tue, 17 Jan 2023 07:19:03 +0000
Message-ID: <20230117071911.1577890-5-e@80x24.org>
In-Reply-To: <20230117071911.1577890-1-e@80x24.org>

awaitpid is the new API which will eventually replace dwaitpid.
It allows callback handlers to be registered early (e.g. as soon
as a child is spawned, rather than only when its pipe is closed).
Eventually (once dwaitpid is gone) it'll be able to use fewer
waitpid calls.
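A minimal sketch of the early-registration idea, assuming a toy in-process registry: toy_awaitpid() and toy_reap() are hypothetical names, not the PublicInbox::DS code.  The callback is registered right after fork() and fired later, PID first and with $? set, once a non-blocking poll finds that the child has exited:

```perl
use strict;
use warnings;
use POSIX qw(WNOHANG _exit);

# Hypothetical toy registry (not PublicInbox::DS): pid => [ $cb, @args ]
my %await;

# Register a callback for $pid; may be called right after fork().
sub toy_awaitpid {
	my ($pid, @cb_args) = @_;
	$await{$pid} = \@cb_args;
}

# Poll each registered PID without blocking and fire callbacks for the
# children that have exited.  Returns how many callbacks fired.
sub toy_reap {
	my $n = 0;
	for my $pid (keys %await) {
		my $wpid = waitpid($pid, WNOHANG);
		next if $wpid == 0; # still running
		my ($cb, @args) = @{delete $await{$pid}};
		next if $wpid != $pid; # -1: not (or no longer) our child
		$cb->($pid, @args) if $cb; # $? holds the exit status here
		++$n;
	}
	$n;
}

# Usage: register early, reap later
my @seen;
my $pid = fork // die "fork: $!";
_exit(3) if $pid == 0; # child exits with status 3
toy_awaitpid($pid, sub {
	my ($p, $tag) = @_;
	push @seen, [ $p, $tag, $? >> 8 ];
}, 'job-1');
until (toy_reap()) {
	select(undef, undef, undef, 0.01); # sleep 10ms between polls
}
```

The awaitpid in this patch additionally offers a synchronous path (waitpid($pid, 0) when called in non-void context, or outside the event loop without callback args) and wraps each callback in eval; the toy omits both.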

Our earlier avoidance of waitpid(-1) was driven by the belief
that threads might eventually become relevant for Perl 5, but
that's extremely unlikely at this stage.  I will still introduce
optional threads via C, but they definitely won't be
spawning/reaping processes.
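For comparison, a sketch of reaping with waitpid(-1), again using a hypothetical registry and a hypothetical reap_any() helper.  One loop reaps whichever children have exited instead of polling each tracked PID.  The caveat noted in DS.pm still applies: this also reaps children that system() or `` expect to wait for themselves.

```perl
use strict;
use warnings;
use POSIX qw(WNOHANG _exit);

# Hypothetical registry: pid => [ $cb, @args ]
my %await;

# Reap whichever children have exited using waitpid(-1, WNOHANG).
# The loop stops on 0 (children remain but none has exited) or -1
# (no children left).  Unregistered children get reaped too, which is
# what makes this unsafe alongside system() or ``.
sub reap_any {
	my $n = 0;
	while ((my $pid = waitpid(-1, WNOHANG)) > 0) {
		++$n;
		my $cb_args = delete $await{$pid} or next; # not registered
		my ($cb, @args) = @$cb_args;
		$cb->($pid, @args); # $? holds this child's status
	}
	$n;
}

# Usage: two children, registered with PID-first callbacks
my %status;
for my $code (1, 2) {
	my $pid = fork // die "fork: $!";
	_exit($code) if $pid == 0;
	$await{$pid} = [ sub { my (undef, $c) = @_; $status{$c} = $? >> 8 }, $code ];
}
until (keys(%status) == 2) {
	reap_any() or select(undef, undef, undef, 0.01);
}
```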

Callback argument order is swapped (PID first) so multiple
arguments can be passed more naturally as a flat list.  The
previous API (which allowed only a single argument, as influenced
by pthread_create(3)) was more tedious since it required packing
multiple arguments into yet another array.
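To illustrate the ordering change, a sketch with two hypothetical dispatchers, call_old() and call_new(), modeled on the old $cb->($arg, $pid) convention and the new $cb->($pid, @args) one used by await_cb.  The same values reach the callback either way, but only the old form needs an intermediate array ref:

```perl
use strict;
use warnings;

# Old style (single argument, as in pthread_create(3)): extra values
# must be packed into another array ref, and the callback unpacks them.
sub call_old { my ($pid, $cb, $arg) = @_; $cb->($arg, $pid) }

# New style: PID first, then any number of flattened arguments.
sub call_new { my ($pid, $cb, @args) = @_; $cb->($pid, @args) }

my @log;
call_old(1234, sub {
	my ($arg, $pid) = @_;
	my ($obj, $label) = @$arg; # unpack the extra array
	push @log, "old $pid $obj->{name} $label";
}, [ { name => 'lei' }, 'gzip' ]);

call_new(1234, sub {
	my ($pid, $obj, $label) = @_; # no intermediate array needed
	push @log, "new $pid $obj->{name} $label";
}, { name => 'lei' }, 'gzip');
```

Putting the PID first keeps the variable-length part last, where a list assignment such as my ($pid, @args) = @_ can absorb it; reap_compress in LeiToMail.pm now takes this ($pid, $lei) shape.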
---
 lib/PublicInbox/DS.pm          | 43 +++++++++++++++++++++---
 lib/PublicInbox/LeiToMail.pm   |  4 +--
 lib/PublicInbox/ProcessPipe.pm | 42 ++++++++++++------------
 lib/PublicInbox/Qspawn.pm      | 60 ++++++++++++++++++----------------
 lib/PublicInbox/Spawn.pm       |  6 ++--
 t/spawn.t                      | 12 ++++---
 6 files changed, 104 insertions(+), 63 deletions(-)

diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index e4629e97..9563a1cb 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -32,11 +32,12 @@ use PublicInbox::Syscall qw(:epoll);
 use PublicInbox::Tmpfile;
 use Errno qw(EAGAIN EINVAL);
 use Carp qw(carp croak);
-our @EXPORT_OK = qw(now msg_more dwaitpid add_timer add_uniq_timer);
+our @EXPORT_OK = qw(now msg_more dwaitpid awaitpid add_timer add_uniq_timer);
 
 my %Stack;
 my $nextq; # queue for next_tick
 my $wait_pids; # list of [ pid, callback, callback_arg ]
+my $AWAIT_PIDS; # pid => [ $callback, @args ]
 my $reap_armed;
 my $ToClose; # sockets to close when event loop is done
 our (
@@ -74,11 +75,11 @@ sub Reset {
 		# we may be iterating inside one of these on our stack
 		my @q = delete @Stack{keys %Stack};
 		for my $q (@q) { @$q = () }
-		$wait_pids = $nextq = $ToClose = undef;
+		$AWAIT_PIDS = $wait_pids = $nextq = $ToClose = undef;
 		$ep_io = undef; # closes real $Epoll FD
 		$Epoll = undef; # may call DSKQXS::DESTROY
 	} while (@Timers || keys(%Stack) || $nextq || $wait_pids ||
-		$ToClose || keys(%DescriptorMap) ||
+		$ToClose || keys(%DescriptorMap) || $AWAIT_PIDS ||
 		$PostLoopCallback || keys(%UniqTimer));
 
 	$reap_armed = undef;
@@ -201,6 +202,13 @@ sub block_signals () {
 	$oldset;
 }
 
+sub await_cb ($;@) {
+	my ($pid, @cb_args) = @_;
+	my $cb = shift @cb_args or return;
+	eval { $cb->($pid, @cb_args) };
+	warn "E: awaitpid($pid): $@" if $@;
+}
+
 # We can't use waitpid(-1) safely here since it can hit ``, system(),
 # and other things.  So we scan the $wait_pids list, which is hopefully
 # not too big.  We keep $wait_pids small by not calling dwaitpid()
@@ -208,10 +216,12 @@ sub block_signals () {
 
 sub reap_pids {
 	$reap_armed = undef;
-	my $tmp = $wait_pids or return;
+	my $tmp = $wait_pids // [];
 	$wait_pids = undef;
 	$Stack{reap_runq} = $tmp;
 	my $oldset = block_signals();
+
+	# old API
 	foreach my $ary (@$tmp) {
 		my ($pid, $cb, $arg) = @$ary;
 		my $ret = waitpid($pid, WNOHANG);
@@ -226,6 +236,14 @@ sub reap_pids {
 			warn "waitpid($pid, WNOHANG) = $ret, \$!=$!, \$?=$?";
 		}
 	}
+
+	# new API TODO: convert to waitpid(-1) in the future as long
+	# as we don't use threads
+	for my $pid (keys %$AWAIT_PIDS) {
+		my $wpid = waitpid($pid, WNOHANG) // next;
+		my $cb_args = delete $AWAIT_PIDS->{$wpid} or next;
+		await_cb($pid, @$cb_args);
+	}
 	sig_setmask($oldset);
 	delete $Stack{reap_runq};
 }
@@ -720,6 +738,23 @@ sub dwaitpid ($;$$) {
 	}
 }
 
+sub awaitpid {
+	my ($pid, @cb_args) = @_;
+	$AWAIT_PIDS->{$pid} //= @cb_args ? \@cb_args : 0;
+	# provide synchronous API
+	if (defined(wantarray) || (!$in_loop && !@cb_args)) {
+		my $ret = waitpid($pid, 0) // -2;
+		if ($ret == $pid) {
+			my $cb_args = delete $AWAIT_PIDS->{$pid};
+			@cb_args = @$cb_args if !@cb_args && $cb_args;
+			await_cb($pid, @cb_args);
+			return $ret;
+		}
+	}
+	# We could've just missed our SIGCHLD, cover it, here:
+	enqueue_reap() if $in_loop;
+}
+
 1;
 
 =head1 AUTHORS (Danga::Socket)
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index b58e2652..1528165a 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -150,8 +150,8 @@ sub git_to_mail { # git->cat_async callback
 	$self->{lei}->fail("$@ (oid=$oid)") if $@;
 }
 
-sub reap_compress { # dwaitpid callback
-	my ($lei, $pid) = @_;
+sub reap_compress { # awaitpid callback
+	my ($pid, $lei) = @_;
 	my $cmd = delete $lei->{"pid.$pid"};
 	return if $? == 0;
 	$lei->fail("@$cmd failed", $? >> 8);
diff --git a/lib/PublicInbox/ProcessPipe.pm b/lib/PublicInbox/ProcessPipe.pm
index 97e9c268..068631c6 100644
--- a/lib/PublicInbox/ProcessPipe.pm
+++ b/lib/PublicInbox/ProcessPipe.pm
@@ -1,16 +1,25 @@
-# Copyright (C) 2016-2021 all contributors <meta@public-inbox.org>
+# Copyright (C) all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
 
 # a tied handle for auto reaping of children tied to a pipe, see perltie(1)
 package PublicInbox::ProcessPipe;
-use strict;
-use v5.10.1;
+use v5.12;
 use Carp qw(carp);
+use PublicInbox::DS qw(awaitpid);
+
+sub waitcb { # awaitpid callback
+	my ($pid, $err_ref, $cb, @args) = @_;
+	$$err_ref = $?; # sets >{pp_chld_err} for _close
+	$cb->($pid, @args) if $cb;
+}
 
 sub TIEHANDLE {
-	my ($class, $pid, $fh, $cb, $arg) = @_;
-	bless { pid => $pid, fh => $fh, ppid => $$, cb => $cb, arg => $arg },
-		$class;
+	my ($cls, $pid, $fh, @cb_arg) = @_;
+	my $self = bless { pid => $pid, fh => $fh, ppid => $$ }, $cls;
+	# we share $err (and not $self) with awaitpid to avoid a ref cycle
+	$self->{pp_chld_err} = \(my $err);
+	awaitpid($pid, \&waitcb, \$err, @cb_arg);
+	$self;
 }
 
 sub BINMODE { binmode(shift->{fh}) } # for IO::Uncompress::Gunzip
@@ -33,24 +42,15 @@ sub FILENO { fileno($_[0]->{fh}) }
 
 sub _close ($;$) {
 	my ($self, $wait) = @_;
-	my $fh = delete $self->{fh};
+	my ($fh, $pid) = delete(@$self{qw(fh pid)});
 	my $ret = defined($fh) ? close($fh) : '';
-	my ($pid, $cb, $arg) = delete @$self{qw(pid cb arg)};
 	return $ret unless defined($pid) && $self->{ppid} == $$;
 	if ($wait) { # caller cares about the exit status:
-		my $wp = waitpid($pid, 0);
-		if ($wp == $pid) {
-			$ret = '' if $?;
-			if ($cb) {
-				eval { $cb->($arg, $pid) };
-				carp "E: cb(arg, $pid): $@" if $@;
-			}
-		} else {
-			carp "waitpid($pid, 0) = $wp, \$!=$!, \$?=$?";
-		}
-	} else { # caller just undef-ed it, let event loop deal with it
-		require PublicInbox::DS;
-		PublicInbox::DS::dwaitpid($pid, $cb, $arg);
+		# synchronous wait via defined(wantarray) on awaitpid:
+		defined(${$self->{pp_chld_err}}) or $wait = awaitpid($pid);
+		($? = ${$self->{pp_chld_err}}) and $ret = '';
+	} else {
+		awaitpid($pid); # depends on $in_loop or not
 	}
 	$ret;
 }
diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm
index 779b703a..02357dbf 100644
--- a/lib/PublicInbox/Qspawn.pm
+++ b/lib/PublicInbox/Qspawn.pm
@@ -28,6 +28,7 @@ package PublicInbox::Qspawn;
 use v5.12;
 use PublicInbox::Spawn qw(popen_rd);
 use PublicInbox::GzipFilter;
+use PublicInbox::DS qw(awaitpid);
 use Scalar::Util qw(blessed);
 
 # n.b.: we get EAGAIN with public-inbox-httpd, and EINTR on other PSGI servers
@@ -57,35 +58,21 @@ sub _do_spawn {
 		}
 	}
 	$self->{cmd} = $o{quiet} ? undef : $cmd;
+	$o{cb_arg} = [ \&waitpid_err, $self ];
 	eval {
 		# popen_rd may die on EMFILE, ENFILE
-		$self->{rpipe} = popen_rd($cmd, $cmd_env, \%o);
-
-		die "E: $!" unless defined($self->{rpipe});
-
+		$self->{rpipe} = popen_rd($cmd, $cmd_env, \%o) // die "E: $!";
 		$limiter->{running}++;
 		$start_cb->($self); # EPOLL_CTL_ADD may ENOSPC/ENOMEM
 	};
 	finish($self, $@) if $@;
 }
 
-sub child_err ($) {
-	my ($child_error) = @_; # typically $?
-	my $exitstatus = ($child_error >> 8) or return;
-	my $sig = $child_error & 127;
-	my $msg = "exit status=$exitstatus";
-	$msg .= " signal=$sig" if $sig;
-	$msg;
-}
-
-sub finalize ($$) {
-	my ($self, $err) = @_;
-
-	my ($env, $qx_cb, $qx_arg, $qx_buf) =
-		delete @$self{qw(psgi_env qx_cb qx_arg qx_buf)};
+sub finalize ($) {
+	my ($self) = @_;
 
-	# done, spawn whatever's in the queue
-	my $limiter = $self->{limiter};
+	# process is done, spawn whatever's in the queue
+	my $limiter = delete $self->{limiter} or return;
 	my $running = --$limiter->{running};
 
 	if ($running < $limiter->{max}) {
@@ -93,14 +80,16 @@ sub finalize ($$) {
 			_do_spawn(@$next, $limiter);
 		}
 	}
-
-	if ($err) {
+	if (my $err = $self->{_err}) { # set by finish or waitpid_err
 		utf8::decode($err);
 		if (my $dst = $self->{qsp_err}) {
 			$$dst .= $$dst ? " $err" : "; $err";
 		}
 		warn "@{$self->{cmd}}: $err" if $self->{cmd};
 	}
+
+	my ($env, $qx_cb, $qx_arg, $qx_buf) =
+		delete @$self{qw(psgi_env qx_cb qx_arg qx_buf)};
 	if ($qx_cb) {
 		eval { $qx_cb->($qx_buf, $qx_arg) };
 		return unless $@;
@@ -115,14 +104,28 @@ sub finalize ($$) {
 	}
 }
 
-# callback for dwaitpid or ProcessPipe
-sub waitpid_err { finalize($_[0], child_err($?)) }
+sub waitpid_err { # callback for awaitpid
+	my (undef, $self) = @_; # $_[0]: pid
+	$self->{_err} = ''; # for defined check in ->finish
+	if ($?) {
+		my $status = $? >> 8;
+		my $sig = $? & 127;
+		$self->{_err} .= "exit status=$status";
+		$self->{_err} .= " signal=$sig" if $sig;
+	}
+	finalize($self) if !$self->{rpipe};
+}
 
 sub finish ($;$) {
 	my ($self, $err) = @_;
-	my $tied_pp = delete($self->{rpipe}) or return finalize($self, $err);
-	my PublicInbox::ProcessPipe $pp = tied *$tied_pp;
-	@$pp{qw(cb arg)} = (\&waitpid_err, $self); # for ->DESTROY
+	$self->{_err} //= $err; # only for $@
+
+	# we can safely finalize if pipe was closed before, or if
+	# {_err} is defined by waitpid_err.  Deleting {rpipe} will
+	# trigger PublicInbox::ProcessPipe::DESTROY -> waitpid_err,
+	# but it may not fire right away if inside the event loop.
+	my $closed_before = !delete($self->{rpipe});
+	finalize($self) if $closed_before || defined($self->{_err});
 }
 
 sub start ($$$) {
@@ -247,10 +250,9 @@ sub psgi_return_init_cb { # this may be PublicInbox::HTTPD::Async {cb}
 	if (ref($r) ne 'ARRAY' || scalar(@$r) == 3) { # error
 		if ($async) { # calls rpipe->close && ->event_step
 			$async->close; # PublicInbox::HTTPD::Async::close
-		} else { # generic PSGI:
+		} else { # generic PSGI, use PublicInbox::ProcessPipe::CLOSE
 			delete($self->{rpipe})->close;
 			event_step($self);
-			waitpid_err($self);
 		}
 		if (ref($r) eq 'ARRAY') { # error
 			$wcb->($r)
diff --git a/lib/PublicInbox/Spawn.pm b/lib/PublicInbox/Spawn.pm
index 7f61d8db..826ee508 100644
--- a/lib/PublicInbox/Spawn.pm
+++ b/lib/PublicInbox/Spawn.pm
@@ -365,9 +365,9 @@ sub popen_rd {
 	$opt->{1} = fileno($w);
 	my $pid = spawn($cmd, $env, $opt);
 	return ($r, $pid) if wantarray;
-	my $ret = gensym;
-	tie *$ret, 'PublicInbox::ProcessPipe', $pid, $r, @$opt{qw(cb arg)};
-	$ret;
+	my $s = gensym;
+	tie *$s, 'PublicInbox::ProcessPipe', $pid, $r, @{$opt->{cb_arg} // []};
+	$s;
 }
 
 sub run_die ($;$$) {
diff --git a/t/spawn.t b/t/spawn.t
index 5fc99a2a..c22cfcfc 100644
--- a/t/spawn.t
+++ b/t/spawn.t
@@ -140,13 +140,13 @@ EOF
 
 { # ->CLOSE vs ->DESTROY waitpid caller distinction
 	my @c;
-	my $fh = popen_rd(['true'], undef, { cb => sub { @c = caller } });
+	my $fh = popen_rd(['true'], undef, { cb_arg => [sub { @c = caller }] });
 	ok(close($fh), '->CLOSE fired and successful');
 	ok(scalar(@c), 'callback fired by ->CLOSE');
 	ok(grep(!m[/PublicInbox/DS\.pm\z], @c), 'callback not invoked by DS');
 
 	@c = ();
-	$fh = popen_rd(['true'], undef, { cb => sub { @c = caller } });
+	$fh = popen_rd(['true'], undef, { cb_arg => [sub { @c = caller }] });
 	undef $fh; # ->DESTROY
 	ok(scalar(@c), 'callback fired by ->DESTROY');
 	ok(grep(!m[/PublicInbox/ProcessPipe\.pm\z], @c),
@@ -156,8 +156,9 @@ EOF
 { # children don't wait on siblings
 	use POSIX qw(_exit);
 	pipe(my ($r, $w)) or BAIL_OUT $!;
-	my $cb = sub { warn "x=$$\n" };
-	my $fh = popen_rd(['cat'], undef, { 0 => $r, cb => $cb });
+	my @arg;
+	my $cb = [ sub { @arg = @_; warn "x=$$\n" }, 'hi' ];
+	my $fh = popen_rd(['cat'], undef, { 0 => $r, cb_arg => $cb });
 	my $pp = tied *$fh;
 	my $pid = fork // BAIL_OUT $!;
 	local $SIG{__WARN__} = sub { _exit(1) };
@@ -173,6 +174,9 @@ EOF
 	close $w;
 	close $fh;
 	is($?, 0, 'cat exited');
+	is(scalar(@arg), 2, 'callback got args');
+	is($arg[1], 'hi', 'passed arg');
+	like($arg[0], qr/\A\d+\z/, 'PID');
 	is_deeply(\@w, [ "x=$$\n" ], 'callback fired from owner');
 }
 

Thread overview: 14+ messages
2023-01-17  7:18 [PATCH 00/12] improve process reaping Eric Wong
2023-01-17  7:19 ` [PATCH 01/12] ipc: remove {-reap_async} field Eric Wong
2023-01-17  7:19 ` [PATCH 02/12] t/solver_git.t: fix test message Eric Wong
2023-01-17  7:19 ` [PATCH 03/12] qspawn: drop {psgi_env} deref Eric Wong
2023-01-17  7:19 ` Eric Wong [this message]
2023-01-18  2:10   ` [PATCH 13/12] qspawn: use ->DESTROY to force ->finalize Eric Wong
2023-01-17  7:19 ` [PATCH 05/12] git|gcf2: switch to awaitpid Eric Wong
2023-01-17  7:19 ` [PATCH 06/12] watch: " Eric Wong
2023-01-17  7:19 ` [PATCH 07/12] watch: simplify internal data structures Eric Wong
2023-01-17  7:19 ` [PATCH 08/12] eofpipe: drop {arg} support for now Eric Wong
2023-01-17  7:19 ` [PATCH 09/12] watch: IMAP and NNTP polling can use the same interval Eric Wong
2023-01-17  7:19 ` [PATCH 10/12] ipc: drop unused $args from ->ipc_worker_stop Eric Wong
2023-01-17  7:19 ` [PATCH 11/12] ipc+lei: switch to awaitpid Eric Wong
2023-01-17  7:19 ` [PATCH 12/12] ds: drop dwaitpid, switch to waitpid(-1) Eric Wong

Code repositories for project(s) associated with this public inbox

	https://80x24.org/public-inbox.git

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).