about summary refs log tree commit homepage
path: root/lib/PublicInbox
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2020-11-13 11:11:44 +0000
committerEric Wong <e@80x24.org>2020-11-15 06:12:43 +0000
commit7d42724e82c9e0ecfa07154c7a56e7f21e53e62f (patch)
tree6755566e109457ffcef08f3acb3b9301bb0dd858 /lib/PublicInbox
parent834181c640236c91f367de04d5cc9834ec617f4e (diff)
downloadpublic-inbox-7d42724e82c9e0ecfa07154c7a56e7f21e53e62f.tar.gz
Just like the daemon processes, -extindex now supports graceful
shutdown via the same signals.  This lets users avoid having to
repeat indexing messages when a power outage strikes during a
long (multi-hour/day) indexing run.

Per-inbox (v1/v2) -index graceful shutdowns are not supported,
yet, but is planned for later.
Diffstat (limited to 'lib/PublicInbox')
-rw-r--r--lib/PublicInbox/ExtSearchIdx.pm7
-rw-r--r--lib/PublicInbox/IdxStack.pm2
-rw-r--r--lib/PublicInbox/SearchIdxShard.pm6
-rw-r--r--lib/PublicInbox/V2Writable.pm17
4 files changed, 30 insertions, 2 deletions
diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm
index 6c09c460..91434b26 100644
--- a/lib/PublicInbox/ExtSearchIdx.pm
+++ b/lib/PublicInbox/ExtSearchIdx.pm
@@ -329,13 +329,18 @@ sub eidx_sync { # main entry point
                 -regen_fmt => "%u/?\n",
         };
         local $SIG{USR1} = sub { $need_checkpoint = 1 };
+        my $quit = sub { $sync->{quit} = 1; warn "gracefully quitting\n"; };
+        local $SIG{QUIT} = $quit;
+        local $SIG{INT} = $quit;
+        local $SIG{TERM} = $quit;
 
         # don't use $_ here, it'll get clobbered by reindex_checkpoint
         for my $ibx (@{$self->{ibx_list}}) {
                 _sync_inbox($self, $sync, $ibx);
+                last if $sync->{quit};
         }
 
-        $self->{oidx}->rethread_done($opt);
+        $self->{oidx}->rethread_done($opt) unless $sync->{quit};
 
         PublicInbox::V2Writable::done($self);
 }
diff --git a/lib/PublicInbox/IdxStack.pm b/lib/PublicInbox/IdxStack.pm
index e7e10de9..c55c5c36 100644
--- a/lib/PublicInbox/IdxStack.pm
+++ b/lib/PublicInbox/IdxStack.pm
@@ -11,6 +11,8 @@ use constant PACK_FMT => eval { pack('Q', 1) } ? 'A1QQH*H*' : 'A1IIH*H*';
 # start off in write-only mode
 sub new {
         open(my $io, '+>', undef) or die "open: $!";
+        # latest_cmt is still useful when the newest revision is a `d'(elete),
+        # otherwise we favor $sync->{latest_cmt} for checkpoints and {quit}
         bless { wr => $io, latest_cmt => $_[1] }, __PACKAGE__
 }
 
diff --git a/lib/PublicInbox/SearchIdxShard.pm b/lib/PublicInbox/SearchIdxShard.pm
index 1333b305..875a9ec9 100644
--- a/lib/PublicInbox/SearchIdxShard.pm
+++ b/lib/PublicInbox/SearchIdxShard.pm
@@ -10,6 +10,7 @@ use parent qw(PublicInbox::SearchIdx);
 use bytes qw(length);
 use IO::Handle (); # autoflush
 use PublicInbox::Eml;
+use PublicInbox::Sigfd;
 
 sub new {
         my ($class, $v2w, $shard) = @_; # v2w may be ExtSearchIdx
@@ -29,9 +30,13 @@ sub spawn_worker {
         my ($r, $w);
         pipe($r, $w) or die "pipe failed: $!\n";
         $w->autoflush(1);
+        my $oldset = PublicInbox::Sigfd::block_signals();
         my $pid = fork;
         defined $pid or die "fork failed: $!\n";
         if ($pid == 0) {
+                # these signals are localized in parent
+                $SIG{$_} = 'IGNORE' for (qw(TERM INT QUIT));
+                PublicInbox::Sigfd::sig_setmask($oldset);
                 my $bnote = $v2w->atfork_child;
                 close $w or die "failed to close: $!";
 
@@ -44,6 +49,7 @@ sub spawn_worker {
                 die "unexpected MM $self->{mm}" if $self->{mm};
                 exit;
         }
+        PublicInbox::Sigfd::sig_setmask($oldset);
         $self->{pid} = $pid;
         $self->{w} = $w;
         close $r or die "failed to close: $!";
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index 11cde627..5bac04a4 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -1090,6 +1090,7 @@ sub sync_prepare ($$) {
                 $unit->{stack} = $stk; # may be undef
                 unshift @{$sync->{todo}}, $unit;
                 $regen_max += $nr;
+                last if $sync->{quit};
         }
 
         # XXX this should not happen unless somebody bypasses checks in
@@ -1102,9 +1103,11 @@ sub sync_prepare ($$) {
                         $oid = unpack('H*', $oid);
                         my $req = { %$sync, oid => $oid };
                         $self->git->cat_async($oid, $unindex_oid, $req);
+                        last if $sync->{quit};
                 }
                 $self->git->cat_async_wait;
         }
+        return 0 if $sync->{quit};
         if (!$regen_max) {
                 $sync->{-regen_fmt} = "%u/?\n";
                 return 0;
@@ -1236,6 +1239,7 @@ sub index_xap_step ($$$;$) {
 
 sub index_todo ($$$) {
         my ($self, $sync, $unit) = @_;
+        return if $sync->{quit};
         unindex_todo($self, $sync, $unit);
         my $stk = delete($unit->{stack}) or return;
         my $all = $self->git;
@@ -1268,6 +1272,12 @@ sub index_todo ($$$) {
                 } elsif ($f eq 'd') {
                         $all->cat_async($oid, $unindex_oid, $req);
                 }
+                if ($sync->{quit}) {
+                        warn "waiting to quit...\n";
+                        $all->async_wait_all;
+                        $self->update_last_commit($sync);
+                        return;
+                }
                 if (${$sync->{need_checkpoint}}) {
                         reindex_checkpoint($self, $sync);
                 }
@@ -1334,6 +1344,11 @@ sub index_sync {
                 ibx => $self->{ibx},
                 epoch_max => $epoch_max,
         };
+        my $quit = sub { $sync->{quit} = 1 };
+        local $SIG{QUIT} = $quit;
+        local $SIG{INT} = $quit;
+        local $SIG{TERM} = $quit;
+
         if (sync_prepare($self, $sync)) {
                 # tmp_clone seems to fail if inside a transaction, so
                 # we rollback here (because we opened {mm} for reading)
@@ -1352,7 +1367,7 @@ sub index_sync {
         }
         # work forwards through history
         index_todo($self, $sync, $_) for @{delete($sync->{todo}) // []};
-        $self->{oidx}->rethread_done($opt);
+        $self->{oidx}->rethread_done($opt) unless $sync->{quit};
         $self->done;
 
         if (my $nr = $sync->{nr}) {