user/dev discussion of public-inbox itself
 help / color / mirror / code / Atom feed
From: Eric Wong <e@80x24.org>
To: meta@public-inbox.org
Subject: [PATCH 15/28] cindex: handle graceful shutdown by default
Date: Tue, 21 Mar 2023 23:07:30 +0000	[thread overview]
Message-ID: <20230321230743.3020032-15-e@80x24.org> (raw)
In-Reply-To: <20230321230743.3020032-1-e@80x24.org>

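While individual Xapian shards are consistent due to the use of
Xapian transactions, the data across shards still needs to be
in a consistent state for our search to work.

So instead of exiting immediately on SIGINT, the parent now
handles SIGINT, SIGTERM, and SIGQUIT by setting a quit flag and
forwarding the signal to the shard workers; each shard stops
after the commit it is currently indexing, and the shards used
for that repo are committed before returning.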
---
 lib/PublicInbox/CodeSearchIdx.pm | 71 +++++++++++++++++++++-----------
 1 file changed, 48 insertions(+), 23 deletions(-)

diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index 1a472b64..82f90368 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -38,6 +38,8 @@ our (
 	$LIVE_JOBS, # integer
 	$MY_SIG, # like %SIG
 	$SIGSET,
+	$TXN_BYTES, # number of bytes in current shard transaction
+	$DO_QUIT, # signal number
 	@RDONLY_SHARDS, # Xapian::Database
 	@IDX_SHARDS # clones of self
 );
@@ -153,18 +155,14 @@ sub store_repo { # wq_do - returns docid
 sub shard_index { # via wq_io_do
 	my ($self, $git, $n, $roots) = @_;
 	local $self->{current_info} = "$git->{git_dir} [$n]";
-	my ($quit, $cmt);
+	my $cmt;
 	local $self->{roots} = $roots;
 	my $in = delete($self->{0}) // die 'BUG: no {0} input';
 	my $op_p = delete($self->{1}) // die 'BUG: no {1} op_p';
 	my $batch_bytes = $self->{-opt}->{batch_size} //
 				$PublicInbox::SearchIdx::BATCH_BYTES;
-	my $max = $batch_bytes;
-	my $set_quit = sub { $quit = shift };
-	local $SIG{USR1} = sub { $max = -1 }; # similar to `git fast-import'
-	local $SIG{QUIT} = $set_quit;
-	local $SIG{TERM} = $set_quit;
-	local $SIG{INT} = $set_quit;
+	# local-ized in parent before fork
+	$TXN_BYTES = $batch_bytes;
 	local $self->{git} = $git; # for patchid
 	my $rd = $git->popen(@LOG_STDIN, undef, { 0 => $in });
 	close $in or die "close: $!";
@@ -179,22 +177,23 @@ sub shard_index { # via wq_io_do
 	$self->begin_txn_lazy;
 	while (defined($buf = <$rd>)) {
 		chomp($buf);
-		$max -= length($buf);
+		$TXN_BYTES -= length($buf);
 		@$cmt{@FMT} = split(/\n/, $buf, scalar(@FMT));
 		$/ = "\n";
 		add_commit($self, $cmt);
-		last if $quit; # likely SIGPIPE
+		last if $DO_QUIT;
 		++$nr;
-		if ($max <= 0 && !$PublicInbox::Search::X{CLOEXEC_UNSET}) {
+		if ($TXN_BYTES <= 0 && !$PublicInbox::Search::X{CLOEXEC_UNSET}) {
 			progress($self, "[$n] $nr");
 			$self->{xdb}->commit_transaction;
-			$max = $batch_bytes;
+			$TXN_BYTES = $batch_bytes;
 			$self->{xdb}->begin_transaction;
 		}
 		$/ = $FS;
 	}
 	close($rd);
-	if (!$? || ($quit && ($? & 127) == POSIX::SIGPIPE)) {
+	if (!$? || ($DO_QUIT && (($? & 127) == $DO_QUIT ||
+				($? & 127) == POSIX::SIGPIPE))) {
 		send($op_p, "shard_done $n", MSG_EOR);
 	} else {
 		warn "E: git @LOG_STDIN: \$?=$?\n";
@@ -254,7 +253,7 @@ sub need_reap { # post_loop_do
 sub cidx_reap ($$) {
 	my ($self, $jobs) = @_;
 	while (run_todo($self)) {}
-	local @PublicInbox::DS::post_loop_do = \(&need_reap, $jobs);
+	local @PublicInbox::DS::post_loop_do = (\&need_reap, $jobs);
 	while (need_reap(undef, $jobs)) {
 		PublicInbox::DS::event_loop($MY_SIG, $SIGSET);
 	}
@@ -263,7 +262,7 @@ sub cidx_reap ($$) {
 
 sub cidx_await_cb { # awaitpid cb
 	my ($pid, $cb, $self, $git, @args) = @_;
-	return if !$LIVE; # premature shutdown
+	return if !$LIVE || $DO_QUIT;
 	my $cmd = delete $LIVE->{$pid} // die 'BUG: no $cmd';
 	PublicInbox::DS::enqueue_reap() if !keys(%$LIVE); # once more for PLC
 	if ($?) {
@@ -283,7 +282,7 @@ sub cidx_await ($$$$$@) {
 # only care about --heads (branches) and --tags, and not even their names
 sub fp_start ($$$) {
 	my ($self, $git, $prep_repo) = @_;
-	return if !$LIVE; # premature exit
+	return if !$LIVE || $DO_QUIT;
 	cidx_reap($self, $LIVE_JOBS);
 	open my $refs, '+>', undef or die "open: $!";
 	my $cmd = ['git', "--git-dir=$git->{git_dir}",
@@ -305,7 +304,7 @@ sub fp_fini { # cidx_await cb
 
 sub ct_start ($$$) {
 	my ($self, $git, $prep_repo) = @_;
-	return if !$LIVE; # premature exit
+	return if !$LIVE || $DO_QUIT;
 	cidx_reap($self, $LIVE_JOBS);
 	my $cmd = [ 'git', "--git-dir=$git->{git_dir}",
 		qw[for-each-ref --sort=-committerdate
@@ -325,7 +324,7 @@ sub ct_fini { # cidx_await cb
 # TODO: also index gitweb.owner and the full fingerprint for grokmirror?
 sub prep_repo ($$) {
 	my ($self, $git) = @_;
-	return if !$LIVE || $git->{-cidx_err}; # premature exit
+	return if !$LIVE || $DO_QUIT || $git->{-cidx_err};
 	my $repo = $git->{-repo} // die 'BUG: no {-repo}';
 	if (!defined($repo->{ct})) {
 		warn "W: $git->{git_dir} has no commits, skipping\n";
@@ -449,6 +448,11 @@ sub index_repo { # cidx_await cb
 	PublicInbox::DS::event_loop($MY_SIG, $SIGSET);
 	my $n = grep { ! $self->{-shard_ok}->{$_} } keys %CONSUMERS;
 	die "E: $git->{git_dir} $n shards failed" if $n;
+	if ($DO_QUIT) {
+		commit_used_shards($self, $git, \%CONSUMERS);
+		progress($self, "$git->{git_dir}: done");
+		return;
+	}
 	$repo->{git_dir} = $git->{git_dir};
 	my $id = $IDX_SHARDS[$repo->{shard_n}]->wq_do('store_repo', $repo);
 	if ($id > 0) {
@@ -462,7 +466,7 @@ sub index_repo { # cidx_await cb
 
 sub get_roots ($$) {
 	my ($self, $git) = @_;
-	return if !$LIVE; # premature exit
+	return if !$LIVE || $DO_QUIT;
 	my $refs = $git->{-repo}->{refs} // die 'BUG: no {-repo}->{refs}';
 	sysseek($refs, 0, SEEK_SET) or die "seek: $!";
 	open my $roots, '+>', undef or die "open: $!";
@@ -489,6 +493,10 @@ sub load_existing ($) { # for -u/--update
 	@$dirs = grep { !$uniq{$_}++ } @$dirs;
 }
 
+# SIG handlers:
+sub shard_quit { $DO_QUIT = POSIX->can("SIG$_[0]")->() }
+sub shard_usr1 { $TXN_BYTES = -1 }
+
 sub cidx_init ($) {
 	my ($self) = @_;
 	my $dir = $self->{cidx_dir};
@@ -498,12 +506,13 @@ sub cidx_init ($) {
 	}
 	$self->lock_acquire;
 	my @shards;
+	local $TXN_BYTES;
 	for my $n (0..($self->{nshard} - 1)) {
 		my $shard = bless { %$self, shard => $n }, ref($self);
 		delete @$shard{qw(lockfh lock_path)};
 		$shard->idx_acquire;
 		$shard->idx_release;
-		$shard->wq_workers_start("shard[$n]", 1, undef, {
+		$shard->wq_workers_start("shard[$n]", 1, $SIGSET, {
 			siblings => \@shards, # for ipc_atfork_child
 		}, \&shard_done_wait, $self);
 		push @shards, $shard;
@@ -533,6 +542,15 @@ sub shards_active { # post_loop_do
 	scalar(grep { $_->{-cidx_quit} } @IDX_SHARDS);
 }
 
+# signal handlers
+sub kill_shards { $_->wq_kill(@_) for @IDX_SHARDS }
+
+sub parent_quit {
+	$DO_QUIT = $_[0];
+	kill_shards(@_);
+	warn "# SIG$_[0] received, quitting...\n";
+}
+
 sub cidx_run { # main entry point
 	my ($self) = @_;
 	local $self->{todo} = [];
@@ -541,13 +559,15 @@ sub cidx_run { # main entry point
 	my $restore = PublicInbox::OnDestroy->new($$,
 		\&PublicInbox::DS::sig_setmask, $SIGSET);
 	local $LIVE = {};
+	local $DO_QUIT;
 	local @IDX_SHARDS = cidx_init($self);
 	local $self->{current_info} = '';
-	my $cb = $SIG{__WARN__} || \&CORE::warn;
 	local $MY_SIG = {
 		CHLD => \&PublicInbox::DS::enqueue_reap,
-		INT => sub { exit },
+		USR1 => \&kill_shards,
 	};
+	$MY_SIG->{$_} = \&parent_quit for qw(TERM QUIT INT);
+	my $cb = $SIG{__WARN__} || \&CORE::warn;
 	local $SIG{__WARN__} = sub {
 		my $m = shift @_;
 		$self->{current_info} eq '' or
@@ -594,14 +614,19 @@ sub cidx_run { # main entry point
 sub ipc_atfork_child {
 	my ($self) = @_;
 	$self->SUPER::ipc_atfork_child;
+	$SIG{USR1} = \&shard_usr1;
+	$SIG{$_} = \&shard_quit for qw(INT TERM QUIT);
 	my $x = delete $self->{siblings} // die 'BUG: no {siblings}';
 	$_->wq_close for @$x;
+	undef;
 }
 
 sub shard_done_wait { # awaitpid cb via ipc_worker_reap
 	my ($pid, $shard, $self) = @_;
-	delete($shard->{-cidx_quit}) // warn 'BUG: {-cidx_quit} unset';
-	return unless $?;
+	if ($? == 0) { # success
+		delete($shard->{-cidx_quit}) // warn 'BUG: {-cidx_quit} unset';
+		return;
+	}
 	warn "PID:$pid $shard->{shard} exited with \$?=$?\n";
 	++$self->{shard_err} if defined($self->{shard_err});
 }

  parent reply	other threads:[~2023-03-21 23:07 UTC|newest]

Thread overview: 30+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2023-03-21 23:07 [PATCH 00/28] cindex coderepo commit indexer Eric Wong
2023-03-21 23:07 ` [PATCH 01/28] ipc: move nproc_shards from v2writable Eric Wong
2023-03-21 23:07   ` [PATCH 02/28] search: relocate all_terms from lei_search Eric Wong
2023-03-21 23:07   ` [PATCH 03/28] admin: hoist out resolve_git_dir Eric Wong
2023-03-21 23:07   ` [PATCH 04/28] admin: ensure resolved GIT_DIR is absolute Eric Wong
2023-03-21 23:07   ` [PATCH 05/28] test_common: create_inbox: use `$!' properly on mkdir failure Eric Wong
2023-03-21 23:07   ` [PATCH 06/28] codesearch: initial cut w/ -cindex tool Eric Wong
2023-03-21 23:07   ` [PATCH 07/28] cindex: parallelize prep phases Eric Wong
2023-03-21 23:07   ` [PATCH 08/28] cindex: use read-only shards during " Eric Wong
2023-03-21 23:07   ` [PATCH 09/28] searchidxshard: improve comment wording Eric Wong
2023-03-21 23:07   ` [PATCH 10/28] cindex: use DS and workqueues for parallelism Eric Wong
2023-03-21 23:07   ` [PATCH 11/28] ds: @post_loop_do replaces SetPostLoopCallback Eric Wong
2023-03-21 23:07   ` [PATCH 12/28] cindex: implement --exclude= like -clone Eric Wong
2023-03-21 23:07   ` [PATCH 13/28] cindex: show shard number in progress message Eric Wong
2023-03-21 23:07   ` [PATCH 14/28] cindex: drop `unchanged' " Eric Wong
2023-03-21 23:07   ` Eric Wong [this message]
2023-03-21 23:07   ` [PATCH 16/28] sigfd: pass signal name rather than number to callback Eric Wong
2023-03-21 23:07   ` [PATCH 17/28] cindex: implement --max-size=SIZE Eric Wong
2023-03-21 23:07   ` [PATCH 18/28] cindex: check for checkpoint before giant messages Eric Wong
2023-03-21 23:07   ` [PATCH 19/28] cindex: truncate or drop body for over-sized commits Eric Wong
2023-03-21 23:07   ` [PATCH 20/28] cindex: attempt to give oldest commits lowest docids Eric Wong
2023-03-21 23:07   ` [PATCH 21/28] cindex: improve granularity of quit checks Eric Wong
2023-03-21 23:07   ` [PATCH 22/28] spawn: show failing directory for chdir failures Eric Wong
2023-03-21 23:07   ` [PATCH 23/28] cindex: filter out non-existent git directories Eric Wong
2023-03-21 23:07   ` [PATCH 24/28] cindex: add support for --prune Eric Wong
2023-03-21 23:07   ` [PATCH 25/28] cindex: implement reindex Eric Wong
2023-03-21 23:07   ` [PATCH 26/28] cindex: squelch incompatible options Eric Wong
2023-03-21 23:07   ` [PATCH 27/28] cindex: respect existing permissions Eric Wong
2023-03-21 23:07   ` [PATCH 28/28] cindex: ignore SIGPIPE Eric Wong
2023-03-24 10:40     ` [PATCH 29/28] cindex: --prune checkpoints to avoid OOM Eric Wong

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

  List information: https://public-inbox.org/README

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20230321230743.3020032-15-e@80x24.org \
    --to=e@80x24.org \
    --cc=meta@public-inbox.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

Be sure your reply has a Subject: header at the top and a blank line
before the message body.

Code repositories for project(s) associated with this public inbox

	https://80x24.org/public-inbox.git

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).