diff options
author | Eric Wong <e@80x24.org> | 2016-06-24 23:39:58 +0000 |
---|---|---|
committer | Eric Wong <e@80x24.org> | 2016-06-24 23:40:36 +0000 |
commit | 59acf1776b0b4ca558ecae7dd3a36285b35a19fc (patch) | |
tree | 6e464531f3815098bb0572d2c274fe74e3861ac4 /lib/PublicInbox | |
parent | 758fd015e2e0fe45b3606b364baf9bd8878417b8 (diff) | |
download | public-inbox-59acf1776b0b4ca558ecae7dd3a36285b35a19fc.tar.gz |
Instead of relying on a timer with immediate callback, arm a pipe to watch for writability, ensuring the callback always fires.
Diffstat (limited to 'lib/PublicInbox')
-rw-r--r-- | lib/PublicInbox/EvCleanup.pm | 42 |
1 files changed, 33 insertions, 9 deletions
diff --git a/lib/PublicInbox/EvCleanup.pm b/lib/PublicInbox/EvCleanup.pm index 5efb0930..61837b89 100644 --- a/lib/PublicInbox/EvCleanup.pm +++ b/lib/PublicInbox/EvCleanup.pm @@ -5,32 +5,56 @@ package PublicInbox::EvCleanup; use strict; use warnings; +use base qw(Danga::Socket); +use fields qw(rd); +my $singleton; +my $asapq = [ [], undef ]; +my $laterq = [ [], undef ]; -my $asapq = { queue => [], timer => undef }; -my $laterq = { queue => [], timer => undef }; +sub once_init () { + my $self = fields::new('PublicInbox::EvCleanup'); + my ($r, $w); + pipe($r, $w) or die "pipe: $!"; + $self->SUPER::new($w); + $self->{rd} = $r; # never read, since we never write.. + $self; +} sub _run_all ($) { my ($q) = @_; - my $run = $q->{queue}; - $q->{queue} = []; - $q->{timer} = undef; + my $run = $q->[0]; + $q->[0] = []; + $q->[1] = undef; $_->() foreach @$run; } sub _run_asap () { _run_all($asapq) } sub _run_later () { _run_all($laterq) } +# Called by Danga::Socket +sub event_write { + my ($self) = @_; + $self->watch_write(0); + _run_asap(); +} + +sub _asap_timer () { + $singleton ||= once_init(); + $singleton->watch_write(1); + 1; +} + sub asap ($) { my ($cb) = @_; - push @{$asapq->{queue}}, $cb; - $asapq->{timer} ||= Danga::Socket->AddTimer(0, *_run_asap); + push @{$asapq->[0]}, $cb; + $asapq->[1] ||= _asap_timer(); } sub later ($) { my ($cb) = @_; - push @{$laterq->{queue}}, $cb; - $laterq->{timer} ||= Danga::Socket->AddTimer(60, *_run_later); + push @{$laterq->[0]}, $cb; + $laterq->[1] ||= Danga::Socket->AddTimer(60, *_run_later); } END { |