about summary refs log tree commit homepage
path: root/lib/PublicInbox
diff options
context:
space:
mode:
author Eric Wong <e@80x24.org> 2023-11-26 02:11:04 +0000
committer Eric Wong <e@80x24.org> 2023-11-26 19:34:57 +0000
commit 51b2fa7fb2b7b1caed8b1dde4613992a192225ed (patch)
tree 2ca5f2161edc525a895223837e4455d4d9ed7646 /lib/PublicInbox
parent e7f0919b6ec2e959444efb12af44658aa1ea9fb4 (diff)
download public-inbox-51b2fa7fb2b7b1caed8b1dde4613992a192225ed.tar.gz
While the {inflight} array should be tied to the IO object even
more tightly, that's not an easy task with our current code.  So
take some small steps by introducing a gcf_inflight helper to
validate the ownership of the process and to drain the inflight
array via the awaitpid callback.

This hopefully fixes problems with t/lei-q-save.t (still) hanging
occasionally on v2 outputs since git->cleanup/->DESTROY was getting
called in v2 shard workers.
Diffstat (limited to 'lib/PublicInbox')
-rw-r--r--lib/PublicInbox/Gcf2Client.pm6
-rw-r--r--lib/PublicInbox/Git.pm79
-rw-r--r--lib/PublicInbox/GitAsyncCat.pm2
-rw-r--r--lib/PublicInbox/IO.pm17
4 files changed, 65 insertions, 39 deletions
diff --git a/lib/PublicInbox/Gcf2Client.pm b/lib/PublicInbox/Gcf2Client.pm
index 19d77e32..07ff7dcb 100644
--- a/lib/PublicInbox/Gcf2Client.pm
+++ b/lib/PublicInbox/Gcf2Client.pm
@@ -31,15 +31,16 @@ sub new  {
         $opt->{0} = $opt->{1} = $s2;
         my $cmd = [$^X, $^W ? ('-w') : (),
                         qw[-MPublicInbox::Gcf2 -e PublicInbox::Gcf2::loop]];
-        PublicInbox::IO::attach_pid($s1, spawn($cmd, $env, $opt));
         $self->{inflight} = [];
+        PublicInbox::IO::attach_pid($s1, spawn($cmd, $env, $opt),
+                        \&PublicInbox::Git::gcf_drain, $self->{inflight});
         $self->{epwatch} = \undef; # for Git->cleanup
         $self->SUPER::new($s1, EPOLLIN);
 }
 
 sub gcf2_async ($$$;$) {
         my ($self, $req, $cb, $arg) = @_;
-        my $inflight = $self->{inflight} or return $self->close;
+        my $inflight = $self->gcf_inflight or return;
         PublicInbox::Git::write_all($self, $req, \&cat_async_step, $inflight);
         push @$inflight, \$req, $cb, $arg; # ref prevents Git.pm retries
 }
@@ -49,6 +50,7 @@ sub alternates_changed {}
 
 no warnings 'once';
 
+*gcf_inflight = \&PublicInbox::Git::gcf_inflight; # for event_step
 *cat_async_step = \&PublicInbox::Git::cat_async_step; # for event_step
 *event_step = \&PublicInbox::Git::event_step;
 *fail = \&PublicInbox::Git::fail;
diff --git a/lib/PublicInbox/Git.pm b/lib/PublicInbox/Git.pm
index 93736cf0..fe834210 100644
--- a/lib/PublicInbox/Git.pm
+++ b/lib/PublicInbox/Git.pm
@@ -140,6 +140,18 @@ sub last_check_err {
         $buf;
 }
 
