From 68f83ca5236078128a0ccf47a1e54bd955777120 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Sat, 31 Dec 2016 11:16:47 +0000 Subject: initial git async work This will allow us to handle network operations while waiting on "git cat-file" to seek and unpack things. --- lib/PublicInbox/Git.pm | 89 ++++++++++++++++++++++++++++ lib/PublicInbox/GitAsyncRd.pm | 133 ++++++++++++++++++++++++++++++++++++++++++ lib/PublicInbox/GitAsyncWr.pm | 23 ++++++++ 3 files changed, 245 insertions(+) create mode 100644 lib/PublicInbox/GitAsyncRd.pm create mode 100644 lib/PublicInbox/GitAsyncWr.pm (limited to 'lib') diff --git a/lib/PublicInbox/Git.pm b/lib/PublicInbox/Git.pm index 4dfc4099..0b482c7e 100644 --- a/lib/PublicInbox/Git.pm +++ b/lib/PublicInbox/Git.pm @@ -13,6 +13,10 @@ use POSIX qw(dup2); require IO::Handle; use PublicInbox::Spawn qw(spawn popen_rd); use Fcntl qw(:seek); +my $have_async = eval { + require PublicInbox::EvCleanup; + require PublicInbox::GitAsyncRd; +}; # Documentation/SubmittingPatches recommends 12 (Linux v4.4) my $abbrev = `git config core.abbrev` || 12; @@ -64,6 +68,7 @@ sub _bidi_pipe { $self->{$in} = $in_r; } +# legacy synchronous API sub cat_file_begin { my ($self, $obj) = @_; $self->_bidi_pipe(qw(--batch in out pid)); @@ -79,6 +84,7 @@ sub cat_file_begin { ($in, $1, $2, $3); } +# legacy synchronous API sub cat_file_finish { my ($self, $left) = @_; my $max = 8192; @@ -96,6 +102,7 @@ sub cat_file_finish { fail($self, 'newline missing after blob') if ($r != 1 || $buf ne "\n"); } +# legacy synchronous API sub cat_file { my ($self, $obj, $ref) = @_; @@ -131,6 +138,7 @@ sub cat_file { sub batch_prepare ($) { _bidi_pipe($_[0], qw(--batch in out pid)) } +# legacy synchronous API sub check { my ($self, $obj) = @_; $self->_bidi_pipe(qw(--batch-check in_c out_c pid_c)); @@ -185,10 +193,91 @@ sub cleanup { my ($self) = @_; _destroy($self, qw(in out pid)); _destroy($self, qw(in_c out_c pid_c)); + + if ($have_async) { + my %h = %$self; # yup, copy ourselves + %$self = (); + my $ds_closed; + + # schedule closing with Danga::Socket::close: + foreach (qw(async async_c)) { + my $ds = delete $h{$_} or next; + $ds->close; + $ds_closed = 1; + } + + # can't do waitpid in _destroy() until next tick, + # since D::S defers closing until end of current event loop + $ds_closed and PublicInbox::EvCleanup::next_tick(sub { + _destroy(\%h, qw(in_a out_a pid_a)); + _destroy(\%h, qw(in_ac out_ac pid_ac)); + }); + } } sub DESTROY { cleanup(@_) } +# modern async API +sub check_async_ds ($$$) { + my ($self, $obj, $cb) = @_; + ($self->{async_c} ||= do { + _bidi_pipe($self, qw(--batch-check in_ac out_ac pid_ac)); + PublicInbox::GitAsyncRd->new($self->{in_ac}, $self->{out_ac}, 1) + })->cat_file_async($obj, $cb); +} + +sub cat_async_ds ($$$) { + my ($self, $obj, $cb) = @_; + ($self->{async} ||= do { + _bidi_pipe($self, qw(--batch in_a out_a pid_a)); + PublicInbox::GitAsyncRd->new($self->{in_a}, $self->{out_a}); + })->cat_file_async($obj, $cb); +} + +sub async_info_compat ($) { + local $/ = "\n"; + chomp(my $line = $_[0]->getline); + [ split(/ /, $line) ]; +} + +sub check_async_compat ($$$) { + my ($self, $obj, $cb) = @_; + $self->_bidi_pipe(qw(--batch-check in_c out_c pid_c)); + $self->{out_c}->print($obj."\n") or fail($self, "write error: $!"); + my $info = async_info_compat($self->{in_c}); + $cb->($info); +} + +sub cat_async_compat ($$$) { + my ($self, $obj, $cb) = @_; + $self->_bidi_pipe(qw(--batch in out pid)); + $self->{out}->print($obj."\n") or fail($self, "write error: $!"); + my $in = $self->{in}; + my $info = async_info_compat($in); + $cb->($info); + return if scalar(@$info) != 3; # missing + my $max = 8192; + my $left = $info->[2]; + my ($buf, $r); + while ($left > 0) { + $r = read($in, $buf, $left > $max ? $max : $left); + return $cb->($r) unless $r; # undef or 0 + $left -= $r; + $cb->(\$buf); + } + $r = read($in, $buf, 1); + defined($r) or fail($self, "read failed: $!"); + fail($self, 'newline missing after blob') if ($r != 1 || $buf ne "\n"); +} + +if ($have_async) { + *check_async = *check_async_ds; + *cat_async = *cat_async_ds; +} else { + *check_async = *check_async_compat; + *cat_async = *cat_async_compat; +} + 1; __END__ =pod diff --git a/lib/PublicInbox/GitAsyncRd.pm b/lib/PublicInbox/GitAsyncRd.pm new file mode 100644 index 00000000..a56dc392 --- /dev/null +++ b/lib/PublicInbox/GitAsyncRd.pm @@ -0,0 +1,133 @@ +# Copyright (C) 2016 all contributors +# License: AGPL-3.0+ +# +# internal class used by PublicInbox::Git + Danga::Socket +# This parses the output pipe of "git cat-file --batch/--batch-check" +package PublicInbox::GitAsyncRd; +use strict; +use warnings; +use base qw(Danga::Socket); +use fields qw(jobq rbuf wr check); +use PublicInbox::GitAsyncWr; +our $MAX = 65536; # Import may bump this in the future + +sub new { + my ($class, $rd, $wr, $check) = @_; + my $self = fields::new($class); + IO::Handle::blocking($rd, 0); + $self->SUPER::new($rd); + $self->{jobq} = []; # [ [ $obj, $cb, $state ], ... ] + my $buf = ''; + $self->{rbuf} = \$buf; + $self->{wr} = PublicInbox::GitAsyncWr->new($wr); + $self->{check} = $check; + $self->watch_read(1); + $self; +} + +sub cat_file_async { + my ($self, $obj, $cb) = @_; + # order matters + push @{$self->{jobq}}, [ $obj, $cb ]; + $self->{wr}->write($obj."\n"); +} + +# Returns: an array ref of the info line for --batch-check and --batch, +# which may be: [ $obj, 'missing'] +# Returns undef on error +sub read_info ($) { + my ($self) = @_; + my $rbuf = $self->{rbuf}; + my $rd = $self->{sock}; + + while (1) { + $$rbuf =~ s/\A([^\n]+)\n//s and return [ split(/ /, $1) ]; + + my $r = sysread($rd, $$rbuf, 110, length($$rbuf)); + next if $r; + return $r; + } +} + +sub event_read { + my ($self) = @_; + my $jobq = $self->{jobq}; + my ($cur, $obj, $cb, $info, $left); + my $check = $self->{check}; + my ($rbuf, $rlen, $need, $buf); +take_job: + $cur = shift @$jobq or die 'BUG: empty job queue in '.__PACKAGE__; + ($obj, $cb, $info, $left) = @$cur; + if (!$info) { + $info = read_info($self); + if (!defined $info && ($!{EAGAIN} || $!{EINTR})) { + return unshift(@$jobq, $cur) + } + $cb->($info); # $info may 0 (EOF, or undef, $cb will see $!) + return $self->close unless $info; + if ($check || (scalar(@$info) != 3)) { + # do not monopolize the event loop if we're drained: + return if ${$self->{rbuf}} eq ''; + goto take_job; + } + $cur->[2] = $info; + my $len = $info->[2]; + $left = \$len; + $cur->[3] = $left; # onto reading body... + } + ref($left) or die 'BUG: $left not ref in '.__PACKAGE__; + + $rbuf = $self->{rbuf}; + $rlen = length($$rbuf); + $need = $$left + 1; # +1 for trailing LF + $buf = ''; + + if ($rlen == $need) { +final_hunk: + $self->{rbuf} = \$buf; + $$left = undef; + my $lf = chop $$rbuf; + $lf eq "\n" or die "BUG: missing LF (got $lf)"; + $cb->($rbuf); + + return if $buf eq ''; + goto take_job; + } elsif ($rlen < $need) { + my $all = $need - $rlen; + my $n = $all > $MAX ? $MAX : $all; + my $r = sysread($self->{sock}, $$rbuf, $n, $rlen); + if ($r) { + goto final_hunk if $r == $all; + + # more to read later... + $$left -= $r; + $self->{rbuf} = \$buf; + $cb->($rbuf); + + # don't monopolize the event loop + return unshift(@$jobq, $cur); + } elsif (!defined $r) { + return unshift(@$jobq, $cur) if $!{EAGAIN} || $!{EINTR}; + } + $cb->($r); # $cb should handle 0 and undef (and see $!) + $self->close; # FAIL... + } else { # too much data in rbuf + $buf = substr($$rbuf, $need, $rlen - $need); + $$rbuf = substr($$rbuf, 0, $need); + goto final_hunk; + } +} + +sub close { + my $self = shift; + my $jobq = $self->{jobq}; + $self->{jobq} = []; + $_->[1]->(0) for @$jobq; + $self->{wr}->close; + $self->SUPER::close(@_); +} + +sub event_hup { $_[0]->close } +sub event_err { $_[0]->close } + +1; diff --git a/lib/PublicInbox/GitAsyncWr.pm b/lib/PublicInbox/GitAsyncWr.pm new file mode 100644 index 00000000..c22f2fcc --- /dev/null +++ b/lib/PublicInbox/GitAsyncWr.pm @@ -0,0 +1,23 @@ +# Copyright (C) 2016 all contributors +# License: AGPL-3.0+ +# +# internal class used by PublicInbox::Git + Danga::Socket +# This writes to the input pipe of "git cat-file --batch/--batch-check" +package PublicInbox::GitAsyncWr; +use strict; +use warnings; +use base qw(Danga::Socket); + +sub new { + my ($class, $io) = @_; + my $self = fields::new($class); + IO::Handle::blocking($io, 0); + $self->SUPER::new($io); +} + +# we only care about write + event_write + +sub event_hup { $_[0]->close } +sub event_err { $_[0]->close } + +1; -- cgit v1.2.3-24-ge0c7