about summary refs log tree commit homepage
path: root/lib/PublicInbox/NNTP.pm
diff options
context:
space:
mode:
Diffstat (limited to 'lib/PublicInbox/NNTP.pm')
-rw-r--r--lib/PublicInbox/NNTP.pm105
1 files changed, 80 insertions, 25 deletions
diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm
index 807b49f8..f86c6335 100644
--- a/lib/PublicInbox/NNTP.pm
+++ b/lib/PublicInbox/NNTP.pm
@@ -4,13 +4,14 @@ package PublicInbox::NNTP;
 use strict;
 use warnings;
 use base qw(Danga::Socket);
-use fields qw(nntpd article ng);
+use fields qw(nntpd article ng long_res);
 use PublicInbox::Msgmap;
 use PublicInbox::GitCatFile;
 use PublicInbox::MID qw(mid2path);
 use Email::Simple;
 use Data::Dumper qw(Dumper);
 use POSIX qw(strftime);
+use Time::HiRes qw(gettimeofday tv_interval ualarm);
 use constant {
         r501 => '501 command syntax error',
 };
@@ -233,26 +234,38 @@ sub cmd_newnews {
         my ($keep, $skip) = split('!', $newsgroups, 2);
         ngpat2re($keep);
         ngpat2re($skip);
-        $ts .= '..';
-
-        my $opts = { asc => 1, limit => 1000 };
+        my @srch;
         foreach my $ng (values %{$self->{nntpd}->{groups}}) {
                 $ng->{name} =~ $keep or next;
                 $ng->{name} =~ $skip and next;
                 my $srch = $ng->search or next;
-                $opts->{offset} = 0;
+                push @srch, $srch;
+        };
+        return '.' unless @srch;
 
-                while (1) {
-                        my $res = $srch->query($ts, $opts);
-                        my $msgs = $res->{msgs};
-                        my $nr = scalar @$msgs or last;
+        $ts .= '..';
+        my $opts = { asc => 1, limit => 1000, offset => 0 };
+
+        my $end = 0xffffffff; # would like to read 4 billion messages?
+        $self->long_response(0, $end, sub {
+                my ($i) = @_;
+                my $srch = $srch[0];
+                my $res = $srch->query($ts, $opts);
+                my $msgs = $res->{msgs};
+                if (my $nr = scalar @$msgs) {
                         more($self, '<' .
                                 join(">\r\n<", map { $_->mid } @$msgs ).
                                 '>');
                         $opts->{offset} += $nr;
+                } else {
+                        shift @srch;
+                        if (@srch) { # continue onto next newsgroup
+                                $opts->{offset} = 0;
+                        } else { # break out of the long response.
+                                $$i = $end;
+                        }
                 }
-        }
-        '.';
+        });
 }
 
 sub cmd_group {
@@ -441,6 +454,48 @@ sub xhdr {
         $r;
 }
 
+sub long_response {
+        my ($self, $beg, $end, $cb) = @_;
+        die "BUG: nested long response" if $self->{long_res};
+
+        # make sure we disable reading during a long response,
+        # clients should not be sending us stuff and making us do more
+        # work while we are stream a response to them
+        $self->watch_read(0);
+        $self->{long_res} = sub {
+                # limit our own running time for fairness with other
+                # clients and to avoid buffering too much:
+                my $yield;
+                local $SIG{ALRM} = sub { $yield = 1 };
+                ualarm(100000);
+
+                my $err;
+                do {
+                        eval { $cb->(\$beg) };
+                } until (($err = $@) || $self->{closed} || $yield ||
+                         $self->{write_buf_size} || ++$beg > $end);
+                ualarm(0);
+
+                if ($err || $self->{closed}) {
+                        $self->{long_res} = undef;
+                        warning("$err during long response") if $err;
+                        $self->watch_read(1) unless $self->{closed};
+                } elsif ($yield || $self->{write_buf_size}) {
+                        # no recursion, schedule another call ASAP
+                        # but only after all pending writes are done
+                        Danga::Socket->AddTimer(0, sub {
+                                $self->write($self->{long_res});
+                        });
+                } else { # all done!
+                        $self->{long_res} = undef;
+                        $self->watch_read(1);
+                        res($self, '.');
+                }
+        };
+        $self->{long_res}->(); # kick off!
+        undef;
+}
+
 sub cmd_xhdr {
         my ($self, $header, $range) = @_;
         defined $self->{ng} or return '412 no news group currently selected';
@@ -455,19 +510,20 @@ sub cmd_xhdr {
                 if (defined($r = xhdr($r, $header))) {
                         more($self, "<$range> $r");
                 }
+                '.';
         } else { # numeric range
                 my $r = get_range($self, $range);
                 return $r unless ref $r;
                 my ($beg, $end) = @$r;
                 more($self, '221 Header follows');
-                foreach my $i ($beg..$end) {
-                        $r = $self->art_lookup($i, 2);
-                        next unless ref $r;
-                        defined($r = xhdr($r, $header)) or next;
-                        more($self, "$i $r");
-                }
+                $self->long_response($beg, $end, sub {
+                        my ($i) = @_;
+                        $r = $self->art_lookup($$i, 2);
+                        return unless ref $r;
+                        defined($r = xhdr($r, $header)) or return;
+                        more($self, "$$i $r");
+                });
         }
-        '.';
 }
 
 sub cmd_xover {
@@ -476,16 +532,16 @@ sub cmd_xover {
         return $r unless ref $r;
         my ($beg, $end) = @$r;
         more($self, "224 Overview information follows for $beg to $end");
-        foreach my $i ($beg..$end) {
-                my $r = $self->art_lookup($i, 2);
-                next unless ref $r;
+        $self->long_response($beg, $end, sub {
+                my ($i) = @_;
+                my $r = $self->art_lookup($$i, 2);
+                return unless ref $r;
                 more($self, join("\t", $r->[0],
                                 map {
                                         my $h = xhdr($r, $_);
                                         defined $h ? $h : '';
                                 } @OVERVIEW ));
-        }
-        '.';
+        });
 }
 
 sub res {
@@ -505,7 +561,7 @@ sub do_write {
 
         # Do not watch for readability if we have data in the queue,
         # instead re-enable watching for readability when we can
-        $self->watch_read(0) unless $done;
+        $self->watch_read(0) if (!$done || $self->{long_res});
 
         $done;
 }
@@ -539,7 +595,6 @@ sub event_write {
 sub event_read {
         my ($self) = @_;
         use constant LINE_MAX => 512; # RFC 977 section 2.3
-        use Time::HiRes qw(gettimeofday tv_interval);
         my $r = 1;
         my $buf = $self->read(LINE_MAX) or return $self->close;
         while ($r > 0 && $$buf =~ s/\A([^\r\n]+)\r?\n//) {