From 80bcb77099c2d3d7179c4f2f3ffac2b8083c3bb5 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Sun, 22 May 2016 09:06:03 +0000 Subject: git-http-backend: switch to async_pass This simplifies the code somewhat; but it could probably still be made simpler. It will need to support command queueing for expensive commands so expensive processes can be queued up. --- lib/PublicInbox/GitHTTPBackend.pm | 174 ++++++++++++++++++-------------------- 1 file changed, 83 insertions(+), 91 deletions(-) diff --git a/lib/PublicInbox/GitHTTPBackend.pm b/lib/PublicInbox/GitHTTPBackend.pm index 97d96d52..cca8a6d6 100644 --- a/lib/PublicInbox/GitHTTPBackend.pm +++ b/lib/PublicInbox/GitHTTPBackend.pm @@ -10,6 +10,7 @@ use Fcntl qw(:seek); use IO::File; use PublicInbox::Spawn qw(spawn); use HTTP::Date qw(time2str); +use HTTP::Status qw(status_message); # n.b. serving "description" and "cloneurl" should be innocuous enough to # not cause problems. serving "config" might... @@ -39,9 +40,12 @@ sub do_next () { } } -sub r ($) { - my ($s) = @_; - [ $s, [qw(Content-Type text/plain Content-Length 0), @no_cache ], [] ] +sub r ($;$) { + my ($code, $msg) = @_; + $msg ||= status_message($code); + my $len = length($msg); + [ $code, [qw(Content-Type text/plain Content-Length), $len, @no_cache], + [$msg] ] } sub serve { @@ -167,14 +171,9 @@ sub prepare_range { sub serve_smart { my ($cgi, $git, $path) = @_; my $env = $cgi->{env}; - - my $input = $env->{'psgi.input'}; - my $buf; - my $in; - my $fd = eval { fileno($input) }; - if (defined $fd && $fd >= 0) { - $in = $input; - } else { + my $in = $env->{'psgi.input'}; + my $fd = eval { fileno($in) }; + unless (defined $fd && $fd >= 0) { $in = input_to_file($env) or return r(500); } my ($rpipe, $wpipe); @@ -204,91 +203,67 @@ sub serve_smart { return; } $wpipe = $in = undef; - $buf = ''; - my ($vin, $fh, $res); - - # Danga::Socket users, we queue up the read_enable callback to - # fire after pending writes are complete: - my $pi_http = $env->{'psgix.io'}; - my $read_enable = sub { $rpipe->watch_read(1) }; - my $read_disable = sub { - $rpipe->watch_read(0); - $pi_http->write($read_enable); - }; - my $end = sub { - if ($fh) { - $fh->close; - $fh = undef; - } - if ($rpipe) { - # _may_ be Danga::Socket::close via - # PublicInbox::HTTPD::Async::close: - $rpipe->close; - $rpipe = undef; - } - if (defined $pid) { - my $e = $pid == waitpid($pid, 0) ? - $? : "PID:$pid still running?"; - err($env, "git http-backend ($git_dir): $e") if $e; - } - return unless $res; - my $dumb = serve_dumb($cgi, $git, $path); - ref($dumb) eq 'ARRAY' ? $res->($dumb) : $dumb->($res); - }; - my $fail = sub { - if ($!{EAGAIN} || $!{EINTR}) { - select($vin, undef, undef, undef) if defined $vin; - # $vin is undef on async, so this is a noop on EAGAIN - return; + $rpipe = undef; + my $e = $pid == waitpid($pid, 0) ? + $? : "PID:$pid still running?"; + if ($e) { + err($env, "git http-backend ($git_dir): $e"); + drop_client($env); } - my $e = $!; - $end->(); - err($env, "git http-backend ($git_dir): $e\n"); - }; - my $cb = sub { # read git-http-backend output and stream to client - my $r = $rpipe ? $rpipe->sysread($buf, 8192, length($buf)) : 0; - return $fail->() unless defined $r; - return $end->() if $r == 0; # EOF - if ($fh) { # stream body from git-http-backend to HTTP client - $fh->write($buf); - $buf = ''; - $read_disable->() if $read_disable; - } elsif ($buf =~ s/\A(.*?)\r\n\r\n//s) { # parse headers - my $h = $1; - my $code = 200; - my @h; - foreach my $l (split(/\r\n/, $h)) { - my ($k, $v) = split(/:\s*/, $l, 2); - if ($k =~ /\AStatus\z/i) { - ($code) = ($v =~ /\b(\d+)\b/); - } else { - push @h, $k, $v; - } - } - if ($code == 403) { - # smart cloning disabled, serve dumbly - # in $end since we never undef $res in here - } else { # write response header: - $fh = $res->([ $code, \@h ]); - $res = undef; - $fh->write($buf); - } - $buf = ''; - } # else { keep reading ... } }; + + # Danga::Socket users, we queue up the read_enable callback to + # fire after pending writes are complete: + my $buf = ''; if (my $async = $env->{'pi-httpd.async'}) { + my $res; + my $q = sub { + $async->close; + $end->(); + $res->(@_); + }; # $async is PublicInbox::HTTPD::Async->new($rpipe, $cb) - $rpipe = $async->($rpipe, $cb); - sub { ($res) = @_ } # let Danga::Socket handle the rest. - } else { # synchronous loop for other PSGI servers - $read_enable = $read_disable = undef; - $vin = ''; - vec($vin, fileno($rpipe), 1) = 1; - sub { - ($res) = @_; - while ($rpipe) { $cb->() } - } + $async = $async->($rpipe, sub { + my $r = sysread($rpipe, $buf, 1024, length($buf)); + if (!defined $r || $r == 0) { + return $q->(r(500, 'http-backend error')); + } + $r = parse_cgi_headers(\$buf) or return; + if ($r->[0] == 403) { + return $q->(serve_dumb($cgi, $git, $path)); + } + my $fh = $res->($r); + $fh->write($buf); + $buf = undef; + my $dst = Plack::Util::inline_object( + write => sub { $fh->write(@_) }, + close => sub { + $end->(); + $fh->close; + }); + $async->async_pass($env->{'psgix.io'}, $dst); + }); + sub { ($res) = @_ }; # let Danga::Socket handle the rest. + } else { # getline + close for other PSGI servers + my $r; + do { + $r = read($rpipe, $buf, 1024, length($buf)); + if (!defined $r || $r == 0) { + return r(500, 'http-backend error'); + } + $r = parse_cgi_headers(\$buf); + } until ($r); + return serve_dumb($cgi, $git, $path) if $r->[0] == 403; + $r->[2] = Plack::Util::inline_object( + close => sub { $end->() }, + getline => sub { + my $ret = $buf; + $buf = undef; + defined $ret ? $ret : $rpipe->getline; + }); + $r; + } } @@ -311,4 +286,21 @@ sub input_to_file { return $in; } +sub parse_cgi_headers { + my ($bref) = @_; + $$bref =~ s/\A(.*?)\r\n\r\n//s or return; + my $h = $1; + my $code = 200; + my @h; + foreach my $l (split(/\r\n/, $h)) { + my ($k, $v) = split(/:\s*/, $l, 2); + if ($k =~ /\AStatus\z/i) { + ($code) = ($v =~ /\b(\d+)\b/); + } else { + push @h, $k, $v; + } + } + [ $code, \@h ] +} + 1; -- cgit v1.2.3-24-ge0c7