about summary refs log tree commit homepage
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/PublicInbox/EvCleanup.pm41
-rw-r--r--lib/PublicInbox/HTTP.pm26
-rw-r--r--lib/PublicInbox/HTTPD/Async.pm3
-rw-r--r--lib/PublicInbox/NNTP.pm30
4 files changed, 69 insertions, 31 deletions
diff --git a/lib/PublicInbox/EvCleanup.pm b/lib/PublicInbox/EvCleanup.pm
new file mode 100644
index 00000000..5efb0930
--- /dev/null
+++ b/lib/PublicInbox/EvCleanup.pm
@@ -0,0 +1,41 @@
+# Copyright (C) 2016 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# event cleanups (currently for Danga::Socket)
+package PublicInbox::EvCleanup;
+use strict;
+use warnings;
+
+my $asapq = { queue => [], timer => undef };
+my $laterq = { queue => [], timer => undef };
+
+sub _run_all ($) {
+        my ($q) = @_;
+
+        my $run = $q->{queue};
+        $q->{queue} = [];
+        $q->{timer} = undef;
+        $_->() foreach @$run;
+}
+
+sub _run_asap () { _run_all($asapq) }
+sub _run_later () { _run_all($laterq) }
+
+sub asap ($) {
+        my ($cb) = @_;
+        push @{$asapq->{queue}}, $cb;
+        $asapq->{timer} ||= Danga::Socket->AddTimer(0, *_run_asap);
+}
+
+sub later ($) {
+        my ($cb) = @_;
+        push @{$laterq->{queue}}, $cb;
+        $laterq->{timer} ||= Danga::Socket->AddTimer(60, *_run_later);
+}
+
+END {
+        _run_asap();
+        _run_later();
+}
+
+1;
diff --git a/lib/PublicInbox/HTTP.pm b/lib/PublicInbox/HTTP.pm
index 104a2132..00c9a044 100644
--- a/lib/PublicInbox/HTTP.pm
+++ b/lib/PublicInbox/HTTP.pm
@@ -26,13 +26,22 @@ use constant {
 
 # FIXME: duplicated code with NNTP.pm
 my $WEAKEN = {}; # string(inbox) -> inbox
-my $WEAKTIMER;
+my $weakt;
 sub weaken_task () {
-        $WEAKTIMER = undef;
+        $weakt = undef;
         $_->weaken_all for values %$WEAKEN;
         $WEAKEN = {};
 }
 
+my $pipelineq = [];
+my $pipet;
+sub process_pipelineq () {
+        my $q = $pipelineq;
+        $pipet = undef;
+        $pipelineq = [];
+        rbuf_process($_) foreach @$q;
+}
+
 # Use the same configuration parameter as git since this is primarily
 # a slow-client sponge for git-http-backend
 # TODO: support per-respository http.maxRequestBuffer somehow...
@@ -234,7 +243,7 @@ sub response_write {
                 if (my $obj = $env->{'pi-httpd.inbox'}) {
                         # grace period for reaping resources
                         $WEAKEN->{"$obj"} = $obj;
-                        $WEAKTIMER ||= Danga::Socket->AddTimer(60, *weaken_task);
+                        $weakt ||= PublicInbox::EvCleanup::later(*weaken_task);
                 }
                 $self->{env} = undef;
         };
@@ -281,15 +290,6 @@ sub more ($$) {
         $self->write($_[1]);
 }
 
-my $pipelineq = [];
-my $next_tick;
-sub process_pipelineq () {
-        $next_tick = undef;
-        my $q = $pipelineq;
-        $pipelineq = [];
-        rbuf_process($_) foreach @$q;
-}
-
 # overrides existing Danga::Socket method
 sub event_write {
         my ($self) = @_;
@@ -300,7 +300,7 @@ sub event_write {
                 $self->watch_read(1);
         } else { # avoid recursion for pipelined requests
                 push @$pipelineq, $self;
-                $next_tick ||= Danga::Socket->AddTimer(0, *process_pipelineq);
+                $pipet ||= PublicInbox::EvCleanup::asap(*process_pipelineq);
         }
 }
 
diff --git a/lib/PublicInbox/HTTPD/Async.pm b/lib/PublicInbox/HTTPD/Async.pm
index bd2eacbf..47ba27d2 100644
--- a/lib/PublicInbox/HTTPD/Async.pm
+++ b/lib/PublicInbox/HTTPD/Async.pm
@@ -10,6 +10,7 @@ use strict;
 use warnings;
 use base qw(Danga::Socket);
 use fields qw(cb cleanup);
+require PublicInbox::EvCleanup;
 
 sub new {
         my ($class, $io, $cb, $cleanup) = @_;
@@ -61,7 +62,7 @@ sub close {
         $self->SUPER::close(@_);
 
         # we defer this to the next timer loop since close is deferred
-        Danga::Socket->AddTimer(0, $cleanup) if $cleanup;
+        PublicInbox::EvCleanup::asap($cleanup) if $cleanup;
 }
 
 # do not let ourselves be closed during graceful termination
diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm
index ac536f71..f3de4b1c 100644
--- a/lib/PublicInbox/NNTP.pm
+++ b/lib/PublicInbox/NNTP.pm
@@ -11,6 +11,7 @@ use PublicInbox::Search;
 use PublicInbox::Msgmap;
 use PublicInbox::Git;
 use PublicInbox::MID qw(mid2path);
+require PublicInbox::EvCleanup;
 use Email::Simple;
 use POSIX qw(strftime);
 use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC);
@@ -38,15 +39,15 @@ my $LIST_HEADERS = join("\r\n", @OVERVIEW,
 my %DISABLED; # = map { $_ => 1 } qw(xover list_overview_fmt newnews xhdr);
 
 my $EXPMAP; # fd -> [ idle_time, $self ]
-my $EXPTIMER;
+my $expt;
 our $EXPTIME = 180; # 3 minutes
 my $WEAKEN = {}; # string(nntpd) -> nntpd
-my $WEAKTIMER;
+my $weakt;
+my $nextt;
 
-my $next_tick;
 my $nextq = [];
 sub next_tick () {
-        $next_tick = undef;
+        $nextt = undef;
         my $q = $nextq;
         $nextq = [];
         foreach my $nntp (@$q) {
@@ -70,7 +71,7 @@ sub update_idle_time ($) {
 # reduce FD pressure by closing some "git cat-file --batch" processes
 # and unused FDs for msgmap and Xapian indices
 sub weaken_groups () {
-        $WEAKTIMER = undef;
+        $weakt = undef;
         foreach my $nntpd (values %$WEAKEN) {
                 $_->weaken_all foreach (@{$nntpd->{grouplist}});
         }
@@ -81,7 +82,6 @@ sub expire_old () {
         my $now = now();
         my $exp = $EXPTIME;
         my $old = $now - $exp;
-        my $next = $now + $exp;
         my $nr = 0;
         my %new;
         while (my ($fd, $v) = each %$EXPMAP) {
@@ -89,26 +89,22 @@ sub expire_old () {
                 if ($idle_time < $old) {
                         $nntp->close; # idempotent
                 } else {
-                        my $nexp = $idle_time + $exp;
-                        $next = $nexp if ($nexp < $next);
                         ++$nr;
                         $new{$fd} = $v;
                 }
         }
         $EXPMAP = \%new;
         if ($nr) {
-                $next -= $now;
-                $next = 0 if $next < 0;
-                $EXPTIMER = Danga::Socket->AddTimer($next, *expire_old);
+                $expt = PublicInbox::EvCleanup::later(*expire_old);
                 weaken_groups();
         } else {
-                $EXPTIMER = undef;
+                $expt = undef;
                 # noop to kick outselves out of the loop ASAP so descriptors
                 # really get closed
-                Danga::Socket->AddTimer(0, sub {});
+                PublicInbox::EvCleanup::asap(sub {});
 
                 # grace period for reaping resources
-                $WEAKTIMER ||= Danga::Socket->AddTimer(30, *weaken_groups);
+                $weakt ||= PublicInbox::EvCleanup::later(*weaken_groups);
         }
 }
 
@@ -122,7 +118,7 @@ sub new ($$$) {
         $self->watch_read(1);
         update_idle_time($self);
         $WEAKEN->{"$nntpd"} = $nntpd;
-        $EXPTIMER ||= Danga::Socket->AddTimer($EXPTIME, *expire_old);
+        $expt ||= PublicInbox::EvCleanup::later(*expire_old);
         $self;
 }
 
@@ -633,7 +629,7 @@ sub long_response ($$$$) {
                         update_idle_time($self);
 
                         push @$nextq, $self;
-                        $next_tick ||= Danga::Socket->AddTimer(0, *next_tick);
+                        $nextt ||= PublicInbox::EvCleanup::asap(*next_tick);
                 } else { # all done!
                         $self->{long_res} = undef;
                         $self->watch_read(1);
@@ -996,7 +992,7 @@ sub watch_read {
                 # in case we really did dispatch a read event and started
                 # another long response.
                 push @$nextq, $self;
-                $next_tick ||= Danga::Socket->AddTimer(0, *next_tick);
+                $nextt ||= PublicInbox::EvCleanup::asap(*next_tick);
         }
         $rv;
 }