about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2016-05-24 03:41:52 +0000
committerEric Wong <e@80x24.org>2016-05-24 04:12:09 +0000
commit8648f519a95872600689c3a5d6d87fd17770f9fc (patch)
tree0a223fa836c02bf674d7d48b9b883908abeca1ce
parent74bbc3da398d00ba12e9294e360ad177ab2061ed (diff)
downloadpublic-inbox-8648f519a95872600689c3a5d6d87fd17770f9fc.tar.gz
We no longer override Danga::Socket::event_write and instead
re-enable reads by queuing up another callback in the $close
response callback.  This is necessary because event_write may not be
completely done writing a response, only the existing buffered data.

Furthermore, the {closed} field can almost be set at any time when
writing, so we must check it before acting on pipelined requests as
well as during write callbacks in more().
-rw-r--r--lib/PublicInbox/HTTP.pm60
1 files changed, 30 insertions, 30 deletions
diff --git a/lib/PublicInbox/HTTP.pm b/lib/PublicInbox/HTTP.pm
index 00c9a044..6aae5c86 100644
--- a/lib/PublicInbox/HTTP.pm
+++ b/lib/PublicInbox/HTTP.pm
@@ -39,7 +39,10 @@ sub process_pipelineq () {
         my $q = $pipelineq;
         $pipet = undef;
         $pipelineq = [];
-        rbuf_process($_) foreach @$q;
+        foreach (@$q) {
+                next if $_->{closed};
+                rbuf_process($_);
+        }
 }
 
 # Use the same configuration parameter as git since this is primarily
@@ -228,26 +231,36 @@ sub identity_wcb ($) {
         sub { $self->write(\($_[0])) if $_[0] ne '' }
 }
 
+sub next_request ($) {
+        my ($self) = @_;
+        $self->watch_write(0);
+        if ($self->{rbuf} eq '') { # wait for next request
+                $self->watch_read(1);
+        } else { # avoid recursion for pipelined requests
+                push @$pipelineq, $self;
+                $pipet ||= PublicInbox::EvCleanup::asap(*process_pipelineq);
+        }
+}
+
+sub response_done ($$) {
+        my ($self, $alive) = @_;
+        my $env = $self->{env};
+        $self->{env} = undef;
+        $self->write("0\r\n\r\n") if $alive == 2;
+        $self->write(sub { $alive ? next_request($self) : $self->close });
+        if (my $obj = $env->{'pi-httpd.inbox'}) {
+                # grace period for reaping resources
+                $WEAKEN->{"$obj"} = $obj;
+                PublicInbox::EvCleanup::later(*weaken_task);
+        }
+}
+
 sub response_write {
         my ($self, $env, $res) = @_;
         my $alive = response_header_write($self, $env, $res);
 
         my $write = $alive == 2 ? chunked_wcb($self) : identity_wcb($self);
-        my $close = sub {
-                $self->write("0\r\n\r\n") if $alive == 2;
-                if ($alive) {
-                        $self->event_write; # watch for readability if done
-                } else {
-                        Danga::Socket::write($self, sub { $self->close });
-                }
-                if (my $obj = $env->{'pi-httpd.inbox'}) {
-                        # grace period for reaping resources
-                        $WEAKEN->{"$obj"} = $obj;
-                        $weakt ||= PublicInbox::EvCleanup::later(*weaken_task);
-                }
-                $self->{env} = undef;
-        };
-
+        my $close = sub { response_done($self, $alive) };
         if (defined(my $body = $res->[2])) {
                 if (ref $body eq 'ARRAY') {
                         $write->($_) foreach @$body;
@@ -278,6 +291,7 @@ sub response_write {
 use constant MSG_MORE => ($^O eq 'linux') ? 0x8000 : 0;
 sub more ($$) {
         my $self = $_[0];
+        return if $self->{closed};
         if (MSG_MORE && !$self->{write_buf_size}) {
                 my $n = send($self->{sock}, $_[1], MSG_MORE);
                 if (defined $n) {
@@ -290,20 +304,6 @@ sub more ($$) {
         $self->write($_[1]);
 }
 
-# overrides existing Danga::Socket method
-sub event_write {
-        my ($self) = @_;
-        # only continue watching for readability when we are done writing:
-        return if $self->write(undef) != 1;
-
-        if ($self->{rbuf} eq '') { # wait for next request
-                $self->watch_read(1);
-        } else { # avoid recursion for pipelined requests
-                push @$pipelineq, $self;
-                $pipet ||= PublicInbox::EvCleanup::asap(*process_pipelineq);
-        }
-}
-
 sub input_prepare {
         my ($self, $env) = @_;
         my $input = $null_io;