user/dev discussion of public-inbox itself
 help / color / mirror / code / Atom feed
From: Eric Wong <e@80x24.org>
To: meta@public-inbox.org
Subject: [PATCH 02/12] nntp: introduce long response API for streaming
Date: Sat, 19 Sep 2015 02:03:30 +0000	[thread overview]
Message-ID: <20150919020340.6484-3-e@80x24.org> (raw)
In-Reply-To: <20150919020340.6484-1-e@80x24.org>

XOVER, NEWNEWS, and XHDR responses may be arbitrarily long and cause
excessive memory usage via buffering.  This problem would exist even
if we were to optimize them by caching headers for fast retrieval and
search.

Introduce a generic API to handle all of these commands fairly
without triggering the excessive buffering and unfairness of the
existing implementation.

Generating these responses is still expensive for now, but at least
the implementation is fair to other clients and prevents one client
from using one of these commands to monopolize resources away from
other clients.
---
 lib/PublicInbox/NNTP.pm | 105 ++++++++++++++++++++++++++++++++++++------------
 1 file changed, 80 insertions(+), 25 deletions(-)

diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm
index 807b49f..f86c633 100644
--- a/lib/PublicInbox/NNTP.pm
+++ b/lib/PublicInbox/NNTP.pm
@@ -4,13 +4,14 @@ package PublicInbox::NNTP;
 use strict;
 use warnings;
 use base qw(Danga::Socket);
-use fields qw(nntpd article ng);
+use fields qw(nntpd article ng long_res);
 use PublicInbox::Msgmap;
 use PublicInbox::GitCatFile;
 use PublicInbox::MID qw(mid2path);
 use Email::Simple;
 use Data::Dumper qw(Dumper);
 use POSIX qw(strftime);
+use Time::HiRes qw(gettimeofday tv_interval ualarm);
 use constant {
 	r501 => '501 command syntax error',
 };
