* [PATCH 3/4] nntp: support COMPRESS DEFLATE per RFC 8054
2019-07-05 22:53 [PATCH 0/4] implement NNTP COMPRESS per RFC 8054 Eric Wong
2019-07-05 22:53 ` [PATCH 1/4] nntp: use msg_more as a method Eric Wong
2019-07-05 22:53 ` [PATCH 2/4] nntp: move LINE_MAX constant to the top Eric Wong
@ 2019-07-05 22:53 ` Eric Wong
2019-07-05 22:53 ` [RFC 4/4] nntp: reduce memory overhead of zlib Eric Wong
2019-07-07 7:08 ` [PATCH 5/4] nntp: improve error reporting for COMPRESS Eric Wong
4 siblings, 0 replies; 6+ messages in thread
From: Eric Wong @ 2019-07-05 22:53 UTC (permalink / raw)
To: meta
This is only tested so far with my patches to Net::NNTP at:
https://rt.cpan.org/Ticket/Display.html?id=129967
Memory use in C10K situations is disappointing, but that's
the nature of compression.
gzip compression over HTTPS does have the advantage of not
keeping zlib streams open when clients are idle, at the
cost of worse compression.
---
MANIFEST | 2 +
TODO | 2 -
lib/PublicInbox/NNTP.pm | 23 +++-
lib/PublicInbox/NNTPdeflate.pm | 104 ++++++++++++++++
script/public-inbox-nntpd | 2 +-
t/nntpd-tls.t | 11 ++
t/nntpd-validate.t | 210 +++++++++++++++++++++++++++++++++
t/nntpd.t | 7 ++
8 files changed, 357 insertions(+), 4 deletions(-)
create mode 100644 lib/PublicInbox/NNTPdeflate.pm
create mode 100644 t/nntpd-validate.t
diff --git a/MANIFEST b/MANIFEST
index 4cb5f38f..24280351 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -113,6 +113,7 @@ lib/PublicInbox/MsgTime.pm
lib/PublicInbox/Msgmap.pm
lib/PublicInbox/NNTP.pm
lib/PublicInbox/NNTPD.pm
+lib/PublicInbox/NNTPdeflate.pm
lib/PublicInbox/NewsWWW.pm
lib/PublicInbox/Over.pm
lib/PublicInbox/OverIdx.pm
@@ -230,6 +231,7 @@ t/msgmap.t
t/msgtime.t
t/nntp.t
t/nntpd-tls.t
+t/nntpd-validate.t
t/nntpd.t
t/nulsubject.t
t/over.t
diff --git a/TODO b/TODO
index 7bd68c7b..2d20bad4 100644
--- a/TODO
+++ b/TODO
@@ -33,8 +33,6 @@ all need to be considered for everything we introduce)
ensure things continue working as they should (or more better)
while retaining compatibility with old versions.
-* NNTP COMPRESS extension (see innd)
-
* Support more of RFC 3977 (NNTP)
* Combined "super server" for NNTP/HTTP/POP3 to reduce memory overhead
diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm
index 631fd3c7..d6f315ba 100644
--- a/lib/PublicInbox/NNTP.pm
+++ b/lib/PublicInbox/NNTP.pm
@@ -20,6 +20,7 @@ use Time::Local qw(timegm timelocal);
use constant {
LINE_MAX => 512, # RFC 977 section 2.3
r501 => '501 command syntax error',
+ r502 => '502 Command unavailable',
r221 => '221 Header follows',
r224 => '224 Overview information follows (multi-line)',
r225 => '225 Headers follow (multi-line)',
@@ -41,6 +42,7 @@ LIST ACTIVE ACTIVE.TIMES NEWSGROUPS OVERVIEW.FMT\r
HDR\r
OVER\r
+my $have_deflate;
my $EXPMAP; # fd -> [ idle_time, $self ]
my $expt;
our $EXPTIME = 180; # 3 minutes
@@ -897,11 +899,13 @@ sub cmd_xover ($;$) {
});
}
+sub compressed { undef }
+
sub cmd_starttls ($) {
my ($self) = @_;
my $sock = $self->{sock} or return;
# RFC 4642 2.2.1
- (ref($sock) eq 'IO::Socket::SSL') and return '502 Command unavailable';
+ return r502 if (ref($sock) eq 'IO::Socket::SSL' || $self->compressed);
my $opt = $self->{nntpd}->{accept_tls} or
return '580 can not initiate TLS negotiation';
res($self, '382 Continue with TLS negotiation');
@@ -910,6 +914,17 @@ sub cmd_starttls ($) {
undef;
}
+# RFC 8054
+sub cmd_compress ($$) { # handler for "COMPRESS <alg>": returns a response string, or undef after replying via res() itself
+ my ($self, $alg) = @_;
+ return '503 Only the DEFLATE is supported' if uc($alg) ne 'DEFLATE'; # algorithm name is compared case-insensitively
+ return r502 if $self->compressed || !$have_deflate; # already compressed, or import() could not load NNTPdeflate
+ res($self, '206 Compression active'); # issued before the object is switched to the deflate subclass
+ PublicInbox::NNTPdeflate->enable($self); # reblesses $self into PublicInbox::NNTPdeflate
+ $self->requeue;
+ undef
+}
+
sub cmd_xpath ($$) {
my ($self, $mid) = @_;
return r501 unless $mid =~ /\A<(.+)>\z/;
@@ -997,4 +1012,10 @@ sub busy {
($self->{rbuf} || $self->{wbuf} || not_idle_long($self, $now));
}
+# this is an import to prevent "perl -c" from complaining about fields
+#
+# Probes for PublicInbox::NNTPdeflate and, when it loads, advertises
+# "COMPRESS DEFLATE" in $CAPABILITIES.  import() runs for every
+# "use PublicInbox::NNTP" (and is inherited by subclasses), so the probe
+# is guarded: without the guard each call would append another
+# "COMPRESS DEFLATE" line to the capability list.
+sub import {
+	return if defined $have_deflate; # already probed once
+	$have_deflate = eval { require PublicInbox::NNTPdeflate } ? 1 : 0;
+	$CAPABILITIES .= "COMPRESS DEFLATE\r\n" if $have_deflate;
+}
+
1;
diff --git a/lib/PublicInbox/NNTPdeflate.pm b/lib/PublicInbox/NNTPdeflate.pm
new file mode 100644
index 00000000..66210bfa
--- /dev/null
+++ b/lib/PublicInbox/NNTPdeflate.pm
@@ -0,0 +1,104 @@
+# Copyright (C) 2019 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# RFC 8054 NNTP COMPRESS DEFLATE implementation
+# Warning, enabling compression for C10K NNTP clients is rather
+# expensive in terms of memory use.
+#
+# RSS usage for 10K idle-but-did-something NNTP clients on 64-bit:
+# TLS + DEFLATE : 1.8 GB (MemLevel=9, 1.2 GB with MemLevel=8)
+# TLS only : <200MB
+# plain : <50MB
+package PublicInbox::NNTPdeflate;
+use strict;
+use warnings;
+use 5.010_001;
+use base qw(PublicInbox::NNTP);
+use Compress::Raw::Zlib;
+use Hash::Util qw(unlock_hash); # dependency of fields for perl 5.10+, anyways
+
+my %IN_OPT = (
+ -Bufsize => PublicInbox::NNTP::LINE_MAX,
+ -WindowBits => -15, # RFC 1951
+ -AppendOutput => 1,
+);
+
+my %OUT_OPT = (
+ # nnrpd (INN) and Compress::Raw::Zlib favor MemLevel=9,
+ # but the zlib C library and git use MemLevel=8
+ # as the default. Using 8 drops our memory use with 10K
+ # TLS clients from 1.8 GB to 1.2 GB, but...
+ # FIXME: sometimes clients fail with 8, so we use 9
+ # -MemLevel => 9,
+
+ # needs more testing, nothing obviously different in terms of memory
+ -Bufsize => 65536,
+
+ -WindowBits => -15, # RFC 1951
+ -AppendOutput => 1,
+);
+
+# Class method: turn an existing PublicInbox::NNTP client ($self) into a
+# compressed one by reblessing it into $class and attaching per-client
+# zlib state:
+#   $self->{zin}  = [ inflate context, buffer of still-compressed input ]
+#   $self->{zout} = [ deflate context, buffer of pending compressed output ]
+# Dies, before $self is modified, if either zlib context cannot be created.
+sub enable {
+	my ($class, $self) = @_;
+
+	# Both constructors return ($object, $status) in list context.
+	# Calling them directly inside [ ... ] yields a three-element
+	# array with $status in slot 1, which is the slot do_read,
+	# msg_more and write treat as the buffer; write() would then pop
+	# the wrong element and throw away output queued by msg_more.
+	my ($in, $in_err) = Compress::Raw::Zlib::Inflate->new(%IN_OPT);
+	$in_err == Z_OK or die "Inflate->new failed: $in_err";
+	my ($out, $out_err) = Compress::Raw::Zlib::Deflate->new(%OUT_OPT);
+	$out_err == Z_OK or die "Deflate->new failed: $out_err";
+
+	# presumably $self is a restricted ("fields") hash -- confirm;
+	# unlock it so the zin/zout keys can be added
+	unlock_hash(%$self);
+	bless $self, $class;
+	$self->{zin} = [ $in, '' ];
+	$self->{zout} = [ $out, '' ];
+}
+
+# overrides PublicInbox::NNTP::compressed
+sub compressed { 1 }
+
+# SUPER is PublicInbox::DS::do_read, so $_[1] may be a reference or not
+sub do_read ($$$$) { # read compressed bytes from the socket and inflate them onto $$rbuf
+ my ($self, $rbuf, $len, $off) = @_; # NOTE(review): $off is unpacked but not used below
+
+ my $zin = $self->{zin} or return; # closed
+ my $deflated = \($zin->[1]); # slot 1 is used as the buffer for still-compressed input
+ my $r = $self->SUPER::do_read($deflated, $len) or return; # false result from the parent read is passed on as an empty return
+
+ # assert(length($$rbuf) == $off) as far as NNTP.pm is concerned
+ # -ConsumeInput is true (%IN_OPT does not override it), so $deflated is automatically emptied
+ my $err = $zin->[0]->inflate($deflated, $rbuf); # output is appended to $$rbuf (%IN_OPT sets -AppendOutput)
+ if ($err == Z_OK) {
+ $r = length($$rbuf) and return $r; # NOTE(review): dereferences $rbuf, so a scalar ref is required here
+ # nothing ready, yet, get more, later
+ $self->requeue;
+ } else { # any status other than Z_OK drops the inflate state and closes the client
+ delete $self->{zin};
+ $self->close;
+ }
+ 0;
+}
+
+# override PublicInbox::DS::msg_more
+sub msg_more ($$) { # compress $_[1] into the pending output buffer; no flush and no socket write happens in this sub
+ my $self = $_[0];
+ my $zout = $self->{zout};
+
+ # $_[1] may be a reference or not for ->deflate
+ my $err = $zout->[0]->deflate($_[1], $zout->[1]); # output is appended to slot 1 (%OUT_OPT sets -AppendOutput)
+ $err == Z_OK or die "->deflate failed $err";
+ 1; # always reports success to the caller
+}
+
+# SUPER is PublicInbox::DS::write, so $_[1] may be a reference or not
+sub write ($$) { # compress $_[1], flush the deflate stream, and pass the compressed bytes to SUPER::write
+ my $self = $_[0];
+ return $self->SUPER::write($_[1]) if ref($_[1]) eq 'CODE'; # code refs go to the parent as-is
+ my $zout = $self->{zout};
+ my $deflated = pop @$zout; # NOTE(review): takes the LAST element; that is the buffer msg_more fills (slot 1) only if @$zout has exactly two elements
+
+ # $_[1] may be a reference or not for ->deflate
+ my $err = $zout->[0]->deflate($_[1], $deflated);
+ $err == Z_OK or die "->deflate failed $err";
+ $err = $zout->[0]->flush($deflated, Z_PARTIAL_FLUSH); # flush pending compressed output into $deflated
+ $err == Z_OK or die "->flush failed $err";
+
+ # PublicInbox::DS::write puts partial writes into another buffer,
+ # so we can prepare the next deflate buffer:
+ $zout->[1] = '';
+ $self->SUPER::write(\$deflated);
+}
+
+1;
diff --git a/script/public-inbox-nntpd b/script/public-inbox-nntpd
index 55bf330e..cd273849 100755
--- a/script/public-inbox-nntpd
+++ b/script/public-inbox-nntpd
@@ -6,7 +6,7 @@
use strict;
use warnings;
require PublicInbox::Daemon;
-require PublicInbox::NNTP;
+use PublicInbox::NNTP; # need to call import
require PublicInbox::NNTPD;
my $nntpd = PublicInbox::NNTPD->new;
PublicInbox::Daemon::run('0.0.0.0:119',
diff --git a/t/nntpd-tls.t b/t/nntpd-tls.t
index 4cf53daa..dba0f74c 100644
--- a/t/nntpd-tls.t
+++ b/t/nntpd-tls.t
@@ -27,6 +27,9 @@ require './t/common.perl';
require PublicInbox::InboxWritable;
require PublicInbox::MIME;
require PublicInbox::SearchIdx;
+my $need_zlib;
+eval { require Compress::Raw::Zlib } or
+ $need_zlib = 'Compress::Raw::Zlib missing';
my $version = 2; # v2 needs newer git
require_git('2.6') if $version >= 2;
my $tmpdir = tempdir('pi-nntpd-tls-XXXXXX', TMPDIR => 1, CLEANUP => 1);
@@ -232,6 +235,14 @@ sub get_capa {
die "unexpected: $!" unless defined($r);
die 'unexpected EOF' if $r == 0;
} until $capa =~ /\.\r\n\z/;
+
+ my $deflate_capa = qr/\r\nCOMPRESS DEFLATE\r\n/;
+ if ($need_zlib) {
+ unlike($capa, $deflate_capa,
+ 'COMPRESS DEFLATE NOT advertised '.$need_zlib);
+ } else {
+ like($capa, $deflate_capa, 'COMPRESS DEFLATE advertised');
+ }
$capa;
}
diff --git a/t/nntpd-validate.t b/t/nntpd-validate.t
new file mode 100644
index 00000000..1a325105
--- /dev/null
+++ b/t/nntpd-validate.t
@@ -0,0 +1,210 @@
+# Copyright (C) 2019 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# Integration test to validate compression.
+use strict;
+use warnings;
+use File::Temp qw(tempdir);
+use Test::More;
+use Symbol qw(gensym);
+use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC);
+my $inbox_dir = $ENV{GIANT_INBOX_DIR};
+plan skip_all => "GIANT_INBOX_DIR not defined for $0" unless $inbox_dir;
+my $mid = $ENV{TEST_MID};
+
+# This test is also an excuse for me to experiment with Perl threads :P
+unless (eval 'use threads; 1') {
+ plan skip_all => "$0 requires a threaded perl" if $@;
+}
+
+# Net::NNTP is part of the standard library, but distros may split it off...
+foreach my $mod (qw(DBD::SQLite Net::NNTP Compress::Raw::Zlib)) {
+ eval "require $mod";
+ plan skip_all => "$mod missing for $0" if $@;
+}
+
+my $test_compress = Net::NNTP->can('compress');
+if (!$test_compress) {
+ diag 'Your Net::NNTP does not yet support compression';
+ diag 'See: https://rt.cpan.org/Ticket/Display.html?id=129967';
+}
+my $test_tls = $ENV{TEST_SKIP_TLS} ? 0 : eval { require IO::Socket::SSL };
+my $cert = 'certs/server-cert.pem';
+my $key = 'certs/server-key.pem';
+# The certs only matter when TLS will be tested.  "&&" binds tighter than
+# "||", so without the inner parentheses a missing cert would skip the
+# whole test even with TEST_SKIP_TLS set or IO::Socket::SSL unavailable.
+if ($test_tls && (!-r $key || !-r $cert)) {
+	plan skip_all => "certs/ missing for $0, run ./certs/create-certs.perl";
+}
+require './t/common.perl';
+my $keep_tmp = !!$ENV{TEST_KEEP_TMP};
+# CLEANUP is the inverse of $keep_tmp: remove the directory at exit
+# unless the user asked (TEST_KEEP_TMP) to keep it for inspection.
+my $tmpdir = tempdir('nntpd-validate-XXXXXX', TMPDIR => 1,
+		CLEANUP => !$keep_tmp);
+my (%OPT, $pid, $tail_pid, $host_port, $group);
+my $batch = 1000;
+END {
+ foreach ($pid, $tail_pid) {
+ kill 'TERM', $_ if defined $_;
+ }
+};
+if (($ENV{NNTP_TEST_URL} // '') =~ m!\Anntp://([^/]+)/([^/]+)\z!) {
+ ($host_port, $group) = ($1, $2);
+ $host_port .= ":119" unless index($host_port, ':') > 0;
+} else {
+ make_local_server();
+}
+my $test_article = $ENV{TEST_ARTICLE} // 0;
+my $test_xover = $ENV{TEST_XOVER} // 1;
+
+if ($test_tls) {
+ my $nntp = Net::NNTP->new($host_port, %OPT);
+ ok($nntp->starttls, 'STARTTLS works');
+ ok($nntp->compress, 'COMPRESS works') if $test_compress;
+ ok($nntp->quit, 'QUIT after starttls OK');
+}
+if ($test_compress) {
+ my $nntp = Net::NNTP->new($host_port, %OPT);
+ ok($nntp->compress, 'COMPRESS works');
+ ok($nntp->quit, 'QUIT after compress OK');
+}
+
+# Read all of $group over a single NNTP connection and return a SHA-1 hex
+# digest of what was received, so that different transports (plain,
+# compress, starttls, ...) can be compared against each other.
+#
+# $methods is an arrayref of Net::NNTP method names (e.g. 'starttls',
+# 'compress') which are invoked, in order, before the group is read.
+# If $mid is set, that article is fetched once before and once after
+# those methods run.  XOVER data and/or article bodies are fetched
+# depending on $test_xover and $test_article.
+#
+# Returns the hex digest, or undef if GROUP does not yield a usable range.
+sub do_get_all {
+	my ($methods) = @_;
+	require Digest::SHA; # nothing visible in this file loads it
+	my $desc = join(',', @$methods);
+	my $t0 = clock_gettime(CLOCK_MONOTONIC);
+	my $dig = Digest::SHA->new(1);
+	my $digfh = gensym;
+	my $tmpfh;
+	if ($keep_tmp) {
+		open $tmpfh, '>', "$tmpdir/$desc.raw" or die $!;
+	}
+	# everything Net::NNTP prints to $digfh is fed to $dig by DigestPipe
+	my $tmp = { dig => $dig, tmpfh => $tmpfh };
+	tie *$digfh, 'DigestPipe', $tmp;
+	my $nntp = Net::NNTP->new($host_port, %OPT);
+	$nntp->article("<$mid>", $digfh) if $mid;
+	foreach my $m (@$methods) {
+		my $res = $nntp->$m;
+		print STDERR "# $m got $res ($desc)\n" if !$res;
+	}
+	$nntp->article("<$mid>", $digfh) if $mid;
+	my ($num, $first, $last) = $nntp->group($group);
+	unless (defined $num && defined $first && defined $last) {
+		warn "Invalid group\n";
+		return undef;
+	}
+	my $i;
+	# "<=" (not "<"): otherwise article $last is never requested when
+	# ($last - $first) is a multiple of $batch, and a group holding a
+	# single article is skipped entirely.
+	for ($i = $first; $i <= $last; $i += $batch) {
+		my $j = $i + $batch - 1;
+		$j = $last if $j > $last;
+		if ($test_xover) {
+			my $xover = $nntp->xover("$i-$j");
+			for my $n (sort { $a <=> $b } keys %$xover) {
+				my $line = join("\t", @{$xover->{$n}});
+				$line =~ tr/\r//d;
+				$dig->add("$n\t".$line);
+			}
+		}
+		if ($test_article) {
+			for my $n ($i..$j) {
+				$nntp->article($n, $digfh) and next;
+				# 423 is not reported as a failure
+				next if $nntp->code == 423;
+				my $res = $nntp->code.' '. $nntp->message;
+
+				$res =~ tr/\r\n//d;
+				print STDERR "# Article $n ($desc): $res\n";
+			}
+		}
+	}
+	my $q = $nntp->quit;
+	print STDERR "# quit failed: ".$nntp->code."\n" if !$q;
+	my $elapsed = sprintf('%0.3f', clock_gettime(CLOCK_MONOTONIC) - $t0);
+	my $res = $dig->hexdigest;
+	print STDERR "# $desc - $res (${elapsed}s)\n";
+	$res;
+}
+my @tests = ([]);
+push @tests, [ 'compress' ] if $test_compress;
+push @tests, [ 'starttls' ] if $test_tls;
+push @tests, [ 'starttls', 'compress' ] if $test_tls && $test_compress;
+my (@keys, %thr, %res);
+for my $m (@tests) {
+ my $key = join(',', @$m);
+ push @keys, $key;
+ diag "$key start";
+ $thr{$key} = threads->create(\&do_get_all, $m);
+}
+
+$res{$_} = $thr{$_}->join for @keys;
+my $plain = $res{''};
+ok($plain, "plain got $plain");
+is($res{$_}, $plain, "$_ matches '' result") for @keys;
+
+done_testing();
+
+sub make_local_server { # start a local public-inbox-nntpd serving $inbox_dir; sets $group, $host_port, $pid and (with TLS) %OPT
+ require PublicInbox::Inbox;
+ $group = 'inbox.test.perf.nntpd';
+ my $ibx = { mainrepo => $inbox_dir, newsgroup => $group };
+ $ibx = PublicInbox::Inbox->new($ibx); # NOTE(review): $ibx is not referenced again in this sub
+ my $nntpd = 'blib/script/public-inbox-nntpd';
+ my $pi_config = "$tmpdir/config";
+ { # write a minimal config describing the one inbox under test
+ open my $fh, '>', $pi_config or die "open($pi_config): $!";
+ print $fh <<"" or die "print $pi_config: $!";
+[publicinbox "test"]
+ newsgroup = $group
+ mainrepo = $inbox_dir
+ address = test\@example.com
+
+ close $fh or die "close($pi_config): $!";
+ }
+ my ($out, $err) = ("$tmpdir/out", "$tmpdir/err");
+ for ($out, $err) { # create/truncate the daemon's log files
+ open my $fh, '>', $_ or die "truncate: $!";
+ }
+ if (my $tail_cmd = $ENV{TAIL}) { # don't assume GNU tail
+ $tail_pid = fork;
+ if (defined $tail_pid && $tail_pid == 0) { # child: run the tail command on both logs, with stdout sent to stderr
+ open STDOUT, '>&STDERR' or die ">&2 failed: $!";
+ exec(split(' ', $tail_cmd), $out, $err); # NOTE(review): no handling if exec fails; the child would fall through into the code below
+ }
+ }
+ my $sock = tcp_server();
+ ok($sock, 'sock created');
+ $host_port = $sock->sockhost . ':' . $sock->sockport;
+
+ # not using multiple workers, here, since we want to increase
+ # the chance of tripping concurrency bugs within PublicInbox/NNTP*.pm
+ my $cmd = [ $nntpd, "--stdout=$out", "--stderr=$err", '-W0' ];
+ push @$cmd, "-lnntp://$host_port";
+ if ($test_tls) { # give the server a cert/key and set client options so the test CA is verified
+ push @$cmd, "--cert=$cert", "--key=$key";
+ %OPT = (
+ SSL_hostname => 'server.local',
+ SSL_verifycn_name => 'server.local',
+ SSL_verify_mode => IO::Socket::SSL::SSL_VERIFY_PEER(),
+ SSL_ca_file => 'certs/test-ca.pem',
+ );
+ }
+ print STDERR "# CMD ". join(' ', @$cmd). "\n";
+ $pid = spawn_listener({ PI_CONFIG => $pi_config }, $cmd, [$sock]); # $pid is signalled with TERM by the END block
+}
+
+package DigestPipe; # tied-filehandle class: data printed to the handle is fed to a digest object
+use strict;
+use warnings;
+
+sub TIEHANDLE { # invoked via tie(); blesses the caller-supplied hash (do_get_all passes { dig => ..., tmpfh => ... })
+ my ($class, $self) = @_;
+ bless $self, $class;
+}
+
+sub PRINT { # invoked for print() on the tied handle; always returns 1
+ my $self = shift;
+ my $data = join('', @_);
+ # Net::NNTP emits different line-endings depending on TLS or not...:
+ $data =~ tr/\r//d; # drop CRs so the digest does not depend on the line-ending style
+ $self->{dig}->add($data);
+ if (my $tmpfh = $self->{tmpfh}) { # optional raw copy; do_get_all only opens one when $keep_tmp is set
+ print $tmpfh $data;
+ }
+ 1;
+}
+1;
diff --git a/t/nntpd.t b/t/nntpd.t
index 1c5ae8d7..74b5e06d 100644
--- a/t/nntpd.t
+++ b/t/nntpd.t
@@ -147,6 +147,13 @@ EOF
$buf = read_til_dot($s);
like($buf, qr/\r\nVERSION 2\r\n/s, 'CAPABILITIES works');
unlike($buf, qr/STARTTLS/s, 'STARTTLS not advertised');
+ my $deflate_capa = qr/\r\nCOMPRESS DEFLATE\r\n/;
+ if (eval { require Compress::Raw::Zlib }) {
+ like($buf, $deflate_capa, 'DEFLATE advertised');
+ } else {
+ unlike($buf, $deflate_capa,
+ 'DEFLATE not advertised (Compress::Raw::Zlib missing)');
+ }
syswrite($s, "NEWGROUPS 19990424 000000 GMT\r\n");
$buf = read_til_dot($s);
--
EW
^ permalink raw reply related [flat|nested] 6+ messages in thread
* [RFC 4/4] nntp: reduce memory overhead of zlib
2019-07-05 22:53 [PATCH 0/4] implement NNTP COMPRESS per RFC 8054 Eric Wong
` (2 preceding siblings ...)
2019-07-05 22:53 ` [PATCH 3/4] nntp: support COMPRESS DEFLATE per RFC 8054 Eric Wong
@ 2019-07-05 22:53 ` Eric Wong
2019-07-07 7:08 ` [PATCH 5/4] nntp: improve error reporting for COMPRESS Eric Wong
4 siblings, 0 replies; 6+ messages in thread
From: Eric Wong @ 2019-07-05 22:53 UTC (permalink / raw)
To: meta
By using Z_FULL_FLUSH at the right places in our event loop, it
appears we can share a single zlib deflate context across ALL
clients in a process.
The zlib deflate context is the biggest factor in per-client
memory use, so being able to share that across many clients
results in a large memory savings.
With 10K idle-but-did-something NNTP clients connected to a
single process on a 64-bit system, TLS+DEFLATE used around
1.8 GB of RSS before this change. It now uses around 300 MB.
TLS via IO::Socket::SSL alone uses <200MB in the same situation,
so the actual memory reduction is over 10x.
This makes compression less efficient and bandwidth increases
around 45% in informal testing, but it's far better than no
compression at all. It's likely around the same level of
compression gzip gives on the HTTP side.
Security implications with TLS? I don't know, but I don't
really care, either... public-inbox-nntpd doesn't support
authentication and it's up to the client to enable compression.
It's not too different from Varnish caching gzipped responses
on the HTTP side and having those responses go to multiple HTTPS
clients.
---
lib/PublicInbox/DS.pm | 4 ++-
lib/PublicInbox/NNTP.pm | 7 ++++
lib/PublicInbox/NNTPdeflate.pm | 59 +++++++++++++++++++++-------------
t/nntpd-validate.t | 8 ++++-
4 files changed, 54 insertions(+), 24 deletions(-)
diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index 586c47cd..b16b1896 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -553,7 +553,9 @@ sub msg_more ($$) {
return 0;
}
}
- $self->write(\($_[1]));
+
+ # don't redispatch into NNTPdeflate::write
+ PublicInbox::DS::write($self, \($_[1]));
}
sub epwait ($$) {
diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm
index d6f315ba..895858b7 100644
--- a/lib/PublicInbox/NNTP.pm
+++ b/lib/PublicInbox/NNTP.pm
@@ -642,6 +642,11 @@ sub long_response ($$) {
} elsif ($more) { # $self->{wbuf}:
update_idle_time($self);
+ # COMPRESS users all share the same DEFLATE context.
+ # Flush it here to ensure clients don't see
+ # each other's data
+ $self->zflush;
+
# no recursion, schedule another call ASAP
# but only after all pending writes are done
my $wbuf = $self->{wbuf} ||= [];
@@ -925,6 +930,8 @@ sub cmd_compress ($$) {
undef
}
+sub zflush {} # overridden by NNTPdeflate
+
sub cmd_xpath ($$) {
my ($self, $mid) = @_;
return r501 unless $mid =~ /\A<(.+)>\z/;
diff --git a/lib/PublicInbox/NNTPdeflate.pm b/lib/PublicInbox/NNTPdeflate.pm
index 66210bfa..78da2a58 100644
--- a/lib/PublicInbox/NNTPdeflate.pm
+++ b/lib/PublicInbox/NNTPdeflate.pm
@@ -2,13 +2,18 @@
# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
# RFC 8054 NNTP COMPRESS DEFLATE implementation
-# Warning, enabling compression for C10K NNTP clients is rather
-# expensive in terms of memory use.
#
# RSS usage for 10K idle-but-did-something NNTP clients on 64-bit:
-# TLS + DEFLATE : 1.8 GB (MemLevel=9, 1.2 GB with MemLevel=8)
-# TLS only : <200MB
-# plain : <50MB
+# TLS + DEFLATE[a] : 1.8 GB (MemLevel=9, 1.2 GB with MemLevel=8)
+# TLS + DEFLATE[b] : ~300MB
+# TLS only : <200MB
+# plain : <50MB
+#
+# [a] - initial implementation using per-client Deflate contexts and buffer
+#
+# [b] - memory-optimized implementation using a global deflate context.
+# It's less efficient in terms of compression, but way more
+# efficient in terms of server memory usage.
package PublicInbox::NNTPdeflate;
use strict;
use warnings;
@@ -23,11 +28,11 @@ my %IN_OPT = (
-AppendOutput => 1,
);
-my %OUT_OPT = (
+# global deflate context and buffer
+my $zbuf = \(my $buf = '');
+my $zout = Compress::Raw::Zlib::Deflate->new(
# nnrpd (INN) and Compress::Raw::Zlib favor MemLevel=9,
- # but the zlib C library and git use MemLevel=8
- # as the default. Using 8 drops our memory use with 10K
- # TLS clients from 1.8 GB to 1.2 GB, but...
+ # but the zlib C library and git use MemLevel=8 as the default.
# FIXME: sometimes clients fail with 8, so we use 9
# -MemLevel => 9,
@@ -43,7 +48,6 @@ sub enable {
unlock_hash(%$self);
bless $self, $class;
$self->{zin} = [ Compress::Raw::Zlib::Inflate->new(%IN_OPT), '' ];
- $self->{zout} = [ Compress::Raw::Zlib::Deflate->new(%OUT_OPT), '' ];
}
# overrides PublicInbox::NNTP::compressed
@@ -74,31 +78,42 @@ sub do_read ($$$$) {
# override PublicInbox::DS::msg_more
sub msg_more ($$) {
my $self = $_[0];
- my $zout = $self->{zout};
# $_[1] may be a reference or not for ->deflate
- my $err = $zout->[0]->deflate($_[1], $zout->[1]);
+ my $err = $zout->deflate($_[1], $zbuf);
$err == Z_OK or die "->deflate failed $err";
1;
}
-# SUPER is PublicInbox::DS::write, so $_[1] may be a reference or not
+sub zflush ($) { # flush the shared deflate context and hand the compressed bytes to this client's socket layer
+ my ($self) = @_;
+
+ my $deflated = $zbuf; # take the current shared output buffer...
+ $zbuf = \(my $next = ''); # ...and install a fresh, empty one in its place
+
+ my $err = $zout->flush($deflated, Z_FULL_FLUSH); # $zout is the process-wide deflate context
+ $err == Z_OK or die "->flush failed $err";
+
+ # We can still let the lower socket layer do buffering:
+ PublicInbox::DS::msg_more($self, $$deflated); # called by full name, so NNTPdeflate::msg_more is not re-entered
+}
+
+# compatible with PublicInbox::DS::write, so $_[1] may be a reference or not
sub write ($$) {
my $self = $_[0];
- return $self->SUPER::write($_[1]) if ref($_[1]) eq 'CODE';
- my $zout = $self->{zout};
- my $deflated = pop @$zout;
+ return PublicInbox::DS::write($self, $_[1]) if ref($_[1]) eq 'CODE';
+
+ my $deflated = $zbuf;
+ $zbuf = \(my $next = '');
# $_[1] may be a reference or not for ->deflate
- my $err = $zout->[0]->deflate($_[1], $deflated);
+ my $err = $zout->deflate($_[1], $deflated);
$err == Z_OK or die "->deflate failed $err";
- $err = $zout->[0]->flush($deflated, Z_PARTIAL_FLUSH);
+ $err = $zout->flush($deflated, Z_FULL_FLUSH);
$err == Z_OK or die "->flush failed $err";
- # PublicInbox::DS::write puts partial writes into another buffer,
- # so we can prepare the next deflate buffer:
- $zout->[1] = '';
- $self->SUPER::write(\$deflated);
+ # We can still let the socket layer do buffering:
+ PublicInbox::DS::write($self, $deflated);
}
1;
diff --git a/t/nntpd-validate.t b/t/nntpd-validate.t
index 1a325105..532ef729 100644
--- a/t/nntpd-validate.t
+++ b/t/nntpd-validate.t
@@ -112,11 +112,17 @@ sub do_get_all {
}
}
}
+
+ # hacky bytes_read thing added to Net::NNTP for testing:
+ my $bytes_read = '';
+ if ($nntp->can('bytes_read')) {
+ $bytes_read .= ' '.$nntp->bytes_read.'b';
+ }
my $q = $nntp->quit;
print STDERR "# quit failed: ".$nntp->code."\n" if !$q;
my $elapsed = sprintf('%0.3f', clock_gettime(CLOCK_MONOTONIC) - $t0);
my $res = $dig->hexdigest;
- print STDERR "# $desc - $res (${elapsed}s)\n";
+ print STDERR "# $desc - $res (${elapsed}s)$bytes_read\n";
$res;
}
my @tests = ([]);
--
EW
^ permalink raw reply related [flat|nested] 6+ messages in thread