about summary refs log tree commit homepage
path: root/lib
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2016-05-24 03:41:52 +0000
committerEric Wong <e@80x24.org>2016-05-24 04:12:09 +0000
commit8648f519a95872600689c3a5d6d87fd17770f9fc (patch)
tree0a223fa836c02bf674d7d48b9b883908abeca1ce /lib
parent74bbc3da398d00ba12e9294e360ad177ab2061ed (diff)
downloadpublic-inbox-8648f519a95872600689c3a5d6d87fd17770f9fc.tar.gz
We no longer override Danga::Socket::event_write and instead
re-enable reads by queuing up another callback in the $close
response callback.  This is necessary because event_write may not be
completely done writing a response, only the existing buffered data.

Furthermore, the {closed} field can almost be set at any time when
writing, so we must check it before acting on pipelined requests as
well as during write callbacks in more().
Diffstat (limited to 'lib')
-rw-r--r--lib/PublicInbox/HTTP.pm60
1 files changed, 30 insertions, 30 deletions
diff --git a/lib/PublicInbox/HTTP.pm b/lib/PublicInbox/HTTP.pm
index 00c9a044..6aae5c86 100644
--- a/lib/PublicInbox/HTTP.pm
+++ b/lib/PublicInbox/HTTP.pm
@@ -39,7 +39,10 @@ sub process_pipelineq () {
         my $q = $pipelineq;
         $pipet = undef;
         $pipelineq = [];
-        rbuf_process($_) foreach @$q;
+        foreach (@$q) {
+                next if $_->{closed};
+                rbuf_process($_);
+        }
 }
 
 # Use the same configuration parameter as git since this is primarily
@@ -228,26 +231,36 @@ sub identity_wcb ($) {
         sub { $self->write(\($_[0])) if $_[0] ne '' }
 }
 
+sub next_request ($) {
+        my ($self) = @_;
+        $self->watch_write(0);
+        if ($self->{rbuf} eq '') { # wait for next request
+                $self->watch_read(1);
+        } else { # avoid recursion for pipelined requests
+                push @$pipelineq, $self;
+                $pipet ||= PublicInbox::EvCleanup::asap(*process_pipelineq);
+        }
+}
+
+sub response_done ($$) {
+        my ($self, $alive) = @_;
+        my $env = $self->{env};
+        $self->{env} = undef;
+        $self->write("0\r\n\r\n") if $alive == 2;
+        $self->write(sub { $alive ? next_request($self) : $self->close });
+        if (my $obj = $env->{'pi-httpd.inbox'}) {
+                # grace period for reaping resources
+                $WEAKEN->{"$obj"} = $obj;
+                PublicInbox::EvCleanup::later(*weaken_task);
+        }
+}
+
 sub response_write {
         my ($self, $env, $res) = @_;
         my $alive = response_header_write($self, $env, $res);
 
         my $write = $alive == 2 ? chunked_wcb($self) : identity_wcb($self);
-        my $close = sub {
-                $self->write("0\r\n\r\n") if $alive == 2;
-                if ($alive) {
-                        $self->event_write; # watch for readability if done
-                } else {
-                        Danga::Socket::write($self, sub { $self->close });
-                }
-                if (my $obj = $env->{'pi-httpd.inbox'}) {
-                        # grace period for reaping resources
-                        $WEAKEN->{"$obj"} = $obj;
-                        $weakt ||= PublicInbox::EvCleanup::later(*weaken_task);
-                }
-                $self->{env} = undef;
-        };
-
+        my $close = sub { response_done($self, $alive) };
         if (defined(my $body = $res->[2])) {
                 if (ref $body eq 'ARRAY') {
                         $write->($_) foreach @$body;
@@ -278,6 +291,7 @@ sub response_write {
 use constant MSG_MORE => ($^O eq 'linux') ? 0x8000 : 0;
 sub more ($$) {
         my $self = $_[0];
+        return if $self->{closed};
         if (MSG_MORE && !$self->{write_buf_size}) {
                 my $n = send($self->{sock}, $_[1], MSG_MORE);
                 if (defined $n) {
@@ -290,20 +304,6 @@ sub more ($$) {
         $self->write($_[1]);
 }
 
-# overrides existing Danga::Socket method
-sub event_write {
-        my ($self) = @_;
-        # only continue watching for readability when we are done writing:
-        return if $self->write(undef) != 1;
-
-        if ($self->{rbuf} eq '') { # wait for next request
-                $self->watch_read(1);
-        } else { # avoid recursion for pipelined requests
-                push @$pipelineq, $self;
-                $pipet ||= PublicInbox::EvCleanup::asap(*process_pipelineq);
-        }
-}
-
 sub input_prepare {
         my ($self, $env) = @_;
         my $input = $null_io;