user/dev discussion of public-inbox itself
 help / color / mirror / code / Atom feed
From: Eric Wong <e@80x24.org>
To: meta@public-inbox.org
Subject: [PATCH 3/4] nntp: support COMPRESS DEFLATE per RFC 8054
Date: Fri,  5 Jul 2019 22:53:38 +0000	[thread overview]
Message-ID: <20190705225339.5698-4-e@80x24.org> (raw)
In-Reply-To: <20190705225339.5698-1-e@80x24.org>

This is only tested so far with my patches to Net::NNTP at:
https://rt.cpan.org/Ticket/Display.html?id=129967

Memory use in C10K situations is disappointing, but that's
the nature of compression.

gzip compression over HTTPS does have the advantage of not
keeping zlib streams open when clients are idle, at the
cost of worse compression.
---
 MANIFEST                       |   2 +
 TODO                           |   2 -
 lib/PublicInbox/NNTP.pm        |  23 +++-
 lib/PublicInbox/NNTPdeflate.pm | 104 ++++++++++++++++
 script/public-inbox-nntpd      |   2 +-
 t/nntpd-tls.t                  |  11 ++
 t/nntpd-validate.t             | 210 +++++++++++++++++++++++++++++++++
 t/nntpd.t                      |   7 ++
 8 files changed, 357 insertions(+), 4 deletions(-)
 create mode 100644 lib/PublicInbox/NNTPdeflate.pm
 create mode 100644 t/nntpd-validate.t

diff --git a/MANIFEST b/MANIFEST
index 4cb5f38f..24280351 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -113,6 +113,7 @@ lib/PublicInbox/MsgTime.pm
 lib/PublicInbox/Msgmap.pm
 lib/PublicInbox/NNTP.pm
 lib/PublicInbox/NNTPD.pm
+lib/PublicInbox/NNTPdeflate.pm
 lib/PublicInbox/NewsWWW.pm
 lib/PublicInbox/Over.pm
 lib/PublicInbox/OverIdx.pm
@@ -230,6 +231,7 @@ t/msgmap.t
 t/msgtime.t
 t/nntp.t
 t/nntpd-tls.t
+t/nntpd-validate.t
 t/nntpd.t
 t/nulsubject.t
 t/over.t
diff --git a/TODO b/TODO
index 7bd68c7b..2d20bad4 100644
--- a/TODO
+++ b/TODO
@@ -33,8 +33,6 @@ all need to be considered for everything we introduce)
   ensure things continue working as they should (or more better)
   while retaining compatibility with old versions.
 
-* NNTP COMPRESS extension (see innd)
-
 * Support more of RFC 3977 (NNTP)
 
 * Combined "super server" for NNTP/HTTP/POP3 to reduce memory overhead
diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm
index 631fd3c7..d6f315ba 100644
--- a/lib/PublicInbox/NNTP.pm
+++ b/lib/PublicInbox/NNTP.pm
@@ -20,6 +20,7 @@ use Time::Local qw(timegm timelocal);
 use constant {
 	LINE_MAX => 512, # RFC 977 section 2.3
 	r501 => '501 command syntax error',
+	r502 => '502 Command unavailable',
 	r221 => '221 Header follows',
 	r224 => '224 Overview information follows (multi-line)',
 	r225 =>	'225 Headers follow (multi-line)',
@@ -41,6 +42,7 @@ LIST ACTIVE ACTIVE.TIMES NEWSGROUPS OVERVIEW.FMT\r
 HDR\r
 OVER\r
 
+my $have_deflate;
 my $EXPMAP; # fd -> [ idle_time, $self ]
 my $expt;
 our $EXPTIME = 180; # 3 minutes
@@ -897,11 +899,13 @@ sub cmd_xover ($;$) {
 	});
 }
 
