about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2016-05-03 02:34:57 +0000
committerEric Wong <e@80x24.org>2016-05-03 09:12:42 +0000
commit1bd4f49d3e6e19f71d170058cc8c6cb466dc5b9f (patch)
tree966c920f4bbb367cf43af4ab997ec2eabe789bd3
parentce44148b662558922523465f29d372912b7c6a61 (diff)
downloadpublic-inbox-1bd4f49d3e6e19f71d170058cc8c6cb466dc5b9f.tar.gz
When serving large static files or large packs, we may call
Danga::Socket::write directly to queue up callbacks to resume
reading and defer firing them until the socket is writable.
This prevents us from scheduling writes or buffering until we
know the socket is writable and prevents needless buffering by
Danga::Socket when faced with slow clients.

For smart clones, this comes at the cost of throttling the
output of "git pack-objects" to the speed of the client
connection.  This is probably not ideal, but is the behavior of
the standard git-daemon, too; and is preferable to running the
httpd out-of-memory.  Buffering to the filesystem may be an
option in the future...
-rw-r--r--lib/PublicInbox/GitHTTPBackend.pm111
-rw-r--r--t/git-http-backend.psgi28
-rw-r--r--t/git-http-backend.t134
3 files changed, 246 insertions, 27 deletions
diff --git a/lib/PublicInbox/GitHTTPBackend.pm b/lib/PublicInbox/GitHTTPBackend.pm
index 937b2e9a..020b0886 100644
--- a/lib/PublicInbox/GitHTTPBackend.pm
+++ b/lib/PublicInbox/GitHTTPBackend.pm
@@ -33,6 +33,15 @@ our $ANY = join('|', @binary, @text);
 my $BIN = join('|', @binary);
 my $TEXT = join('|', @text);
 
+my $nextq;
+sub do_next () {
+        my $q = $nextq;
+        $nextq = undef;
+        while (my $cb = shift @$q) {
+                $cb->(); # this may redefine nextq
+        }
+}
+
 sub r {
         [ $_[0] , [qw(Content-Type text/plain Content-Length 0) ], [] ]
 }
@@ -50,6 +59,17 @@ sub serve {
         serve_dumb($cgi, $git, $path);
 }
 
