user/dev discussion of public-inbox itself
 help / color / mirror / code / Atom feed
Search results ordered by [date|relevance]  view[summary|nested|Atom feed]
thread overview below | download mbox.gz: |
* [PATCH 6/7] git: improve coupling with {sock} and {inflight} fields
  2023-11-26  2:10  7% [PATCH 0/7] more I/O + process reliability and cleanups Eric Wong
@ 2023-11-26  2:11  5% ` Eric Wong
  0 siblings, 0 replies; 2+ results
From: Eric Wong @ 2023-11-26  2:11 UTC (permalink / raw)
  To: meta

While the {inflight} array should be tied to the IO object even
more tightly, that's not an easy task with our current code.  So
take some small steps by introducing a gcf_inflight helper to
validate the ownership of the process and to drain the inflight
array via the awaitpid callback.

This hopefully fixes problems with t/lei-q-save.t (still) hanging
occasionally on v2 outputs since git->cleanup/->DESTROY was getting
called in v2 shard workers.
---
 lib/PublicInbox/Gcf2Client.pm  |  6 ++-
 lib/PublicInbox/Git.pm         | 79 ++++++++++++++++++++--------------
 lib/PublicInbox/GitAsyncCat.pm |  2 +-
 lib/PublicInbox/IO.pm          | 17 ++++++--
 4 files changed, 65 insertions(+), 39 deletions(-)

diff --git a/lib/PublicInbox/Gcf2Client.pm b/lib/PublicInbox/Gcf2Client.pm
index 19d77e32..07ff7dcb 100644
--- a/lib/PublicInbox/Gcf2Client.pm
+++ b/lib/PublicInbox/Gcf2Client.pm
@@ -31,15 +31,16 @@ sub new  {
 	$opt->{0} = $opt->{1} = $s2;
 	my $cmd = [$^X, $^W ? ('-w') : (),
 			qw[-MPublicInbox::Gcf2 -e PublicInbox::Gcf2::loop]];
-	PublicInbox::IO::attach_pid($s1, spawn($cmd, $env, $opt));
 	$self->{inflight} = [];
+	PublicInbox::IO::attach_pid($s1, spawn($cmd, $env, $opt),
+			\&PublicInbox::Git::gcf_drain, $self->{inflight});
 	$self->{epwatch} = \undef; # for Git->cleanup
 	$self->SUPER::new($s1, EPOLLIN);
 }
 
 sub gcf2_async ($$$;$) {
 	my ($self, $req, $cb, $arg) = @_;
-	my $inflight = $self->{inflight} or return $self->close;
+	my $inflight = $self->gcf_inflight or return;
 	PublicInbox::Git::write_all($self, $req, \&cat_async_step, $inflight);
 	push @$inflight, \$req, $cb, $arg; # ref prevents Git.pm retries
 }
@@ -49,6 +50,7 @@ sub alternates_changed {}
 
 no warnings 'once';
 
+*gcf_inflight = \&PublicInbox::Git::gcf_inflight; # for event_step
 *cat_async_step = \&PublicInbox::Git::cat_async_step; # for event_step
 *event_step = \&PublicInbox::Git::event_step;
 *fail = \&PublicInbox::Git::fail;
diff --git a/lib/PublicInbox/Git.pm b/lib/PublicInbox/Git.pm
index 93736cf0..fe834210 100644
--- a/lib/PublicInbox/Git.pm
+++ b/lib/PublicInbox/Git.pm
@@ -140,6 +140,18 @@ sub last_check_err {
 	$buf;
 }
 
+sub gcf_drain { # awaitpid cb
+	my ($pid, $inflight, $bc) = @_;
+	while (@$inflight) {
+		my ($req, $cb, $arg) = splice(@$inflight, 0, 3);
+		$req = $$req if ref($req);
+		$bc and $req =~ s/\A(?:contents|info) //;
+		$req =~ s/ .*//; # drop git_dir for Gcf2Client
+		eval { $cb->(undef, $req, undef, undef, $arg) };
+		warn "E: (in abort) $req: $@" if $@;
+	}
+}
+
 sub _sock_cmd {
 	my ($self, $batch, $err_c) = @_;
 	$self->{sock} and Carp::confess('BUG: {sock} exists');
@@ -162,8 +174,11 @@ sub _sock_cmd {
 		$self->{err_c} = $opt->{2} = tmpfile($id, undef, 1) or
 						$self->fail("tmpfile($id): $!");
 	}
+	my $inflight = []; # TODO consider moving this into the IO object
 	my $pid = spawn(\@cmd, undef, $opt);
-	$self->{sock} = PublicInbox::IO::attach_pid($s1, $pid);
+	$self->{sock} = PublicInbox::IO::attach_pid($s1, $pid,
+				\&gcf_drain, $inflight, $self->{-bc});
+	$self->{inflight} = $inflight;
 }
 
 sub cat_async_retry ($$) {
@@ -171,8 +186,8 @@ sub cat_async_retry ($$) {
 
 	# {inflight} may be non-existent, but if it isn't we delete it
 	# here to prevent cleanup() from waiting:
-	delete $self->{inflight};
-	cleanup($self);
+	my ($sock, $epwatch) = delete @$self{qw(sock epwatch inflight)};
+	$self->SUPER::close if $epwatch;
 	my $new_inflight = batch_prepare($self);
 
 	while (my ($oid, $cb, $arg) = splice(@$old_inflight, 0, 3)) {
@@ -180,13 +195,25 @@ sub cat_async_retry ($$) {
 		$oid = \$oid if !@$new_inflight; # to indicate oid retried
 		push @$new_inflight, $oid, $cb, $arg;
 	}
+	$sock->close if $sock; # only safe once old_inflight is empty
 	cat_async_step($self, $new_inflight); # take one step
 }
 
+sub gcf_inflight ($) {
+	my ($self) = @_;
+	if ($self->{sock}) {
+		return $self->{inflight} if $self->{sock}->owner_pid == $$;
+		delete @$self{qw(sock inflight)};
+	} else {
+		$self->close;
+	}
+	undef;
+}
+
 # returns true if prefetch is successful
 sub async_prefetch {
 	my ($self, $oid, $cb, $arg) = @_;
-	my $inflight = $self->{inflight} or return;
+	my $inflight = gcf_inflight($self) or return;
 	return if @$inflight;
 	substr($oid, 0, 0) = 'contents ' if $self->{-bc};
 	write_all($self, "$oid\n", \&cat_async_step, $inflight);
@@ -195,7 +222,7 @@ sub async_prefetch {
 
 sub cat_async_step ($$) {
 	my ($self, $inflight) = @_;
-	die 'BUG: inflight empty or odd' if scalar(@$inflight) < 3;
+	croak 'BUG: inflight empty or odd' if scalar(@$inflight) < 3;
 	my ($req, $cb, $arg) = @$inflight[0, 1, 2];
 	my ($bref, $oid, $type, $size);
 	my $head = $self->{sock}->my_readline;
@@ -237,11 +264,8 @@ sub cat_async_step ($$) {
 
 sub cat_async_wait ($) {
 	my ($self) = @_;
-	return $self->close if !$self->{sock};
-	my $inflight = $self->{inflight} or return;
-	while (scalar(@$inflight)) {
-		cat_async_step($self, $inflight);
-	}
+	my $inflight = gcf_inflight($self) or return;
+	cat_async_step($self, $inflight) while (scalar(@$inflight));
 }
 
 sub batch_prepare ($) {
@@ -253,7 +277,6 @@ sub batch_prepare ($) {
 	} else {
 		_sock_cmd($self, 'batch');
 	}
-	$self->{inflight} = [];
 }
 
 sub _cat_file_cb {
@@ -271,7 +294,7 @@ sub cat_file {
 
 sub check_async_step ($$) {
 	my ($ck, $inflight) = @_;
-	die 'BUG: inflight empty or odd' if scalar(@$inflight) < 3;
+	croak 'BUG: inflight empty or odd' if scalar(@$inflight) < 3;
 	my ($req, $cb, $arg) = @$inflight[0, 1, 2];
 	chomp(my $line = $ck->{sock}->my_readline);
 	my ($hex, $type, $size) = split(/ /, $line);
@@ -291,8 +314,7 @@ sub check_async_wait ($) {
 	my ($self) = @_;
 	return cat_async_wait($self) if $self->{-bc};
 	my $ck = $self->{ck} or return;
-	return $ck->close if !$ck->{sock};
-	my $inflight = $ck->{inflight} or return;
+	my $inflight = gcf_inflight($ck) or return;
 	check_async_step($ck, $inflight) while (scalar(@$inflight));
 }
 
@@ -312,7 +334,6 @@ sub check_async_begin ($) {
 	} else {
 		_sock_cmd($self = ck($self), 'batch-check', 1);
 	}
-	$self->{inflight} = [];
 }
 
 sub write_all {
@@ -337,12 +358,13 @@ sub check_async ($$$$) {
 	my $inflight;
 	if ($self->{-bc}) { # likely as time goes on
 batch_command:
-		$inflight = $self->{inflight} // cat_async_begin($self);
+		$inflight = gcf_inflight($self) // cat_async_begin($self);
 		substr($oid, 0, 0) = 'info ';
 		write_all($self, "$oid\n", \&cat_async_step, $inflight);
 	} else { # accounts for git upgrades while we're running:
 		my $ck = $self->{ck}; # undef OK, maybe set in check_async_begin
-		$inflight = $ck->{inflight} // check_async_begin($self);
+		$inflight = ($ck ? gcf_inflight($ck) : undef)
+				 // check_async_begin($self);
 		goto batch_command if $self->{-bc};
 		write_all($self->{ck}, "$oid\n", \&check_async_step, $inflight);
 	}
@@ -417,8 +439,8 @@ sub date_parse {
 }
 
 sub _active ($) {
-	scalar(@{$_[0]->{inflight} // []}) ||
-		($_[0]->{ck} && scalar(@{$_[0]->{ck}->{inflight} // []}))
+	scalar(@{gcf_inflight($_[0]) // []}) ||
+		($_[0]->{ck} && scalar(@{gcf_inflight($_[0]->{ck}) // []}))
 }
 
 # check_async and cat_async may trigger the other, so ensure they're
@@ -493,13 +515,13 @@ sub pub_urls {
 sub cat_async_begin {
 	my ($self) = @_;
 	cleanup($self) if $self->alternates_changed;
-	die 'BUG: already in async' if $self->{inflight};
+	die 'BUG: already in async' if gcf_inflight($self);
 	batch_prepare($self);
 }
 
 sub cat_async ($$$;$) {
 	my ($self, $oid, $cb, $arg) = @_;
-	my $inflight = $self->{inflight} // cat_async_begin($self);
+	my $inflight = gcf_inflight($self) // cat_async_begin($self);
 	substr($oid, 0, 0) = 'contents ' if $self->{-bc};
 	write_all($self, $oid."\n", \&cat_async_step, $inflight);
 	push(@$inflight, $oid, $cb, $arg);
@@ -596,8 +618,7 @@ sub cleanup_if_unlinked {
 
 sub event_step {
 	my ($self) = @_;
-	$self->close if !$self->{sock}; # process died while requeued
-	my $inflight = $self->{inflight};
+	my $inflight = gcf_inflight($self);
 	if ($inflight && @$inflight) {
 		$self->cat_async_step($inflight);
 		return $self->close unless $self->{sock};
@@ -619,18 +640,10 @@ sub watch_async ($) {
 
 sub close {
 	my ($self) = @_;
-	if (my $q = $self->{inflight}) { # abort inflight requests
-		while (@$q) {
-			my ($req, $cb, $arg) = splice(@$q, 0, 3);
-			$req = $$req if ref($req);
-			$self->{-bc} and $req =~ s/\A(?:contents|info) //;
-			$req =~ s/ .*//; # drop git_dir for Gcf2Client
-			eval { $cb->(undef, $req, undef, undef, $arg) };
-			warn "E: (in abort) $req: $@" if $@;
-		}
-	}
+	my $sock = $self->{sock};
 	delete @$self{qw(-bc err_c inflight)};
 	delete($self->{epwatch}) ? $self->SUPER::close : delete($self->{sock});
+	$sock->close if $sock; # calls gcf_drain via awaitpid
 }
 
 package PublicInbox::GitCheck; # only for git <2.36
diff --git a/lib/PublicInbox/GitAsyncCat.pm b/lib/PublicInbox/GitAsyncCat.pm
index f8b2a9fc..09744b34 100644
--- a/lib/PublicInbox/GitAsyncCat.pm
+++ b/lib/PublicInbox/GitAsyncCat.pm
@@ -40,7 +40,7 @@ sub ibx_async_prefetch {
 	my ($ibx, $oid, $cb, $arg) = @_;
 	my $git = $ibx->git;
 	if (!defined($ibx->{topdir}) && $GCF2C) {
-		if (!@{$GCF2C->{inflight} // []}) {
+		if (!@{$GCF2C->gcf_inflight // []}) {
 			$oid .= " $git->{git_dir}\n";
 			return $GCF2C->gcf2_async($oid, $cb, $arg); # true
 		}
diff --git a/lib/PublicInbox/IO.pm b/lib/PublicInbox/IO.pm
index fcebac59..6593dcdf 100644
--- a/lib/PublicInbox/IO.pm
+++ b/lib/PublicInbox/IO.pm
@@ -15,8 +15,10 @@ use Errno qw(EINTR EAGAIN);
 
 sub waitcb { # awaitpid callback
 	my ($pid, $errref, $cb, @args) = @_;
+	$errref //= \my $workaround_await_pids_clobbered;
 	$$errref = $?; # sets .cerr for _close
-	$cb->($pid, @args) if $cb;
+	$cb->($pid, @args) if $cb; # may clobber $?
+	$? = $$errref;
 }
 
 sub attach_pid {
@@ -33,6 +35,11 @@ sub attached_pid {
 	${${*$io}{pi_io_reap} // []}[1];
 }
 
+sub owner_pid {
+	my ($io) = @_;
+	${${*$io}{pi_io_reap} // [-1]}[0];
+}
+
 # caller cares about error result if they call close explicitly
 # reap->[2] may be set before this is called via waitcb
 sub close {
@@ -40,8 +47,12 @@ sub close {
 	my $ret = $io->SUPER::close;
 	my $reap = delete ${*$io}{pi_io_reap};
 	return $ret unless $reap && $reap->[0] == $$;
-	${$reap->[2]} // (my $w = awaitpid($reap->[1])); # sets [2]
-	($? = ${$reap->[2]}) ? '' : $ret;
+	if (defined ${$reap->[2]}) { # reap_pids already reaped asynchronously
+		$? = ${$reap->[2]};
+	} else { # wait synchronously
+		my $w = awaitpid($reap->[1]);
+	}
+	$? ? '' : $ret; # use $?, AWAIT_PIDS may be cleared on ->Reset (FIXME?)
 }
 
 sub DESTROY {

^ permalink raw reply related	[relevance 5%]

* [PATCH 0/7] more I/O + process reliability and cleanups
@ 2023-11-26  2:10  7% Eric Wong
  2023-11-26  2:11  5% ` [PATCH 6/7] git: improve coupling with {sock} and {inflight} fields Eric Wong
  0 siblings, 1 reply; 2+ results
