diff options
author | Eric Wong <e@80x24.org> | 2019-06-26 06:36:27 +0000 |
---|---|---|
committer | Eric Wong <e@80x24.org> | 2019-06-26 06:36:27 +0000 |
commit | 84d8920b92686e975929aebe845b6d4ea0a9ef0d (patch) | |
tree | 91a1123aaa44ad8fbb63c9dbf912d6dca95b0818 | |
parent | c19a4e88f49ba3496751c4b87ebcfa0f6b47f0ce (diff) | |
parent | c30b4427b340aeb242273a7b890fbd7e50132f51 (diff) | |
download | public-inbox-84d8920b92686e975929aebe845b6d4ea0a9ef0d.tar.gz |
* origin/nntp-tls: (59 commits) ds: ->write must not clobber empty wbuf array Makefile: skip DSKQXS in global syntax check ds: reduce overhead of tempfile creation Revert "ci: require IO::KQueue on FreeBSD, for now" ds: reimplement IO::Poll support to look like epoll ds: split out IO::KQueue-specific code daemon: use FreeBSD accept filters on non-NNTP daemon: set TCP_DEFER_ACCEPT on everything but NNTP nntp: send greeting immediately for plain sockets ci: require IO::KQueue on FreeBSD, for now nntp: lazily allocate and stash rbuf ds: flush_write runs ->write callbacks even if closed nntp: simplify long response logic and fix nesting ds: always use EV_ADD with EV_SET nntp: reduce allocations for greeting ds: allow ->write callbacks to syswrite directly daemon: use SSL_MODE_RELEASE_BUFFERS t/nntpd-tls: slow client connection test nntp: call SSL_shutdown in normal cases ds|nntp: use CORE::close on socket ...
-rw-r--r-- | MANIFEST | 7 | ||||
-rw-r--r-- | Makefile.PL | 2 | ||||
-rw-r--r-- | certs/.gitignore | 4 | ||||
-rwxr-xr-x | certs/create-certs.perl | 132 | ||||
-rw-r--r-- | lib/PublicInbox/DS.pm | 638 | ||||
-rw-r--r-- | lib/PublicInbox/DSKQXS.pm | 73 | ||||
-rw-r--r-- | lib/PublicInbox/DSPoll.pm | 58 | ||||
-rw-r--r-- | lib/PublicInbox/Daemon.pm | 152 | ||||
-rw-r--r-- | lib/PublicInbox/EvCleanup.pm | 20 | ||||
-rw-r--r-- | lib/PublicInbox/GitHTTPBackend.pm | 18 | ||||
-rw-r--r-- | lib/PublicInbox/HTTP.pm | 154 | ||||
-rw-r--r-- | lib/PublicInbox/HTTPD/Async.pm | 44 | ||||
-rw-r--r-- | lib/PublicInbox/Listener.pm | 4 | ||||
-rw-r--r-- | lib/PublicInbox/NNTP.pm | 243 | ||||
-rw-r--r-- | lib/PublicInbox/NNTPD.pm | 2 | ||||
-rw-r--r-- | lib/PublicInbox/ParentPipe.pm | 3 | ||||
-rw-r--r-- | lib/PublicInbox/Qspawn.pm | 11 | ||||
-rw-r--r-- | lib/PublicInbox/Spawn.pm | 2 | ||||
-rw-r--r-- | lib/PublicInbox/Syscall.pm | 25 | ||||
-rw-r--r-- | lib/PublicInbox/TLS.pm | 24 | ||||
-rwxr-xr-x | script/public-inbox-nntpd | 3 | ||||
-rw-r--r-- | t/ds-poll.t | 58 | ||||
-rw-r--r-- | t/httpd-corner.t | 38 | ||||
-rw-r--r-- | t/httpd.t | 18 | ||||
-rw-r--r-- | t/nntpd-tls.t | 224 | ||||
-rw-r--r-- | t/nntpd.t | 2 | ||||
-rw-r--r-- | t/spawn.t | 11 |
27 files changed, 1255 insertions, 715 deletions
@@ -31,6 +31,8 @@ MANIFEST Makefile.PL README TODO +certs/.gitignore +certs/create-certs.perl ci/README ci/deps.perl ci/profiles.sh @@ -75,6 +77,8 @@ lib/PublicInbox/Cgit.pm lib/PublicInbox/Config.pm lib/PublicInbox/ContentId.pm lib/PublicInbox/DS.pm +lib/PublicInbox/DSKQXS.pm +lib/PublicInbox/DSPoll.pm lib/PublicInbox/Daemon.pm lib/PublicInbox/Emergency.pm lib/PublicInbox/EvCleanup.pm @@ -129,6 +133,7 @@ lib/PublicInbox/Spamcheck/Spamc.pm lib/PublicInbox/Spawn.pm lib/PublicInbox/SpawnPP.pm lib/PublicInbox/Syscall.pm +lib/PublicInbox/TLS.pm lib/PublicInbox/Unsubscribe.pm lib/PublicInbox/UserContent.pm lib/PublicInbox/V2Writable.pm @@ -187,6 +192,7 @@ t/content_id.t t/convert-compact.t t/data/0001.patch t/ds-leak.t +t/ds-poll.t t/edit.t t/emergency.t t/fail-bin/spamc @@ -222,6 +228,7 @@ t/msg_iter.t t/msgmap.t t/msgtime.t t/nntp.t +t/nntpd-tls.t t/nntpd.t t/nulsubject.t t/over.t diff --git a/Makefile.PL b/Makefile.PL index 23822072..adcf91e5 100644 --- a/Makefile.PL +++ b/Makefile.PL @@ -77,7 +77,7 @@ changed = \$(shell git ls-files -m) %.syntax :: @\$(PERL) -w -I lib -c \$(subst .syntax,,\$@) -syntax:: \$(my_syntax) +syntax:: \$(filter-out lib/PublicInbox/DSKQXS.pm.syntax,\$(my_syntax)) dsyn :: \$(addsuffix .syntax, \$(filter \$(changed), \$(syn_files))) diff --git a/certs/.gitignore b/certs/.gitignore new file mode 100644 index 00000000..0b3a547b --- /dev/null +++ b/certs/.gitignore @@ -0,0 +1,4 @@ +*.pem +*.der +*.enc +*.p12 diff --git a/certs/create-certs.perl b/certs/create-certs.perl new file mode 100755 index 00000000..476be4d7 --- /dev/null +++ b/certs/create-certs.perl @@ -0,0 +1,132 @@ +#!/usr/bin/perl -w +# License: GPL-1.0+ or Artistic-1.0-Perl +# from IO::Socket::SSL 2.063 / https://github.com/noxxi/p5-io-socket-ssl +use strict; +use warnings; +use IO::Socket::SSL::Utils; +use Net::SSLeay; + +my $dir = "./"; +my $now = time(); +my $later = 0x7fffffff; # 2038 problems on 32-bit :< + +Net::SSLeay::SSLeay_add_ssl_algorithms(); +my $sha256 = Net::SSLeay::EVP_get_digestbyname('sha256') or die; +my $printfp = sub { + my ($w,$cert) = @_; + print $w.' sha256$'.unpack('H*',Net::SSLeay::X509_digest($cert, $sha256))."\n" +}; + +my %time_valid = (not_before => $now, not_after => $later); + +my @ca = CERT_create( + CA => 1, + subject => { CN => 'IO::Socket::SSL Demo CA' }, + %time_valid, +); +save('test-ca.pem',PEM_cert2string($ca[0])); + +my @server = CERT_create( + CA => 0, + subject => { CN => 'server.local' }, + purpose => 'server', + issuer => \@ca, + %time_valid, +); +save('server-cert.pem',PEM_cert2string($server[0])); +save('server-key.pem',PEM_key2string($server[1])); +$printfp->(server => $server[0]); + +@server = CERT_create( + CA => 0, + subject => { CN => 'server2.local' }, + purpose => 'server', + issuer => \@ca, + %time_valid, +); +save('server2-cert.pem',PEM_cert2string($server[0])); +save('server2-key.pem',PEM_key2string($server[1])); +$printfp->(server2 => $server[0]); + +@server = CERT_create( + CA => 0, + subject => { CN => 'server-ecc.local' }, + purpose => 'server', + issuer => \@ca, + key => KEY_create_ec(), + %time_valid, +); +save('server-ecc-cert.pem',PEM_cert2string($server[0])); +save('server-ecc-key.pem',PEM_key2string($server[1])); +$printfp->('server-ecc' => $server[0]); + + +my @client = CERT_create( + CA => 0, + subject => { CN => 'client.local' }, + purpose => 'client', + issuer => \@ca, + %time_valid, +); +save('client-cert.pem',PEM_cert2string($client[0])); +save('client-key.pem',PEM_key2string($client[1])); +$printfp->(client => $client[0]); + +my @swc = CERT_create( + CA => 0, + subject => { CN => 'server.local' }, + purpose => 'server', + issuer => \@ca, + subjectAltNames => [ + [ DNS => '*.server.local' ], + [ IP => '127.0.0.1' ], + [ DNS => 'www*.other.local' ], + [ DNS => 'smtp.mydomain.local' ], + [ DNS => 'xn--lwe-sna.idntest.local' ] + ], + %time_valid, +); +save('server-wildcard.pem',PEM_cert2string($swc[0]),PEM_key2string($swc[1])); + + +my @subca = CERT_create( + CA => 1, + issuer => \@ca, + subject => { CN => 'IO::Socket::SSL Demo Sub CA' }, + %time_valid, +); +save('test-subca.pem',PEM_cert2string($subca[0])); +@server = CERT_create( + CA => 0, + subject => { CN => 'server.local' }, + purpose => 'server', + issuer => \@subca, + %time_valid, +); +save('sub-server.pem',PEM_cert2string($server[0]).PEM_key2string($server[1])); + + + +my @cap = CERT_create( + CA => 1, + subject => { CN => 'IO::Socket::SSL::Intercept' }, + %time_valid, +); +save('proxyca.pem',PEM_cert2string($cap[0]).PEM_key2string($cap[1])); + +sub save { + my $file = shift; + open(my $fd,'>',$dir.$file) or die $!; + print $fd @_; +} + +system(<<CMD); +cd $dir +set -x +openssl x509 -in server-cert.pem -out server-cert.der -outform der +openssl rsa -in server-key.pem -out server-key.der -outform der +openssl rsa -in server-key.pem -out server-key.enc -passout pass:bluebell +openssl rsa -in client-key.pem -out client-key.enc -passout pass:opossum +openssl pkcs12 -export -in server-cert.pem -inkey server-key.pem -out server.p12 -passout pass: +openssl pkcs12 -export -in server-cert.pem -inkey server-key.pem -out server_enc.p12 -passout pass:bluebell +CMD diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index 2b04886a..08f4e9e8 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -17,40 +17,28 @@ package PublicInbox::DS; use strict; use bytes; use POSIX (); -use Time::HiRes (); use IO::Handle qw(); -use Fcntl qw(FD_CLOEXEC F_SETFD F_GETFD); - +use Fcntl qw(SEEK_SET :DEFAULT); +use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC); +use parent qw(Exporter); +our @EXPORT_OK = qw(now msg_more); use warnings; +use 5.010_001; use PublicInbox::Syscall qw(:epoll); use fields ('sock', # underlying socket - 'wbuf', # arrayref of scalars, scalarrefs, or coderefs to write + 'wbuf', # arrayref of coderefs or GLOB refs 'wbuf_off', # offset into first element of wbuf to start writing at - 'closed', # bool: socket is closed - 'event_watch', # bitmask of events the client is interested in (POLLIN,OUT,etc.) ); -use Errno qw(EAGAIN EINVAL); -use Carp qw(croak confess); - -use constant DebugLevel => 0; - -use constant POLLIN => 1; -use constant POLLOUT => 4; -use constant POLLERR => 8; -use constant POLLHUP => 16; -use constant POLLNVAL => 32; - -our $HAVE_KQUEUE = eval { require IO::KQueue; 1 }; +use Errno qw(EAGAIN EINVAL EEXIST); +use Carp qw(croak confess carp); +require File::Spec; our ( - $HaveEpoll, # Flag -- is epoll available? initially undefined. - $HaveKQueue, %DescriptorMap, # fd (num) -> PublicInbox::DS object - $Epoll, # Global epoll fd (for epoll mode only) - $KQueue, # Global kqueue fd ref (for kqueue mode only) + $Epoll, # Global epoll fd (or DSKQXS ref) $_io, # IO::Handle for Epoll @ToClose, # sockets to close when event loop is done @@ -61,8 +49,6 @@ our ( @Timers, # timers ); -# this may be set to zero with old kernels -our $EPOLLEXCLUSIVE = EPOLLEXCLUSIVE; Reset(); ##################################################################### @@ -83,13 +69,8 @@ sub Reset { $PostLoopCallback = undef; $DoneInit = 0; - # NOTE kqueue is close-on-fork, and we don't account for it, yet - # OTOH, we (public-inbox) don't need this sub outside of tests... - POSIX::close($$KQueue) if !$_io && $KQueue && $$KQueue >= 0; - $KQueue = undef; - - $_io = undef; # close $Epoll - $Epoll = undef; + $_io = undef; # closes real $Epoll FD + $Epoll = undef; # may call DSKQXS::DESTROY *EventLoop = *FirstTimeEventLoop; } @@ -106,18 +87,6 @@ sub SetLoopTimeout { return $LoopTimeout = $_[1] + 0; } -=head2 C<< CLASS->DebugMsg( $format, @args ) >> - -Print the debugging message specified by the C<sprintf>-style I<format> and -I<args> - -=cut -sub DebugMsg { - my ( $class, $fmt, @args ) = @_; - chomp $fmt; - printf STDERR ">>> $fmt\n", @args; -} - =head2 C<< CLASS->AddTimer( $seconds, $coderef ) >> Add a timer to occur $seconds from now. $seconds may be fractional, but timers @@ -127,10 +96,15 @@ Returns a timer object which you can call C<< $timer->cancel >> on if you need t =cut sub AddTimer { - my $class = shift; - my ($secs, $coderef) = @_; + my ($class, $secs, $coderef) = @_; - my $fire_time = Time::HiRes::time() + $secs; + if (!$secs) { + my $timer = bless([0, $coderef], 'PublicInbox::DS::Timer'); + unshift(@Timers, $timer); + return $timer; + } + + my $fire_time = now() + $secs; my $timer = bless [$fire_time, $coderef], "PublicInbox::DS::Timer"; @@ -168,26 +142,19 @@ sub _InitPoller return if $DoneInit; $DoneInit = 1; - if ($HAVE_KQUEUE) { - $KQueue = IO::KQueue->new(); - $HaveKQueue = defined $KQueue; - if ($HaveKQueue) { - *EventLoop = *KQueueEventLoop; - } - } - elsif (PublicInbox::Syscall::epoll_defined()) { - $Epoll = eval { epoll_create(1024); }; - $HaveEpoll = defined $Epoll && $Epoll >= 0; - if ($HaveEpoll) { - set_cloexec($Epoll); - *EventLoop = *EpollEventLoop; + if (PublicInbox::Syscall::epoll_defined()) { + $Epoll = epoll_create(); + set_cloexec($Epoll) if (defined($Epoll) && $Epoll >= 0); + } else { + my $cls; + for (qw(DSKQXS DSPoll)) { + $cls = "PublicInbox::$_"; + last if eval "require $cls"; } + $cls->import; + $Epoll = $cls->new; } - - if (!$HaveEpoll && !$HaveKQueue) { - require IO::Poll; - *EventLoop = *PollEventLoop; - } + *EventLoop = *EpollEventLoop; } =head2 C<< CLASS->EventLoop() >> @@ -201,20 +168,16 @@ sub FirstTimeEventLoop { _InitPoller(); - if ($HaveEpoll) { - EpollEventLoop($class); - } elsif ($HaveKQueue) { - KQueueEventLoop($class); - } else { - PollEventLoop($class); - } + EventLoop($class); } +sub now () { clock_gettime(CLOCK_MONOTONIC) } + # runs timers and returns milliseconds for next one, or next event loop sub RunTimers { return $LoopTimeout unless @Timers; - my $now = Time::HiRes::time(); + my $now = now(); # Run expired timers while (@Timers && $Timers[0][0] <= $now) { @@ -239,11 +202,7 @@ sub RunTimers { return $timeout; } -### The epoll-based event loop. Gets installed as EventLoop if IO::Epoll loads -### okay. sub EpollEventLoop { - my $class = shift; - while (1) { my @events; my $i; @@ -260,78 +219,6 @@ sub EpollEventLoop { } return unless PostEventLoop(); } - exit 0; -} - -### The fallback IO::Poll-based event loop. Gets installed as EventLoop if -### IO::Epoll fails to load. -sub PollEventLoop { - my $class = shift; - - my PublicInbox::DS $pob; - - while (1) { - my $timeout = RunTimers(); - - # the following sets up @poll as a series of ($poll,$event_mask) - # items, then uses IO::Poll::_poll, implemented in XS, which - # modifies the array in place with the even elements being - # replaced with the event masks that occured. - my @poll; - while ( my ($fd, $sock) = each %DescriptorMap ) { - push @poll, $fd, $sock->{event_watch}; - } - - # if nothing to poll, either end immediately (if no timeout) - # or just keep calling the callback - unless (@poll) { - select undef, undef, undef, ($timeout / 1000); - return unless PostEventLoop(); - next; - } - - my $count = IO::Poll::_poll($timeout, @poll); - unless ($count >= 0) { - return unless PostEventLoop(); - next; - } - - # Fetch handles with read events - while (@poll) { - my ($fd, $state) = splice(@poll, 0, 2); - $DescriptorMap{$fd}->event_step if $state; - } - - return unless PostEventLoop(); - } - - exit 0; -} - -### The kqueue-based event loop. Gets installed as EventLoop if IO::KQueue works -### okay. -sub KQueueEventLoop { - my $class = shift; - - while (1) { - my $timeout = RunTimers(); - my @ret = eval { $KQueue->kevent($timeout) }; - if (my $err = $@) { - # workaround https://rt.cpan.org/Ticket/Display.html?id=116615 - if ($err =~ /Interrupted system call/) { - @ret = (); - } else { - die $err; - } - } - - foreach my $kev (@ret) { - $DescriptorMap{$kev->[0]}->event_step; - } - return unless PostEventLoop(); - } - - exit(0); } =head2 C<< CLASS->SetPostLoopCallback( CODEREF ) >> @@ -362,11 +249,11 @@ sub PostEventLoop { while (my $sock = shift @ToClose) { my $fd = fileno($sock); - # close the socket. (not a PublicInbox::DS close) - $sock->close; + # close the socket. (not a PublicInbox::DS close) + CORE::close($sock); # and now we can finally remove the fd from the map. see - # comment above in _cleanup. + # comment above in ->close. delete $DescriptorMap{$fd}; } @@ -400,7 +287,7 @@ This is normally (always?) called from your subclass via: =cut sub new { - my ($self, $sock, $exclusive) = @_; + my ($self, $sock, $ev) = @_; $self = fields::new($self) unless ref $self; $self->{sock} = $sock; @@ -409,36 +296,15 @@ sub new { Carp::cluck("undef sock and/or fd in PublicInbox::DS->new. sock=" . ($sock || "") . ", fd=" . ($fd || "")) unless $sock && $fd; - $self->{wbuf} = []; - $self->{wbuf_off} = 0; - $self->{closed} = 0; - - my $ev = $self->{event_watch} = POLLERR|POLLHUP|POLLNVAL; - _InitPoller(); - if ($HaveEpoll) { - if ($exclusive) { - $ev = $self->{event_watch} = EPOLLIN|EPOLLERR|EPOLLHUP|$EPOLLEXCLUSIVE; + if (epoll_ctl($Epoll, EPOLL_CTL_ADD, $fd, $ev)) { + if ($! == EINVAL && ($ev & EPOLLEXCLUSIVE)) { + $ev &= ~EPOLLEXCLUSIVE; + goto retry; } -retry: - if (epoll_ctl($Epoll, EPOLL_CTL_ADD, $fd, $ev)) { - if ($! == EINVAL && ($ev & $EPOLLEXCLUSIVE)) { - $EPOLLEXCLUSIVE = 0; # old kernel - $ev = $self->{event_watch} = EPOLLIN|EPOLLERR|EPOLLHUP; - goto retry; - } - die "couldn't add epoll watch for $fd: $!\n"; - } - } - elsif ($HaveKQueue) { - # Add them to the queue but disabled for now - $KQueue->EV_SET($fd, IO::KQueue::EVFILT_READ(), - IO::KQueue::EV_ADD() | IO::KQueue::EV_DISABLE()); - $KQueue->EV_SET($fd, IO::KQueue::EVFILT_WRITE(), - IO::KQueue::EV_ADD() | IO::KQueue::EV_DISABLE()); + die "couldn't add epoll watch for $fd: $!\n"; } - Carp::cluck("PublicInbox::DS::new blowing away existing descriptor map for fd=$fd ($DescriptorMap{$fd})") if $DescriptorMap{$fd}; @@ -457,74 +323,148 @@ Close the socket. =cut sub close { - my PublicInbox::DS $self = $_[0]; - return if $self->{closed}; - - # this does most of the work of closing us - $self->_cleanup(); - - # defer closing the actual socket until the event loop is done - # processing this round of events. (otherwise we might reuse fds) - if (my $sock = delete $self->{sock}) { - push @ToClose, $sock; - } - - return 0; -} - -### METHOD: _cleanup() -### Called by our closers so we can clean internal data structures. -sub _cleanup { - my PublicInbox::DS $self = $_[0]; - - # we're effectively closed; we have no fd and sock when we leave here - $self->{closed} = 1; + my ($self) = @_; + my $sock = delete $self->{sock} or return; # we need to flush our write buffer, as there may # be self-referential closures (sub { $client->close }) # preventing the object from being destroyed - @{$self->{wbuf}} = (); + delete $self->{wbuf}; # if we're using epoll, we have to remove this from our epoll fd so we stop getting # notifications about it - if ($HaveEpoll && $self->{sock}) { - my $fd = fileno($self->{sock}); - epoll_ctl($Epoll, EPOLL_CTL_DEL, $fd, $self->{event_watch}) and - confess("EPOLL_CTL_DEL: $!"); - } + my $fd = fileno($sock); + epoll_ctl($Epoll, EPOLL_CTL_DEL, $fd, 0) and + confess("EPOLL_CTL_DEL: $!"); # we explicitly don't delete from DescriptorMap here until we # actually close the socket, as we might be in the middle of # processing an epoll_wait/etc that returned hundreds of fds, one # of which is not yet processed and is what we're closing. if we # keep it in DescriptorMap, then the event harnesses can just - # looked at $pob->{closed} and ignore it. but if it's an + # looked at $pob->{sock} == undef and ignore it. but if it's an # un-accounted for fd, then it (understandably) freak out a bit # and emit warnings, thinking their state got off. + + # defer closing the actual socket until the event loop is done + # processing this round of events. (otherwise we might reuse fds) + push @ToClose, $sock; + + return 0; } -=head2 C<< $obj->sock() >> +# portable, non-thread-safe sendfile emulation (no pread, yet) +sub psendfile ($$$) { + my ($sock, $fh, $off) = @_; + + seek($fh, $$off, SEEK_SET) or return; + defined(my $to_write = read($fh, my $buf, 16384)) or return; + my $written = 0; + while ($to_write > 0) { + if (defined(my $w = syswrite($sock, $buf, $to_write, $written))) { + $written += $w; + $to_write -= $w; + } else { + return if $written == 0; + last; + } + } + $$off += $written; + $written; +} -Returns the underlying IO::Handle for the object. +# returns 1 if done, 0 if incomplete +sub flush_write ($) { + my ($self) = @_; + my $wbuf = $self->{wbuf} or return 1; + my $sock = $self->{sock}; + +next_buf: + while (my $bref = $wbuf->[0]) { + if (ref($bref) ne 'CODE') { + my $off = delete($self->{wbuf_off}) // 0; + while ($sock) { + my $w = psendfile($sock, $bref, \$off); + if (defined $w) { + if ($w == 0) { + shift @$wbuf; + goto next_buf; + } + } elsif ($! == EAGAIN) { + $self->{wbuf_off} = $off; + watch($self, EPOLLOUT|EPOLLONESHOT); + return 0; + } else { + return $self->close; + } + } + } else { #($ref eq 'CODE') { + shift @$wbuf; + my $before = scalar(@$wbuf); + $bref->($self); -=cut -sub sock { - my PublicInbox::DS $self = shift; - return $self->{sock}; + # bref may be enqueueing more CODE to call (see accept_tls_step) + return 0 if (scalar(@$wbuf) > $before); + } + } # while @$wbuf + + delete $self->{wbuf}; + 1; # all done +} + +sub do_read ($$$$) { + my ($self, $rbuf, $len, $off) = @_; + my $r = sysread($self->{sock}, $$rbuf, $len, $off); + return ($r == 0 ? $self->close : $r) if defined $r; + # common for clients to break connections without warning, + # would be too noisy to log here: + if (ref($self) eq 'IO::Socket::SSL') { + my $ev = PublicInbox::TLS::epollbit() or return $self->close; + watch($self, $ev | EPOLLONESHOT); + } elsif ($! == EAGAIN) { + watch($self, EPOLLIN | EPOLLONESHOT); + } else { + $self->close; + } +} + +# drop the socket if we hit unrecoverable errors on our system which +# require BOFH attention: ENOSPC, EFBIG, EIO, EMFILE, ENFILE... +sub drop { + my $self = shift; + carp(@_); + $self->close; +} + +# n.b.: use ->write/->read for this buffer to allow compatibility with +# PerlIO::mmap or PerlIO::scalar if needed +sub tmpio ($$$) { + my ($self, $bref, $off) = @_; + my $fh; # open(my $fh, '+>>', undef) doesn't set O_APPEND + do { + my $fn = File::Spec->tmpdir . '/wbuf-' . rand; + if (sysopen($fh, $fn, O_RDWR|O_CREAT|O_EXCL|O_APPEND, 0600)) { # likely + unlink($fn) or return drop($self, "unlink($fn) $!"); + } elsif ($! != EEXIST) { # EMFILE/ENFILE/ENOSPC/ENOMEM + return drop($self, "open: $!"); + } + } until (defined $fh); + $fh->autoflush(1); + my $len = bytes::length($$bref) - $off; + $fh->write($$bref, $len, $off) or return drop($self, "write ($len): $!"); + $fh } =head2 C<< $obj->write( $data ) >> Write the specified data to the underlying handle. I<data> may be scalar, -scalar ref, code ref (to run when there), or undef just to kick-start. +scalar ref, code ref (to run when there). Returns 1 if writes all went through, or 0 if there are writes in queue. If it returns 1, caller should stop waiting for 'writable' events) =cut sub write { - my PublicInbox::DS $self; - my $data; - ($self, $data) = @_; + my ($self, $data) = @_; # nobody should be writing to closed sockets, but caller code can # do two writes within an event, have the first fail and @@ -533,203 +473,113 @@ sub write { # now-dead object does its second write. that is this case. we # just lie and say it worked. it'll be dead soon and won't be # hurt by this lie. - return 1 if $self->{closed}; - - my $bref; - - # just queue data if there's already a wait - my $need_queue; + my $sock = $self->{sock} or return 1; + my $ref = ref $data; + my $bref = $ref ? $data : \$data; my $wbuf = $self->{wbuf}; - - if (defined $data) { - $bref = ref $data ? $data : \$data; - if (scalar @$wbuf) { + if ($wbuf && scalar(@$wbuf)) { # already buffering, can't write more... + if ($ref eq 'CODE') { push @$wbuf, $bref; - return 0; - } - - # this flag says we're bypassing the queue system, knowing we're the - # only outstanding write, and hoping we don't ever need to use it. - # if so later, though, we'll need to queue - $need_queue = 1; - } - - WRITE: - while (1) { - return 1 unless $bref ||= $wbuf->[0]; - - my $len; - eval { - $len = length($$bref); # this will die if $bref is a code ref, caught below - }; - if ($@) { - if (UNIVERSAL::isa($bref, "CODE")) { - unless ($need_queue) { - shift @$wbuf; - } - $bref->(); - - # code refs are just run and never get reenqueued - # (they're one-shot), so turn off the flag indicating the - # outstanding data needs queueing. - $need_queue = 0; - - undef $bref; - next WRITE; + } else { + my $last = $wbuf->[-1]; + if (ref($last) eq 'GLOB') { # append to tmp file buffer + $last->print($$bref) or return drop($self, "print: $!"); + } else { + my $tmpio = tmpio($self, $bref, 0) or return 0; + push @$wbuf, $tmpio; } - die "Write error: $@ <$bref>"; } - - my $to_write = $len - $self->{wbuf_off}; - my $written = syswrite($self->{sock}, $$bref, $to_write, - $self->{wbuf_off}); - - if (! defined $written) { - if ($! == EAGAIN) { - # since connection has stuff to write, it should now be - # interested in pending writes: - if ($need_queue) { - push @$wbuf, $bref; - } - $self->watch_write(1); - return 0; - } - + return 0; + } elsif ($ref eq 'CODE') { + $bref->($self); + return 1; + } else { + my $to_write = bytes::length($$bref); + my $written = syswrite($sock, $$bref, $to_write); + + if (defined $written) { + return 1 if $written == $to_write; + } elsif ($! == EAGAIN) { + $written = 0; + } else { return $self->close; - } elsif ($written != $to_write) { - if ($need_queue) { - push @$wbuf, $bref; - } - # since connection has stuff to write, it should now be - # interested in pending writes: - $self->{wbuf_off} += $written; - $self->on_incomplete_write; - return 0; - } elsif ($written == $to_write) { - $self->{wbuf_off} = 0; - $self->watch_write(0); - - # this was our only write, so we can return immediately - # since we avoided incrementing the buffer size or - # putting it in the buffer. we also know there - # can't be anything else to write. - return 1 if $need_queue; - - shift @$wbuf; - undef $bref; - next WRITE; } - } -} - -sub on_incomplete_write { - my PublicInbox::DS $self = shift; - $self->watch_write(1); -} - -=head2 C<< $obj->watch_read( $boolean ) >> - -Turn 'readable' event notification on or off. + my $tmpio = tmpio($self, $bref, $written) or return 0; -=cut -sub watch_read { - my PublicInbox::DS $self = shift; - return if $self->{closed} || !$self->{sock}; - - my $val = shift; - my $event = $self->{event_watch}; - - $event &= ~POLLIN if ! $val; - $event |= POLLIN if $val; - - my $fd = fileno($self->{sock}); - # If it changed, set it - if ($event != $self->{event_watch}) { - if ($HaveKQueue) { - $KQueue->EV_SET($fd, IO::KQueue::EVFILT_READ(), - $val ? IO::KQueue::EV_ENABLE() : IO::KQueue::EV_DISABLE()); - } - elsif ($HaveEpoll) { - epoll_ctl($Epoll, EPOLL_CTL_MOD, $fd, $event) and - confess("EPOLL_CTL_MOD: $!"); - } - $self->{event_watch} = $event; + # wbuf may be an empty array if we're being called inside + # ->flush_write via CODE bref: + push @{$self->{wbuf} ||= []}, $tmpio; + watch($self, EPOLLOUT|EPOLLONESHOT); + return 0; } } -=head2 C<< $obj->watch_write( $boolean ) >> - -Turn 'writable' event notification on or off. - -=cut -sub watch_write { - my PublicInbox::DS $self = shift; - return if $self->{closed} || !$self->{sock}; - - my $val = shift; - my $event = $self->{event_watch}; - - $event &= ~POLLOUT if ! $val; - $event |= POLLOUT if $val; - my $fd = fileno($self->{sock}); - - # If it changed, set it - if ($event != $self->{event_watch}) { - if ($HaveKQueue) { - $KQueue->EV_SET($fd, IO::KQueue::EVFILT_WRITE(), - $val ? IO::KQueue::EV_ENABLE() : IO::KQueue::EV_DISABLE()); - } - elsif ($HaveEpoll) { - epoll_ctl($Epoll, EPOLL_CTL_MOD, $fd, $event) and - confess "EPOLL_CTL_MOD: $!"; +use constant MSG_MORE => ($^O eq 'linux') ? 0x8000 : 0; + +sub msg_more ($$) { + my $self = $_[0]; + my $sock = $self->{sock} or return 1; + + if (MSG_MORE && !$self->{wbuf} && ref($sock) ne 'IO::Socket::SSL') { + my $n = send($sock, $_[1], MSG_MORE); + if (defined $n) { + my $nlen = bytes::length($_[1]) - $n; + return 1 if $nlen == 0; # all done! + # queue up the unwritten substring: + my $tmpio = tmpio($self, \($_[1]), $n) or return 0; + $self->{wbuf} = [ $tmpio ]; + watch($self, EPOLLOUT|EPOLLONESHOT); + return 0; } - $self->{event_watch} = $event; } + $self->write(\($_[1])); } -=head2 C<< $obj->dump_error( $message ) >> - -Prints to STDERR a backtrace with information about this socket and what lead -up to the dump_error call. +sub watch ($$) { + my ($self, $ev) = @_; + my $sock = $self->{sock} or return; + epoll_ctl($Epoll, EPOLL_CTL_MOD, fileno($sock), $ev) and + confess("EPOLL_CTL_MOD $!"); + 0; +} -=cut -sub dump_error { - my $i = 0; - my @list; - while (my ($file, $line, $sub) = (caller($i++))[1..3]) { - push @list, "\t$file:$line called $sub\n"; +sub watch_in1 ($) { watch($_[0], EPOLLIN | EPOLLONESHOT) } + +# return true if complete, false if incomplete (or failure) +sub accept_tls_step ($) { + my ($self) = @_; + my $sock = $self->{sock} or return; + return 1 if $sock->accept_SSL; + return $self->close if $! != EAGAIN; + if (my $ev = PublicInbox::TLS::epollbit()) { + unshift @{$self->{wbuf} ||= []}, \&accept_tls_step; + return watch($self, $ev | EPOLLONESHOT); } - - warn "ERROR: $_[1]\n" . - "\t$_[0] = " . $_[0]->as_string . "\n" . - join('', @list); + drop($self, 'BUG? EAGAIN but '.PublicInbox::TLS::err()); } -=head2 C<< $obj->debugmsg( $format, @args ) >> - -Print the debugging message specified by the C<sprintf>-style I<format> and -I<args>. - -=cut -sub debugmsg { - my ( $self, $fmt, @args ) = @_; - confess "Not an object" unless ref $self; - - chomp $fmt; - printf STDERR ">>> $fmt\n", @args; +sub shutdn_tls_step ($) { + my ($self) = @_; + my $sock = $self->{sock} or return; + return $self->close if $sock->stop_SSL(SSL_fast_shutdown => 1); + return $self->close if $! != EAGAIN; + if (my $ev = PublicInbox::TLS::epollbit()) { + unshift @{$self->{wbuf} ||= []}, \&shutdn_tls_step; + return watch($self, $ev | EPOLLONESHOT); + } + drop($self, 'BUG? EAGAIN but '.PublicInbox::TLS::err()); } -=head2 C<< $obj->as_string() >> - -Returns a string describing this socket. - -=cut -sub as_string { - my PublicInbox::DS $self = shift; - my $rw = "(" . ($self->{event_watch} & POLLIN ? 'R' : '') . - ($self->{event_watch} & POLLOUT ? 'W' : '') . ")"; - my $ret = ref($self) . "$rw: " . ($self->{closed} ? "closed" : "open"); - return $ret; +# don't bother with shutdown($sock, 2), we don't fork+exec w/o CLOEXEC +# or fork w/o exec, so no inadvertant socket sharing +sub shutdn ($) { + my ($self) = @_; + my $sock = $self->{sock} or return; + if (ref($sock) eq 'IO::Socket::SSL') { + shutdn_tls_step($self); + } else { + $self->close; + } } package PublicInbox::DS::Timer; diff --git a/lib/PublicInbox/DSKQXS.pm b/lib/PublicInbox/DSKQXS.pm new file mode 100644 index 00000000..38e13446 --- /dev/null +++ b/lib/PublicInbox/DSKQXS.pm @@ -0,0 +1,73 @@ +# Copyright (C) 2019 all contributors <meta@public-inbox.org> +# Licensed the same as Danga::Socket (and Perl5) +# License: GPL-1.0+ or Artistic-1.0-Perl +# <https://www.gnu.org/licenses/gpl-1.0.txt> +# <https://dev.perl.org/licenses/artistic.html> +# +# kqueue support via IO::KQueue XS module. This makes kqueue look +# like epoll to simplify the code in DS.pm. This is NOT meant to be +# an all encompassing emulation of epoll via IO::KQueue, but just to +# support cases public-inbox-nntpd/httpd care about. +# A pure-Perl version using syscall() is planned, and it should be +# faster due to the lack of syscall overhead. +package PublicInbox::DSKQXS; +use strict; +use warnings; +use parent qw(IO::KQueue); +use parent qw(Exporter); +use IO::KQueue; +use PublicInbox::Syscall qw(EPOLLONESHOT EPOLLIN EPOLLOUT EPOLL_CTL_DEL); +our @EXPORT = qw(epoll_ctl epoll_wait); +my $owner_pid = -1; # kqueue is close-on-fork (yes, fork, not exec) + +# map EPOLL* bits to kqueue EV_* flags for EV_SET +sub kq_flag ($$) { + my ($bit, $ev) = @_; + if ($ev & $bit) { + my $fl = EV_ADD | EV_ENABLE; + ($ev & EPOLLONESHOT) ? ($fl | EV_ONESHOT) : $fl; + } else { + EV_ADD | EV_DISABLE; + } +} + +sub new { + my ($class) = @_; + die 'non-singleton use not supported' if $owner_pid == $$; + $owner_pid = $$; + $class->SUPER::new; +} + +sub epoll_ctl { + my ($self, $op, $fd, $ev) = @_; + if ($op != EPOLL_CTL_DEL) { + $self->EV_SET($fd, EVFILT_READ, kq_flag(EPOLLIN, $ev)); + $self->EV_SET($fd, EVFILT_WRITE, kq_flag(EPOLLOUT, $ev)); + } + 0; +} + +sub epoll_wait { + my ($self, $maxevents, $timeout_msec, $events) = @_; + @$events = eval { $self->kevent($timeout_msec) }; + if (my $err = $@) { + # workaround https://rt.cpan.org/Ticket/Display.html?id=116615 + if ($err =~ /Interrupted system call/) { + @$events = (); + } else { + die $err; + } + } + # caller only cares for $events[$i]->[0] + scalar(@$events); +} + +sub DESTROY { + my ($self) = @_; + if ($owner_pid == $$) { + POSIX::close($$self); + $owner_pid = -1; + } +} + +1; diff --git a/lib/PublicInbox/DSPoll.pm b/lib/PublicInbox/DSPoll.pm new file mode 100644 index 00000000..e65640a8 --- /dev/null +++ b/lib/PublicInbox/DSPoll.pm @@ -0,0 +1,58 @@ +# Copyright (C) 2019 all contributors <meta@public-inbox.org> +# Licensed the same as Danga::Socket (and Perl5) +# License: GPL-1.0+ or Artistic-1.0-Perl +# <https://www.gnu.org/licenses/gpl-1.0.txt> +# <https://dev.perl.org/licenses/artistic.html> +# +# poll(2) via IO::Poll core module. This makes poll look +# like epoll to simplify the code in DS.pm. This is NOT meant to be +# an all encompassing emulation of epoll via IO::Poll, but just to +# support cases public-inbox-nntpd/httpd care about. +package PublicInbox::DSPoll; +use strict; +use warnings; +use parent qw(Exporter); +use IO::Poll; +use PublicInbox::Syscall qw(EPOLLONESHOT EPOLLIN EPOLLOUT EPOLL_CTL_DEL); +our @EXPORT = qw(epoll_ctl epoll_wait); + +sub new { bless {}, $_[0] } # fd => events + +sub epoll_ctl { + my ($self, $op, $fd, $ev) = @_; + + # not wasting time on error checking + if ($op != EPOLL_CTL_DEL) { + $self->{$fd} = $ev; + } else { + delete $self->{$fd}; + } + 0; +} + +sub epoll_wait { + my ($self, $maxevents, $timeout_msec, $events) = @_; + my @pset; + while (my ($fd, $events) = each %$self) { + my $pevents = $events & EPOLLIN ? POLLIN : 0; + $pevents |= $events & EPOLLOUT ? POLLOUT : 0; + push(@pset, $fd, $pevents); + } + @$events = (); + my $n = IO::Poll::_poll($timeout_msec, @pset); + if ($n >= 0) { + for (my $i = 0; $i < @pset; ) { + my $fd = $pset[$i++]; + my $revents = $pset[$i++] or next; + delete($self->{$fd}) if $self->{$fd} & EPOLLONESHOT; + push @$events, [ $fd ]; + } + my $nevents = scalar @$events; + if ($n != $nevents) { + warn "BUG? poll() returned $n, but got $nevents"; + } + } + $n; +} + +1; diff --git a/lib/PublicInbox/Daemon.pm b/lib/PublicInbox/Daemon.pm index 227ba5f9..cf011a20 100644 --- a/lib/PublicInbox/Daemon.pm +++ b/lib/PublicInbox/Daemon.pm @@ -8,11 +8,12 @@ use warnings; use Getopt::Long qw/:config gnu_getopt no_ignore_case auto_abbrev/; use IO::Handle; use IO::Socket; +use Socket qw(IPPROTO_TCP SOL_SOCKET); +sub SO_ACCEPTFILTER () { 0x1000 } use Cwd qw/abs_path/; -use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC); STDOUT->autoflush(1); STDERR->autoflush(1); -require PublicInbox::DS; +use PublicInbox::DS qw(now); require PublicInbox::EvCleanup; require POSIX; require PublicInbox::Listener; @@ -23,11 +24,55 @@ my (@cfg_listen, $stdout, $stderr, $group, $user, $pid_file, $daemonize); my $worker_processes = 1; my @listeners; my %pids; -my %listener_names; +my %listener_names; # sockname => IO::Handle +my %tls_opt; # scheme://sockname => args for IO::Socket::SSL->start_SSL my $reexec_pid; my $cleanup; my ($uid, $gid); +my ($default_cert, $default_key); END { $cleanup->() if $cleanup }; +my %KNOWN_TLS = ( 443 => 'https', 563 => 'nntps' ); +my %KNOWN_STARTTLS = ( 119 => 'nntp' ); + +sub accept_tls_opt ($) { + my ($opt_str) = @_; + # opt_str: opt1=val1,opt2=val2 (opt may repeat for multi-value) + require PublicInbox::TLS; + my $o = {}; + # allow ',' as delimiter since '&' is shell-unfriendly + foreach (split(/[,&]/, $opt_str)) { + my ($k, $v) = split(/=/, $_, 2); + push @{$o->{$k} ||= []}, $v; + } + + # key may be a part of cert. At least + # p5-io-socket-ssl/example/ssl_server.pl has this fallback: + $o->{cert} //= [ $default_cert ]; + $o->{key} //= defined($default_key) ? [ $default_key ] : $o->{cert}; + my %ctx_opt = (SSL_server => 1); + # parse out hostname:/path/to/ mappings: + foreach my $k (qw(cert key)) { + my $x = $ctx_opt{'SSL_'.$k.'_file'} = {}; + foreach my $path (@{$o->{$k}}) { + my $host = ''; + $path =~ s/\A([^:]+):// and $host = $1; + $x->{$host} = $path; + } + } + my $ctx = IO::Socket::SSL::SSL_Context->new(%ctx_opt) or + die 'SSL_Context->new: '.PublicInbox::TLS::err(); + + # save ~34K per idle connection (cf. SSL_CTX_set_mode(3ssl)) + # RSS goes from 346MB to 171MB with 10K idle NNTPS clients on amd64 + # cf. https://rt.cpan.org/Ticket/Display.html?id=129463 + my $mode = eval { Net::SSLeay::MODE_RELEASE_BUFFERS() }; + if ($mode && $ctx->{context}) { + eval { Net::SSLeay::CTX_set_mode($ctx->{context}, $mode) }; + warn "W: $@ (setting SSL_MODE_RELEASE_BUFFERS)\n" if $@; + } + + { SSL_server => 1, SSL_startHandshake => 0, SSL_reuse_ctx => $ctx }; +} sub daemon_prepare ($) { my ($default_listen) = @_; @@ -43,6 +88,8 @@ sub daemon_prepare ($) { 'u|user=s' => \$user, 'g|group=s' => \$group, 'D|daemonize' => \$daemonize, + 'cert=s' => \$default_cert, + 'key=s' => \$default_key, ); GetOptions(%opts) or die "bad command-line args\n"; @@ -50,12 +97,34 @@ sub daemon_prepare ($) { die "--pid-file cannot end with '.oldbin'\n"; } @listeners = inherit(); + + # allow socket-activation users to set certs once and not + # have to configure each socket: + my @inherited_names = keys(%listener_names) if defined($default_cert); + # ignore daemonize when inheriting $daemonize = undef if scalar @listeners; push @cfg_listen, $default_listen unless (@listeners || @cfg_listen); foreach my $l (@cfg_listen) { + my $orig = $l; + my $scheme = ''; + if ($l =~ s!\A([^:]+)://!!) { + $scheme = $1; + } elsif ($l =~ /\A(?:\[[^\]]+\]|[^:]+):([0-9])+/) { + my $s = $KNOWN_TLS{$1} // $KNOWN_STARTTLS{$1}; + $scheme = $s if defined $s; + } + if ($l =~ s!/?\?(.+)\z!!) { + $tls_opt{"$scheme://$l"} = accept_tls_opt($1); + } elsif (defined($default_cert)) { + $tls_opt{"$scheme://$l"} = accept_tls_opt(''); + } elsif ($scheme =~ /\A(?:nntps|https)\z/) { + die "$orig specified w/o cert=\n"; + } + # TODO: use scheme to load either NNTP.pm or HTTP.pm + next if $listener_names{$l}; # already inherited my (%o, $sock_pkg); if (index($l, '/') == 0) { @@ -92,6 +161,20 @@ sub daemon_prepare ($) { push @listeners, $s; } } + + # cert/key options in @cfg_listen takes precedence when inheriting, + # but map well-known inherited ports if --listen isn't specified + # at all + for my $sockname (@inherited_names) { + $sockname =~ /:([0-9]+)\z/ or next; + if (my $scheme = $KNOWN_TLS{$1}) { + $tls_opt{"$scheme://$sockname"} ||= accept_tls_opt(''); + } elsif (($scheme = $KNOWN_STARTTLS{$1})) { + next if $tls_opt{"$scheme://$sockname"}; + $tls_opt{''} ||= accept_tls_opt(''); + } + } + die "No listeners bound\n" unless @listeners; } @@ -183,7 +266,7 @@ sub worker_quit { PublicInbox::DS->SetPostLoopCallback(sub { my ($dmap, undef) = @_; my $n = 0; - my $now = clock_gettime(CLOCK_MONOTONIC); + my $now = now(); foreach my $s (values %$dmap) { $s->can('busy') or next; @@ -195,9 +278,9 @@ sub worker_quit { } } if ($n) { - if (($warn + 5) < time) { + if (($warn + 5) < now()) { warn "$$ quitting, $n client(s) left\n"; - $warn = time; + $warn = now(); } unless (defined $proc_name) { $proc_name = (split(/\s+/, $0))[0]; @@ -462,9 +545,43 @@ sub master_loop { exit # never gets here, just for documentation } -sub daemon_loop ($$) { - my ($refresh, $post_accept) = @_; +sub tls_start_cb ($$) { + my ($opt, $orig_post_accept) = @_; + sub { + my ($io, $addr, $srv) = @_; + my $ssl = IO::Socket::SSL->start_SSL($io, %$opt); + $orig_post_accept->($ssl, $addr, $srv); + } +} + +sub defer_accept ($$) { + my ($s, $af_name) = @_; + return unless defined $af_name; + if ($^O eq 'linux') { + my $x = getsockopt($s, IPPROTO_TCP, Socket::TCP_DEFER_ACCEPT()); + return unless defined $x; # may be Unix socket + my $sec = unpack('i', $x); + return if $sec > 0; # systemd users may set a higher value + setsockopt($s, IPPROTO_TCP, Socket::TCP_DEFER_ACCEPT(), 1); + } elsif ($^O eq 'freebsd') { + my $x = getsockopt($s, SOL_SOCKET, SO_ACCEPTFILTER); + return if defined $x; # don't change if set + my $accf_arg = pack('a16a240', $af_name, ''); + setsockopt($s, SOL_SOCKET, SO_ACCEPTFILTER, $accf_arg); + } +} + +sub daemon_loop ($$$$) { + my ($refresh, $post_accept, $nntpd, $af_default) = @_; PublicInbox::EvCleanup::enable(); # early for $refresh + my %post_accept; + while (my ($k, $v) = each %tls_opt) { + if ($k =~ s!\A(?:nntps|https)://!!) { + $post_accept{$k} = tls_start_cb($v, $post_accept); + } elsif ($nntpd) { # STARTTLS, $k eq '' is OK + $nntpd->{accept_tls} = $v; + } + } my $parent_pipe; if ($worker_processes > 0) { $refresh->(); # preload by default @@ -483,20 +600,27 @@ sub daemon_loop ($$) { $SIG{HUP} = $refresh; $SIG{CHLD} = 'DEFAULT'; $SIG{$_} = 'IGNORE' for qw(USR2 TTIN TTOU WINCH); - # this calls epoll_create: - @listeners = map { - PublicInbox::Listener->new($_, $post_accept) + @listeners = map {; + my $tls_cb = $post_accept{sockname($_)}; + + # NNTPS, HTTPS, HTTP, and POP3S are client-first traffic + # NNTP and POP3 are server-first + defer_accept($_, $tls_cb ? 'dataready' : $af_default); + + # this calls epoll_create: + PublicInbox::Listener->new($_, $tls_cb || $post_accept) } @listeners; PublicInbox::DS->EventLoop; $parent_pipe = undef; } -sub run ($$$) { - my ($default, $refresh, $post_accept) = @_; +sub run ($$$;$) { + my ($default, $refresh, $post_accept, $nntpd) = @_; daemon_prepare($default); + my $af_default = $default =~ /:8080\z/ ? 'httpready' : undef; daemonize(); - daemon_loop($refresh, $post_accept); + daemon_loop($refresh, $post_accept, $nntpd, $af_default); } sub do_chown ($) { diff --git a/lib/PublicInbox/EvCleanup.pm b/lib/PublicInbox/EvCleanup.pm index c64e2388..33b54ebc 100644 --- a/lib/PublicInbox/EvCleanup.pm +++ b/lib/PublicInbox/EvCleanup.pm @@ -6,6 +6,7 @@ package PublicInbox::EvCleanup; use strict; use warnings; use base qw(PublicInbox::DS); +use PublicInbox::Syscall qw(EPOLLOUT EPOLLONESHOT); my $ENABLED; sub enabled { $ENABLED } @@ -23,13 +24,13 @@ sub once_init () { # fires in the next event loop iteration. pipe($r, $w) or die "pipe: $!"; fcntl($w, 1031, 4096) if $^O eq 'linux'; # 1031: F_SETPIPE_SZ - $self->SUPER::new($w); + $self->SUPER::new($w, 0); # always writable, since PublicInbox::EvCleanup::event_step # never drains wbuf. We can avoid wasting a hash slot by # stuffing the read-end of the pipe into the never-to-be-touched # wbuf - push @{$self->{wbuf}}, $r; + $self->{wbuf} = $r; $self; } @@ -45,7 +46,9 @@ sub _run_all ($) { # ensure PublicInbox::DS::ToClose processing after timers fire sub _asap_close () { $asapq->[1] ||= _asap_timer() } -sub _run_asap () { _run_all($asapq) } +# Called by PublicInbox::DS +sub event_step { _run_all($asapq) } + sub _run_next () { _run_all($nextq); _asap_close(); @@ -56,16 +59,9 @@ sub _run_later () { _asap_close(); } -# Called by PublicInbox::DS -sub event_step { - my ($self) = @_; - $self->watch_write(0); - _run_asap(); -} - sub _asap_timer () { $singleton ||= once_init(); - $singleton->watch_write(1); + $singleton->watch(EPOLLOUT|EPOLLONESHOT); 1; } @@ -88,7 +84,7 @@ sub later ($) { } END { - _run_asap(); + event_step(); _run_all($nextq); _run_all($laterq); } diff --git a/lib/PublicInbox/GitHTTPBackend.pm b/lib/PublicInbox/GitHTTPBackend.pm index a2a81f8e..303d5073 100644 --- a/lib/PublicInbox/GitHTTPBackend.pm +++ b/lib/PublicInbox/GitHTTPBackend.pm @@ -231,18 +231,16 @@ sub input_prepare { return; } last if $r == 0; - my $off = 0; - while ($r > 0) { - my $w = syswrite($in, $buf, $r, $off); - if (defined $w) { - $r -= $w; - $off += $w; - } else { - err($env, "error writing temporary file: $!"); - return; - } + unless (print $in $buf) { + err($env, "error writing temporary file: $!"); + return; } } + # ensure it's visible to git-http-backend(1): + unless ($in->flush) { + err($env, "error writing temporary file: $!"); + return; + } unless (defined(sysseek($in, 0, SEEK_SET))) { err($env, "error seeking temporary file: $!"); return; diff --git a/lib/PublicInbox/HTTP.pm b/lib/PublicInbox/HTTP.pm index 45bf23ec..a1cb4aca 100644 --- a/lib/PublicInbox/HTTP.pm +++ b/lib/PublicInbox/HTTP.pm @@ -19,12 +19,15 @@ use HTTP::Status qw(status_message); use HTTP::Date qw(time2str); use IO::Handle; require PublicInbox::EvCleanup; +PublicInbox::DS->import(qw(msg_more)); +use PublicInbox::Syscall qw(EPOLLIN EPOLLONESHOT); use constant { CHUNK_START => -1, # [a-f0-9]+\r\n CHUNK_END => -2, # \r\n CHUNK_ZEND => -3, # \r\n CHUNK_MAX_HDR => 256, }; +use Errno qw(EAGAIN); my $pipelineq = []; my $pipet; @@ -33,7 +36,7 @@ sub process_pipelineq () { $pipet = undef; $pipelineq = []; foreach (@$q) { - next if $_->{closed}; + next unless $_->{sock}; rbuf_process($_); } } @@ -55,39 +58,26 @@ sub http_date () { sub new ($$$) { my ($class, $sock, $addr, $httpd) = @_; my $self = fields::new($class); - $self->SUPER::new($sock); + $self->SUPER::new($sock, EPOLLIN | EPOLLONESHOT); $self->{httpd} = $httpd; $self->{rbuf} = ''; ($self->{remote_addr}, $self->{remote_port}) = PublicInbox::Daemon::host_with_port($addr); - $self->watch_read(1); $self; } sub event_step { # called by PublicInbox::DS my ($self) = @_; - my $wbuf = $self->{wbuf}; - if (@$wbuf) { - $self->write(undef); - return if $self->{closed} || scalar(@$wbuf); - } + return unless $self->flush_write && $self->{sock}; + # only read more requests if we've drained the write buffer, # otherwise we can be buffering infinitely w/o backpressure return read_input($self) if defined $self->{env}; - - my $off = length($self->{rbuf}); - my $r = sysread($self->{sock}, $self->{rbuf}, 8192, $off); - if (defined $r) { - return $self->close if $r == 0; - return rbuf_process($self); - } - return if $!{EAGAIN}; # no need to call watch_read(1) again - - # common for clients to break connections without warning, - # would be too noisy to log here: - return $self->close; + my $rbuf = \($self->{rbuf}); + my $off = bytes::length($$rbuf); + $self->do_read($rbuf, 8192, $off) and rbuf_process($self); } sub rbuf_process { @@ -100,10 +90,10 @@ sub rbuf_process { # (they are rarely-used and git (as of 2.7.2) does not use them) if ($r == -1 || $env{HTTP_TRAILER} || # this length-check is necessary for PURE_PERL=1: - ($r == -2 && length($self->{rbuf}) > 0x4000)) { + ($r == -2 && bytes::length($self->{rbuf}) > 0x4000)) { return quit($self, 400); } - return $self->watch_read(1) if $r < 0; # incomplete + return $self->watch_in1 if $r < 0; # incomplete $self->{rbuf} = substr($self->{rbuf}, $r); my $len = input_prepare($self, \%env); @@ -112,6 +102,15 @@ sub rbuf_process { $len ? read_input($self) : app_dispatch($self); } +# IO::Handle::write returns boolean, this returns bytes written: +sub xwrite ($$$) { + my ($fh, $rbuf, $max) = @_; + my $w = bytes::length($$rbuf); + $w = $max if $w > $max; + $fh->write($$rbuf, $w) or return; + $w; +} + sub read_input ($) { my ($self) = @_; my $env = $self->{env}; @@ -120,14 +119,13 @@ sub read_input ($) { # env->{CONTENT_LENGTH} (identity) my $sock = $self->{sock}; - my $len = $self->{input_left}; - $self->{input_left} = undef; + my $len = delete $self->{input_left}; my $rbuf = \($self->{rbuf}); my $input = $env->{'psgi.input'}; while ($len > 0) { if ($$rbuf ne '') { - my $w = write_in_full($input, $rbuf, $len); + my $w = xwrite($input, $rbuf, $len); return write_err($self, $len) unless $w; $len -= $w; die "BUG: $len < 0 (w=$w)" if $len < 0; @@ -146,7 +144,6 @@ sub read_input ($) { sub app_dispatch { my ($self, $input) = @_; - $self->watch_read(0); my $env = $self->{env}; $env->{REMOTE_ADDR} = $self->{remote_addr}; $env->{REMOTE_PORT} = $self->{remote_port}; @@ -210,9 +207,9 @@ sub response_header_write { $h .= 'Date: ' . http_date() . "\r\n\r\n"; if (($len || $chunked) && $env->{REQUEST_METHOD} ne 'HEAD') { - more($self, $h); + msg_more($self, $h); } else { - $self->write($h); + $self->write(\$h); } $alive; } @@ -222,12 +219,12 @@ sub chunked_wcb ($) { my ($self) = @_; sub { return if $_[0] eq ''; - more($self, sprintf("%x\r\n", bytes::length($_[0]))); - more($self, $_[0]); + msg_more($self, sprintf("%x\r\n", bytes::length($_[0]))); + msg_more($self, $_[0]); - # use $self->write("\n\n") if you care about real-time + # use $self->write(\"\n\n") if you care about real-time # streaming responses, public-inbox WWW does not. - more($self, "\r\n"); + msg_more($self, "\r\n"); } } @@ -239,7 +236,7 @@ sub identity_wcb ($) { sub next_request ($) { my ($self) = @_; if ($self->{rbuf} eq '') { # wait for next request - $self->watch_read(1); + $self->watch_in1; } else { # avoid recursion for pipelined requests push @$pipelineq, $self; $pipet ||= PublicInbox::EvCleanup::asap(*process_pipelineq); @@ -249,10 +246,9 @@ sub next_request ($) { sub response_done_cb ($$) { my ($self, $alive) = @_; sub { - my $env = $self->{env}; - $self->{env} = undef; - $self->write("0\r\n\r\n") if $alive == 2; - $self->write(sub{$alive ? next_request($self) : $self->close}); + my $env = delete $self->{env}; + $self->write(\"0\r\n\r\n") if $alive == 2; + $self->write($alive ? \&next_request : \&close); } } @@ -266,9 +262,9 @@ sub getline_cb ($$$) { my $buf = eval { $forward->getline }; if (defined $buf) { $write->($buf); # may close in PublicInbox::DS::write - unless ($self->{closed}) { + if ($self->{sock}) { my $next = $self->{pull}; - if (scalar @{$self->{wbuf}}) { + if ($self->{wbuf}) { $self->write($next); } else { PublicInbox::EvCleanup::asap($next); @@ -282,7 +278,7 @@ sub getline_cb ($$$) { } } - $self->{forward} = $self->{pull} = undef; + delete @$self{qw(forward pull)}; # avoid recursion if ($forward) { eval { $forward->close }; @@ -319,21 +315,9 @@ sub response_write { } } -use constant MSG_MORE => ($^O eq 'linux') ? 0x8000 : 0; -sub more ($$) { - my $self = $_[0]; - return if $self->{closed}; - if (MSG_MORE && !scalar(@{$self->{wbuf}})) { - my $n = send($self->{sock}, $_[1], MSG_MORE); - if (defined $n) { - my $nlen = length($_[1]) - $n; - return 1 if $nlen == 0; # all done! - - # PublicInbox::DS::write queues the unwritten substring: - return $self->write(substr($_[1], $n, $nlen)); - } - } - $self->write($_[1]); +sub input_tmpfile ($) { + open($_[0], '+>', undef); + $_[0]->autoflush(1); } sub input_prepare { @@ -345,10 +329,10 @@ sub input_prepare { quit($self, 413); return; } - open($input, '+>', undef); + input_tmpfile($input); } elsif (env_chunked($env)) { $len = CHUNK_START; - open($input, '+>', undef); + input_tmpfile($input); } else { $input = $null_io; } @@ -378,46 +362,31 @@ sub write_err { sub recv_err { my ($self, $r, $len) = @_; return $self->close if (defined $r && $r == 0); - if ($!{EAGAIN}) { + if ($! == EAGAIN) { $self->{input_left} = $len; - return; + return $self->watch_in1; } err($self, "error reading for input: $! ($len bytes remaining)"); quit($self, 500); } -sub write_in_full { - my ($fh, $rbuf, $len) = @_; - my $rv = 0; - my $off = 0; - while ($len > 0) { - my $w = syswrite($fh, $$rbuf, $len, $off); - return ($rv ? $rv : $w) unless $w; # undef or 0 - $rv += $w; - $off += $w; - $len -= $w; - } - $rv -} - sub read_input_chunked { # unlikely... my ($self) = @_; my $input = $self->{env}->{'psgi.input'}; my $sock = $self->{sock}; - my $len = $self->{input_left}; - $self->{input_left} = undef; + my $len = delete $self->{input_left}; my $rbuf = \($self->{rbuf}); while (1) { # chunk start if ($len == CHUNK_ZEND) { $$rbuf =~ s/\A\r\n//s and return app_dispatch($self, $input); - return quit($self, 400) if length($$rbuf) > 2; + return quit($self, 400) if bytes::length($$rbuf) > 2; } if ($len == CHUNK_END) { if ($$rbuf =~ s/\A\r\n//s) { $len = CHUNK_START; - } elsif (length($$rbuf) > 2) { + } elsif (bytes::length($$rbuf) > 2) { return quit($self, 400); } } @@ -427,14 +396,14 @@ sub read_input_chunked { # unlikely... if (($len + -s $input) > $MAX_REQUEST_BUFFER) { return quit($self, 413); } - } elsif (length($$rbuf) > CHUNK_MAX_HDR) { + } elsif (bytes::length($$rbuf) > CHUNK_MAX_HDR) { return quit($self, 400); } # will break from loop since $len >= 0 } if ($len < 0) { # chunk header is trickled, read more - my $off = length($$rbuf); + my $off = bytes::length($$rbuf); my $r = sysread($sock, $$rbuf, 8192, $off); return recv_err($self, $r, $len) unless $r; # (implicit) goto chunk_start if $r > 0; @@ -444,7 +413,7 @@ sub read_input_chunked { # unlikely... # drain the current chunk until ($len <= 0) { if ($$rbuf ne '') { - my $w = write_in_full($input, $rbuf, $len); + my $w = xwrite($input, $rbuf, $len); return write_err($self, "$len chunk") if !$w; $len -= $w; if ($len == 0) { @@ -470,27 +439,34 @@ sub read_input_chunked { # unlikely... sub quit { my ($self, $status) = @_; my $h = "HTTP/1.1 $status " . status_message($status) . "\r\n\r\n"; - $self->write($h); + $self->write(\$h); $self->close; } sub close { - my $self = shift; - my $forward = $self->{forward}; - my $env = $self->{env}; - delete $env->{'psgix.io'} if $env; # prevent circular references - $self->{pull} = $self->{forward} = $self->{env} = undef; - if ($forward) { + my $self = $_[0]; + if (my $env = delete $self->{env}) { + delete $env->{'psgix.io'}; # prevent circular references + } + delete $self->{pull}; + if (my $forward = delete $self->{forward}) { eval { $forward->close }; err($self, "forward ->close error: $@") if $@; } - $self->SUPER::close(@_); + $self->SUPER::close; # PublicInbox::DS::close } # for graceful shutdown in PublicInbox::Daemon: sub busy () { my ($self) = @_; - ($self->{rbuf} ne '' || $self->{env} || scalar(@{$self->{wbuf}})); + ($self->{rbuf} ne '' || $self->{env} || $self->{wbuf}); } +# fires after pending writes are complete: +sub restart_pass ($) { + $_[0]->{forward}->restart_read; # see PublicInbox::HTTPD::Async +} + +sub enqueue_restart_pass ($) { $_[0]->write(\&restart_pass) } + 1; diff --git a/lib/PublicInbox/HTTPD/Async.pm b/lib/PublicInbox/HTTPD/Async.pm index 604627ab..b46baeb2 100644 --- a/lib/PublicInbox/HTTPD/Async.pm +++ b/lib/PublicInbox/HTTPD/Async.pm @@ -11,6 +11,7 @@ use warnings; use base qw(PublicInbox::DS); use fields qw(cb cleanup); require PublicInbox::EvCleanup; +use Errno qw(EAGAIN); sub new { my ($class, $io, $cb, $cleanup) = @_; @@ -25,18 +26,13 @@ sub new { my $self = fields::new($class); IO::Handle::blocking($io, 0); - $self->SUPER::new($io); + $self->SUPER::new($io, PublicInbox::DS::EPOLLIN()); $self->{cb} = $cb; $self->{cleanup} = $cleanup; - $self->watch_read(1); $self; } -# fires after pending writes are complete: -sub restart_read_cb ($) { - my ($self) = @_; - sub { $self->watch_read(1) } -} +sub restart_read ($) { $_[0]->watch(PublicInbox::DS::EPOLLIN()) } sub main_cb ($$$) { my ($http, $fh, $bref) = @_; @@ -44,24 +40,31 @@ sub main_cb ($$$) { my ($self) = @_; my $r = sysread($self->{sock}, $$bref, 8192); if ($r) { - $fh->write($$bref); - unless ($http->{closed}) { # PublicInbox::DS sets this - if (scalar @{$http->{wbuf}}) { - $self->watch_read(0); - $http->write(restart_read_cb($self)); + $fh->write($$bref); # may call $http->close + + if ($http->{sock}) { # !closed + if ($http->{wbuf}) { + # HTTP client could not keep up, so + # stop reading and buffering. + $self->watch(0); + + # Tell the HTTP socket to restart us + # when HTTP client is done draining + # $http->{wbuf}: + $http->enqueue_restart_pass; } - # stay in watch_read, but let other clients + # stay in EPOLLIN, but let other clients # get some work done, too. return; } # fall through to close below... } elsif (!defined $r) { - return if $!{EAGAIN} || $!{EINTR}; + return restart_read($self) if $! == EAGAIN; } # Done! Error handling will happen in $fh->close # called by the {cleanup} handler - $http->{forward} = undef; + delete $http->{forward}; $self->close; } } @@ -78,13 +81,14 @@ sub async_pass { sub event_step { $_[0]->{cb}->(@_) } sub close { - my $self = shift; - my $cleanup = $self->{cleanup}; - $self->{cleanup} = $self->{cb} = undef; - $self->SUPER::close(@_); + my $self = $_[0]; + delete $self->{cb}; + $self->SUPER::close; # we defer this to the next timer loop since close is deferred - PublicInbox::EvCleanup::next_tick($cleanup) if $cleanup; + if (my $cleanup = delete $self->{cleanup}) { + PublicInbox::EvCleanup::next_tick($cleanup); + } } 1; diff --git a/lib/PublicInbox/Listener.pm b/lib/PublicInbox/Listener.pm index 6ee3abb1..94b2aed4 100644 --- a/lib/PublicInbox/Listener.pm +++ b/lib/PublicInbox/Listener.pm @@ -17,8 +17,8 @@ sub new ($$$) { listen($s, 1024); IO::Handle::blocking($s, 0); my $self = fields::new($class); - $self->SUPER::new($s, 1); # calls epoll_create for the first socket - $self->watch_read(1); + $self->SUPER::new($s, PublicInbox::DS::EPOLLIN()| + PublicInbox::DS::EPOLLEXCLUSIVE()); $self->{post_accept} = $cb; $self } diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm index 796ac74d..53e18281 100644 --- a/lib/PublicInbox/NNTP.pm +++ b/lib/PublicInbox/NNTP.pm @@ -1,4 +1,4 @@ -# Copyright (C) 2015-2018 all contributors <meta@public-inbox.org> +# Copyright (C) 2015-2019 all contributors <meta@public-inbox.org> # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> # # Each instance of this represents a NNTP client socket @@ -6,7 +6,7 @@ package PublicInbox::NNTP; use strict; use warnings; use base qw(PublicInbox::DS); -use fields qw(nntpd article rbuf ng long_res); +use fields qw(nntpd article rbuf ng); use PublicInbox::Search; use PublicInbox::Msgmap; use PublicInbox::MID qw(mid_escape); @@ -14,7 +14,7 @@ use PublicInbox::Git; require PublicInbox::EvCleanup; use Email::Simple; use POSIX qw(strftime); -use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC); +PublicInbox::DS->import(qw(now msg_more)); use Digest::SHA qw(sha1_hex); use Time::Local qw(timegm timelocal); use constant { @@ -24,8 +24,8 @@ use constant { r225 => '225 Headers follow (multi-line)', r430 => '430 No article with that message-id', }; - -sub now () { clock_gettime(CLOCK_MONOTONIC) }; +use PublicInbox::Syscall qw(EPOLLIN EPOLLONESHOT); +use Errno qw(EAGAIN); my @OVERVIEW = qw(Subject From Date Message-ID References Xref); my $OVERVIEW_FMT = join(":\r\n", @OVERVIEW, qw(Bytes Lines)) . ":\r\n"; @@ -45,30 +45,18 @@ sub next_tick () { $nextt = undef; my $q = $nextq; $nextq = []; - foreach my $nntp (@$q) { - # for request && response protocols, always finish writing - # before finishing reading: - if (my $long_cb = $nntp->{long_res}) { - $nntp->write($long_cb); - } else { - # pipelined request, we bypassed socket-readiness - # checks to get here: - event_step($nntp); - - # maybe there's more pipelined data, or we'll have - # to register it for socket-readiness notifications - if (!$nntp->{long_res} && !$nntp->{closed}) { - check_read($nntp); - } - } - } + event_step($_) for @$q; +} + +sub requeue ($) { + push @$nextq, $_[0]; + $nextt ||= PublicInbox::EvCleanup::asap(*next_tick); } sub update_idle_time ($) { my ($self) = @_; - my $sock = $self->{sock} or return; - my $fd = fileno($sock); - defined $fd and $EXPMAP->{$fd} = [ now(), $self ]; + my $sock = $self->{sock} or return; + $EXPMAP->{fileno($sock)} = [ now(), $self ]; } sub expire_old () { @@ -76,11 +64,17 @@ sub expire_old () { my $exp = $EXPTIME; my $old = $now - $exp; my $nr = 0; + my $closed = 0; my %new; while (my ($fd, $v) = each %$EXPMAP) { my ($idle_time, $nntp) = @$v; if ($idle_time < $old) { - $nntp->close; # idempotent + if ($nntp->shutdn) { + $closed++; + } else { + ++$nr; + $new{$fd} = $v; + } } else { ++$nr; $new{$fd} = $v; @@ -93,18 +87,28 @@ sub expire_old () { $expt = undef; # noop to kick outselves out of the loop ASAP so descriptors # really get closed - PublicInbox::EvCleanup::asap(sub {}); + PublicInbox::EvCleanup::asap(sub {}) if $closed; } } +sub greet ($) { $_[0]->write($_[0]->{nntpd}->{greet}) }; + sub new ($$$) { my ($class, $sock, $nntpd) = @_; my $self = fields::new($class); - $self->SUPER::new($sock); + my $ev = EPOLLIN; + my $wbuf; + if (ref($sock) eq 'IO::Socket::SSL' && !$sock->accept_SSL) { + $ev = PublicInbox::TLS::epollbit() or return CORE::close($sock); + $wbuf = [ \&PublicInbox::DS::accept_tls_step, \&greet ]; + } + $self->SUPER::new($sock, $ev | EPOLLONESHOT); $self->{nntpd} = $nntpd; - res($self, '201 ' . $nntpd->{servername} . ' ready - post via email'); - $self->{rbuf} = ''; - $self->watch_read(1); + if ($wbuf) { + $self->{wbuf} = $wbuf; + } else { + greet($self); + } update_idle_time($self); $expt ||= PublicInbox::EvCleanup::later(*expire_old); $self; @@ -134,7 +138,7 @@ sub process_line ($$) { my $res = eval { $req->($self, @args) }; my $err = $@; - if ($err && !$self->{closed}) { + if ($err && $self->{sock}) { local $/ = "\n"; chomp($l); err($self, 'error from: %s (%s)', $l, $err); @@ -162,12 +166,12 @@ sub cmd_xgtitle ($;$) { sub list_overview_fmt ($) { my ($self) = @_; - do_more($self, $OVERVIEW_FMT); + msg_more($self, $OVERVIEW_FMT); } sub list_headers ($;$) { my ($self) = @_; - do_more($self, $LIST_HEADERS); + msg_more($self, $LIST_HEADERS); } sub list_active ($;$) { @@ -251,7 +255,7 @@ sub parse_time ($$;$) { } my @now = $gmt ? gmtime : localtime; my ($YYYY, $MM, $DD); - if (length($date) == 8) { # RFC 3977 allows YYYYMMDD + if (bytes::length($date) == 8) { # RFC 3977 allows YYYYMMDD ($YYYY, $MM, $DD) = unpack('A4A2A2', $date); } else { # legacy clients send YYMMDD ($YYYY, $MM, $DD) = unpack('A2A2A2', $date); @@ -403,7 +407,7 @@ sub cmd_post ($) { sub cmd_quit ($) { my ($self) = @_; res($self, '205 closing connection - goodbye!'); - $self->close; + $self->shutdn; undef; } @@ -522,8 +526,8 @@ sub simple_body_write ($$) { $s->body_set(''); $body =~ s/^\./../smg; $body =~ s/(?<!\r)\n/\r\n/sg; - do_more($self, $body); - do_more($self, "\r\n") unless $body =~ /\r\n\z/s; + msg_more($self, $body); + msg_more($self, "\r\n") unless $body =~ /\r\n\z/s; '.' } @@ -553,8 +557,8 @@ sub cmd_article ($;$) { my ($n, $mid, $s) = @$r; set_art($self, $art); more($self, "220 $n <$mid> article retrieved - head and body follow"); - do_more($self, _header($s)); - do_more($self, "\r\n"); + msg_more($self, _header($s)); + msg_more($self, "\r\n"); simple_body_write($self, $s); } @@ -565,7 +569,7 @@ sub cmd_head ($;$) { my ($n, $mid, $s) = @$r; set_art($self, $art); more($self, "221 $n <$mid> article retrieved - head follows"); - do_more($self, _header($s)); + msg_more($self, _header($s)); '.' } @@ -620,48 +624,46 @@ sub get_range ($$) { } sub long_response ($$) { - my ($self, $cb) = @_; - die "BUG: nested long response" if $self->{long_res}; + my ($self, $cb) = @_; # cb returns true if more, false if done my $fd = fileno($self->{sock}); defined $fd or return; # make sure we disable reading during a long response, # clients should not be sending us stuff and making us do more # work while we are stream a response to them - $self->watch_read(0); my $t0 = now(); - $self->{long_res} = sub { + my $long_cb; # DANGER: self-referential + $long_cb = sub { + # wbuf is unset or empty, here; $cb may add to it my $more = eval { $cb->() }; - if ($@ || $self->{closed}) { - $self->{long_res} = undef; - + if ($@ || !$self->{sock}) { # something bad happened... + $long_cb = undef; + my $diff = now() - $t0; if ($@) { err($self, "%s during long response[$fd] - %0.6f", - $@, now() - $t0); - } - if ($self->{closed}) { - out($self, " deferred[$fd] aborted - %0.6f", - now() - $t0); - } else { - update_idle_time($self); - check_read($self); + $@, $diff); } - } elsif ($more) { # scalar @{$self->{wbuf}}: + out($self, " deferred[$fd] aborted - %0.6f", $diff); + $self->close; + } elsif ($more) { # $self->{wbuf}: + update_idle_time($self); + # no recursion, schedule another call ASAP # but only after all pending writes are done - update_idle_time($self); + my $wbuf = $self->{wbuf} ||= []; + push @$wbuf, $long_cb; - push @$nextq, $self; - $nextt ||= PublicInbox::EvCleanup::asap(*next_tick); + # wbuf may be populated by $cb, no need to rearm if so: + requeue($self) if scalar(@$wbuf) == 1; } else { # all done! - $self->{long_res} = undef; - check_read($self); + $long_cb = undef; res($self, '.'); out($self, " deferred[$fd] done - %0.6f", now() - $t0); + requeue($self) unless $self->{wbuf}; } }; - $self->{long_res}->(); # kick off! + $self->write($long_cb); # kick off! undef; } @@ -765,7 +767,7 @@ sub hdr_searchmsg ($$$$) { $tmp .= $s->{num} . ' ' . $s->$field . "\r\n"; } utf8::encode($tmp); - do_more($self, $tmp); + msg_more($self, $tmp); $cur = $msgs->[-1]->{num} + 1; }); } @@ -904,6 +906,19 @@ sub cmd_xover ($;$) { }); } +sub cmd_starttls ($) { + my ($self) = @_; + my $sock = $self->{sock} or return; + # RFC 4642 2.2.1 + (ref($sock) eq 'IO::Socket::SSL') and return '502 Command unavailable'; + my $opt = $self->{nntpd}->{accept_tls} or + return '580 can not initiate TLS negotiation'; + res($self, '382 Continue with TLS negotiation'); + $self->{sock} = IO::Socket::SSL->start_SSL($sock, %$opt); + requeue($self) if PublicInbox::DS::accept_tls_step($self); + undef; +} + sub cmd_xpath ($$) { my ($self, $mid) = @_; return r501 unless $mid =~ /\A<(.+)>\z/; @@ -917,24 +932,14 @@ sub cmd_xpath ($$) { '223 '.join(' ', @paths); } -sub res ($$) { - my ($self, $line) = @_; - do_write($self, $line . "\r\n"); -} +sub res ($$) { do_write($_[0], $_[1] . "\r\n") } -sub more ($$) { - my ($self, $line) = @_; - do_more($self, $line . "\r\n"); -} +sub more ($$) { msg_more($_[0], $_[1] . "\r\n") } sub do_write ($$) { - my ($self, $data) = @_; - my $done = $self->write($data); - return 0 if $self->{closed}; - - # Do not watch for readability if we have data in the queue, - # instead re-enable watching for readability when we can - $self->watch_read(0) if (!$done || $self->{long_res}); + my $self = $_[0]; + my $done = $self->write(\($_[1])); + return 0 unless $self->{sock}; $done; } @@ -949,88 +954,53 @@ sub out ($$;@) { printf { $self->{nntpd}->{out} } $fmt."\n", @args; } -use constant MSG_MORE => ($^O eq 'linux') ? 0x8000 : 0; - -sub do_more ($$) { - my ($self, $data) = @_; - if (MSG_MORE && !scalar(@{$self->{wbuf}})) { - my $n = send($self->{sock}, $data, MSG_MORE); - if (defined $n) { - my $dlen = length($data); - return 1 if $n == $dlen; # all done! - $data = substr($data, $n, $dlen - $n); - } - } - do_write($self, $data); -} - +# callback used by PublicInbox::DS for any (e)poll (in/out/hup/err) sub event_step { my ($self) = @_; - return if $self->{closed}; - my $wbuf = $self->{wbuf}; - if (@$wbuf) { - update_idle_time($self); - $self->write(undef); - return if $self->{closed} || scalar(@$wbuf); - } - return if $self->{long_res}; + return unless $self->flush_write && $self->{sock}; + + update_idle_time($self); # only read more requests if we've drained the write buffer, # otherwise we can be buffering infinitely w/o backpressure use constant LINE_MAX => 512; # RFC 977 section 2.3 - my $rbuf = \($self->{rbuf}); - my $r; + my $rbuf = $self->{rbuf} // (\(my $x = '')); + my $r = 1; if (index($$rbuf, "\n") < 0) { - my $off = length($$rbuf); - $r = sysread($self->{sock}, $$rbuf, LINE_MAX, $off); - unless (defined $r) { - return if $!{EAGAIN}; - return $self->close; - } - return $self->close if $r == 0; + my $off = bytes::length($$rbuf); + $r = $self->do_read($rbuf, LINE_MAX, $off) or return; } - $r = 1; while ($r > 0 && $$rbuf =~ s/\A[ \t\r\n]*([^\r\n]*)\r?\n//) { my $line = $1; return $self->close if $line =~ /[[:cntrl:]]/s; my $t0 = now(); my $fd = fileno($self->{sock}); $r = eval { process_line($self, $line) }; - my $d = $self->{long_res} ? - " deferred[$fd]" : ''; - out($self, "[$fd] %s - %0.6f$d", $line, now() - $t0); + my $pending = $self->{wbuf} ? ' pending' : ''; + out($self, "[$fd] %s - %0.6f$pending", $line, now() - $t0); } return $self->close if $r < 0; - my $len = length($$rbuf); + my $len = bytes::length($$rbuf); return $self->close if ($len >= LINE_MAX); - update_idle_time($self); -} - -sub check_read { - my ($self) = @_; - if (index($self->{rbuf}, "\n") >= 0) { - # Force another read if there is a pipelined request. - # We don't know if the socket has anything for us to read, - # and we must double-check again by the time the timer fires - # in case we really did dispatch a read event and started - # another long response. - push @$nextq, $self; - $nextt ||= PublicInbox::EvCleanup::asap(*next_tick); + if ($len) { + $self->{rbuf} = $rbuf; } else { - # no pipelined requests available, let the kernel know - # to wake us up if there's more - $self->watch_read(1); # PublicInbox::DS::watch_read + delete $self->{rbuf}; } + update_idle_time($self); + + # maybe there's more pipelined data, or we'll have + # to register it for socket-readiness notifications + requeue($self) unless $self->{wbuf}; } sub not_idle_long ($$) { my ($self, $now) = @_; - my $sock = $self->{sock} or return; - defined(my $fd = fileno($sock)) or return; - my $ary = $EXPMAP->{$fd} or return; + my $sock = $self->{sock} or return; + my $ary = $EXPMAP->{fileno($sock)} or return; my $exp_at = $ary->[0] + $EXPTIME; $exp_at > $now; } @@ -1038,8 +1008,7 @@ sub not_idle_long ($$) { # for graceful shutdown in PublicInbox::Daemon: sub busy { my ($self, $now) = @_; - ($self->{rbuf} ne '' || $self->{long_res} || - scalar(@{$self->{wbuf}}) || not_idle_long($self, $now)); + ($self->{rbuf} || $self->{wbuf} || not_idle_long($self, $now)); } 1; diff --git a/lib/PublicInbox/NNTPD.pm b/lib/PublicInbox/NNTPD.pm index 32848d7c..4f30c5d9 100644 --- a/lib/PublicInbox/NNTPD.pm +++ b/lib/PublicInbox/NNTPD.pm @@ -25,6 +25,8 @@ sub new { out => \*STDOUT, grouplist => [], servername => $name, + greet => \"201 $name ready - post via email\r\n", + # accept_tls => { SSL_server => 1, ..., SSL_reuse_ctx => ... } }, $class; } diff --git a/lib/PublicInbox/ParentPipe.pm b/lib/PublicInbox/ParentPipe.pm index a9f05fc1..ccc0815e 100644 --- a/lib/PublicInbox/ParentPipe.pm +++ b/lib/PublicInbox/ParentPipe.pm @@ -10,9 +10,8 @@ use fields qw(cb); sub new ($$$) { my ($class, $pipe, $cb) = @_; my $self = fields::new($class); - $self->SUPER::new($pipe); + $self->SUPER::new($pipe, PublicInbox::DS::EPOLLIN()); $self->{cb} = $cb; - $self->watch_read(1); $self; } diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm index 9aede103..f2630a0f 100644 --- a/lib/PublicInbox/Qspawn.pm +++ b/lib/PublicInbox/Qspawn.pm @@ -29,6 +29,9 @@ use warnings; use PublicInbox::Spawn qw(popen_rd); require Plack::Util; +# n.b.: we get EAGAIN with public-inbox-httpd, and EINTR on other PSGI servers +use Errno qw(EAGAIN EINTR); + my $def_limiter; # declares a command to spawn (but does not spawn it). @@ -122,7 +125,7 @@ sub psgi_qx { eval { $qx_cb->($qx) }; $qx = undef; }; - my $rpipe; + my $rpipe; # comes from popen_rd my $async = $env->{'pi-httpd.async'}; my $cb = sub { my $r = sysread($rpipe, my $buf, 8192); @@ -131,13 +134,13 @@ sub psgi_qx { } elsif (defined $r) { $r ? $qx->write($buf) : $end->(); } else { - return if $!{EAGAIN} || $!{EINTR}; # loop again + return if $! == EAGAIN || $! == EINTR; # loop again $end->(); } }; $limiter ||= $def_limiter ||= PublicInbox::Qspawn::Limiter->new(32); $self->start($limiter, sub { # may run later, much later... - ($rpipe) = @_; + ($rpipe) = @_; # popen_rd result if ($async) { # PublicInbox::HTTPD::Async->new($rpipe, $cb, $end) $async = $async->($rpipe, $cb, $end); @@ -193,7 +196,7 @@ sub psgi_return { my $buf = ''; my $rd_hdr = sub { my $r = sysread($rpipe, $buf, 1024, length($buf)); - return if !defined($r) && ($!{EINTR} || $!{EAGAIN}); + return if !defined($r) && $! == EAGAIN || $! == EINTR; $parse_hdr->($r, \$buf); }; diff --git a/lib/PublicInbox/Spawn.pm b/lib/PublicInbox/Spawn.pm index 66b916df..9161bb5b 100644 --- a/lib/PublicInbox/Spawn.pm +++ b/lib/PublicInbox/Spawn.pm @@ -229,8 +229,6 @@ sub popen_rd { my ($cmd, $env, $opts) = @_; pipe(my ($r, $w)) or die "pipe: $!\n"; $opts ||= {}; - my $blocking = $opts->{Blocking}; - IO::Handle::blocking($r, $blocking) if defined $blocking; $opts->{1} = fileno($w); my $pid = spawn($cmd, $env, $opts); return unless defined $pid; diff --git a/lib/PublicInbox/Syscall.pm b/lib/PublicInbox/Syscall.pm index 4ef64cc3..500efa67 100644 --- a/lib/PublicInbox/Syscall.pm +++ b/lib/PublicInbox/Syscall.pm @@ -22,22 +22,24 @@ use vars qw(@ISA @EXPORT_OK %EXPORT_TAGS $VERSION); $VERSION = "0.25"; @ISA = qw(Exporter); @EXPORT_OK = qw(sendfile epoll_ctl epoll_create epoll_wait - EPOLLIN EPOLLOUT EPOLLERR EPOLLHUP EPOLLRDBAND + EPOLLIN EPOLLOUT EPOLL_CTL_ADD EPOLL_CTL_DEL EPOLL_CTL_MOD - EPOLLEXCLUSIVE); + EPOLLONESHOT EPOLLEXCLUSIVE); %EXPORT_TAGS = (epoll => [qw(epoll_ctl epoll_create epoll_wait - EPOLLIN EPOLLOUT EPOLLERR EPOLLHUP EPOLLRDBAND + EPOLLIN EPOLLOUT EPOLL_CTL_ADD EPOLL_CTL_DEL EPOLL_CTL_MOD - EPOLLEXCLUSIVE)], + EPOLLONESHOT EPOLLEXCLUSIVE)], sendfile => [qw(sendfile)], ); use constant EPOLLIN => 1; use constant EPOLLOUT => 4; -use constant EPOLLERR => 8; -use constant EPOLLHUP => 16; -use constant EPOLLRDBAND => 128; +# use constant EPOLLERR => 8; +# use constant EPOLLHUP => 16; +# use constant EPOLLRDBAND => 128; use constant EPOLLEXCLUSIVE => (1 << 28); +use constant EPOLLONESHOT => (1 << 30); +# use constant EPOLLET => (1 << 31); use constant EPOLL_CTL_ADD => 1; use constant EPOLL_CTL_DEL => 2; use constant EPOLL_CTL_MOD => 3; @@ -57,7 +59,6 @@ sub _load_syscall { return $rv; } -our ($sysname, $nodename, $release, $version, $machine) = POSIX::uname(); our ( $SYS_epoll_create, @@ -69,6 +70,7 @@ our ( our $no_deprecated = 0; if ($^O eq "linux") { + my $machine = (POSIX::uname())[-1]; # whether the machine requires 64-bit numbers to be on 8-byte # boundaries. my $u64_mod_8 = 0; @@ -246,13 +248,8 @@ sub sendfile_freebsd { sub epoll_defined { return $SYS_epoll_create ? 1 : 0; } -# ARGS: (size) -- but in modern Linux 2.6, the -# size doesn't even matter (radix tree now, not hash) sub epoll_create { - return -1 unless defined $SYS_epoll_create; - my $epfd = eval { syscall($SYS_epoll_create, $no_deprecated ? 0 : ($_[0]||100)+0) }; - return -1 if $@; - return $epfd; + syscall($SYS_epoll_create, $no_deprecated ? 0 : ($_[0]||100)+0); } # epoll_ctl wrapper diff --git a/lib/PublicInbox/TLS.pm b/lib/PublicInbox/TLS.pm new file mode 100644 index 00000000..576c11d7 --- /dev/null +++ b/lib/PublicInbox/TLS.pm @@ -0,0 +1,24 @@ +# Copyright (C) 2019 all contributors <meta@public-inbox.org> +# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> + +# IO::Socket::SSL support code +package PublicInbox::TLS; +use strict; +use IO::Socket::SSL; +require Carp; +use Errno qw(EAGAIN); +use PublicInbox::Syscall qw(EPOLLIN EPOLLOUT); + +sub err () { $SSL_ERROR } + +# returns the EPOLL event bit which matches the existing SSL error +sub epollbit () { + if ($! == EAGAIN) { + return EPOLLIN if $SSL_ERROR == SSL_WANT_READ; + return EPOLLOUT if $SSL_ERROR == SSL_WANT_WRITE; + die "unexpected SSL error: $SSL_ERROR"; + } + 0; +} + +1; diff --git a/script/public-inbox-nntpd b/script/public-inbox-nntpd index 484ce8d6..55bf330e 100755 --- a/script/public-inbox-nntpd +++ b/script/public-inbox-nntpd @@ -11,4 +11,5 @@ require PublicInbox::NNTPD; my $nntpd = PublicInbox::NNTPD->new; PublicInbox::Daemon::run('0.0.0.0:119', sub { $nntpd->refresh_groups }, # refresh - sub ($$$) { PublicInbox::NNTP->new($_[0], $nntpd) }); # post_accept + sub ($$$) { PublicInbox::NNTP->new($_[0], $nntpd) }, # post_accept + $nntpd); diff --git a/t/ds-poll.t b/t/ds-poll.t new file mode 100644 index 00000000..a397ee06 --- /dev/null +++ b/t/ds-poll.t @@ -0,0 +1,58 @@ +# Copyright (C) 2019 all contributors <meta@public-inbox.org> +# Licensed the same as Danga::Socket (and Perl5) +# License: GPL-1.0+ or Artistic-1.0-Perl +# <https://www.gnu.org/licenses/gpl-1.0.txt> +# <https://dev.perl.org/licenses/artistic.html> +use strict; +use warnings; +use Test::More; +use PublicInbox::Syscall qw(:epoll); +my $cls = 'PublicInbox::DSPoll'; +use_ok $cls; +my $p = $cls->new; + +my ($r, $w, $x, $y); +pipe($r, $w) or die; +pipe($x, $y) or die; +is(epoll_ctl($p, EPOLL_CTL_ADD, fileno($r), EPOLLIN), 0, 'add EPOLLIN'); +my $events = []; +my $n = epoll_wait($p, 9, 0, $events); +is_deeply($events, [], 'no events set'); +is($n, 0, 'nothing ready, yet'); +is(epoll_ctl($p, EPOLL_CTL_ADD, fileno($w), EPOLLOUT|EPOLLONESHOT), 0, + 'add EPOLLOUT|EPOLLONESHOT'); +$n = epoll_wait($p, 9, -1, $events); +is($n, 1, 'got POLLOUT event'); +is($events->[0]->[0], fileno($w), '$w ready'); + +$n = epoll_wait($p, 9, 0, $events); +is($n, 0, 'nothing ready after oneshot'); +is_deeply($events, [], 'no events set after oneshot'); + +syswrite($w, '1') == 1 or die; +for my $t (0..1) { + $n = epoll_wait($p, 9, $t, $events); + is($events->[0]->[0], fileno($r), "level-trigger POLLIN ready #$t"); + is($n, 1, "only event ready #$t"); +} +syswrite($y, '1') == 1 or die; +is(epoll_ctl($p, EPOLL_CTL_ADD, fileno($x), EPOLLIN|EPOLLONESHOT), 0, + 'EPOLLIN|EPOLLONESHOT add'); +is(epoll_wait($p, 9, -1, $events), 2, 'epoll_wait has 2 ready'); +my @fds = sort(map { $_->[0] } @$events); +my @exp = sort((fileno($r), fileno($x))); +is_deeply(\@fds, \@exp, 'got both ready FDs'); + +# EPOLL_CTL_DEL doesn't matter for kqueue, we do it in native epoll +# to avoid a kernel-wide lock; but its not needed for native kqueue +# paths so DSKQXS makes it a noop (as did Danga::Socket::close). +SKIP: { + if ($cls ne 'PublicInbox::DSPoll') { + skip "$cls doesn't handle EPOLL_CTL_DEL", 2; + } + is(epoll_ctl($p, EPOLL_CTL_DEL, fileno($r), 0), 0, 'EPOLL_CTL_DEL OK'); + $n = epoll_wait($p, 9, 0, $events); + is($n, 0, 'nothing ready after EPOLL_CTL_DEL'); +}; + +done_testing; diff --git a/t/httpd-corner.t b/t/httpd-corner.t index c1dc77db..1cfc2565 100644 --- a/t/httpd-corner.t +++ b/t/httpd-corner.t @@ -18,7 +18,7 @@ use File::Temp qw/tempdir/; use IO::Socket; use IO::Socket::UNIX; use Fcntl qw(:seek); -use Socket qw(IPPROTO_TCP TCP_NODELAY); +use Socket qw(IPPROTO_TCP TCP_NODELAY SOL_SOCKET); use POSIX qw(mkfifo); require './t/common.perl'; my $tmpdir = tempdir('httpd-corner-XXXXXX', TMPDIR => 1, CLEANUP => 1); @@ -36,6 +36,25 @@ my %opts = ( Listen => 1024, ); my $sock = IO::Socket::INET->new(%opts); + +# Make sure we don't clobber socket options set by systemd or similar +# using socket activation: +my ($defer_accept_val, $accf_arg); +if ($^O eq 'linux') { + setsockopt($sock, IPPROTO_TCP, Socket::TCP_DEFER_ACCEPT(), 5) or die; + my $x = getsockopt($sock, IPPROTO_TCP, Socket::TCP_DEFER_ACCEPT()); + defined $x or die "getsockopt: $!"; + $defer_accept_val = unpack('i', $x); + if ($defer_accept_val <= 0) { + die "unexpected TCP_DEFER_ACCEPT value: $defer_accept_val"; + } +} elsif ($^O eq 'freebsd' && system('kldstat -m accf_data >/dev/null') == 0) { + require PublicInbox::Daemon; + my $var = PublicInbox::Daemon::SO_ACCEPTFILTER(); + $accf_arg = pack('a16a240', 'dataready', ''); + setsockopt($sock, SOL_SOCKET, $var, $accf_arg) or die "setsockopt: $!"; +} + my $upath = "$tmpdir/s"; my $unix = IO::Socket::UNIX->new( Listen => 1024, @@ -89,7 +108,7 @@ my $spawn_httpd = sub { is(scalar(grep(/CLOSE FAIL/, @$after)), 1, 'body->close not called'); } -{ +SKIP: { my $conn = conn_for($sock, 'excessive header'); $SIG{PIPE} = 'IGNORE'; $conn->write("GET /callback HTTP/1.0\r\n"); @@ -497,6 +516,21 @@ SKIP: { is($body, sha1_hex(''), 'read expected body #2'); } +SKIP: { + skip 'TCP_DEFER_ACCEPT is Linux-only', 1 if $^O ne 'linux'; + my $var = Socket::TCP_DEFER_ACCEPT(); + defined(my $x = getsockopt($sock, IPPROTO_TCP, $var)) or die; + is(unpack('i', $x), $defer_accept_val, + 'TCP_DEFER_ACCEPT unchanged if previously set'); +}; +SKIP: { + skip 'SO_ACCEPTFILTER is FreeBSD-only', 1 if $^O ne 'freebsd'; + skip 'accf_data not loaded: kldload accf_data' if !defined $accf_arg; + my $var = PublicInbox::Daemon::SO_ACCEPTFILTER(); + defined(my $x = getsockopt($sock, SOL_SOCKET, $var)) or die; + is($x, $accf_arg, 'SO_ACCEPTFILTER unchanged if previously set'); +}; + done_testing(); sub capture { @@ -10,6 +10,7 @@ foreach my $mod (qw(Plack::Util Plack::Builder HTTP::Date HTTP::Status)) { } use File::Temp qw/tempdir/; use IO::Socket::INET; +use Socket qw(IPPROTO_TCP); require './t/common.perl'; # FIXME: too much setup @@ -99,6 +100,23 @@ EOF 'fsck on cloned directory successful'); } +SKIP: { + skip 'TCP_DEFER_ACCEPT is Linux-only', 1 if $^O ne 'linux'; + my $var = Socket::TCP_DEFER_ACCEPT(); + defined(my $x = getsockopt($sock, IPPROTO_TCP, $var)) or die; + ok(unpack('i', $x) > 0, 'TCP_DEFER_ACCEPT set'); +}; +SKIP: { + skip 'SO_ACCEPTFILTER is FreeBSD-only', 1 if $^O ne 'freebsd'; + if (system('kldstat -m accf_http >/dev/null') != 0) { + skip 'accf_http not loaded: kldload accf_http', 1; + } + require PublicInbox::Daemon; + my $var = PublicInbox::Daemon::SO_ACCEPTFILTER(); + my $x = getsockopt($sock, SOL_SOCKET, $var); + like($x, qr/\Ahttpready\0+\z/, 'got httpready accf for HTTP'); +}; + done_testing(); 1; diff --git a/t/nntpd-tls.t b/t/nntpd-tls.t new file mode 100644 index 00000000..427d370f --- /dev/null +++ b/t/nntpd-tls.t @@ -0,0 +1,224 @@ +# Copyright (C) 2019 all contributors <meta@public-inbox.org> +# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> +use strict; +use warnings; +use Test::More; +use File::Temp qw(tempdir); +use Socket qw(SOCK_STREAM IPPROTO_TCP SOL_SOCKET); +# IO::Poll and Net::NNTP are part of the standard library, but +# distros may split them off... +foreach my $mod (qw(DBD::SQLite IO::Socket::SSL Net::NNTP IO::Poll)) { + eval "require $mod"; + plan skip_all => "$mod missing for $0" if $@; +} +my $cert = 'certs/server-cert.pem'; +my $key = 'certs/server-key.pem'; +unless (-r $key && -r $cert) { + plan skip_all => + "certs/ missing for $0, run ./create-certs.perl in certs/"; +} + +use_ok 'PublicInbox::TLS'; +use_ok 'IO::Socket::SSL'; +require './t/common.perl'; +require PublicInbox::InboxWritable; +require PublicInbox::MIME; +require PublicInbox::SearchIdx; +my $version = 2; # v2 needs newer git +require_git('2.6') if $version >= 2; +my $tmpdir = tempdir('pi-nntpd-tls-XXXXXX', TMPDIR => 1, CLEANUP => 1); +my $err = "$tmpdir/stderr.log"; +my $out = "$tmpdir/stdout.log"; +my $mainrepo = "$tmpdir"; +my $pi_config = "$tmpdir/pi_config"; +my $group = 'test-nntpd-tls'; +my $addr = $group . '@example.com'; +my $nntpd = 'blib/script/public-inbox-nntpd'; +my %opts = ( + LocalAddr => '127.0.0.1', + ReuseAddr => 1, + Proto => 'tcp', + Type => SOCK_STREAM, + Listen => 1024, +); +my $starttls = IO::Socket::INET->new(%opts); +my $nntps = IO::Socket::INET->new(%opts); +my ($pid, $tail_pid); +END { + foreach ($pid, $tail_pid) { + kill 'TERM', $_ if defined $_; + } +}; + +my $ibx = PublicInbox::Inbox->new({ + mainrepo => $mainrepo, + name => 'nntpd-tls', + version => $version, + -primary_address => $addr, + indexlevel => 'basic', +}); +$ibx = PublicInbox::InboxWritable->new($ibx, {nproc=>1}); +$ibx->init_inbox(0); +{ + open my $fh, '>', $pi_config or die "open: $!\n"; + print $fh <<EOF +[publicinbox "nntpd-tls"] + mainrepo = $mainrepo + address = $addr + indexlevel = basic + newsgroup = $group +EOF + ; + close $fh or die "close: $!\n"; +} + +{ + my $im = $ibx->importer(0); + my $mime = PublicInbox::MIME->new(do { + open my $fh, '<', 't/data/0001.patch' or die; + local $/; + <$fh> + }); + ok($im->add($mime), 'message added'); + $im->done; + if ($version == 1) { + my $s = PublicInbox::SearchIdx->new($ibx, 1); + $s->index_sync; + } +} + +my $nntps_addr = $nntps->sockhost . ':' . $nntps->sockport; +my $starttls_addr = $starttls->sockhost . ':' . $starttls->sockport; +my $env = { PI_CONFIG => $pi_config }; + +for my $args ( + [ "--cert=$cert", "--key=$key", + "-lnntps://$nntps_addr", + "-lnntp://$starttls_addr" ], +) { + for ($out, $err) { + open my $fh, '>', $_ or die "truncate: $!"; + } + if (my $tail_cmd = $ENV{TAIL}) { # don't assume GNU tail + $tail_pid = fork; + if (defined $tail_pid && $tail_pid == 0) { + exec(split(' ', $tail_cmd), $out, $err); + } + } + my $cmd = [ $nntpd, '-W0', @$args, "--stdout=$out", "--stderr=$err" ]; + $pid = spawn_listener($env, $cmd, [ $starttls, $nntps ]); + my %o = ( + SSL_hostname => 'server.local', + SSL_verifycn_name => 'server.local', + SSL_verify_mode => SSL_VERIFY_PEER(), + SSL_ca_file => 'certs/test-ca.pem', + ); + my $expect = { $group => [qw(1 1 n)] }; + + # start negotiating a slow TLS connection + my $slow = IO::Socket::INET->new( + Proto => 'tcp', + PeerAddr => $nntps_addr, + Type => SOCK_STREAM, + Blocking => 0, + ); + $slow = IO::Socket::SSL->start_SSL($slow, SSL_startHandshake => 0, %o); + my $slow_done = $slow->connect_SSL; + diag('W: connect_SSL early OK, slow client test invalid') if $slow_done; + my @poll = (fileno($slow), PublicInbox::TLS::epollbit()); + # we should call connect_SSL much later... + + # NNTPS + my $c = Net::NNTP->new($nntps_addr, %o, SSL => 1); + my $list = $c->list; + is_deeply($list, $expect, 'NNTPS LIST works'); + is($c->command('QUIT')->response(), Net::Cmd::CMD_OK(), 'QUIT works'); + is(0, sysread($c, my $buf, 1), 'got EOF after QUIT'); + + # STARTTLS + $c = Net::NNTP->new($starttls_addr, %o); + $list = $c->list; + is_deeply($list, $expect, 'plain LIST works'); + ok($c->starttls, 'STARTTLS succeeds'); + is($c->code, 382, 'got 382 for STARTTLS'); + $list = $c->list; + is_deeply($list, $expect, 'LIST works after STARTTLS'); + + # Net::NNTP won't let us do dumb things, but we need to test + # dumb things, so use Net::Cmd directly: + my $n = $c->command('STARTTLS')->response(); + is($n, Net::Cmd::CMD_ERROR(), 'error attempting STARTTLS again'); + is($c->code, 502, '502 according to RFC 4642 sec#2.2.1'); + + # STARTTLS with bad hostname + $o{SSL_hostname} = $o{SSL_verifycn_name} = 'server.invalid'; + $c = Net::NNTP->new($starttls_addr, %o); + $list = $c->list; + is_deeply($list, $expect, 'plain LIST works again'); + ok(!$c->starttls, 'STARTTLS fails with bad hostname'); + $c = Net::NNTP->new($starttls_addr, %o); + $list = $c->list; + is_deeply($list, $expect, 'not broken after bad negotiation'); + + # NNTPS with bad hostname + $c = Net::NNTP->new($nntps_addr, %o, SSL => 1); + is($c, undef, 'NNTPS fails with bad hostname'); + $o{SSL_hostname} = $o{SSL_verifycn_name} = 'server.local'; + $c = Net::NNTP->new($nntps_addr, %o, SSL => 1); + ok($c, 'NNTPS succeeds again with valid hostname'); + + # slow TLS connection did not block the other fast clients while + # connecting, finish it off: + until ($slow_done) { + IO::Poll::_poll(-1, @poll); + $slow_done = $slow->connect_SSL and last; + @poll = (fileno($slow), PublicInbox::TLS::epollbit()); + } + $slow->blocking(1); + ok(sysread($slow, my $greet, 4096) > 0, 'slow got greeting'); + like($greet, qr/\A201 /, 'got expected greeting'); + is(syswrite($slow, "QUIT\r\n"), 6, 'slow wrote QUIT'); + ok(sysread($slow, my $end, 4096) > 0, 'got EOF'); + is(sysread($slow, my $eof, 4096), 0, 'got EOF'); + $slow = undef; + + SKIP: { + skip 'TCP_DEFER_ACCEPT is Linux-only', 2 if $^O ne 'linux'; + my $var = Socket::TCP_DEFER_ACCEPT(); + defined(my $x = getsockopt($nntps, IPPROTO_TCP, $var)) or die; + ok(unpack('i', $x) > 0, 'TCP_DEFER_ACCEPT set on NNTPS'); + defined($x = getsockopt($starttls, IPPROTO_TCP, $var)) or die; + is(unpack('i', $x), 0, 'TCP_DEFER_ACCEPT is 0 on plain NNTP'); + }; + SKIP: { + skip 'SO_ACCEPTFILTER is FreeBSD-only', 2 if $^O ne 'freebsd'; + if (system('kldstat -m accf_data >/dev/null')) { + skip 'accf_data not loaded? kldload accf_data', 2; + } + require PublicInbox::Daemon; + my $var = PublicInbox::Daemon::SO_ACCEPTFILTER(); + my $x = getsockopt($nntps, SOL_SOCKET, $var); + like($x, qr/\Adataready\0+\z/, 'got dataready accf for NNTPS'); + $x = getsockopt($starttls, IPPROTO_TCP, $var); + is($x, undef, 'no BSD accept filter for plain NNTP'); + }; + + $c = undef; + kill('TERM', $pid); + is($pid, waitpid($pid, 0), 'nntpd exited successfully'); + is($?, 0, 'no error in exited process'); + $pid = undef; + my $eout = eval { + open my $fh, '<', $err or die "open $err failed: $!"; + local $/; + <$fh>; + }; + unlike($eout, qr/wide/i, 'no Wide character warnings'); + if (defined $tail_pid) { + kill 'TERM', $tail_pid; + waitpid($tail_pid, 0); + $tail_pid = undef; + } +} +done_testing(); +1; @@ -106,6 +106,8 @@ EOF is_deeply($list, { $group => [ qw(1 1 n) ] }, 'LIST works'); is_deeply([$n->group($group)], [ qw(0 1 1), $group ], 'GROUP works'); is_deeply($n->listgroup($group), [1], 'listgroup OK'); + ok(!$n->starttls, 'STARTTLS fails when unconfigured'); + is($n->code, 580, 'got 580 code on server w/o TLS'); %opts = ( PeerAddr => $host_port, @@ -81,17 +81,6 @@ use PublicInbox::Spawn qw(which spawn popen_rd); isnt($?, 0, '$? set properly: '.$?); } -{ - my ($fh, $pid) = popen_rd([qw(sleep 60)], undef, { Blocking => 0 }); - ok(defined $pid && $pid > 0, 'returned pid when array requested'); - is(kill(0, $pid), 1, 'child process is running'); - ok(!defined(sysread($fh, my $buf, 1)) && $!{EAGAIN}, - 'sysread returned quickly with EAGAIN'); - is(kill(9, $pid), 1, 'child process killed early'); - is(waitpid($pid, 0), $pid, 'child process reapable'); - isnt($?, 0, '$? set properly: '.$?); -} - SKIP: { eval { require BSD::Resource; |