diff options
-rw-r--r-- | lib/PublicInbox/CodeSearchIdx.pm | 29 | ||||
-rw-r--r-- | lib/PublicInbox/DS.pm | 30 | ||||
-rw-r--r-- | lib/PublicInbox/Daemon.pm | 2 | ||||
-rw-r--r-- | lib/PublicInbox/ExtSearchIdx.pm | 2 | ||||
-rw-r--r-- | lib/PublicInbox/IPC.pm | 7 | ||||
-rw-r--r-- | lib/PublicInbox/LEI.pm | 4 | ||||
-rw-r--r-- | lib/PublicInbox/Watch.pm | 2 | ||||
-rw-r--r-- | t/dir_idle.t | 6 | ||||
-rw-r--r-- | t/ds-leak.t | 8 | ||||
-rw-r--r-- | t/imapd.t | 6 | ||||
-rw-r--r-- | t/nntpd.t | 2 | ||||
-rw-r--r-- | t/sigfd.t | 7 | ||||
-rw-r--r-- | t/watch_maildir.t | 8 | ||||
-rw-r--r-- | xt/mem-imapd-tls.t | 7 | ||||
-rw-r--r-- | xt/mem-nntpd-tls.t | 8 | ||||
-rw-r--r-- | xt/net_writer-imap.t | 4 |
16 files changed, 64 insertions, 68 deletions
diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm index 13fe1c28..587f0b81 100644 --- a/lib/PublicInbox/CodeSearchIdx.pm +++ b/lib/PublicInbox/CodeSearchIdx.pm @@ -246,12 +246,18 @@ sub run_todo ($) { $n; } +sub need_reap { # post_loop_do + my (undef, $jobs) = @_; + scalar(keys(%$LIVE)) > $jobs; +} + sub cidx_reap ($$) { my ($self, $jobs) = @_; while (run_todo($self)) {} - my $cb = sub { keys(%$LIVE) > $jobs }; - PublicInbox::DS->SetPostLoopCallback($cb); - PublicInbox::DS::event_loop($MY_SIG, $SIGSET) while $cb->(); + local @PublicInbox::DS::post_loop_do = \(&need_reap, $jobs); + while (need_reap(undef, $jobs)) { + PublicInbox::DS::event_loop($MY_SIG, $SIGSET); + } while (!$jobs && run_todo($self)) {} } @@ -397,6 +403,11 @@ sub shard_commit { # via wq_io_do send($op_p, "shard_done $n", MSG_EOR); } +sub consumers_open { # post_loop_do + my (undef, $consumers) = @_; + scalar(grep { $_->{sock} } values %$consumers); +} + sub commit_used_shards ($$$) { my ($self, $git, $consumers) = @_; local $self->{-shard_ok} = {}; @@ -406,9 +417,7 @@ sub commit_used_shards ($$$) { $IDX_SHARDS[$n]->wq_io_do('shard_commit', [ $p->{op_p} ], $n); $consumers->{$n} = $c; } - PublicInbox::DS->SetPostLoopCallback(sub { - scalar(grep { $_->{sock} } values %$consumers); - }); + local @PublicInbox::DS::post_loop_do = (\&consumers_open, $consumers); PublicInbox::DS::event_loop($MY_SIG, $SIGSET); my $n = grep { ! $self->{-shard_ok}->{$_} } keys %$consumers; die "E: $git->{git_dir} $n shards failed" if $n; @@ -437,9 +446,7 @@ sub index_repo { # cidx_await cb $CONSUMERS{$n} = $c; } @shard_in = (); - PublicInbox::DS->SetPostLoopCallback(sub { - scalar(grep { $_->{sock} } values %CONSUMERS); - }); + local @PublicInbox::DS::post_loop_do = (\&consumers_open, \%CONSUMERS); PublicInbox::DS::event_loop($MY_SIG, $SIGSET); my $n = grep { ! $self->{-shard_ok}->{$_} } keys %CONSUMERS; die "E: $git->{git_dir} $n shards failed" if $n; @@ -523,7 +530,7 @@ sub scan_git_dirs ($) { cidx_reap($self, 0); } -sub shards_active { # PostLoopCallback +sub shards_active { # post_loop_do scalar(grep { $_->{-cidx_quit} } @IDX_SHARDS); } @@ -572,7 +579,7 @@ sub cidx_run { # main entry point $s->wq_close; } - PublicInbox::DS->SetPostLoopCallback(\&shards_active); + local @PublicInbox::DS::post_loop_do = (\&shards_active); PublicInbox::DS::event_loop($MY_SIG, $SIGSET) if shards_active(); $self->lock_release(!!$self->{nchange}); } diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index b6eaf2d7..340086fc 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -44,7 +44,7 @@ our ( $Epoll, # Global epoll fd (or DSKQXS ref) $ep_io, # IO::Handle for Epoll - $PostLoopCallback, # subref to call at the end of each loop, if defined (global) + @post_loop_do, # subref + args to call at the end of each loop $LoopTimeout, # timeout of event loop in milliseconds @Timers, # timers @@ -69,7 +69,7 @@ sub Reset { %DescriptorMap = (); @Timers = (); %UniqTimer = (); - $PostLoopCallback = undef; + @post_loop_do = (); # we may be iterating inside one of these on our stack my @q = delete @Stack{keys %Stack}; @@ -79,7 +79,7 @@ sub Reset { $Epoll = undef; # may call DSKQXS::DESTROY } while (@Timers || keys(%Stack) || $nextq || $AWAIT_PIDS || $ToClose || keys(%DescriptorMap) || - $PostLoopCallback || keys(%UniqTimer)); + @post_loop_do || keys(%UniqTimer)); $reap_armed = undef; $LoopTimeout = -1; # no timeout by default @@ -247,11 +247,13 @@ sub PostEventLoop () { } # by default we keep running, unless a postloop callback cancels it - $PostLoopCallback ? $PostLoopCallback->(\%DescriptorMap) : 1; + @post_loop_do ? $post_loop_do[0]->(\%DescriptorMap, + @post_loop_do[1..$#post_loop_do]) + : 1 } # Start processing IO events. In most daemon programs this never exits. See -# C<PostLoopCallback> for how to exit the loop. +# C<post_loop_do> for how to exit the loop. sub event_loop (;$$) { my ($sig, $oldset) = @_; $Epoll //= _InitPoller(); @@ -287,24 +289,6 @@ sub event_loop (;$$) { } while (PostEventLoop()); } -=head2 C<< CLASS->SetPostLoopCallback( CODEREF ) >> - -Sets post loop callback function. Pass a subref and it will be -called every time the event loop finishes. - -Return 1 (or any true value) from the sub to make the loop continue, 0 or false -and it will exit. - -The callback function will be passed two parameters: \%DescriptorMap - -=cut -sub SetPostLoopCallback { - my ($class, $ref) = @_; - - # global callback - $PostLoopCallback = (defined $ref && ref $ref eq 'CODE') ? $ref : undef; -} - ##################################################################### ### PublicInbox::DS-the-object code ##################################################################### diff --git a/lib/PublicInbox/Daemon.pm b/lib/PublicInbox/Daemon.pm index 17e799ca..6152a5d3 100644 --- a/lib/PublicInbox/Daemon.pm +++ b/lib/PublicInbox/Daemon.pm @@ -350,7 +350,7 @@ sub worker_quit { # $_[0] = signal name or number (unused) my $proc_name; my $warn = 0; # drop idle connections and try to quit gracefully - PublicInbox::DS->SetPostLoopCallback(sub { + @PublicInbox::DS::post_loop_do = (sub { my ($dmap, undef) = @_; my $n = 0; my $now = now(); diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm index 401b18d0..5445b156 100644 --- a/lib/PublicInbox/ExtSearchIdx.pm +++ b/lib/PublicInbox/ExtSearchIdx.pm @@ -1388,7 +1388,7 @@ sub eidx_watch { # public-inbox-extindex --watch main loop my $quit = PublicInbox::SearchIdx::quit_cb($sync); $sig->{QUIT} = $sig->{INT} = $sig->{TERM} = $quit; local $self->{-watch_sync} = $sync; # for ->on_inbox_unlock - PublicInbox::DS->SetPostLoopCallback(sub { !$sync->{quit} }); + local @PublicInbox::DS::post_loop_do = (sub { !$sync->{quit} }); $pr->("initial scan complete, entering event loop\n") if $pr; # calls InboxIdle->event_step: PublicInbox::DS::event_loop($sig, $oldset); diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm index 730f2cf6..da534aa7 100644 --- a/lib/PublicInbox/IPC.pm +++ b/lib/PublicInbox/IPC.pm @@ -245,11 +245,16 @@ sub recv_and_run { $n; } +sub sock_defined { + my (undef, $wqw) = @_; + defined($wqw->{sock}); +} + sub wq_worker_loop ($$) { my ($self, $bcast2) = @_; my $wqw = PublicInbox::WQWorker->new($self, $self->{-wq_s2}); PublicInbox::WQWorker->new($self, $bcast2) if $bcast2; - PublicInbox::DS->SetPostLoopCallback(sub { $wqw->{sock} }); + local @PublicInbox::DS::post_loop_do = (\&sock_defined, $wqw); PublicInbox::DS::event_loop(); PublicInbox::DS->Reset; } diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index b83de91d..eb9799f6 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -1325,11 +1325,11 @@ sub lazy_start { }; require PublicInbox::DirIdle; local $dir_idle = PublicInbox::DirIdle->new(sub { - # just rely on wakeup to hit PostLoopCallback set below + # just rely on wakeup to hit post_loop_do dir_idle_handler($_[0]) if $_[0]->fullname ne $path; }); $dir_idle->add_watches([$sock_dir]); - PublicInbox::DS->SetPostLoopCallback(sub { + local @PublicInbox::DS::post_loop_do = (sub { my ($dmap, undef) = @_; if (@st = defined($path) ? stat($path) : ()) { if ($dev_ino_expect ne pack('dd', $st[0], $st[1])) { diff --git a/lib/PublicInbox/Watch.pm b/lib/PublicInbox/Watch.pm index d9aadf82..8482100c 100644 --- a/lib/PublicInbox/Watch.pm +++ b/lib/PublicInbox/Watch.pm @@ -536,7 +536,7 @@ sub watch { # main entry point add_timer(0, \&poll_fetch_fork, $self, $intvl, $uris); } watch_fs_init($self) if $self->{mdre}; - PublicInbox::DS->SetPostLoopCallback(sub { !$self->quit_done }); + local @PublicInbox::DS::post_loop_do = (sub { !$self->quit_done }); PublicInbox::DS::event_loop($sig, $oldset); # calls ->event_step _done_for_now($self); } diff --git a/t/dir_idle.t b/t/dir_idle.t index 19e54967..50e1dd27 100644 --- a/t/dir_idle.t +++ b/t/dir_idle.t @@ -1,7 +1,7 @@ #!perl -w -# Copyright (C) 2020-2021 all contributors <meta@public-inbox.org> +# Copyright (C) all contributors <meta@public-inbox.org> # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> -use v5.10.1; use strict; use PublicInbox::TestCommon; +use v5.12; use strict; use PublicInbox::TestCommon; use PublicInbox::DS qw(now); use File::Path qw(make_path); use_ok 'PublicInbox::DirIdle'; @@ -13,7 +13,7 @@ my $di = PublicInbox::DirIdle->new($cb); $di->add_watches(["$tmpdir/a", "$tmpdir/c"], 1); PublicInbox::DS->SetLoopTimeout(1000); my $end = 3 + now; -PublicInbox::DS->SetPostLoopCallback(sub { scalar(@x) == 0 && now < $end }); +local @PublicInbox::DS::post_loop_do = (sub { scalar(@x) == 0 && now < $end }); tick(0.011); rmdir("$tmpdir/a/b") or xbail "rmdir $!"; PublicInbox::DS::event_loop(); diff --git a/t/ds-leak.t b/t/ds-leak.t index 4e8d76cd..eaca05b8 100644 --- a/t/ds-leak.t +++ b/t/ds-leak.t @@ -1,9 +1,9 @@ -# Copyright (C) 2019-2021 all contributors <meta@public-inbox.org> +# Copyright (C) all contributors <meta@public-inbox.org> # Licensed the same as Danga::Socket (and Perl5) # License: GPL-1.0+ or Artistic-1.0-Perl # <https://www.gnu.org/licenses/gpl-1.0.txt> # <https://dev.perl.org/licenses/artistic.html> -use strict; use v5.10.1; use PublicInbox::TestCommon; +use v5.12; use PublicInbox::TestCommon; use_ok 'PublicInbox::DS'; if ('close-on-exec for epoll and kqueue') { @@ -12,7 +12,7 @@ if ('close-on-exec for epoll and kqueue') { my $evfd_re = qr/(?:kqueue|eventpoll)/i; PublicInbox::DS->SetLoopTimeout(0); - PublicInbox::DS->SetPostLoopCallback(sub { 0 }); + local @PublicInbox::DS::post_loop_do = (sub { 0 }); # make sure execve closes if we're using fork() my ($r, $w); @@ -55,7 +55,7 @@ SKIP: { my $cb = sub {}; for my $i (0..$n) { PublicInbox::DS->SetLoopTimeout(0); - PublicInbox::DS->SetPostLoopCallback($cb); + local @PublicInbox::DS::post_loop_do = ($cb); PublicInbox::DS::event_loop(); PublicInbox::DS->Reset; } @@ -457,7 +457,7 @@ SKIP: { my $cfg = PublicInbox::Config->new; PublicInbox::DS->Reset; my $ii = PublicInbox::InboxIdle->new($cfg); - my $cb = sub { PublicInbox::DS->SetPostLoopCallback(sub {}) }; + my $cb = sub { @PublicInbox::DS::post_loop_do = (sub {}) }; my $obj = bless \$cb, 'PublicInbox::TestCommon::InboxWakeup'; $cfg->each_inbox(sub { $_[0]->subscribe_unlock('ident', $obj) }); my $watcherr = "$tmpdir/watcherr"; @@ -476,7 +476,7 @@ SKIP: { 'delivered a message for IDLE to kick -watch') or diag "mda error \$?=$?"; diag 'waiting for IMAP IDLE wakeup'; - PublicInbox::DS->SetPostLoopCallback(undef); + @PublicInbox::DS::post_loop_do = (); PublicInbox::DS::event_loop(); diag 'inbox unlocked on IDLE wakeup'; @@ -493,7 +493,7 @@ SKIP: { 'delivered a message for -watch PollInterval'); diag 'waiting for PollInterval wakeup'; - PublicInbox::DS->SetPostLoopCallback(undef); + @PublicInbox::DS::post_loop_do = (); PublicInbox::DS::event_loop(); diag 'inbox unlocked (poll)'; $w->kill; @@ -436,7 +436,7 @@ sub test_watch { my $cfg = PublicInbox::Config->new; PublicInbox::DS->Reset; my $ii = PublicInbox::InboxIdle->new($cfg); - my $cb = sub { PublicInbox::DS->SetPostLoopCallback(sub {}) }; + my $cb = sub { @PublicInbox::DS::post_loop_do = (sub {}) }; my $obj = bless \$cb, 'PublicInbox::TestCommon::InboxWakeup'; $cfg->each_inbox(sub { $_[0]->subscribe_unlock('ident', $obj) }); my $watcherr = "$tmpdir/watcherr"; @@ -1,5 +1,6 @@ -# Copyright (C) 2019-2021 all contributors <meta@public-inbox.org> -use strict; +#!perl -w +# Copyright (C) all contributors <meta@public-inbox.org> +use v5.12; use Test::More; use IO::Handle; use POSIX qw(:signal_h); @@ -46,7 +47,7 @@ SKIP: { is($nbsig->wait_once, undef, 'nonblocking ->wait_once'); ok($! == Errno::EAGAIN, 'got EAGAIN'); kill('HUP', $$) or die "kill $!"; - PublicInbox::DS->SetPostLoopCallback(sub {}); # loop once + local @PublicInbox::DS::post_loop_do = (sub {}); # loop once PublicInbox::DS::event_loop(); is($hit->{HUP}->{sigfd}, 2, 'HUP sigfd fired in event loop') or diag explain($hit); # sometimes fails on FreeBSD 11.x diff --git a/t/watch_maildir.t b/t/watch_maildir.t index e0719f54..04a1c959 100644 --- a/t/watch_maildir.t +++ b/t/watch_maildir.t @@ -1,7 +1,7 @@ -# Copyright (C) 2016-2021 all contributors <meta@public-inbox.org> +#!perl -w +# Copyright (C) all contributors <meta@public-inbox.org> # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> -use strict; -use Test::More; +use v5.12; use PublicInbox::Eml; use Cwd; use PublicInbox::Config; @@ -170,7 +170,7 @@ More majordomo info at http://vger.kernel.org/majordomo-info.html\n); my $ii = PublicInbox::InboxIdle->new($cfg); my $obj = bless \$cb, 'PublicInbox::TestCommon::InboxWakeup'; $cfg->each_inbox(sub { $_[0]->subscribe_unlock('ident', $obj) }); - PublicInbox::DS->SetPostLoopCallback(sub { $delivered == 0 }); + local @PublicInbox::DS::post_loop_do = (sub { $delivered == 0 }); # wait for -watch to setup inotify watches my $sleep = 1; diff --git a/xt/mem-imapd-tls.t b/xt/mem-imapd-tls.t index 75f2911f..00199a9b 100644 --- a/xt/mem-imapd-tls.t +++ b/xt/mem-imapd-tls.t @@ -82,7 +82,7 @@ sub once { 0 }; # stops event loop # setup the event loop so that it exits at every step # while we're still doing connect(2) PublicInbox::DS->SetLoopTimeout(0); -PublicInbox::DS->SetPostLoopCallback(\&once); +local @PublicInbox::DS::post_loop_do = (\&once); my $pid = $td->{pid}; if ($^O eq 'linux' && open(my $f, '<', "/proc/$pid/status")) { diag(grep(/RssAnon/, <$f>)); @@ -101,14 +101,13 @@ foreach my $n (1..$nfd) { if (!($n % 128) && $DONE != $n) { diag("nr: ($n) $DONE/$nfd"); PublicInbox::DS->SetLoopTimeout(-1); - PublicInbox::DS->SetPostLoopCallback(sub { $DONE != $n }); + local @PublicInbox::DS::post_loop_do = (sub { $DONE != $n }); # clear the backlog: PublicInbox::DS::event_loop(); # resume looping PublicInbox::DS->SetLoopTimeout(0); - PublicInbox::DS->SetPostLoopCallback(\&once); } } @@ -116,7 +115,7 @@ foreach my $n (1..$nfd) { diag "done?: @".time." $DONE/$nfd"; if ($DONE != $nfd) { PublicInbox::DS->SetLoopTimeout(-1); - PublicInbox::DS->SetPostLoopCallback(sub { $DONE != $nfd }); + local @PublicInbox::DS::post_loop_do = (sub { $DONE != $nfd }); PublicInbox::DS::event_loop(); } is($nfd, $DONE, "$nfd/$DONE done"); diff --git a/xt/mem-nntpd-tls.t b/xt/mem-nntpd-tls.t index 6e34d233..a861e318 100644 --- a/xt/mem-nntpd-tls.t +++ b/xt/mem-nntpd-tls.t @@ -105,7 +105,7 @@ sub once { 0 }; # stops event loop # setup the event loop so that it exits at every step # while we're still doing connect(2) PublicInbox::DS->SetLoopTimeout(0); -PublicInbox::DS->SetPostLoopCallback(\&once); +local @PublicInbox::DS::post_loop_do = (\&once); foreach my $n (1..$nfd) { my $io = tcp_connect($nntps, Blocking => 0); @@ -120,14 +120,14 @@ foreach my $n (1..$nfd) { if (!($n % 128) && $n != $DONE) { diag("nr: ($n) $DONE/$nfd"); PublicInbox::DS->SetLoopTimeout(-1); - PublicInbox::DS->SetPostLoopCallback(sub { $DONE != $n }); + @PublicInbox::DS::post_loop_do = (sub { $DONE != $n }); # clear the backlog: PublicInbox::DS::event_loop(); # resume looping PublicInbox::DS->SetLoopTimeout(0); - PublicInbox::DS->SetPostLoopCallback(\&once); + @PublicInbox::DS::post_loop_do = (\&once); } } my $pid = $td->{pid}; @@ -141,7 +141,7 @@ $dump_rss->(); # run the event loop normally, now: if ($DONE != $nfd) { PublicInbox::DS->SetLoopTimeout(-1); - PublicInbox::DS->SetPostLoopCallback(sub { + @PublicInbox::DS::post_loop_do = (sub { diag "done: ".time." $DONE"; $DONE != $nfd; }); diff --git a/xt/net_writer-imap.t b/xt/net_writer-imap.t index 333e0e3b..f7796e8e 100644 --- a/xt/net_writer-imap.t +++ b/xt/net_writer-imap.t @@ -1,5 +1,5 @@ #!perl -w -# Copyright (C) 2021 all contributors <meta@public-inbox.org> +# Copyright (C) all contributors <meta@public-inbox.org> # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> use strict; use v5.10.1; use PublicInbox::TestCommon; use Sys::Hostname qw(hostname); @@ -233,7 +233,7 @@ EOM my $pub_cfg = PublicInbox::Config->new; PublicInbox::DS->Reset; my $ii = PublicInbox::InboxIdle->new($pub_cfg); - my $cb = sub { PublicInbox::DS->SetPostLoopCallback(sub {}) }; + my $cb = sub { @PublicInbox::DS::post_loop_do = (sub {}) }; my $obj = bless \$cb, 'PublicInbox::TestCommon::InboxWakeup'; $pub_cfg->each_inbox(sub { $_[0]->subscribe_unlock('ident', $obj) }); my $w = start_script(['-watch'], undef, { 2 => $err_wr }); |