diff options
Diffstat (limited to 'lib/PublicInbox')
-rw-r--r-- | lib/PublicInbox/GitHTTPBackend.pm | 111 |
1 files changed, 84 insertions, 27 deletions
diff --git a/lib/PublicInbox/GitHTTPBackend.pm b/lib/PublicInbox/GitHTTPBackend.pm index 937b2e9a..020b0886 100644 --- a/lib/PublicInbox/GitHTTPBackend.pm +++ b/lib/PublicInbox/GitHTTPBackend.pm @@ -33,6 +33,15 @@ our $ANY = join('|', @binary, @text); my $BIN = join('|', @binary); my $TEXT = join('|', @text); +my $nextq; +sub do_next () { + my $q = $nextq; + $nextq = undef; + while (my $cb = shift @$q) { + $cb->(); # this may redefine nextq + } +} + sub r { [ $_[0] , [qw(Content-Type text/plain Content-Length 0) ], [] ] } @@ -50,6 +59,17 @@ sub serve { serve_dumb($cgi, $git, $path); } +sub err ($@) { + my ($env, @msg) = @_; + $env->{'psgi.errors'}->print(@msg, "\n"); +} + +sub drop_client ($) { + if (my $io = $_[0]->{'psgix.io'}) { + $io->close; # this is Danga::Socket::close + } +} + sub serve_dumb { my ($cgi, $git, $path) = @_; @@ -61,18 +81,51 @@ sub serve_dumb { } else { return r(404); } + my $f = "$git->{git_dir}/$path"; - return r(404) unless -f $f && -r _; + return r(404) unless -f $f && -r _; # just in case it's a FIFO :P my @st = stat(_); my $size = $st[7]; + my $env = $cgi->{env}; - # TODO: If-Modified-Since and Last-Modified + # TODO: If-Modified-Since and Last-Modified? open my $in, '<', $f or return r(404); - my $code = 200; my $len = $size; - my @h; + my $n = 65536; # try to negotiate a big TCP window, first + my ($next, $fh); + my $cb = sub { + $n = $len if $len < $n; + my $r = sysread($in, my $buf, $n); + if (!defined $r) { + err($env, "$f read error: $!"); + drop_client($env); + } elsif ($r <= 0) { + err($env, "$f EOF with $len bytes left"); + drop_client($env); + } else { + $len -= $r; + $fh->write($buf); + if ($len == 0) { + $fh->close; + } elsif ($next) { + # avoid recursion in Danga::Socket::write + unless ($nextq) { + $nextq = []; + Danga::Socket->AddTimer(0, *do_next); + } + # avoid buffering too much in case we have + # slow clients: + $n = 8192; + push @$nextq, $next; + return; + } + } + # all done, cleanup references: + $fh = $next = undef; + }; - my $env = $cgi->{env}; + my $code = 200; + my @h = ('Content-Type', $type); my $range = $env->{HTTP_RANGE}; if (defined $range && $range =~ /\bbytes=(\d*)-(\d*)\z/) { ($code, $len) = prepare_range($cgi, $in, \@h, $1, $2, $size); @@ -81,22 +134,18 @@ sub serve_dumb { return [ 416, \@h, [] ]; } } + push @h, 'Content-Length', $len; - push @h, 'Content-Type', $type, 'Content-Length', $len; sub { my ($res) = @_; # Plack callback - my $fh = $res->([ $code, \@h ]); - my $buf; - my $n = 8192; - while ($len > 0) { - $n = $len if $len < $n; - my $r = sysread($in, $buf, $n); - last if (!defined($r) || $r <= 0); - $len -= $r; - $fh->write($buf); + $fh = $res->([ $code, \@h ]); + if (defined $env->{'pi-httpd.async'}) { + my $pi_http = $env->{'psgix.io'}; + $next = sub { $pi_http->write($cb) }; + $cb->(); # start it off! + } else { + $cb->() while $fh; } - die "$f truncated with $len bytes remaining\n" if $len; - $fh->close; } } @@ -149,7 +198,6 @@ sub serve_smart { my $input = $env->{'psgi.input'}; my $buf; my $in; - my $err = $env->{'psgi.errors'}; my $fd = eval { fileno($input) }; if (defined $fd && $fd >= 0) { $in = $input; @@ -158,7 +206,7 @@ sub serve_smart { } my ($rpipe, $wpipe); unless (pipe($rpipe, $wpipe)) { - $err->print("error creating pipe: $! - going static\n"); + err($env, "error creating pipe: $! - going static"); return; } my %env = %ENV; @@ -179,13 +227,23 @@ sub serve_smart { my %rdr = ( 0 => fileno($in), 1 => fileno($wpipe) ); my $pid = spawn([qw(git http-backend)], \%env, \%rdr); unless (defined $pid) { - $err->print("error spawning: $! - going static\n"); + err($env, "error spawning: $! - going static"); return; } $wpipe = $in = undef; $buf = ''; my ($vin, $fh, $res); $nr_running++; + + # Danga::Socket users, we queue up the read_enable callback to + # fire after pending writes are complete: + my $pi_http = $env->{'psgix.io'}; + my $read_enable = sub { $rpipe->watch_read(1) }; + my $read_disable = sub { + $rpipe->watch_read(0); + $pi_http->write($read_enable); + }; + my $end = sub { if ($fh) { $fh->close; @@ -202,10 +260,8 @@ sub serve_smart { my $e = $pid == waitpid($pid, 0) ? $? : "PID:$pid still running?"; if ($e) { - $err->print("http-backend ($git_dir): $e\n"); - if (my $io = $env->{'psgix.io'}) { - $io->close; - } + err($env, "git http-backend ($git_dir): $e"); + drop_client($env); } } return unless $res; @@ -220,7 +276,7 @@ sub serve_smart { } my $e = $!; $end->(); - $err->print("git http-backend ($git_dir): $e\n"); + err($env, "git http-backend ($git_dir): $e\n"); }; my $cb = sub { # read git-http-backend output and stream to client my $r = $rpipe ? $rpipe->sysread($buf, 8192, length($buf)) : 0; @@ -229,6 +285,7 @@ sub serve_smart { if ($fh) { # stream body from git-http-backend to HTTP client $fh->write($buf); $buf = ''; + $read_disable->() if $read_disable; } elsif ($buf =~ s/\A(.*?)\r\n\r\n//s) { # parse headers my $h = $1; my $code = 200; @@ -257,6 +314,7 @@ sub serve_smart { $rpipe = $async->($rpipe, $cb); sub { ($res) = @_ } # let Danga::Socket handle the rest. } else { # synchronous loop for other PSGI servers + $read_enable = $read_disable = undef; $vin = ''; vec($vin, fileno($rpipe), 1) = 1; sub { @@ -274,8 +332,7 @@ sub input_to_file { while (1) { my $r = $input->read($buf, 8192); unless (defined $r) { - my $err = $env->{'psgi.errors'}; - $err->print("error reading input: $!\n"); + err($env, "error reading input: $!"); return; } last if ($r == 0); |