diff options
Diffstat (limited to 'lib/PublicInbox/NNTP.pm')
-rw-r--r-- | lib/PublicInbox/NNTP.pm | 61 |
1 files changed, 25 insertions, 36 deletions
diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm index 12ce4e68..6acfcc1b 100644 --- a/lib/PublicInbox/NNTP.pm +++ b/lib/PublicInbox/NNTP.pm @@ -6,7 +6,7 @@ package PublicInbox::NNTP; use strict; use warnings; use base qw(PublicInbox::DS); -use fields qw(nntpd article rbuf ng long_res); +use fields qw(nntpd article rbuf ng); use PublicInbox::Search; use PublicInbox::Msgmap; use PublicInbox::MID qw(mid_escape); @@ -45,17 +45,7 @@ sub next_tick () { $nextt = undef; my $q = $nextq; $nextq = []; - foreach my $nntp (@$q) { - # for request && response protocols, always finish writing - # before finishing reading: - if (my $long_cb = $nntp->{long_res}) { - $nntp->write($long_cb); - } else { - # pipelined request, we bypassed socket-readiness - # checks to get here: - event_step($nntp); - } - } + event_step($_) for @$q; } sub requeue ($) { @@ -633,8 +623,7 @@ sub get_range ($$) { } sub long_response ($$) { - my ($self, $cb) = @_; - die "BUG: nested long response" if $self->{long_res}; + my ($self, $cb) = @_; # cb returns true if more, false if done my $fd = fileno($self->{sock}); defined $fd or return; @@ -642,36 +631,38 @@ sub long_response ($$) { # clients should not be sending us stuff and making us do more # work while we are stream a response to them my $t0 = now(); - $self->{long_res} = sub { + my $long_cb; # DANGER: self-referential + $long_cb = sub { + # wbuf is unset or empty, here; $cb may add to it my $more = eval { $cb->() }; if ($@ || !$self->{sock}) { # something bad happened... - delete $self->{long_res}; - + $long_cb = undef; + my $diff = now() - $t0; if ($@) { err($self, "%s during long response[$fd] - %0.6f", - $@, now() - $t0); - } - if ($self->{sock}) { - update_idle_time($self); - requeue($self); - } else { - out($self, " deferred[$fd] aborted - %0.6f", - now() - $t0); + $@, $diff); } + out($self, " deferred[$fd] aborted - %0.6f", $diff); + $self->close; } elsif ($more) { # $self->{wbuf}: + update_idle_time($self); + # no recursion, schedule another call ASAP # but only after all pending writes are done - update_idle_time($self); - requeue($self); + my $wbuf = $self->{wbuf} ||= []; + push @$wbuf, $long_cb; + + # wbuf may be populated by $cb, no need to rearm if so: + requeue($self) if scalar(@$wbuf) == 1; } else { # all done! - delete $self->{long_res}; + $long_cb = undef; res($self, '.'); out($self, " deferred[$fd] done - %0.6f", now() - $t0); - requeue($self); + requeue($self) unless $self->{wbuf}; } }; - $self->{long_res}->(); # kick off! + $self->write($long_cb); # kick off! undef; } @@ -986,9 +977,8 @@ sub event_step { my $t0 = now(); my $fd = fileno($self->{sock}); $r = eval { process_line($self, $line) }; - my $d = $self->{long_res} ? - " deferred[$fd]" : ''; - out($self, "[$fd] %s - %0.6f$d", $line, now() - $t0); + my $pending = $self->{wbuf} ? ' pending' : ''; + out($self, "[$fd] %s - %0.6f$pending", $line, now() - $t0); } return $self->close if $r < 0; @@ -998,7 +988,7 @@ sub event_step { # maybe there's more pipelined data, or we'll have # to register it for socket-readiness notifications - requeue($self) unless ($self->{long_res} || $self->{wbuf}); + requeue($self) unless $self->{wbuf}; } sub not_idle_long ($$) { @@ -1012,8 +1002,7 @@ sub not_idle_long ($$) { # for graceful shutdown in PublicInbox::Daemon: sub busy { my ($self, $now) = @_; - ($self->{rbuf} ne '' || $self->{long_res} || - $self->{wbuf} || not_idle_long($self, $now)); + ($self->{rbuf} ne '' || $self->{wbuf} || not_idle_long($self, $now)); } 1; |