public-inbox.git  about / heads / tags
an "archives first" approach to mailing lists
blob 24e0bf3bdeda541274ced9d314bfeb8641439372 3370 bytes (raw)
$ git show repobrowse:lib/PublicInbox/GitAsync.pm	# shows this blob on the CLI

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
 
# Copyright (C) 2016 all contributors <meta@public-inbox.org>
# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
#
# internal class used by PublicInbox::Git + Danga::Socket
# This parses the output pipe of "git cat-file --batch/--batch-check"
package PublicInbox::GitAsync;
use strict;
use warnings;
use base qw(Danga::Socket);
use fields qw(jobq rbuf wr check);
use PublicInbox::GitAsyncWr;
our $MAX = 65536; # Import may bump this in the future

# Constructor.
#   $rd    - handle to read git output from; switched to non-blocking
#            mode and registered with Danga::Socket for read events
#   $wr    - handle passed to PublicInbox::GitAsyncWr->new for requests
#   $check - stored as-is; event_read delivers only info lines when true
sub new {
	my ($class, $rd, $wr, $check) = @_;
	my $self = fields::new($class);
	IO::Handle::blocking($rd, 0);
	$self->SUPER::new($rd);

	my $pending = ''; # bytes read from $rd but not yet consumed
	# each jobq entry is: [ $obj, $cb, $state... ]
	@$self{qw(jobq rbuf)} = ([], \$pending);
	$self->{wr} = PublicInbox::GitAsyncWr->new($wr);
	$self->{check} = $check;
	$self->watch_read(1);
	return $self;
}

# Queue a request for $obj; $cb is stored alongside it in {jobq} and is
# later invoked by event_read with the response.
# Returns whatever the writer's write() returns.
# NOTE(review): $obj is written verbatim followed by LF; presumably callers
# never pass a name containing LF -- confirm.
sub cat_file_async {
	my ($self, $obj, $cb) = @_;
	my $request = "$obj\n";
	my $job = [ $obj, $cb ];
	# the job must be queued before the request is written
	push @{$self->{jobq}}, $job;
	$self->{wr}->write(\$request);
}

# Consumes one info line of "git cat-file --batch/--batch-check" output
# from {rbuf}, reading more from {sock} when no complete line is buffered.
#
# Returns: an array ref of the space-separated fields of the info line,
# or [ $obj, 'missing' ] when the line is "<object> missing".  $obj is
# kept whole in that case, even if it contains spaces (e.g. "HEAD:a b").
# Returns 0 on EOF and undef on error ($! is left as set by sysread, so
# callers can check for EAGAIN/EINTR).
sub read_info ($) {
	my ($self) = @_;
	my $rbuf = $self->{rbuf};
	my $rd = $self->{sock};

	while (1) {
		if ($$rbuf =~ s/\A([^\n]+)\n//s) {
			my $line = $1;
			# do not split an object name which contains spaces
			return [ $1, 'missing' ]
				if $line =~ /\A(.+) missing\z/s;
			return [ split(/ /, $line) ];
		}

		# small read; presumably sized so that little of a following
		# body is slurped along with the info line -- TODO confirm
		my $r = sysread($rd, $$rbuf, 110, length($$rbuf));
		next if $r;
		return $r; # 0 (EOF) or undef (error, see $!)
	}
}

# Read-event callback: consumes responses from the git cat-file pipe and
# dispatches them to the callbacks queued in {jobq}, oldest job first.
#
# Each job is [ $obj, $cb, $info, $left ]:
#   $info - info line array ref, stored once it has been read
#   $left - ref to the count of body bytes not yet handed to $cb
#
# $cb is invoked with, in order:
#   - the info array ref, or 0 (EOF) / undef (error, see $!) if the
#     info line could not be read
#   - unless {check} is true or the object is missing: one or more scalar
#     refs holding consecutive hunks of the body (trailing LF removed)
#   - 0 once the body is complete; a 0 (EOF) or undef (error, see $!)
#     is also passed if reading the body fails, and $self is then closed
#
# Dies on internal inconsistencies (empty job queue, missing LF).
sub event_read {
	my ($self) = @_;
	my $jobq = $self->{jobq};
	my $check = $self->{check};

	while (1) {
		my $cur = shift @$jobq or
			die 'BUG: empty job queue in '.__PACKAGE__;
		my (undef, $cb, $info, $left) = @$cur;
		if (!$info) {
			$info = read_info($self);
			if (!defined $info && ($!{EAGAIN} || $!{EINTR})) {
				# nothing to read right now, retry on next event
				unshift(@$jobq, $cur);
				return;
			}
			# $info may be 0 (EOF) or undef ($cb will see $!)
			$cb->($info);
			return $self->close unless $info;
			if ($check || $info->[1] eq 'missing') {
				# no body follows; do not monopolize the
				# event loop if we're drained:
				return if ${$self->{rbuf}} eq '';
				next;
			}
			$cur->[2] = $info;
			my $len = $info->[2];
			$left = $cur->[3] = \$len; # onto reading body...
		}
		ref($left) or die 'BUG: $left not ref in '.__PACKAGE__;

		my $rbuf = $self->{rbuf};
		my $rlen = length($$rbuf);
		my $need = $$left + 1; # +1 for trailing LF

		# Must be a fresh scalar on every iteration: it becomes
		# {rbuf} for the next job, and reusing one variable across
		# iterations would clobber data buffered for that job.
		my $next = '';

		if ($rlen < $need) {
			my $all = $need - $rlen;
			my $n = $all > $MAX ? $MAX : $all;
			my $r = sysread($self->{sock}, $$rbuf, $n, $rlen);
			if (!$r) {
				if (!defined $r && ($!{EAGAIN} || $!{EINTR})) {
					unshift(@$jobq, $cur);
					return;
				}
				# $cb should handle 0 and undef (and see $!)
				$cb->($r);
				return $self->close; # FAIL...
			}
			if ($r != $all) {
				# more to read later; the hunk given to $cb
				# holds the $rlen bytes which were already
				# buffered plus the $r bytes just read
				$$left -= $rlen + $r;
				$self->{rbuf} = \$next;
				$cb->($rbuf);

				# don't monopolize the event loop
				unshift(@$jobq, $cur);
				return;
			}
		} elsif ($rlen > $need) {
			# too much data in rbuf: keep the excess for the next job
			$next = substr($$rbuf, $need, $rlen - $need, '');
		}

		# final hunk: $$rbuf is exactly the rest of the body + LF
		$self->{rbuf} = \$next;
		$$left = undef;
		my $lf = chop $$rbuf;
		$lf eq "\n" or die "BUG: missing LF (got $lf)";
		$cb->($rbuf);
		$cb->(0);

		return if $next eq '';
	}
}

# Shut down: every job still queued has its callback invoked with 0,
# then the writer is closed and the parent class's close is called with
# any remaining arguments.
sub close {
	my ($self, @args) = @_;
	my $pending = $self->{jobq};
	# detach the queue before running any callbacks
	$self->{jobq} = [];
	foreach my $job (@$pending) {
		my $cb = $job->[1];
		$cb->(0);
	}
	$self->{wr}->close;
	$self->SUPER::close(@args);
}

# hangup event: close this object (which also fails all queued jobs)
sub event_hup {
	my ($self) = @_;
	$self->close;
}
# error event: close this object (which also fails all queued jobs)
sub event_err {
	my ($self) = @_;
	$self->close;
}

1;

git clone https://public-inbox.org/public-inbox.git
git clone http://7fh6tueqddpjyxjmgtdiueylzoqt6pt7hec3pukyptlmohoowvhde4yd.onion/public-inbox.git