user/dev discussion of public-inbox itself
 help / color / mirror / code / Atom feed
From: Eric Wong <e@80x24.org>
To: meta@public-inbox.org
Subject: [PATCH 05/11] ds: handle deferred DS->close after timers
Date: Sat, 29 Jun 2019 19:59:45 +0000	[thread overview]
Message-ID: <20190629195951.32160-6-e@80x24.org> (raw)
In-Reply-To: <20190629195951.32160-1-e@80x24.org>

Our hacks in EvCleanup::next_tick and EvCleanup::asap were due
to the fact "closed" sockets were deferred and could not wake
up the event loop, causing certain actions to be delayed until
an event fired.

Instead, ensure we don't sleep if there are pending sockets to
close.

We can then remove most of the EvCleanup stuff

While we're at it, split out immediate timer handling into a
separate array so we don't need to deal with time calculations
for the event loop.
---
 lib/PublicInbox/DS.pm          | 41 +++++++++++-----------
 lib/PublicInbox/EvCleanup.pm   | 80 +++++-------------------------------------
 lib/PublicInbox/HTTP.pm        |  6 ++--
 lib/PublicInbox/HTTPD/Async.pm |  6 ++--
 lib/PublicInbox/NNTP.pm        | 14 ++------
 5 files changed, 36 insertions(+), 111 deletions(-)

diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index 8f1494f6..6cd527e2 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -37,7 +37,6 @@ use Errno  qw(EAGAIN EINVAL EEXIST);
 use Carp   qw(croak confess carp);
 require File::Spec;
 
