about summary refs log tree commit homepage
diff options
context:
space:
mode:
-rw-r--r--lib/PublicInbox/DS.pm41
-rw-r--r--lib/PublicInbox/EvCleanup.pm80
-rw-r--r--lib/PublicInbox/HTTP.pm6
-rw-r--r--lib/PublicInbox/HTTPD/Async.pm6
-rw-r--r--lib/PublicInbox/NNTP.pm14
5 files changed, 36 insertions, 111 deletions
diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index 8f1494f6..6cd527e2 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -37,7 +37,6 @@ use Errno  qw(EAGAIN EINVAL EEXIST);
 use Carp   qw(croak confess carp);
 require File::Spec;
 
-my $nextt; # timer for next_tick
 my $nextq = []; # queue for next_tick
 our (
      %DescriptorMap,             # fd (num) -> PublicInbox::DS object
@@ -101,12 +100,6 @@ Returns a timer object which you can call C<< $timer->cancel >> on if you need t
 sub AddTimer {
     my ($class, $secs, $coderef) = @_;
 
-    if (!$secs) {
-        my $timer = bless([0, $coderef], 'PublicInbox::DS::Timer');
-        unshift(@Timers, $timer);
-        return $timer;
-    }
-
     my $fire_time = now() + $secs;
 
     my $timer = bless [$fire_time, $coderef], "PublicInbox::DS::Timer";
@@ -176,9 +169,23 @@ sub FirstTimeEventLoop {
 
 sub now () { clock_gettime(CLOCK_MONOTONIC) }
 
+sub next_tick () {
+    my $q = $nextq;
+    $nextq = [];
+    for (@$q) {
+        if (ref($_) eq 'CODE') {
+            $_->();
+        } else {
+            $_->event_step;
+        }
+    }
+}
+
 # runs timers and returns milliseconds for next one, or next event loop
 sub RunTimers {
-    return $LoopTimeout unless @Timers;
+    next_tick();
+
+    return ((@$nextq || @ToClose) ? 0 : $LoopTimeout) unless @Timers;
 
     my $now = now();
 
@@ -188,6 +195,9 @@ sub RunTimers {
         $to_run->[1]->($now) if $to_run->[1];
     }
 
+    # timers may enqueue into nextq:
+    return 0 if (@$nextq || @ToClose);
+
     return $LoopTimeout unless @Timers;
 
     # convert time to an even number of milliseconds, adding 1
@@ -320,6 +330,8 @@ sub new {
 ### I N S T A N C E   M E T H O D S
 #####################################################################
 
+sub requeue ($) { push @$nextq, $_[0] }
+
 =head2 C<< $obj->close >>
 
 Close the socket.
@@ -593,19 +605,6 @@ sub shutdn ($) {
         $self->close;
     }
 }
-
-sub next_tick () {
-        $nextt = undef;
-        my $q = $nextq;
-        $nextq = [];
-        $_->event_step for @$q;
-}
-
-sub requeue ($) {
-        push @$nextq, $_[0];
-        $nextt ||= PublicInbox::EvCleanup::asap(*next_tick);
-}
-
 package PublicInbox::DS::Timer;
 # [$abs_float_firetime, $coderef];
 sub cancel {
diff --git a/lib/PublicInbox/EvCleanup.pm b/lib/PublicInbox/EvCleanup.pm
index 33b54ebc..be6672ed 100644
--- a/lib/PublicInbox/EvCleanup.pm
+++ b/lib/PublicInbox/EvCleanup.pm
@@ -1,80 +1,23 @@
-# Copyright (C) 2016-2018 all contributors <meta@public-inbox.org>
+# Copyright (C) 2016-2019 all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
 
-# event cleanups (currently for PublicInbox::DS)
+# event cleanups (for PublicInbox::DS)
 package PublicInbox::EvCleanup;
 use strict;
 use warnings;
-use base qw(PublicInbox::DS);
-use PublicInbox::Syscall qw(EPOLLOUT EPOLLONESHOT);
+require PublicInbox::DS;
 
+# this only runs under public-inbox-{httpd/nntpd}, not generic PSGI servers
 my $ENABLED;
 sub enabled { $ENABLED }
 sub enable { $ENABLED = 1 }
-my $singleton;
-my $asapq = [ [], undef ];
-my $nextq = [ [], undef ];
 my $laterq = [ [], undef ];
 
-sub once_init () {
-        my $self = fields::new('PublicInbox::EvCleanup');
-        my ($r, $w);
-
-        # This is a dummy pipe which is always writable so it can always
-        # fires in the next event loop iteration.
-        pipe($r, $w) or die "pipe: $!";
-        fcntl($w, 1031, 4096) if $^O eq 'linux'; # 1031: F_SETPIPE_SZ
-        $self->SUPER::new($w, 0);
-
-        # always writable, since PublicInbox::EvCleanup::event_step
-        # never drains wbuf.  We can avoid wasting a hash slot by
-        # stuffing the read-end of the pipe into the never-to-be-touched
-        # wbuf
-        $self->{wbuf} = $r;
-        $self;
-}
-
-sub _run_all ($) {
-        my ($q) = @_;
-
-        my $run = $q->[0];
-        $q->[0] = [];
-        $q->[1] = undef;
-        $_->() foreach @$run;
-}
-
-# ensure PublicInbox::DS::ToClose processing after timers fire
-sub _asap_close () { $asapq->[1] ||= _asap_timer() }
-
-# Called by PublicInbox::DS
-sub event_step { _run_all($asapq) }
-
-sub _run_next () {
-        _run_all($nextq);
-        _asap_close();
-}
-
 sub _run_later () {
-        _run_all($laterq);
-        _asap_close();
-}
-
-sub _asap_timer () {
-        $singleton ||= once_init();
-        $singleton->watch(EPOLLOUT|EPOLLONESHOT);
-        1;
-}
-
-sub asap ($) {
-        my ($cb) = @_;
-        push @{$asapq->[0]}, $cb;
-        $asapq->[1] ||= _asap_timer();
-}
-
-sub next_tick ($) {
-        my ($cb) = @_;
-        push @{$nextq->[0]}, $cb;
-        $nextq->[1] ||= PublicInbox::DS->AddTimer(0, *_run_next);
+        my $run = $laterq->[0];
+        $laterq->[0] = [];
+        $laterq->[1] = undef;
+        $_->() foreach @$run;
 }
 
 sub later ($) {
@@ -83,10 +26,5 @@ sub later ($) {
         $laterq->[1] ||= PublicInbox::DS->AddTimer(60, *_run_later);
 }
 
-END {
-        event_step();
-        _run_all($nextq);
-        _run_all($laterq);
-}
-
+END { _run_later() }
 1;
diff --git a/lib/PublicInbox/HTTP.pm b/lib/PublicInbox/HTTP.pm
index 856b8959..b8912950 100644
--- a/lib/PublicInbox/HTTP.pm
+++ b/lib/PublicInbox/HTTP.pm
@@ -30,10 +30,8 @@ use constant {
 use Errno qw(EAGAIN);
 
 my $pipelineq = [];
-my $pipet;
 sub process_pipelineq () {
         my $q = $pipelineq;
-        $pipet = undef;
         $pipelineq = [];
         foreach (@$q) {
                 next unless $_->{sock};
@@ -238,8 +236,8 @@ sub next_request ($) {
         my ($self) = @_;
         if ($self->{rbuf}) {
                 # avoid recursion for pipelined requests
+                PublicInbox::DS::requeue(\&process_pipelineq) if !@$pipelineq;
                 push @$pipelineq, $self;
-                $pipet ||= PublicInbox::EvCleanup::asap(*process_pipelineq);
         } else { # wait for next request
                 $self->requeue;
         }
@@ -269,7 +267,7 @@ sub getline_cb ($$$) {
                                 if ($self->{wbuf}) {
                                         $self->write($next);
                                 } else {
-                                        PublicInbox::EvCleanup::asap($next);
+                                        PublicInbox::DS::requeue($next);
                                 }
                                 return;
                         }
diff --git a/lib/PublicInbox/HTTPD/Async.pm b/lib/PublicInbox/HTTPD/Async.pm
index b46baeb2..35d17150 100644
--- a/lib/PublicInbox/HTTPD/Async.pm
+++ b/lib/PublicInbox/HTTPD/Async.pm
@@ -19,8 +19,8 @@ sub new {
         # no $io? call $cb at the top of the next event loop to
         # avoid recursion:
         unless (defined($io)) {
-                PublicInbox::EvCleanup::asap($cb) if $cb;
-                PublicInbox::EvCleanup::next_tick($cleanup) if $cleanup;
+                PublicInbox::DS::requeue($cb);
+                die 'cleanup unsupported w/o $io' if $cleanup;
                 return;
         }
 
@@ -87,7 +87,7 @@ sub close {
 
         # we defer this to the next timer loop since close is deferred
         if (my $cleanup = delete $self->{cleanup}) {
-                PublicInbox::EvCleanup::next_tick($cleanup);
+                PublicInbox::DS::requeue($cleanup);
         }
 }
 
diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm
index 83970309..9973fcaf 100644
--- a/lib/PublicInbox/NNTP.pm
+++ b/lib/PublicInbox/NNTP.pm
@@ -50,14 +50,11 @@ sub expire_old () {
         my $exp = $EXPTIME;
         my $old = $now - $exp;
         my $nr = 0;
-        my $closed = 0;
         my %new;
         while (my ($fd, $v) = each %$EXPMAP) {
                 my ($idle_time, $nntp) = @$v;
                 if ($idle_time < $old) {
-                        if ($nntp->shutdn) {
-                                $closed++;
-                        } else {
+                        if (!$nntp->shutdn) {
                                 ++$nr;
                                 $new{$fd} = $v;
                         }
@@ -67,14 +64,7 @@ sub expire_old () {
                 }
         }
         $EXPMAP = \%new;
-        if ($nr) {
-                $expt = PublicInbox::EvCleanup::later(*expire_old);
-        } else {
-                $expt = undef;
-                # noop to kick outselves out of the loop ASAP so descriptors
-                # really get closed
-                PublicInbox::EvCleanup::asap(sub {}) if $closed;
-        }
+        $expt = PublicInbox::EvCleanup::later(*expire_old) if $nr;
 }
 
 sub greet ($) { $_[0]->write($_[0]->{nntpd}->{greet}) };