about summary refs log tree commit homepage
path: root/lib
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2023-03-21 23:07:36 +0000
committerEric Wong <e@80x24.org>2023-03-25 09:37:57 +0000
commit2dce8067324f0e1ae214c08b59a9947bd6f78b81 (patch)
treea23705b381e2006fed8c49c46180803e809fbbcc /lib
parente8a6b99a45dfa8ae36bb6f95bc9aed4577014f29 (diff)
downloadpublic-inbox-2dce8067324f0e1ae214c08b59a9947bd6f78b81.tar.gz
This fixes shutdown handling when shard_index() isn't running
and ensures we can shut down the process more quickly.
Diffstat (limited to 'lib')
-rw-r--r--lib/PublicInbox/CodeSearchIdx.pm55
1 files changed, 34 insertions, 21 deletions
diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index f0b506da..4f91e0b6 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -206,6 +206,7 @@ sub shard_index { # via wq_io_do
         # local-ized in parent before fork
         $TXN_BYTES = $batch_bytes;
         local $self->{git} = $git; # for patchid
+        return if $DO_QUIT;
         my $rd = $git->popen(@LOG_STDIN, undef, { 0 => $in });
         close $in or die "close: $!";
         my $nr = 0;
@@ -216,10 +217,10 @@ sub shard_index { # via wq_io_do
         my $len;
         my $cmt = {};
         local $/ = $FS;
-        my $buf = <$rd> // return; # leading $FS
+        my $buf = <$rd> // return close($rd); # leading $FS
         $buf eq $FS or die "BUG: not LF-NUL: $buf\n";
         $self->begin_txn_lazy;
-        while (defined($buf = <$rd>)) {
+        while (!$DO_QUIT && defined($buf = <$rd>)) {
                 chomp($buf);
                 $/ = "\n";
                 $len = length($buf);
@@ -234,7 +235,6 @@ sub shard_index { # via wq_io_do
                         $TXN_BYTES = $batch_bytes - $len;
                 }
                 add_commit($self, $cmt);
-                last if $DO_QUIT;
                 ++$nr;
                 if ($TXN_BYTES <= 0) {
                         cidx_ckpoint($self, "[$n] $nr");
@@ -298,6 +298,7 @@ sub run_todo ($) {
 
 sub need_reap { # post_loop_do
         my (undef, $jobs) = @_;
+        return if !$LIVE || $DO_QUIT;
         scalar(keys(%$LIVE)) > $jobs;
 }
 
@@ -412,7 +413,7 @@ sub check_existing { # retry_reopen callback
 sub partition_refs ($$$) {
         my ($self, $git, $refs) = @_; # show-ref --heads --tags --hash output
         sysseek($refs, 0, SEEK_SET) or die "seek: $!"; # for rev-list --stdin
-        my $fh = $git->popen(qw(rev-list --stdin), undef, { 0 => $refs });
+        my $rfh = $git->popen(qw(rev-list --stdin), undef, { 0 => $refs });
         close $refs or die "close: $!";
         my ($seen, $nchange) = (0, 0);
         my @shard_in = map {
@@ -421,7 +422,7 @@ sub partition_refs ($$$) {
                 $fh;
         } @RDONLY_SHARDS;
 
-        while (defined(my $cmt = <$fh>)) {
+        while (defined(my $cmt = <$rfh>)) {
                 chomp $cmt;
                 my $n = hex(substr($cmt, 0, 8)) % scalar(@RDONLY_SHARDS);
                 if (seen($RDONLY_SHARDS[$n], 'Q'.$cmt)) {
@@ -431,8 +432,13 @@ sub partition_refs ($$$) {
                         ++$nchange;
                         $seen = 0;
                 }
+                if ($DO_QUIT) {
+                        close($rfh);
+                        return ();
+                }
         }
-        close($fh);
+        close($rfh);
+        return () if $DO_QUIT;
         if (!$? || (($? & 127) == POSIX::SIGPIPE && $seen > $SEEN_MAX)) {
                 $self->{nchange} += $nchange;
                 progress($self, "$git->{git_dir}: $nchange commits");
@@ -454,9 +460,18 @@ sub shard_commit { # via wq_io_do
 
 sub consumers_open { # post_loop_do
         my (undef, $consumers) = @_;
+        return if $DO_QUIT;
         scalar(grep { $_->{sock} } values %$consumers);
 }
 
+sub wait_consumers ($$$) {
+        my ($self, $git, $consumers) = @_;
+        local @PublicInbox::DS::post_loop_do = (\&consumers_open, $consumers);
+        PublicInbox::DS::event_loop($MY_SIG, $SIGSET);
+        my $n = grep { ! $self->{-shard_ok}->{$_} } keys %$consumers;
+        die "E: $git->{git_dir} $n shards failed" if $n && !$DO_QUIT;
+}
+
 sub commit_used_shards ($$$) {
         my ($self, $git, $consumers) = @_;
         local $self->{-shard_ok} = {};
@@ -466,15 +481,12 @@ sub commit_used_shards ($$$) {
                 $IDX_SHARDS[$n]->wq_io_do('shard_commit', [ $p->{op_p} ], $n);
                 $consumers->{$n} = $c;
         }
-        local @PublicInbox::DS::post_loop_do = (\&consumers_open, $consumers);
-        PublicInbox::DS::event_loop($MY_SIG, $SIGSET);
-        my $n = grep { ! $self->{-shard_ok}->{$_} } keys %$consumers;
-        die "E: $git->{git_dir} $n shards failed" if $n;
+        wait_consumers($self, $git, $consumers);
 }
 
 sub index_repo { # cidx_await cb
         my ($self, $git, $roots) = @_;
-        return if $git->{-cidx_err};
+        return if $git->{-cidx_err} || $DO_QUIT;
         my $repo = delete $git->{-repo} or return;
         seek($roots, 0, SEEK_SET) or die "seek: $!";
         chomp(my @roots = <$roots>);
@@ -484,31 +496,29 @@ sub index_repo { # cidx_await cb
         local $self->{current_info} = $git->{git_dir};
         my @shard_in = partition_refs($self, $git, delete($repo->{refs}));
         local $self->{-shard_ok} = {}; # [0..$#shard_in] => 1
-        my %CONSUMERS;
+        my $consumers = {};
         for my $n (0..$#shard_in) {
                 -s $shard_in[$n] or next;
+                last if $DO_QUIT;
                 my ($c, $p) = PublicInbox::PktOp->pair;
                 $c->{ops}->{shard_done} = [ $self ];
                 $IDX_SHARDS[$n]->wq_io_do('shard_index',
                                         [ $shard_in[$n], $p->{op_p} ],
                                         $git, $n, \@roots);
-                $CONSUMERS{$n} = $c;
+                $consumers->{$n} = $c;
         }
         @shard_in = ();
-        local @PublicInbox::DS::post_loop_do = (\&consumers_open, \%CONSUMERS);
-        PublicInbox::DS::event_loop($MY_SIG, $SIGSET);
-        my $n = grep { ! $self->{-shard_ok}->{$_} } keys %CONSUMERS;
-        die "E: $git->{git_dir} $n shards failed" if $n;
+        wait_consumers($self, $git, $consumers);
         if ($DO_QUIT) {
-                commit_used_shards($self, $git, \%CONSUMERS);
+                commit_used_shards($self, $git, $consumers);
                 progress($self, "$git->{git_dir}: done");
                 return;
         }
         $repo->{git_dir} = $git->{git_dir};
         my $id = $IDX_SHARDS[$repo->{shard_n}]->wq_do('store_repo', $repo);
         if ($id > 0) {
-                $CONSUMERS{$repo->{shard_n}} = undef;
-                commit_used_shards($self, $git, \%CONSUMERS);
+                $consumers->{$repo->{shard_n}} = undef;
+                commit_used_shards($self, $git, $consumers);
                 progress($self, "$git->{git_dir}: done");
                 return run_todo($self);
         }
@@ -585,6 +595,7 @@ sub scan_git_dirs ($) {
                                                         $self, $git);
                 fp_start($self, $git, $prep_repo);
                 ct_start($self, $git, $prep_repo);
+                last if $DO_QUIT;
         }
         cidx_reap($self, 0);
 }
@@ -674,8 +685,10 @@ sub ipc_atfork_child {
 
 sub shard_done_wait { # awaitpid cb via ipc_worker_reap
         my ($pid, $shard, $self) = @_;
+        my $quit_req = delete($shard->{-cidx_quit});
+        return if $DO_QUIT || !$LIVE;
         if ($? == 0) { # success
-                delete($shard->{-cidx_quit}) // warn 'BUG: {-cidx_quit} unset';
+                $quit_req // warn 'BUG: {-cidx_quit} unset';
                 return;
         }
         warn "PID:$pid $shard->{shard} exited with \$?=$?\n";