user/dev discussion of public-inbox itself
* [PATCH 00/11] ds: more updates
@ 2019-06-29 19:59 Eric Wong
  2019-06-29 19:59 ` [PATCH 01/11] ds: share lazy rbuf handling between HTTP and NNTP Eric Wong
                   ` (10 more replies)
  0 siblings, 11 replies; 12+ messages in thread
From: Eric Wong @ 2019-06-29 19:59 UTC
  To: meta

We can simplify a lot of our async logic now that we don't have
to deal with the buffer-to-heap behavior of Danga::Socket.

The biggest change is that we no longer tie git-http-backend(1)
runtime and memory use to the bandwidth of a slow HTTP client.
This increases buffering on the FS (which may be tmpfs or
a fast SSD), but it's what nginx (and varnish) would be doing
anyway.
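
A minimal sketch of that buffering strategy in plain Perl (not the
actual PublicInbox::HTTPD::Async code): the producer's output is drained
into an unlinked temporary file as fast as it arrives, so the producer
can finish and exit regardless of client speed, and the client is fed
from the file afterwards.  The "perl -e" producer standing in for
git-http-backend(1), the 100 KB payload and the chunk sizes are all
illustrative assumptions.

```perl
use strict;
use warnings;
use File::Temp qw(tempfile);

# Producer: a child process writing 100 KB to a pipe (a stand-in for
# git-http-backend(1); the payload size is arbitrary).
open(my $r, '-|', $^X, '-e', 'print "x" x 100000')
    or die "spawn: $!";

# Drain the pipe into an unlinked temporary file as fast as data
# arrives, independent of how quickly any client consumes it.
my $tmp = tempfile();    # scalar context: file is removed on close
my $n;
while ($n = sysread($r, my $buf, 65536)) {
    print {$tmp} $buf or die "write: $!";
}
die "read: $!" unless defined $n;
close($r) or die "producer exited with status $?";    # reap the child

# The (slow) client is served from the file later, 4 KB at a time.
seek($tmp, 0, 0) or die "seek: $!";
my $sent = 0;
while (my $got = read($tmp, my $chunk, 4096)) {
    $sent += $got;    # a real server would write $chunk to the socket
}
print "$sent\n";    # 100000
```

The trade-off is the one described above: disk (or tmpfs) space is
spent so the producer's process and memory are released early.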

We can also remove much of the EvCleanup code, since it existed
to work around deferred closes being delayed for too long when
no I/O events were firing.

HTTPS now works, but more work needs to be done because
Varnish is still a requirement for busy sites.

Eric Wong (11):
  ds: share lazy rbuf handling between HTTP and NNTP
  ds: move requeue logic over from NNTP
  http: use requeue instead of watch_in1
  listener: use edge-triggered notifications
  ds: handle deferred DS->close after timers
  ds: consolidate IO::Socket::SSL checks
  http: support HTTPS (kinda)
  parentpipe: document and use one-shot wakeups
  parentpipe: make the ->close call more obvious
  httpd/async: switch to buffering-as-fast-as-possible
  http: use bigger, but shorter-lived buffers for pipes

 MANIFEST                       |   1 +
 lib/PublicInbox/DS.pm          |  85 +++++++++++++++----------
 lib/PublicInbox/DSKQXS.pm      |   4 +-
 lib/PublicInbox/Daemon.pm      |   4 +-
 lib/PublicInbox/EvCleanup.pm   |  80 +++--------------------
 lib/PublicInbox/HTTP.pm        | 102 +++++++++++++++--------------
 lib/PublicInbox/HTTPD/Async.pm |  55 ++++++++--------
 lib/PublicInbox/Listener.pm    |   7 +-
 lib/PublicInbox/NNTP.pm        |  47 +++-----------
 lib/PublicInbox/ParentPipe.pm  |  17 +++--
 lib/PublicInbox/Qspawn.pm      |   2 +-
 lib/PublicInbox/Syscall.pm     |   4 +-
 lib/PublicInbox/TLS.pm         |   9 +--
 t/httpd-https.t                | 141 +++++++++++++++++++++++++++++++++++++++++
 14 files changed, 314 insertions(+), 244 deletions(-)
 create mode 100644 t/httpd-https.t

-- 
EW



* [PATCH 01/11] ds: share lazy rbuf handling between HTTP and NNTP
From: Eric Wong @ 2019-06-29 19:59 UTC
  To: meta

Doing this for HTTP cuts the memory usage of 10K
idle-after-one-request HTTP clients from 92 MB to 47 MB.

The larger savings compared to the equivalent NNTP change in
commit 6f173864f5acac89769a67739b8c377510711d49
("nntp: lazily allocate and stash rbuf") seem to come down to
the size of HTTP requests and the fact that HTTP is a
client-sends-first protocol, whereas NNTP is server-sends-first.
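
A self-contained sketch of the lazy-rbuf idea, assuming a hypothetical
LazyConn class in place of a real PublicInbox::DS object: the read
buffer exists in the object only while it holds unparsed bytes, so an
idle connection carries no buffer at all.

```perl
use strict;
use warnings;

# Hypothetical stand-in for a PublicInbox::DS-based connection; only
# the 'rbuf' slot is modeled.
package LazyConn;

sub new { bless {}, shift }

# Return the stashed read buffer, or allocate an empty one on demand.
sub rbuf { $_[0]->{rbuf} // \(my $x = '') }

# Same idea as rbuf_idle in this patch: keep the buffer only while it
# holds unparsed bytes, otherwise free it until the next read.
sub rbuf_idle {
    my ($self, $rbuf) = @_;
    if ($$rbuf eq '') {
        delete $self->{rbuf};
    } else {
        $self->{rbuf} = $rbuf;
    }
}

package main;

my $conn = LazyConn->new;
my $rbuf = $conn->rbuf;
$$rbuf .= "GET / HTTP/1.1\r\n";    # partial request, headers unfinished
$conn->rbuf_idle($rbuf);
print exists $conn->{rbuf} ? "stashed\n" : "freed\n";    # stashed

$$rbuf = '';                        # request fully consumed
$conn->rbuf_idle($rbuf);
print exists $conn->{rbuf} ? "stashed\n" : "freed\n";    # freed
```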
---
 lib/PublicInbox/DS.pm   | 16 ++++++++--
 lib/PublicInbox/HTTP.pm | 79 ++++++++++++++++++++++++-------------------------
 lib/PublicInbox/NNTP.pm |  8 ++---
 3 files changed, 55 insertions(+), 48 deletions(-)

diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index a8700bc5..28240843 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -28,6 +28,7 @@ use 5.010_001;
 use PublicInbox::Syscall qw(:epoll);
 
 use fields ('sock',              # underlying socket
+            'rbuf',              # scalarref, usually undef
             'wbuf',              # arrayref of coderefs or GLOB refs
             'wbuf_off',  # offset into first element of wbuf to start writing at
             );
@@ -412,16 +413,27 @@ next_buf:
     1; # all done
 }
 
-sub do_read ($$$$) {
+sub rbuf_idle ($$) {
+    my ($self, $rbuf) = @_;
+    if ($$rbuf eq '') { # who knows how long till we can read again
+        delete $self->{rbuf};
+    } else {
+        $self->{rbuf} = $rbuf;
+    }
+}
+
+sub do_read ($$$;$) {
     my ($self, $rbuf, $len, $off) = @_;
-    my $r = sysread($self->{sock}, $$rbuf, $len, $off);
+    my $r = sysread($self->{sock}, $$rbuf, $len, $off // 0);
     return ($r == 0 ? $self->close : $r) if defined $r;
     # common for clients to break connections without warning,
     # would be too noisy to log here:
     if (ref($self) eq 'IO::Socket::SSL') {
         my $ev = PublicInbox::TLS::epollbit() or return $self->close;
+        rbuf_idle($self, $rbuf);
         watch($self, $ev | EPOLLONESHOT);
     } elsif ($! == EAGAIN) {
+        rbuf_idle($self, $rbuf);
         watch($self, EPOLLIN | EPOLLONESHOT);
     } else {
         $self->close;
diff --git a/lib/PublicInbox/HTTP.pm b/lib/PublicInbox/HTTP.pm
index a1cb4aca..1153ef98 100644
--- a/lib/PublicInbox/HTTP.pm
+++ b/lib/PublicInbox/HTTP.pm
@@ -11,7 +11,7 @@ package PublicInbox::HTTP;
 use strict;
 use warnings;
 use base qw(PublicInbox::DS);
-use fields qw(httpd env rbuf input_left remote_addr remote_port forward pull);
+use fields qw(httpd env input_left remote_addr remote_port forward pull);
 use bytes (); # only for bytes::length
 use Fcntl qw(:seek);
 use Plack::HTTPParser qw(parse_http_request); # XS or pure Perl
@@ -60,7 +60,6 @@ sub new ($$$) {
 	my $self = fields::new($class);
 	$self->SUPER::new($sock, EPOLLIN | EPOLLONESHOT);
 	$self->{httpd} = $httpd;
-	$self->{rbuf} = '';
 	($self->{remote_addr}, $self->{remote_port}) =
 		PublicInbox::Daemon::host_with_port($addr);
 	$self;
@@ -75,31 +74,34 @@ sub event_step { # called by PublicInbox::DS
 	# otherwise we can be buffering infinitely w/o backpressure
 
 	return read_input($self) if defined $self->{env};
-	my $rbuf = \($self->{rbuf});
-	my $off = bytes::length($$rbuf);
-	$self->do_read($rbuf, 8192, $off) and rbuf_process($self);
+	my $rbuf = $self->{rbuf} // (\(my $x = ''));
+	$self->do_read($rbuf, 8192, bytes::length($$rbuf)) or return;
+	rbuf_process($self, $rbuf);
 }
 
 sub rbuf_process {
-	my ($self) = @_;
+	my ($self, $rbuf) = @_;
+	$rbuf //= $self->{rbuf} // (\(my $x = ''));
 
 	my %env = %{$self->{httpd}->{env}}; # full hash copy
-	my $r = parse_http_request($self->{rbuf}, \%env);
+	my $r = parse_http_request($$rbuf, \%env);
 
 	# We do not support Trailers in chunked requests, for now
 	# (they are rarely-used and git (as of 2.7.2) does not use them)
 	if ($r == -1 || $env{HTTP_TRAILER} ||
 			# this length-check is necessary for PURE_PERL=1:
-			($r == -2 && bytes::length($self->{rbuf}) > 0x4000)) {
+			($r == -2 && bytes::length($$rbuf) > 0x4000)) {
 		return quit($self, 400);
 	}
-	return $self->watch_in1 if $r < 0; # incomplete
-	$self->{rbuf} = substr($self->{rbuf}, $r);
-
+	if ($r < 0) { # incomplete
+		$self->rbuf_idle($rbuf);
+		return $self->watch_in1;
+	}
+	$$rbuf = substr($$rbuf, $r);
 	my $len = input_prepare($self, \%env);
 	defined $len or return write_err($self, undef); # EMFILE/ENFILE
 
-	$len ? read_input($self) : app_dispatch($self);
+	$len ? read_input($self, $rbuf) : app_dispatch($self, undef, $rbuf);
 }
 
 # IO::Handle::write returns boolean, this returns bytes written:
@@ -111,16 +113,15 @@ sub xwrite ($$$) {
 	$w;
 }
 
-sub read_input ($) {
-	my ($self) = @_;
+sub read_input ($;$) {
+	my ($self, $rbuf) = @_;
+	$rbuf //= $self->{rbuf} // (\(my $x = ''));
 	my $env = $self->{env};
 	return if $env->{REMOTE_ADDR}; # in app dispatch
-	return read_input_chunked($self) if env_chunked($env);
+	return read_input_chunked($self, $rbuf) if env_chunked($env);
 
 	# env->{CONTENT_LENGTH} (identity)
-	my $sock = $self->{sock};
 	my $len = delete $self->{input_left};
-	my $rbuf = \($self->{rbuf});
 	my $input = $env->{'psgi.input'};
 
 	while ($len > 0) {
@@ -135,15 +136,15 @@ sub read_input ($) {
 			}
 			$$rbuf = '';
 		}
-		my $r = sysread($sock, $$rbuf, 8192);
-		return recv_err($self, $r, $len) unless $r;
+		$self->do_read($rbuf, 8192) or return recv_err($self, $len);
 		# continue looping if $r > 0;
 	}
-	app_dispatch($self, $input);
+	app_dispatch($self, $input, $rbuf);
 }
 
 sub app_dispatch {
-	my ($self, $input) = @_;
+	my ($self, $input, $rbuf) = @_;
+	$self->rbuf_idle($rbuf);
 	my $env = $self->{env};
 	$env->{REMOTE_ADDR} = $self->{remote_addr};
 	$env->{REMOTE_PORT} = $self->{remote_port};
@@ -235,11 +236,12 @@ sub identity_wcb ($) {
 
 sub next_request ($) {
 	my ($self) = @_;
-	if ($self->{rbuf} eq '') { # wait for next request
-		$self->watch_in1;
-	} else { # avoid recursion for pipelined requests
+	if ($self->{rbuf}) {
+		# avoid recursion for pipelined requests
 		push @$pipelineq, $self;
 		$pipet ||= PublicInbox::EvCleanup::asap(*process_pipelineq);
+	} else { # wait for next request
+		$self->watch_in1;
 	}
 }
 
@@ -360,27 +362,25 @@ sub write_err {
 }
 
 sub recv_err {
-	my ($self, $r, $len) = @_;
-	return $self->close if (defined $r && $r == 0);
-	if ($! == EAGAIN) {
+	my ($self, $len) = @_;
+	if ($! == EAGAIN) { # epoll/kevent watch already set by do_read
 		$self->{input_left} = $len;
-		return $self->watch_in1;
+	} else {
+		err($self, "error reading input: $! ($len bytes remaining)");
 	}
-	err($self, "error reading for input: $! ($len bytes remaining)");
-	quit($self, 500);
 }
 
 sub read_input_chunked { # unlikely...
-	my ($self) = @_;
+	my ($self, $rbuf) = @_;
+	$rbuf //= $self->{rbuf} // (\(my $x = ''));
 	my $input = $self->{env}->{'psgi.input'};
-	my $sock = $self->{sock};
 	my $len = delete $self->{input_left};
-	my $rbuf = \($self->{rbuf});
 
 	while (1) { # chunk start
 		if ($len == CHUNK_ZEND) {
 			$$rbuf =~ s/\A\r\n//s and
-				return app_dispatch($self, $input);
+				return app_dispatch($self, $input, $rbuf);
+
 			return quit($self, 400) if bytes::length($$rbuf) > 2;
 		}
 		if ($len == CHUNK_END) {
@@ -403,9 +403,8 @@ sub read_input_chunked { # unlikely...
 		}
 
 		if ($len < 0) { # chunk header is trickled, read more
-			my $off = bytes::length($$rbuf);
-			my $r = sysread($sock, $$rbuf, 8192, $off);
-			return recv_err($self, $r, $len) unless $r;
+			$self->do_read($rbuf, 8192, bytes::length($$rbuf)) or
+				return recv_err($self, $len);
 			# (implicit) goto chunk_start if $r > 0;
 		}
 		$len = CHUNK_ZEND if $len == 0;
@@ -429,8 +428,8 @@ sub read_input_chunked { # unlikely...
 			}
 			if ($$rbuf eq '') {
 				# read more of current chunk
-				my $r = sysread($sock, $$rbuf, 8192);
-				return recv_err($self, $r, $len) unless $r;
+				$self->do_read($rbuf, 8192) or
+					return recv_err($self, $len);
 			}
 		}
 	}
@@ -459,7 +458,7 @@ sub close {
 # for graceful shutdown in PublicInbox::Daemon:
 sub busy () {
 	my ($self) = @_;
-	($self->{rbuf} ne '' || $self->{env} || $self->{wbuf});
+	($self->{rbuf} || $self->{env} || $self->{wbuf});
 }
 
 # fires after pending writes are complete:
diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm
index 53e18281..0a053627 100644
--- a/lib/PublicInbox/NNTP.pm
+++ b/lib/PublicInbox/NNTP.pm
@@ -6,7 +6,7 @@ package PublicInbox::NNTP;
 use strict;
 use warnings;
 use base qw(PublicInbox::DS);
-use fields qw(nntpd article rbuf ng);
+use fields qw(nntpd article ng);
 use PublicInbox::Search;
 use PublicInbox::Msgmap;
 use PublicInbox::MID qw(mid_escape);
@@ -985,11 +985,7 @@ sub event_step {
 	return $self->close if $r < 0;
 	my $len = bytes::length($$rbuf);
 	return $self->close if ($len >= LINE_MAX);
-	if ($len) {
-		$self->{rbuf} = $rbuf;
-	} else {
-		delete $self->{rbuf};
-	}
+	$self->rbuf_idle($rbuf);
 	update_idle_time($self);
 
 	# maybe there's more pipelined data, or we'll have
-- 
EW



* [PATCH 02/11] ds: move requeue logic over from NNTP
From: Eric Wong @ 2019-06-29 19:59 UTC
  To: meta

We'll be reusing requeue in other places to reduce trips to
the kernel to retrieve "hot" descriptors.
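
A simplified model of the next_tick/requeue pair being moved into DS.pm.
It is an assumption-laden sketch: the real requeue also arms a callback
so next_tick runs on the next event loop iteration, while here
next_tick() is called by hand, and HotConn is a hypothetical object
that stays "hot" for a few steps.

```perl
use strict;
use warnings;

# Objects queued to have ->event_step called on the next tick.
my $nextq = [];

sub requeue { push @$nextq, $_[0] }

sub next_tick {
    my $q = $nextq;
    $nextq = [];    # objects requeued during this pass wait for the next
    $_->event_step for @$q;
}

# Hypothetical connection with work left for a few steps.
package HotConn;

my @history;

sub new {
    my ($class, $name, $steps) = @_;
    bless { name => $name, left => $steps }, $class;
}

sub history { \@history }

sub event_step {
    my ($self) = @_;
    push @history, $self->{name};
    # more work pending: requeue instead of asking epoll/kqueue
    main::requeue($self) if --$self->{left} > 0;
}

package main;

requeue(HotConn->new('a', 2));
requeue(HotConn->new('b', 1));
next_tick();    # runs a and b; only a requeues itself
next_tick();    # runs a again
print join(',', @{ HotConn->history }), "\n";    # a,b,a
```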
---
 lib/PublicInbox/DS.pm   | 14 ++++++++++++++
 lib/PublicInbox/NNTP.pm | 22 ++++------------------
 2 files changed, 18 insertions(+), 18 deletions(-)

diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index 28240843..9f245347 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -37,6 +37,8 @@ use Errno  qw(EAGAIN EINVAL EEXIST);
 use Carp   qw(croak confess carp);
 require File::Spec;
 
+my $nextt; # timer for next_tick
+my $nextq = []; # queue for next_tick
 our (
      %DescriptorMap,             # fd (num) -> PublicInbox::DS object
      $Epoll,                     # Global epoll fd (or DSKQXS ref)
@@ -594,6 +596,18 @@ sub shutdn ($) {
     }
 }
 
+sub next_tick () {
+	$nextt = undef;
+	my $q = $nextq;
+	$nextq = [];
+	$_->event_step for @$q;
+}
+
+sub requeue ($) {
+	push @$nextq, $_[0];
+	$nextt ||= PublicInbox::EvCleanup::asap(*next_tick);
+}
+
 package PublicInbox::DS::Timer;
 # [$abs_float_firetime, $coderef];
 sub cancel {
diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm
index 0a053627..83970309 100644
--- a/lib/PublicInbox/NNTP.pm
+++ b/lib/PublicInbox/NNTP.pm
@@ -38,20 +38,6 @@ my %DISABLED; # = map { $_ => 1 } qw(xover list_overview_fmt newnews xhdr);
 my $EXPMAP; # fd -> [ idle_time, $self ]
 my $expt;
 our $EXPTIME = 180; # 3 minutes
-my $nextt;
-
-my $nextq = [];
-sub next_tick () {
-	$nextt = undef;
-	my $q = $nextq;
-	$nextq = [];
-	event_step($_) for @$q;
-}
-
-sub requeue ($) {
-	push @$nextq, $_[0];
-	$nextt ||= PublicInbox::EvCleanup::asap(*next_tick);
-}
 
 sub update_idle_time ($) {
 	my ($self) = @_;
@@ -655,12 +641,12 @@ sub long_response ($$) {
 			push @$wbuf, $long_cb;
 
 			# wbuf may be populated by $cb, no need to rearm if so:
-			requeue($self) if scalar(@$wbuf) == 1;
+			$self->requeue if scalar(@$wbuf) == 1;
 		} else { # all done!
 			$long_cb = undef;
 			res($self, '.');
 			out($self, " deferred[$fd] done - %0.6f", now() - $t0);
-			requeue($self) unless $self->{wbuf};
+			$self->requeue unless $self->{wbuf};
 		}
 	};
 	$self->write($long_cb); # kick off!
@@ -915,7 +901,7 @@ sub cmd_starttls ($) {
 		return '580 can not initiate TLS negotiation';
 	res($self, '382 Continue with TLS negotiation');
 	$self->{sock} = IO::Socket::SSL->start_SSL($sock, %$opt);
-	requeue($self) if PublicInbox::DS::accept_tls_step($self);
+	$self->requeue if PublicInbox::DS::accept_tls_step($self);
 	undef;
 }
 
@@ -990,7 +976,7 @@ sub event_step {
 
 	# maybe there's more pipelined data, or we'll have
 	# to register it for socket-readiness notifications
-	requeue($self) unless $self->{wbuf};
+	$self->requeue unless $self->{wbuf};
 }
 
 sub not_idle_long ($$) {
-- 
EW



* [PATCH 03/11] http: use requeue instead of watch_in1
From: Eric Wong @ 2019-06-29 19:59 UTC
  To: meta

Don't use epoll or kqueue to watch for anything unless we hit
EAGAIN, since we don't know if a socket is SSL or not.
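
A small sketch of the "try the I/O first, only watch on EAGAIN" pattern,
using a non-blocking pipe in place of a client socket.  The try_read
helper and its return strings are hypothetical; real code would arm
epoll/kqueue at the EAGAIN branch.

```perl
use strict;
use warnings;
use Errno qw(EAGAIN);
use IO::Handle;

pipe(my $r, my $w) or die "pipe: $!";
$r->blocking(0);

# Attempt the read immediately; only EAGAIN means "wait for readiness".
sub try_read {
    my ($fh) = @_;
    my $n = sysread($fh, my $buf, 4096);
    return "read $n bytes" if $n;
    return 'EOF' if defined $n;
    return 'EAGAIN: register for EPOLLIN' if $! == EAGAIN;
    die "read: $!";
}

print try_read($r), "\n";    # EAGAIN: register for EPOLLIN
syswrite($w, 'hello') or die "write: $!";
print try_read($r), "\n";    # read 5 bytes
close($w);
print try_read($r), "\n";    # EOF
```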
---
 lib/PublicInbox/DS.pm   | 2 --
 lib/PublicInbox/HTTP.pm | 4 ++--
 2 files changed, 2 insertions(+), 4 deletions(-)

diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index 9f245347..8f1494f6 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -557,8 +557,6 @@ sub watch ($$) {
     0;
 }
 
-sub watch_in1 ($) { watch($_[0], EPOLLIN | EPOLLONESHOT) }
-
 # return true if complete, false if incomplete (or failure)
 sub accept_tls_step ($) {
     my ($self) = @_;
diff --git a/lib/PublicInbox/HTTP.pm b/lib/PublicInbox/HTTP.pm
index 1153ef98..856b8959 100644
--- a/lib/PublicInbox/HTTP.pm
+++ b/lib/PublicInbox/HTTP.pm
@@ -95,7 +95,7 @@ sub rbuf_process {
 	}
 	if ($r < 0) { # incomplete
 		$self->rbuf_idle($rbuf);
-		return $self->watch_in1;
+		return $self->requeue;
 	}
 	$$rbuf = substr($$rbuf, $r);
 	my $len = input_prepare($self, \%env);
@@ -241,7 +241,7 @@ sub next_request ($) {
 		push @$pipelineq, $self;
 		$pipet ||= PublicInbox::EvCleanup::asap(*process_pipelineq);
 	} else { # wait for next request
-		$self->watch_in1;
+		$self->requeue;
 	}
 }
 
-- 
EW



* [PATCH 04/11] listener: use edge-triggered notifications
From: Eric Wong @ 2019-06-29 19:59 UTC
  To: meta

We don't need extra wakeups from the kernel when we know a
listener is already active.
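
A simulation (no real sockets) of why an edge-triggered listener
requeues itself: readiness is reported once per transition, so after
accepting a single client (to spread clients fairly across processes)
the handler queues itself to run again instead of waiting for another
kernel notification.  The backlog contents and queue are made up for
illustration.

```perl
use strict;
use warnings;

my @backlog = qw(c1 c2 c3);    # pretend pending connections
my @accepted;
my @nextq;

sub listener_step {
    my $c = shift @backlog;
    return unless defined $c;        # accept(2) would fail with EAGAIN
    push @accepted, $c;              # post_accept callback
    push @nextq, \&listener_step;    # requeue: no extra kernel wakeup
}

listener_step();                            # the single edge-triggered event
while (my $cb = shift @nextq) { $cb->() }   # event loop drains its queue
print "@accepted\n";                        # c1 c2 c3
```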
---
 lib/PublicInbox/DSKQXS.pm   | 4 +++-
 lib/PublicInbox/Listener.pm | 7 ++++---
 lib/PublicInbox/Syscall.pm  | 4 ++--
 3 files changed, 9 insertions(+), 6 deletions(-)

diff --git a/lib/PublicInbox/DSKQXS.pm b/lib/PublicInbox/DSKQXS.pm
index 364df3d6..278d3f88 100644
--- a/lib/PublicInbox/DSKQXS.pm
+++ b/lib/PublicInbox/DSKQXS.pm
@@ -16,7 +16,8 @@ use warnings;
 use parent qw(IO::KQueue);
 use parent qw(Exporter);
 use IO::KQueue;
-use PublicInbox::Syscall qw(EPOLLONESHOT EPOLLIN EPOLLOUT EPOLL_CTL_DEL);
+use PublicInbox::Syscall qw(EPOLLONESHOT EPOLLIN EPOLLOUT EPOLLET
+	EPOLL_CTL_DEL);
 our @EXPORT_OK = qw(epoll_ctl epoll_wait);
 my $owner_pid = -1; # kqueue is close-on-fork (yes, fork, not exec)
 
@@ -25,6 +26,7 @@ sub kq_flag ($$) {
 	my ($bit, $ev) = @_;
 	if ($ev & $bit) {
 		my $fl = EV_ADD | EV_ENABLE;
+		$fl |= EV_CLEAR if $fl & EPOLLET;
 		($ev & EPOLLONESHOT) ? ($fl | EV_ONESHOT) : $fl;
 	} else {
 		EV_ADD | EV_DISABLE;
diff --git a/lib/PublicInbox/Listener.pm b/lib/PublicInbox/Listener.pm
index 94b2aed4..594dabb8 100644
--- a/lib/PublicInbox/Listener.pm
+++ b/lib/PublicInbox/Listener.pm
@@ -9,6 +9,7 @@ use base 'PublicInbox::DS';
 use Socket qw(SOL_SOCKET SO_KEEPALIVE IPPROTO_TCP TCP_NODELAY);
 use fields qw(post_accept);
 require IO::Handle;
+use PublicInbox::Syscall qw(EPOLLIN EPOLLEXCLUSIVE EPOLLET);
 
 sub new ($$$) {
 	my ($class, $s, $cb) = @_;
@@ -17,15 +18,14 @@ sub new ($$$) {
 	listen($s, 1024);
 	IO::Handle::blocking($s, 0);
 	my $self = fields::new($class);
-	$self->SUPER::new($s, PublicInbox::DS::EPOLLIN()|
-	                      PublicInbox::DS::EPOLLEXCLUSIVE());
+	$self->SUPER::new($s, EPOLLIN|EPOLLET|EPOLLEXCLUSIVE);
 	$self->{post_accept} = $cb;
 	$self
 }
 
 sub event_step {
 	my ($self) = @_;
-	my $sock = $self->{sock};
+	my $sock = $self->{sock} or return;
 
 	# no loop here, we want to fairly distribute clients
 	# between multiple processes sharing the same socket
@@ -35,6 +35,7 @@ sub event_step {
 	if (my $addr = accept(my $c, $sock)) {
 		IO::Handle::blocking($c, 0); # no accept4 :<
 		$self->{post_accept}->($c, $addr, $sock);
+		$self->requeue;
 	}
 }
 
diff --git a/lib/PublicInbox/Syscall.pm b/lib/PublicInbox/Syscall.pm
index 500efa67..d7e15c72 100644
--- a/lib/PublicInbox/Syscall.pm
+++ b/lib/PublicInbox/Syscall.pm
@@ -22,7 +22,7 @@ use vars qw(@ISA @EXPORT_OK %EXPORT_TAGS $VERSION);
 $VERSION     = "0.25";
 @ISA         = qw(Exporter);
 @EXPORT_OK   = qw(sendfile epoll_ctl epoll_create epoll_wait
-                  EPOLLIN EPOLLOUT
+                  EPOLLIN EPOLLOUT EPOLLET
                   EPOLL_CTL_ADD EPOLL_CTL_DEL EPOLL_CTL_MOD
                   EPOLLONESHOT EPOLLEXCLUSIVE);
 %EXPORT_TAGS = (epoll => [qw(epoll_ctl epoll_create epoll_wait
@@ -39,7 +39,7 @@ use constant EPOLLOUT      => 4;
 # use constant EPOLLRDBAND   => 128;
 use constant EPOLLEXCLUSIVE => (1 << 28);
 use constant EPOLLONESHOT => (1 << 30);
-# use constant EPOLLET => (1 << 31);
+use constant EPOLLET => (1 << 31);
 use constant EPOLL_CTL_ADD => 1;
 use constant EPOLL_CTL_DEL => 2;
 use constant EPOLL_CTL_MOD => 3;
-- 
EW



* [PATCH 05/11] ds: handle deferred DS->close after timers
From: Eric Wong @ 2019-06-29 19:59 UTC
  To: meta

Our hacks in EvCleanup::next_tick and EvCleanup::asap were due
to the fact that "closed" sockets were deferred and could not
wake up the event loop, causing certain actions to be delayed
until an event fired.

Instead, ensure we don't sleep if there are pending sockets to
close.

We can then remove most of the EvCleanup code.

While we're at it, split out immediate timer handling into a
separate array so we don't need to deal with time calculations
for the event loop.
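
A simplified model of the resulting RunTimers timeout logic, written as
standalone Perl rather than taken from DS.pm: whenever callbacks are
queued or closes are pending, the returned epoll_wait timeout is 0, so
the loop never sleeps on deferred work.  Times are passed in explicitly
instead of reading a monotonic clock, and the "+1 ms" rounding is
assumed to follow the existing comment about even milliseconds.

```perl
use strict;
use warnings;

my $LoopTimeout = -1;    # -1: block until an event arrives
my $nextq = [];          # coderefs queued for the next tick
my @ToClose;             # descriptors awaiting a deferred close
my @Timers;              # [ $abs_fire_time, $coderef ], sorted by time

sub next_tick {
    my $q = $nextq;
    $nextq = [];
    $_->() for @$q;
}

# returns the timeout for epoll_wait, in milliseconds
sub run_timers {
    my ($now) = @_;
    next_tick();
    return ((@$nextq || @ToClose) ? 0 : $LoopTimeout) unless @Timers;

    while (@Timers && $Timers[0][0] <= $now) {
        my $t = shift @Timers;
        $t->[1]->($now);
    }
    return 0 if (@$nextq || @ToClose);    # timers may enqueue more work
    return $LoopTimeout unless @Timers;

    # whole milliseconds until the next timer, plus 1 to avoid spinning
    return int(($Timers[0][0] - $now) * 1000) + 1;
}

print run_timers(10), "\n";    # -1: nothing pending, block
push @ToClose, 5;
print run_timers(10), "\n";    # 0: a deferred close is pending
@ToClose = ();
push @Timers, [ 10, sub { push @$nextq, sub {} } ];
print run_timers(10), "\n";    # 0: the timer queued a callback
print run_timers(10), "\n";    # -1: queue drained, no timers left
```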
---
 lib/PublicInbox/DS.pm          | 41 +++++++++++-----------
 lib/PublicInbox/EvCleanup.pm   | 80 +++++-------------------------------------
 lib/PublicInbox/HTTP.pm        |  6 ++--
 lib/PublicInbox/HTTPD/Async.pm |  6 ++--
 lib/PublicInbox/NNTP.pm        | 14 ++------
 5 files changed, 36 insertions(+), 111 deletions(-)

diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index 8f1494f6..6cd527e2 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -37,7 +37,6 @@ use Errno  qw(EAGAIN EINVAL EEXIST);
 use Carp   qw(croak confess carp);
 require File::Spec;
 
-my $nextt; # timer for next_tick
 my $nextq = []; # queue for next_tick
 our (
      %DescriptorMap,             # fd (num) -> PublicInbox::DS object
@@ -101,12 +100,6 @@ Returns a timer object which you can call C<< $timer->cancel >> on if you need t
 sub AddTimer {
     my ($class, $secs, $coderef) = @_;
 
-    if (!$secs) {
-        my $timer = bless([0, $coderef], 'PublicInbox::DS::Timer');
-        unshift(@Timers, $timer);
-        return $timer;
-    }
-
     my $fire_time = now() + $secs;
 
     my $timer = bless [$fire_time, $coderef], "PublicInbox::DS::Timer";
@@ -176,9 +169,23 @@ sub FirstTimeEventLoop {
 
 sub now () { clock_gettime(CLOCK_MONOTONIC) }
 
+sub next_tick () {
+    my $q = $nextq;
+    $nextq = [];
+    for (@$q) {
+        if (ref($_) eq 'CODE') {
+            $_->();
+        } else {
+            $_->event_step;
+        }
+    }
+}
+
 # runs timers and returns milliseconds for next one, or next event loop
 sub RunTimers {
-    return $LoopTimeout unless @Timers;
+    next_tick();
+
+    return ((@$nextq || @ToClose) ? 0 : $LoopTimeout) unless @Timers;
 
     my $now = now();
 
@@ -188,6 +195,9 @@ sub RunTimers {
         $to_run->[1]->($now) if $to_run->[1];
     }
 
+    # timers may enqueue into nextq:
+    return 0 if (@$nextq || @ToClose);
+
     return $LoopTimeout unless @Timers;
 
     # convert time to an even number of milliseconds, adding 1
@@ -320,6 +330,8 @@ sub new {
 ### I N S T A N C E   M E T H O D S
 #####################################################################
 
+sub requeue ($) { push @$nextq, $_[0] }
+
 =head2 C<< $obj->close >>
 
 Close the socket.
@@ -593,19 +605,6 @@ sub shutdn ($) {
 	$self->close;
     }
 }
-
-sub next_tick () {
-	$nextt = undef;
-	my $q = $nextq;
-	$nextq = [];
-	$_->event_step for @$q;
-}
-
-sub requeue ($) {
-	push @$nextq, $_[0];
-	$nextt ||= PublicInbox::EvCleanup::asap(*next_tick);
-}
-
 package PublicInbox::DS::Timer;
 # [$abs_float_firetime, $coderef];
 sub cancel {
diff --git a/lib/PublicInbox/EvCleanup.pm b/lib/PublicInbox/EvCleanup.pm
index 33b54ebc..be6672ed 100644
--- a/lib/PublicInbox/EvCleanup.pm
+++ b/lib/PublicInbox/EvCleanup.pm
@@ -1,80 +1,23 @@
-# Copyright (C) 2016-2018 all contributors <meta@public-inbox.org>
+# Copyright (C) 2016-2019 all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
 
-# event cleanups (currently for PublicInbox::DS)
+# event cleanups (for PublicInbox::DS)
 package PublicInbox::EvCleanup;
 use strict;
 use warnings;
-use base qw(PublicInbox::DS);
-use PublicInbox::Syscall qw(EPOLLOUT EPOLLONESHOT);
+require PublicInbox::DS;
 
+# this only runs under public-inbox-{httpd/nntpd}, not generic PSGI servers
 my $ENABLED;
 sub enabled { $ENABLED }
 sub enable { $ENABLED = 1 }
-my $singleton;
-my $asapq = [ [], undef ];
-my $nextq = [ [], undef ];
 my $laterq = [ [], undef ];
 
-sub once_init () {
-	my $self = fields::new('PublicInbox::EvCleanup');
-	my ($r, $w);
-
-	# This is a dummy pipe which is always writable so it can always
-	# fires in the next event loop iteration.
-	pipe($r, $w) or die "pipe: $!";
-	fcntl($w, 1031, 4096) if $^O eq 'linux'; # 1031: F_SETPIPE_SZ
-	$self->SUPER::new($w, 0);
-
-	# always writable, since PublicInbox::EvCleanup::event_step
-	# never drains wbuf.  We can avoid wasting a hash slot by
-	# stuffing the read-end of the pipe into the never-to-be-touched
-	# wbuf
-	$self->{wbuf} = $r;
-	$self;
-}
-
-sub _run_all ($) {
-	my ($q) = @_;
-
-	my $run = $q->[0];
-	$q->[0] = [];
-	$q->[1] = undef;
-	$_->() foreach @$run;
-}
-
-# ensure PublicInbox::DS::ToClose processing after timers fire
-sub _asap_close () { $asapq->[1] ||= _asap_timer() }
-
-# Called by PublicInbox::DS
-sub event_step { _run_all($asapq) }
-
-sub _run_next () {
-	_run_all($nextq);
-	_asap_close();
-}
-
 sub _run_later () {
-	_run_all($laterq);
-	_asap_close();
-}
-
-sub _asap_timer () {
-	$singleton ||= once_init();
-	$singleton->watch(EPOLLOUT|EPOLLONESHOT);
-	1;
-}
-
-sub asap ($) {
-	my ($cb) = @_;
-	push @{$asapq->[0]}, $cb;
-	$asapq->[1] ||= _asap_timer();
-}
-
-sub next_tick ($) {
-	my ($cb) = @_;
-	push @{$nextq->[0]}, $cb;
-	$nextq->[1] ||= PublicInbox::DS->AddTimer(0, *_run_next);
+	my $run = $laterq->[0];
+	$laterq->[0] = [];
+	$laterq->[1] = undef;
+	$_->() foreach @$run;
 }
 
 sub later ($) {
@@ -83,10 +26,5 @@ sub later ($) {
 	$laterq->[1] ||= PublicInbox::DS->AddTimer(60, *_run_later);
 }
 
-END {
-	event_step();
-	_run_all($nextq);
-	_run_all($laterq);
-}
-
+END { _run_later() }
 1;
diff --git a/lib/PublicInbox/HTTP.pm b/lib/PublicInbox/HTTP.pm
index 856b8959..b8912950 100644
--- a/lib/PublicInbox/HTTP.pm
+++ b/lib/PublicInbox/HTTP.pm
@@ -30,10 +30,8 @@ use constant {
 use Errno qw(EAGAIN);
 
 my $pipelineq = [];
-my $pipet;
 sub process_pipelineq () {
 	my $q = $pipelineq;
-	$pipet = undef;
 	$pipelineq = [];
 	foreach (@$q) {
 		next unless $_->{sock};
@@ -238,8 +236,8 @@ sub next_request ($) {
 	my ($self) = @_;
 	if ($self->{rbuf}) {
 		# avoid recursion for pipelined requests
+		PublicInbox::DS::requeue(\&process_pipelineq) if !@$pipelineq;
 		push @$pipelineq, $self;
-		$pipet ||= PublicInbox::EvCleanup::asap(*process_pipelineq);
 	} else { # wait for next request
 		$self->requeue;
 	}
@@ -269,7 +267,7 @@ sub getline_cb ($$$) {
 				if ($self->{wbuf}) {
 					$self->write($next);
 				} else {
-					PublicInbox::EvCleanup::asap($next);
+					PublicInbox::DS::requeue($next);
 				}
 				return;
 			}
diff --git a/lib/PublicInbox/HTTPD/Async.pm b/lib/PublicInbox/HTTPD/Async.pm
index b46baeb2..35d17150 100644
--- a/lib/PublicInbox/HTTPD/Async.pm
+++ b/lib/PublicInbox/HTTPD/Async.pm
@@ -19,8 +19,8 @@ sub new {
 	# no $io? call $cb at the top of the next event loop to
 	# avoid recursion:
 	unless (defined($io)) {
-		PublicInbox::EvCleanup::asap($cb) if $cb;
-		PublicInbox::EvCleanup::next_tick($cleanup) if $cleanup;
+		PublicInbox::DS::requeue($cb);
+		die 'cleanup unsupported w/o $io' if $cleanup;
 		return;
 	}
 
@@ -87,7 +87,7 @@ sub close {
 
 	# we defer this to the next timer loop since close is deferred
 	if (my $cleanup = delete $self->{cleanup}) {
-		PublicInbox::EvCleanup::next_tick($cleanup);
+		PublicInbox::DS::requeue($cleanup);
 	}
 }
 
diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm
index 83970309..9973fcaf 100644
--- a/lib/PublicInbox/NNTP.pm
+++ b/lib/PublicInbox/NNTP.pm
@@ -50,14 +50,11 @@ sub expire_old () {
 	my $exp = $EXPTIME;
 	my $old = $now - $exp;
 	my $nr = 0;
-	my $closed = 0;
 	my %new;
 	while (my ($fd, $v) = each %$EXPMAP) {
 		my ($idle_time, $nntp) = @$v;
 		if ($idle_time < $old) {
-			if ($nntp->shutdn) {
-				$closed++;
-			} else {
+			if (!$nntp->shutdn) {
 				++$nr;
 				$new{$fd} = $v;
 			}
@@ -67,14 +64,7 @@ sub expire_old () {
 		}
 	}
 	$EXPMAP = \%new;
-	if ($nr) {
-		$expt = PublicInbox::EvCleanup::later(*expire_old);
-	} else {
-		$expt = undef;
-		# noop to kick outselves out of the loop ASAP so descriptors
-		# really get closed
-		PublicInbox::EvCleanup::asap(sub {}) if $closed;
-	}
+	$expt = PublicInbox::EvCleanup::later(*expire_old) if $nr;
 }
 
 sub greet ($) { $_[0]->write($_[0]->{nntpd}->{greet}) };
-- 
EW



* [PATCH 06/11] ds: consolidate IO::Socket::SSL checks
From: Eric Wong @ 2019-06-29 19:59 UTC
  To: meta

We need to be careful when handling EAGAIN on write(2)
failures and deal with SSL_WANT_READ vs SSL_WANT_WRITE as
appropriate.
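
A sketch of the epbit() idea under stated assumptions: over TLS a write
may need the socket to become readable (and vice versa), so the epoll
bit to wait on comes from the TLS layer, not from the operation that hit
EAGAIN.  The tls_epollbit helper and the ($is_tls, $ssl_err, $default)
signature are hypothetical simplifications; the constants are hardcoded
Linux epoll values and OpenSSL's SSL_ERROR_WANT_READ/WRITE codes rather
than imports from IO::Socket::SSL.

```perl
use strict;
use warnings;

use constant { EPOLLIN => 1, EPOLLOUT => 4 };
use constant { SSL_WANT_READ => 2, SSL_WANT_WRITE => 3 };

# hypothetical stand-in for PublicInbox::TLS::epollbit(); $ssl_err would
# normally be the error recorded by IO::Socket::SSL
sub tls_epollbit {
    my ($ssl_err) = @_;
    return EPOLLIN if $ssl_err == SSL_WANT_READ;
    return EPOLLOUT if $ssl_err == SSL_WANT_WRITE;
    return 0;    # not a retryable TLS condition
}

# plain sockets keep the caller's default bit
sub epbit {
    my ($is_tls, $ssl_err, $default) = @_;
    return $is_tls ? tls_epollbit($ssl_err) : $default;
}

print epbit(0, undef, EPOLLOUT), "\n";            # 4: plain write waits for EPOLLOUT
print epbit(1, SSL_WANT_READ, EPOLLOUT), "\n";    # 1: TLS write waits for EPOLLIN
```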
---
 lib/PublicInbox/DS.pm   | 48 +++++++++++++++++++++++++-----------------------
 lib/PublicInbox/NNTP.pm |  3 ++-
 lib/PublicInbox/TLS.pm  |  9 +++------
 3 files changed, 30 insertions(+), 30 deletions(-)

diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index 6cd527e2..b2f59983 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -388,6 +388,10 @@ sub psendfile ($$$) {
     $written;
 }
 
+sub epbit ($$) { # (sock, default)
+    ref($_[0]) eq 'IO::Socket::SSL' ? PublicInbox::TLS::epollbit() : $_[1];
+}
+
 # returns 1 if done, 0 if incomplete
 sub flush_write ($) {
     my ($self) = @_;
@@ -406,8 +410,8 @@ next_buf:
                         goto next_buf;
                     }
                 } elsif ($! == EAGAIN) {
+                    epwait($sock, epbit($sock, EPOLLOUT) | EPOLLONESHOT);
                     $self->{wbuf_off} = $off;
-                    watch($self, EPOLLOUT|EPOLLONESHOT);
                     return 0;
                 } else {
                     return $self->close;
@@ -438,17 +442,13 @@ sub rbuf_idle ($$) {
 
 sub do_read ($$$;$) {
     my ($self, $rbuf, $len, $off) = @_;
-    my $r = sysread($self->{sock}, $$rbuf, $len, $off // 0);
+    my $r = sysread(my $sock = $self->{sock}, $$rbuf, $len, $off // 0);
     return ($r == 0 ? $self->close : $r) if defined $r;
     # common for clients to break connections without warning,
     # would be too noisy to log here:
-    if (ref($self) eq 'IO::Socket::SSL') {
-        my $ev = PublicInbox::TLS::epollbit() or return $self->close;
+    if ($! == EAGAIN) {
+        epwait($sock, epbit($sock, EPOLLIN) | EPOLLONESHOT);
         rbuf_idle($self, $rbuf);
-        watch($self, $ev | EPOLLONESHOT);
-    } elsif ($! == EAGAIN) {
-        rbuf_idle($self, $rbuf);
-        watch($self, EPOLLIN | EPOLLONESHOT);
     } else {
         $self->close;
     }
@@ -525,17 +525,20 @@ sub write {
 
         if (defined $written) {
             return 1 if $written == $to_write;
+            requeue($self); # runs: event_step -> flush_write
         } elsif ($! == EAGAIN) {
+            epwait($sock, epbit($sock, EPOLLOUT) | EPOLLONESHOT);
             $written = 0;
         } else {
             return $self->close;
         }
+
+        # deal with EAGAIN or partial write:
         my $tmpio = tmpio($self, $bref, $written) or return 0;
 
         # wbuf may be an empty array if we're being called inside
         # ->flush_write via CODE bref:
         push @{$self->{wbuf} ||= []}, $tmpio;
-        watch($self, EPOLLOUT|EPOLLONESHOT);
         return 0;
     }
 }
@@ -554,32 +557,34 @@ sub msg_more ($$) {
             # queue up the unwritten substring:
             my $tmpio = tmpio($self, \($_[1]), $n) or return 0;
             $self->{wbuf} = [ $tmpio ];
-            watch($self, EPOLLOUT|EPOLLONESHOT);
+            epwait($sock, EPOLLOUT|EPOLLONESHOT);
             return 0;
         }
     }
     $self->write(\($_[1]));
 }
 
-sub watch ($$) {
-    my ($self, $ev) = @_;
-    my $sock = $self->{sock} or return;
+sub epwait ($$) {
+    my ($sock, $ev) = @_;
     epoll_ctl($Epoll, EPOLL_CTL_MOD, fileno($sock), $ev) and
         confess("EPOLL_CTL_MOD $!");
     0;
 }
 
+sub watch ($$) {
+    my ($self, $ev) = @_;
+    my $sock = $self->{sock} or return;
+    epwait($sock, $ev);
+}
+
 # return true if complete, false if incomplete (or failure)
 sub accept_tls_step ($) {
     my ($self) = @_;
     my $sock = $self->{sock} or return;
     return 1 if $sock->accept_SSL;
     return $self->close if $! != EAGAIN;
-    if (my $ev = PublicInbox::TLS::epollbit()) {
-        unshift @{$self->{wbuf} ||= []}, \&accept_tls_step;
-        return watch($self, $ev | EPOLLONESHOT);
-    }
-    drop($self, 'BUG? EAGAIN but '.PublicInbox::TLS::err());
+    epwait($sock, PublicInbox::TLS::epollbit() | EPOLLONESHOT);
+    unshift @{$self->{wbuf} ||= []}, \&accept_tls_step;
 }
 
 sub shutdn_tls_step ($) {
@@ -587,11 +592,8 @@ sub shutdn_tls_step ($) {
     my $sock = $self->{sock} or return;
     return $self->close if $sock->stop_SSL(SSL_fast_shutdown => 1);
     return $self->close if $! != EAGAIN;
-    if (my $ev = PublicInbox::TLS::epollbit()) {
-        unshift @{$self->{wbuf} ||= []}, \&shutdn_tls_step;
-        return watch($self, $ev | EPOLLONESHOT);
-    }
-    drop($self, 'BUG? EAGAIN but '.PublicInbox::TLS::err());
+    epwait($sock, PublicInbox::TLS::epollbit() | EPOLLONESHOT);
+    unshift @{$self->{wbuf} ||= []}, \&shutdn_tls_step;
 }
 
 # don't bother with shutdown($sock, 2), we don't fork+exec w/o CLOEXEC
diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm
index 9973fcaf..82762b1a 100644
--- a/lib/PublicInbox/NNTP.pm
+++ b/lib/PublicInbox/NNTP.pm
@@ -75,7 +75,8 @@ sub new ($$$) {
 	my $ev = EPOLLIN;
 	my $wbuf;
 	if (ref($sock) eq 'IO::Socket::SSL' && !$sock->accept_SSL) {
-		$ev = PublicInbox::TLS::epollbit() or return CORE::close($sock);
+		return CORE::close($sock) if $! != EAGAIN;
+		$ev = PublicInbox::TLS::epollbit();
 		$wbuf = [ \&PublicInbox::DS::accept_tls_step, \&greet ];
 	}
 	$self->SUPER::new($sock, $ev | EPOLLONESHOT);
diff --git a/lib/PublicInbox/TLS.pm b/lib/PublicInbox/TLS.pm
index 576c11d7..0b9a55df 100644
--- a/lib/PublicInbox/TLS.pm
+++ b/lib/PublicInbox/TLS.pm
@@ -13,12 +13,9 @@ sub err () { $SSL_ERROR }
 
 # returns the EPOLL event bit which matches the existing SSL error
 sub epollbit () {
-	if ($! == EAGAIN) {
-		return EPOLLIN if $SSL_ERROR == SSL_WANT_READ;
-		return EPOLLOUT if $SSL_ERROR == SSL_WANT_WRITE;
-		die "unexpected SSL error: $SSL_ERROR";
-	}
-	0;
+	return EPOLLIN if $SSL_ERROR == SSL_WANT_READ;
+	return EPOLLOUT if $SSL_ERROR == SSL_WANT_WRITE;
+	die "unexpected SSL error: $SSL_ERROR";
 }
 
 1;
-- 
EW

* [PATCH 07/11] http: support HTTPS (kinda)
  2019-06-29 19:59 [PATCH 00/11] ds: more updates Eric Wong
                   ` (5 preceding siblings ...)
  2019-06-29 19:59 ` [PATCH 06/11] ds: consolidate IO::Socket::SSL checks Eric Wong
@ 2019-06-29 19:59 ` Eric Wong
  2019-06-29 19:59 ` [PATCH 08/11] parentpipe: document and use one-shot wakeups Eric Wong
                   ` (3 subsequent siblings)
  10 siblings, 0 replies; 12+ messages in thread
From: Eric Wong @ 2019-06-29 19:59 UTC (permalink / raw)
  To: meta

It's barely any effort at all to support HTTPS now that we have
NNTPS support and can share all the code for writing daemons.

However, we still depend on Varnish to avoid hug-of-death
situations, so supporting reverse-proxying will be required.
---
 MANIFEST                |   1 +
 lib/PublicInbox/HTTP.pm |  10 +++-
 t/httpd-https.t         | 141 ++++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 151 insertions(+), 1 deletion(-)
 create mode 100644 t/httpd-https.t

diff --git a/MANIFEST b/MANIFEST
index 29920953..4cb5f38f 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -210,6 +210,7 @@ t/hl_mod.t
 t/html_index.t
 t/httpd-corner.psgi
 t/httpd-corner.t
+t/httpd-https.t
 t/httpd-unix.t
 t/httpd.t
 t/hval.t
diff --git a/lib/PublicInbox/HTTP.pm b/lib/PublicInbox/HTTP.pm
index b8912950..680be72b 100644
--- a/lib/PublicInbox/HTTP.pm
+++ b/lib/PublicInbox/HTTP.pm
@@ -56,8 +56,16 @@ sub http_date () {
 sub new ($$$) {
 	my ($class, $sock, $addr, $httpd) = @_;
 	my $self = fields::new($class);
-	$self->SUPER::new($sock, EPOLLIN | EPOLLONESHOT);
+	my $ev = EPOLLIN;
+	my $wbuf;
+	if (ref($sock) eq 'IO::Socket::SSL' && !$sock->accept_SSL) {
+		return CORE::close($sock) if $! != EAGAIN;
+		$ev = PublicInbox::TLS::epollbit();
+		$wbuf = [ \&PublicInbox::DS::accept_tls_step ];
+	}
+	$self->SUPER::new($sock, $ev | EPOLLONESHOT);
 	$self->{httpd} = $httpd;
+	$self->{wbuf} = $wbuf if $wbuf;
 	($self->{remote_addr}, $self->{remote_port}) =
 		PublicInbox::Daemon::host_with_port($addr);
 	$self;
diff --git a/t/httpd-https.t b/t/httpd-https.t
new file mode 100644
index 00000000..f6b9806a
--- /dev/null
+++ b/t/httpd-https.t
@@ -0,0 +1,141 @@
+# Copyright (C) 2019 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+use strict;
+use warnings;
+use Test::More;
+use File::Temp qw(tempdir);
+use Socket qw(SOCK_STREAM IPPROTO_TCP SOL_SOCKET);
+# IO::Poll is part of the standard library, but distros may split them off...
+foreach my $mod (qw(IO::Socket::SSL IO::Poll)) {
+	eval "require $mod";
+	plan skip_all => "$mod missing for $0" if $@;
+}
+my $cert = 'certs/server-cert.pem';
+my $key = 'certs/server-key.pem';
+unless (-r $key && -r $cert) {
+	plan skip_all =>
+		"certs/ missing for $0, run ./create-certs.perl in certs/";
+}
+use_ok 'PublicInbox::TLS';
+use_ok 'IO::Socket::SSL';
+require './t/common.perl';
+my $psgi = "./t/httpd-corner.psgi";
+my $tmpdir = tempdir('pi-httpd-https-XXXXXX', TMPDIR => 1, CLEANUP => 1);
+my $err = "$tmpdir/stderr.log";
+my $out = "$tmpdir/stdout.log";
+my $httpd = 'blib/script/public-inbox-httpd';
+my %opts = (
+	LocalAddr => '127.0.0.1',
+	ReuseAddr => 1,
+	Proto => 'tcp',
+	Type => SOCK_STREAM,
+	Listen => 1024,
+);
+my $https = IO::Socket::INET->new(%opts);
+my ($pid, $tail_pid);
+END {
+	foreach ($pid, $tail_pid) {
+		kill 'TERM', $_ if defined $_;
+	}
+};
+my $https_addr = $https->sockhost . ':' . $https->sockport;
+my %opt = ( Proto => 'tcp', PeerAddr => $https_addr, Type => SOCK_STREAM );
+
+for my $args (
+	[ "-lhttps://$https_addr/?key=$key,cert=$cert" ],
+) {
+	for ($out, $err) {
+		open my $fh, '>', $_ or die "truncate: $!";
+	}
+	if (my $tail_cmd = $ENV{TAIL}) { # don't assume GNU tail
+		$tail_pid = fork;
+		if (defined $tail_pid && $tail_pid == 0) {
+			exec(split(' ', $tail_cmd), $out, $err);
+		}
+	}
+	my $cmd = [ $httpd, '-W0', @$args,
+			"--stdout=$out", "--stderr=$err", $psgi ];
+	$pid = spawn_listener(undef, $cmd, [ $https ]);
+	my %o = (
+		SSL_hostname => 'server.local',
+		SSL_verifycn_name => 'server.local',
+		SSL_verify_mode => SSL_VERIFY_PEER(),
+		SSL_ca_file => 'certs/test-ca.pem',
+	);
+	# start negotiating a slow TLS connection
+	my $slow = IO::Socket::INET->new(%opt, Blocking => 0);
+	$slow = IO::Socket::SSL->start_SSL($slow, SSL_startHandshake => 0, %o);
+	my @poll = (fileno($slow));
+	my $slow_done = $slow->connect_SSL;
+	if ($slow_done) {
+		diag('W: connect_SSL early OK, slow client test invalid');
+		push @poll, PublicInbox::Syscall::EPOLLOUT();
+	} else {
+		push @poll, PublicInbox::TLS::epollbit();
+	}
+
+	# normal HTTPS
+	my $c = IO::Socket::INET->new(%opt);
+	IO::Socket::SSL->start_SSL($c, %o);
+	ok($c->print("GET /empty HTTP/1.1\r\n\r\nHost: example.com\r\n\r\n"),
+		'wrote HTTP request');
+	my $buf = '';
+	sysread($c, $buf, 2007, length($buf)) until $buf =~ /\r\n\r\n/;
+	like($buf, qr!\AHTTP/1\.1 200!, 'read HTTP response');
+
+	# HTTPS with bad hostname
+	$c = IO::Socket::INET->new(%opt);
+	$o{SSL_hostname} = $o{SSL_verifycn_name} = 'server.fail';
+	$c = IO::Socket::SSL->start_SSL($c, %o);
+	is($c, undef, 'HTTPS fails with bad hostname');
+
+	$o{SSL_hostname} = $o{SSL_verifycn_name} = 'server.local';
+	$c = IO::Socket::INET->new(%opt);
+	IO::Socket::SSL->start_SSL($c, %o);
+	ok($c, 'HTTPS succeeds again with valid hostname');
+
+	# slow TLS connection did not block the other fast clients while
+	# connecting, finish it off:
+	until ($slow_done) {
+		IO::Poll::_poll(-1, @poll);
+		$slow_done = $slow->connect_SSL and last;
+		@poll = (fileno($slow), PublicInbox::TLS::epollbit());
+	}
+	$slow->blocking(1);
+	ok($slow->print("GET /empty HTTP/1.1\r\n\r\nHost: example.com\r\n\r\n"),
+		'wrote HTTP request from slow');
+	$buf = '';
+	sysread($slow, $buf, 666, length($buf)) until $buf =~ /\r\n\r\n/;
+	like($buf, qr!\AHTTP/1\.1 200!, 'read HTTP response from slow');
+	$slow = undef;
+
+	SKIP: {
+		skip 'TCP_DEFER_ACCEPT is Linux-only', 2 if $^O ne 'linux';
+		my $var = Socket::TCP_DEFER_ACCEPT();
+		defined(my $x = getsockopt($https, IPPROTO_TCP, $var)) or die;
+		ok(unpack('i', $x) > 0, 'TCP_DEFER_ACCEPT set on https');
+	};
+	SKIP: {
+		skip 'SO_ACCEPTFILTER is FreeBSD-only', 2 if $^O ne 'freebsd';
+		if (system('kldstat -m accf_data >/dev/null')) {
+			skip 'accf_data not loaded? kldload accf_data', 2;
+		}
+		require PublicInbox::Daemon;
+		my $var = PublicInbox::Daemon::SO_ACCEPTFILTER();
+		my $x = getsockopt($https, SOL_SOCKET, $var);
+		like($x, qr/\Adataready\0+\z/, 'got dataready accf for https');
+	};
+
+	$c = undef;
+	kill('TERM', $pid);
+	is($pid, waitpid($pid, 0), 'httpd exited successfully');
+	is($?, 0, 'no error in exited process');
+	$pid = undef;
+	if (defined $tail_pid) {
+		kill 'TERM', $tail_pid;
+		waitpid($tail_pid, 0);
+		$tail_pid = undef;
+	}
+}
+done_testing();
+1;
-- 
EW

* [PATCH 08/11] parentpipe: document and use one-shot wakeups
  2019-06-29 19:59 [PATCH 00/11] ds: more updates Eric Wong
                   ` (6 preceding siblings ...)
  2019-06-29 19:59 ` [PATCH 07/11] http: support HTTPS (kinda) Eric Wong
@ 2019-06-29 19:59 ` Eric Wong
  2019-06-29 19:59 ` [PATCH 09/11] parentpipe: make the ->close call more obvious Eric Wong
                   ` (2 subsequent siblings)
  10 siblings, 0 replies; 12+ messages in thread
From: Eric Wong @ 2019-06-29 19:59 UTC (permalink / raw)
  To: meta

The master process only dies once, and we close ourselves right
away, so it doesn't actually matter whether the notification is
level-triggered or edge-triggered.  However, one-shot is most
consistent with our usage and keeps the kernel from doing extra work.
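
The worker learns that the master has died through an ordinary pipe. The
master keeps the write end open and never writes to it. The read end
therefore becomes readable (EOF) only when the master exits and the kernel
closes that end. This happens exactly once, so a single wakeup is all a
worker ever needs. Below is a minimal, standalone Perl sketch of that
mechanism. It uses core IO::Poll instead of DS/epoll. To keep the demo
self-contained the roles are swapped: a forked child plays the dying master
and the parent plays the worker.

```perl
use strict;
use warnings;
use IO::Poll qw(POLLIN);

pipe(my $rd, my $wr) or die "pipe: $!";
my $pid = fork // die "fork: $!";
if ($pid == 0) {
    # child plays the "master": it never writes, it just exits,
    # which closes its copy of the write end
    close $rd;
    exit 0;
}

# parent plays the "worker": drop our own copy of the write end,
# otherwise EOF can never be seen on $rd
close $wr;

my $poll = IO::Poll->new;
$poll->mask($rd => POLLIN);
my $ready = $poll->poll(10); # one wait is enough; the event fires once
my $n = sysread($rd, my $buf, 1); # 0 means EOF: the "master" is gone
waitpid($pid, 0);
print "master gone, worker quits\n" if defined $n && $n == 0;
```

In PublicInbox::ParentPipe the same wakeup arrives through DS's event loop.
EPOLLONESHOT only tells the kernel to stop re-reporting the descriptor after
that first event, because the worker closes it and quits anyway.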
---
 lib/PublicInbox/ParentPipe.pm | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/lib/PublicInbox/ParentPipe.pm b/lib/PublicInbox/ParentPipe.pm
index ccc0815e..6ef51c1a 100644
--- a/lib/PublicInbox/ParentPipe.pm
+++ b/lib/PublicInbox/ParentPipe.pm
@@ -1,20 +1,24 @@
 # Copyright (C) 2016-2018 all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
-# only for PublicInbox::Daemon
+
+# only for PublicInbox::Daemon, allows worker processes to be
+# notified if the master process dies.
 package PublicInbox::ParentPipe;
 use strict;
 use warnings;
 use base qw(PublicInbox::DS);
 use fields qw(cb);
+use PublicInbox::Syscall qw(EPOLLIN EPOLLONESHOT);
 
 sub new ($$$) {
-	my ($class, $pipe, $cb) = @_;
+	my ($class, $pipe, $worker_quit) = @_;
 	my $self = fields::new($class);
-	$self->SUPER::new($pipe, PublicInbox::DS::EPOLLIN());
-	$self->{cb} = $cb;
+	$self->SUPER::new($pipe, EPOLLIN|EPOLLONESHOT);
+	$self->{cb} = $worker_quit;
 	$self;
 }
 
+# master process died, time to call worker_quit ourselves
 sub event_step { $_[0]->{cb}->($_[0]) }
 
 1;
-- 
EW

* [PATCH 09/11] parentpipe: make the ->close call more obvious
  2019-06-29 19:59 [PATCH 00/11] ds: more updates Eric Wong
                   ` (7 preceding siblings ...)
  2019-06-29 19:59 ` [PATCH 08/11] parentpipe: document and use one-shot wakeups Eric Wong
@ 2019-06-29 19:59 ` Eric Wong
  2019-06-29 19:59 ` [PATCH 10/11] httpd/async: switch to buffering-as-fast-as-possible Eric Wong
  2019-06-29 19:59 ` [PATCH 11/11] http: use bigger, but shorter-lived buffers for pipes Eric Wong
  10 siblings, 0 replies; 12+ messages in thread
From: Eric Wong @ 2019-06-29 19:59 UTC (permalink / raw)
  To: meta

We can close directly in event_step without bad side effects.
That also means worker_quit no longer needs to take a reason
argument, which we weren't logging anywhere anyway.
---
 lib/PublicInbox/Daemon.pm     | 4 +---
 lib/PublicInbox/ParentPipe.pm | 5 ++++-
 2 files changed, 5 insertions(+), 4 deletions(-)

diff --git a/lib/PublicInbox/Daemon.pm b/lib/PublicInbox/Daemon.pm
index cf011a20..2b7ac266 100644
--- a/lib/PublicInbox/Daemon.pm
+++ b/lib/PublicInbox/Daemon.pm
@@ -252,13 +252,11 @@ sub daemonize () {
 
 
 sub worker_quit {
-	my ($reason) = @_;
 	# killing again terminates immediately:
 	exit unless @listeners;
 
 	$_->close foreach @listeners; # call PublicInbox::DS::close
 	@listeners = ();
-	$reason->close if ref($reason) eq 'PublicInbox::ParentPipe';
 
 	my $proc_name;
 	my $warn = 0;
@@ -590,7 +588,7 @@ sub daemon_loop ($$$$) {
 	} else {
 		reopen_logs();
 		$set_user->() if $set_user;
-		$SIG{USR2} = sub { worker_quit('USR2') if upgrade() };
+		$SIG{USR2} = sub { worker_quit() if upgrade() };
 		$refresh->();
 	}
 	$uid = $gid = undef;
diff --git a/lib/PublicInbox/ParentPipe.pm b/lib/PublicInbox/ParentPipe.pm
index 6ef51c1a..2e2abb5f 100644
--- a/lib/PublicInbox/ParentPipe.pm
+++ b/lib/PublicInbox/ParentPipe.pm
@@ -19,6 +19,9 @@ sub new ($$$) {
 }
 
 # master process died, time to call worker_quit ourselves
-sub event_step { $_[0]->{cb}->($_[0]) }
+sub event_step {
+	$_[0]->close; # PublicInbox::DS::close
+	$_[0]->{cb}->();
+}
 
 1;
-- 
EW

* [PATCH 10/11] httpd/async: switch to buffering-as-fast-as-possible
  2019-06-29 19:59 [PATCH 00/11] ds: more updates Eric Wong
                   ` (8 preceding siblings ...)
  2019-06-29 19:59 ` [PATCH 09/11] parentpipe: make the ->close call more obvious Eric Wong
@ 2019-06-29 19:59 ` Eric Wong
  2019-06-29 19:59 ` [PATCH 11/11] http: use bigger, but shorter-lived buffers for pipes Eric Wong
  10 siblings, 0 replies; 12+ messages in thread
From: Eric Wong @ 2019-06-29 19:59 UTC (permalink / raw)
  To: meta

With DS buffering to a temporary file nowadays, applying
backpressure to git-http-backend(1) hurts overall memory
usage of the system.  Instead, try to get git-http-backend(1)
to finish as quickly as possible and use edge-triggered
notifications to reduce wakeups on our end.
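
Edge-triggered notification (EPOLLET) only fires when new data arrives. A
reader that stops before seeing EAGAIN may never be woken for bytes already
sitting in the pipe. The patch handles this by calling the read callback
once from async_pass and then requeueing after every successful read until
sysread hits EAGAIN. Below is a minimal Perl sketch of the same rule,
written as a plain loop over a non-blocking pipe. drain is a hypothetical
helper. public-inbox does not loop like this; it spreads the iterations
across event-loop passes with ->requeue so other clients get a turn.

```perl
use strict;
use warnings;
use Errno qw(EAGAIN);
use IO::Handle; # for ->blocking

# Read everything currently available from a non-blocking handle.
# Returns 'again' when the pipe is empty (safe to wait for the next edge)
# or 'eof' when the writer has closed its end.
sub drain {
    my ($fh, $out) = @_;
    while (1) {
        my $n = sysread($fh, my $buf, 65536);
        if ($n) {
            $$out .= $buf;
            next;
        }
        return 'eof' if defined $n;     # 0 bytes: writer closed
        return 'again' if $! == EAGAIN; # nothing left until the next edge
        die "sysread: $!";
    }
}

pipe(my $rd, my $wr) or die "pipe: $!";
$rd->blocking(0);
syswrite($wr, 'x' x 10000) == 10000 or die "short write: $!";

my $data = '';
my $state = drain($rd, \$data); # stops once the pipe is empty
close $wr;
my $final = drain($rd, \$data); # writer gone, so this one sees EOF
printf "%s, then %s after %d bytes\n", $state, $final, length $data;
```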
---
 lib/PublicInbox/DS.pm          |  6 ------
 lib/PublicInbox/HTTP.pm        |  7 -------
 lib/PublicInbox/HTTPD/Async.pm | 40 +++++++++++++++++-----------------------
 3 files changed, 17 insertions(+), 36 deletions(-)

diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index b2f59983..a8236023 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -571,12 +571,6 @@ sub epwait ($$) {
     0;
 }
 
-sub watch ($$) {
-    my ($self, $ev) = @_;
-    my $sock = $self->{sock} or return;
-    epwait($sock, $ev);
-}
-
 # return true if complete, false if incomplete (or failure)
 sub accept_tls_step ($) {
     my ($self) = @_;
diff --git a/lib/PublicInbox/HTTP.pm b/lib/PublicInbox/HTTP.pm
index 680be72b..5546ac46 100644
--- a/lib/PublicInbox/HTTP.pm
+++ b/lib/PublicInbox/HTTP.pm
@@ -467,11 +467,4 @@ sub busy () {
 	($self->{rbuf} || $self->{env} || $self->{wbuf});
 }
 
-# fires after pending writes are complete:
-sub restart_pass ($) {
-	$_[0]->{forward}->restart_read; # see PublicInbox::HTTPD::Async
-}
-
-sub enqueue_restart_pass ($) { $_[0]->write(\&restart_pass) }
-
 1;
diff --git a/lib/PublicInbox/HTTPD/Async.pm b/lib/PublicInbox/HTTPD/Async.pm
index 35d17150..a468ed91 100644
--- a/lib/PublicInbox/HTTPD/Async.pm
+++ b/lib/PublicInbox/HTTPD/Async.pm
@@ -4,14 +4,15 @@
 # XXX This is a totally unstable API for public-inbox internal use only
 # This is exposed via the 'pi-httpd.async' key in the PSGI env hash.
 # The name of this key is not even stable!
-# Currently is is intended for use with read-only pipes.
+# Currently intended for use with read-only pipes with expensive
+# processes such as git-http-backend(1), cgit(1)
 package PublicInbox::HTTPD::Async;
 use strict;
 use warnings;
 use base qw(PublicInbox::DS);
 use fields qw(cb cleanup);
-require PublicInbox::EvCleanup;
 use Errno qw(EAGAIN);
+use PublicInbox::Syscall qw(EPOLLIN EPOLLET);
 
 sub new {
 	my ($class, $io, $cb, $cleanup) = @_;
@@ -26,14 +27,12 @@ sub new {
 
 	my $self = fields::new($class);
 	IO::Handle::blocking($io, 0);
-	$self->SUPER::new($io, PublicInbox::DS::EPOLLIN());
+	$self->SUPER::new($io, EPOLLIN | EPOLLET);
 	$self->{cb} = $cb;
 	$self->{cleanup} = $cleanup;
 	$self;
 }
 
-sub restart_read ($) { $_[0]->watch(PublicInbox::DS::EPOLLIN()) }
-
 sub main_cb ($$$) {
 	my ($http, $fh, $bref) = @_;
 	sub {
@@ -41,25 +40,15 @@ sub main_cb ($$$) {
 		my $r = sysread($self->{sock}, $$bref, 8192);
 		if ($r) {
 			$fh->write($$bref); # may call $http->close
-
 			if ($http->{sock}) { # !closed
-				if ($http->{wbuf}) {
-					# HTTP client could not keep up, so
-					# stop reading and buffering.
-					$self->watch(0);
-
-					# Tell the HTTP socket to restart us
-					# when HTTP client is done draining
-					# $http->{wbuf}:
-					$http->enqueue_restart_pass;
-				}
-				# stay in EPOLLIN, but let other clients
-				# get some work done, too.
+				$self->requeue;
+				# let other clients get some work done, too
 				return;
 			}
-			# fall through to close below...
-		} elsif (!defined $r) {
-			return restart_read($self) if $! == EAGAIN;
+
+			# else: fall through to close below...
+		} elsif (!defined $r && $! == EAGAIN) {
+			return; # EPOLLET means we'll be notified
 		}
 
 		# Done! Error handling will happen in $fh->close
@@ -75,10 +64,15 @@ sub async_pass {
 	# will automatically close this ($self) object.
 	$http->{forward} = $self;
 	$fh->write($$bref); # PublicInbox:HTTP::{chunked,identity}_wcb
-	$self->{cb} = main_cb($http, $fh, $bref);
+	my $cb = $self->{cb} = main_cb($http, $fh, $bref);
+	$cb->($self); # either hit EAGAIN or ->requeue to keep EPOLLET happy
 }
 
-sub event_step { $_[0]->{cb}->(@_) }
+sub event_step {
+	# {cb} may be undef after ->requeue due to $http->close happening
+	my $cb = $_[0]->{cb} or return;
+	$cb->(@_);
+}
 
 sub close {
 	my $self = $_[0];
-- 
EW

* [PATCH 11/11] http: use bigger, but shorter-lived buffers for pipes
  2019-06-29 19:59 [PATCH 00/11] ds: more updates Eric Wong
                   ` (9 preceding siblings ...)
  2019-06-29 19:59 ` [PATCH 10/11] httpd/async: switch to buffering-as-fast-as-possible Eric Wong
@ 2019-06-29 19:59 ` Eric Wong
  10 siblings, 0 replies; 12+ messages in thread
From: Eric Wong @ 2019-06-29 19:59 UTC (permalink / raw)
  To: meta

Linux pipes default to 65536 bytes in size, and we want to read
from external processes as fast as possible now that we don't use
Danga::Socket or buffer to heap.

However, drop the buffer ASAP if we need to wait on anything,
since idle buffers can stay idle for eons.  This lets other
execution contexts reuse that memory right away.
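
The following compares how many sysread calls it takes to consume one
default-sized Linux pipe's worth of data (65536 bytes) with the old
8192-byte buffer and with the new 65536-byte buffer. This is a hypothetical
Perl sketch, and count_reads is not part of public-inbox. A regular
temporary file stands in for the pipe, so the demo does not depend on each
OS's pipe capacity.

```perl
use strict;
use warnings;
use File::Temp qw(tempfile);

# Count the non-empty sysread calls needed to consume $total bytes
# using reads of at most $bufsize bytes each.
sub count_reads {
    my ($total, $bufsize) = @_;
    my ($fh) = tempfile(UNLINK => 1); # stands in for the pipe
    binmode $fh;
    syswrite($fh, "\0" x $total) == $total or die "short write: $!";
    sysseek($fh, 0, 0) or die "sysseek: $!";

    my $calls = 0;
    while (1) {
        my $n = sysread($fh, my $buf, $bufsize);
        die "sysread: $!" unless defined $n;
        last if $n == 0; # EOF
        $calls++;
    }
    return $calls;
}

my $old = count_reads(65536, 8192);  # old buffer size
my $new = count_reads(65536, 65536); # new buffer size
print "8192-byte buffer: $old reads; 65536-byte buffer: $new reads\n";
```

The larger buffer means fewer syscalls and wakeups for each pipe-full of
data. The cost is a bigger buffer. That is why the patch also drops $$bref
as soon as async_pass has written it out, and why main_cb reads into a fresh
lexical instead of holding on to a shared buffer.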
---
 lib/PublicInbox/HTTPD/Async.pm | 11 ++++++-----
 lib/PublicInbox/Qspawn.pm      |  2 +-
 2 files changed, 7 insertions(+), 6 deletions(-)

diff --git a/lib/PublicInbox/HTTPD/Async.pm b/lib/PublicInbox/HTTPD/Async.pm
index a468ed91..e195fab0 100644
--- a/lib/PublicInbox/HTTPD/Async.pm
+++ b/lib/PublicInbox/HTTPD/Async.pm
@@ -33,13 +33,13 @@ sub new {
 	$self;
 }
 
-sub main_cb ($$$) {
-	my ($http, $fh, $bref) = @_;
+sub main_cb ($$) {
+	my ($http, $fh) = @_;
 	sub {
 		my ($self) = @_;
-		my $r = sysread($self->{sock}, $$bref, 8192);
+		my $r = sysread($self->{sock}, my $buf, 65536);
 		if ($r) {
-			$fh->write($$bref); # may call $http->close
+			$fh->write($buf); # may call $http->close
 			if ($http->{sock}) { # !closed
 				$self->requeue;
 				# let other clients get some work done, too
@@ -64,7 +64,8 @@ sub async_pass {
 	# will automatically close this ($self) object.
 	$http->{forward} = $self;
 	$fh->write($$bref); # PublicInbox:HTTP::{chunked,identity}_wcb
-	my $cb = $self->{cb} = main_cb($http, $fh, $bref);
+	$$bref = undef; # we're done with this
+	my $cb = $self->{cb} = main_cb($http, $fh);
 	$cb->($self); # either hit EAGAIN or ->requeue to keep EPOLLET happy
 }
 
diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm
index f2630a0f..8f0b9fe2 100644
--- a/lib/PublicInbox/Qspawn.pm
+++ b/lib/PublicInbox/Qspawn.pm
@@ -128,7 +128,7 @@ sub psgi_qx {
 	my $rpipe; # comes from popen_rd
 	my $async = $env->{'pi-httpd.async'};
 	my $cb = sub {
-		my $r = sysread($rpipe, my $buf, 8192);
+		my $r = sysread($rpipe, my $buf, 65536);
 		if ($async) {
 			$async->async_pass($env->{'psgix.io'}, $qx, \$buf);
 		} elsif (defined $r) {
-- 
EW

end of thread, other threads:[~2019-06-29 19:59 UTC | newest]

Thread overview: 12+ messages
2019-06-29 19:59 [PATCH 00/11] ds: more updates Eric Wong
2019-06-29 19:59 ` [PATCH 01/11] ds: share lazy rbuf handling between HTTP and NNTP Eric Wong
2019-06-29 19:59 ` [PATCH 02/11] ds: move requeue logic over from NNTP Eric Wong
2019-06-29 19:59 ` [PATCH 03/11] http: use requeue instead of watch_in1 Eric Wong
2019-06-29 19:59 ` [PATCH 04/11] listener: use edge-triggered notifications Eric Wong
2019-06-29 19:59 ` [PATCH 05/11] ds: handle deferred DS->close after timers Eric Wong
2019-06-29 19:59 ` [PATCH 06/11] ds: consolidate IO::Socket::SSL checks Eric Wong
2019-06-29 19:59 ` [PATCH 07/11] http: support HTTPS (kinda) Eric Wong
2019-06-29 19:59 ` [PATCH 08/11] parentpipe: document and use one-shot wakeups Eric Wong
2019-06-29 19:59 ` [PATCH 09/11] parentpipe: make the ->close call more obvious Eric Wong
2019-06-29 19:59 ` [PATCH 10/11] httpd/async: switch to buffering-as-fast-as-possible Eric Wong
2019-06-29 19:59 ` [PATCH 11/11] http: use bigger, but shorter-lived buffers for pipes Eric Wong

Code repositories for project(s) associated with this public inbox

	https://80x24.org/public-inbox.git

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).