diff options
Diffstat (limited to 'lib/PublicInbox')
-rw-r--r-- | lib/PublicInbox/CodeSearchIdx.pm | 29 | ||||
-rw-r--r-- | lib/PublicInbox/DS.pm | 30 | ||||
-rw-r--r-- | lib/PublicInbox/Daemon.pm | 2 | ||||
-rw-r--r-- | lib/PublicInbox/ExtSearchIdx.pm | 2 | ||||
-rw-r--r-- | lib/PublicInbox/IPC.pm | 7 | ||||
-rw-r--r-- | lib/PublicInbox/LEI.pm | 4 | ||||
-rw-r--r-- | lib/PublicInbox/Watch.pm | 2 |
7 files changed, 36 insertions, 40 deletions
diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm index 13fe1c28..587f0b81 100644 --- a/lib/PublicInbox/CodeSearchIdx.pm +++ b/lib/PublicInbox/CodeSearchIdx.pm @@ -246,12 +246,18 @@ sub run_todo ($) { $n; } +sub need_reap { # post_loop_do + my (undef, $jobs) = @_; + scalar(keys(%$LIVE)) > $jobs; +} + sub cidx_reap ($$) { my ($self, $jobs) = @_; while (run_todo($self)) {} - my $cb = sub { keys(%$LIVE) > $jobs }; - PublicInbox::DS->SetPostLoopCallback($cb); - PublicInbox::DS::event_loop($MY_SIG, $SIGSET) while $cb->(); + local @PublicInbox::DS::post_loop_do = \(&need_reap, $jobs); + while (need_reap(undef, $jobs)) { + PublicInbox::DS::event_loop($MY_SIG, $SIGSET); + } while (!$jobs && run_todo($self)) {} } @@ -397,6 +403,11 @@ sub shard_commit { # via wq_io_do send($op_p, "shard_done $n", MSG_EOR); } +sub consumers_open { # post_loop_do + my (undef, $consumers) = @_; + scalar(grep { $_->{sock} } values %$consumers); +} + sub commit_used_shards ($$$) { my ($self, $git, $consumers) = @_; local $self->{-shard_ok} = {}; @@ -406,9 +417,7 @@ sub commit_used_shards ($$$) { $IDX_SHARDS[$n]->wq_io_do('shard_commit', [ $p->{op_p} ], $n); $consumers->{$n} = $c; } - PublicInbox::DS->SetPostLoopCallback(sub { - scalar(grep { $_->{sock} } values %$consumers); - }); + local @PublicInbox::DS::post_loop_do = (\&consumers_open, $consumers); PublicInbox::DS::event_loop($MY_SIG, $SIGSET); my $n = grep { ! $self->{-shard_ok}->{$_} } keys %$consumers; die "E: $git->{git_dir} $n shards failed" if $n; @@ -437,9 +446,7 @@ sub index_repo { # cidx_await cb $CONSUMERS{$n} = $c; } @shard_in = (); - PublicInbox::DS->SetPostLoopCallback(sub { - scalar(grep { $_->{sock} } values %CONSUMERS); - }); + local @PublicInbox::DS::post_loop_do = (\&consumers_open, \%CONSUMERS); PublicInbox::DS::event_loop($MY_SIG, $SIGSET); my $n = grep { ! $self->{-shard_ok}->{$_} } keys %CONSUMERS; die "E: $git->{git_dir} $n shards failed" if $n; @@ -523,7 +530,7 @@ sub scan_git_dirs ($) { cidx_reap($self, 0); } -sub shards_active { # PostLoopCallback +sub shards_active { # post_loop_do scalar(grep { $_->{-cidx_quit} } @IDX_SHARDS); } @@ -572,7 +579,7 @@ sub cidx_run { # main entry point $s->wq_close; } - PublicInbox::DS->SetPostLoopCallback(\&shards_active); + local @PublicInbox::DS::post_loop_do = (\&shards_active); PublicInbox::DS::event_loop($MY_SIG, $SIGSET) if shards_active(); $self->lock_release(!!$self->{nchange}); } diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index b6eaf2d7..340086fc 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -44,7 +44,7 @@ our ( $Epoll, # Global epoll fd (or DSKQXS ref) $ep_io, # IO::Handle for Epoll - $PostLoopCallback, # subref to call at the end of each loop, if defined (global) + @post_loop_do, # subref + args to call at the end of each loop $LoopTimeout, # timeout of event loop in milliseconds @Timers, # timers @@ -69,7 +69,7 @@ sub Reset { %DescriptorMap = (); @Timers = (); %UniqTimer = (); - $PostLoopCallback = undef; + @post_loop_do = (); # we may be iterating inside one of these on our stack my @q = delete @Stack{keys %Stack}; @@ -79,7 +79,7 @@ sub Reset { $Epoll = undef; # may call DSKQXS::DESTROY } while (@Timers || keys(%Stack) || $nextq || $AWAIT_PIDS || $ToClose || keys(%DescriptorMap) || - $PostLoopCallback || keys(%UniqTimer)); + @post_loop_do || keys(%UniqTimer)); $reap_armed = undef; $LoopTimeout = -1; # no timeout by default @@ -247,11 +247,13 @@ sub PostEventLoop () { } # by default we keep running, unless a postloop callback cancels it - $PostLoopCallback ? $PostLoopCallback->(\%DescriptorMap) : 1; + @post_loop_do ? $post_loop_do[0]->(\%DescriptorMap, + @post_loop_do[1..$#post_loop_do]) + : 1 } # Start processing IO events. In most daemon programs this never exits. See -# C<PostLoopCallback> for how to exit the loop. +# C<post_loop_do> for how to exit the loop. sub event_loop (;$$) { my ($sig, $oldset) = @_; $Epoll //= _InitPoller(); @@ -287,24 +289,6 @@ sub event_loop (;$$) { } while (PostEventLoop()); } -=head2 C<< CLASS->SetPostLoopCallback( CODEREF ) >> - -Sets post loop callback function. Pass a subref and it will be -called every time the event loop finishes. - -Return 1 (or any true value) from the sub to make the loop continue, 0 or false -and it will exit. - -The callback function will be passed two parameters: \%DescriptorMap - -=cut -sub SetPostLoopCallback { - my ($class, $ref) = @_; - - # global callback - $PostLoopCallback = (defined $ref && ref $ref eq 'CODE') ? $ref : undef; -} - ##################################################################### ### PublicInbox::DS-the-object code ##################################################################### diff --git a/lib/PublicInbox/Daemon.pm b/lib/PublicInbox/Daemon.pm index 17e799ca..6152a5d3 100644 --- a/lib/PublicInbox/Daemon.pm +++ b/lib/PublicInbox/Daemon.pm @@ -350,7 +350,7 @@ sub worker_quit { # $_[0] = signal name or number (unused) my $proc_name; my $warn = 0; # drop idle connections and try to quit gracefully - PublicInbox::DS->SetPostLoopCallback(sub { + @PublicInbox::DS::post_loop_do = (sub { my ($dmap, undef) = @_; my $n = 0; my $now = now(); diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm index 401b18d0..5445b156 100644 --- a/lib/PublicInbox/ExtSearchIdx.pm +++ b/lib/PublicInbox/ExtSearchIdx.pm @@ -1388,7 +1388,7 @@ sub eidx_watch { # public-inbox-extindex --watch main loop my $quit = PublicInbox::SearchIdx::quit_cb($sync); $sig->{QUIT} = $sig->{INT} = $sig->{TERM} = $quit; local $self->{-watch_sync} = $sync; # for ->on_inbox_unlock - PublicInbox::DS->SetPostLoopCallback(sub { !$sync->{quit} }); + local @PublicInbox::DS::post_loop_do = (sub { !$sync->{quit} }); $pr->("initial scan complete, entering event loop\n") if $pr; # calls InboxIdle->event_step: PublicInbox::DS::event_loop($sig, $oldset); diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm index 730f2cf6..da534aa7 100644 --- a/lib/PublicInbox/IPC.pm +++ b/lib/PublicInbox/IPC.pm @@ -245,11 +245,16 @@ sub recv_and_run { $n; } +sub sock_defined { + my (undef, $wqw) = @_; + defined($wqw->{sock}); +} + sub wq_worker_loop ($$) { my ($self, $bcast2) = @_; my $wqw = PublicInbox::WQWorker->new($self, $self->{-wq_s2}); PublicInbox::WQWorker->new($self, $bcast2) if $bcast2; - PublicInbox::DS->SetPostLoopCallback(sub { $wqw->{sock} }); + local @PublicInbox::DS::post_loop_do = (\&sock_defined, $wqw); PublicInbox::DS::event_loop(); PublicInbox::DS->Reset; } diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index b83de91d..eb9799f6 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -1325,11 +1325,11 @@ sub lazy_start { }; require PublicInbox::DirIdle; local $dir_idle = PublicInbox::DirIdle->new(sub { - # just rely on wakeup to hit PostLoopCallback set below + # just rely on wakeup to hit post_loop_do dir_idle_handler($_[0]) if $_[0]->fullname ne $path; }); $dir_idle->add_watches([$sock_dir]); - PublicInbox::DS->SetPostLoopCallback(sub { + local @PublicInbox::DS::post_loop_do = (sub { my ($dmap, undef) = @_; if (@st = defined($path) ? stat($path) : ()) { if ($dev_ino_expect ne pack('dd', $st[0], $st[1])) { diff --git a/lib/PublicInbox/Watch.pm b/lib/PublicInbox/Watch.pm index d9aadf82..8482100c 100644 --- a/lib/PublicInbox/Watch.pm +++ b/lib/PublicInbox/Watch.pm @@ -536,7 +536,7 @@ sub watch { # main entry point add_timer(0, \&poll_fetch_fork, $self, $intvl, $uris); } watch_fs_init($self) if $self->{mdre}; - PublicInbox::DS->SetPostLoopCallback(sub { !$self->quit_done }); + local @PublicInbox::DS::post_loop_do = (sub { !$self->quit_done }); PublicInbox::DS::event_loop($sig, $oldset); # calls ->event_step _done_for_now($self); } |