diff options
Diffstat (limited to 'lib/PublicInbox/HTTPD/Async.pm')
-rw-r--r-- | lib/PublicInbox/HTTPD/Async.pm | 44 |
1 files changed, 24 insertions, 20 deletions
diff --git a/lib/PublicInbox/HTTPD/Async.pm b/lib/PublicInbox/HTTPD/Async.pm index 604627ab..b46baeb2 100644 --- a/lib/PublicInbox/HTTPD/Async.pm +++ b/lib/PublicInbox/HTTPD/Async.pm @@ -11,6 +11,7 @@ use warnings; use base qw(PublicInbox::DS); use fields qw(cb cleanup); require PublicInbox::EvCleanup; +use Errno qw(EAGAIN); sub new { my ($class, $io, $cb, $cleanup) = @_; @@ -25,18 +26,13 @@ sub new { my $self = fields::new($class); IO::Handle::blocking($io, 0); - $self->SUPER::new($io); + $self->SUPER::new($io, PublicInbox::DS::EPOLLIN()); $self->{cb} = $cb; $self->{cleanup} = $cleanup; - $self->watch_read(1); $self; } -# fires after pending writes are complete: -sub restart_read_cb ($) { - my ($self) = @_; - sub { $self->watch_read(1) } -} +sub restart_read ($) { $_[0]->watch(PublicInbox::DS::EPOLLIN()) } sub main_cb ($$$) { my ($http, $fh, $bref) = @_; @@ -44,24 +40,31 @@ sub main_cb ($$$) { my ($self) = @_; my $r = sysread($self->{sock}, $$bref, 8192); if ($r) { - $fh->write($$bref); - unless ($http->{closed}) { # PublicInbox::DS sets this - if (scalar @{$http->{wbuf}}) { - $self->watch_read(0); - $http->write(restart_read_cb($self)); + $fh->write($$bref); # may call $http->close + + if ($http->{sock}) { # !closed + if ($http->{wbuf}) { + # HTTP client could not keep up, so + # stop reading and buffering. + $self->watch(0); + + # Tell the HTTP socket to restart us + # when HTTP client is done draining + # $http->{wbuf}: + $http->enqueue_restart_pass; } - # stay in watch_read, but let other clients + # stay in EPOLLIN, but let other clients # get some work done, too. return; } # fall through to close below... } elsif (!defined $r) { - return if $!{EAGAIN} || $!{EINTR}; + return restart_read($self) if $! == EAGAIN; } # Done! Error handling will happen in $fh->close # called by the {cleanup} handler - $http->{forward} = undef; + delete $http->{forward}; $self->close; } } @@ -78,13 +81,14 @@ sub async_pass { sub event_step { $_[0]->{cb}->(@_) } sub close { - my $self = shift; - my $cleanup = $self->{cleanup}; - $self->{cleanup} = $self->{cb} = undef; - $self->SUPER::close(@_); + my $self = $_[0]; + delete $self->{cb}; + $self->SUPER::close; # we defer this to the next timer loop since close is deferred - PublicInbox::EvCleanup::next_tick($cleanup) if $cleanup; + if (my $cleanup = delete $self->{cleanup}) { + PublicInbox::EvCleanup::next_tick($cleanup); + } } 1; |