about summary refs log tree commit homepage
path: root/lib
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2015-09-19 02:03:30 +0000
committerEric Wong <e@80x24.org>2015-09-19 04:18:37 +0000
commit0d0fde0bff97a283e3b433f2772a99fe9d6d8412 (patch)
tree5ea275e787362cbb2d666b37722edb58d7683c39 /lib
parentc6e35192bf5aa6b195e210674f9a30d189ab7457 (diff)
downloadpublic-inbox-0d0fde0bff97a283e3b433f2772a99fe9d6d8412.tar.gz
XOVER, NEWNEWS, and XHDR responses may be arbitrarily long and cause
excessive memory usage via buffering.  This problem would exist even
if we were to optimize them by caching headers for fast retrieval and
search.

Introduce a generic API to handle all of these commands fairly,
without triggering the excessive buffering and unfairness of the
existing implementation.

Generating these responses is still expensive for now, but at least
the implementation is fair to other clients and prevents one client
from using one of these commands to monopolize resources away from
other clients.
Diffstat (limited to 'lib')
-rw-r--r--lib/PublicInbox/NNTP.pm105
1 files changed, 80 insertions, 25 deletions
diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm
index 807b49f8..f86c6335 100644
--- a/lib/PublicInbox/NNTP.pm
+++ b/lib/PublicInbox/NNTP.pm
@@ -4,13 +4,14 @@ package PublicInbox::NNTP;
 use strict;
 use warnings;
 use base qw(Danga::Socket);
-use fields qw(nntpd article ng);
+use fields qw(nntpd article ng long_res);
 use PublicInbox::Msgmap;
 use PublicInbox::GitCatFile;
 use PublicInbox::MID qw(mid2path);
 use Email::Simple;
 use Data::Dumper qw(Dumper);
 use POSIX qw(strftime);
+use Time::HiRes qw(gettimeofday tv_interval ualarm);
 use constant {
         r501 => '501 command syntax error',
 };
@@ -233,26 +234,38 @@ sub cmd_newnews {
         my ($keep, $skip) = split('!', $newsgroups, 2);
         ngpat2re($keep);
         ngpat2re($skip);
-        $ts .= '..';
-
-        my $opts = { asc => 1, limit => 1000 };
+        my @srch;
         foreach my $ng (values %{$self->{nntpd}->{groups}}) {
                 $ng->{name} =~ $keep or next;
                 $ng->{name} =~ $skip and next;
                 my $srch = $ng->search or next;
-                $opts->{offset} = 0;
+                push @srch, $srch;
+        };
+        return '.' unless @srch;
 
-                while (1) {
-                        my $res = $srch->query($ts, $opts);
-                        my $msgs = $res->{msgs};
-                        my $nr = scalar @$msgs or last;
+        $ts .= '..';
+        my $opts = { asc => 1, limit => 1000, offset => 0 };
+
+        my $end = 0xffffffff; # would like to read 4 billion messages?
+        $self->long_response(0, $end, sub {
+                my ($i) = @_;
+                my $srch = $srch[0];
+                my $res = $srch->query($ts, $opts);
+                my $msgs = $res->{msgs};
+                if (my $nr = scalar @$msgs) {
                         more($self, '<' .
                                 join(">\r\n<", map { $_->mid } @$msgs ).
                                 '>');
                         $opts->{offset} += $nr;
+                } else {
+                        shift @srch;
+                        if (@srch) { # continue onto next newsgroup
+                                $opts->{offset} = 0;
+                        } else { # break out of the long response.
+                                $$i = $end;
+                        }
                 }
-        }
-        '.';
+        });
 }
 
 sub cmd_group {
@@ -441,6 +454,48 @@ sub xhdr {
         $r;
 }
 
+sub long_response {
+        my ($self, $beg, $end, $cb) = @_;
+        die "BUG: nested long response" if $self->{long_res};
+
+        # make sure we disable reading during a long response,
+        # clients should not be sending us stuff and making us do more
+        # work while we are streaming a response to them
+        $self->watch_read(0);
+        $self->{long_res} = sub {
+                # limit our own running time for fairness with other
+                # clients and to avoid buffering too much:
+                my $yield;
+                local $SIG{ALRM} = sub { $yield = 1 };
+                ualarm(100000);
+
+                my $err;
+                do {
+                        eval { $cb->(\$beg) };
+                } until (($err = $@) || $self->{closed} || $yield ||
+                         $self->{write_buf_size} || ++$beg > $end);
+                ualarm(0);
+
+                if ($err || $self->{closed}) {
+                        $self->{long_res} = undef;
+                        warning("$err during long response") if $err;
+                        $self->watch_read(1) unless $self->{closed};
+                } elsif ($yield || $self->{write_buf_size}) {
+                        # no recursion, schedule another call ASAP
+                        # but only after all pending writes are done
+                        Danga::Socket->AddTimer(0, sub {
+                                $self->write($self->{long_res});
+                        });
+                } else { # all done!
+                        $self->{long_res} = undef;
+                        $self->watch_read(1);
+                        res($self, '.');
+                }
+        };
+        $self->{long_res}->(); # kick off!
+        undef;
+}
+
 sub cmd_xhdr {
         my ($self, $header, $range) = @_;
         defined $self->{ng} or return '412 no news group currently selected';
@@ -455,19 +510,20 @@ sub cmd_xhdr {
                 if (defined($r = xhdr($r, $header))) {
                         more($self, "<$range> $r");
                 }
+                '.';
         } else { # numeric range
                 my $r = get_range($self, $range);
                 return $r unless ref $r;
                 my ($beg, $end) = @$r;
                 more($self, '221 Header follows');
-                foreach my $i ($beg..$end) {
-                        $r = $self->art_lookup($i, 2);
-                        next unless ref $r;
-                        defined($r = xhdr($r, $header)) or next;
-                        more($self, "$i $r");
-                }
+                $self->long_response($beg, $end, sub {
+                        my ($i) = @_;
+                        $r = $self->art_lookup($$i, 2);
+                        return unless ref $r;
+                        defined($r = xhdr($r, $header)) or return;
+                        more($self, "$$i $r");
+                });
         }
-        '.';
 }
 
 sub cmd_xover {
@@ -476,16 +532,16 @@ sub cmd_xover {
         return $r unless ref $r;
         my ($beg, $end) = @$r;
         more($self, "224 Overview information follows for $beg to $end");
-        foreach my $i ($beg..$end) {
-                my $r = $self->art_lookup($i, 2);
-                next unless ref $r;
+        $self->long_response($beg, $end, sub {
+                my ($i) = @_;
+                my $r = $self->art_lookup($$i, 2);
+                return unless ref $r;
                 more($self, join("\t", $r->[0],
                                 map {
                                         my $h = xhdr($r, $_);
                                         defined $h ? $h : '';
                                 } @OVERVIEW ));
-        }
-        '.';
+        });
 }
 
 sub res {
@@ -505,7 +561,7 @@ sub do_write {
 
         # Do not watch for readability if we have data in the queue,
         # instead re-enable watching for readability when we can
-        $self->watch_read(0) unless $done;
+        $self->watch_read(0) if (!$done || $self->{long_res});
 
         $done;
 }
@@ -539,7 +595,6 @@ sub event_write {
 sub event_read {
         my ($self) = @_;
         use constant LINE_MAX => 512; # RFC 977 section 2.3
-        use Time::HiRes qw(gettimeofday tv_interval);
         my $r = 1;
         my $buf = $self->read(LINE_MAX) or return $self->close;
         while ($r > 0 && $$buf =~ s/\A([^\r\n]+)\r?\n//) {