diff options
-rw-r--r-- | lib/PublicInbox/EvCleanup.pm | 41 | ||||
-rw-r--r-- | lib/PublicInbox/HTTP.pm | 26 | ||||
-rw-r--r-- | lib/PublicInbox/HTTPD/Async.pm | 3 | ||||
-rw-r--r-- | lib/PublicInbox/NNTP.pm | 30 |
4 files changed, 69 insertions, 31 deletions
diff --git a/lib/PublicInbox/EvCleanup.pm b/lib/PublicInbox/EvCleanup.pm new file mode 100644 index 00000000..5efb0930 --- /dev/null +++ b/lib/PublicInbox/EvCleanup.pm @@ -0,0 +1,41 @@ +# Copyright (C) 2016 all contributors <meta@public-inbox.org> +# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> + +# event cleanups (currently for Danga::Socket) +package PublicInbox::EvCleanup; +use strict; +use warnings; + +my $asapq = { queue => [], timer => undef }; +my $laterq = { queue => [], timer => undef }; + +sub _run_all ($) { + my ($q) = @_; + + my $run = $q->{queue}; + $q->{queue} = []; + $q->{timer} = undef; + $_->() foreach @$run; +} + +sub _run_asap () { _run_all($asapq) } +sub _run_later () { _run_all($laterq) } + +sub asap ($) { + my ($cb) = @_; + push @{$asapq->{queue}}, $cb; + $asapq->{timer} ||= Danga::Socket->AddTimer(0, *_run_asap); +} + +sub later ($) { + my ($cb) = @_; + push @{$laterq->{queue}}, $cb; + $laterq->{timer} ||= Danga::Socket->AddTimer(60, *_run_later); +} + +END { + _run_asap(); + _run_later(); +} + +1; diff --git a/lib/PublicInbox/HTTP.pm b/lib/PublicInbox/HTTP.pm index 104a2132..00c9a044 100644 --- a/lib/PublicInbox/HTTP.pm +++ b/lib/PublicInbox/HTTP.pm @@ -26,13 +26,22 @@ use constant { # FIXME: duplicated code with NNTP.pm my $WEAKEN = {}; # string(inbox) -> inbox -my $WEAKTIMER; +my $weakt; sub weaken_task () { - $WEAKTIMER = undef; + $weakt = undef; $_->weaken_all for values %$WEAKEN; $WEAKEN = {}; } +my $pipelineq = []; +my $pipet; +sub process_pipelineq () { + my $q = $pipelineq; + $pipet = undef; + $pipelineq = []; + rbuf_process($_) foreach @$q; +} + # Use the same configuration parameter as git since this is primarily # a slow-client sponge for git-http-backend # TODO: support per-respository http.maxRequestBuffer somehow... @@ -234,7 +243,7 @@ sub response_write { if (my $obj = $env->{'pi-httpd.inbox'}) { # grace period for reaping resources $WEAKEN->{"$obj"} = $obj; - $WEAKTIMER ||= Danga::Socket->AddTimer(60, *weaken_task); + $weakt ||= PublicInbox::EvCleanup::later(*weaken_task); } $self->{env} = undef; }; @@ -281,15 +290,6 @@ sub more ($$) { $self->write($_[1]); } -my $pipelineq = []; -my $next_tick; -sub process_pipelineq () { - $next_tick = undef; - my $q = $pipelineq; - $pipelineq = []; - rbuf_process($_) foreach @$q; -} - # overrides existing Danga::Socket method sub event_write { my ($self) = @_; @@ -300,7 +300,7 @@ sub event_write { $self->watch_read(1); } else { # avoid recursion for pipelined requests push @$pipelineq, $self; - $next_tick ||= Danga::Socket->AddTimer(0, *process_pipelineq); + $pipet ||= PublicInbox::EvCleanup::asap(*process_pipelineq); } } diff --git a/lib/PublicInbox/HTTPD/Async.pm b/lib/PublicInbox/HTTPD/Async.pm index bd2eacbf..47ba27d2 100644 --- a/lib/PublicInbox/HTTPD/Async.pm +++ b/lib/PublicInbox/HTTPD/Async.pm @@ -10,6 +10,7 @@ use strict; use warnings; use base qw(Danga::Socket); use fields qw(cb cleanup); +require PublicInbox::EvCleanup; sub new { my ($class, $io, $cb, $cleanup) = @_; @@ -61,7 +62,7 @@ sub close { $self->SUPER::close(@_); # we defer this to the next timer loop since close is deferred - Danga::Socket->AddTimer(0, $cleanup) if $cleanup; + PublicInbox::EvCleanup::asap($cleanup) if $cleanup; } # do not let ourselves be closed during graceful termination diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm index ac536f71..f3de4b1c 100644 --- a/lib/PublicInbox/NNTP.pm +++ b/lib/PublicInbox/NNTP.pm @@ -11,6 +11,7 @@ use PublicInbox::Search; use PublicInbox::Msgmap; use PublicInbox::Git; use PublicInbox::MID qw(mid2path); +require PublicInbox::EvCleanup; use Email::Simple; use POSIX qw(strftime); use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC); @@ -38,15 +39,15 @@ my $LIST_HEADERS = join("\r\n", @OVERVIEW, my %DISABLED; # = map { $_ => 1 } qw(xover list_overview_fmt newnews xhdr); my $EXPMAP; # fd -> [ idle_time, $self ] -my $EXPTIMER; +my $expt; our $EXPTIME = 180; # 3 minutes my $WEAKEN = {}; # string(nntpd) -> nntpd -my $WEAKTIMER; +my $weakt; +my $nextt; -my $next_tick; my $nextq = []; sub next_tick () { - $next_tick = undef; + $nextt = undef; my $q = $nextq; $nextq = []; foreach my $nntp (@$q) { @@ -70,7 +71,7 @@ sub update_idle_time ($) { # reduce FD pressure by closing some "git cat-file --batch" processes # and unused FDs for msgmap and Xapian indices sub weaken_groups () { - $WEAKTIMER = undef; + $weakt = undef; foreach my $nntpd (values %$WEAKEN) { $_->weaken_all foreach (@{$nntpd->{grouplist}}); } @@ -81,7 +82,6 @@ sub expire_old () { my $now = now(); my $exp = $EXPTIME; my $old = $now - $exp; - my $next = $now + $exp; my $nr = 0; my %new; while (my ($fd, $v) = each %$EXPMAP) { @@ -89,26 +89,22 @@ sub expire_old () { if ($idle_time < $old) { $nntp->close; # idempotent } else { - my $nexp = $idle_time + $exp; - $next = $nexp if ($nexp < $next); ++$nr; $new{$fd} = $v; } } $EXPMAP = \%new; if ($nr) { - $next -= $now; - $next = 0 if $next < 0; - $EXPTIMER = Danga::Socket->AddTimer($next, *expire_old); + $expt = PublicInbox::EvCleanup::later(*expire_old); weaken_groups(); } else { - $EXPTIMER = undef; + $expt = undef; # noop to kick outselves out of the loop ASAP so descriptors # really get closed - Danga::Socket->AddTimer(0, sub {}); + PublicInbox::EvCleanup::asap(sub {}); # grace period for reaping resources - $WEAKTIMER ||= Danga::Socket->AddTimer(30, *weaken_groups); + $weakt ||= PublicInbox::EvCleanup::later(*weaken_groups); } } @@ -122,7 +118,7 @@ sub new ($$$) { $self->watch_read(1); update_idle_time($self); $WEAKEN->{"$nntpd"} = $nntpd; - $EXPTIMER ||= Danga::Socket->AddTimer($EXPTIME, *expire_old); + $expt ||= PublicInbox::EvCleanup::later(*expire_old); $self; } @@ -633,7 +629,7 @@ sub long_response ($$$$) { update_idle_time($self); push @$nextq, $self; - $next_tick ||= Danga::Socket->AddTimer(0, *next_tick); + $nextt ||= PublicInbox::EvCleanup::asap(*next_tick); } else { # all done! $self->{long_res} = undef; $self->watch_read(1); @@ -996,7 +992,7 @@ sub watch_read { # in case we really did dispatch a read event and started # another long response. push @$nextq, $self; - $next_tick ||= Danga::Socket->AddTimer(0, *next_tick); + $nextt ||= PublicInbox::EvCleanup::asap(*next_tick); } $rv; } |