about summary refs log tree commit homepage
path: root/lib/PublicInbox/HTTPD/Async.pm
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2019-06-26 06:36:27 +0000
committerEric Wong <e@80x24.org>2019-06-26 06:36:27 +0000
commit84d8920b92686e975929aebe845b6d4ea0a9ef0d (patch)
tree91a1123aaa44ad8fbb63c9dbf912d6dca95b0818 /lib/PublicInbox/HTTPD/Async.pm
parentc19a4e88f49ba3496751c4b87ebcfa0f6b47f0ce (diff)
parentc30b4427b340aeb242273a7b890fbd7e50132f51 (diff)
downloadpublic-inbox-84d8920b92686e975929aebe845b6d4ea0a9ef0d.tar.gz
* origin/nntp-tls: (59 commits)
  ds: ->write must not clobber empty wbuf array
  Makefile: skip DSKQXS in global syntax check
  ds: reduce overhead of tempfile creation
  Revert "ci: require IO::KQueue on FreeBSD, for now"
  ds: reimplement IO::Poll support to look like epoll
  ds: split out IO::KQueue-specific code
  daemon: use FreeBSD accept filters on non-NNTP
  daemon: set TCP_DEFER_ACCEPT on everything but NNTP
  nntp: send greeting immediately for plain sockets
  ci: require IO::KQueue on FreeBSD, for now
  nntp: lazily allocate and stash rbuf
  ds: flush_write runs ->write callbacks even if closed
  nntp: simplify long response logic and fix nesting
  ds: always use EV_ADD with EV_SET
  nntp: reduce allocations for greeting
  ds: allow ->write callbacks to syswrite directly
  daemon: use SSL_MODE_RELEASE_BUFFERS
  t/nntpd-tls: slow client connection test
  nntp: call SSL_shutdown in normal cases
  ds|nntp: use CORE::close on socket
  ...
Diffstat (limited to 'lib/PublicInbox/HTTPD/Async.pm')
-rw-r--r--lib/PublicInbox/HTTPD/Async.pm44
1 files changed, 24 insertions, 20 deletions
diff --git a/lib/PublicInbox/HTTPD/Async.pm b/lib/PublicInbox/HTTPD/Async.pm
index 604627ab..b46baeb2 100644
--- a/lib/PublicInbox/HTTPD/Async.pm
+++ b/lib/PublicInbox/HTTPD/Async.pm
@@ -11,6 +11,7 @@ use warnings;
 use base qw(PublicInbox::DS);
 use fields qw(cb cleanup);
 require PublicInbox::EvCleanup;
+use Errno qw(EAGAIN);
 
 sub new {
         my ($class, $io, $cb, $cleanup) = @_;
@@ -25,18 +26,13 @@ sub new {
 
         my $self = fields::new($class);
         IO::Handle::blocking($io, 0);
-        $self->SUPER::new($io);
+        $self->SUPER::new($io, PublicInbox::DS::EPOLLIN());
         $self->{cb} = $cb;
         $self->{cleanup} = $cleanup;
-        $self->watch_read(1);
         $self;
 }
 
-# fires after pending writes are complete:
-sub restart_read_cb ($) {
-        my ($self) = @_;
-        sub { $self->watch_read(1) }
-}
+sub restart_read ($) { $_[0]->watch(PublicInbox::DS::EPOLLIN()) }
 
 sub main_cb ($$$) {
         my ($http, $fh, $bref) = @_;
@@ -44,24 +40,31 @@ sub main_cb ($$$) {
                 my ($self) = @_;
                 my $r = sysread($self->{sock}, $$bref, 8192);
                 if ($r) {
-                        $fh->write($$bref);
-                        unless ($http->{closed}) { # PublicInbox::DS sets this
-                                if (scalar @{$http->{wbuf}}) {
-                                        $self->watch_read(0);
-                                        $http->write(restart_read_cb($self));
+                        $fh->write($$bref); # may call $http->close
+
+                        if ($http->{sock}) { # !closed
+                                if ($http->{wbuf}) {
+                                        # HTTP client could not keep up, so
+                                        # stop reading and buffering.
+                                        $self->watch(0);
+
+                                        # Tell the HTTP socket to restart us
+                                        # when HTTP client is done draining
+                                        # $http->{wbuf}:
+                                        $http->enqueue_restart_pass;
                                 }
-                                # stay in watch_read, but let other clients
+                                # stay in EPOLLIN, but let other clients
                                 # get some work done, too.
                                 return;
                         }
                         # fall through to close below...
                 } elsif (!defined $r) {
-                        return if $!{EAGAIN} || $!{EINTR};
+                        return restart_read($self) if $! == EAGAIN;
                 }
 
                 # Done! Error handling will happen in $fh->close
                 # called by the {cleanup} handler
-                $http->{forward} = undef;
+                delete $http->{forward};
                 $self->close;
         }
 }
@@ -78,13 +81,14 @@ sub async_pass {
 sub event_step { $_[0]->{cb}->(@_) }
 
 sub close {
-        my $self = shift;
-        my $cleanup = $self->{cleanup};
-        $self->{cleanup} = $self->{cb} = undef;
-        $self->SUPER::close(@_);
+        my $self = $_[0];
+        delete $self->{cb};
+        $self->SUPER::close;
 
         # we defer this to the next timer loop since close is deferred
-        PublicInbox::EvCleanup::next_tick($cleanup) if $cleanup;
+        if (my $cleanup = delete $self->{cleanup}) {
+                PublicInbox::EvCleanup::next_tick($cleanup);
+        }
 }
 
 1;