about summary refs log tree commit homepage
path: root/lib/PublicInbox
diff options
context:
space:
mode:
Diffstat (limited to 'lib/PublicInbox')
-rw-r--r--lib/PublicInbox/GitHTTPBackend.pm111
1 files changed, 84 insertions, 27 deletions
diff --git a/lib/PublicInbox/GitHTTPBackend.pm b/lib/PublicInbox/GitHTTPBackend.pm
index 937b2e9a..020b0886 100644
--- a/lib/PublicInbox/GitHTTPBackend.pm
+++ b/lib/PublicInbox/GitHTTPBackend.pm
@@ -33,6 +33,15 @@ our $ANY = join('|', @binary, @text);
 my $BIN = join('|', @binary);
 my $TEXT = join('|', @text);
 
+my $nextq;
+sub do_next () {
+        my $q = $nextq;
+        $nextq = undef;
+        while (my $cb = shift @$q) {
+                $cb->(); # this may redefine nextq
+        }
+}
+
 sub r {
         [ $_[0] , [qw(Content-Type text/plain Content-Length 0) ], [] ]
 }
@@ -50,6 +59,17 @@ sub serve {
         serve_dumb($cgi, $git, $path);
 }
 
+sub err ($@) {
+        my ($env, @msg) = @_;
+        $env->{'psgi.errors'}->print(@msg, "\n");
+}
+
+sub drop_client ($) {
+        if (my $io = $_[0]->{'psgix.io'}) {
+                $io->close; # this is Danga::Socket::close
+        }
+}
+
 sub serve_dumb {
         my ($cgi, $git, $path) = @_;
 
@@ -61,18 +81,51 @@ sub serve_dumb {
         } else {
                 return r(404);
         }
+
         my $f = "$git->{git_dir}/$path";
-        return r(404) unless -f $f && -r _;
+        return r(404) unless -f $f && -r _; # just in case it's a FIFO :P
         my @st = stat(_);
         my $size = $st[7];
+        my $env = $cgi->{env};
 
-        # TODO: If-Modified-Since and Last-Modified
+        # TODO: If-Modified-Since and Last-Modified?
         open my $in, '<', $f or return r(404);
-        my $code = 200;
         my $len = $size;
-        my @h;
+        my $n = 65536; # try to negotiate a big TCP window, first
+        my ($next, $fh);
+        my $cb = sub {
+                $n = $len if $len < $n;
+                my $r = sysread($in, my $buf, $n);
+                if (!defined $r) {
+                        err($env, "$f read error: $!");
+                        drop_client($env);
+                } elsif ($r <= 0) {
+                        err($env, "$f EOF with $len bytes left");
+                        drop_client($env);
+                } else {
+                        $len -= $r;
+                        $fh->write($buf);
+                        if ($len == 0) {
+                                $fh->close;
+                        } elsif ($next) {
+                                # avoid recursion in Danga::Socket::write
+                                unless ($nextq) {
+                                        $nextq = [];
+                                        Danga::Socket->AddTimer(0, *do_next);
+                                }
+                                # avoid buffering too much in case we have
+                                # slow clients:
+                                $n = 8192;
+                                push @$nextq, $next;
+                                return;
+                        }
+                }
+                # all done, cleanup references:
+                $fh = $next = undef;
+        };
 
-        my $env = $cgi->{env};
+        my $code = 200;
+        my @h = ('Content-Type', $type);
         my $range = $env->{HTTP_RANGE};
         if (defined $range && $range =~ /\bbytes=(\d*)-(\d*)\z/) {
                 ($code, $len) = prepare_range($cgi, $in, \@h, $1, $2, $size);
@@ -81,22 +134,18 @@ sub serve_dumb {
                         return [ 416, \@h, [] ];
                 }
         }
+        push @h, 'Content-Length', $len;
 
-        push @h, 'Content-Type', $type, 'Content-Length', $len;
         sub {
                 my ($res) = @_; # Plack callback
-                my $fh = $res->([ $code, \@h ]);
-                my $buf;
-                my $n = 8192;
-                while ($len > 0) {
-                        $n = $len if $len < $n;
-                        my $r = sysread($in, $buf, $n);
-                        last if (!defined($r) || $r <= 0);
-                        $len -= $r;
-                        $fh->write($buf);
+                $fh = $res->([ $code, \@h ]);
+                if (defined $env->{'pi-httpd.async'}) {
+                        my $pi_http = $env->{'psgix.io'};
+                        $next = sub { $pi_http->write($cb) };
+                        $cb->(); # start it off!
+                } else {
+                        $cb->() while $fh;
                 }
-                die "$f truncated with $len bytes remaining\n" if $len;
-                $fh->close;
         }
 }
 
@@ -149,7 +198,6 @@ sub serve_smart {
         my $input = $env->{'psgi.input'};
         my $buf;
         my $in;
-        my $err = $env->{'psgi.errors'};
         my $fd = eval { fileno($input) };
         if (defined $fd && $fd >= 0) {
                 $in = $input;
@@ -158,7 +206,7 @@ sub serve_smart {
         }
         my ($rpipe, $wpipe);
         unless (pipe($rpipe, $wpipe)) {
-                $err->print("error creating pipe: $! - going static\n");
+                err($env, "error creating pipe: $! - going static");
                 return;
         }
         my %env = %ENV;
@@ -179,13 +227,23 @@ sub serve_smart {
         my %rdr = ( 0 => fileno($in), 1 => fileno($wpipe) );
         my $pid = spawn([qw(git http-backend)], \%env, \%rdr);
         unless (defined $pid) {
-                $err->print("error spawning: $! - going static\n");
+                err($env, "error spawning: $! - going static");
                 return;
         }
         $wpipe = $in = undef;
         $buf = '';
         my ($vin, $fh, $res);
         $nr_running++;
+
+        # Danga::Socket users, we queue up the read_enable callback to
+        # fire after pending writes are complete:
+        my $pi_http = $env->{'psgix.io'};
+        my $read_enable = sub { $rpipe->watch_read(1) };
+        my $read_disable = sub {
+                $rpipe->watch_read(0);
+                $pi_http->write($read_enable);
+        };
+
         my $end = sub {
                 if ($fh) {
                         $fh->close;
@@ -202,10 +260,8 @@ sub serve_smart {
                         my $e = $pid == waitpid($pid, 0) ?
                                 $? : "PID:$pid still running?";
                         if ($e) {
-                                $err->print("http-backend ($git_dir): $e\n");
-                                if (my $io = $env->{'psgix.io'}) {
-                                        $io->close;
-                                }
+                                err($env, "git http-backend ($git_dir): $e");
+                                drop_client($env);
                         }
                 }
                 return unless $res;
@@ -220,7 +276,7 @@ sub serve_smart {
                 }
                 my $e = $!;
                 $end->();
-                $err->print("git http-backend ($git_dir): $e\n");
+                err($env, "git http-backend ($git_dir): $e\n");
         };
         my $cb = sub { # read git-http-backend output and stream to client
                 my $r = $rpipe ? $rpipe->sysread($buf, 8192, length($buf)) : 0;
@@ -229,6 +285,7 @@ sub serve_smart {
                 if ($fh) { # stream body from git-http-backend to HTTP client
                         $fh->write($buf);
                         $buf = '';
+                        $read_disable->() if $read_disable;
                 } elsif ($buf =~ s/\A(.*?)\r\n\r\n//s) { # parse headers
                         my $h = $1;
                         my $code = 200;
@@ -257,6 +314,7 @@ sub serve_smart {
                 $rpipe = $async->($rpipe, $cb);
                 sub { ($res) = @_ } # let Danga::Socket handle the rest.
         } else { # synchronous loop for other PSGI servers
+                $read_enable = $read_disable = undef;
                 $vin = '';
                 vec($vin, fileno($rpipe), 1) = 1;
                 sub {
@@ -274,8 +332,7 @@ sub input_to_file {
         while (1) {
                 my $r = $input->read($buf, 8192);
                 unless (defined $r) {
-                        my $err = $env->{'psgi.errors'};
-                        $err->print("error reading input: $!\n");
+                        err($env, "error reading input: $!");
                         return;
                 }
                 last if ($r == 0);