From: Eric Wong @ 2023-11-26  2:10 UTC (permalink / raw)
  To: meta

6/7 ought to fix another hang in t/lei-q-save.t when writing to
v2 outputs.

Much of this stuff will be relevant to code search since Xapian
searches will be moved to C++ (if available) to support features
which aren't usable from the Perl bindings and to allow more
predictable performance anyway.

Eric Wong (7):
  xap_helper_cxx: do not copy xap_helper.h source
  xap_client: attach PID to the IO object
  xap_client: pass arguments to top-level xap_helper
  xap_helper: allow PI_NO_CXX to disable C++ in more places
  git: move rbuf handling to PublicInbox::IO
  git: improve coupling with {sock} and {inflight} fields
  drop redundant calls to DS->Reset

 lib/PublicInbox/CodeSearchIdx.pm |  11 +--
 lib/PublicInbox/Daemon.pm        |   1 -
 lib/PublicInbox/Gcf2Client.pm    |   7 +-
 lib/PublicInbox/Git.pm           | 138 ++++++++++++-------------------
 lib/PublicInbox/GitAsyncCat.pm   |   2 +-
 lib/PublicInbox/IO.pm            |  70 ++++++++++++++--
 lib/PublicInbox/TestCommon.pm    |   2 +-
 lib/PublicInbox/Watch.pm         |   6 +-
 lib/PublicInbox/XapClient.pm     |   9 +-
 lib/PublicInbox/XapHelperCxx.pm  |  11 +--
 lib/PublicInbox/Xapcmd.pm        |   6 +-
 t/xap_helper.t                   |   5 +-
 12 files changed, 145 insertions(+), 123 deletions(-)

^ permalink raw reply	[relevance 7%]

Results 1-2 of 2 | reverse | options above
-- pct% links below jump to the message on this page, permalinks otherwise --
2023-11-26  2:10  7% [PATCH 0/7] more I/O + process reliability and cleanups Eric Wong
2023-11-26  2:11  5% ` [PATCH 6/7] git: improve coupling with {sock} and {inflight} fields Eric Wong

Code repositories for project(s) associated with this public inbox

	https://80x24.org/public-inbox.git

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).