-my $nextt; # timer for next_tick
 my $nextq = []; # queue for next_tick
 our (
      %DescriptorMap,             # fd (num) -> PublicInbox::DS object
@@ -101,12 +100,6 @@ Returns a timer object which you can call C<< $timer->cancel >> on if you need t
 sub AddTimer {
     my ($class, $secs, $coderef) = @_;
 
-    if (!$secs) {
-        my $timer = bless([0, $coderef], 'PublicInbox::DS::Timer');
-        unshift(@Timers, $timer);
-        return $timer;
-    }
-
     my $fire_time = now() + $secs;
 
     my $timer = bless [$fire_time, $coderef], "PublicInbox::DS::Timer";
@@ -176,9 +169,23 @@ sub FirstTimeEventLoop {
 
 sub now () { clock_gettime(CLOCK_MONOTONIC) }
 
+sub next_tick () {
+    my $q = $nextq;
+    $nextq = [];
+    for (@$q) {
+        if (ref($_) eq 'CODE') {
+            $_->();
+        } else {
+            $_->event_step;
+        }
+    }
+}
+
 # runs timers and returns milliseconds for next one, or next event loop
 sub RunTimers {
-    return $LoopTimeout unless @Timers;
+    next_tick();
+
+    return ((@$nextq || @ToClose) ? 0 : $LoopTimeout) unless @Timers;
 
     my $now = now();
 
@@ -188,6 +195,9 @@ sub RunTimers {
         $to_run->[1]->($now) if $to_run->[1];
     }
 
+    # timers may enqueue into nextq:
+    return 0 if (@$nextq || @ToClose);
+
     return $LoopTimeout unless @Timers;
 
     # convert time to an even number of milliseconds, adding 1
@@ -320,6 +330,8 @@ sub new {
 ### I N S T A N C E   M E T H O D S
 #####################################################################
 
+sub requeue ($) { push @$nextq, $_[0] }
+
 =head2 C<< $obj->close >>
 
 Close the socket.
@@ -593,19 +605,6 @@ sub shutdn ($) {
 	$self->close;
     }
 }
-
-sub next_tick () {
-	$nextt = undef;
-	my $q = $nextq;
-	$nextq = [];
-	$_->event_step for @$q;
-}
-
-sub requeue ($) {
-	push @$nextq, $_[0];
-	$nextt ||= PublicInbox::EvCleanup::asap(*next_tick);
-}
-
 package PublicInbox::DS::Timer;
 # [$abs_float_firetime, $coderef];
 sub cancel {
diff --git a/lib/PublicInbox/EvCleanup.pm b/lib/PublicInbox/EvCleanup.pm
index 33b54ebc..be6672ed 100644
--- a/lib/PublicInbox/EvCleanup.pm
+++ b/lib/PublicInbox/EvCleanup.pm
@@ -1,80 +1,23 @@
-# Copyright (C) 2016-2018 all contributors <meta@public-inbox.org>
+# Copyright (C) 2016-2019 all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
 
-# event cleanups (currently for PublicInbox::DS)
+# event cleanups (for PublicInbox::DS)
 package PublicInbox::EvCleanup;
 use strict;
 use warnings;
-use base qw(PublicInbox::DS);
-use PublicInbox::Syscall qw(EPOLLOUT EPOLLONESHOT);
+require PublicInbox::DS;
 
+# this only runs under public-inbox-{httpd/nntpd}, not generic PSGI servers
 my $ENABLED;
 sub enabled { $ENABLED }
 sub enable { $ENABLED = 1 }
-my $singleton;
-my $asapq = [ [], undef ];
-my $nextq = [ [], undef ];
 my $laterq = [ [], undef ];
 
-sub once_init () {
-	my $self = fields::new('PublicInbox::EvCleanup');
-	my ($r, $w);
-
-	# This is a dummy pipe which is always writable so it can always
-	# fires in the next event loop iteration.
-	pipe($r, $w) or die "pipe: $!";
-	fcntl($w, 1031, 4096) if $^O eq 'linux'; # 1031: F_SETPIPE_SZ
-	$self->SUPER::new($w, 0);
-
-	# always writable, since PublicInbox::EvCleanup::event_step
-	# never drains wbuf.  We can avoid wasting a hash slot by
-	# stuffing the read-end of the pipe into the never-to-be-touched
-	# wbuf
-	$self->{wbuf} = $r;
-	$self;
-}
-
-sub _run_all ($) {
-	my ($q) = @_;
-
-	my $run = $q->[0];
-	$q->[0] = [];
-	$q->[1] = undef;
-	$_->() foreach @$run;
-}
-
-# ensure PublicInbox::DS::ToClose processing after timers fire
-sub _asap_close () { $asapq->[1] ||= _asap_timer() }
-
-# Called by PublicInbox::DS
-sub event_step { _run_all($asapq) }
-
-sub _run_next () {
-	_run_all($nextq);
-	_asap_close();
-}
-
 sub _run_later () {
-	_run_all($laterq);
-	_asap_close();
-}
-
-sub _asap_timer () {
-	$singleton ||= once_init();
-	$singleton->watch(EPOLLOUT|EPOLLONESHOT);
-	1;
-}
-
-sub asap ($) {
-	my ($cb) = @_;
-	push @{$asapq->[0]}, $cb;
-	$asapq->[1] ||= _asap_timer();
-}
-
-sub next_tick ($) {
-	my ($cb) = @_;
-	push @{$nextq->[0]}, $cb;
-	$nextq->[1] ||= PublicInbox::DS->AddTimer(0, *_run_next);
+	my $run = $laterq->[0];
+	$laterq->[0] = [];
+	$laterq->[1] = undef;
+	$_->() foreach @$run;
 }
 
 sub later ($) {
@@ -83,10 +26,5 @@ sub later ($) {
 	$laterq->[1] ||= PublicInbox::DS->AddTimer(60, *_run_later);
 }
 
-END {
-	event_step();
-	_run_all($nextq);
-	_run_all($laterq);
-}
-
+END { _run_later() }
 1;
diff --git a/lib/PublicInbox/HTTP.pm b/lib/PublicInbox/HTTP.pm
index 856b8959..b8912950 100644
--- a/lib/PublicInbox/HTTP.pm
+++ b/lib/PublicInbox/HTTP.pm
@@ -30,10 +30,8 @@ use constant {
 use Errno qw(EAGAIN);
 
 my $pipelineq = [];
-my $pipet;
 sub process_pipelineq () {
 	my $q = $pipelineq;
-	$pipet = undef;
 	$pipelineq = [];
 	foreach (@$q) {
 		next unless $_->{sock};
@@ -238,8 +236,8 @@ sub next_request ($) {
 	my ($self) = @_;
 	if ($self->{rbuf}) {
 		# avoid recursion for pipelined requests
+		PublicInbox::DS::requeue(\&process_pipelineq) if !@$pipelineq;
 		push @$pipelineq, $self;
-		$pipet ||= PublicInbox::EvCleanup::asap(*process_pipelineq);
 	} else { # wait for next request
 		$self->requeue;
 	}
@@ -269,7 +267,7 @@ sub getline_cb ($$$) {
 				if ($self->{wbuf}) {
 					$self->write($next);
 				} else {
-					PublicInbox::EvCleanup::asap($next);
+					PublicInbox::DS::requeue($next);
 				}
 				return;
 			}
diff --git a/lib/PublicInbox/HTTPD/Async.pm b/lib/PublicInbox/HTTPD/Async.pm
index b46baeb2..35d17150 100644
--- a/lib/PublicInbox/HTTPD/Async.pm
+++ b/lib/PublicInbox/HTTPD/Async.pm
@@ -19,8 +19,8 @@ sub new {
 	# no $io? call $cb at the top of the next event loop to
 	# avoid recursion:
 	unless (defined($io)) {
-		PublicInbox::EvCleanup::asap($cb) if $cb;
-		PublicInbox::EvCleanup::next_tick($cleanup) if $cleanup;
+		PublicInbox::DS::requeue($cb);
+		die 'cleanup unsupported w/o $io' if $cleanup;
 		return;
 	}
 
@@ -87,7 +87,7 @@ sub close {
 
 	# we defer this to the next timer loop since close is deferred
 	if (my $cleanup = delete $self->{cleanup}) {
-		PublicInbox::EvCleanup::next_tick($cleanup);
+		PublicInbox::DS::requeue($cleanup);
 	}
 }
 
diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm
index 83970309..9973fcaf 100644
--- a/lib/PublicInbox/NNTP.pm
+++ b/lib/PublicInbox/NNTP.pm
@@ -50,14 +50,11 @@ sub expire_old () {
 	my $exp = $EXPTIME;
 	my $old = $now - $exp;
 	my $nr = 0;
-	my $closed = 0;
 	my %new;
 	while (my ($fd, $v) = each %$EXPMAP) {
 		my ($idle_time, $nntp) = @$v;
 		if ($idle_time < $old) {
-			if ($nntp->shutdn) {
-				$closed++;
-			} else {
+			if (!$nntp->shutdn) {
 				++$nr;
 				$new{$fd} = $v;
 			}
@@ -67,14 +64,7 @@ sub expire_old () {
 		}
 	}
 	$EXPMAP = \%new;
-	if ($nr) {
-		$expt = PublicInbox::EvCleanup::later(*expire_old);
-	} else {
-		$expt = undef;
-		# noop to kick outselves out of the loop ASAP so descriptors
-		# really get closed
-		PublicInbox::EvCleanup::asap(sub {}) if $closed;
-	}
+	$expt = PublicInbox::EvCleanup::later(*expire_old) if $nr;
 }
 
 sub greet ($) { $_[0]->write($_[0]->{nntpd}->{greet}) };
-- 
EW


  parent reply	other threads:[~2019-06-29 19:59 UTC|newest]

Thread overview: 12+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2019-06-29 19:59 [PATCH 00/11] ds: more updates Eric Wong
2019-06-29 19:59 ` [PATCH 01/11] ds: share lazy rbuf handling between HTTP and NNTP Eric Wong
2019-06-29 19:59 ` [PATCH 02/11] ds: move requeue logic over from NNTP Eric Wong
2019-06-29 19:59 ` [PATCH 03/11] http: use requeue instead of watch_in1 Eric Wong
2019-06-29 19:59 ` [PATCH 04/11] listener: use edge-triggered notifications Eric Wong
2019-06-29 19:59 ` Eric Wong [this message]
2019-06-29 19:59 ` [PATCH 06/11] ds: consolidate IO::Socket::SSL checks Eric Wong
2019-06-29 19:59 ` [PATCH 07/11] http: support HTTPS (kinda) Eric Wong
2019-06-29 19:59 ` [PATCH 08/11] parentpipe: document and use one-shot wakeups Eric Wong
2019-06-29 19:59 ` [PATCH 09/11] parentpipe: make the ->close call more obvious Eric Wong
2019-06-29 19:59 ` [PATCH 10/11] httpd/async: switch to buffering-as-fast-as-possible Eric Wong
2019-06-29 19:59 ` [PATCH 11/11] http: use bigger, but shorter-lived buffers for pipes Eric Wong

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

  List information: http://public-inbox.org/README

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20190629195951.32160-6-e@80x24.org \
    --to=e@80x24.org \
    --cc=meta@public-inbox.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
Code repositories for project(s) associated with this public inbox

	https://80x24.org/public-inbox.git

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).