From 68f83ca5236078128a0ccf47a1e54bd955777120 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Sat, 31 Dec 2016 11:16:47 +0000 Subject: initial git async work This will allow us to handle network operations while waiting on "git cat-file" to seek and unpack things. --- lib/PublicInbox/Git.pm | 89 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 89 insertions(+) (limited to 'lib/PublicInbox/Git.pm') diff --git a/lib/PublicInbox/Git.pm b/lib/PublicInbox/Git.pm index 4dfc4099..0b482c7e 100644 --- a/lib/PublicInbox/Git.pm +++ b/lib/PublicInbox/Git.pm @@ -13,6 +13,10 @@ use POSIX qw(dup2); require IO::Handle; use PublicInbox::Spawn qw(spawn popen_rd); use Fcntl qw(:seek); +my $have_async = eval { + require PublicInbox::EvCleanup; + require PublicInbox::GitAsyncRd; +}; # Documentation/SubmittingPatches recommends 12 (Linux v4.4) my $abbrev = `git config core.abbrev` || 12; @@ -64,6 +68,7 @@ sub _bidi_pipe { $self->{$in} = $in_r; } +# legacy synchronous API sub cat_file_begin { my ($self, $obj) = @_; $self->_bidi_pipe(qw(--batch in out pid)); @@ -79,6 +84,7 @@ sub cat_file_begin { ($in, $1, $2, $3); } +# legacy synchronous API sub cat_file_finish { my ($self, $left) = @_; my $max = 8192; @@ -96,6 +102,7 @@ sub cat_file_finish { fail($self, 'newline missing after blob') if ($r != 1 || $buf ne "\n"); } +# legacy synchronous API sub cat_file { my ($self, $obj, $ref) = @_; @@ -131,6 +138,7 @@ sub cat_file { sub batch_prepare ($) { _bidi_pipe($_[0], qw(--batch in out pid)) } +# legacy synchronous API sub check { my ($self, $obj) = @_; $self->_bidi_pipe(qw(--batch-check in_c out_c pid_c)); @@ -185,10 +193,91 @@ sub cleanup { my ($self) = @_; _destroy($self, qw(in out pid)); _destroy($self, qw(in_c out_c pid_c)); + + if ($have_async) { + my %h = %$self; # yup, copy ourselves + %$self = (); + my $ds_closed; + + # schedule closing with Danga::Socket::close: + foreach (qw(async async_c)) { + my $ds = delete $h{$_} or next; + $ds->close; + $ds_closed = 1; + } + + # can't do waitpid in _destroy() until next tick, + # since D::S defers closing until end of current event loop + $ds_closed and PublicInbox::EvCleanup::next_tick(sub { + _destroy(\%h, qw(in_a out_a pid_a)); + _destroy(\%h, qw(in_ac out_ac pid_ac)); + }); + } } sub DESTROY { cleanup(@_) } +# modern async API +sub check_async_ds ($$$) { + my ($self, $obj, $cb) = @_; + ($self->{async_c} ||= do { + _bidi_pipe($self, qw(--batch-check in_ac out_ac pid_ac)); + PublicInbox::GitAsyncRd->new($self->{in_ac}, $self->{out_ac}, 1) + })->cat_file_async($obj, $cb); +} + +sub cat_async_ds ($$$) { + my ($self, $obj, $cb) = @_; + ($self->{async} ||= do { + _bidi_pipe($self, qw(--batch in_a out_a pid_a)); + PublicInbox::GitAsyncRd->new($self->{in_a}, $self->{out_a}); + })->cat_file_async($obj, $cb); +} + +sub async_info_compat ($) { + local $/ = "\n"; + chomp(my $line = $_[0]->getline); + [ split(/ /, $line) ]; +} + +sub check_async_compat ($$$) { + my ($self, $obj, $cb) = @_; + $self->_bidi_pipe(qw(--batch-check in_c out_c pid_c)); + $self->{out_c}->print($obj."\n") or fail($self, "write error: $!"); + my $info = async_info_compat($self->{in_c}); + $cb->($info); +} + +sub cat_async_compat ($$$) { + my ($self, $obj, $cb) = @_; + $self->_bidi_pipe(qw(--batch in out pid)); + $self->{out}->print($obj."\n") or fail($self, "write error: $!"); + my $in = $self->{in}; + my $info = async_info_compat($in); + $cb->($info); + return if scalar(@$info) != 3; # missing + my $max = 8192; + my $left = $info->[2]; + my ($buf, $r); + while ($left > 0) { + $r = read($in, $buf, $left > $max ? $max : $left); + return $cb->($r) unless $r; # undef or 0 + $left -= $r; + $cb->(\$buf); + } + $r = read($in, $buf, 1); + defined($r) or fail($self, "read failed: $!"); + fail($self, 'newline missing after blob') if ($r != 1 || $buf ne "\n"); +} + +if ($have_async) { + *check_async = *check_async_ds; + *cat_async = *cat_async_ds; +} else { + *check_async = *check_async_compat; + *cat_async = *cat_async_compat; +} + 1; __END__ =pod -- cgit v1.2.3-24-ge0c7