+sub compressed { undef }
+
 sub cmd_starttls ($) {
 	my ($self) = @_;
 	my $sock = $self->{sock} or return;
 	# RFC 4642 2.2.1
-	(ref($sock) eq 'IO::Socket::SSL') and return '502 Command unavailable';
+	return r502 if (ref($sock) eq 'IO::Socket::SSL' || $self->compressed);
 	my $opt = $self->{nntpd}->{accept_tls} or
 		return '580 can not initiate TLS negotiation';
 	res($self, '382 Continue with TLS negotiation');
@@ -910,6 +914,17 @@ sub cmd_starttls ($) {
 	undef;
 }
 
+# RFC 8054
+sub cmd_compress ($$) {
+	my ($self, $alg) = @_;
+	return '503 Only the DEFLATE is supported' if uc($alg) ne 'DEFLATE';
+	return r502 if $self->compressed || !$have_deflate;
+	res($self, '206 Compression active');
+	PublicInbox::NNTPdeflate->enable($self);
+	$self->requeue;
+	undef
+}
+
 sub cmd_xpath ($$) {
 	my ($self, $mid) = @_;
 	return r501 unless $mid =~ /\A<(.+)>\z/;
@@ -997,4 +1012,10 @@ sub busy {
 	($self->{rbuf} || $self->{wbuf} || not_idle_long($self, $now));
 }
 
+# this is an import to prevent "perl -c" from complaining about fields
+sub import {
+	$have_deflate = eval { require PublicInbox::NNTPdeflate } and
+		$CAPABILITIES .= "COMPRESS DEFLATE\r\n";
+}
+
 1;
diff --git a/lib/PublicInbox/NNTPdeflate.pm b/lib/PublicInbox/NNTPdeflate.pm
new file mode 100644
index 00000000..66210bfa
--- /dev/null
+++ b/lib/PublicInbox/NNTPdeflate.pm
@@ -0,0 +1,104 @@
+# Copyright (C) 2019 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# RFC 8054 NNTP COMPRESS DEFLATE implementation
+# Warning, enabling compression for C10K NNTP clients is rather
+# expensive in terms of memory use.
+#
+# RSS usage for 10K idle-but-did-something NNTP clients on 64-bit:
+#   TLS + DEFLATE :  1.8 GB  (MemLevel=9, 1.2 GB with MemLevel=8)
+#   TLS only      :  <200MB
+#   plain         :   <50MB
+package PublicInbox::NNTPdeflate;
+use strict;
+use warnings;
+use 5.010_001;
+use base qw(PublicInbox::NNTP);
+use Compress::Raw::Zlib;
+use Hash::Util qw(unlock_hash); # dependency of fields for perl 5.10+, anyways
+
+my %IN_OPT = (
+	-Bufsize => PublicInbox::NNTP::LINE_MAX,
+	-WindowBits => -15, # RFC 1951
+	-AppendOutput => 1,
+);
+
+my %OUT_OPT = (
+	# nnrpd (INN) and Compress::Raw::Zlib favor MemLevel=9,
+	# but the zlib C library and git use MemLevel=8
+	# as the default.  Using 8 drops our memory use with 10K
+	# TLS clients from 1.8 GB to 1.2 GB, but...
+	# FIXME: sometimes clients fail with 8, so we use 9
+	# -MemLevel => 9,
+
+	# needs more testing, nothing obviously different in terms of memory
+	-Bufsize => 65536,
+
+	-WindowBits => -15, # RFC 1951
+	-AppendOutput => 1,
+);
+
+sub enable {
+	my ($class, $self) = @_;
+	unlock_hash(%$self);
+	bless $self, $class;
+	$self->{zin} = [ Compress::Raw::Zlib::Inflate->new(%IN_OPT), '' ];
+	$self->{zout} = [ Compress::Raw::Zlib::Deflate->new(%OUT_OPT), '' ];
+}
+
+# overrides PublicInbox::NNTP::compressed
+sub compressed { 1 }
+
+# SUPER is PublicInbox::DS::do_read, so $_[1] may be a reference or not
+sub do_read ($$$$) {
+	my ($self, $rbuf, $len, $off) = @_;
+
+	my $zin = $self->{zin} or return; # closed
+	my $deflated = \($zin->[1]);
+	my $r = $self->SUPER::do_read($deflated, $len) or return;
+
+	# assert(length($$rbuf) == $off) as far as NNTP.pm is concerned
+	# -ConsumeInput is true, so $deflated is automatically emptied
+	my $err = $zin->[0]->inflate($deflated, $rbuf);
+	if ($err == Z_OK) {
+		$r = length($$rbuf) and return $r;
+		# nothing ready, yet, get more, later
+		$self->requeue;
+	} else {
+		delete $self->{zin};
+		$self->close;
+	}
+	0;
+}
+
+# override PublicInbox::DS::msg_more
+sub msg_more ($$) {
+	my $self = $_[0];
+	my $zout = $self->{zout};
+
+	# $_[1] may be a reference or not for ->deflate
+	my $err = $zout->[0]->deflate($_[1], $zout->[1]);
+	$err == Z_OK or die "->deflate failed $err";
+	1;
+}
+
+# SUPER is PublicInbox::DS::write, so $_[1] may be a reference or not
+sub write ($$) {
+	my $self = $_[0];
+	return $self->SUPER::write($_[1]) if ref($_[1]) eq 'CODE';
+	my $zout = $self->{zout};
+	my $deflated = pop @$zout;
+
+	# $_[1] may be a reference or not for ->deflate
+	my $err = $zout->[0]->deflate($_[1], $deflated);
+	$err == Z_OK or die "->deflate failed $err";
+	$err = $zout->[0]->flush($deflated, Z_PARTIAL_FLUSH);
+	$err == Z_OK or die "->flush failed $err";
+
+	# PublicInbox::DS::write puts partial writes into another buffer,
+	# so we can prepare the next deflate buffer:
+	$zout->[1] = '';
+	$self->SUPER::write(\$deflated);
+}
+
+1;
diff --git a/script/public-inbox-nntpd b/script/public-inbox-nntpd
index 55bf330e..cd273849 100755
--- a/script/public-inbox-nntpd
+++ b/script/public-inbox-nntpd
@@ -6,7 +6,7 @@
 use strict;
 use warnings;
 require PublicInbox::Daemon;
-require PublicInbox::NNTP;
+use PublicInbox::NNTP; # need to call import
 require PublicInbox::NNTPD;
 my $nntpd = PublicInbox::NNTPD->new;
 PublicInbox::Daemon::run('0.0.0.0:119',
diff --git a/t/nntpd-tls.t b/t/nntpd-tls.t
index 4cf53daa..dba0f74c 100644
--- a/t/nntpd-tls.t
+++ b/t/nntpd-tls.t
@@ -27,6 +27,9 @@ require './t/common.perl';
 require PublicInbox::InboxWritable;
 require PublicInbox::MIME;
 require PublicInbox::SearchIdx;
+my $need_zlib;
+eval { require Compress::Raw::Zlib } or
+	$need_zlib = 'Compress::Raw::Zlib missing';
 my $version = 2; # v2 needs newer git
 require_git('2.6') if $version >= 2;
 my $tmpdir = tempdir('pi-nntpd-tls-XXXXXX', TMPDIR => 1, CLEANUP => 1);
@@ -232,6 +235,14 @@ sub get_capa {
 		die "unexpected: $!" unless defined($r);
 		die 'unexpected EOF' if $r == 0;
 	} until $capa =~ /\.\r\n\z/;
+
+	my $deflate_capa = qr/\r\nCOMPRESS DEFLATE\r\n/;
+	if ($need_zlib) {
+		unlike($capa, $deflate_capa,
+			'COMPRESS DEFLATE NOT advertised '.$need_zlib);
+	} else {
+		like($capa, $deflate_capa, 'COMPRESS DEFLATE advertised');
+	}
 	$capa;
 }
 
diff --git a/t/nntpd-validate.t b/t/nntpd-validate.t
new file mode 100644
index 00000000..1a325105
--- /dev/null
+++ b/t/nntpd-validate.t
@@ -0,0 +1,210 @@
+# Copyright (C) 2019 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# Integration test to validate compression.
+use strict;
+use warnings;
+use File::Temp qw(tempdir);
+use Test::More;
+use Symbol qw(gensym);
+use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC);
+my $inbox_dir = $ENV{GIANT_INBOX_DIR};
+plan skip_all => "GIANT_INBOX_DIR not defined for $0" unless $inbox_dir;
+my $mid = $ENV{TEST_MID};
+
+# This test is also an excuse for me to experiment with Perl threads :P
+unless (eval 'use threads; 1') {
+	plan skip_all => "$0 requires a threaded perl" if $@;
+}
+
+# Net::NNTP is part of the standard library, but distros may split it off...
+foreach my $mod (qw(DBD::SQLite Net::NNTP Compress::Raw::Zlib)) {
+	eval "require $mod";
+	plan skip_all => "$mod missing for $0" if $@;
+}
+
+my $test_compress = Net::NNTP->can('compress');
+if (!$test_compress) {
+	diag 'Your Net::NNTP does not yet support compression';
+	diag 'See: https://rt.cpan.org/Ticket/Display.html?id=129967';
+}
+my $test_tls = $ENV{TEST_SKIP_TLS} ? 0 : eval { require IO::Socket::SSL };
+my $cert = 'certs/server-cert.pem';
+my $key = 'certs/server-key.pem';
+if ($test_tls && !-r $key || !-r $cert) {
+	plan skip_all => "certs/ missing for $0, run ./certs/create-certs.perl";
+}
+require './t/common.perl';
+my $keep_tmp = !!$ENV{TEST_KEEP_TMP};
+my $tmpdir = tempdir('nntpd-validate-XXXXXX',TMPDIR => 1,CLEANUP => $keep_tmp);
+my (%OPT, $pid, $tail_pid, $host_port, $group);
+my $batch = 1000;
+END {
+	foreach ($pid, $tail_pid) {
+		kill 'TERM', $_ if defined $_;
+	}
+};
+if (($ENV{NNTP_TEST_URL} // '') =~ m!\Anntp://([^/]+)/([^/]+)\z!) {
+	($host_port, $group) = ($1, $2);
+	$host_port .= ":119" unless index($host_port, ':') > 0;
+} else {
+	make_local_server();
+}
+my $test_article = $ENV{TEST_ARTICLE} // 0;
+my $test_xover = $ENV{TEST_XOVER} // 1;
+
+if ($test_tls) {
+	my $nntp = Net::NNTP->new($host_port, %OPT);
+	ok($nntp->starttls, 'STARTTLS works');
+	ok($nntp->compress, 'COMPRESS works') if $test_compress;
+	ok($nntp->quit, 'QUIT after starttls OK');
+}
+if ($test_compress) {
+	my $nntp = Net::NNTP->new($host_port, %OPT);
+	ok($nntp->compress, 'COMPRESS works');
+	ok($nntp->quit, 'QUIT after compress OK');
+}
+
+sub do_get_all {
+	my ($methods) = @_;
+	my $desc = join(',', @$methods);
+	my $t0 = clock_gettime(CLOCK_MONOTONIC);
+	my $dig = Digest::SHA->new(1);
+	my $digfh = gensym;
+	my $tmpfh;
+	if ($keep_tmp) {
+		open $tmpfh, '>', "$tmpdir/$desc.raw" or die $!;
+	}
+	my $tmp = { dig => $dig, tmpfh => $tmpfh };
+	tie *$digfh, 'DigestPipe', $tmp;
+	my $nntp = Net::NNTP->new($host_port, %OPT);
+	$nntp->article("<$mid>", $digfh) if $mid;
+	foreach my $m (@$methods) {
+		my $res = $nntp->$m;
+		print STDERR "# $m got $res ($desc)\n" if !$res;
+	}
+	$nntp->article("<$mid>", $digfh) if $mid;
+	my ($num, $first, $last) = $nntp->group($group);
+	unless (defined $num && defined $first && defined $last) {
+		warn "Invalid group\n";
+		return undef;
+	}
+	my $i;
+	for ($i = $first; $i < $last; $i += $batch) {
+		my $j = $i + $batch - 1;
+		$j = $last if $j > $last;
+		if ($test_xover) {
+			my $xover = $nntp->xover("$i-$j");
+			for my $n (sort { $a <=> $b } keys %$xover) {
+				my $line = join("\t", @{$xover->{$n}});
+				$line =~ tr/\r//d;
+				$dig->add("$n\t".$line);
+			}
+		}
+		if ($test_article) {
+			for my $n ($i..$j) {
+				$nntp->article($n, $digfh) and next;
+				next if $nntp->code == 423;
+				my $res = $nntp->code.' '.  $nntp->message;
+
+				$res =~ tr/\r\n//d;
+				print STDERR "# Article $n ($desc): $res\n";
+			}
+		}
+	}
+	my $q = $nntp->quit;
+	print STDERR "# quit failed: ".$nntp->code."\n" if !$q;
+	my $elapsed = sprintf('%0.3f', clock_gettime(CLOCK_MONOTONIC) - $t0);
+	my $res = $dig->hexdigest;
+	print STDERR "# $desc - $res (${elapsed}s)\n";
+	$res;
+}
+my @tests = ([]);
+push @tests, [ 'compress' ] if $test_compress;
+push @tests, [ 'starttls' ] if $test_tls;
+push @tests, [ 'starttls', 'compress' ] if $test_tls && $test_compress;
+my (@keys, %thr, %res);
+for my $m (@tests) {
+	my $key = join(',', @$m);
+	push @keys, $key;
+	diag "$key start";
+	$thr{$key} = threads->create(\&do_get_all, $m);
+}
+
+$res{$_} = $thr{$_}->join for @keys;
+my $plain = $res{''};
+ok($plain, "plain got $plain");
+is($res{$_}, $plain, "$_ matches '' result") for @keys;
+
+done_testing();
+
+sub make_local_server {
+	require PublicInbox::Inbox;
+	$group = 'inbox.test.perf.nntpd';
+	my $ibx = { mainrepo => $inbox_dir, newsgroup => $group };
+	$ibx = PublicInbox::Inbox->new($ibx);
+	my $nntpd = 'blib/script/public-inbox-nntpd';
+	my $pi_config = "$tmpdir/config";
+	{
+		open my $fh, '>', $pi_config or die "open($pi_config): $!";
+		print $fh <<"" or die "print $pi_config: $!";
+[publicinbox "test"]
+	newsgroup = $group
+	mainrepo = $inbox_dir
+	address = test\@example.com
+
+		close $fh or die "close($pi_config): $!";
+	}
+	my ($out, $err) = ("$tmpdir/out", "$tmpdir/err");
+	for ($out, $err) {
+		open my $fh, '>', $_ or die "truncate: $!";
+	}
+	if (my $tail_cmd = $ENV{TAIL}) { # don't assume GNU tail
+		$tail_pid = fork;
+		if (defined $tail_pid && $tail_pid == 0) {
+			open STDOUT, '>&STDERR' or die ">&2 failed: $!";
+			exec(split(' ', $tail_cmd), $out, $err);
+		}
+	}
+	my $sock = tcp_server();
+	ok($sock, 'sock created');
+	$host_port = $sock->sockhost . ':' . $sock->sockport;
+
+	# not using multiple workers, here, since we want to increase
+	# the chance of tripping concurrency bugs within PublicInbox/NNTP*.pm
+	my $cmd = [ $nntpd, "--stdout=$out", "--stderr=$err", '-W0' ];
+	push @$cmd, "-lnntp://$host_port";
+	if ($test_tls) {
+		push @$cmd, "--cert=$cert", "--key=$key";
+		%OPT = (
+			SSL_hostname => 'server.local',
+			SSL_verifycn_name => 'server.local',
+			SSL_verify_mode => IO::Socket::SSL::SSL_VERIFY_PEER(),
+			SSL_ca_file => 'certs/test-ca.pem',
+		);
+	}
+	print STDERR "# CMD ". join(' ', @$cmd). "\n";
+	$pid = spawn_listener({ PI_CONFIG => $pi_config }, $cmd, [$sock]);
+}
+
+package DigestPipe;
+use strict;
+use warnings;
+
+sub TIEHANDLE {
+	my ($class, $self) = @_;
+	bless $self, $class;
+}
+
+sub PRINT {
+	my $self = shift;
+	my $data = join('', @_);
+	# Net::NNTP emit different line-endings depending on TLS or not...:
+	$data =~ tr/\r//d;
+	$self->{dig}->add($data);
+	if (my $tmpfh = $self->{tmpfh}) {
+		print $tmpfh $data;
+	}
+	1;
+}
+1;
diff --git a/t/nntpd.t b/t/nntpd.t
index 1c5ae8d7..74b5e06d 100644
--- a/t/nntpd.t
+++ b/t/nntpd.t
@@ -147,6 +147,13 @@ EOF
 	$buf = read_til_dot($s);
 	like($buf, qr/\r\nVERSION 2\r\n/s, 'CAPABILITIES works');
 	unlike($buf, qr/STARTTLS/s, 'STARTTLS not advertised');
+	my $deflate_capa = qr/\r\nCOMPRESS DEFLATE\r\n/;
+	if (eval { require Compress::Raw::Zlib }) {
+		like($buf, $deflate_capa, 'DEFLATE advertised');
+	} else {
+		unlike($buf, $deflate_capa,
+			'DEFLATE not advertised (Compress::Raw::Zlib missing)');
+	}
 
 	syswrite($s, "NEWGROUPS 19990424 000000 GMT\r\n");
 	$buf = read_til_dot($s);
-- 
EW


  parent reply	other threads:[~2019-07-05 22:53 UTC|newest]

Thread overview: 6+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2019-07-05 22:53 [PATCH 0/4] implement NNTP COMPRESS per RFC 8054 Eric Wong
2019-07-05 22:53 ` [PATCH 1/4] nntp: use msg_more as a method Eric Wong
2019-07-05 22:53 ` [PATCH 2/4] nntp: move LINE_MAX constant to the top Eric Wong
2019-07-05 22:53 ` Eric Wong [this message]
2019-07-05 22:53 ` [RFC 4/4] nntp: reduce memory overhead of zlib Eric Wong
2019-07-07  7:08 ` [PATCH 5/4] nntp: improve error reporting for COMPRESS Eric Wong

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

  List information: http://public-inbox.org/README

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20190705225339.5698-4-e@80x24.org \
    --to=e@80x24.org \
    --cc=meta@public-inbox.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
Code repositories for project(s) associated with this public inbox

	https://80x24.org/public-inbox.git

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).