+sub gcf_drain { # awaitpid cb
+        my ($pid, $inflight, $bc) = @_;
+        while (@$inflight) {
+                my ($req, $cb, $arg) = splice(@$inflight, 0, 3);
+                $req = $$req if ref($req);
+                $bc and $req =~ s/\A(?:contents|info) //;
+                $req =~ s/ .*//; # drop git_dir for Gcf2Client
+                eval { $cb->(undef, $req, undef, undef, $arg) };
+                warn "E: (in abort) $req: $@" if $@;
+        }
+}
+
 sub _sock_cmd {
         my ($self, $batch, $err_c) = @_;
         $self->{sock} and Carp::confess('BUG: {sock} exists');
@@ -162,8 +174,11 @@ sub _sock_cmd {
                 $self->{err_c} = $opt->{2} = tmpfile($id, undef, 1) or
                                                 $self->fail("tmpfile($id): $!");
         }
+        my $inflight = []; # TODO consider moving this into the IO object
         my $pid = spawn(\@cmd, undef, $opt);
-        $self->{sock} = PublicInbox::IO::attach_pid($s1, $pid);
+        $self->{sock} = PublicInbox::IO::attach_pid($s1, $pid,
+                                \&gcf_drain, $inflight, $self->{-bc});
+        $self->{inflight} = $inflight;
 }
 
 sub cat_async_retry ($$) {
@@ -171,8 +186,8 @@ sub cat_async_retry ($$) {
 
         # {inflight} may be non-existent, but if it isn't we delete it
         # here to prevent cleanup() from waiting:
-        delete $self->{inflight};
-        cleanup($self);
+        my ($sock, $epwatch) = delete @$self{qw(sock epwatch inflight)};
+        $self->SUPER::close if $epwatch;
         my $new_inflight = batch_prepare($self);
 
         while (my ($oid, $cb, $arg) = splice(@$old_inflight, 0, 3)) {
@@ -180,13 +195,25 @@ sub cat_async_retry ($$) {
                 $oid = \$oid if !@$new_inflight; # to indicate oid retried
                 push @$new_inflight, $oid, $cb, $arg;
         }
+        $sock->close if $sock; # only safe once old_inflight is empty
         cat_async_step($self, $new_inflight); # take one step
 }
 
+sub gcf_inflight ($) {
+        my ($self) = @_;
+        if ($self->{sock}) {
+                return $self->{inflight} if $self->{sock}->owner_pid == $$;
+                delete @$self{qw(sock inflight)};
+        } else {
+                $self->close;
+        }
+        undef;
+}
+
 # returns true if prefetch is successful
 sub async_prefetch {
         my ($self, $oid, $cb, $arg) = @_;
-        my $inflight = $self->{inflight} or return;
+        my $inflight = gcf_inflight($self) or return;
         return if @$inflight;
         substr($oid, 0, 0) = 'contents ' if $self->{-bc};
         write_all($self, "$oid\n", \&cat_async_step, $inflight);
@@ -195,7 +222,7 @@ sub async_prefetch {
 
 sub cat_async_step ($$) {
         my ($self, $inflight) = @_;
-        die 'BUG: inflight empty or odd' if scalar(@$inflight) < 3;
+        croak 'BUG: inflight empty or odd' if scalar(@$inflight) < 3;
         my ($req, $cb, $arg) = @$inflight[0, 1, 2];
         my ($bref, $oid, $type, $size);
         my $head = $self->{sock}->my_readline;
@@ -237,11 +264,8 @@ sub cat_async_step ($$) {
 
 sub cat_async_wait ($) {
         my ($self) = @_;
-        return $self->close if !$self->{sock};
-        my $inflight = $self->{inflight} or return;
-        while (scalar(@$inflight)) {
-                cat_async_step($self, $inflight);
-        }
+        my $inflight = gcf_inflight($self) or return;
+        cat_async_step($self, $inflight) while (scalar(@$inflight));
 }
 
 sub batch_prepare ($) {
@@ -253,7 +277,6 @@ sub batch_prepare ($) {
         } else {
                 _sock_cmd($self, 'batch');
         }
-        $self->{inflight} = [];
 }
 
 sub _cat_file_cb {
@@ -271,7 +294,7 @@ sub cat_file {
 
 sub check_async_step ($$) {
         my ($ck, $inflight) = @_;
-        die 'BUG: inflight empty or odd' if scalar(@$inflight) < 3;
+        croak 'BUG: inflight empty or odd' if scalar(@$inflight) < 3;
         my ($req, $cb, $arg) = @$inflight[0, 1, 2];
         chomp(my $line = $ck->{sock}->my_readline);
         my ($hex, $type, $size) = split(/ /, $line);
@@ -291,8 +314,7 @@ sub check_async_wait ($) {
         my ($self) = @_;
         return cat_async_wait($self) if $self->{-bc};
         my $ck = $self->{ck} or return;
-        return $ck->close if !$ck->{sock};
-        my $inflight = $ck->{inflight} or return;
+        my $inflight = gcf_inflight($ck) or return;
         check_async_step($ck, $inflight) while (scalar(@$inflight));
 }
 
@@ -312,7 +334,6 @@ sub check_async_begin ($) {
         } else {
                 _sock_cmd($self = ck($self), 'batch-check', 1);
         }
-        $self->{inflight} = [];
 }
 
 sub write_all {
@@ -337,12 +358,13 @@ sub check_async ($$$$) {
         my $inflight;
         if ($self->{-bc}) { # likely as time goes on
 batch_command:
-                $inflight = $self->{inflight} // cat_async_begin($self);
+                $inflight = gcf_inflight($self) // cat_async_begin($self);
                 substr($oid, 0, 0) = 'info ';
                 write_all($self, "$oid\n", \&cat_async_step, $inflight);
         } else { # accounts for git upgrades while we're running:
                 my $ck = $self->{ck}; # undef OK, maybe set in check_async_begin
-                $inflight = $ck->{inflight} // check_async_begin($self);
+                $inflight = ($ck ? gcf_inflight($ck) : undef)
+                                 // check_async_begin($self);
                 goto batch_command if $self->{-bc};
                 write_all($self->{ck}, "$oid\n", \&check_async_step, $inflight);
         }
@@ -417,8 +439,8 @@ sub date_parse {
 }
 
 sub _active ($) {
-        scalar(@{$_[0]->{inflight} // []}) ||
-                ($_[0]->{ck} && scalar(@{$_[0]->{ck}->{inflight} // []}))
+        scalar(@{gcf_inflight($_[0]) // []}) ||
+                ($_[0]->{ck} && scalar(@{gcf_inflight($_[0]->{ck}) // []}))
 }
 
 # check_async and cat_async may trigger the other, so ensure they're
@@ -493,13 +515,13 @@ sub pub_urls {
 sub cat_async_begin {
         my ($self) = @_;
         cleanup($self) if $self->alternates_changed;
-        die 'BUG: already in async' if $self->{inflight};
+        die 'BUG: already in async' if gcf_inflight($self);
         batch_prepare($self);
 }
 
 sub cat_async ($$$;$) {
         my ($self, $oid, $cb, $arg) = @_;
-        my $inflight = $self->{inflight} // cat_async_begin($self);
+        my $inflight = gcf_inflight($self) // cat_async_begin($self);
         substr($oid, 0, 0) = 'contents ' if $self->{-bc};
         write_all($self, $oid."\n", \&cat_async_step, $inflight);
         push(@$inflight, $oid, $cb, $arg);
@@ -596,8 +618,7 @@ sub cleanup_if_unlinked {
 
 sub event_step {
         my ($self) = @_;
-        $self->close if !$self->{sock}; # process died while requeued
-        my $inflight = $self->{inflight};
+        my $inflight = gcf_inflight($self);
         if ($inflight && @$inflight) {
                 $self->cat_async_step($inflight);
                 return $self->close unless $self->{sock};
@@ -619,18 +640,10 @@ sub watch_async ($) {
 
 sub close {
         my ($self) = @_;
-        if (my $q = $self->{inflight}) { # abort inflight requests
-                while (@$q) {
-                        my ($req, $cb, $arg) = splice(@$q, 0, 3);
-                        $req = $$req if ref($req);
-                        $self->{-bc} and $req =~ s/\A(?:contents|info) //;
-                        $req =~ s/ .*//; # drop git_dir for Gcf2Client
-                        eval { $cb->(undef, $req, undef, undef, $arg) };
-                        warn "E: (in abort) $req: $@" if $@;
-                }
-        }
+        my $sock = $self->{sock};
         delete @$self{qw(-bc err_c inflight)};
         delete($self->{epwatch}) ? $self->SUPER::close : delete($self->{sock});
+        $sock->close if $sock; # calls gcf_drain via awaitpid
 }
 
 package PublicInbox::GitCheck; # only for git <2.36
diff --git a/lib/PublicInbox/GitAsyncCat.pm b/lib/PublicInbox/GitAsyncCat.pm
index f8b2a9fc..09744b34 100644
--- a/lib/PublicInbox/GitAsyncCat.pm
+++ b/lib/PublicInbox/GitAsyncCat.pm
@@ -40,7 +40,7 @@ sub ibx_async_prefetch {
         my ($ibx, $oid, $cb, $arg) = @_;
         my $git = $ibx->git;
         if (!defined($ibx->{topdir}) && $GCF2C) {
-                if (!@{$GCF2C->{inflight} // []}) {
+                if (!@{$GCF2C->gcf_inflight // []}) {
                         $oid .= " $git->{git_dir}\n";
                         return $GCF2C->gcf2_async($oid, $cb, $arg); # true
                 }
diff --git a/lib/PublicInbox/IO.pm b/lib/PublicInbox/IO.pm
index fcebac59..6593dcdf 100644
--- a/lib/PublicInbox/IO.pm
+++ b/lib/PublicInbox/IO.pm
@@ -15,8 +15,10 @@ use Errno qw(EINTR EAGAIN);
 
 sub waitcb { # awaitpid callback
         my ($pid, $errref, $cb, @args) = @_;
+        $errref //= \my $workaround_await_pids_clobbered;
         $$errref = $?; # sets .cerr for _close
-        $cb->($pid, @args) if $cb;
+        $cb->($pid, @args) if $cb; # may clobber $?
+        $? = $$errref;
 }
 
 sub attach_pid {
@@ -33,6 +35,11 @@ sub attached_pid {
         ${${*$io}{pi_io_reap} // []}[1];
 }
 
+sub owner_pid {
+        my ($io) = @_;
+        ${${*$io}{pi_io_reap} // [-1]}[0];
+}
+
 # caller cares about error result if they call close explicitly
 # reap->[2] may be set before this is called via waitcb
 sub close {
@@ -40,8 +47,12 @@ sub close {
         my $ret = $io->SUPER::close;
         my $reap = delete ${*$io}{pi_io_reap};
         return $ret unless $reap && $reap->[0] == $$;
-        ${$reap->[2]} // (my $w = awaitpid($reap->[1])); # sets [2]
-        ($? = ${$reap->[2]}) ? '' : $ret;
+        if (defined ${$reap->[2]}) { # reap_pids already reaped asynchronously
+                $? = ${$reap->[2]};
+        } else { # wait synchronously
+                my $w = awaitpid($reap->[1]);
+        }
+        $? ? '' : $ret; # use $?, AWAIT_PIDS may be cleared on ->Reset (FIXME?)
 }
 
 sub DESTROY {