about summary refs log tree commit homepage
path: root/lib/PublicInbox/Git.pm
diff options
context:
space:
mode:
author: Eric Wong <e@80x24.org> 2023-09-30 15:20:39 +0000
committer: Eric Wong <e@80x24.org> 2023-10-01 07:05:23 +0000
commit: 9b068be88b252355c1c62f804d4f081e9a20570d (patch)
tree: 4cf3ab9eebd8361bd9a3857c4d307783997be63c /lib/PublicInbox/Git.pm
parent: bf929e8ddfb9359a97cd8be3d3017c038564d52d (diff)
download: public-inbox-9b068be88b252355c1c62f804d4f081e9a20570d.tar.gz
The benefit of the 1MB potential pipe buffer size (on Linux) doesn't
seem noticeable when reading from git (unlike when writing to v2
shards), so Unix stream sockets seem fine here.

This allows us to simplify our process management by using the
same socket FD for reads and writes and enables us to use our
ProcessPipe class for reaping (as we can do with Gcf2Client).

Gcf2Client no longer relies on PublicInbox::DS for write
buffering, and instead just waits for requests to complete
once the number of inflight requests hits the MAX_INFLIGHT
threshold as we do with PublicInbox::Git.

We reuse the existing MAX_INFLIGHT limit (18) that was
determined by the minimum allowed PIPE_BUF (512).  AFAIK, Unix
stream sockets have no analogue to PIPE_BUF, but all *BSDs and
Linux I've checked have default SO_RCVBUF and SO_SNDBUF values
larger than the previously-required PIPE_BUF size of 512 bytes.
Diffstat (limited to 'lib/PublicInbox/Git.pm')
-rw-r--r-- lib/PublicInbox/Git.pm 254
1 files changed, 131 insertions, 123 deletions
diff --git a/lib/PublicInbox/Git.pm b/lib/PublicInbox/Git.pm
index 8ac40d2b..3062f293 100644
--- a/lib/PublicInbox/Git.pm
+++ b/lib/PublicInbox/Git.pm
@@ -9,23 +9,23 @@
 package PublicInbox::Git;
 use strict;
 use v5.10.1;
-use parent qw(Exporter);
+use parent qw(Exporter PublicInbox::DS);
 use POSIX ();
-use IO::Handle; # ->autoflush
+use IO::Handle; # ->blocking
+use Socket qw(AF_UNIX SOCK_STREAM);
+use PublicInbox::Syscall qw(EPOLLIN EPOLLET);
 use Errno qw(EINTR EAGAIN);
 use File::Glob qw(bsd_glob GLOB_NOSORT);
 use File::Spec ();
 use Time::HiRes qw(stat);
-use PublicInbox::Spawn qw(popen_rd which);
+use PublicInbox::Spawn qw(spawn popen_rd which);
 use PublicInbox::Tmpfile;
 use IO::Poll qw(POLLIN);
 use Carp qw(croak carp);
 use PublicInbox::SHA ();
-use PublicInbox::DS qw(awaitpid);
 our %HEXLEN2SHA = (40 => 1, 64 => 256);
 our %OFMT2HEXLEN = (sha1 => 40, sha256 => 64);
 our @EXPORT_OK = qw(git_unquote git_quote %HEXLEN2SHA %OFMT2HEXLEN);
-our $PIPE_BUFSIZ = 65536; # Linux default
 our $in_cleanup;
 our $RDTIMEO = 60_000; # milliseconds
 our $async_warn; # true in read-only daemons
@@ -34,11 +34,8 @@ our $async_warn; # true in read-only daemons
 my @MODIFIED_DATE = qw[for-each-ref --sort=-committerdate
                         --format=%(committerdate:raw) --count=1];
 
-# 512: POSIX PIPE_BUF minimum (see pipe(7))
-# 65: SHA-256 hex size + "\n" in preparation for git using non-SHA1
-# 3: @$inflight is flattened [ $OID, $cb, $arg ]
 use constant {
-        MAX_INFLIGHT => int(512 / (65 + length('contents '))) * 3,
+        MAX_INFLIGHT => 18, # arbitrary, formerly based on PIPE_BUF
         BATCH_CMD_VER => v2.36.0, # git 2.36+
 };
 
