* [PATCH 0/3] nntpd: updates from imapd
@ 2020-06-16 22:31 Eric Wong
2020-06-16 22:31 ` [PATCH 1/3] daemon: use ->can to check for IO::Socket::SSL Eric Wong
` (2 more replies)
0 siblings, 3 replies; 4+ messages in thread
From: Eric Wong @ 2020-06-16 22:31 UTC (permalink / raw)
To: meta
Since our IMAP code was forked from NNTP and served as a proving
ground for some new ideas, start bringing the NNTP code up to date
with slow git storage support and make it less dependent on
IO::Socket::SSL for TLS.
Eric Wong (3):
daemon: use ->can to check for IO::Socket::SSL
nntp: event_step: prepare for async git reads
nntp: support slow blob retrievals
lib/PublicInbox/DS.pm | 6 +-
lib/PublicInbox/HTTP.pm | 2 +-
lib/PublicInbox/NNTP.pm | 150 +++++++++++++++++++--------------
lib/PublicInbox/NNTPdeflate.pm | 10 +++
t/nntp.t | 12 ++-
t/nntpd.t | 20 ++++-
6 files changed, 129 insertions(+), 71 deletions(-)
^ permalink raw reply [flat|nested] 4+ messages in thread
* [PATCH 1/3] daemon: use ->can to check for IO::Socket::SSL
2020-06-16 22:31 [PATCH 0/3] nntpd: updates from imapd Eric Wong
@ 2020-06-16 22:31 ` Eric Wong
2020-06-16 22:31 ` [PATCH 2/3] nntp: event_step: prepare for async git reads Eric Wong
2020-06-16 22:31 ` [PATCH 3/3] nntp: support slow blob retrievals Eric Wong
2 siblings, 0 replies; 4+ messages in thread
From: Eric Wong @ 2020-06-16 22:31 UTC (permalink / raw)
To: meta
Doing a ref($obj) string comparison ties us to IO::Socket::SSL
(and OpenSSL). In the future, we may support GnuTLS or other TLS
implementations. This was already done in the IMAP code.
---
lib/PublicInbox/DS.pm | 6 +++---
lib/PublicInbox/HTTP.pm | 2 +-
lib/PublicInbox/NNTP.pm | 6 +++---
3 files changed, 7 insertions(+), 7 deletions(-)
diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index 01c8917eafe..b7753e1a663 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -415,7 +415,7 @@ sub send_tmpio ($$) {
}
sub epbit ($$) { # (sock, default)
- ref($_[0]) eq 'IO::Socket::SSL' ? PublicInbox::TLS::epollbit() : $_[1];
+ $_[0]->can('stop_SSL') ? PublicInbox::TLS::epollbit() : $_[1];
}
# returns 1 if done, 0 if incomplete
@@ -569,7 +569,7 @@ sub msg_more ($$) {
my $wbuf = $self->{wbuf};
if (MSG_MORE && (!defined($wbuf) || !scalar(@$wbuf)) &&
- ref($sock) ne 'IO::Socket::SSL') {
+ !$sock->can('stop_SSL')) {
my $n = send($sock, $_[1], MSG_MORE);
if (defined $n) {
my $nlen = bytes::length($_[1]) - $n;
@@ -619,7 +619,7 @@ sub shutdn_tls_step ($) {
sub shutdn ($) {
my ($self) = @_;
my $sock = $self->{sock} or return;
- if (ref($sock) eq 'IO::Socket::SSL') {
+ if ($sock->can('stop_SSL')) {
shutdn_tls_step($self);
} else {
$self->close;
diff --git a/lib/PublicInbox/HTTP.pm b/lib/PublicInbox/HTTP.pm
index b73ce2d7335..6ccf2059240 100644
--- a/lib/PublicInbox/HTTP.pm
+++ b/lib/PublicInbox/HTTP.pm
@@ -59,7 +59,7 @@ sub new ($$$) {
my $self = fields::new($class);
my $ev = EPOLLIN;
my $wbuf;
- if (ref($sock) eq 'IO::Socket::SSL' && !$sock->accept_SSL) {
+ if ($sock->can('accept_SSL') && !$sock->accept_SSL) {
return CORE::close($sock) if $! != EAGAIN;
$ev = PublicInbox::TLS::epollbit();
$wbuf = [ \&PublicInbox::DS::accept_tls_step ];
diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm
index bffd773cf9e..be3bddc3f5d 100644
--- a/lib/PublicInbox/NNTP.pm
+++ b/lib/PublicInbox/NNTP.pm
@@ -47,7 +47,7 @@ sub new ($$$) {
my $self = fields::new($class);
my $ev = EPOLLIN;
my $wbuf;
- if (ref($sock) eq 'IO::Socket::SSL' && !$sock->accept_SSL) {
+ if ($sock->can('accept_SSL') && !$sock->accept_SSL) {
return CORE::close($sock) if $! != EAGAIN;
$ev = PublicInbox::TLS::epollbit();
$wbuf = [ \&PublicInbox::DS::accept_tls_step, \&greet ];
@@ -97,7 +97,7 @@ sub process_line ($$) {
sub cmd_capabilities ($;$) {
my ($self, undef) = @_;
my $res = $CAPABILITIES;
- if (ref($self->{sock}) ne 'IO::Socket::SSL' &&
+ if (!$self->{sock}->can('accept_SSL') &&
$self->{nntpd}->{accept_tls}) {
$res .= "STARTTLS\r\n";
}
@@ -896,7 +896,7 @@ sub cmd_starttls ($) {
my ($self) = @_;
my $sock = $self->{sock} or return;
# RFC 4642 2.2.1
- return r502 if (ref($sock) eq 'IO::Socket::SSL' || $self->compressed);
+ return r502 if ($sock->can('accept_SSL') || $self->compressed);
my $opt = $self->{nntpd}->{accept_tls} or
return '580 can not initiate TLS negotiation';
res($self, '382 Continue with TLS negotiation');
^ permalink raw reply related [flat|nested] 4+ messages in thread
* [PATCH 2/3] nntp: event_step: prepare for async git reads
2020-06-16 22:31 [PATCH 0/3] nntpd: updates from imapd Eric Wong
2020-06-16 22:31 ` [PATCH 1/3] daemon: use ->can to check for IO::Socket::SSL Eric Wong
@ 2020-06-16 22:31 ` Eric Wong
2020-06-16 22:31 ` [PATCH 3/3] nntp: support slow blob retrievals Eric Wong
2 siblings, 0 replies; 4+ messages in thread
From: Eric Wong @ 2020-06-16 22:31 UTC (permalink / raw)
To: meta
This matches PublicInbox::IMAP::event_step and will allow us to
handle blob retrievals from git asynchronously without falling
over on pipelined requests.
---
lib/PublicInbox/NNTP.pm | 39 ++++++++++++++++------------------
lib/PublicInbox/NNTPdeflate.pm | 10 +++++++++
2 files changed, 28 insertions(+), 21 deletions(-)
diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm
index be3bddc3f5d..80dd8614fe8 100644
--- a/lib/PublicInbox/NNTP.pm
+++ b/lib/PublicInbox/NNTP.pm
@@ -956,38 +956,35 @@ sub out ($$;@) {
sub event_step {
my ($self) = @_;
- return unless $self->flush_write && $self->{sock};
+ return unless $self->flush_write && $self->{sock} && !$self->{long_cb};
$self->update_idle_time;
# only read more requests if we've drained the write buffer,
# otherwise we can be buffering infinitely w/o backpressure
- my $rbuf = $self->{rbuf} // (\(my $x = ''));
- my $r = 1;
-
- if (index($$rbuf, "\n") < 0) {
- my $off = bytes::length($$rbuf);
- $r = $self->do_read($rbuf, LINE_MAX, $off) or return;
- }
- while ($r > 0 && $$rbuf =~ s/\A[ \t]*([^\n]*?)\r?\n//) {
- my $line = $1;
- return $self->close if $line =~ /[[:cntrl:]]/s;
- my $t0 = now();
- my $fd = fileno($self->{sock});
- $r = eval { process_line($self, $line) };
- my $pending = $self->{wbuf} ? ' pending' : '';
- out($self, "[$fd] %s - %0.6f$pending", $line, now() - $t0);
- }
-
+ my $rbuf = $self->{rbuf} // \(my $x = '');
+ my $line = index($$rbuf, "\n");
+ while ($line < 0) {
+ return $self->close if length($$rbuf) >= LINE_MAX;
+ $self->do_read($rbuf, LINE_MAX, length($$rbuf)) or return;
+ $line = index($$rbuf, "\n");
+ }
+ $line = substr($$rbuf, 0, $line + 1, '');
+ $line =~ s/\r?\n\z//s;
+ return $self->close if $line =~ /[[:cntrl:]]/s;
+
+ my $t0 = now();
+ my $fd = fileno($self->{sock});
+ my $r = eval { process_line($self, $line) };
+ my $pending = $self->{wbuf} ? ' pending' : '';
+ out($self, "[$fd] %s - %0.6f$pending", $line, now() - $t0);
return $self->close if $r < 0;
- my $len = bytes::length($$rbuf);
- return $self->close if ($len >= LINE_MAX);
$self->rbuf_idle($rbuf);
$self->update_idle_time;
# maybe there's more pipelined data, or we'll have
# to register it for socket-readiness notifications
- $self->requeue unless $self->{wbuf};
+ $self->requeue unless $pending;
}
# for graceful shutdown in PublicInbox::Daemon:
diff --git a/lib/PublicInbox/NNTPdeflate.pm b/lib/PublicInbox/NNTPdeflate.pm
index eb400c9c220..dec88aba3a5 100644
--- a/lib/PublicInbox/NNTPdeflate.pm
+++ b/lib/PublicInbox/NNTPdeflate.pm
@@ -71,6 +71,16 @@ sub do_read ($$$$) {
$doff = length($dbuf);
my $r = PublicInbox::DS::do_read($self, \$dbuf, $len, $doff) or return;
+ # Workaround inflate bug appending to OOK scalars:
+ # <https://rt.cpan.org/Ticket/Display.html?id=132734>
+ # We only have $off if the client is pipelining, and pipelining
+ # is where our substr() OOK optimization in event_step makes sense.
+ if ($off) {
+ my $copy = $$rbuf;
+ undef $$rbuf;
+ $$rbuf = $copy;
+ }
+
# assert(length($$rbuf) == $off) as far as NNTP.pm is concerned
# -ConsumeInput is true, so $dbuf is automatically emptied
my $err = $zin->inflate($dbuf, $rbuf);
^ permalink raw reply related [flat|nested] 4+ messages in thread
* [PATCH 3/3] nntp: support slow blob retrievals
2020-06-16 22:31 [PATCH 0/3] nntpd: updates from imapd Eric Wong
2020-06-16 22:31 ` [PATCH 1/3] daemon: use ->can to check for IO::Socket::SSL Eric Wong
2020-06-16 22:31 ` [PATCH 2/3] nntp: event_step: prepare for async git reads Eric Wong
@ 2020-06-16 22:31 ` Eric Wong
2 siblings, 0 replies; 4+ messages in thread
From: Eric Wong @ 2020-06-16 22:31 UTC (permalink / raw)
To: meta
Having `git cat-file' as a separate process naturally lends
itself to asynchronous dispatch. Our event loop for -nntpd no
longer blocks on slow git storage.
Pipelining in -imapd was tricky, and bugs were exposed by
mbsync(1). Update t/nntpd.t to pipeline ARTICLE requests
to ensure we don't hit the same problems -imapd did
during development.
---
lib/PublicInbox/NNTP.pm | 107 +++++++++++++++++++++++++---------------
t/nntp.t | 12 +++--
t/nntpd.t | 20 +++++++-
3 files changed, 95 insertions(+), 44 deletions(-)
diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm
index 80dd8614fe8..6df19f322b6 100644
--- a/lib/PublicInbox/NNTP.pm
+++ b/lib/PublicInbox/NNTP.pm
@@ -13,6 +13,7 @@ use POSIX qw(strftime);
use PublicInbox::DS qw(now);
use Digest::SHA qw(sha1_hex);
use Time::Local qw(timegm timelocal);
+use PublicInbox::GitAsyncCat;
use constant {
LINE_MAX => 512, # RFC 977 section 2.3
r501 => '501 command syntax error',
@@ -408,8 +409,9 @@ sub xref ($$$$) {
$ret;
}
-sub set_nntp_headers ($$$$$) {
- my ($self, $hdr, $ng, $n, $mid) = @_;
+sub set_nntp_headers ($$$) {
+ my ($self, $hdr, $smsg) = @_;
+ my ($mid) = $smsg->{mid};
# why? leafnode requires a Path: header for some inexplicable
# reason. We'll fake the shortest one possible.
@@ -429,7 +431,8 @@ sub set_nntp_headers ($$$$$) {
}
# clobber some
- my $xref = xref($self, $ng, $n, $mid);
+ my $ng = $self->{ng};
+ my $xref = xref($self, $ng, $smsg->{num}, $mid);
$hdr->header_set('Xref', $xref);
$xref =~ s/:[0-9]+//g;
$hdr->header_set('Newsgroups', (split(/ /, $xref, 2))[1]);
@@ -441,8 +444,8 @@ sub set_nntp_headers ($$$$$) {
}
}
-sub art_lookup ($$$) {
- my ($self, $art, $set_headers) = @_;
+sub art_lookup ($$) {
+ my ($self, $art) = @_;
my $ng = $self->{ng};
my ($n, $mid);
my $err;
@@ -478,13 +481,7 @@ find_mid:
}
found:
my $smsg = $ng->over->get_art($n) or return $err;
- my $msg = $ng->msg_by_smsg($smsg) or return $err;
-
- # PublicInbox::Eml->new will modify $msg in-place, so what's
- # left is the body and we won't need to call ->body(), later
- my $hdr = PublicInbox::Eml->new($msg)->header_obj;
- set_nntp_headers($self, $hdr, $ng, $n, $mid) if $set_headers;
- [ $n, $mid, $msg, $hdr ];
+ $smsg;
}
sub msg_body_write ($$) {
@@ -495,7 +492,6 @@ sub msg_body_write ($$) {
$$msg =~ s/(?<!\r)\n/\r\n/sg; # Alpine barfs without this
$$msg .= "\r\n" unless $$msg =~ /\r\n\z/s;
$self->msg_more($$msg);
- '.'
}
sub set_art {
@@ -504,55 +500,88 @@ sub set_art {
}
sub msg_hdr_write ($$$) {
- my ($self, $hdr, $body_follows) = @_;
- $hdr = $hdr->as_string;
+ my ($self, $eml, $smsg) = @_;
+ set_nntp_headers($self, $eml, $smsg);
+
+ my $hdr = $eml->{hdr} // \(my $x = '');
# fixup old bug from import (pre-a0c07cba0e5d8b6a)
- $hdr =~ s/\A[\r\n]*From [^\r\n]*\r?\n//s;
- utf8::encode($hdr);
- $hdr =~ s/(?<!\r)\n/\r\n/sg; # Alpine barfs without this
+ $$hdr =~ s/\A[\r\n]*From [^\r\n]*\r?\n//s;
+ $$hdr =~ s/(?<!\r)\n/\r\n/sg; # Alpine barfs without this
# for leafnode compatibility, we need to ensure Message-ID headers
# are only a single line.
- $hdr =~ s/^(Message-ID:)[ \t]*\r\n[ \t]+([^\r]+)\r\n/$1 $2\r\n/igsm;
- $hdr .= "\r\n" if $body_follows;
- $self->msg_more($hdr);
+ $$hdr =~ s/^(Message-ID:)[ \t]*\r\n[ \t]+([^\r]+)\r\n/$1 $2\r\n/igsm;
+ $self->msg_more($$hdr);
+}
+
+sub blob_cb { # called by git->cat_async via git_async_cat
+ my ($bref, $oid, $type, $size, $smsg) = @_;
+ my $self = $smsg->{nntp};
+ my $code = $smsg->{nntp_code} // 220;
+ if (!defined($oid)) {
+ # it's possible to have TOCTOU if an admin runs
+ # public-inbox-(edit|purge), just move onto the next message
+ return $self->requeue;
+ } elsif ($smsg->{blob} ne $oid) {
+ $self->close;
+ die "BUG: $smsg->{blob} != $oid";
+ }
+ my $r = "$code $smsg->{num} <$smsg->{mid}> article retrieved - ";
+ my $eml = PublicInbox::Eml->new($bref);
+ if ($code == 220) {
+ more($self, $r .= 'head and body follow');
+ msg_hdr_write($self, $eml, $smsg);
+ $self->msg_more("\r\n");
+ msg_body_write($self, $bref);
+ } elsif ($code == 221) {
+ more($self, $r .= 'head follows');
+ msg_hdr_write($self, $eml, $smsg);
+ } elsif ($code == 222) {
+ more($self, $r .= 'body follows');
+ msg_body_write($self, $bref);
+ } else {
+ $self->close;
+ die "BUG: bad code: $r";
+ }
+ $self->write(\".\r\n"); # flushes (includes ->zflush)
+ $self->requeue;
}
sub cmd_article ($;$) {
my ($self, $art) = @_;
- my $r = art_lookup($self, $art, 1);
- return $r unless ref $r;
- my ($n, $mid, $msg, $hdr) = @$r;
+ my $smsg = art_lookup($self, $art);
+ return $smsg unless ref $smsg;
set_art($self, $art);
- more($self, "220 $n <$mid> article retrieved - head and body follow");
- msg_hdr_write($self, $hdr, 1);
- msg_body_write($self, $msg);
+ $smsg->{nntp} = $self;
+ git_async_cat($self->{ng}->git, $smsg->{blob}, \&blob_cb, $smsg);
+ undef;
}
sub cmd_head ($;$) {
my ($self, $art) = @_;
- my $r = art_lookup($self, $art, 2);
- return $r unless ref $r;
- my ($n, $mid, undef, $hdr) = @$r;
+ my $smsg = art_lookup($self, $art);
+ return $smsg unless ref $smsg;
set_art($self, $art);
- more($self, "221 $n <$mid> article retrieved - head follows");
- msg_hdr_write($self, $hdr, 0);
- '.'
+ $smsg->{nntp} = $self;
+ $smsg->{nntp_code} = 221;
+ git_async_cat($self->{ng}->git, $smsg->{blob}, \&blob_cb, $smsg);
+ undef;
}
sub cmd_body ($;$) {
my ($self, $art) = @_;
- my $r = art_lookup($self, $art, 0);
- return $r unless ref $r;
- my ($n, $mid, $msg) = @$r;
+ my $smsg = art_lookup($self, $art);
+ return $smsg unless ref $smsg;
set_art($self, $art);
- more($self, "222 $n <$mid> article retrieved - body follows");
- msg_body_write($self, $msg);
+ $smsg->{nntp} = $self;
+ $smsg->{nntp_code} = 222;
+ git_async_cat($self->{ng}->git, $smsg->{blob}, \&blob_cb, $smsg);
+ undef;
}
sub cmd_stat ($;$) {
my ($self, $art) = @_;
- my $r = art_lookup($self, $art, 0);
+ my $r = art_lookup($self, $art);
return $r unless ref $r;
my ($n, $mid) = @$r;
set_art($self, $art);
diff --git a/t/nntp.t b/t/nntp.t
index 2a9f3a4f6eb..1db896cf46b 100644
--- a/t/nntp.t
+++ b/t/nntp.t
@@ -109,9 +109,12 @@ use_ok 'PublicInbox::Inbox';
my $mid = 'a@b';
my $mime = PublicInbox::Eml->new("Message-ID: <$mid>\r\n\r\n");
my $hdr = $mime->header_obj;
- my $mock_self = { nntpd => { grouplist => [],
- servername => 'example.com' } };
- PublicInbox::NNTP::set_nntp_headers($mock_self, $hdr, $ng, 1, $mid);
+ my $mock_self = {
+ nntpd => { grouplist => [], servername => 'example.com' },
+ ng => $ng,
+ };
+ my $smsg = { num => 1, mid => $mid };
+ PublicInbox::NNTP::set_nntp_headers($mock_self, $hdr, $smsg);
is_deeply([ $mime->header('Message-ID') ], [ "<$mid>" ],
'Message-ID unchanged');
is_deeply([ $mime->header('Archived-At') ], [ "<${u}a\@b/>" ],
@@ -126,7 +129,8 @@ use_ok 'PublicInbox::Inbox';
'Xref: set');
$ng->{-base_url} = 'http://mirror.example.com/m/';
- PublicInbox::NNTP::set_nntp_headers($mock_self, $hdr, $ng, 2, $mid);
+ $smsg->{num} = 2;
+ PublicInbox::NNTP::set_nntp_headers($mock_self, $hdr, $smsg);
is_deeply([ $mime->header('Message-ID') ], [ "<$mid>" ],
'Message-ID unchanged');
is_deeply([ $mime->header('Archived-At') ],
diff --git a/t/nntpd.t b/t/nntpd.t
index b24720eb9dd..c681b01c3d9 100644
--- a/t/nntpd.t
+++ b/t/nntpd.t
@@ -12,6 +12,8 @@ use IO::Socket;
use Socket qw(IPPROTO_TCP TCP_NODELAY);
use Net::NNTP;
use Sys::Hostname;
+use POSIX qw(_exit);
+use Digest::SHA;
# FIXME: make easier to test both versions
my $version = $ENV{PI_TEST_VERSION} || 1;
@@ -287,21 +289,37 @@ Date: Fri, 02 Oct 1993 00:00:00 +0000
# pipelined requests:
{
my $nreq = 90;
+ my $nart = 2;
syswrite($s, "GROUP $group\r\n");
my $res = <$s>;
my $rdr = fork;
if ($rdr == 0) {
- use POSIX qw(_exit);
for (1..$nreq) {
<$s> =~ /\A224 / or _exit(1);
<$s> =~ /\A1/ or _exit(2);
<$s> eq ".\r\n" or _exit(3);
}
+ my %sums;
+ for (1..$nart) {
+ <$s> =~ /\A220 / or _exit(4);
+ my $dig = Digest::SHA->new(1);
+ while (my $l = <$s>) {
+ last if $l eq ".\r\n";
+ $dig->add($l);
+ }
+ $dig = $dig->hexdigest;
+ $sums{$dig}++;
+ }
+ if ($nart) {
+ scalar(keys(%sums)) == 1 or _exit(5);
+ (values(%sums))[0] == $nart or _exit(6);
+ }
_exit(0);
}
for (1..$nreq) {
syswrite($s, "XOVER 1\r\n");
}
+ syswrite($s, "ARTICLE 1\r\n" x $nart);
is($rdr, waitpid($rdr, 0), 'reader done');
is($? >> 8, 0, 'no errors');
}
^ permalink raw reply related [flat|nested] 4+ messages in thread
end of thread, other threads:[~2020-06-16 22:31 UTC | newest]
Thread overview: 4+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2020-06-16 22:31 [PATCH 0/3] nntpd: updates from imapd Eric Wong
2020-06-16 22:31 ` [PATCH 1/3] daemon: use ->can to check for IO::Socket::SSL Eric Wong
2020-06-16 22:31 ` [PATCH 2/3] nntp: event_step: prepare for async git reads Eric Wong
2020-06-16 22:31 ` [PATCH 3/3] nntp: support slow blob retrievals Eric Wong
Code repositories for project(s) associated with this public inbox
https://80x24.org/public-inbox.git
This is a public inbox; see the mirroring instructions
for how to clone and mirror all data and code used for this inbox,
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).