about summary refs log tree commit homepage
path: root/lib/PublicInbox/V2Writable.pm
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2020-11-27 21:33:55 +0000
committerEric Wong <e@80x24.org>2020-11-28 04:53:25 +0000
commitc2f82b2e27e1b3c11a4c0b00b90829a4ee99c602 (patch)
tree3b33517c4c28e7d3eebf29cd8d1a5d16b82e4cb4 /lib/PublicInbox/V2Writable.pm
parent811b8d3cbaa790f59b7b107140b86248da16499b (diff)
downloadpublic-inbox-c2f82b2e27e1b3c11a4c0b00b90829a4ee99c602.tar.gz
v1 and v2 inbox indexing now supports graceful shutdown checks
just like ExtSearchIdx.  Additionally, we'll consistently
perform quit checks at the top of loops for consistency.

Interaction with the --xapian-only and --sequential-shard
options are a bit lacking, and will warn the user to use
"--reindex --xapian-only" to fix.
Diffstat (limited to 'lib/PublicInbox/V2Writable.pm')
-rw-r--r--lib/PublicInbox/V2Writable.pm37
1 files changed, 25 insertions, 12 deletions
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index 7bef1c89..a3938b56 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -1106,10 +1106,10 @@ sub sync_prepare ($$) {
                 local $self->{current_info} = 'leftover ';
                 my $unindex_oid = $self->can('unindex_oid');
                 for my $oid (@leftovers) {
+                        last if $sync->{quit};
                         $oid = unpack('H*', $oid);
                         my $req = { %$sync, oid => $oid };
                         $self->git->cat_async($oid, $unindex_oid, $req);
-                        last if $sync->{quit};
                 }
                 $self->git->cat_async_wait;
         }
@@ -1233,6 +1233,7 @@ sub index_xap_step ($$$;$) {
                         "$beg..$end (% $step)\n");
         }
         for (my $num = $beg; $num <= $end; $num += $step) {
+                last if $sync->{quit};
                 my $smsg = $ibx->over->get_art($num) or next;
                 $smsg->{self} = $self;
                 $ibx->git->cat_async($smsg->{blob}, \&index_xap_only, $smsg);
@@ -1262,6 +1263,12 @@ sub index_todo ($$$) {
         local $sync->{latest_cmt} = \(my $latest_cmt);
         local $sync->{unit} = $unit;
         while (my ($f, $at, $ct, $oid, $cmt) = $stk->pop_rec) {
+                if ($sync->{quit}) {
+                        warn "waiting to quit...\n";
+                        $all->async_wait_all;
+                        $self->update_last_commit($sync);
+                        return;
+                }
                 my $req = {
                         %$sync,
                         autime => $at,
@@ -1278,12 +1285,6 @@ sub index_todo ($$$) {
                 } elsif ($f eq 'd') {
                         $all->cat_async($oid, $unindex_oid, $req);
                 }
-                if ($sync->{quit}) {
-                        warn "waiting to quit...\n";
-                        $all->async_wait_all;
-                        $self->update_last_commit($sync);
-                        return;
-                }
                 if (${$sync->{need_checkpoint}}) {
                         reindex_checkpoint($self, $sync);
                 }
@@ -1310,6 +1311,7 @@ sub xapian_only {
                 if ($seq || !$self->{parallel}) {
                         my $shard_end = $self->{shards} - 1;
                         for my $i (0..$shard_end) {
+                                last if $sync->{quit};
                                 index_xap_step($self, $sync, $art_beg + $i);
                                 if ($i != $shard_end) {
                                         reindex_checkpoint($self, $sync);
@@ -1350,7 +1352,7 @@ sub index_sync {
                 ibx => $self->{ibx},
                 epoch_max => $epoch_max,
         };
-        my $quit = sub { $sync->{quit} = 1 };
+        my $quit = PublicInbox::SearchIdx::quit_cb($sync);
         local $SIG{QUIT} = $quit;
         local $SIG{INT} = $quit;
         local $SIG{TERM} = $quit;
@@ -1381,14 +1383,21 @@ sub index_sync {
                 $pr->('all.git '.sprintf($sync->{-regen_fmt}, $$nr)) if $pr;
         }
 
+        my $quit_warn;
         # deal with Xapian shards sequentially
         if ($seq && delete($sync->{mm_tmp})) {
-                $self->{ibx}->{indexlevel} = $idxlevel;
-                xapian_only($self, $opt, $sync, $art_beg);
+                if ($sync->{quit}) {
+                        $quit_warn = 1;
+                } else {
+                        $self->{ibx}->{indexlevel} = $idxlevel;
+                        xapian_only($self, $opt, $sync, $art_beg);
+                        $quit_warn = 1 if $sync->{quit};
+                }
         }
 
         # --reindex on the command-line
-        if ($opt->{reindex} && !ref($opt->{reindex}) && $idxlevel ne 'basic') {
+        if (!$sync->{quit} && $opt->{reindex} &&
+                        !ref($opt->{reindex}) && $idxlevel ne 'basic') {
                 $self->lock_acquire;
                 my $s0 = PublicInbox::SearchIdx->new($self->{ibx}, 0, 0);
                 if (my $xdb = $s0->idx_acquire) {
@@ -1400,12 +1409,16 @@ sub index_sync {
         }
 
         # reindex does not pick up new changes, so we rerun w/o it:
-        if ($opt->{reindex}) {
+        if ($opt->{reindex} && !$sync->{quit}) {
                 my %again = %$opt;
                 $sync = undef;
                 delete @again{qw(rethread reindex -skip_lock)};
                 index_sync($self, \%again);
+                $opt->{quit} = $again{quit}; # propagate to caller
         }
+        warn <<EOF if $quit_warn;
+W: interrupted, --xapian-only --reindex required upon restart
+EOF
 }
 
 1;