@@ -233,26 +234,38 @@ sub cmd_newnews {
 	my ($keep, $skip) = split('!', $newsgroups, 2);
 	ngpat2re($keep);
 	ngpat2re($skip);
-	$ts .= '..';
-
-	my $opts = { asc => 1, limit => 1000 };
+	my @srch;
 	foreach my $ng (values %{$self->{nntpd}->{groups}}) {
 		$ng->{name} =~ $keep or next;
 		$ng->{name} =~ $skip and next;
 		my $srch = $ng->search or next;
-		$opts->{offset} = 0;
+		push @srch, $srch;
+	};
+	return '.' unless @srch;
 
-		while (1) {
-			my $res = $srch->query($ts, $opts);
-			my $msgs = $res->{msgs};
-			my $nr = scalar @$msgs or last;
+	$ts .= '..';
+	my $opts = { asc => 1, limit => 1000, offset => 0 };
+
+	my $end = 0xffffffff; # would like to read 4 billion messages?
+	$self->long_response(0, $end, sub {
+		my ($i) = @_;
+		my $srch = $srch[0];
+		my $res = $srch->query($ts, $opts);
+		my $msgs = $res->{msgs};
+		if (my $nr = scalar @$msgs) {
 			more($self, '<' .
 				join(">\r\n<", map { $_->mid } @$msgs ).
 				'>');
 			$opts->{offset} += $nr;
+		} else {
+			shift @srch;
+			if (@srch) { # continue onto next newsgroup
+				$opts->{offset} = 0;
+			} else { # break out of the long response.
+				$$i = $end;
+			}
 		}
-	}
-	'.';
+	});
 }
 
 sub cmd_group {
@@ -441,6 +454,48 @@ sub xhdr {
 	$r;
 }
 
+sub long_response {
+	my ($self, $beg, $end, $cb) = @_;
+	die "BUG: nested long response" if $self->{long_res};
+
+	# make sure we disable reading during a long response,
+	# clients should not be sending us stuff and making us do more
+	# work while we are streaming a response to them
+	$self->watch_read(0);
+	$self->{long_res} = sub {
+		# limit our own running time for fairness with other
+		# clients and to avoid buffering too much:
+		my $yield;
+		local $SIG{ALRM} = sub { $yield = 1 };
+		ualarm(100000);
+
+		my $err;
+		do {
+			eval { $cb->(\$beg) };
+		} until (($err = $@) || $self->{closed} || $yield ||
+			 $self->{write_buf_size} || ++$beg > $end);
+		ualarm(0);
+
+		if ($err || $self->{closed}) {
+			$self->{long_res} = undef;
+			warning("$err during long response") if $err;
+			$self->watch_read(1) unless $self->{closed};
+		} elsif ($yield || $self->{write_buf_size}) {
+			# no recursion, schedule another call ASAP
+			# but only after all pending writes are done
+			Danga::Socket->AddTimer(0, sub {
+				$self->write($self->{long_res});
+			});
+		} else { # all done!
+			$self->{long_res} = undef;
+			$self->watch_read(1);
+			res($self, '.');
+		}
+	};
+	$self->{long_res}->(); # kick off!
+	undef;
+}
+
 sub cmd_xhdr {
 	my ($self, $header, $range) = @_;
 	defined $self->{ng} or return '412 no news group currently selected';
@@ -455,19 +510,20 @@ sub cmd_xhdr {
 		if (defined($r = xhdr($r, $header))) {
 			more($self, "<$range> $r");
 		}
+		'.';
 	} else { # numeric range
 		my $r = get_range($self, $range);
 		return $r unless ref $r;
 		my ($beg, $end) = @$r;
 		more($self, '221 Header follows');
-		foreach my $i ($beg..$end) {
-			$r = $self->art_lookup($i, 2);
-			next unless ref $r;
-			defined($r = xhdr($r, $header)) or next;
-			more($self, "$i $r");
-		}
+		$self->long_response($beg, $end, sub {
+			my ($i) = @_;
+			$r = $self->art_lookup($$i, 2);
+			return unless ref $r;
+			defined($r = xhdr($r, $header)) or return;
+			more($self, "$$i $r");
+		});
 	}
-	'.';
 }
 
 sub cmd_xover {
@@ -476,16 +532,16 @@ sub cmd_xover {
 	return $r unless ref $r;
 	my ($beg, $end) = @$r;
 	more($self, "224 Overview information follows for $beg to $end");
-	foreach my $i ($beg..$end) {
-		my $r = $self->art_lookup($i, 2);
-		next unless ref $r;
+	$self->long_response($beg, $end, sub {
+		my ($i) = @_;
+		my $r = $self->art_lookup($$i, 2);
+		return unless ref $r;
 		more($self, join("\t", $r->[0],
 				map {
 					my $h = xhdr($r, $_);
 					defined $h ? $h : '';
 				} @OVERVIEW ));
-	}
-	'.';
+	});
 }
 
 sub res {
@@ -505,7 +561,7 @@ sub do_write {
 
 	# Do not watch for readability if we have data in the queue,
 	# instead re-enable watching for readability when we can
-	$self->watch_read(0) unless $done;
+	$self->watch_read(0) if (!$done || $self->{long_res});
 
 	$done;
 }
@@ -539,7 +595,6 @@ sub event_write {
 sub event_read {
 	my ($self) = @_;
 	use constant LINE_MAX => 512; # RFC 977 section 2.3
-	use Time::HiRes qw(gettimeofday tv_interval);
 	my $r = 1;
 	my $buf = $self->read(LINE_MAX) or return $self->close;
 	while ($r > 0 && $$buf =~ s/\A([^\r\n]+)\r?\n//) {
-- 
EW


  parent reply	other threads:[~2015-09-19  2:03 UTC|newest]

Thread overview: 13+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2015-09-19  2:03 [PATCH 0/12] nntp: misc updates Eric Wong
2015-09-19  2:03 ` [PATCH 01/12] nntp: use write_buf_size instead write_buf Eric Wong
2015-09-19  2:03 ` Eric Wong [this message]
2015-09-19  2:03 ` [PATCH 03/12] nntp: use long response API for LISTGROUP Eric Wong
2015-09-19  2:03 ` [PATCH 04/12] nntp: implement command argument checking Eric Wong
2015-09-19  2:03 ` [PATCH 05/12] nntp: XOVER does not require range Eric Wong
2015-09-19  2:03 ` [PATCH 06/12] nntp: speed up XHDR for the Message-ID case Eric Wong
2015-09-19  2:03 ` [PATCH 07/12] nntp: implement XROVER, speed up XHDR for some cases Eric Wong
2015-09-19  2:03 ` [PATCH 08/12] nntp: implement XPATH Eric Wong
2015-09-19  2:03 ` [PATCH 09/12] nntp: fix logging of long responses Eric Wong
2015-09-19  2:03 ` [PATCH 10/12] nntp: fix ARTICLE/HEAD/BODY/STAT Eric Wong
2015-09-19  2:03 ` [PATCH 11/12] nntp: log to FDs given by the Nntpd module Eric Wong
2015-09-19  2:03 ` [PATCH 12/12] nntp: article lookups by Message-ID may cross newsgroups Eric Wong

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

  List information: https://public-inbox.org/README

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20150919020340.6484-3-e@80x24.org \
    --to=e@80x24.org \
    --cc=meta@public-inbox.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
Code repositories for project(s) associated with this public inbox

	https://80x24.org/public-inbox.git

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).