about summary refs log tree commit homepage
path: root/lib/PublicInbox/HTTPD
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2016-05-23 04:01:14 +0000
committerEric Wong <e@80x24.org>2016-05-23 05:58:35 +0000
commit347c6ee595c37d4e2214cb297811f154a41c452f (patch)
tree3c93033bac15169389ce4b579502759de1698fda /lib/PublicInbox/HTTPD
parent311c2adc8c639813e0078631a8d97e5008452682 (diff)
downloadpublic-inbox-347c6ee595c37d4e2214cb297811f154a41c452f.tar.gz
We will have clients dropping connections during long clone
and fetch operations; so do not retain references holding
backend processes once we detect a client has dropped.
Diffstat (limited to 'lib/PublicInbox/HTTPD')
-rw-r--r--lib/PublicInbox/HTTPD/Async.pm23
1 files changed, 14 insertions, 9 deletions
diff --git a/lib/PublicInbox/HTTPD/Async.pm b/lib/PublicInbox/HTTPD/Async.pm
index 8efa7a66..bd2eacbf 100644
--- a/lib/PublicInbox/HTTPD/Async.pm
+++ b/lib/PublicInbox/HTTPD/Async.pm
@@ -9,32 +9,33 @@ package PublicInbox::HTTPD::Async;
 use strict;
 use warnings;
 use base qw(Danga::Socket);
-use fields qw(cb);
+use fields qw(cb cleanup);
 
 sub new {
-        my ($class, $io, $cb) = @_;
+        my ($class, $io, $cb, $cleanup) = @_;
         my $self = fields::new($class);
         IO::Handle::blocking($io, 0);
         $self->SUPER::new($io);
         $self->{cb} = $cb;
+        $self->{cleanup} = $cleanup;
         $self->watch_read(1);
         $self;
 }
 
 sub async_pass {
-        my ($self, $io, $fh) = @_;
+        my ($self, $io, $fh, $bref) = @_;
         my $restart_read = sub { $self->watch_read(1) };
-
         # In case the client HTTP connection ($io) dies, it
         # will automatically close this ($self) object.
         $io->{forward} = $self;
+        $fh->write($$bref);
         $self->{cb} = sub {
-                my $r = sysread($self->{sock}, my $buf, 8192);
+                my $r = sysread($self->{sock}, $$bref, 8192);
                 if ($r) {
-                        $fh->write($buf);
+                        $fh->write($$bref);
                         if ($io->{write_buf_size}) {
                                 $self->watch_read(0);
-                                $io->write($restart_read);
+                                $io->write($restart_read); # D::S::write
                         }
                         return; # stay in watch_read
                 } elsif (!defined $r) {
@@ -42,9 +43,9 @@ sub async_pass {
                 }
 
                 # Done! Error handling will happen in $fh->close
+                # called by the {cleanup} handler
                 $io->{forward} = undef;
                 $self->close;
-                $fh->close;
         }
 }
 
@@ -55,8 +56,12 @@ sub sysread { shift->{sock}->sysread(@_) }
 
 sub close {
         my $self = shift;
-        $self->{cb} = undef;
+        my $cleanup = $self->{cleanup};
+        $self->{cleanup} = $self->{cb} = undef;
         $self->SUPER::close(@_);
+
+        # we defer this to the next timer loop since close is deferred
+        Danga::Socket->AddTimer(0, $cleanup) if $cleanup;
 }
 
 # do not let ourselves be closed during graceful termination