+sub err ($@) {
+        my ($env, @msg) = @_;
+        $env->{'psgi.errors'}->print(@msg, "\n");
+}
+
+sub drop_client ($) {
+        if (my $io = $_[0]->{'psgix.io'}) {
+                $io->close; # this is Danga::Socket::close
+        }
+}
+
 sub serve_dumb {
         my ($cgi, $git, $path) = @_;
 
@@ -61,18 +81,51 @@ sub serve_dumb {
         } else {
                 return r(404);
         }
+
         my $f = "$git->{git_dir}/$path";
-        return r(404) unless -f $f && -r _;
+        return r(404) unless -f $f && -r _; # just in case it's a FIFO :P
         my @st = stat(_);
         my $size = $st[7];
+        my $env = $cgi->{env};
 
-        # TODO: If-Modified-Since and Last-Modified
+        # TODO: If-Modified-Since and Last-Modified?
         open my $in, '<', $f or return r(404);
-        my $code = 200;
         my $len = $size;
-        my @h;
+        my $n = 65536; # try to negotiate a big TCP window, first
+        my ($next, $fh);
+        my $cb = sub {
+                $n = $len if $len < $n;
+                my $r = sysread($in, my $buf, $n);
+                if (!defined $r) {
+                        err($env, "$f read error: $!");
+                        drop_client($env);
+                } elsif ($r <= 0) {
+                        err($env, "$f EOF with $len bytes left");
+                        drop_client($env);
+                } else {
+                        $len -= $r;
+                        $fh->write($buf);
+                        if ($len == 0) {
+                                $fh->close;
+                        } elsif ($next) {
+                                # avoid recursion in Danga::Socket::write
+                                unless ($nextq) {
+                                        $nextq = [];
+                                        Danga::Socket->AddTimer(0, *do_next);
+                                }
+                                # avoid buffering too much in case we have
+                                # slow clients:
+                                $n = 8192;
+                                push @$nextq, $next;
+                                return;
+                        }
+                }
+                # all done, cleanup references:
+                $fh = $next = undef;
+        };
 
-        my $env = $cgi->{env};
+        my $code = 200;
+        my @h = ('Content-Type', $type);
         my $range = $env->{HTTP_RANGE};
         if (defined $range && $range =~ /\bbytes=(\d*)-(\d*)\z/) {
                 ($code, $len) = prepare_range($cgi, $in, \@h, $1, $2, $size);
@@ -81,22 +134,18 @@ sub serve_dumb {
                         return [ 416, \@h, [] ];
                 }
         }
+        push @h, 'Content-Length', $len;
 
-        push @h, 'Content-Type', $type, 'Content-Length', $len;
         sub {
                 my ($res) = @_; # Plack callback
-                my $fh = $res->([ $code, \@h ]);
-                my $buf;
-                my $n = 8192;
-                while ($len > 0) {
-                        $n = $len if $len < $n;
-                        my $r = sysread($in, $buf, $n);
-                        last if (!defined($r) || $r <= 0);
-                        $len -= $r;
-                        $fh->write($buf);
+                $fh = $res->([ $code, \@h ]);
+                if (defined $env->{'pi-httpd.async'}) {
+                        my $pi_http = $env->{'psgix.io'};
+                        $next = sub { $pi_http->write($cb) };
+                        $cb->(); # start it off!
+                } else {
+                        $cb->() while $fh;
                 }
-                die "$f truncated with $len bytes remaining\n" if $len;
-                $fh->close;
         }
 }
 
@@ -149,7 +198,6 @@ sub serve_smart {
         my $input = $env->{'psgi.input'};
         my $buf;
         my $in;
-        my $err = $env->{'psgi.errors'};
         my $fd = eval { fileno($input) };
         if (defined $fd && $fd >= 0) {
                 $in = $input;
@@ -158,7 +206,7 @@ sub serve_smart {
         }
         my ($rpipe, $wpipe);
         unless (pipe($rpipe, $wpipe)) {
-                $err->print("error creating pipe: $! - going static\n");
+                err($env, "error creating pipe: $! - going static");
                 return;
         }
         my %env = %ENV;
@@ -179,13 +227,23 @@ sub serve_smart {
         my %rdr = ( 0 => fileno($in), 1 => fileno($wpipe) );
         my $pid = spawn([qw(git http-backend)], \%env, \%rdr);
         unless (defined $pid) {
-                $err->print("error spawning: $! - going static\n");
+                err($env, "error spawning: $! - going static");
                 return;
         }
         $wpipe = $in = undef;
         $buf = '';
         my ($vin, $fh, $res);
         $nr_running++;
+
+        # Danga::Socket users, we queue up the read_enable callback to
+        # fire after pending writes are complete:
+        my $pi_http = $env->{'psgix.io'};
+        my $read_enable = sub { $rpipe->watch_read(1) };
+        my $read_disable = sub {
+                $rpipe->watch_read(0);
+                $pi_http->write($read_enable);
+        };
+
         my $end = sub {
                 if ($fh) {
                         $fh->close;
@@ -202,10 +260,8 @@ sub serve_smart {
                         my $e = $pid == waitpid($pid, 0) ?
                                 $? : "PID:$pid still running?";
                         if ($e) {
-                                $err->print("http-backend ($git_dir): $e\n");
-                                if (my $io = $env->{'psgix.io'}) {
-                                        $io->close;
-                                }
+                                err($env, "git http-backend ($git_dir): $e");
+                                drop_client($env);
                         }
                 }
                 return unless $res;
@@ -220,7 +276,7 @@ sub serve_smart {
                 }
                 my $e = $!;
                 $end->();
-                $err->print("git http-backend ($git_dir): $e\n");
+                err($env, "git http-backend ($git_dir): $e\n");
         };
         my $cb = sub { # read git-http-backend output and stream to client
                 my $r = $rpipe ? $rpipe->sysread($buf, 8192, length($buf)) : 0;
@@ -229,6 +285,7 @@ sub serve_smart {
                 if ($fh) { # stream body from git-http-backend to HTTP client
                         $fh->write($buf);
                         $buf = '';
+                        $read_disable->() if $read_disable;
                 } elsif ($buf =~ s/\A(.*?)\r\n\r\n//s) { # parse headers
                         my $h = $1;
                         my $code = 200;
@@ -257,6 +314,7 @@ sub serve_smart {
                 $rpipe = $async->($rpipe, $cb);
                 sub { ($res) = @_ } # let Danga::Socket handle the rest.
         } else { # synchronous loop for other PSGI servers
+                $read_enable = $read_disable = undef;
                 $vin = '';
                 vec($vin, fileno($rpipe), 1) = 1;
                 sub {
@@ -274,8 +332,7 @@ sub input_to_file {
         while (1) {
                 my $r = $input->read($buf, 8192);
                 unless (defined $r) {
-                        my $err = $env->{'psgi.errors'};
-                        $err->print("error reading input: $!\n");
+                        err($env, "error reading input: $!");
                         return;
                 }
                 last if ($r == 0);
diff --git a/t/git-http-backend.psgi b/t/git-http-backend.psgi
new file mode 100644
index 00000000..8cec7d35
--- /dev/null
+++ b/t/git-http-backend.psgi
@@ -0,0 +1,28 @@
+#!/usr/bin/perl -w
+# Copyright (C) 2016 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+use strict;
+use warnings;
+use PublicInbox::GitHTTPBackend;
+use PublicInbox::Git;
+use Plack::Builder;
+use Plack::Request;
+use BSD::Resource qw(getrusage);
+my $git_dir = $ENV{GIANT_GIT_DIR} or die 'GIANT_GIT_DIR not defined in env';
+my $git = PublicInbox::Git->new($git_dir);
+builder {
+        enable 'Chunked' if $ENV{TEST_CHUNK};
+        enable 'Head';
+        sub {
+                my ($env) = @_;
+                my $pr = Plack::Request->new($env);
+                if ($pr->path_info =~ m!\A/(.+)\z!s) {
+                        PublicInbox::GitHTTPBackend::serve($pr, $git, $1);
+                } else {
+                        my $ru = getrusage();
+                        my $b = $ru->maxrss . "\n";
+                        [ 200, [ qw(Content-Type text/plain Content-Length),
+                                 length($b) ], [ $b ] ]
+                }
+        }
+}
diff --git a/t/git-http-backend.t b/t/git-http-backend.t
new file mode 100644
index 00000000..889d507a
--- /dev/null
+++ b/t/git-http-backend.t
@@ -0,0 +1,134 @@
+# Copyright (C) 2016 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+use strict;
+use warnings;
+use Test::More;
+use File::Temp qw/tempdir/;
+use IO::Socket;
+use Fcntl qw(FD_CLOEXEC F_SETFD F_GETFD);
+use Socket qw(SO_KEEPALIVE IPPROTO_TCP TCP_NODELAY);
+use POSIX qw(dup2 setsid);
+use Cwd qw(getcwd);
+
+my $git_dir = $ENV{GIANT_GIT_DIR};
+plan 'skip_all' => 'GIANT_GIT_DIR not defined' unless $git_dir;
+foreach my $mod (qw(Danga::Socket
+                        Plack::Util Plack::Request Plack::Builder
+                        HTTP::Date HTTP::Status Net::HTTP)) {
+        eval "require $mod";
+        plan skip_all => "$mod missing for git-http-backend.t" if $@;
+}
+my $psgi = getcwd()."/t/git-http-backend.psgi";
+my $tmpdir = tempdir('pi-git-http-backend-XXXXXX', TMPDIR => 1, CLEANUP => 1);
+my $err = "$tmpdir/stderr.log";
+my $out = "$tmpdir/stdout.log";
+my $httpd = 'blib/script/public-inbox-httpd';
+my %opts = (
+        LocalAddr => '127.0.0.1',
+        ReuseAddr => 1,
+        Proto => 'tcp',
+        Type => SOCK_STREAM,
+        Listen => 1024,
+);
+my $sock = IO::Socket::INET->new(%opts);
+my $host = $sock->sockhost;
+my $port = $sock->sockport;
+my $pid;
+END { kill 'TERM', $pid if defined $pid };
+
+my $get_maxrss = sub {
+        my $http = Net::HTTP->new(Host => "$host:$port");
+        ok($http, 'Net::HTTP object created for maxrss');
+        $http->write_request(GET => '/');
+        my ($code, $mess, %h) = $http->read_response_headers;
+        is($code, 200, 'success reading maxrss');
+        my $n = $http->read_entity_body(my $buf, 256);
+        ok(defined $n, 'read response body');
+        like($buf, qr/\A\d+\n\z/, 'got memory response');
+        ok(int($buf) > 0, 'got non-zero memory response');
+        int($buf);
+};
+
+{
+        ok($sock, 'sock created');
+        $pid = fork;
+        if ($pid == 0) { # pretend to be systemd
+                fcntl($sock, F_SETFD, 0);
+                dup2(fileno($sock), 3) or die "dup2 failed: $!\n";
+                $ENV{LISTEN_PID} = $$;
+                $ENV{LISTEN_FDS} = 1;
+                $ENV{TEST_CHUNK} = '1';
+                exec $httpd, "--stdout=$out", "--stderr=$err", $psgi;
+                die "FAIL: $!\n";
+        }
+        ok(defined $pid, 'forked httpd process successfully');
+}
+my $mem_a = $get_maxrss->();
+
+SKIP: {
+        my $max = 0;
+        my $pack;
+        my $glob = "$git_dir/objects/pack/pack-*.pack";
+        foreach my $f (glob($glob)) {
+                my $n = -s $f;
+                if ($n > $max) {
+                        $max = $n;
+                        $pack = $f;
+                }
+        }
+        skip "no packs found in $git_dir" unless defined $pack;
+        if ($pack !~ m!(/objects/pack/pack-[a-f0-9]{40}.pack)\z!) {
+                skip "bad pack name: $pack";
+        }
+        my $url = $1;
+        my $http = Net::HTTP->new(Host => "$host:$port");
+        ok($http, 'Net::HTTP object created');
+        $http->write_request(GET => $url);
+        my ($code, $mess, %h) = $http->read_response_headers;
+        is(200, $code, 'got 200 success for pack');
+        is($max, $h{'Content-Length'}, 'got expected Content-Length for pack');
+        foreach my $i (1..3) {
+                sleep 1;
+                my $diff = $get_maxrss->() - $mem_a;
+                note "${diff}K memory increase after $i seconds";
+                ok($diff < 1024, 'no bloating caused by slow dumb client');
+        }
+}
+
+{
+        my $c = fork;
+        if ($c == 0) {
+                setsid();
+                exec qw(git clone -q --mirror), "http://$host:$port/",
+                        "$tmpdir/mirror.git";
+                die "Failed start git clone: $!\n";
+        }
+        select(undef, undef, undef, 0.1);
+        foreach my $i (1..10) {
+                is(1, kill('STOP', -$c), 'signaled clone STOP');
+                sleep 1;
+                ok(kill('CONT', -$c), 'continued clone');
+                my $diff = $get_maxrss->() - $mem_a;
+                note "${diff}K memory increase after $i seconds";
+                ok($diff < 2048, 'no bloating caused by slow smart client');
+        }
+        ok(kill('CONT', -$c), 'continued clone');
+        is($c, waitpid($c, 0), 'reaped wayward slow clone');
+        is($?, 0, 'clone did not error out');
+        note 'clone done, fsck-ing clone result...';
+        is(0, system("git", "--git-dir=$tmpdir/mirror.git",
+                        qw(fsck --no-progress)),
+                'fsck did not report corruption');
+
+        my $diff = $get_maxrss->() - $mem_a;
+        note "${diff}K memory increase after smart clone";
+        ok($diff < 2048, 'no bloating caused by slow smart client');
+}
+
+{
+        ok(kill('TERM', $pid), 'killed httpd');
+        $pid = undef;
+        waitpid(-1, 0);
+}
+
+done_testing();