about summary refs log tree commit homepage
diff options
context:
space:
mode:
-rw-r--r--lib/PublicInbox/NNTP.pm37
-rw-r--r--public-inbox-nntpd3
2 files changed, 33 insertions, 7 deletions
diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm
index 0e910824..8f866859 100644
--- a/lib/PublicInbox/NNTP.pm
+++ b/lib/PublicInbox/NNTP.pm
@@ -4,7 +4,7 @@ package PublicInbox::NNTP;
 use strict;
 use warnings;
 use base qw(Danga::Socket);
-use fields qw(nntpd article ng long_res);
+use fields qw(nntpd article rbuf ng long_res);
 use PublicInbox::Msgmap;
 use PublicInbox::GitCatFile;
 use PublicInbox::MID qw(mid2path);
@@ -32,6 +32,7 @@ sub new ($$$) {
         $self->SUPER::new($sock);
         $self->{nntpd} = $nntpd;
         res($self, '201 server ready - post via email');
+        $self->{rbuf} = '';
         $self->watch_read(1);
         $self;
 }
@@ -762,20 +763,44 @@ sub event_write {
 sub event_read {
         my ($self) = @_;
         use constant LINE_MAX => 512; # RFC 977 section 2.3
+        my $line;
         my $r = 1;
-        my $buf = $self->read(LINE_MAX) or return $self->close;
-        while ($r > 0 && $$buf =~ s/\A\s*([^\r\n]+)\r?\n//) {
-                my $line = $1;
+again:
+        while ($r > 0 && $self->{rbuf} =~ s/\A\s*([^\r\n]+)\r?\n//) {
+                $line = $1;
                 my $t0 = now();
                 $r = eval { $self->process_line($line) };
                 my $d = $self->{long_res} ?
                         ' deferred['.fileno($self->{sock}).']' : '';
                 out($self, "$line - %0.6f$d", now() - $t0);
         }
+        unless (defined $line) {
+                my $buf = $self->read(LINE_MAX) or return $self->close;
+                $self->{rbuf} .= $$buf;
+                goto again;
+        }
+
         return $self->close if $r < 0;
-        my $len = bytes::length($$buf);
+        my $len = bytes::length($self->{rbuf});
         return $self->close if ($len >= LINE_MAX);
-        $self->push_back_read($buf) if ($len);
+}
+
+sub watch_read {
+        my ($self, $bool) = @_;
+        my $rv = $self->SUPER::watch_read($bool);
+        if ($bool && $self->{rbuf} ne '') {
+                # Force another read if there is a pipelined request.
+                # We don't know if the socket has anything for us to read,
+                # and we must double-check again by the time the timer fires
+                # in case we really did dispatch a read event and started
+                # another long response.
+                Danga::Socket->AddTimer(0, sub {
+                        if (&Danga::Socket::POLLIN & $self->{event_watch}) {
+                                $self->event_read;
+                        }
+                });
+        }
+        $rv;
 }
 
 1;
diff --git a/public-inbox-nntpd b/public-inbox-nntpd
index 588efdd3..b66de58e 100644
--- a/public-inbox-nntpd
+++ b/public-inbox-nntpd
@@ -146,7 +146,8 @@ sub worker_quit {
                 my ($dmap, undef) = @_;
                 my $n = 0;
                 foreach my $s (values %$dmap) {
-                        if ($s->{write_buf_size} || @{$s->{read_push_back}}) {
+                        next unless ref($s) eq 'PublicInbox::NNTP';
+                        if ($s->{write_buf_size} || $s->{rbuf}) {
                                 ++$n;
                         } else {
                                 $s->close;