From 51b2fa7fb2b7b1caed8b1dde4613992a192225ed Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Sun, 26 Nov 2023 02:11:04 +0000 Subject: git: improve coupling with {sock} and {inflight} fields While the {inflight} array should be tied to the IO object even more tightly, that's not an easy task with our current code. So take some small steps by introducing a gcf_inflight helper to validate the ownership of the process and to drain the inflight array via the awaitpid callback. This hopefully fix problems with t/lei-q-save.t (still) hanging occasionally on v2 outputs since git->cleanup/->DESTROY was getting called in v2 shard workers. --- lib/PublicInbox/Git.pm | 79 +++++++++++++++++++++++++++++--------------------- 1 file changed, 46 insertions(+), 33 deletions(-) (limited to 'lib/PublicInbox/Git.pm') diff --git a/lib/PublicInbox/Git.pm b/lib/PublicInbox/Git.pm index 93736cf0..fe834210 100644 --- a/lib/PublicInbox/Git.pm +++ b/lib/PublicInbox/Git.pm @@ -140,6 +140,18 @@ sub last_check_err { $buf; } +sub gcf_drain { # awaitpid cb + my ($pid, $inflight, $bc) = @_; + while (@$inflight) { + my ($req, $cb, $arg) = splice(@$inflight, 0, 3); + $req = $$req if ref($req); + $bc and $req =~ s/\A(?:contents|info) //; + $req =~ s/ .*//; # drop git_dir for Gcf2Client + eval { $cb->(undef, $req, undef, undef, $arg) }; + warn "E: (in abort) $req: $@" if $@; + } +} + sub _sock_cmd { my ($self, $batch, $err_c) = @_; $self->{sock} and Carp::confess('BUG: {sock} exists'); @@ -162,8 +174,11 @@ sub _sock_cmd { $self->{err_c} = $opt->{2} = tmpfile($id, undef, 1) or $self->fail("tmpfile($id): $!"); } + my $inflight = []; # TODO consider moving this into the IO object my $pid = spawn(\@cmd, undef, $opt); - $self->{sock} = PublicInbox::IO::attach_pid($s1, $pid); + $self->{sock} = PublicInbox::IO::attach_pid($s1, $pid, + \&gcf_drain, $inflight, $self->{-bc}); + $self->{inflight} = $inflight; } sub cat_async_retry ($$) { @@ -171,8 +186,8 @@ sub cat_async_retry ($$) { # {inflight} may be non-existent, but if it isn't we delete it # here to prevent cleanup() from waiting: - delete $self->{inflight}; - cleanup($self); + my ($sock, $epwatch) = delete @$self{qw(sock epwatch inflight)}; + $self->SUPER::close if $epwatch; my $new_inflight = batch_prepare($self); while (my ($oid, $cb, $arg) = splice(@$old_inflight, 0, 3)) { @@ -180,13 +195,25 @@ sub cat_async_retry ($$) { $oid = \$oid if !@$new_inflight; # to indicate oid retried push @$new_inflight, $oid, $cb, $arg; } + $sock->close if $sock; # only safe once old_inflight is empty cat_async_step($self, $new_inflight); # take one step } +sub gcf_inflight ($) { + my ($self) = @_; + if ($self->{sock}) { + return $self->{inflight} if $self->{sock}->owner_pid == $$; + delete @$self{qw(sock inflight)}; + } else { + $self->close; + } + undef; +} + # returns true if prefetch is successful sub async_prefetch { my ($self, $oid, $cb, $arg) = @_; - my $inflight = $self->{inflight} or return; + my $inflight = gcf_inflight($self) or return; return if @$inflight; substr($oid, 0, 0) = 'contents ' if $self->{-bc}; write_all($self, "$oid\n", \&cat_async_step, $inflight); @@ -195,7 +222,7 @@ sub async_prefetch { sub cat_async_step ($$) { my ($self, $inflight) = @_; - die 'BUG: inflight empty or odd' if scalar(@$inflight) < 3; + croak 'BUG: inflight empty or odd' if scalar(@$inflight) < 3; my ($req, $cb, $arg) = @$inflight[0, 1, 2]; my ($bref, $oid, $type, $size); my $head = $self->{sock}->my_readline; @@ -237,11 +264,8 @@ sub cat_async_step ($$) { sub cat_async_wait ($) { my ($self) = @_; - return $self->close if !$self->{sock}; - my $inflight = $self->{inflight} or return; - while (scalar(@$inflight)) { - cat_async_step($self, $inflight); - } + my $inflight = gcf_inflight($self) or return; + cat_async_step($self, $inflight) while (scalar(@$inflight)); } sub batch_prepare ($) { @@ -253,7 +277,6 @@ sub batch_prepare ($) { } else { _sock_cmd($self, 'batch'); } - $self->{inflight} = []; } sub _cat_file_cb { @@ -271,7 +294,7 @@ sub cat_file { sub check_async_step ($$) { my ($ck, $inflight) = @_; - die 'BUG: inflight empty or odd' if scalar(@$inflight) < 3; + croak 'BUG: inflight empty or odd' if scalar(@$inflight) < 3; my ($req, $cb, $arg) = @$inflight[0, 1, 2]; chomp(my $line = $ck->{sock}->my_readline); my ($hex, $type, $size) = split(/ /, $line); @@ -291,8 +314,7 @@ sub check_async_wait ($) { my ($self) = @_; return cat_async_wait($self) if $self->{-bc}; my $ck = $self->{ck} or return; - return $ck->close if !$ck->{sock}; - my $inflight = $ck->{inflight} or return; + my $inflight = gcf_inflight($ck) or return; check_async_step($ck, $inflight) while (scalar(@$inflight)); } @@ -312,7 +334,6 @@ sub check_async_begin ($) { } else { _sock_cmd($self = ck($self), 'batch-check', 1); } - $self->{inflight} = []; } sub write_all { @@ -337,12 +358,13 @@ sub check_async ($$$$) { my $inflight; if ($self->{-bc}) { # likely as time goes on batch_command: - $inflight = $self->{inflight} // cat_async_begin($self); + $inflight = gcf_inflight($self) // cat_async_begin($self); substr($oid, 0, 0) = 'info '; write_all($self, "$oid\n", \&cat_async_step, $inflight); } else { # accounts for git upgrades while we're running: my $ck = $self->{ck}; # undef OK, maybe set in check_async_begin - $inflight = $ck->{inflight} // check_async_begin($self); + $inflight = ($ck ? gcf_inflight($ck) : undef) + // check_async_begin($self); goto batch_command if $self->{-bc}; write_all($self->{ck}, "$oid\n", \&check_async_step, $inflight); } @@ -417,8 +439,8 @@ sub date_parse { } sub _active ($) { - scalar(@{$_[0]->{inflight} // []}) || - ($_[0]->{ck} && scalar(@{$_[0]->{ck}->{inflight} // []})) + scalar(@{gcf_inflight($_[0]) // []}) || + ($_[0]->{ck} && scalar(@{gcf_inflight($_[0]->{ck}) // []})) } # check_async and cat_async may trigger the other, so ensure they're @@ -493,13 +515,13 @@ sub pub_urls { sub cat_async_begin { my ($self) = @_; cleanup($self) if $self->alternates_changed; - die 'BUG: already in async' if $self->{inflight}; + die 'BUG: already in async' if gcf_inflight($self); batch_prepare($self); } sub cat_async ($$$;$) { my ($self, $oid, $cb, $arg) = @_; - my $inflight = $self->{inflight} // cat_async_begin($self); + my $inflight = gcf_inflight($self) // cat_async_begin($self); substr($oid, 0, 0) = 'contents ' if $self->{-bc}; write_all($self, $oid."\n", \&cat_async_step, $inflight); push(@$inflight, $oid, $cb, $arg); @@ -596,8 +618,7 @@ sub cleanup_if_unlinked { sub event_step { my ($self) = @_; - $self->close if !$self->{sock}; # process died while requeued - my $inflight = $self->{inflight}; + my $inflight = gcf_inflight($self); if ($inflight && @$inflight) { $self->cat_async_step($inflight); return $self->close unless $self->{sock}; @@ -619,18 +640,10 @@ sub watch_async ($) { sub close { my ($self) = @_; - if (my $q = $self->{inflight}) { # abort inflight requests - while (@$q) { - my ($req, $cb, $arg) = splice(@$q, 0, 3); - $req = $$req if ref($req); - $self->{-bc} and $req =~ s/\A(?:contents|info) //; - $req =~ s/ .*//; # drop git_dir for Gcf2Client - eval { $cb->(undef, $req, undef, undef, $arg) }; - warn "E: (in abort) $req: $@" if $@; - } - } + my $sock = $self->{sock}; delete @$self{qw(-bc err_c inflight)}; delete($self->{epwatch}) ? $self->SUPER::close : delete($self->{sock}); + $sock->close if $sock; # calls gcf_drain via awaitpid } package PublicInbox::GitCheck; # only for git <2.36 -- cgit v1.2.3-24-ge0c7