# Copyright (C) 2016 all contributors # License: AGPL-3.0+ # # internal class used by PublicInbox::Git + Danga::Socket # This parses the output pipe of "git cat-file --batch/--batch-check" package PublicInbox::GitAsync; use strict; use warnings; use base qw(Danga::Socket); use fields qw(jobq rbuf wr check); use PublicInbox::GitAsyncWr; our $MAX = 65536; # Import may bump this in the future sub new { my ($class, $rd, $wr, $check) = @_; my $self = fields::new($class); IO::Handle::blocking($rd, 0); $self->SUPER::new($rd); $self->{jobq} = []; # [ [ $obj, $cb, $state ], ... ] my $buf = ''; $self->{rbuf} = \$buf; $self->{wr} = PublicInbox::GitAsyncWr->new($wr); $self->{check} = $check; $self->watch_read(1); $self; } sub cat_file_async { my ($self, $obj, $cb) = @_; # order matters push @{$self->{jobq}}, [ $obj, $cb ]; $self->{wr}->write(\"$obj\n"); } # Returns: an array ref of the info line for --batch-check and --batch, # which may be: [ $obj, 'missing'] # Returns undef on error sub read_info ($) { my ($self) = @_; my $rbuf = $self->{rbuf}; my $rd = $self->{sock}; while (1) { $$rbuf =~ s/\A([^\n]+)\n//s and return [ split(/ /, $1) ]; my $r = sysread($rd, $$rbuf, 110, length($$rbuf)); next if $r; return $r; } } sub event_read { my ($self) = @_; my $jobq = $self->{jobq}; my ($cur, $obj, $cb, $info, $left); my $check = $self->{check}; my ($rbuf, $rlen, $need, $buf); take_job: $cur = shift @$jobq or die 'BUG: empty job queue in '.__PACKAGE__; ($obj, $cb, $info, $left) = @$cur; if (!$info) { $info = read_info($self); if (!defined $info && ($!{EAGAIN} || $!{EINTR})) { return unshift(@$jobq, $cur) } $cb->($info); # $info may 0 (EOF, or undef, $cb will see $!) return $self->close unless $info; if ($check || (scalar(@$info) != 3)) { # do not monopolize the event loop if we're drained: return if ${$self->{rbuf}} eq ''; goto take_job; } $cur->[2] = $info; my $len = $info->[2]; $left = \$len; $cur->[3] = $left; # onto reading body... } ref($left) or die 'BUG: $left not ref in '.__PACKAGE__; $rbuf = $self->{rbuf}; $rlen = length($$rbuf); $need = $$left + 1; # +1 for trailing LF $buf = ''; if ($rlen == $need) { final_hunk: $self->{rbuf} = \$buf; $$left = undef; my $lf = chop $$rbuf; $lf eq "\n" or die "BUG: missing LF (got $lf)"; $cb->($rbuf); return if $buf eq ''; goto take_job; } elsif ($rlen < $need) { my $all = $need - $rlen; my $n = $all > $MAX ? $MAX : $all; my $r = sysread($self->{sock}, $$rbuf, $n, $rlen); if ($r) { goto final_hunk if $r == $all; # more to read later... $$left -= $r; $self->{rbuf} = \$buf; $cb->($rbuf); # don't monopolize the event loop return unshift(@$jobq, $cur); } elsif (!defined $r) { return unshift(@$jobq, $cur) if $!{EAGAIN} || $!{EINTR}; } $cb->($r); # $cb should handle 0 and undef (and see $!) $self->close; # FAIL... } else { # too much data in rbuf $buf = substr($$rbuf, $need, $rlen - $need); $$rbuf = substr($$rbuf, 0, $need); goto final_hunk; } } sub close { my $self = shift; my $jobq = $self->{jobq}; $self->{jobq} = []; $_->[1]->(0) for @$jobq; $self->{wr}->close; $self->SUPER::close(@_); } sub event_hup { $_[0]->close } sub event_err { $_[0]->close } 1;