From 0d0fde0bff97a283e3b433f2772a99fe9d6d8412 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Sat, 19 Sep 2015 02:03:30 +0000 Subject: nntp: introduce long response API for streaming XOVER, NEWNEWS, XHDR responses may be arbitrarily long and cause memory usage via buffering. This problem would exist even if we were to optimize them by caching headers for fast retrieval and search. Introduce a generic API to handle all of these commands fairly without triggering excessive buffering and unfairness of the existing implementation. Generating these responses is still expensive for now, but at least the implementation is fair to other clients and prevents one client from using one of these commands to monopolize resources away from other clients. --- lib/PublicInbox/NNTP.pm | 105 ++++++++++++++++++++++++++++++++++++------------ 1 file changed, 80 insertions(+), 25 deletions(-) (limited to 'lib/PublicInbox/NNTP.pm') diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm index 807b49f8..f86c6335 100644 --- a/lib/PublicInbox/NNTP.pm +++ b/lib/PublicInbox/NNTP.pm @@ -4,13 +4,14 @@ package PublicInbox::NNTP; use strict; use warnings; use base qw(Danga::Socket); -use fields qw(nntpd article ng); +use fields qw(nntpd article ng long_res); use PublicInbox::Msgmap; use PublicInbox::GitCatFile; use PublicInbox::MID qw(mid2path); use Email::Simple; use Data::Dumper qw(Dumper); use POSIX qw(strftime); +use Time::HiRes qw(gettimeofday tv_interval ualarm); use constant { r501 => '501 command syntax error', }; @@ -233,26 +234,38 @@ sub cmd_newnews { my ($keep, $skip) = split('!', $newsgroups, 2); ngpat2re($keep); ngpat2re($skip); - $ts .= '..'; - - my $opts = { asc => 1, limit => 1000 }; + my @srch; foreach my $ng (values %{$self->{nntpd}->{groups}}) { $ng->{name} =~ $keep or next; $ng->{name} =~ $skip and next; my $srch = $ng->search or next; - $opts->{offset} = 0; + push @srch, $srch; + }; + return '.' 
unless @srch; - while (1) { - my $res = $srch->query($ts, $opts); - my $msgs = $res->{msgs}; - my $nr = scalar @$msgs or last; + $ts .= '..'; + my $opts = { asc => 1, limit => 1000, offset => 0 }; + + my $end = 0xffffffff; # would like to read 4 billion messages? + $self->long_response(0, $end, sub { + my ($i) = @_; + my $srch = $srch[0]; + my $res = $srch->query($ts, $opts); + my $msgs = $res->{msgs}; + if (my $nr = scalar @$msgs) { more($self, '<' . join(">\r\n<", map { $_->mid } @$msgs ). '>'); $opts->{offset} += $nr; + } else { + shift @srch; + if (@srch) { # continue onto next newsgroup + $opts->{offset} = 0; + } else { # break out of the long response. + $$i = $end; + } } - } - '.'; + }); } sub cmd_group { @@ -441,6 +454,48 @@ sub xhdr { $r; } +sub long_response { + my ($self, $beg, $end, $cb) = @_; + die "BUG: nested long response" if $self->{long_res}; + + # make sure we disable reading during a long response, + # clients should not be sending us stuff and making us do more + # work while we are streaming a response to them + $self->watch_read(0); + $self->{long_res} = sub { + # limit our own running time for fairness with other + # clients and to avoid buffering too much: + my $yield; + local $SIG{ALRM} = sub { $yield = 1 }; + ualarm(100000); + + my $err; + do { + eval { $cb->(\$beg) }; + } until (($err = $@) || $self->{closed} || $yield || + $self->{write_buf_size} || ++$beg > $end); + ualarm(0); + + if ($err || $self->{closed}) { + $self->{long_res} = undef; + warning("$err during long response") if $err; + $self->watch_read(1) unless $self->{closed}; + } elsif ($yield || $self->{write_buf_size}) { + # no recursion, schedule another call ASAP + # but only after all pending writes are done + Danga::Socket->AddTimer(0, sub { + $self->write($self->{long_res}); + }); + } else { # all done! + $self->{long_res} = undef; + $self->watch_read(1); + res($self, '.'); + } + }; + $self->{long_res}->(); # kick off! 
+ undef; +} + sub cmd_xhdr { my ($self, $header, $range) = @_; defined $self->{ng} or return '412 no news group currently selected'; @@ -455,19 +510,20 @@ sub cmd_xhdr { if (defined($r = xhdr($r, $header))) { more($self, "<$range> $r"); } + '.'; } else { # numeric range my $r = get_range($self, $range); return $r unless ref $r; my ($beg, $end) = @$r; more($self, '221 Header follows'); - foreach my $i ($beg..$end) { - $r = $self->art_lookup($i, 2); - next unless ref $r; - defined($r = xhdr($r, $header)) or next; - more($self, "$i $r"); - } + $self->long_response($beg, $end, sub { + my ($i) = @_; + $r = $self->art_lookup($$i, 2); + return unless ref $r; + defined($r = xhdr($r, $header)) or return; + more($self, "$$i $r"); + }); } - '.'; } sub cmd_xover { @@ -476,16 +532,16 @@ sub cmd_xover { return $r unless ref $r; my ($beg, $end) = @$r; more($self, "224 Overview information follows for $beg to $end"); - foreach my $i ($beg..$end) { - my $r = $self->art_lookup($i, 2); - next unless ref $r; + $self->long_response($beg, $end, sub { + my ($i) = @_; + my $r = $self->art_lookup($$i, 2); + return unless ref $r; more($self, join("\t", $r->[0], map { my $h = xhdr($r, $_); defined $h ? $h : ''; } @OVERVIEW )); - } - '.'; + }); } sub res { @@ -505,7 +561,7 @@ sub do_write { # Do not watch for readability if we have data in the queue, # instead re-enable watching for readability when we can - $self->watch_read(0) unless $done; + $self->watch_read(0) if (!$done || $self->{long_res}); $done; } @@ -539,7 +595,6 @@ sub event_write { sub event_read { my ($self) = @_; use constant LINE_MAX => 512; # RFC 977 section 2.3 - use Time::HiRes qw(gettimeofday tv_interval); my $r = 1; my $buf = $self->read(LINE_MAX) or return $self->close; while ($r > 0 && $$buf =~ s/\A([^\r\n]+)\r?\n//) { -- cgit v1.2.3-24-ge0c7