diff options
-rw-r--r-- | lib/PublicInbox/GitHTTPBackend.pm | 111 | ||||
-rw-r--r-- | t/git-http-backend.psgi | 28 | ||||
-rw-r--r-- | t/git-http-backend.t | 134 |
3 files changed, 246 insertions, 27 deletions
diff --git a/lib/PublicInbox/GitHTTPBackend.pm b/lib/PublicInbox/GitHTTPBackend.pm index 937b2e9a..020b0886 100644 --- a/lib/PublicInbox/GitHTTPBackend.pm +++ b/lib/PublicInbox/GitHTTPBackend.pm @@ -33,6 +33,15 @@ our $ANY = join('|', @binary, @text); my $BIN = join('|', @binary); my $TEXT = join('|', @text); +my $nextq; +sub do_next () { + my $q = $nextq; + $nextq = undef; + while (my $cb = shift @$q) { + $cb->(); # this may redefine nextq + } +} + sub r { [ $_[0] , [qw(Content-Type text/plain Content-Length 0) ], [] ] } @@ -50,6 +59,17 @@ sub serve { serve_dumb($cgi, $git, $path); } +sub err ($@) { + my ($env, @msg) = @_; + $env->{'psgi.errors'}->print(@msg, "\n"); +} + +sub drop_client ($) { + if (my $io = $_[0]->{'psgix.io'}) { + $io->close; # this is Danga::Socket::close + } +} + sub serve_dumb { my ($cgi, $git, $path) = @_; @@ -61,18 +81,51 @@ sub serve_dumb { } else { return r(404); } + my $f = "$git->{git_dir}/$path"; - return r(404) unless -f $f && -r _; + return r(404) unless -f $f && -r _; # just in case it's a FIFO :P my @st = stat(_); my $size = $st[7]; + my $env = $cgi->{env}; - # TODO: If-Modified-Since and Last-Modified + # TODO: If-Modified-Since and Last-Modified? open my $in, '<', $f or return r(404); - my $code = 200; my $len = $size; - my @h; + my $n = 65536; # try to negotiate a big TCP window, first + my ($next, $fh); + my $cb = sub { + $n = $len if $len < $n; + my $r = sysread($in, my $buf, $n); + if (!defined $r) { + err($env, "$f read error: $!"); + drop_client($env); + } elsif ($r <= 0) { + err($env, "$f EOF with $len bytes left"); + drop_client($env); + } else { + $len -= $r; + $fh->write($buf); + if ($len == 0) { + $fh->close; + } elsif ($next) { + # avoid recursion in Danga::Socket::write + unless ($nextq) { + $nextq = []; + Danga::Socket->AddTimer(0, *do_next); + } + # avoid buffering too much in case we have + # slow clients: + $n = 8192; + push @$nextq, $next; + return; + } + } + # all done, cleanup references: + $fh = $next = undef; + }; - my $env = $cgi->{env}; + my $code = 200; + my @h = ('Content-Type', $type); my $range = $env->{HTTP_RANGE}; if (defined $range && $range =~ /\bbytes=(\d*)-(\d*)\z/) { ($code, $len) = prepare_range($cgi, $in, \@h, $1, $2, $size); @@ -81,22 +134,18 @@ sub serve_dumb { return [ 416, \@h, [] ]; } } + push @h, 'Content-Length', $len; - push @h, 'Content-Type', $type, 'Content-Length', $len; sub { my ($res) = @_; # Plack callback - my $fh = $res->([ $code, \@h ]); - my $buf; - my $n = 8192; - while ($len > 0) { - $n = $len if $len < $n; - my $r = sysread($in, $buf, $n); - last if (!defined($r) || $r <= 0); - $len -= $r; - $fh->write($buf); + $fh = $res->([ $code, \@h ]); + if (defined $env->{'pi-httpd.async'}) { + my $pi_http = $env->{'psgix.io'}; + $next = sub { $pi_http->write($cb) }; + $cb->(); # start it off! + } else { + $cb->() while $fh; } - die "$f truncated with $len bytes remaining\n" if $len; - $fh->close; } } @@ -149,7 +198,6 @@ sub serve_smart { my $input = $env->{'psgi.input'}; my $buf; my $in; - my $err = $env->{'psgi.errors'}; my $fd = eval { fileno($input) }; if (defined $fd && $fd >= 0) { $in = $input; @@ -158,7 +206,7 @@ sub serve_smart { } my ($rpipe, $wpipe); unless (pipe($rpipe, $wpipe)) { - $err->print("error creating pipe: $! - going static\n"); + err($env, "error creating pipe: $! - going static"); return; } my %env = %ENV; @@ -179,13 +227,23 @@ sub serve_smart { my %rdr = ( 0 => fileno($in), 1 => fileno($wpipe) ); my $pid = spawn([qw(git http-backend)], \%env, \%rdr); unless (defined $pid) { - $err->print("error spawning: $! - going static\n"); + err($env, "error spawning: $! - going static"); return; } $wpipe = $in = undef; $buf = ''; my ($vin, $fh, $res); $nr_running++; + + # Danga::Socket users, we queue up the read_enable callback to + # fire after pending writes are complete: + my $pi_http = $env->{'psgix.io'}; + my $read_enable = sub { $rpipe->watch_read(1) }; + my $read_disable = sub { + $rpipe->watch_read(0); + $pi_http->write($read_enable); + }; + my $end = sub { if ($fh) { $fh->close; @@ -202,10 +260,8 @@ sub serve_smart { my $e = $pid == waitpid($pid, 0) ? $? : "PID:$pid still running?"; if ($e) { - $err->print("http-backend ($git_dir): $e\n"); - if (my $io = $env->{'psgix.io'}) { - $io->close; - } + err($env, "git http-backend ($git_dir): $e"); + drop_client($env); } } return unless $res; @@ -220,7 +276,7 @@ sub serve_smart { } my $e = $!; $end->(); - $err->print("git http-backend ($git_dir): $e\n"); + err($env, "git http-backend ($git_dir): $e\n"); }; my $cb = sub { # read git-http-backend output and stream to client my $r = $rpipe ? $rpipe->sysread($buf, 8192, length($buf)) : 0; @@ -229,6 +285,7 @@ sub serve_smart { if ($fh) { # stream body from git-http-backend to HTTP client $fh->write($buf); $buf = ''; + $read_disable->() if $read_disable; } elsif ($buf =~ s/\A(.*?)\r\n\r\n//s) { # parse headers my $h = $1; my $code = 200; @@ -257,6 +314,7 @@ sub serve_smart { $rpipe = $async->($rpipe, $cb); sub { ($res) = @_ } # let Danga::Socket handle the rest. } else { # synchronous loop for other PSGI servers + $read_enable = $read_disable = undef; $vin = ''; vec($vin, fileno($rpipe), 1) = 1; sub { @@ -274,8 +332,7 @@ sub input_to_file { while (1) { my $r = $input->read($buf, 8192); unless (defined $r) { - my $err = $env->{'psgi.errors'}; - $err->print("error reading input: $!\n"); + err($env, "error reading input: $!"); return; } last if ($r == 0); diff --git a/t/git-http-backend.psgi b/t/git-http-backend.psgi new file mode 100644 index 00000000..8cec7d35 --- /dev/null +++ b/t/git-http-backend.psgi @@ -0,0 +1,28 @@ +#!/usr/bin/perl -w +# Copyright (C) 2016 all contributors <meta@public-inbox.org> +# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> +use strict; +use warnings; +use PublicInbox::GitHTTPBackend; +use PublicInbox::Git; +use Plack::Builder; +use Plack::Request; +use BSD::Resource qw(getrusage); +my $git_dir = $ENV{GIANT_GIT_DIR} or die 'GIANT_GIT_DIR not defined in env'; +my $git = PublicInbox::Git->new($git_dir); +builder { + enable 'Chunked' if $ENV{TEST_CHUNK}; + enable 'Head'; + sub { + my ($env) = @_; + my $pr = Plack::Request->new($env); + if ($pr->path_info =~ m!\A/(.+)\z!s) { + PublicInbox::GitHTTPBackend::serve($pr, $git, $1); + } else { + my $ru = getrusage(); + my $b = $ru->maxrss . "\n"; + [ 200, [ qw(Content-Type text/plain Content-Length), + length($b) ], [ $b ] ] + } + } +} diff --git a/t/git-http-backend.t b/t/git-http-backend.t new file mode 100644 index 00000000..889d507a --- /dev/null +++ b/t/git-http-backend.t @@ -0,0 +1,134 @@ +# Copyright (C) 2016 all contributors <meta@public-inbox.org> +# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> +use strict; +use warnings; +use Test::More; +use File::Temp qw/tempdir/; +use IO::Socket; +use Fcntl qw(FD_CLOEXEC F_SETFD F_GETFD); +use Socket qw(SO_KEEPALIVE IPPROTO_TCP TCP_NODELAY); +use POSIX qw(dup2 setsid); +use Cwd qw(getcwd); + +my $git_dir = $ENV{GIANT_GIT_DIR}; +plan 'skip_all' => 'GIANT_GIT_DIR not defined' unless $git_dir; +foreach my $mod (qw(Danga::Socket + Plack::Util Plack::Request Plack::Builder + HTTP::Date HTTP::Status Net::HTTP)) { + eval "require $mod"; + plan skip_all => "$mod missing for git-http-backend.t" if $@; +} +my $psgi = getcwd()."/t/git-http-backend.psgi"; +my $tmpdir = tempdir('pi-git-http-backend-XXXXXX', TMPDIR => 1, CLEANUP => 1); +my $err = "$tmpdir/stderr.log"; +my $out = "$tmpdir/stdout.log"; +my $httpd = 'blib/script/public-inbox-httpd'; +my %opts = ( + LocalAddr => '127.0.0.1', + ReuseAddr => 1, + Proto => 'tcp', + Type => SOCK_STREAM, + Listen => 1024, +); +my $sock = IO::Socket::INET->new(%opts); +my $host = $sock->sockhost; +my $port = $sock->sockport; +my $pid; +END { kill 'TERM', $pid if defined $pid }; + +my $get_maxrss = sub { + my $http = Net::HTTP->new(Host => "$host:$port"); + ok($http, 'Net::HTTP object created for maxrss'); + $http->write_request(GET => '/'); + my ($code, $mess, %h) = $http->read_response_headers; + is($code, 200, 'success reading maxrss'); + my $n = $http->read_entity_body(my $buf, 256); + ok(defined $n, 'read response body'); + like($buf, qr/\A\d+\n\z/, 'got memory response'); + ok(int($buf) > 0, 'got non-zero memory response'); + int($buf); +}; + +{ + ok($sock, 'sock created'); + $pid = fork; + if ($pid == 0) { # pretend to be systemd + fcntl($sock, F_SETFD, 0); + dup2(fileno($sock), 3) or die "dup2 failed: $!\n"; + $ENV{LISTEN_PID} = $$; + $ENV{LISTEN_FDS} = 1; + $ENV{TEST_CHUNK} = '1'; + exec $httpd, "--stdout=$out", "--stderr=$err", $psgi; + die "FAIL: $!\n"; + } + ok(defined $pid, 'forked httpd process successfully'); +} +my $mem_a = $get_maxrss->(); + +SKIP: { + my $max = 0; + my $pack; + my $glob = "$git_dir/objects/pack/pack-*.pack"; + foreach my $f (glob($glob)) { + my $n = -s $f; + if ($n > $max) { + $max = $n; + $pack = $f; + } + } + skip "no packs found in $git_dir" unless defined $pack; + if ($pack !~ m!(/objects/pack/pack-[a-f0-9]{40}.pack)\z!) { + skip "bad pack name: $pack"; + } + my $url = $1; + my $http = Net::HTTP->new(Host => "$host:$port"); + ok($http, 'Net::HTTP object created'); + $http->write_request(GET => $url); + my ($code, $mess, %h) = $http->read_response_headers; + is(200, $code, 'got 200 success for pack'); + is($max, $h{'Content-Length'}, 'got expected Content-Length for pack'); + foreach my $i (1..3) { + sleep 1; + my $diff = $get_maxrss->() - $mem_a; + note "${diff}K memory increase after $i seconds"; + ok($diff < 1024, 'no bloating caused by slow dumb client'); + } +} + +{ + my $c = fork; + if ($c == 0) { + setsid(); + exec qw(git clone -q --mirror), "http://$host:$port/", + "$tmpdir/mirror.git"; + die "Failed start git clone: $!\n"; + } + select(undef, undef, undef, 0.1); + foreach my $i (1..10) { + is(1, kill('STOP', -$c), 'signaled clone STOP'); + sleep 1; + ok(kill('CONT', -$c), 'continued clone'); + my $diff = $get_maxrss->() - $mem_a; + note "${diff}K memory increase after $i seconds"; + ok($diff < 2048, 'no bloating caused by slow smart client'); + } + ok(kill('CONT', -$c), 'continued clone'); + is($c, waitpid($c, 0), 'reaped wayward slow clone'); + is($?, 0, 'clone did not error out'); + note 'clone done, fsck-ing clone result...'; + is(0, system("git", "--git-dir=$tmpdir/mirror.git", + qw(fsck --no-progress)), + 'fsck did not report corruption'); + + my $diff = $get_maxrss->() - $mem_a; + note "${diff}K memory increase after smart clone"; + ok($diff < 2048, 'no bloating caused by slow smart client'); +} + +{ + ok(kill('TERM', $pid), 'killed httpd'); + $pid = undef; + waitpid(-1, 0); +} + +done_testing(); |