about summary refs log tree commit homepage
path: root/lib
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2016-05-23 04:01:14 +0000
committerEric Wong <e@80x24.org>2016-05-23 05:58:35 +0000
commit347c6ee595c37d4e2214cb297811f154a41c452f (patch)
tree3c93033bac15169389ce4b579502759de1698fda /lib
parent311c2adc8c639813e0078631a8d97e5008452682 (diff)
downloadpublic-inbox-347c6ee595c37d4e2214cb297811f154a41c452f.tar.gz
We will have clients dropping connections during long clone
and fetch operations; so do not retain references holding
backend processes once we detect a client has dropped.
Diffstat (limited to 'lib')
-rw-r--r--lib/PublicInbox/GitHTTPBackend.pm82
-rw-r--r--lib/PublicInbox/HTTPD.pm5
-rw-r--r--lib/PublicInbox/HTTPD/Async.pm23
3 files changed, 55 insertions, 55 deletions
diff --git a/lib/PublicInbox/GitHTTPBackend.pm b/lib/PublicInbox/GitHTTPBackend.pm
index 70990ebc..ded56b33 100644
--- a/lib/PublicInbox/GitHTTPBackend.pm
+++ b/lib/PublicInbox/GitHTTPBackend.pm
@@ -194,6 +194,7 @@ sub serve_smart {
                 return;
         }
         $wpipe = $in = undef;
+        my $fh;
         my $end = sub {
                 $rpipe = undef;
                 my $e = $pid == waitpid($pid, 0) ?
@@ -202,60 +203,57 @@ sub serve_smart {
                         err($env, "git http-backend ($git_dir): $e");
                         drop_client($env);
                 }
+                $fh->close if $fh; # async-only
         };
 
         # Danga::Socket users, we queue up the read_enable callback to
         # fire after pending writes are complete:
         my $buf = '';
