about summary refs log tree commit homepage
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r-- lib/PublicInbox/DS.pm          |   4
-rw-r--r-- lib/PublicInbox/NNTP.pm        |  45
-rw-r--r-- lib/PublicInbox/NNTPdeflate.pm | 128
3 files changed, 167 insertions, 10 deletions
diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index 51515bf6..1e51dc41 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -583,7 +583,9 @@ sub msg_more ($$) {
             return 0;
         }
     }
-    $self->write(\($_[1]));
+
+    # don't redispatch into NNTPdeflate::write
+    PublicInbox::DS::write($self, \($_[1]));
 }
 
 sub epwait ($$) {
diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm
index d106e315..6fee29f4 100644
--- a/lib/PublicInbox/NNTP.pm
+++ b/lib/PublicInbox/NNTP.pm
@@ -14,11 +14,13 @@ use PublicInbox::Git;
 require PublicInbox::EvCleanup;
 use Email::Simple;
 use POSIX qw(strftime);
-PublicInbox::DS->import(qw(now msg_more));
+PublicInbox::DS->import(qw(now));
 use Digest::SHA qw(sha1_hex);
 use Time::Local qw(timegm timelocal);
 use constant {
+        LINE_MAX => 512, # RFC 977 section 2.3
         r501 => '501 command syntax error',
+        r502 => '502 Command unavailable',
         r221 => '221 Header follows',
         r224 => '224 Overview information follows (multi-line)',
         r225 =>        '225 Headers follow (multi-line)',
@@ -40,6 +42,7 @@ LIST ACTIVE ACTIVE.TIMES NEWSGROUPS OVERVIEW.FMT\r
 HDR\r
 OVER\r
 
+my $have_deflate;
 my $EXPMAP; # fd -> [ idle_time, $self ]
 my $expt;
 our $EXPTIME = 180; # 3 minutes
@@ -158,12 +161,12 @@ sub cmd_xgtitle ($;$) {
 
 sub list_overview_fmt ($) {
         my ($self) = @_;
-        msg_more($self, $OVERVIEW_FMT);
+        $self->msg_more($OVERVIEW_FMT);
 }
 
 sub list_headers ($;$) {
         my ($self) = @_;
-        msg_more($self, $LIST_HEADERS);
+        $self->msg_more($LIST_HEADERS);
 }
 
 sub list_active ($;$) {
@@ -517,7 +520,7 @@ sub msg_body_write ($$) {
         $$msg =~ s/^\./../smg;
         $$msg =~ s/(?<!\r)\n/\r\n/sg; # Alpine barfs without this
         $$msg .= "\r\n" unless $$msg =~ /\r\n\z/s;
-        msg_more($self, $$msg);
+        $self->msg_more($$msg);
         '.'
 }
 
@@ -538,7 +541,7 @@ sub msg_hdr_write ($$$) {
         # affect messages already in the archive.
         $hdr =~ s/^(Message-ID:)[ \t]*\r\n[ \t]+([^\r]+)\r\n/$1 $2\r\n/igsm;
         $hdr .= "\r\n" if $body_follows;
-        msg_more($self, $hdr);
+        $self->msg_more($hdr);
 }
 
 sub cmd_article ($;$) {
@@ -639,6 +642,11 @@ sub long_response ($$) {
                 } elsif ($more) { # $self->{wbuf}:
                         update_idle_time($self);
 
+                        # COMPRESS users all share the same DEFLATE context.
+                        # Flush it here to ensure clients don't see
+                        # each other's data
+                        $self->zflush;
+
                         # no recursion, schedule another call ASAP
                         # but only after all pending writes are done
                         my $wbuf = $self->{wbuf} ||= [];
@@ -757,7 +765,7 @@ sub hdr_searchmsg ($$$$) {
                                 $tmp .= $s->{num} . ' ' . $s->$field . "\r\n";
                         }
                         utf8::encode($tmp);
-                        msg_more($self, $tmp);
+                        $self->msg_more($tmp);
                         $cur = $msgs->[-1]->{num} + 1;
                 });
         }
@@ -896,11 +904,13 @@ sub cmd_xover ($;$) {
         });
 }
 
+sub compressed { undef }
+
 sub cmd_starttls ($) {
         my ($self) = @_;
         my $sock = $self->{sock} or return;
         # RFC 4642 2.2.1
-        (ref($sock) eq 'IO::Socket::SSL') and return '502 Command unavailable';
+        return r502 if (ref($sock) eq 'IO::Socket::SSL' || $self->compressed);
         my $opt = $self->{nntpd}->{accept_tls} or
                 return '580 can not initiate TLS negotiation';
         res($self, '382 Continue with TLS negotiation');
@@ -909,6 +919,18 @@ sub cmd_starttls ($) {
         undef;
 }
 
+# RFC 8054
+sub cmd_compress ($$) {
+        my ($self, $alg) = @_;
+        return '503 Only DEFLATE is supported' if uc($alg) ne 'DEFLATE';
+        return r502 if $self->compressed || !$have_deflate;
+        PublicInbox::NNTPdeflate->enable($self);
+        $self->requeue;
+        undef
+}
+
+sub zflush {} # overridden by NNTPdeflate
+
 sub cmd_xpath ($$) {
         my ($self, $mid) = @_;
         return r501 unless $mid =~ /\A<(.+)>\z/;
@@ -924,7 +946,7 @@ sub cmd_xpath ($$) {
 
 sub res ($$) { do_write($_[0], $_[1] . "\r\n") }
 
-sub more ($$) { msg_more($_[0], $_[1] . "\r\n") }
+sub more ($$) { $_[0]->msg_more($_[1] . "\r\n") }
 
 sub do_write ($$) {
         my $self = $_[0];
@@ -954,7 +976,6 @@ sub event_step {
         # only read more requests if we've drained the write buffer,
         # otherwise we can be buffering infinitely w/o backpressure
 
-        use constant LINE_MAX => 512; # RFC 977 section 2.3
         my $rbuf = $self->{rbuf} // (\(my $x = ''));
         my $r = 1;
 
@@ -997,4 +1018,10 @@ sub busy {
         ($self->{rbuf} || $self->{wbuf} || not_idle_long($self, $now));
 }
 
+# NNTPdeflate is loaded inside import() to prevent "perl -c" from complaining about fields
+sub import {
+        $have_deflate = eval { require PublicInbox::NNTPdeflate } and
+                $CAPABILITIES .= "COMPRESS DEFLATE\r\n";
+}
+
 1;
diff --git a/lib/PublicInbox/NNTPdeflate.pm b/lib/PublicInbox/NNTPdeflate.pm
new file mode 100644
index 00000000..10e2337c
--- /dev/null
+++ b/lib/PublicInbox/NNTPdeflate.pm
@@ -0,0 +1,128 @@
+# Copyright (C) 2019 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# RFC 8054 NNTP COMPRESS DEFLATE implementation
+#
+# RSS usage for 10K idle-but-did-something NNTP clients on 64-bit:
+#   TLS + DEFLATE[a] :  1.8 GB  (MemLevel=9, 1.2 GB with MemLevel=8)
+#   TLS + DEFLATE[b] :  ~300MB
+#   TLS only         :  <200MB
+#   plain            :   <50MB
+#
+# [a] - initial implementation using per-client Deflate contexts and buffer
+#
+# [b] - memory-optimized implementation using a global deflate context.
+#       It's less efficient in terms of compression, but way more
+#       efficient in terms of server memory usage.
+package PublicInbox::NNTPdeflate;
+use strict;
+use warnings;
+use 5.010_001;
+use base qw(PublicInbox::NNTP);
+use Compress::Raw::Zlib;
+use Hash::Util qw(unlock_hash); # dependency of fields for perl 5.10+, anyways
+
+my %IN_OPT = (
+        -Bufsize => PublicInbox::NNTP::LINE_MAX,
+        -WindowBits => -15, # RFC 1951
+        -AppendOutput => 1,
+);
+
+# global deflate context and buffer
+my $zbuf = \(my $buf = '');
+my $zout;
+{
+        my $err;
+        ($zout, $err) = Compress::Raw::Zlib::Deflate->new(
+                # nnrpd (INN) and Compress::Raw::Zlib favor MemLevel=9,
+                # the zlib C library and git use MemLevel=8 as the default
+                # -MemLevel => 9,
+                -Bufsize => 65536, # same as nnrpd
+                -WindowBits => -15, # RFC 1951
+                -AppendOutput => 1,
+        );
+        $err == Z_OK or die "Failed to initialize zlib deflate stream: $err";
+}
+
+
+sub enable {
+        my ($class, $self) = @_;
+        my ($in, $err) = Compress::Raw::Zlib::Inflate->new(%IN_OPT);
+        if ($err != Z_OK) {
+                $self->err("Inflate->new failed: $err");
+                $self->res('403 Unable to activate compression');
+                return;
+        }
+        unlock_hash(%$self);
+        $self->res('206 Compression active');
+        bless $self, $class;
+        $self->{zin} = [ $in, '' ];
+}
+
+# overrides PublicInbox::NNTP::compressed
+sub compressed { 1 }
+
+# SUPER is PublicInbox::DS::do_read, so $_[1] may be a reference or not
+sub do_read ($$$$) {
+        my ($self, $rbuf, $len, $off) = @_;
+
+        my $zin = $self->{zin} or return; # closed
+        my $deflated = \($zin->[1]);
+        my $r = $self->SUPER::do_read($deflated, $len) or return;
+
+        # assert(length($$rbuf) == $off) as far as NNTP.pm is concerned
+        # -ConsumeInput is true, so $deflated is automatically emptied
+        my $err = $zin->[0]->inflate($deflated, $rbuf);
+        if ($err == Z_OK) {
+                $r = length($$rbuf) and return $r;
+                # nothing ready, yet, get more, later
+                $self->requeue;
+        } else {
+                delete $self->{zin};
+                $self->close;
+        }
+        0;
+}
+
+# override PublicInbox::DS::msg_more
+sub msg_more ($$) {
+        my $self = $_[0];
+
+        # $_[1] may be a reference or not for ->deflate
+        my $err = $zout->deflate($_[1], $zbuf);
+        $err == Z_OK or die "->deflate failed $err";
+        1;
+}
+
+sub zflush ($) {
+        my ($self) = @_;
+
+        my $deflated = $zbuf;
+        $zbuf = \(my $next = '');
+
+        my $err = $zout->flush($deflated, Z_FULL_FLUSH);
+        $err == Z_OK or die "->flush failed $err";
+
+        # We can still let the lower socket layer do buffering:
+        PublicInbox::DS::msg_more($self, $$deflated);
+}
+
+# compatible with PublicInbox::DS::write, so $_[1] may be a reference or not
+sub write ($$) {
+        my $self = $_[0];
+        return PublicInbox::DS::write($self, $_[1]) if ref($_[1]) eq 'CODE';
+
+        my $deflated = $zbuf;
+        $zbuf = \(my $next = '');
+
+        # $_[1] may be a reference or not for ->deflate
+        my $err = $zout->deflate($_[1], $deflated);
+        $err == Z_OK or die "->deflate failed $err";
+        $err = $zout->flush($deflated, Z_FULL_FLUSH);
+        $err == Z_OK or die "->flush failed $err";
+
+        # We can still let the socket layer do buffering:
+        PublicInbox::DS::write($self, $deflated);
+}
+
+1;