about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2016-05-22 03:58:00 +0000
committerEric Wong <e@80x24.org>2016-05-22 09:05:00 +0000
commit5f1e464e0ea66f0be884f9df0a260dfafabfbee1 (patch)
tree13e886428cc2e6f75734a3e6f3f463b61c6c3522
parentbe964491217f27aa01652becd5a173c5044772f4 (diff)
downloadpublic-inbox-5f1e464e0ea66f0be884f9df0a260dfafabfbee1.tar.gz
Unfortunately, the original design did not work because
middleware can wrap the response body and make `async_pass'
invisible to HTTP.pm
-rw-r--r--lib/PublicInbox/HTTP.pm24
-rw-r--r--lib/PublicInbox/HTTPD/Async.pm45
2 files changed, 31 insertions, 38 deletions
diff --git a/lib/PublicInbox/HTTP.pm b/lib/PublicInbox/HTTP.pm
index 4eb14481..480800bd 100644
--- a/lib/PublicInbox/HTTP.pm
+++ b/lib/PublicInbox/HTTP.pm
@@ -11,7 +11,7 @@ package PublicInbox::HTTP;
 use strict;
 use warnings;
 use base qw(Danga::Socket);
-use fields qw(httpd env rbuf input_left remote_addr remote_port);
+use fields qw(httpd env rbuf input_left remote_addr remote_port forward);
 use Fcntl qw(:seek);
 use Plack::HTTPParser qw(parse_http_request); # XS or pure Perl
 use HTTP::Status qw(status_message);
@@ -219,24 +219,6 @@ sub response_write {
                 if (ref $body eq 'ARRAY') {
                         $write->($_) foreach @$body;
                         $close->();
-                } elsif ($body->can('async_pass')) { # HTTPD::Async
-                        # prevent us from reading the body faster than we
-                        # can write to the client
-                        my $restart_read = sub { $body->watch_read(1) };
-                        $body->async_pass(sub {
-                                local $/ = \8192;
-                                my $buf = $body->getline;
-                                if (defined $buf) {
-                                        $write->($buf);
-                                        if ($self->{write_buf_size}) {
-                                                $body->watch_read(0);
-                                                $self->write($restart_read);
-                                        }
-                                        return; # continue waiting
-                                }
-                                $body->close;
-                                $close->();
-                        });
                 } else {
                         my $pull;
                         $pull = sub {
@@ -438,7 +420,9 @@ sub event_err { $_[0]->close }
 
 sub close {
         my $self = shift;
-        $self->{env} = undef;
+        my $forward = $self->{forward};
+        $forward->close if $forward;
+        $self->{forward} = $self->{env} = undef;
         $self->SUPER::close(@_);
 }
 
diff --git a/lib/PublicInbox/HTTPD/Async.pm b/lib/PublicInbox/HTTPD/Async.pm
index 8f3a6a09..8efa7a66 100644
--- a/lib/PublicInbox/HTTPD/Async.pm
+++ b/lib/PublicInbox/HTTPD/Async.pm
@@ -21,29 +21,38 @@ sub new {
         $self;
 }
 
-sub async_pass { $_[0]->{cb} = $_[1] }
-sub event_read { $_[0]->{cb}->() }
-sub event_hup { $_[0]->{cb}->() }
-sub event_err { $_[0]->{cb}->() }
-sub sysread { shift->{sock}->sysread(@_) }
-
-sub getline {
-        my ($self) = @_;
-        die 'getline called without $/ ref' unless ref $/;
-        while (1) {
-                my $ret = $self->read(8192); # Danga::Socket::read
-                return $$ret if defined $ret;
+sub async_pass {
+        my ($self, $io, $fh) = @_;
+        my $restart_read = sub { $self->watch_read(1) };
 
-                return unless $!{EAGAIN} || $!{EINTR};
+        # In case the client HTTP connection ($io) dies, it
+        # will automatically close this ($self) object.
+        $io->{forward} = $self;
+        $self->{cb} = sub {
+                my $r = sysread($self->{sock}, my $buf, 8192);
+                if ($r) {
+                        $fh->write($buf);
+                        if ($io->{write_buf_size}) {
+                                $self->watch_read(0);
+                                $io->write($restart_read);
+                        }
+                        return; # stay in watch_read
+                } elsif (!defined $r) {
+                        return if $!{EAGAIN} || $!{EINTR};
+                }
 
-                # in case of spurious wakeup, hopefully we never hit this
-                my $vin = '';
-                vec($vin, $self->{fd}, 1) = 1;
-                my $n;
-                do { $n = select($vin, undef, undef, undef) } until $n;
+                # Done! Error handling will happen in $fh->close
+                $io->{forward} = undef;
+                $self->close;
+                $fh->close;
         }
 }
 
+sub event_read { $_[0]->{cb}->() }
+sub event_hup { $_[0]->{cb}->() }
+sub event_err { $_[0]->{cb}->() }
+sub sysread { shift->{sock}->sysread(@_) }
+
 sub close {
         my $self = shift;
         $self->{cb} = undef;