about summary refs log tree commit homepage
path: root/lib/PublicInbox/Git.pm
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2016-12-31 11:16:47 +0000
committerEric Wong <e@80x24.org>2017-01-07 23:38:29 +0000
commit68f83ca5236078128a0ccf47a1e54bd955777120 (patch)
treef60805615cb69e97c7e79182d2b991f67f900210 /lib/PublicInbox/Git.pm
parent0753326bc4f148436b4c34dfb8133b5579de0198 (diff)
downloadpublic-inbox-68f83ca5236078128a0ccf47a1e54bd955777120.tar.gz
This will allow us to handle network operations while waiting
on "git cat-file" to seek and unpack things.
Diffstat (limited to 'lib/PublicInbox/Git.pm')
-rw-r--r--lib/PublicInbox/Git.pm89
1 files changed, 89 insertions, 0 deletions
diff --git a/lib/PublicInbox/Git.pm b/lib/PublicInbox/Git.pm
index 4dfc4099..0b482c7e 100644
--- a/lib/PublicInbox/Git.pm
+++ b/lib/PublicInbox/Git.pm
@@ -13,6 +13,10 @@ use POSIX qw(dup2);
 require IO::Handle;
 use PublicInbox::Spawn qw(spawn popen_rd);
 use Fcntl qw(:seek);
+my $have_async = eval {
+        require PublicInbox::EvCleanup;
+        require PublicInbox::GitAsyncRd;
+};
 
 # Documentation/SubmittingPatches recommends 12 (Linux v4.4)
 my $abbrev = `git config core.abbrev` || 12;
@@ -64,6 +68,7 @@ sub _bidi_pipe {
         $self->{$in} = $in_r;
 }
 
+# legacy synchronous API
 sub cat_file_begin {
         my ($self, $obj) = @_;
         $self->_bidi_pipe(qw(--batch in out pid));
@@ -79,6 +84,7 @@ sub cat_file_begin {
         ($in, $1, $2, $3);
 }
 
+# legacy synchronous API
 sub cat_file_finish {
         my ($self, $left) = @_;
         my $max = 8192;
@@ -96,6 +102,7 @@ sub cat_file_finish {
         fail($self, 'newline missing after blob') if ($r != 1 || $buf ne "\n");
 }
 
+# legacy synchronous API
 sub cat_file {
         my ($self, $obj, $ref) = @_;
 
@@ -131,6 +138,7 @@ sub cat_file {
 
 sub batch_prepare ($) { _bidi_pipe($_[0], qw(--batch in out pid)) }
 
+# legacy synchronous API
 sub check {
         my ($self, $obj) = @_;
         $self->_bidi_pipe(qw(--batch-check in_c out_c pid_c));
@@ -185,10 +193,91 @@ sub cleanup {
         my ($self) = @_;
         _destroy($self, qw(in out pid));
         _destroy($self, qw(in_c out_c pid_c));
+
+        if ($have_async) {
+                my %h = %$self; # yup, copy ourselves
+                %$self = ();
+                my $ds_closed;
+
+                # schedule closing with Danga::Socket::close:
+                foreach (qw(async async_c)) {
+                        my $ds = delete $h{$_} or next;
+                        $ds->close;
+                        $ds_closed = 1;
+                }
+
+                # can't do waitpid in _destroy() until next tick,
+                # since D::S defers closing until end of current event loop
+                $ds_closed and PublicInbox::EvCleanup::next_tick(sub {
+                        _destroy(\%h, qw(in_a out_a pid_a));
+                        _destroy(\%h, qw(in_ac out_ac pid_ac));
+                });
+        }
 }
 
 sub DESTROY { cleanup(@_) }
 
+# modern async API
+sub check_async_ds ($$$) {
+        my ($self, $obj, $cb) = @_;
+        ($self->{async_c} ||= do {
+                _bidi_pipe($self, qw(--batch-check in_ac out_ac pid_ac));
+                PublicInbox::GitAsyncRd->new($self->{in_ac}, $self->{out_ac}, 1)
+        })->cat_file_async($obj, $cb);
+}
+
+sub cat_async_ds ($$$) {
+        my ($self, $obj, $cb) = @_;
+        ($self->{async} ||= do {
+                _bidi_pipe($self, qw(--batch in_a out_a pid_a));
+                PublicInbox::GitAsyncRd->new($self->{in_a}, $self->{out_a});
+        })->cat_file_async($obj, $cb);
+}
+
+sub async_info_compat ($) {
+        local $/ = "\n";
+        chomp(my $line = $_[0]->getline);
+        [ split(/ /, $line) ];
+}
+
+sub check_async_compat ($$$) {
+        my ($self, $obj, $cb) = @_;
+        $self->_bidi_pipe(qw(--batch-check in_c out_c pid_c));
+        $self->{out_c}->print($obj."\n") or fail($self, "write error: $!");
+        my $info = async_info_compat($self->{in_c});
+        $cb->($info);
+}
+
+sub cat_async_compat ($$$) {
+        my ($self, $obj, $cb) = @_;
+        $self->_bidi_pipe(qw(--batch in out pid));
+        $self->{out}->print($obj."\n") or fail($self, "write error: $!");
+        my $in = $self->{in};
+        my $info = async_info_compat($in);
+        $cb->($info);
+        return if scalar(@$info) != 3; # missing
+        my $max = 8192;
+        my $left = $info->[2];
+        my ($buf, $r);
+        while ($left > 0) {
+                $r = read($in, $buf, $left > $max ? $max : $left);
+                return $cb->($r) unless $r; # undef or 0
+                $left -= $r;
+                $cb->(\$buf);
+        }
+        $r = read($in, $buf, 1);
+        defined($r) or fail($self, "read failed: $!");
+        fail($self, 'newline missing after blob') if ($r != 1 || $buf ne "\n");
+}
+
+if ($have_async) {
+        *check_async = *check_async_ds;
+        *cat_async = *cat_async_ds;
+} else {
+        *check_async = *check_async_compat;
+        *cat_async = *cat_async_compat;
+}
+
 1;
 __END__
 =pod