From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 9A0331FB06 for ; Sat, 18 Sep 2021 09:33:33 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 4/9] ds: support add unique timers Date: Sat, 18 Sep 2021 09:33:27 +0000 Message-Id: <20210918093332.16054-5-e@80x24.org> In-Reply-To: <20210918093332.16054-1-e@80x24.org> References: <20210918093332.16054-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: A common pattern we use is to arm a timer once and prevent it from being armed until it fires. We'll be using it more to do polling for saved searches and imports. --- lib/PublicInbox/DS.pm | 100 ++++++++++++++++---------------- lib/PublicInbox/LeiNoteEvent.pm | 5 +- 2 files changed, 52 insertions(+), 53 deletions(-) diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index d804792b..c89c7b8b 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -32,7 +32,7 @@ use PublicInbox::Syscall qw(:epoll); use PublicInbox::Tmpfile; use Errno qw(EAGAIN EINVAL); use Carp qw(carp croak); -our @EXPORT_OK = qw(now msg_more dwaitpid add_timer); +our @EXPORT_OK = qw(now msg_more dwaitpid add_timer add_uniq_timer); my %Stack; my $nextq; # queue for next_tick @@ -40,7 +40,7 @@ my $wait_pids; # list of [ pid, callback, callback_arg ] my $later_q; # list of callbacks to run at some later interval my $EXPMAP; # fd -> idle_time our $EXPTIME = 180; # 3 minutes -my ($later_timer, $reap_armed, $exp_timer); +my ($reap_armed, $exp_timer); my $ToClose; # sockets to close when event loop is done our ( %DescriptorMap, # fd (num) -> PublicInbox::DS object @@ -51,6 +51,7 @@ our ( $LoopTimeout, # timeout of event loop in milliseconds @Timers, # timers + %UniqTimer, $in_loop, ); @@ -70,6 +71,7 @@ sub Reset { $in_loop = undef; # first in case DESTROY callbacks use this %DescriptorMap = (); @Timers = (); + %UniqTimer = (); $PostLoopCallback = undef; # we may be iterating inside one of these on our stack @@ -81,9 +83,9 @@ sub Reset { $Epoll = undef; # may call DSKQXS::DESTROY } while (@Timers || keys(%Stack) || $nextq || $wait_pids || $later_q || $ToClose || keys(%DescriptorMap) || - $PostLoopCallback); + $PostLoopCallback || keys(%UniqTimer)); - $reap_armed = $later_timer = $exp_timer = undef; + $reap_armed = $exp_timer = undef; $LoopTimeout = -1; # no timeout by default } @@ -97,36 +99,33 @@ immediately. =cut sub SetLoopTimeout { $LoopTimeout = $_[1] + 0 } -=head2 C<< PublicInbox::DS::add_timer( $seconds, $coderef, $arg) >> +sub _add_named_timer { + my ($name, $secs, $coderef, @args) = @_; + my $fire_time = now() + $secs; + my $timer = [$fire_time, $name, $coderef, @args]; -Add a timer to occur $seconds from now. $seconds may be fractional, but timers -are not guaranteed to fire at the exact time you ask for. - -=cut -sub add_timer ($$;@) { - my ($secs, $coderef, @args) = @_; - - my $fire_time = now() + $secs; - - my $timer = [$fire_time, $coderef, @args]; + if (!@Timers || $fire_time >= $Timers[-1][0]) { + push @Timers, $timer; + return $timer; + } - if (!@Timers || $fire_time >= $Timers[-1][0]) { - push @Timers, $timer; - return $timer; - } + # Now, where do we insert? (NOTE: this appears slow, algorithm-wise, + # but it was compared against calendar queues, heaps, naive push/sort, + # and a bunch of other versions, and found to be fastest with a large + # variety of datasets.) + for (my $i = 0; $i < @Timers; $i++) { + if ($Timers[$i][0] > $fire_time) { + splice(@Timers, $i, 0, $timer); + return $timer; + } + } + die "Shouldn't get here."; +} - # Now, where do we insert? (NOTE: this appears slow, algorithm-wise, - # but it was compared against calendar queues, heaps, naive push/sort, - # and a bunch of other versions, and found to be fastest with a large - # variety of datasets.) - for (my $i = 0; $i < @Timers; $i++) { - if ($Timers[$i][0] > $fire_time) { - splice(@Timers, $i, 0, $timer); - return $timer; - } - } +sub add_timer { _add_named_timer(undef, @_) } - die "Shouldn't get here."; +sub add_uniq_timer { # ($name, $secs, $coderef, @args) = @_; + $UniqTimer{$_[0]} //= _add_named_timer(@_); } # keeping this around in case we support other FD types for now, @@ -184,31 +183,32 @@ sub next_tick () { # runs timers and returns milliseconds for next one, or next event loop sub RunTimers { - next_tick(); + next_tick(); - return (($nextq || $ToClose) ? 0 : $LoopTimeout) unless @Timers; + return (($nextq || $ToClose) ? 0 : $LoopTimeout) unless @Timers; - my $now = now(); + my $now = now(); - # Run expired timers - while (@Timers && $Timers[0][0] <= $now) { - my $to_run = shift(@Timers); - $to_run->[1]->(@$to_run[2..$#$to_run]); - } + # Run expired timers + while (@Timers && $Timers[0][0] <= $now) { + my $to_run = shift(@Timers); + delete $UniqTimer{$to_run->[1] // ''}; + $to_run->[2]->(@$to_run[3..$#$to_run]); + } - # timers may enqueue into nextq: - return 0 if ($nextq || $ToClose); + # timers may enqueue into nextq: + return 0 if ($nextq || $ToClose); - return $LoopTimeout unless @Timers; + return $LoopTimeout unless @Timers; - # convert time to an even number of milliseconds, adding 1 - # extra, otherwise floating point fun can occur and we'll - # call RunTimers like 20-30 times, each returning a timeout - # of 0.0000212 seconds - my $timeout = int(($Timers[0][0] - $now) * 1000) + 1; + # convert time to an even number of milliseconds, adding 1 + # extra, otherwise floating point fun can occur and we'll + # call RunTimers like 20-30 times, each returning a timeout + # of 0.0000212 seconds + my $timeout = int(($Timers[0][0] - $now) * 1000) + 1; - # -1 is an infinite timeout, so prefer a real timeout - ($LoopTimeout < 0 || $LoopTimeout >= $timeout) ? $timeout : $LoopTimeout; + # -1 is an infinite timeout, so prefer a real timeout + ($LoopTimeout < 0 || $LoopTimeout >= $timeout) ? $timeout : $LoopTimeout } sub sig_setmask { sigprocmask(SIG_SETMASK, @_) or die "sigprocmask: $!" } @@ -660,7 +660,7 @@ sub dwaitpid ($;$$) { sub _run_later () { my $q = $later_q or return; - $later_timer = $later_q = undef; + $later_q = undef; $Stack{later_q} = $q; $_->() for @$q; delete $Stack{later_q}; @@ -668,7 +668,7 @@ sub _run_later () { sub later ($) { push @$later_q, $_[0]; # autovivifies @$later_q - $later_timer //= add_timer(60, \&_run_later); + add_uniq_timer('later', 60, \&_run_later); } sub expire_old () { diff --git a/lib/PublicInbox/LeiNoteEvent.pm b/lib/PublicInbox/LeiNoteEvent.pm index c03c5319..18313359 100644 --- a/lib/PublicInbox/LeiNoteEvent.pm +++ b/lib/PublicInbox/LeiNoteEvent.pm @@ -7,8 +7,8 @@ package PublicInbox::LeiNoteEvent; use strict; use v5.10.1; use parent qw(PublicInbox::IPC); +use PublicInbox::DS; -my $flush_timer; our $to_flush; # { cfgpath => $lei } sub flush_lei ($) { @@ -20,7 +20,6 @@ sub flush_lei ($) { # we batch up writes and flush every 5s (matching Linux default # writeback behavior) since MUAs can trigger a storm of inotify events sub flush_task { # PublicInbox::DS timer callback - undef $flush_timer; my $todo = $to_flush // return; $to_flush = undef; for my $lei (values %$todo) { flush_lei($lei) } @@ -29,7 +28,7 @@ sub flush_task { # PublicInbox::DS timer callback # sets a timer to flush sub note_event_arm_done ($) { my ($lei) = @_; - $flush_timer //= PublicInbox::DS::add_timer(5, \&flush_task); + PublicInbox::DS::add_uniq_timer('flush_timer', 5, \&flush_task); $to_flush->{$lei->{cfg}->{'-f'}} //= $lei; }