From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 010431FA11 for ; Fri, 13 Nov 2020 11:11:45 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 4/4] extindex: support graceful shutdown via QUIT/INT/TERM Date: Fri, 13 Nov 2020 11:11:44 +0000 Message-Id: <20201113111144.23038-5-e@80x24.org> In-Reply-To: <20201113111144.23038-1-e@80x24.org> References: <20201113111144.23038-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: Just like the daemon processes, -extindex now supports graceful shutdown via the same signals. This lets users avoid having to re-index messages when a power outage strikes during a long (multi-hour/day) indexing run. Per-inbox (v1/v2) -index graceful shutdowns are not supported yet, but are planned for later. 
--- lib/PublicInbox/ExtSearchIdx.pm | 7 ++++++- lib/PublicInbox/IdxStack.pm | 2 ++ lib/PublicInbox/SearchIdxShard.pm | 6 ++++++ lib/PublicInbox/V2Writable.pm | 17 ++++++++++++++++- 4 files changed, 30 insertions(+), 2 deletions(-) diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm index 6c09c460..91434b26 100644 --- a/lib/PublicInbox/ExtSearchIdx.pm +++ b/lib/PublicInbox/ExtSearchIdx.pm @@ -329,13 +329,18 @@ sub eidx_sync { # main entry point -regen_fmt => "%u/?\n", }; local $SIG{USR1} = sub { $need_checkpoint = 1 }; + my $quit = sub { $sync->{quit} = 1; warn "gracefully quitting\n"; }; + local $SIG{QUIT} = $quit; + local $SIG{INT} = $quit; + local $SIG{TERM} = $quit; # don't use $_ here, it'll get clobbered by reindex_checkpoint for my $ibx (@{$self->{ibx_list}}) { _sync_inbox($self, $sync, $ibx); + last if $sync->{quit}; } - $self->{oidx}->rethread_done($opt); + $self->{oidx}->rethread_done($opt) unless $sync->{quit}; PublicInbox::V2Writable::done($self); } diff --git a/lib/PublicInbox/IdxStack.pm b/lib/PublicInbox/IdxStack.pm index e7e10de9..c55c5c36 100644 --- a/lib/PublicInbox/IdxStack.pm +++ b/lib/PublicInbox/IdxStack.pm @@ -11,6 +11,8 @@ use constant PACK_FMT => eval { pack('Q', 1) } ? 
'A1QQH*H*' : 'A1IIH*H*'; # start off in write-only mode sub new { open(my $io, '+>', undef) or die "open: $!"; + # latest_cmt is still useful when the newest revision is a `d'(elete), + # otherwise we favor $sync->{latest_cmt} for checkpoints and {quit} bless { wr => $io, latest_cmt => $_[1] }, __PACKAGE__ } diff --git a/lib/PublicInbox/SearchIdxShard.pm b/lib/PublicInbox/SearchIdxShard.pm index 1333b305..875a9ec9 100644 --- a/lib/PublicInbox/SearchIdxShard.pm +++ b/lib/PublicInbox/SearchIdxShard.pm @@ -10,6 +10,7 @@ use parent qw(PublicInbox::SearchIdx); use bytes qw(length); use IO::Handle (); # autoflush use PublicInbox::Eml; +use PublicInbox::Sigfd; sub new { my ($class, $v2w, $shard) = @_; # v2w may be ExtSearchIdx @@ -29,9 +30,13 @@ sub spawn_worker { my ($r, $w); pipe($r, $w) or die "pipe failed: $!\n"; $w->autoflush(1); + my $oldset = PublicInbox::Sigfd::block_signals(); my $pid = fork; defined $pid or die "fork failed: $!\n"; if ($pid == 0) { + # these signals are localized in parent + $SIG{$_} = 'IGNORE' for (qw(TERM INT QUIT)); + PublicInbox::Sigfd::sig_setmask($oldset); my $bnote = $v2w->atfork_child; close $w or die "failed to close: $!"; @@ -44,6 +49,7 @@ sub spawn_worker { die "unexpected MM $self->{mm}" if $self->{mm}; exit; } + PublicInbox::Sigfd::sig_setmask($oldset); $self->{pid} = $pid; $self->{w} = $w; close $r or die "failed to close: $!"; diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm index 11cde627..5bac04a4 100644 --- a/lib/PublicInbox/V2Writable.pm +++ b/lib/PublicInbox/V2Writable.pm @@ -1090,6 +1090,7 @@ sub sync_prepare ($$) { $unit->{stack} = $stk; # may be undef unshift @{$sync->{todo}}, $unit; $regen_max += $nr; + last if $sync->{quit}; } # XXX this should not happen unless somebody bypasses checks in @@ -1102,9 +1103,11 @@ sub sync_prepare ($$) { $oid = unpack('H*', $oid); my $req = { %$sync, oid => $oid }; $self->git->cat_async($oid, $unindex_oid, $req); + last if $sync->{quit}; } 
$self->git->cat_async_wait; } + return 0 if $sync->{quit}; if (!$regen_max) { $sync->{-regen_fmt} = "%u/?\n"; return 0; @@ -1236,6 +1239,7 @@ sub index_xap_step ($$$;$) { sub index_todo ($$$) { my ($self, $sync, $unit) = @_; + return if $sync->{quit}; unindex_todo($self, $sync, $unit); my $stk = delete($unit->{stack}) or return; my $all = $self->git; @@ -1268,6 +1272,12 @@ sub index_todo ($$$) { } elsif ($f eq 'd') { $all->cat_async($oid, $unindex_oid, $req); } + if ($sync->{quit}) { + warn "waiting to quit...\n"; + $all->async_wait_all; + $self->update_last_commit($sync); + return; + } if (${$sync->{need_checkpoint}}) { reindex_checkpoint($self, $sync); } @@ -1334,6 +1344,11 @@ sub index_sync { ibx => $self->{ibx}, epoch_max => $epoch_max, }; + my $quit = sub { $sync->{quit} = 1 }; + local $SIG{QUIT} = $quit; + local $SIG{INT} = $quit; + local $SIG{TERM} = $quit; + if (sync_prepare($self, $sync)) { # tmp_clone seems to fail if inside a transaction, so # we rollback here (because we opened {mm} for reading) @@ -1352,7 +1367,7 @@ sub index_sync { } # work forwards through history index_todo($self, $sync, $_) for @{delete($sync->{todo}) // []}; - $self->{oidx}->rethread_done($opt); + $self->{oidx}->rethread_done($opt) unless $sync->{quit}; $self->done; if (my $nr = $sync->{nr}) {