about summary refs log tree commit homepage
path: root/lib/PublicInbox/NNTP.pm
diff options
context:
space:
mode:
Diffstat (limited to 'lib/PublicInbox/NNTP.pm')
-rw-r--r--lib/PublicInbox/NNTP.pm61
1 files changed, 25 insertions, 36 deletions
diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm
index 12ce4e68..6acfcc1b 100644
--- a/lib/PublicInbox/NNTP.pm
+++ b/lib/PublicInbox/NNTP.pm
@@ -6,7 +6,7 @@ package PublicInbox::NNTP;
 use strict;
 use warnings;
 use base qw(PublicInbox::DS);
-use fields qw(nntpd article rbuf ng long_res);
+use fields qw(nntpd article rbuf ng);
 use PublicInbox::Search;
 use PublicInbox::Msgmap;
 use PublicInbox::MID qw(mid_escape);
@@ -45,17 +45,7 @@ sub next_tick () {
         $nextt = undef;
         my $q = $nextq;
         $nextq = [];
-        foreach my $nntp (@$q) {
-                # for request && response protocols, always finish writing
-                # before finishing reading:
-                if (my $long_cb = $nntp->{long_res}) {
-                        $nntp->write($long_cb);
-                } else {
-                        # pipelined request, we bypassed socket-readiness
-                        # checks to get here:
-                        event_step($nntp);
-                }
-        }
+        event_step($_) for @$q;
 }
 
 sub requeue ($) {
@@ -633,8 +623,7 @@ sub get_range ($$) {
 }
 
 sub long_response ($$) {
-        my ($self, $cb) = @_;
-        die "BUG: nested long response" if $self->{long_res};
+        my ($self, $cb) = @_; # cb returns true if more, false if done
 
         my $fd = fileno($self->{sock});
         defined $fd or return;
@@ -642,36 +631,38 @@ sub long_response ($$) {
         # clients should not be sending us stuff and making us do more
         # work while we are stream a response to them
         my $t0 = now();
-        $self->{long_res} = sub {
+        my $long_cb; # DANGER: self-referential
+        $long_cb = sub {
+                # wbuf is unset or empty, here; $cb may add to it
                 my $more = eval { $cb->() };
                 if ($@ || !$self->{sock}) { # something bad happened...
-                        delete $self->{long_res};
-
+                        $long_cb = undef;
+                        my $diff = now() - $t0;
                         if ($@) {
                                 err($self,
                                     "%s during long response[$fd] - %0.6f",
-                                    $@, now() - $t0);
-                        }
-                        if ($self->{sock}) {
-                                update_idle_time($self);
-                                requeue($self);
-                        } else {
-                                out($self, " deferred[$fd] aborted - %0.6f",
-                                           now() - $t0);
+                                    $@, $diff);
                         }
+                        out($self, " deferred[$fd] aborted - %0.6f", $diff);
+                        $self->close;
                 } elsif ($more) { # $self->{wbuf}:
+                        update_idle_time($self);
+
                         # no recursion, schedule another call ASAP
                         # but only after all pending writes are done
-                        update_idle_time($self);
-                        requeue($self);
+                        my $wbuf = $self->{wbuf} ||= [];
+                        push @$wbuf, $long_cb;
+
+                        # wbuf may be populated by $cb, no need to rearm if so:
+                        requeue($self) if scalar(@$wbuf) == 1;
                 } else { # all done!
-                        delete $self->{long_res};
+                        $long_cb = undef;
                         res($self, '.');
                         out($self, " deferred[$fd] done - %0.6f", now() - $t0);
-                        requeue($self);
+                        requeue($self) unless $self->{wbuf};
                 }
         };
-        $self->{long_res}->(); # kick off!
+        $self->write($long_cb); # kick off!
         undef;
 }
 
@@ -986,9 +977,8 @@ sub event_step {
                 my $t0 = now();
                 my $fd = fileno($self->{sock});
                 $r = eval { process_line($self, $line) };
-                my $d = $self->{long_res} ?
-                        " deferred[$fd]" : '';
-                out($self, "[$fd] %s - %0.6f$d", $line, now() - $t0);
+                my $pending = $self->{wbuf} ? ' pending' : '';
+                out($self, "[$fd] %s - %0.6f$pending", $line, now() - $t0);
         }
 
         return $self->close if $r < 0;
@@ -998,7 +988,7 @@ sub event_step {
 
         # maybe there's more pipelined data, or we'll have
         # to register it for socket-readiness notifications
-        requeue($self) unless ($self->{long_res} || $self->{wbuf});
+        requeue($self) unless $self->{wbuf};
 }
 
 sub not_idle_long ($$) {
@@ -1012,8 +1002,7 @@ sub not_idle_long ($$) {
 # for graceful shutdown in PublicInbox::Daemon:
 sub busy {
         my ($self, $now) = @_;
-        ($self->{rbuf} ne '' || $self->{long_res} ||
-                $self->{wbuf} || not_idle_long($self, $now));
+        ($self->{rbuf} ne '' || $self->{wbuf} || not_idle_long($self, $now));
 }
 
 1;