From 8648f519a95872600689c3a5d6d87fd17770f9fc Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Tue, 24 May 2016 03:41:52 +0000 Subject: http: fix various race conditions We no longer override Danga::Socket::event_write and instead re-enable reads by queuing up another callback in the $close response callback. This is necessary because event_write may not be completely done writing a response, only the existing buffered data. Furthermore, the {closed} field can almost be set at any time when writing, so we must check it before acting on pipelined requests as well as during write callbacks in more(). --- lib/PublicInbox/HTTP.pm | 60 ++++++++++++++++++++++++------------------------- 1 file changed, 30 insertions(+), 30 deletions(-) (limited to 'lib/PublicInbox') diff --git a/lib/PublicInbox/HTTP.pm b/lib/PublicInbox/HTTP.pm index 00c9a044..6aae5c86 100644 --- a/lib/PublicInbox/HTTP.pm +++ b/lib/PublicInbox/HTTP.pm @@ -39,7 +39,10 @@ sub process_pipelineq () { my $q = $pipelineq; $pipet = undef; $pipelineq = []; - rbuf_process($_) foreach @$q; + foreach (@$q) { + next if $_->{closed}; + rbuf_process($_); + } } # Use the same configuration parameter as git since this is primarily @@ -228,26 +231,36 @@ sub identity_wcb ($) { sub { $self->write(\($_[0])) if $_[0] ne '' } } +sub next_request ($) { + my ($self) = @_; + $self->watch_write(0); + if ($self->{rbuf} eq '') { # wait for next request + $self->watch_read(1); + } else { # avoid recursion for pipelined requests + push @$pipelineq, $self; + $pipet ||= PublicInbox::EvCleanup::asap(*process_pipelineq); + } +} + +sub response_done ($$) { + my ($self, $alive) = @_; + my $env = $self->{env}; + $self->{env} = undef; + $self->write("0\r\n\r\n") if $alive == 2; + $self->write(sub { $alive ? next_request($self) : $self->close }); + if (my $obj = $env->{'pi-httpd.inbox'}) { + # grace period for reaping resources + $WEAKEN->{"$obj"} = $obj; + PublicInbox::EvCleanup::later(*weaken_task); + } +} + sub response_write { my ($self, $env, $res) = @_; my $alive = response_header_write($self, $env, $res); my $write = $alive == 2 ? chunked_wcb($self) : identity_wcb($self); - my $close = sub { - $self->write("0\r\n\r\n") if $alive == 2; - if ($alive) { - $self->event_write; # watch for readability if done - } else { - Danga::Socket::write($self, sub { $self->close }); - } - if (my $obj = $env->{'pi-httpd.inbox'}) { - # grace period for reaping resources - $WEAKEN->{"$obj"} = $obj; - $weakt ||= PublicInbox::EvCleanup::later(*weaken_task); - } - $self->{env} = undef; - }; - + my $close = sub { response_done($self, $alive) }; if (defined(my $body = $res->[2])) { if (ref $body eq 'ARRAY') { $write->($_) foreach @$body; @@ -278,6 +291,7 @@ sub response_write { use constant MSG_MORE => ($^O eq 'linux') ? 0x8000 : 0; sub more ($$) { my $self = $_[0]; + return if $self->{closed}; if (MSG_MORE && !$self->{write_buf_size}) { my $n = send($self->{sock}, $_[1], MSG_MORE); if (defined $n) { @@ -290,20 +304,6 @@ sub more ($$) { $self->write($_[1]); } -# overrides existing Danga::Socket method -sub event_write { - my ($self) = @_; - # only continue watching for readability when we are done writing: - return if $self->write(undef) != 1; - - if ($self->{rbuf} eq '') { # wait for next request - $self->watch_read(1); - } else { # avoid recursion for pipelined requests - push @$pipelineq, $self; - $pipet ||= PublicInbox::EvCleanup::asap(*process_pipelineq); - } -} - sub input_prepare { my ($self, $env) = @_; my $input = $null_io; -- cgit v1.2.3-24-ge0c7