From 1bd4f49d3e6e19f71d170058cc8c6cb466dc5b9f Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Tue, 3 May 2016 02:34:57 +0000 Subject: git-http-backend: reduce memory use for clone/fetch When serving large static files or large packs, we may call Danga::Socket::write directly to queue up callbacks to resume reading and defer firing them until the socket is writable. This prevents us from scheduling writes or buffering until we know the socket is writable and prevents needless buffering by Danga::Socket when faced with slow clients. For smart clones, this comes at the cost of throttling the output of "git pack-objects" to the speed of the client connection. This is probably not ideal, but is the behavior of the standard git-daemon, too; and is preferable to running the httpd out-of-memory. Buffering to the filesystem may be an option in the future... --- lib/PublicInbox/GitHTTPBackend.pm | 111 ++++++++++++++++++++++++++++---------- 1 file changed, 84 insertions(+), 27 deletions(-) (limited to 'lib') diff --git a/lib/PublicInbox/GitHTTPBackend.pm b/lib/PublicInbox/GitHTTPBackend.pm index 937b2e9a..020b0886 100644 --- a/lib/PublicInbox/GitHTTPBackend.pm +++ b/lib/PublicInbox/GitHTTPBackend.pm @@ -33,6 +33,15 @@ our $ANY = join('|', @binary, @text); my $BIN = join('|', @binary); my $TEXT = join('|', @text); +my $nextq; +sub do_next () { + my $q = $nextq; + $nextq = undef; + while (my $cb = shift @$q) { + $cb->(); # this may redefine nextq + } +} + sub r { [ $_[0] , [qw(Content-Type text/plain Content-Length 0) ], [] ] } @@ -50,6 +59,17 @@ sub serve { serve_dumb($cgi, $git, $path); } +sub err ($@) { + my ($env, @msg) = @_; + $env->{'psgi.errors'}->print(@msg, "\n"); +} + +sub drop_client ($) { + if (my $io = $_[0]->{'psgix.io'}) { + $io->close; # this is Danga::Socket::close + } +} + sub serve_dumb { my ($cgi, $git, $path) = @_; @@ -61,18 +81,51 @@ sub serve_dumb { } else { return r(404); } + my $f = "$git->{git_dir}/$path"; - return r(404) unless -f $f && -r _; + return r(404) unless -f $f && -r _; # just in case it's a FIFO :P my @st = stat(_); my $size = $st[7]; + my $env = $cgi->{env}; - # TODO: If-Modified-Since and Last-Modified + # TODO: If-Modified-Since and Last-Modified? open my $in, '<', $f or return r(404); - my $code = 200; my $len = $size; - my @h; + my $n = 65536; # try to negotiate a big TCP window, first + my ($next, $fh); + my $cb = sub { + $n = $len if $len < $n; + my $r = sysread($in, my $buf, $n); + if (!defined $r) { + err($env, "$f read error: $!"); + drop_client($env); + } elsif ($r <= 0) { + err($env, "$f EOF with $len bytes left"); + drop_client($env); + } else { + $len -= $r; + $fh->write($buf); + if ($len == 0) { + $fh->close; + } elsif ($next) { + # avoid recursion in Danga::Socket::write + unless ($nextq) { + $nextq = []; + Danga::Socket->AddTimer(0, *do_next); + } + # avoid buffering too much in case we have + # slow clients: + $n = 8192; + push @$nextq, $next; + return; + } + } + # all done, cleanup references: + $fh = $next = undef; + }; - my $env = $cgi->{env}; + my $code = 200; + my @h = ('Content-Type', $type); my $range = $env->{HTTP_RANGE}; if (defined $range && $range =~ /\bbytes=(\d*)-(\d*)\z/) { ($code, $len) = prepare_range($cgi, $in, \@h, $1, $2, $size); @@ -81,22 +134,18 @@ sub serve_dumb { return [ 416, \@h, [] ]; } } + push @h, 'Content-Length', $len; - push @h, 'Content-Type', $type, 'Content-Length', $len; sub { my ($res) = @_; # Plack callback - my $fh = $res->([ $code, \@h ]); - my $buf; - my $n = 8192; - while ($len > 0) { - $n = $len if $len < $n; - my $r = sysread($in, $buf, $n); - last if (!defined($r) || $r <= 0); - $len -= $r; - $fh->write($buf); + $fh = $res->([ $code, \@h ]); + if (defined $env->{'pi-httpd.async'}) { + my $pi_http = $env->{'psgix.io'}; + $next = sub { $pi_http->write($cb) }; + $cb->(); # start it off! + } else { + $cb->() while $fh; } - die "$f truncated with $len bytes remaining\n" if $len; - $fh->close; } } @@ -149,7 +198,6 @@ sub serve_smart { my $input = $env->{'psgi.input'}; my $buf; my $in; - my $err = $env->{'psgi.errors'}; my $fd = eval { fileno($input) }; if (defined $fd && $fd >= 0) { $in = $input; @@ -158,7 +206,7 @@ sub serve_smart { } my ($rpipe, $wpipe); unless (pipe($rpipe, $wpipe)) { - $err->print("error creating pipe: $! - going static\n"); + err($env, "error creating pipe: $! - going static"); return; } my %env = %ENV; @@ -179,13 +227,23 @@ sub serve_smart { my %rdr = ( 0 => fileno($in), 1 => fileno($wpipe) ); my $pid = spawn([qw(git http-backend)], \%env, \%rdr); unless (defined $pid) { - $err->print("error spawning: $! - going static\n"); + err($env, "error spawning: $! - going static"); return; } $wpipe = $in = undef; $buf = ''; my ($vin, $fh, $res); $nr_running++; + + # Danga::Socket users, we queue up the read_enable callback to + # fire after pending writes are complete: + my $pi_http = $env->{'psgix.io'}; + my $read_enable = sub { $rpipe->watch_read(1) }; + my $read_disable = sub { + $rpipe->watch_read(0); + $pi_http->write($read_enable); + }; + my $end = sub { if ($fh) { $fh->close; @@ -202,10 +260,8 @@ sub serve_smart { my $e = $pid == waitpid($pid, 0) ? $? : "PID:$pid still running?"; if ($e) { - $err->print("http-backend ($git_dir): $e\n"); - if (my $io = $env->{'psgix.io'}) { - $io->close; - } + err($env, "git http-backend ($git_dir): $e"); + drop_client($env); } } return unless $res; @@ -220,7 +276,7 @@ sub serve_smart { } my $e = $!; $end->(); - $err->print("git http-backend ($git_dir): $e\n"); + err($env, "git http-backend ($git_dir): $e\n"); }; my $cb = sub { # read git-http-backend output and stream to client my $r = $rpipe ? $rpipe->sysread($buf, 8192, length($buf)) : 0; @@ -229,6 +285,7 @@ sub serve_smart { if ($fh) { # stream body from git-http-backend to HTTP client $fh->write($buf); $buf = ''; + $read_disable->() if $read_disable; } elsif ($buf =~ s/\A(.*?)\r\n\r\n//s) { # parse headers my $h = $1; my $code = 200; @@ -257,6 +314,7 @@ sub serve_smart { $rpipe = $async->($rpipe, $cb); sub { ($res) = @_ } # let Danga::Socket handle the rest. } else { # synchronous loop for other PSGI servers + $read_enable = $read_disable = undef; $vin = ''; vec($vin, fileno($rpipe), 1) = 1; sub { @@ -274,8 +332,7 @@ sub input_to_file { while (1) { my $r = $input->read($buf, 8192); unless (defined $r) { - my $err = $env->{'psgi.errors'}; - $err->print("error reading input: $!\n"); + err($env, "error reading input: $!"); return; } last if ($r == 0); -- cgit v1.2.3-24-ge0c7