-        if (my $async = $env->{'pi-httpd.async'}) {
-                my $res;
-                my $q = sub {
-                        $async->close;
-                        $end->();
-                        $res->(@_);
-                };
-                # $async is PublicInbox::HTTPD::Async->new($rpipe, $cb)
-                $async = $async->($rpipe, sub {
-                        my $r = sysread($rpipe, $buf, 1024, length($buf));
-                        if (!defined $r || $r == 0) {
-                                return $q->(r(500, 'http-backend error'));
-                        }
-                        $r = parse_cgi_headers(\$buf) or return;
-                        if ($r->[0] == 403) {
-                                return $q->(serve_dumb($cgi, $git, $path));
-                        }
-                        my $fh = $res->($r);
-                        $fh->write($buf);
-                        $buf = undef;
-                        my $dst = Plack::Util::inline_object(
-                                write => sub { $fh->write(@_) },
-                                close => sub {
-                                        $end->();
-                                        $fh->close;
-                                });
-                        $async->async_pass($env->{'psgix.io'}, $dst);
-                });
-                sub { ($res) = @_ }; # let Danga::Socket handle the rest.
-        } else { # getline + close for other PSGI servers
-                my $r;
-                do {
-                        $r = read($rpipe, $buf, 1024, length($buf));
-                        if (!defined $r || $r == 0) {
-                                return r(500, 'http-backend error');
-                        }
-                        $r = parse_cgi_headers(\$buf);
-                } until ($r);
-                return serve_dumb($cgi, $git, $path) if $r->[0] == 403;
+        my $rd_hdr = sub {
+                my $r = sysread($rpipe, $buf, 1024, length($buf));
+                return if !defined($r) && ($!{EINTR} || $!{EAGAIN});
+                return r(500, 'http-backend error') unless $r;
+                $r = parse_cgi_headers(\$buf) or return;
+                $r->[0] == 403 ? serve_dumb($cgi, $git, $path) : $r;
+        };
+        my $res;
+        my $async = $env->{'pi-httpd.async'};
+        my $io = $env->{'psgix.io'};
+        my $cb = sub {
+                my $r = $rd_hdr->() or return;
+                $rd_hdr = undef;
+                if (scalar(@$r) == 3) { # error:
+                        $async->close if $async;
+                        return $res->($r);
+                }
+                if ($async) {
+                        $fh = $res->($r);
+                        return $async->async_pass($io, $fh, \$buf);
+                }
+
+                # for synchronous PSGI servers
                 $r->[2] = Plack::Util::inline_object(
-                        close => sub { $end->() },
+                        close => $end,
                         getline => sub {
                                 my $ret = $buf;
                                 $buf = undef;
                                 defined $ret ? $ret : $rpipe->getline;
                         });
-                $r;
+                $res->($r);
+        };
+        sub {
+                ($res) = @_;
 
-        }
+                # hopefully this doesn't break any middlewares,
+                # holding the input here is a waste of FDs and memory
+                $env->{'psgi.input'} = undef;
+
+                if ($async) {
+                        $async = $async->($rpipe, $cb, $end);
+                } else { # generic PSGI
+                        $cb->() while $rd_hdr;
+                }
+        };
 }
 
 sub input_to_file {
diff --git a/lib/PublicInbox/HTTPD.pm b/lib/PublicInbox/HTTPD.pm
index 78efaa50..433d6da7 100644
--- a/lib/PublicInbox/HTTPD.pm
+++ b/lib/PublicInbox/HTTPD.pm
@@ -8,10 +8,7 @@ use Plack::Util;
 require PublicInbox::HTTPD::Async;
 require PublicInbox::Daemon;
 
-sub pi_httpd_async {
-        my ($io, $cb) = @_;
-        PublicInbox::HTTPD::Async->new($io, $cb);
-}
+sub pi_httpd_async { PublicInbox::HTTPD::Async->new(@_) }
 
 sub new {
         my ($class, $sock, $app) = @_;
diff --git a/lib/PublicInbox/HTTPD/Async.pm b/lib/PublicInbox/HTTPD/Async.pm
index 8efa7a66..bd2eacbf 100644
--- a/lib/PublicInbox/HTTPD/Async.pm
+++ b/lib/PublicInbox/HTTPD/Async.pm
@@ -9,32 +9,33 @@ package PublicInbox::HTTPD::Async;
 use strict;
 use warnings;
 use base qw(Danga::Socket);
-use fields qw(cb);
+use fields qw(cb cleanup);
 
 sub new {
-        my ($class, $io, $cb) = @_;
+        my ($class, $io, $cb, $cleanup) = @_;
         my $self = fields::new($class);
         IO::Handle::blocking($io, 0);
         $self->SUPER::new($io);
         $self->{cb} = $cb;
+        $self->{cleanup} = $cleanup;
         $self->watch_read(1);
         $self;
 }
 
 sub async_pass {
-        my ($self, $io, $fh) = @_;
+        my ($self, $io, $fh, $bref) = @_;
         my $restart_read = sub { $self->watch_read(1) };
-
         # In case the client HTTP connection ($io) dies, it
         # will automatically close this ($self) object.
         $io->{forward} = $self;
+        $fh->write($$bref);
         $self->{cb} = sub {
-                my $r = sysread($self->{sock}, my $buf, 8192);
+                my $r = sysread($self->{sock}, $$bref, 8192);
                 if ($r) {
-                        $fh->write($buf);
+                        $fh->write($$bref);
                         if ($io->{write_buf_size}) {
                                 $self->watch_read(0);
-                                $io->write($restart_read);
+                                $io->write($restart_read); # D::S::write
                         }
                         return; # stay in watch_read
                 } elsif (!defined $r) {
@@ -42,9 +43,9 @@ sub async_pass {
                 }
 
                 # Done! Error handling will happen in $fh->close
+                # called by the {cleanup} handler
                 $io->{forward} = undef;
                 $self->close;
-                $fh->close;
         }
 }
 
@@ -55,8 +56,12 @@ sub sysread { shift->{sock}->sysread(@_) }
 
 sub close {
         my $self = shift;
-        $self->{cb} = undef;
+        my $cleanup = $self->{cleanup};
+        $self->{cleanup} = $self->{cb} = undef;
         $self->SUPER::close(@_);
+
+        # we defer this to the next timer loop since close is deferred
+        Danga::Socket->AddTimer(0, $cleanup) if $cleanup;
 }
 
 # do not let ourselves be closed during graceful termination