@@ -65,7 +62,7 @@ sub check_git_exe () {
         if ($st ne $EXE_ST) {
                 my $rd = popen_rd([ $GIT_EXE, '--version' ]);
                 my $v = readline($rd);
-                close($rd) or die "$GIT_EXE --version: $?";
+                CORE::close($rd) or die "$GIT_EXE --version: $?";
                 $v =~ /\b([0-9]+(?:\.[0-9]+){2})/ or die
                         "$GIT_EXE --version output: $v # unparseable";
                 $GIT_VER = eval("v$1") // die "BUG: bad vstring: $1 ($v)";
@@ -144,17 +141,16 @@ sub last_check_err {
         $buf;
 }
 
-sub _bidi_pipe {
-        my ($self, $batch, $in, $out, $pid, $err) = @_;
-        if (defined $self->{$pid}) {
-                Carp::cluck("BUG: self->{$pid} exists unexpectedly");
-                return;
-        }
-        pipe(my ($out_r, $out_w)) or $self->fail("pipe failed: $!");
-        my $rdr = { 0 => $out_r, pgid => 0 };
+sub _sock_cmd {
+        my ($self, $batch, $err_c) = @_;
+        $self->{sock} and Carp::confess('BUG: {sock} exists');
+        my ($s1, $s2);
+        socketpair($s1, $s2, AF_UNIX, SOCK_STREAM, 0) or die "socketpair $!";
+        $s1->blocking(0);
+        my $opt = { pgid => 0, 0 => $s2, 1 => $s2 };
         my $gd = $self->{git_dir};
         if ($gd =~ s!/([^/]+/[^/]+)\z!/!) {
-                $rdr->{-C} = $gd;
+                $opt->{-C} = $gd;
                 $gd = $1;
         }
 
@@ -163,23 +159,13 @@ sub _bidi_pipe {
         my $abbr = $GIT_VER lt v2.31.0 ? 40 : 'no';
         my @cmd = ($GIT_EXE, "--git-dir=$gd", '-c', "core.abbrev=$abbr",
                         'cat-file', "--$batch");
-        if ($err) {
+        if ($err_c) {
                 my $id = "git.$self->{git_dir}.$batch.err";
-                $self->{$err} = $rdr->{2} = tmpfile($id, undef, 1) or
+                $self->{err_c} = $opt->{2} = tmpfile($id, undef, 1) or
                                                 $self->fail("tmpfile($id): $!");
         }
-        # see lib/PublicInbox/ProcessPipe.pm for why we don't use that here
-        my ($in_r, $p) = popen_rd(\@cmd, undef, $rdr);
-        awaitpid($self->{$pid} = $p, undef);
-        $self->{"$pid.owner"} = $$;
-        $out_w->autoflush(1);
-        if ($^O eq 'linux') { # 1031: F_SETPIPE_SZ
-                fcntl($out_w, 1031, 4096);
-                fcntl($in_r, 1031, 4096) if $batch eq 'batch-check';
-        }
-        $out_w->blocking(0);
-        $self->{$out} = $out_w;
-        $self->{$in} = $in_r;
+        my $pid = spawn(\@cmd, undef, $opt);
+        $self->{sock} = PublicInbox::ProcessPipe->maybe_new($pid, $s1);
 }
 
 sub poll_in ($) { IO::Poll::_poll($RDTIMEO, fileno($_[0]), my $ev = POLLIN) }
@@ -189,7 +175,7 @@ sub my_read ($$$) {
         my $left = $len - length($$rbuf);
         my $r;
         while ($left > 0) {
-                $r = sysread($fh, $$rbuf, $PIPE_BUFSIZ, length($$rbuf));
+                $r = sysread($fh, $$rbuf, $left, length($$rbuf));
                 if ($r) {
                         $left -= $r;
                 } elsif (defined($r)) { # EOF
@@ -210,8 +196,7 @@ sub my_readline ($$) {
                 if ((my $n = index($$rbuf, "\n")) >= 0) {
                         return substr($$rbuf, 0, $n + 1, '');
                 }
-                my $r = sysread($fh, $$rbuf, $PIPE_BUFSIZ, length($$rbuf))
-                                                                and next;
+                my $r = sysread($fh, $$rbuf, 65536, length($$rbuf)) and next;
 
                 # return whatever's left on EOF
                 return substr($$rbuf, 0, length($$rbuf)+1, '') if defined($r);
@@ -229,11 +214,10 @@ sub cat_async_retry ($$) {
         # here to prevent cleanup() from waiting:
         delete $self->{inflight};
         cleanup($self);
-        batch_prepare($self, my $new_inflight = []);
+        my $new_inflight = batch_prepare($self);
 
         while (my ($oid, $cb, $arg) = splice(@$old_inflight, 0, 3)) {
-                write_all($self, $self->{out}, $oid."\n",
-                                \&cat_async_step, $new_inflight);
+                write_all($self, $oid."\n", \&cat_async_step, $new_inflight);
                 $oid = \$oid if !@$new_inflight; # to indicate oid retried
                 push @$new_inflight, $oid, $cb, $arg;
         }
@@ -246,7 +230,7 @@ sub async_prefetch {
         my $inflight = $self->{inflight} or return;
         return if @$inflight;
         substr($oid, 0, 0) = 'contents ' if $self->{-bc};
-        write_all($self, $self->{out}, "$oid\n", \&cat_async_step, $inflight);
+        write_all($self, "$oid\n", \&cat_async_step, $inflight);
         push(@$inflight, $oid, $cb, $arg);
 }
 
@@ -256,14 +240,14 @@ sub cat_async_step ($$) {
         my ($req, $cb, $arg) = @$inflight[0, 1, 2];
         my $rbuf = delete($self->{rbuf}) // \(my $new = '');
         my ($bref, $oid, $type, $size);
-        my $head = my_readline($self->{in}, $rbuf);
+        my $head = my_readline($self->{sock}, $rbuf);
         my $cmd = ref($req) ? $$req : $req;
         # ->fail may be called via Gcf2Client.pm
         my $info = $self->{-bc} && substr($cmd, 0, 5) eq 'info ';
         if ($head =~ /^([0-9a-f]{40,}) (\S+) ([0-9]+)$/) {
                 ($oid, $type, $size) = ($1, $2, $3 + 0);
                 unless ($info) { # --batch-command
-                        $bref = my_read($self->{in}, $rbuf, $size + 1) or
+                        $bref = my_read($self->{sock}, $rbuf, $size + 1) or
                                 $self->fail(defined($bref) ?
                                                 'read EOF' : "read: $!");
                         chop($$bref) eq "\n" or
@@ -302,16 +286,16 @@ sub cat_async_wait ($) {
         }
 }
 
-sub batch_prepare ($$) {
-        my ($self, $inflight) = @_;
+sub batch_prepare ($) {
+        my ($self) = @_;
         check_git_exe();
         if ($GIT_VER ge BATCH_CMD_VER) {
-                _bidi_pipe($self, qw(batch-command in out pid err_c));
                 $self->{-bc} = 1;
+                _sock_cmd($self, 'batch-command', 1);
         } else {
-                _bidi_pipe($self, qw(batch in out pid));
+                _sock_cmd($self, 'batch');
         }
-        $self->{inflight} = $inflight;
+        $self->{inflight} = [];
 }
 
 sub _cat_file_cb {
@@ -328,52 +312,59 @@ sub cat_file {
 }
 
 sub check_async_step ($$) {
-        my ($self, $inflight_c) = @_;
-        die 'BUG: inflight empty or odd' if scalar(@$inflight_c) < 3;
-        my ($req, $cb, $arg) = @$inflight_c[0, 1, 2];
-        my $rbuf = delete($self->{rbuf_c}) // \(my $new = '');
-        chomp(my $line = my_readline($self->{in_c}, $rbuf));
+        my ($ck, $inflight) = @_;
+        die 'BUG: inflight empty or odd' if scalar(@$inflight) < 3;
+        my ($req, $cb, $arg) = @$inflight[0, 1, 2];
+        my $rbuf = delete($ck->{rbuf}) // \(my $new = '');
+        chomp(my $line = my_readline($ck->{sock}, $rbuf));
         my ($hex, $type, $size) = split(/ /, $line);
 
         # git <2.21 would show `dangling' (2.21+ shows `ambiguous')
         # https://public-inbox.org/git/20190118033845.s2vlrb3wd3m2jfzu@dcvr/T/
         if ($hex eq 'dangling') {
-                my $ret = my_read($self->{in_c}, $rbuf, $type + 1);
-                $self->fail(defined($ret) ? 'read EOF' : "read: $!") if !$ret;
+                my $ret = my_read($ck->{sock}, $rbuf, $type + 1);
+                $ck->fail(defined($ret) ? 'read EOF' : "read: $!") if !$ret;
         }
-        $self->{rbuf_c} = $rbuf if $$rbuf ne '';
-        splice(@$inflight_c, 0, 3); # don't retry $cb on ->fail
+        $ck->{rbuf} = $rbuf if $$rbuf ne '';
+        splice(@$inflight, 0, 3); # don't retry $cb on ->fail
         eval { $cb->(undef, $hex, $type, $size, $arg) };
-        async_err($self, $req, $hex, $@, 'check') if $@;
+        async_err($ck, $req, $hex, $@, 'check') if $@;
 }
 
 sub check_async_wait ($) {
         my ($self) = @_;
         return cat_async_wait($self) if $self->{-bc};
-        my $inflight_c = $self->{inflight_c} or return;
-        check_async_step($self, $inflight_c) while (scalar(@$inflight_c));
+        my $ck = $self->{ck} or return;
+        my $inflight = $ck->{inflight} or return;
+        check_async_step($ck, $inflight) while (scalar(@$inflight));
+}
+
+# git <2.36
+sub ck {
+        $_[0]->{ck} //= bless { git_dir => $_[0]->{git_dir} },
+                                'PublicInbox::GitCheck';
 }
 
 sub check_async_begin ($) {
         my ($self) = @_;
-        die 'BUG: already in async check' if $self->{inflight_c};
         cleanup($self) if alternates_changed($self);
         check_git_exe();
         if ($GIT_VER ge BATCH_CMD_VER) {
-                _bidi_pipe($self, qw(batch-command in out pid err_c));
                 $self->{-bc} = 1;
-                $self->{inflight} = [];
+                _sock_cmd($self, 'batch-command', 1);
         } else {
-                _bidi_pipe($self, qw(batch-check in_c out_c pid_c err_c));
-                $self->{inflight_c} = [];
+                _sock_cmd($self = ck($self), 'batch-check', 1);
         }
+        $self->{inflight} = [];
 }
 
 sub write_all {
-        my ($self, $out, $buf, $read_step, $inflight) = @_;
+        my ($self, $buf, $read_step, $inflight) = @_;
+        $self->{sock} // Carp::confess 'BUG: no {sock}';
+        Carp::confess('BUG: not an array') if ref($inflight) ne 'ARRAY';
         $read_step->($self, $inflight) while @$inflight >= MAX_INFLIGHT;
         do {
-                my $w = syswrite($out, $buf);
+                my $w = syswrite($self->{sock}, $buf);
                 if (defined $w) {
                         return if $w == length($buf);
                         substr($buf, 0, $w, ''); # sv_chop
@@ -386,16 +377,17 @@ sub write_all {
 
 sub check_async ($$$$) {
         my ($self, $oid, $cb, $arg) = @_;
-        my $inflight = $self->{-bc} ?
-                        ($self->{inflight} // cat_async_begin($self)) :
-                        ($self->{inflight_c} // check_async_begin($self));
-        if ($self->{-bc}) {
+        my $inflight;
+        if ($self->{-bc}) { # likely as time goes on
+batch_command:
+                $inflight = $self->{inflight} // cat_async_begin($self);
                 substr($oid, 0, 0) = 'info ';
-                write_all($self, $self->{out}, "$oid\n",
-                                \&cat_async_step, $inflight);
-        } else {
-                write_all($self, $self->{out_c}, "$oid\n",
-                                \&check_async_step, $inflight);
+                write_all($self, "$oid\n", \&cat_async_step, $inflight);
+        } else { # accounts for git upgrades while we're running:
+                my $ck = $self->{ck}; # undef OK, maybe set in check_async_begin
+                $inflight = $ck->{inflight} // check_async_begin($self);
+                goto batch_command if $self->{-bc};
+                write_all($self->{ck}, "$oid\n", \&check_async_step, $inflight);
         }
         push(@$inflight, $oid, $cb, $arg);
 }
@@ -418,39 +410,9 @@ sub check {
         ($hex, $type, $size);
 }
 
-sub _destroy {
-        my ($self, $pid, @rest) = @_; # rest = rbuf, in, out, err
-        my ($p) = delete @$self{($pid, @rest)};
-
-        # GitAsyncCat::event_step may delete {$pid}
-        awaitpid($p) if defined($p) && $$ == $self->{"$pid.owner"};
-}
-
-sub async_abort ($) {
-        my ($self) = @_;
-        while (scalar(@{$self->{inflight_c} // []}) ||
-                        scalar(@{$self->{inflight} // []})) {
-                for my $c ('', '_c') {
-                        my $q = $self->{"inflight$c"} or next;
-                        while (@$q) {
-                                my ($req, $cb, $arg) = splice(@$q, 0, 3);
-                                $req = $$req if ref($req);
-                                $self->{-bc} and
-                                        $req =~ s/\A(?:contents|info) //;
-                                $req =~ s/ .*//; # drop git_dir for Gcf2Client
-                                eval { $cb->(undef, $req, undef, undef, $arg) };
-                                warn "E: (in abort) $req: $@" if $@;
-                        }
-                        delete $self->{"inflight$c"};
-                        delete $self->{"rbuf$c"};
-                }
-        }
-        cleanup($self);
-}
-
-sub fail { # may be augmented in subclasses
+sub fail {
         my ($self, $msg) = @_;
-        async_abort($self);
+        $self->close;
         croak(ref($self) . ' ' . ($self->{git_dir} // '') . ": $msg");
 }
 
@@ -475,12 +437,12 @@ sub qx {
         my $fh = popen(@_);
         if (wantarray) {
                 my @ret = <$fh>;
-                close $fh; # caller should check $?
+                CORE::close $fh; # caller should check $?
                 @ret;
         } else {
                 local $/;
                 my $ret = <$fh>;
-                close $fh; # caller should check $?
+                CORE::close $fh; # caller should check $?
                 $ret;
         }
 }
@@ -492,12 +454,16 @@ sub date_parse {
         } $self->qx('rev-parse', map { "--since=$_" } @_);
 }
 
+sub _active ($) {
+        scalar(@{$_[0]->{inflight} // []}) ||
+                ($_[0]->{ck} && scalar(@{$_[0]->{ck}->{inflight} // []}))
+}
+
 # check_async and cat_async may trigger the other, so ensure they're
 # both completely done by using this:
 sub async_wait_all ($) {
         my ($self) = @_;
-        while (scalar(@{$self->{inflight_c} // []}) ||
-                        scalar(@{$self->{inflight} // []})) {
+        while (_active($self)) {
                 check_async_wait($self);
                 cat_async_wait($self);
         }
@@ -506,14 +472,10 @@ sub async_wait_all ($) {
 # returns true if there are pending "git cat-file" processes
 sub cleanup {
         my ($self, $lazy) = @_;
-        return 1 if $lazy && (scalar(@{$self->{inflight_c} // []}) ||
-                                scalar(@{$self->{inflight} // []}));
+        return 1 if $lazy && _active($self);
         local $in_cleanup = 1;
-        delete @$self{qw(async_cat async_chk)};
         async_wait_all($self);
-        delete @$self{qw(inflight inflight_c -bc)};
-        _destroy($self, qw(pid rbuf in out err_c));
-        _destroy($self, qw(pid_c rbuf_c in_c out_c err_c));
+        $_->close for ($self, (delete($self->{ck}) // ()));
         undef;
 }
 
@@ -530,7 +492,7 @@ sub packed_bytes {
         $n
 }
 
-sub DESTROY { cleanup(@_) }
+sub DESTROY { cleanup($_[0]) }
 
 sub local_nick ($) {
         # don't show full FS path, basename should be OK:
@@ -571,14 +533,14 @@ sub cat_async_begin {
         my ($self) = @_;
         cleanup($self) if $self->alternates_changed;
         die 'BUG: already in async' if $self->{inflight};
-        batch_prepare($self, []);
+        batch_prepare($self);
 }
 
 sub cat_async ($$$;$) {
         my ($self, $oid, $cb, $arg) = @_;
         my $inflight = $self->{inflight} // cat_async_begin($self);
         substr($oid, 0, 0) = 'contents ' if $self->{-bc};
-        write_all($self, $self->{out}, $oid."\n", \&cat_async_step, $inflight);
+        write_all($self, $oid."\n", \&cat_async_step, $inflight);
         push(@$inflight, $oid, $cb, $arg);
 }
 
@@ -648,7 +610,7 @@ sub manifest_entry {
         }
         my $dig = PublicInbox::SHA->new(1);
         while (read($sr, $buf, 65536)) { $dig->add($buf) }
-        close $sr or return; # empty, uninitialized git repo
+        CORE::close $sr or return; # empty, uninitialized git repo
         $ent->{fingerprint} = $dig->hexdigest;
         $ent->{modified} = modified(undef, $mod);
         chomp($buf = <$own> // '');
@@ -664,8 +626,10 @@ sub cleanup_if_unlinked {
         # Linux-specific /proc/$PID/maps access
         # TODO: support this inside git.git
         my $ret = 0;
-        for my $fld (qw(pid pid_c)) {
-                my $pid = $self->{$fld} // next;
+        for my $obj ($self, ($self->{ck} // ())) {
+                my $sock = $obj->{sock} // next;
+                my PublicInbox::ProcessPipe $pp = tied *$sock; # ProcessPipe
+                my $pid = $pp->{pid} // next;
                 open my $fh, '<', "/proc/$pid/maps" or return cleanup($self, 1);
                 while (<$fh>) {
                         # n.b. we do not restart for unlinked multi-pack-index
@@ -679,6 +643,50 @@ sub cleanup_if_unlinked {
         $ret;
 }
 
+sub event_step {
+        my ($self) = @_;
+        $self->close if !$self->{sock}; # process died while requeued
+        my $inflight = $self->{inflight};
+        if ($inflight && @$inflight) {
+                $self->cat_async_step($inflight);
+                return $self->close unless $self->{sock};
+                # more to do? requeue for fairness:
+                $self->requeue if @$inflight || exists($self->{rbuf});
+        }
+}
+
+# idempotently registers with DS epoll/kqueue/select/poll
+sub watch_async ($) {
+        $_[0]->{epwatch} //= do {
+                $_[0]->SUPER::new($_[0]->{sock}, EPOLLIN|EPOLLET);
+                \undef;
+        }
+}
+
+sub close {
+        my ($self) = @_;
+        if (my $q = $self->{inflight}) { # abort inflight requests
+                while (@$q) {
+                        my ($req, $cb, $arg) = splice(@$q, 0, 3);
+                        $req = $$req if ref($req);
+                        $self->{-bc} and $req =~ s/\A(?:contents|info) //;
+                        $req =~ s/ .*//; # drop git_dir for Gcf2Client
+                        eval { $cb->(undef, $req, undef, undef, $arg) };
+                        warn "E: (in abort) $req: $@" if $@;
+                }
+        }
+        delete @$self{qw(-bc err_c inflight rbuf)};
+        delete($self->{epwatch}) ? $self->SUPER::close : delete($self->{sock});
+}
+
+package PublicInbox::GitCheck; # only for git <2.36
+use v5.12;
+our @ISA = qw(PublicInbox::Git);
+no warnings 'once';
+
+# for event_step
+*cat_async_step = \&PublicInbox::Git::check_async_step;
+
 1;
 __END__
 =pod