about summary refs log tree commit homepage
path: root/lib/PublicInbox/HTTPD/Async.pm
diff options
context:
space:
mode:
Diffstat (limited to 'lib/PublicInbox/HTTPD/Async.pm')
-rw-r--r--lib/PublicInbox/HTTPD/Async.pm44
1 files changed, 24 insertions, 20 deletions
diff --git a/lib/PublicInbox/HTTPD/Async.pm b/lib/PublicInbox/HTTPD/Async.pm
index 604627ab..b46baeb2 100644
--- a/lib/PublicInbox/HTTPD/Async.pm
+++ b/lib/PublicInbox/HTTPD/Async.pm
@@ -11,6 +11,7 @@ use warnings;
 use base qw(PublicInbox::DS);
 use fields qw(cb cleanup);
 require PublicInbox::EvCleanup;
+use Errno qw(EAGAIN);
 
 sub new {
         my ($class, $io, $cb, $cleanup) = @_;
@@ -25,18 +26,13 @@ sub new {
 
         my $self = fields::new($class);
         IO::Handle::blocking($io, 0);
-        $self->SUPER::new($io);
+        $self->SUPER::new($io, PublicInbox::DS::EPOLLIN());
         $self->{cb} = $cb;
         $self->{cleanup} = $cleanup;
-        $self->watch_read(1);
         $self;
 }
 
-# fires after pending writes are complete:
-sub restart_read_cb ($) {
-        my ($self) = @_;
-        sub { $self->watch_read(1) }
-}
+sub restart_read ($) { $_[0]->watch(PublicInbox::DS::EPOLLIN()) }
 
 sub main_cb ($$$) {
         my ($http, $fh, $bref) = @_;
@@ -44,24 +40,31 @@ sub main_cb ($$$) {
                 my ($self) = @_;
                 my $r = sysread($self->{sock}, $$bref, 8192);
                 if ($r) {
-                        $fh->write($$bref);
-                        unless ($http->{closed}) { # PublicInbox::DS sets this
-                                if (scalar @{$http->{wbuf}}) {
-                                        $self->watch_read(0);
-                                        $http->write(restart_read_cb($self));
+                        $fh->write($$bref); # may call $http->close
+
+                        if ($http->{sock}) { # !closed
+                                if ($http->{wbuf}) {
+                                        # HTTP client could not keep up, so
+                                        # stop reading and buffering.
+                                        $self->watch(0);
+
+                                        # Tell the HTTP socket to restart us
+                                        # when HTTP client is done draining
+                                        # $http->{wbuf}:
+                                        $http->enqueue_restart_pass;
                                 }
-                                # stay in watch_read, but let other clients
+                                # stay in EPOLLIN, but let other clients
                                 # get some work done, too.
                                 return;
                         }
                         # fall through to close below...
                 } elsif (!defined $r) {
-                        return if $!{EAGAIN} || $!{EINTR};
+                        return restart_read($self) if $! == EAGAIN;
                 }
 
                 # Done! Error handling will happen in $fh->close
                 # called by the {cleanup} handler
-                $http->{forward} = undef;
+                delete $http->{forward};
                 $self->close;
         }
 }
@@ -78,13 +81,14 @@ sub async_pass {
 sub event_step { $_[0]->{cb}->(@_) }
 
 sub close {
-        my $self = shift;
-        my $cleanup = $self->{cleanup};
-        $self->{cleanup} = $self->{cb} = undef;
-        $self->SUPER::close(@_);
+        my $self = $_[0];
+        delete $self->{cb};
+        $self->SUPER::close;
 
         # we defer this to the next timer loop since close is deferred
-        PublicInbox::EvCleanup::next_tick($cleanup) if $cleanup;
+        if (my $cleanup = delete $self->{cleanup}) {
+                PublicInbox::EvCleanup::next_tick($cleanup);
+        }
 }
 
 1;