From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id C16C11F4BD for ; Mon, 24 Jun 2019 02:58:08 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 47/57] nntp: simplify long response logic and fix nesting Date: Mon, 24 Jun 2019 02:52:48 +0000 Message-Id: <20190624025258.25592-48-e@80x24.org> In-Reply-To: <20190624025258.25592-1-e@80x24.org> References: <20190624025258.25592-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: We can get rid of the {long_res} field and reuse the write buffer ordering logic to prevent nesting of responses from requeue. On FreeBSD, this fixes a problem of callbacks firing twice with kqueue, as event_step is now our only callback entry point. There's a slight change in the stdout "logging" format, in that we can no longer distinguish between writes blocked due to slow clients and deferred long responses. Not sure if this affects anybody parsing logs or not, but preserving the old format could prove expensive and not worth the effort. 
--- lib/PublicInbox/NNTP.pm | 61 +++++++++++++++++------------------------ 1 file changed, 25 insertions(+), 36 deletions(-) diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm index 12ce4e68..6acfcc1b 100644 --- a/lib/PublicInbox/NNTP.pm +++ b/lib/PublicInbox/NNTP.pm @@ -6,7 +6,7 @@ package PublicInbox::NNTP; use strict; use warnings; use base qw(PublicInbox::DS); -use fields qw(nntpd article rbuf ng long_res); +use fields qw(nntpd article rbuf ng); use PublicInbox::Search; use PublicInbox::Msgmap; use PublicInbox::MID qw(mid_escape); @@ -45,17 +45,7 @@ sub next_tick () { $nextt = undef; my $q = $nextq; $nextq = []; - foreach my $nntp (@$q) { - # for request && response protocols, always finish writing - # before finishing reading: - if (my $long_cb = $nntp->{long_res}) { - $nntp->write($long_cb); - } else { - # pipelined request, we bypassed socket-readiness - # checks to get here: - event_step($nntp); - } - } + event_step($_) for @$q; } sub requeue ($) { @@ -633,8 +623,7 @@ sub get_range ($$) { } sub long_response ($$) { - my ($self, $cb) = @_; - die "BUG: nested long response" if $self->{long_res}; + my ($self, $cb) = @_; # cb returns true if more, false if done my $fd = fileno($self->{sock}); defined $fd or return; @@ -642,36 +631,38 @@ sub long_response ($$) { # clients should not be sending us stuff and making us do more # work while we are stream a response to them my $t0 = now(); - $self->{long_res} = sub { + my $long_cb; # DANGER: self-referential + $long_cb = sub { + # wbuf is unset or empty, here; $cb may add to it my $more = eval { $cb->() }; if ($@ || !$self->{sock}) { # something bad happened... 
- delete $self->{long_res}; - + $long_cb = undef; + my $diff = now() - $t0; if ($@) { err($self, "%s during long response[$fd] - %0.6f", - $@, now() - $t0); - } - if ($self->{sock}) { - update_idle_time($self); - requeue($self); - } else { - out($self, " deferred[$fd] aborted - %0.6f", - now() - $t0); + $@, $diff); } + out($self, " deferred[$fd] aborted - %0.6f", $diff); + $self->close; } elsif ($more) { # $self->{wbuf}: + update_idle_time($self); + # no recursion, schedule another call ASAP # but only after all pending writes are done - update_idle_time($self); - requeue($self); + my $wbuf = $self->{wbuf} ||= []; + push @$wbuf, $long_cb; + + # wbuf may be populated by $cb, no need to rearm if so: + requeue($self) if scalar(@$wbuf) == 1; } else { # all done! - delete $self->{long_res}; + $long_cb = undef; res($self, '.'); out($self, " deferred[$fd] done - %0.6f", now() - $t0); - requeue($self); + requeue($self) unless $self->{wbuf}; } }; - $self->{long_res}->(); # kick off! + $self->write($long_cb); # kick off! undef; } @@ -986,9 +977,8 @@ sub event_step { my $t0 = now(); my $fd = fileno($self->{sock}); $r = eval { process_line($self, $line) }; - my $d = $self->{long_res} ? - " deferred[$fd]" : ''; - out($self, "[$fd] %s - %0.6f$d", $line, now() - $t0); + my $pending = $self->{wbuf} ? ' pending' : ''; + out($self, "[$fd] %s - %0.6f$pending", $line, now() - $t0); } return $self->close if $r < 0; @@ -998,7 +988,7 @@ sub event_step { # maybe there's more pipelined data, or we'll have # to register it for socket-readiness notifications - requeue($self) unless ($self->{long_res} || $self->{wbuf}); + requeue($self) unless $self->{wbuf}; } sub not_idle_long ($$) { @@ -1012,8 +1002,7 @@ sub not_idle_long ($$) { # for graceful shutdown in PublicInbox::Daemon: sub busy { my ($self, $now) = @_; - ($self->{rbuf} ne '' || $self->{long_res} || - $self->{wbuf} || not_idle_long($self, $now)); + ($self->{rbuf} ne '' || $self->{wbuf} || not_idle_long($self, $now)); } 1; -- EW