about summary refs log tree commit homepage
diff options
context:
space:
mode:
-rw-r--r--lib/PublicInbox/GitHTTPBackend.pm111
-rw-r--r--t/git-http-backend.psgi28
-rw-r--r--t/git-http-backend.t134
3 files changed, 246 insertions, 27 deletions
diff --git a/lib/PublicInbox/GitHTTPBackend.pm b/lib/PublicInbox/GitHTTPBackend.pm
index 937b2e9a..020b0886 100644
--- a/lib/PublicInbox/GitHTTPBackend.pm
+++ b/lib/PublicInbox/GitHTTPBackend.pm
@@ -33,6 +33,15 @@ our $ANY = join('|', @binary, @text);
 my $BIN = join('|', @binary);
 my $TEXT = join('|', @text);
 
+my $nextq;
+sub do_next () {
+        my $q = $nextq;
+        $nextq = undef;
+        while (my $cb = shift @$q) {
+                $cb->(); # this may redefine nextq
+        }
+}
+
 sub r {
         [ $_[0] , [qw(Content-Type text/plain Content-Length 0) ], [] ]
 }
@@ -50,6 +59,17 @@ sub serve {
         serve_dumb($cgi, $git, $path);
 }
 
+sub err ($@) {
+        my ($env, @msg) = @_;
+        $env->{'psgi.errors'}->print(@msg, "\n");
+}
+
+sub drop_client ($) {
+        if (my $io = $_[0]->{'psgix.io'}) {
+                $io->close; # this is Danga::Socket::close
+        }
+}
+
 sub serve_dumb {
         my ($cgi, $git, $path) = @_;
 
@@ -61,18 +81,51 @@ sub serve_dumb {
         } else {
                 return r(404);
         }
+
         my $f = "$git->{git_dir}/$path";
-        return r(404) unless -f $f && -r _;
+        return r(404) unless -f $f && -r _; # just in case it's a FIFO :P
         my @st = stat(_);
         my $size = $st[7];
+        my $env = $cgi->{env};
 
-        # TODO: If-Modified-Since and Last-Modified
+        # TODO: If-Modified-Since and Last-Modified?
         open my $in, '<', $f or return r(404);
-        my $code = 200;
         my $len = $size;
-        my @h;
+        my $n = 65536; # try to negotiate a big TCP window, first
+        my ($next, $fh);
+        my $cb = sub {
+                $n = $len if $len < $n;
+                my $r = sysread($in, my $buf, $n);
+                if (!defined $r) {
+                        err($env, "$f read error: $!");
+                        drop_client($env);
+                } elsif ($r <= 0) {
+                        err($env, "$f EOF with $len bytes left");
+                        drop_client($env);
+                } else {
+                        $len -= $r;
+                        $fh->write($buf);
+                        if ($len == 0) {
+                                $fh->close;
+                        } elsif ($next) {
+                                # avoid recursion in Danga::Socket::write
+                                unless ($nextq) {
+                                        $nextq = [];
+                                        Danga::Socket->AddTimer(0, *do_next);
+                                }
+                                # avoid buffering too much in case we have
+                                # slow clients:
+                                $n = 8192;
+                                push @$nextq, $next;
+                                return;
+                        }
+                }
+                # all done, cleanup references:
+                $fh = $next = undef;
+        };
 
-        my $env = $cgi->{env};
+        my $code = 200;
+        my @h = ('Content-Type', $type);
         my $range = $env->{HTTP_RANGE};
         if (defined $range && $range =~ /\bbytes=(\d*)-(\d*)\z/) {
                 ($code, $len) = prepare_range($cgi, $in, \@h, $1, $2, $size);
@@ -81,22 +134,18 @@ sub serve_dumb {
                         return [ 416, \@h, [] ];
                 }
         }
+        push @h, 'Content-Length', $len;
 
-        push @h, 'Content-Type', $type, 'Content-Length', $len;
         sub {
                 my ($res) = @_; # Plack callback
-                my $fh = $res->([ $code, \@h ]);
-                my $buf;
-                my $n = 8192;
-                while ($len > 0) {
-                        $n = $len if $len < $n;
-                        my $r = sysread($in, $buf, $n);
-                        last if (!defined($r) || $r <= 0);
-                        $len -= $r;
-                        $fh->write($buf);
+                $fh = $res->([ $code, \@h ]);
+                if (defined $env->{'pi-httpd.async'}) {
+                        my $pi_http = $env->{'psgix.io'};
+                        $next = sub { $pi_http->write($cb) };
+                        $cb->(); # start it off!
+                } else {
+                        $cb->() while $fh;
                 }
-                die "$f truncated with $len bytes remaining\n" if $len;
-                $fh->close;
         }
 }
 
@@ -149,7 +198,6 @@ sub serve_smart {
         my $input = $env->{'psgi.input'};
         my $buf;
         my $in;
-        my $err = $env->{'psgi.errors'};
         my $fd = eval { fileno($input) };
         if (defined $fd && $fd >= 0) {
                 $in = $input;
@@ -158,7 +206,7 @@ sub serve_smart {
         }
         my ($rpipe, $wpipe);
         unless (pipe($rpipe, $wpipe)) {
-                $err->print("error creating pipe: $! - going static\n");
+                err($env, "error creating pipe: $! - going static");
                 return;
         }
         my %env = %ENV;
@@ -179,13 +227,23 @@ sub serve_smart {
         my %rdr = ( 0 => fileno($in), 1 => fileno($wpipe) );
         my $pid = spawn([qw(git http-backend)], \%env, \%rdr);
         unless (defined $pid) {
-                $err->print("error spawning: $! - going static\n");
+                err($env, "error spawning: $! - going static");
                 return;
         }
         $wpipe = $in = undef;
         $buf = '';
         my ($vin, $fh, $res);
         $nr_running++;
+
+        # Danga::Socket users, we queue up the read_enable callback to
+        # fire after pending writes are complete:
+        my $pi_http = $env->{'psgix.io'};
+        my $read_enable = sub { $rpipe->watch_read(1) };
+        my $read_disable = sub {
+                $rpipe->watch_read(0);
+                $pi_http->write($read_enable);
+        };
+
         my $end = sub {
                 if ($fh) {
                         $fh->close;
@@ -202,10 +260,8 @@ sub serve_smart {
                         my $e = $pid == waitpid($pid, 0) ?
                                 $? : "PID:$pid still running?";
                         if ($e) {
-                                $err->print("http-backend ($git_dir): $e\n");
-                                if (my $io = $env->{'psgix.io'}) {
-                                        $io->close;
-                                }
+                                err($env, "git http-backend ($git_dir): $e");
+                                drop_client($env);
                         }
                 }
                 return unless $res;
@@ -220,7 +276,7 @@ sub serve_smart {
                 }
                 my $e = $!;
                 $end->();
-                $err->print("git http-backend ($git_dir): $e\n");
+                err($env, "git http-backend ($git_dir): $e\n");
         };
         my $cb = sub { # read git-http-backend output and stream to client
                 my $r = $rpipe ? $rpipe->sysread($buf, 8192, length($buf)) : 0;
@@ -229,6 +285,7 @@ sub serve_smart {
                 if ($fh) { # stream body from git-http-backend to HTTP client
                         $fh->write($buf);
                         $buf = '';
+                        $read_disable->() if $read_disable;
                 } elsif ($buf =~ s/\A(.*?)\r\n\r\n//s) { # parse headers
                         my $h = $1;
                         my $code = 200;
@@ -257,6 +314,7 @@ sub serve_smart {
                 $rpipe = $async->($rpipe, $cb);
                 sub { ($res) = @_ } # let Danga::Socket handle the rest.
         } else { # synchronous loop for other PSGI servers
+                $read_enable = $read_disable = undef;
                 $vin = '';
                 vec($vin, fileno($rpipe), 1) = 1;
                 sub {
@@ -274,8 +332,7 @@ sub input_to_file {
         while (1) {
                 my $r = $input->read($buf, 8192);
                 unless (defined $r) {
-                        my $err = $env->{'psgi.errors'};
-                        $err->print("error reading input: $!\n");
+                        err($env, "error reading input: $!");
                         return;
                 }
                 last if ($r == 0);
diff --git a/t/git-http-backend.psgi b/t/git-http-backend.psgi
new file mode 100644
index 00000000..8cec7d35
--- /dev/null
+++ b/t/git-http-backend.psgi
@@ -0,0 +1,28 @@
+#!/usr/bin/perl -w
+# Copyright (C) 2016 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+use strict;
+use warnings;
+use PublicInbox::GitHTTPBackend;
+use PublicInbox::Git;
+use Plack::Builder;
+use Plack::Request;
+use BSD::Resource qw(getrusage);
+my $git_dir = $ENV{GIANT_GIT_DIR} or die 'GIANT_GIT_DIR not defined in env';
+my $git = PublicInbox::Git->new($git_dir);
+builder {
+        enable 'Chunked' if $ENV{TEST_CHUNK};
+        enable 'Head';
+        sub {
+                my ($env) = @_;
+                my $pr = Plack::Request->new($env);
+                if ($pr->path_info =~ m!\A/(.+)\z!s) {
+                        PublicInbox::GitHTTPBackend::serve($pr, $git, $1);
+                } else {
+                        my $ru = getrusage();
+                        my $b = $ru->maxrss . "\n";
+                        [ 200, [ qw(Content-Type text/plain Content-Length),
+                                 length($b) ], [ $b ] ]
+                }
+        }
+}
diff --git a/t/git-http-backend.t b/t/git-http-backend.t
new file mode 100644
index 00000000..889d507a
--- /dev/null
+++ b/t/git-http-backend.t
@@ -0,0 +1,134 @@
+# Copyright (C) 2016 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+use strict;
+use warnings;
+use Test::More;
+use File::Temp qw/tempdir/;
+use IO::Socket;
+use Fcntl qw(FD_CLOEXEC F_SETFD F_GETFD);
+use Socket qw(SO_KEEPALIVE IPPROTO_TCP TCP_NODELAY);
+use POSIX qw(dup2 setsid);
+use Cwd qw(getcwd);
+
+my $git_dir = $ENV{GIANT_GIT_DIR};
+plan 'skip_all' => 'GIANT_GIT_DIR not defined' unless $git_dir;
+foreach my $mod (qw(Danga::Socket
+                        Plack::Util Plack::Request Plack::Builder
+                        HTTP::Date HTTP::Status Net::HTTP)) {
+        eval "require $mod";
+        plan skip_all => "$mod missing for git-http-backend.t" if $@;
+}
+my $psgi = getcwd()."/t/git-http-backend.psgi";
+my $tmpdir = tempdir('pi-git-http-backend-XXXXXX', TMPDIR => 1, CLEANUP => 1);
+my $err = "$tmpdir/stderr.log";
+my $out = "$tmpdir/stdout.log";
+my $httpd = 'blib/script/public-inbox-httpd';
+my %opts = (
+        LocalAddr => '127.0.0.1',
+        ReuseAddr => 1,
+        Proto => 'tcp',
+        Type => SOCK_STREAM,
+        Listen => 1024,
+);
+my $sock = IO::Socket::INET->new(%opts);
+my $host = $sock->sockhost;
+my $port = $sock->sockport;
+my $pid;
+END { kill 'TERM', $pid if defined $pid };
+
+my $get_maxrss = sub {
+        my $http = Net::HTTP->new(Host => "$host:$port");
+        ok($http, 'Net::HTTP object created for maxrss');
+        $http->write_request(GET => '/');
+        my ($code, $mess, %h) = $http->read_response_headers;
+        is($code, 200, 'success reading maxrss');
+        my $n = $http->read_entity_body(my $buf, 256);
+        ok(defined $n, 'read response body');
+        like($buf, qr/\A\d+\n\z/, 'got memory response');
+        ok(int($buf) > 0, 'got non-zero memory response');
+        int($buf);
+};
+
+{
+        ok($sock, 'sock created');
+        $pid = fork;
+        if ($pid == 0) { # pretend to be systemd
+                fcntl($sock, F_SETFD, 0);
+                dup2(fileno($sock), 3) or die "dup2 failed: $!\n";
+                $ENV{LISTEN_PID} = $$;
+                $ENV{LISTEN_FDS} = 1;
+                $ENV{TEST_CHUNK} = '1';
+                exec $httpd, "--stdout=$out", "--stderr=$err", $psgi;
+                die "FAIL: $!\n";
+        }
+        ok(defined $pid, 'forked httpd process successfully');
+}
+my $mem_a = $get_maxrss->();
+
+SKIP: {
+        my $max = 0;
+        my $pack;
+        my $glob = "$git_dir/objects/pack/pack-*.pack";
+        foreach my $f (glob($glob)) {
+                my $n = -s $f;
+                if ($n > $max) {
+                        $max = $n;
+                        $pack = $f;
+                }
+        }
+        skip "no packs found in $git_dir" unless defined $pack;
+        if ($pack !~ m!(/objects/pack/pack-[a-f0-9]{40}.pack)\z!) {
+                skip "bad pack name: $pack";
+        }
+        my $url = $1;
+        my $http = Net::HTTP->new(Host => "$host:$port");
+        ok($http, 'Net::HTTP object created');
+        $http->write_request(GET => $url);
+        my ($code, $mess, %h) = $http->read_response_headers;
+        is(200, $code, 'got 200 success for pack');
+        is($max, $h{'Content-Length'}, 'got expected Content-Length for pack');
+        foreach my $i (1..3) {
+                sleep 1;
+                my $diff = $get_maxrss->() - $mem_a;
+                note "${diff}K memory increase after $i seconds";
+                ok($diff < 1024, 'no bloating caused by slow dumb client');
+        }
+}
+
+{
+        my $c = fork;
+        if ($c == 0) {
+                setsid();
+                exec qw(git clone -q --mirror), "http://$host:$port/",
+                        "$tmpdir/mirror.git";
+                die "Failed start git clone: $!\n";
+        }
+        select(undef, undef, undef, 0.1);
+        foreach my $i (1..10) {
+                is(1, kill('STOP', -$c), 'signaled clone STOP');
+                sleep 1;
+                ok(kill('CONT', -$c), 'continued clone');
+                my $diff = $get_maxrss->() - $mem_a;
+                note "${diff}K memory increase after $i seconds";
+                ok($diff < 2048, 'no bloating caused by slow smart client');
+        }
+        ok(kill('CONT', -$c), 'continued clone');
+        is($c, waitpid($c, 0), 'reaped wayward slow clone');
+        is($?, 0, 'clone did not error out');
+        note 'clone done, fsck-ing clone result...';
+        is(0, system("git", "--git-dir=$tmpdir/mirror.git",
+                        qw(fsck --no-progress)),
+                'fsck did not report corruption');
+
+        my $diff = $get_maxrss->() - $mem_a;
+        note "${diff}K memory increase after smart clone";
+        ok($diff < 2048, 'no bloating caused by slow smart client');
+}
+
+{
+        ok(kill('TERM', $pid), 'killed httpd');
+        $pid = undef;
+        waitpid(-1, 0);
+}
+
+done_testing();