user/dev discussion of public-inbox itself
* [PATCH 0/4] extindex: checkpoints, graceful shutdown, cleanups
@ 2020-11-13 11:11 Eric Wong
  2020-11-13 11:11 ` [PATCH 1/4] *index: checkpoints write last_commit metadata Eric Wong
                   ` (3 more replies)
  0 siblings, 4 replies; 7+ messages in thread
From: Eric Wong @ 2020-11-13 11:11 UTC (permalink / raw)
  To: meta

Patches 1 and 4 should make long indexing runs more
user-friendly by being interrupt-friendly (via SIGINT, SIGQUIT,
or SIGTERM, just like read-only daemons).

I would've found this feature useful when dealing with unplanned
emergency shutdowns due to power outages.  I may continue to
find it useful in the future since the power grid is falling to
pieces and I expect to see more power outages.

Stealing UI ideas from git, SIGUSR1 also triggers a checkpoint
during indexing.
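
The signal handling described above can be sketched roughly as
follows.  This is a minimal illustration and not the code from this
series: run_loop() and its callback are hypothetical stand-ins for the
indexing loop, and the "checkpoint" only bumps a counter instead of
committing anything to disk.

```perl
use strict;
use warnings;

# Hypothetical work loop: handlers only set flags, and the loop acts
# on those flags at a safe point between items.
sub run_loop {
	my ($items, $on_item) = @_;
	my ($quit, $need_checkpoint, $nr_done, $nr_ckpt) = (0, 0, 0, 0);
	local $SIG{USR1} = sub { $need_checkpoint = 1 };
	my $q = sub { $quit = 1 };
	local $SIG{QUIT} = $q;
	local $SIG{INT} = $q;
	local $SIG{TERM} = $q;
	for my $item (@$items) {
		$on_item->($item); # stand-in for indexing one message
		$nr_done++;
		if ($quit || $need_checkpoint) {
			$nr_ckpt++; # stand-in for writing progress to disk
			$need_checkpoint = 0;
		}
		last if $quit; # remaining items are left for the next run
	}
	($nr_done, $nr_ckpt);
}

# Demo: signal ourselves in the middle of a five-item run
my ($nr_done, $nr_ckpt) = run_loop([1..5], sub {
	my ($item) = @_;
	kill('USR1', $$) if $item == 2;
	kill('TERM', $$) if $item == 4;
});
print "done=$nr_done checkpoints=$nr_ckpt\n";
```

Keeping the handlers down to a flag assignment means the loop decides
when it is safe to stop, so an interrupt never lands in the middle of
a unit of work.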

Patches 2 and 3 are just cleanups I noticed along the way.

Eric Wong (4):
  *index: checkpoints write last_commit metadata
  *index: avoid per-epoch --batch-check processes
  *index: discard sync->{todo} on iteration
  extindex: support graceful shutdown via QUIT/INT/TERM

 lib/PublicInbox/ExtSearchIdx.pm   | 23 +++++++++----
 lib/PublicInbox/IdxStack.pm       | 18 +++++++---
 lib/PublicInbox/SearchIdx.pm      | 56 +++++++++++++++++--------------
 lib/PublicInbox/SearchIdxShard.pm |  6 ++++
 lib/PublicInbox/V2Writable.pm     | 49 +++++++++++++++++++++------
 t/idx_stack.t                     | 20 ++++++-----
 6 files changed, 115 insertions(+), 57 deletions(-)


* [PATCH 1/4] *index: checkpoints write last_commit metadata
  2020-11-13 11:11 [PATCH 0/4] extindex: checkpoints, graceful shutdown, cleanups Eric Wong
@ 2020-11-13 11:11 ` Eric Wong
  2020-11-13 11:11 ` [PATCH 2/4] *index: avoid per-epoch --batch-check processes Eric Wong
                   ` (2 subsequent siblings)
  3 siblings, 0 replies; 7+ messages in thread
From: Eric Wong @ 2020-11-13 11:11 UTC (permalink / raw)
  To: meta

This will set us up for supporting graceful shutdown
on -index without repeating any work.
---
 lib/PublicInbox/ExtSearchIdx.pm | 12 ++++---
 lib/PublicInbox/IdxStack.pm     | 16 +++++++---
 lib/PublicInbox/SearchIdx.pm    | 56 ++++++++++++++++++---------------
 lib/PublicInbox/V2Writable.pm   | 28 ++++++++++++-----
 t/idx_stack.t                   | 20 ++++++------
 5 files changed, 81 insertions(+), 51 deletions(-)
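
A standalone sketch of why the IdxStack change below derives a
separate unpack format may help when reading the diff.  It assumes
only core pack/unpack behavior; the variable names are illustrative
and the OIDs are borrowed from t/idx_stack.t.  With two trailing H*
fields, unpack lets the first one take all remaining bytes, so the
widths are pinned to the OID length before reading records back.

```perl
use strict;
use warnings;

# Same idea as the patch: 64-bit 'Q' if this perl supports it, else 'I'
my $pack_fmt = eval { pack('Q', 1) } ? 'A1QQH*H*' : 'A1IIH*H*';

# OIDs borrowed from t/idx_stack.t
my $blob_oid = '03c21563cf15c241687966b5b2a3f37cdc193316';
my $cmt_oid = 'df8e4a0612545d53672036641e9f076efc94c2f6';

# pack knows how long each hex string is, so H* is fine for writing
my $rec = pack($pack_fmt, 'm', 1234, 5678, $blob_oid, $cmt_oid);

# On read, the first H* takes every remaining byte of the record
my @greedy = unpack($pack_fmt, $rec);

# Pinning the width to the OID length splits the two fields again
my $len = length($cmt_oid);
(my $unpack_fmt = $pack_fmt) =~ s/H\*/H$len/g;
my @fields = unpack($unpack_fmt, $rec);

print join(' ', @fields), "\n";
```

Because the record size is fixed once the first record is written,
the stack can still be read back from the end in constant-size steps.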

diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm
index 7aaf8291..14ffdadb 100644
--- a/lib/PublicInbox/ExtSearchIdx.pm
+++ b/lib/PublicInbox/ExtSearchIdx.pm
@@ -168,6 +168,10 @@ sub do_finalize ($) {
 		# `d' message was already unindexed in the v1/v2 inboxes,
 		# so it's too noisy to warn, here.
 	}
+	# cur_cmt may be undef for unindex_oid, set by V2Writable::index_todo
+	if (defined(my $cur_cmt = $req->{cur_cmt})) {
+		${$req->{latest_cmt}} = $cur_cmt;
+	}
 }
 
 sub do_step ($) { # main iterator for adding messages to the index
@@ -337,10 +341,10 @@ sub eidx_sync { # main entry point
 }
 
 sub update_last_commit { # overrides V2Writable
-	my ($self, $sync, $unit, $latest_cmt) = @_;
-	return unless defined $latest_cmt;
-
-	$self->git->async_wait_all;
+	my ($self, $sync, $stk) = @_;
+	my $unit = $sync->{unit} // return;
+	my $latest_cmt = $stk ? $stk->{latest_cmt} : ${$sync->{latest_cmt}};
+	defined($latest_cmt) or return;
 	my $ibx = $sync->{ibx} or die 'BUG: {ibx} missing';
 	my $ekey = $ibx->eidx_key;
 	my $uv = $ibx->uidvalidity;
diff --git a/lib/PublicInbox/IdxStack.pm b/lib/PublicInbox/IdxStack.pm
index ce75b46a..e7e10de9 100644
--- a/lib/PublicInbox/IdxStack.pm
+++ b/lib/PublicInbox/IdxStack.pm
@@ -6,7 +6,7 @@ package PublicInbox::IdxStack;
 use v5.10.1;
 use strict;
 use Fcntl qw(:seek);
-use constant FMT => eval { pack('Q', 1) } ? 'A1QQH*' : 'A1IIH*';
+use constant PACK_FMT => eval { pack('Q', 1) } ? 'A1QQH*H*' : 'A1IIH*H*';
 
 # start off in write-only mode
 sub new {
@@ -16,9 +16,15 @@ sub new {
 
 # file_char = [d|m]
 sub push_rec {
-	my ($self, $file_char, $at, $ct, $blob_oid) = @_;
-	my $rec = pack(FMT, $file_char, $at, $ct, $blob_oid);
-	$self->{rec_size} //= length($rec);
+	my ($self, $file_char, $at, $ct, $blob_oid, $cmt_oid) = @_;
+	my $rec = pack(PACK_FMT, $file_char, $at, $ct, $blob_oid, $cmt_oid);
+	$self->{unpack_fmt} //= do {
+		my $len = length($cmt_oid);
+		my $fmt = PACK_FMT;
+		$fmt =~ s/H\*/H$len/g;
+		$self->{rec_size} = length($rec);
+		$fmt;
+	};
 	print { $self->{wr} } $rec or die "print: $!";
 	$self->{tot_size} += length($rec);
 }
@@ -46,7 +52,7 @@ sub pop_rec {
 	my $r = read($io, my $buf, $sz);
 	defined($r) or die "read: $!";
 	$r == $sz or die "read($r != $sz)";
-	unpack(FMT, $buf);
+	unpack($self->{unpack_fmt}, $buf);
 }
 
 1;
diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm
index 662055c6..90d8c8b3 100644
--- a/lib/PublicInbox/SearchIdx.pm
+++ b/lib/PublicInbox/SearchIdx.pm
@@ -608,11 +608,17 @@ sub index_both { # git->cat_async callback
 	$smsg->{num} = index_mm($self, $eml, $oid, $sync) or
 		die "E: could not generate NNTP article number for $oid";
 	add_message($self, $eml, $smsg, $sync);
+	my $cur_cmt = $sync->{cur_cmt} // die 'BUG: {cur_cmt} missing';
+	${$sync->{latest_cmt}} = $cur_cmt;
 }
 
 sub unindex_both { # git->cat_async callback
-	my ($bref, $oid, $type, $size, $self) = @_;
-	unindex_eml($self, $oid, PublicInbox::Eml->new($bref));
+	my ($bref, $oid, $type, $size, $sync) = @_;
+	unindex_eml($sync->{sidx}, $oid, PublicInbox::Eml->new($bref));
+	# may be undef if leftover
+	if (defined(my $cur_cmt = $sync->{cur_cmt})) {
+		${$sync->{latest_cmt}} = $cur_cmt;
+	}
 }
 
 sub with_umask {
@@ -646,34 +652,33 @@ sub v1_checkpoint ($$;$) {
 	my ($self, $sync, $stk) = @_;
 	$self->{ibx}->git->async_wait_all;
 
-	# latest_cmt may be undef
-	my $newest = $stk ? $stk->{latest_cmt} : undef;
-	if ($newest) {
+	# $newest may be undef
+	my $newest = $stk ? $stk->{latest_cmt} : ${$sync->{latest_cmt}};
+	if (defined($newest)) {
 		my $cur = $self->{mm}->last_commit || '';
 		if (need_update($self, $cur, $newest)) {
 			$self->{mm}->last_commit($newest);
 		}
-	} else {
-		${$sync->{max}} = $self->{batch_bytes};
 	}
+	${$sync->{max}} = $self->{batch_bytes};
 
 	$self->{mm}->{dbh}->commit;
-	if ($newest && need_xapian($self)) {
-		my $xdb = $self->{xdb};
+	my $xdb = need_xapian($self) ? $self->{xdb} : undef;
+	if ($newest && $xdb) {
 		my $cur = $xdb->get_metadata('last_commit');
 		if (need_update($self, $cur, $newest)) {
 			$xdb->set_metadata('last_commit', $newest);
 		}
-
+	}
+	if ($stk) { # all done if $stk is passed
 		# let SearchView know a full --reindex was done so it can
 		# generate ->has_threadid-dependent links
-		if ($sync->{reindex} && !ref($sync->{reindex})) {
+		if ($xdb && $sync->{reindex} && !ref($sync->{reindex})) {
 			my $n = $xdb->get_metadata('has_threadid');
 			$xdb->set_metadata('has_threadid', '1') if $n ne '1';
 		}
+		$self->{oidx}->rethread_done($sync->{-opt}); # all done
 	}
-
-	$self->{oidx}->rethread_done($sync->{-opt}) if $newest; # all done
 	commit_txn_lazy($self);
 	$sync->{ibx}->git->cleanup;
 	my $nr = ${$sync->{nr}};
@@ -697,21 +702,24 @@ sub process_stack {
 	$sync->{nr} = \$nr;
 	$sync->{max} = \$max;
 	$sync->{sidx} = $self;
+	$sync->{latest_cmt} = \(my $latest_cmt);
 
 	$self->{mm}->{dbh}->begin_work;
 	if (my @leftovers = keys %{delete($sync->{D}) // {}}) {
 		warn('W: unindexing '.scalar(@leftovers)." leftovers\n");
 		for my $oid (@leftovers) {
 			$oid = unpack('H*', $oid);
-			$git->cat_async($oid, \&unindex_both, $self);
+			$git->cat_async($oid, \&unindex_both, $sync);
 		}
 	}
 	if ($sync->{max_size} = $sync->{-opt}->{max_size}) {
 		$sync->{index_oid} = \&index_both;
 	}
-	while (my ($f, $at, $ct, $oid) = $stk->pop_rec) {
+	while (my ($f, $at, $ct, $oid, $cur_cmt) = $stk->pop_rec) {
+		my $arg = { %$sync, cur_cmt => $cur_cmt };
 		if ($f eq 'm') {
-			my $arg = { %$sync, autime => $at, cotime => $ct };
+			$arg->{autime} = $at;
+			$arg->{cotime} = $ct;
 			if ($sync->{max_size}) {
 				$git->check_async($oid, \&check_size, $arg);
 			} else {
@@ -719,7 +727,7 @@ sub process_stack {
 			}
 			v1_checkpoint($self, $sync) if $max <= 0;
 		} elsif ($f eq 'd') {
-			$git->cat_async($oid, \&unindex_both, $self);
+			$git->cat_async($oid, \&unindex_both, $arg);
 		}
 	}
 	v1_checkpoint($self, $sync, $stk);
@@ -743,17 +751,17 @@ sub log2stack ($$$) {
 	my $fh = $git->popen(qw(log --raw -r --pretty=tformat:%at-%ct-%H
 				--no-notes --no-color --no-renames --no-abbrev),
 				$range);
-	my ($at, $ct, $stk);
+	my ($at, $ct, $stk, $cmt);
 	while (<$fh>) {
 		if (/\A([0-9]+)-([0-9]+)-($OID)$/o) {
-			($at, $ct) = ($1 + 0, $2 + 0);
-			$stk //= PublicInbox::IdxStack->new($3);
+			($at, $ct, $cmt) = ($1 + 0, $2 + 0, $3);
+			$stk //= PublicInbox::IdxStack->new($cmt);
 		} elsif (/$del/) {
 			my $oid = $1;
 			if ($D) { # reindex case
 				$D->{pack('H*', $oid)}++;
 			} else { # non-reindex case:
-				$stk->push_rec('d', $at, $ct, $oid);
+				$stk->push_rec('d', $at, $ct, $oid, $cmt);
 			}
 		} elsif (/$add/) {
 			my $oid = $1;
@@ -761,12 +769,10 @@ sub log2stack ($$$) {
 				my $oid_bin = pack('H*', $oid);
 				my $nr = --$D->{$oid_bin};
 				delete($D->{$oid_bin}) if $nr <= 0;
-
 				# nr < 0 (-1) means it never existed
-				$stk->push_rec('m', $at, $ct, $oid) if $nr < 0;
-			} else {
-				$stk->push_rec('m', $at, $ct, $oid);
+				next if $nr >= 0;
 			}
+			$stk->push_rec('m', $at, $ct, $oid, $cmt);
 		}
 	}
 	close $fh or die "git log failed: \$?=$?";
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index 18f33655..87b76501 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -861,6 +861,7 @@ sub reindex_checkpoint ($$) {
 	my ($self, $sync) = @_;
 
 	$self->git->async_wait_all;
+	$self->update_last_commit($sync);
 	${$sync->{need_checkpoint}} = 0;
 	my $mm_tmp = $sync->{mm_tmp};
 	$mm_tmp->atfork_prepare if $mm_tmp;
@@ -955,19 +956,22 @@ sub index_oid { # cat_async callback
 	if (do_idx($self, $bref, $eml, $smsg)) {
 		${$arg->{need_checkpoint}} = 1;
 	}
+	${$arg->{latest_cmt}} = $arg->{cur_cmt} // die 'BUG: {cur_cmt} missing';
 }
 
 # only update last_commit for $i on reindex iff newer than current
-# $sync will be used by subclasses
 sub update_last_commit {
-	my ($self, $sync, $unit, $cmt) = @_;
+	my ($self, $sync, $stk) = @_;
+	my $unit = $sync->{unit} // return;
+	my $latest_cmt = $stk ? $stk->{latest_cmt} : ${$sync->{latest_cmt}};
+	defined($latest_cmt) or return;
 	my $last = last_epoch_commit($self, $unit->{epoch});
-	if (defined $last && is_ancestor($unit->{git}, $last, $cmt)) {
-		my @cmd = (qw(rev-list --count), "$last..$cmt");
+	if (defined $last && is_ancestor($unit->{git}, $last, $latest_cmt)) {
+		my @cmd = (qw(rev-list --count), "$last..$latest_cmt");
 		chomp(my $n = $unit->{git}->qx(@cmd));
 		return if $n ne '' && $n == 0;
 	}
-	last_epoch_commit($self, $unit->{epoch}, $cmt);
+	last_epoch_commit($self, $unit->{epoch}, $latest_cmt);
 }
 
 sub last_commits {
@@ -1245,8 +1249,16 @@ sub index_todo ($$$) {
 		$pfx //= $unit->{git}->{git_dir};
 	}
 	local $self->{current_info} = "$pfx ";
-	while (my ($f, $at, $ct, $oid) = $stk->pop_rec) {
-		my $req = { %$sync, autime => $at, cotime => $ct, oid => $oid };
+	local $sync->{latest_cmt} = \(my $latest_cmt);
+	local $sync->{unit} = $unit;
+	while (my ($f, $at, $ct, $oid, $cmt) = $stk->pop_rec) {
+		my $req = {
+			%$sync,
+			autime => $at,
+			cotime => $ct,
+			oid => $oid,
+			cur_cmt => $cmt
+		};
 		if ($f eq 'm') {
 			if ($sync->{max_size}) {
 				$all->check_async($oid, \&check_size, $req);
@@ -1261,7 +1273,7 @@ sub index_todo ($$$) {
 		}
 	}
 	$all->async_wait_all;
-	$self->update_last_commit($sync, $unit, $stk->{latest_cmt});
+	$self->update_last_commit($sync, $stk);
 }
 
 sub xapian_only {
diff --git a/t/idx_stack.t b/t/idx_stack.t
index 35aff37b..e0474fa4 100644
--- a/t/idx_stack.t
+++ b/t/idx_stack.t
@@ -6,6 +6,8 @@ use Test::More;
 use_ok 'PublicInbox::IdxStack';
 my $oid_a = '03c21563cf15c241687966b5b2a3f37cdc193316';
 my $oid_b = '963caad026055ab9bcbe3ee9550247f9d8840feb';
+my $cmt_a = 'df8e4a0612545d53672036641e9f076efc94c2f6';
+my $cmt_b = '3ba7c9fa4a083c439e768882c571c2026a981ca5';
 
 my $stk = PublicInbox::IdxStack->new;
 is($stk->read_prepare, $stk, 'nothing');
@@ -13,19 +15,19 @@ is($stk->num_records, 0, 'no records');
 is($stk->pop_rec, undef, 'undef on empty');
 
 $stk = PublicInbox::IdxStack->new;
-$stk->push_rec('m', 1234, 5678, $oid_a);
+$stk->push_rec('m', 1234, 5678, $oid_a, $cmt_a);
 is($stk->read_prepare, $stk, 'read_prepare');
 is($stk->num_records, 1, 'num_records');
-is_deeply([$stk->pop_rec], ['m', 1234, 5678, $oid_a], 'pop once');
+is_deeply([$stk->pop_rec], ['m', 1234, 5678, $oid_a, $cmt_a], 'pop once');
 is($stk->pop_rec, undef, 'undef on empty');
 
 $stk = PublicInbox::IdxStack->new;
-$stk->push_rec('m', 1234, 5678, $oid_a);
-$stk->push_rec('d', 1234, 5678, $oid_b);
+$stk->push_rec('m', 1234, 5678, $oid_a, $cmt_a);
+$stk->push_rec('d', 1234, 5678, $oid_b, $cmt_b);
 is($stk->read_prepare, $stk, 'read_prepare');
 is($stk->num_records, 2, 'num_records');
-is_deeply([$stk->pop_rec], ['d', 1234, 5678, $oid_b], 'pop');
-is_deeply([$stk->pop_rec], ['m', 1234, 5678, $oid_a], 'pop-pop');
+is_deeply([$stk->pop_rec], ['d', 1234, 5678, $oid_b, $cmt_b], 'pop');
+is_deeply([$stk->pop_rec], ['m', 1234, 5678, $oid_a, $cmt_a], 'pop-pop');
 is($stk->pop_rec, undef, 'empty');
 
 SKIP: {
@@ -37,11 +39,11 @@ SKIP: {
 	while (<$fh>) {
 		chomp;
 		my ($at, $ct, $H) = split(/\./);
-		$stk //= PublicInbox::IdxStack->new($H);
+		$stk //= PublicInbox::IdxStack->new;
 		# not bothering to parse blobs here, just using commit OID
 		# as a blob OID since they're the same size + format
-		$stk->push_rec('m', $at + 0, $ct + 0, $H);
-		push(@expect, [ 'm', $at, $ct, $H ]);
+		$stk->push_rec('m', $at + 0, $ct + 0, $H, $H);
+		push(@expect, [ 'm', $at, $ct, $H, $H ]);
 	}
 	$stk or skip('nothing from git log', 3);
 	is($stk->read_prepare, $stk, 'read_prepare');


* [PATCH 2/4] *index: avoid per-epoch --batch-check processes
  2020-11-13 11:11 [PATCH 0/4] extindex: checkpoints, graceful shutdown, cleanups Eric Wong
  2020-11-13 11:11 ` [PATCH 1/4] *index: checkpoints write last_commit metadata Eric Wong
@ 2020-11-13 11:11 ` Eric Wong
  2020-11-13 12:38   ` Kyle Meyer
  2020-11-13 11:11 ` [PATCH 3/4] *index: discard sync->{todo} on iteration Eric Wong
  2020-11-13 11:11 ` [PATCH 4/4] extindex: support graceful shutdown via QUIT/INT/TERM Eric Wong
  3 siblings, 1 reply; 7+ messages in thread
From: Eric Wong @ 2020-11-13 11:11 UTC (permalink / raw)
  To: meta

Since all.git (v2) and ALL.git (extindex) encompass every single
epoch or indexed inbox, and is_ancestor() only uses hexadecimal
OIDs, there is no good reason to use $unit->{git} for an
epoch-local $git->check.

This prevents dozens/hundreds of --batch-check processes from
being left running after indexing and can improve locality
if size checks are being done (since that uses --batch-check,
too).

Theoretically, several epochs may have conflicting OIDs, but
we're screwed in those cases, anyways, so we might as well
detect it earlier (though I'm not sure what the behavior would
be :x).
---
 lib/PublicInbox/ExtSearchIdx.pm | 2 +-
 lib/PublicInbox/V2Writable.pm   | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm
index 14ffdadb..2d230dc1 100644
--- a/lib/PublicInbox/ExtSearchIdx.pm
+++ b/lib/PublicInbox/ExtSearchIdx.pm
@@ -361,7 +361,7 @@ sub update_last_commit { # overrides V2Writable
 		die "Unsupported inbox version: $v";
 	}
 	my $last = $self->{oidx}->eidx_meta($meta_key);
-	if (defined $last && is_ancestor($unit->{git}, $last, $latest_cmt)) {
+	if (defined $last && is_ancestor($self->git, $last, $latest_cmt)) {
 		my @cmd = (qw(rev-list --count), "$last..$latest_cmt");
 		chomp(my $n = $unit->{git}->qx(@cmd));
 		return if $n ne '' && $n == 0;
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index 87b76501..cf44c95b 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -966,7 +966,7 @@ sub update_last_commit {
 	my $latest_cmt = $stk ? $stk->{latest_cmt} : ${$sync->{latest_cmt}};
 	defined($latest_cmt) or return;
 	my $last = last_epoch_commit($self, $unit->{epoch});
-	if (defined $last && is_ancestor($unit->{git}, $last, $latest_cmt)) {
+	if (defined $last && is_ancestor($self->git, $last, $latest_cmt)) {
 		my @cmd = (qw(rev-list --count), "$last..$latest_cmt");
 		chomp(my $n = $unit->{git}->qx(@cmd));
 		return if $n ne '' && $n == 0;
@@ -1003,7 +1003,7 @@ sub log_range ($$$) {
 	my $range = "$cur..$tip";
 	$pr->("$i.git checking contiguity... ") if $pr;
 	my $git = $unit->{git};
-	if (is_ancestor($git, $cur, $tip)) { # common case
+	if (is_ancestor($sync->{self}->git, $cur, $tip)) { # common case
 		$pr->("OK\n") if $pr;
 		my $n = $git->qx(qw(rev-list --count), $range);
 		chomp($n);


* [PATCH 3/4] *index: discard sync->{todo} on iteration
  2020-11-13 11:11 [PATCH 0/4] extindex: checkpoints, graceful shutdown, cleanups Eric Wong
  2020-11-13 11:11 ` [PATCH 1/4] *index: checkpoints write last_commit metadata Eric Wong
  2020-11-13 11:11 ` [PATCH 2/4] *index: avoid per-epoch --batch-check processes Eric Wong
@ 2020-11-13 11:11 ` Eric Wong
  2020-11-13 11:11 ` [PATCH 4/4] extindex: support graceful shutdown via QUIT/INT/TERM Eric Wong
  3 siblings, 0 replies; 7+ messages in thread
From: Eric Wong @ 2020-11-13 11:11 UTC (permalink / raw)
  To: meta

There's no need to continuously append to {todo} when indexing
multiple inboxes.  They're not redundantly indexed (because the
IdxStack is discarded, making it a noop), but it's still a waste
of memory keeping the $unit hashrefs around.
---
 lib/PublicInbox/ExtSearchIdx.pm | 2 +-
 lib/PublicInbox/V2Writable.pm   | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
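
The idiom used in the one-line changes below can be tried in
isolation.  This is a minimal sketch with a made-up $sync hash; the
{epoch} values are placeholders and nothing here touches a real index.
delete() returns the array reference while removing it from the hash,
and `// []' makes a later pass a no-op.

```perl
use strict;
use warnings;

# Placeholder for the real $sync; each element stands in for a $unit
my $sync = { todo => [ { epoch => 0 }, { epoch => 1 } ] };
my @seen;

# First pass: iterate over the list while dropping it from $sync
push(@seen, $_->{epoch}) for @{delete($sync->{todo}) // []};

# Second pass: {todo} is gone, so this iterates over an empty list
push(@seen, $_->{epoch}) for @{delete($sync->{todo}) // []};

print "seen=@seen todo_exists=", (exists $sync->{todo} ? 1 : 0), "\n";
```

Once the loop finishes, nothing in $sync refers to the units anymore,
so their memory can be reclaimed.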

diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm
index 2d230dc1..6c09c460 100644
--- a/lib/PublicInbox/ExtSearchIdx.pm
+++ b/lib/PublicInbox/ExtSearchIdx.pm
@@ -308,7 +308,7 @@ sub _sync_inbox ($$$) {
 		warn "E: $ekey unsupported inbox version (v$v)\n";
 		return;
 	}
-	index_todo($self, $sync, $_) for @{$sync->{todo}};
+	index_todo($self, $sync, $_) for @{delete($sync->{todo}) // []};
 }
 
 sub eidx_sync { # main entry point
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index cf44c95b..11cde627 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -1351,7 +1351,7 @@ sub index_sync {
 		}
 	}
 	# work forwards through history
-	index_todo($self, $sync, $_) for @{$sync->{todo}};
+	index_todo($self, $sync, $_) for @{delete($sync->{todo}) // []};
 	$self->{oidx}->rethread_done($opt);
 	$self->done;
 


* [PATCH 4/4] extindex: support graceful shutdown via QUIT/INT/TERM
  2020-11-13 11:11 [PATCH 0/4] extindex: checkpoints, graceful shutdown, cleanups Eric Wong
                   ` (2 preceding siblings ...)
  2020-11-13 11:11 ` [PATCH 3/4] *index: discard sync->{todo} on iteration Eric Wong
@ 2020-11-13 11:11 ` Eric Wong
  3 siblings, 0 replies; 7+ messages in thread
From: Eric Wong @ 2020-11-13 11:11 UTC (permalink / raw)
  To: meta

Just like the daemon processes, -extindex now supports graceful
shutdown via the same signals.  This lets users avoid having to
repeat indexing messages when a power outage strikes during a
long (multi-hour/day) indexing run.

Per-inbox (v1/v2) -index graceful shutdowns are not supported
yet, but are planned for later.
---
 lib/PublicInbox/ExtSearchIdx.pm   |  7 ++++++-
 lib/PublicInbox/IdxStack.pm       |  2 ++
 lib/PublicInbox/SearchIdxShard.pm |  6 ++++++
 lib/PublicInbox/V2Writable.pm     | 17 ++++++++++++++++-
 4 files changed, 30 insertions(+), 2 deletions(-)
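
The fork handling in SearchIdxShard below relies on
PublicInbox::Sigfd helpers.  A rough equivalent using only the core
POSIX module might look like the following; this is an assumption
about the general technique, not a description of what those helpers
do internally.  Signals are blocked across fork() so the child cannot
run the parent's handlers before it has set its own disposition.

```perl
use strict;
use warnings;
use POSIX qw(sigprocmask SIG_BLOCK SIG_SETMASK SIGTERM SIGINT SIGQUIT);

pipe(my $r, my $w) or die "pipe: $!";

# Block the shutdown signals around fork()
my $block = POSIX::SigSet->new(SIGTERM, SIGINT, SIGQUIT);
my $oldset = POSIX::SigSet->new;
sigprocmask(SIG_BLOCK, $block, $oldset) or die "sigprocmask: $!";

my $pid = fork;
defined($pid) or die "fork: $!";
if ($pid == 0) { # child: ignore the signals, then restore the mask
	$SIG{$_} = 'IGNORE' for qw(TERM INT QUIT);
	sigprocmask(SIG_SETMASK, $oldset) or die "sigprocmask: $!";
	close($w) or die "close: $!";
	my $buf = <$r>; # wait until the parent closes its end
	POSIX::_exit(7); # arbitrary exit code for the demo
}
sigprocmask(SIG_SETMASK, $oldset) or die "sigprocmask: $!";
close($r) or die "close: $!";

kill('TERM', $pid); # the child ignores this
close($w) or die "close: $!"; # the child sees EOF and exits
waitpid($pid, 0);
my ($exit_code, $term_sig) = ($? >> 8, $? & 127);
print "exit=$exit_code signal=$term_sig\n";
```

The parent keeps responsibility for shutting down: the worker only
exits once its pipe is closed, never because of the signal itself.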

diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm
index 6c09c460..91434b26 100644
--- a/lib/PublicInbox/ExtSearchIdx.pm
+++ b/lib/PublicInbox/ExtSearchIdx.pm
@@ -329,13 +329,18 @@ sub eidx_sync { # main entry point
 		-regen_fmt => "%u/?\n",
 	};
 	local $SIG{USR1} = sub { $need_checkpoint = 1 };
+	my $quit = sub { $sync->{quit} = 1; warn "gracefully quitting\n"; };
+	local $SIG{QUIT} = $quit;
+	local $SIG{INT} = $quit;
+	local $SIG{TERM} = $quit;
 
 	# don't use $_ here, it'll get clobbered by reindex_checkpoint
 	for my $ibx (@{$self->{ibx_list}}) {
 		_sync_inbox($self, $sync, $ibx);
+		last if $sync->{quit};
 	}
 
-	$self->{oidx}->rethread_done($opt);
+	$self->{oidx}->rethread_done($opt) unless $sync->{quit};
 
 	PublicInbox::V2Writable::done($self);
 }
diff --git a/lib/PublicInbox/IdxStack.pm b/lib/PublicInbox/IdxStack.pm
index e7e10de9..c55c5c36 100644
--- a/lib/PublicInbox/IdxStack.pm
+++ b/lib/PublicInbox/IdxStack.pm
@@ -11,6 +11,8 @@ use constant PACK_FMT => eval { pack('Q', 1) } ? 'A1QQH*H*' : 'A1IIH*H*';
 # start off in write-only mode
 sub new {
 	open(my $io, '+>', undef) or die "open: $!";
+	# latest_cmt is still useful when the newest revision is a `d'(elete),
+	# otherwise we favor $sync->{latest_cmt} for checkpoints and {quit}
 	bless { wr => $io, latest_cmt => $_[1] }, __PACKAGE__
 }
 
diff --git a/lib/PublicInbox/SearchIdxShard.pm b/lib/PublicInbox/SearchIdxShard.pm
index 1333b305..875a9ec9 100644
--- a/lib/PublicInbox/SearchIdxShard.pm
+++ b/lib/PublicInbox/SearchIdxShard.pm
@@ -10,6 +10,7 @@ use parent qw(PublicInbox::SearchIdx);
 use bytes qw(length);
 use IO::Handle (); # autoflush
 use PublicInbox::Eml;
+use PublicInbox::Sigfd;
 
 sub new {
 	my ($class, $v2w, $shard) = @_; # v2w may be ExtSearchIdx
@@ -29,9 +30,13 @@ sub spawn_worker {
 	my ($r, $w);
 	pipe($r, $w) or die "pipe failed: $!\n";
 	$w->autoflush(1);
+	my $oldset = PublicInbox::Sigfd::block_signals();
 	my $pid = fork;
 	defined $pid or die "fork failed: $!\n";
 	if ($pid == 0) {
+		# these signals are localized in parent
+		$SIG{$_} = 'IGNORE' for (qw(TERM INT QUIT));
+		PublicInbox::Sigfd::sig_setmask($oldset);
 		my $bnote = $v2w->atfork_child;
 		close $w or die "failed to close: $!";
 
@@ -44,6 +49,7 @@ sub spawn_worker {
 		die "unexpected MM $self->{mm}" if $self->{mm};
 		exit;
 	}
+	PublicInbox::Sigfd::sig_setmask($oldset);
 	$self->{pid} = $pid;
 	$self->{w} = $w;
 	close $r or die "failed to close: $!";
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index 11cde627..5bac04a4 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -1090,6 +1090,7 @@ sub sync_prepare ($$) {
 		$unit->{stack} = $stk; # may be undef
 		unshift @{$sync->{todo}}, $unit;
 		$regen_max += $nr;
+		last if $sync->{quit};
 	}
 
 	# XXX this should not happen unless somebody bypasses checks in
@@ -1102,9 +1103,11 @@ sub sync_prepare ($$) {
 			$oid = unpack('H*', $oid);
 			my $req = { %$sync, oid => $oid };
 			$self->git->cat_async($oid, $unindex_oid, $req);
+			last if $sync->{quit};
 		}
 		$self->git->cat_async_wait;
 	}
+	return 0 if $sync->{quit};
 	if (!$regen_max) {
 		$sync->{-regen_fmt} = "%u/?\n";
 		return 0;
@@ -1236,6 +1239,7 @@ sub index_xap_step ($$$;$) {
 
 sub index_todo ($$$) {
 	my ($self, $sync, $unit) = @_;
+	return if $sync->{quit};
 	unindex_todo($self, $sync, $unit);
 	my $stk = delete($unit->{stack}) or return;
 	my $all = $self->git;
@@ -1268,6 +1272,12 @@ sub index_todo ($$$) {
 		} elsif ($f eq 'd') {
 			$all->cat_async($oid, $unindex_oid, $req);
 		}
+		if ($sync->{quit}) {
+			warn "waiting to quit...\n";
+			$all->async_wait_all;
+			$self->update_last_commit($sync);
+			return;
+		}
 		if (${$sync->{need_checkpoint}}) {
 			reindex_checkpoint($self, $sync);
 		}
@@ -1334,6 +1344,11 @@ sub index_sync {
 		ibx => $self->{ibx},
 		epoch_max => $epoch_max,
 	};
+	my $quit = sub { $sync->{quit} = 1 };
+	local $SIG{QUIT} = $quit;
+	local $SIG{INT} = $quit;
+	local $SIG{TERM} = $quit;
+
 	if (sync_prepare($self, $sync)) {
 		# tmp_clone seems to fail if inside a transaction, so
 		# we rollback here (because we opened {mm} for reading)
@@ -1352,7 +1367,7 @@ sub index_sync {
 	}
 	# work forwards through history
 	index_todo($self, $sync, $_) for @{delete($sync->{todo}) // []};
-	$self->{oidx}->rethread_done($opt);
+	$self->{oidx}->rethread_done($opt) unless $sync->{quit};
 	$self->done;
 
 	if (my $nr = $sync->{nr}) {


* Re: [PATCH 2/4] *index: avoid per-epoch --batch-check processes
  2020-11-13 11:11 ` [PATCH 2/4] *index: avoid per-epoch --batch-check processes Eric Wong
@ 2020-11-13 12:38   ` Kyle Meyer
  2020-11-15  3:03     ` Eric Wong
  0 siblings, 1 reply; 7+ messages in thread
From: Kyle Meyer @ 2020-11-13 12:38 UTC (permalink / raw)
  To: Eric Wong; +Cc: meta

[ superficial typo comments from a reader who is out of his depth :) ]

Eric Wong writes:

> Since all.git (v2) and ALL.git (extindex) encompass every single
> epoch or indexed inbox; and we is_ancestor() only uses

s/we is_ancestor/is_ancestor/?

> hexadecimal OIDs, there is no good reason to to use $unit->{git}
> for an epoch-local $git->check avoids redundant long-lived
> processes.

s/to to/to/

I'm having trouble parsing that last part.  Perhaps the "avoids ..." is
covered by the next paragraph and should be dropped?

> This prevents dozens/hundreds of --batch-check processes from
> being left running after indexing and can improve locality
> if size checks are being done (since that uses --batch-check,
> too).
>
> Theoretically an several epochs may have conflicting OIDs, but

s/an several/several/

> we're screwed in those cases, anyways, so we might as well
> detect it earlier (though I'm not sure what the behavior would
> be :x).


* Re: [PATCH 2/4] *index: avoid per-epoch --batch-check processes
  2020-11-13 12:38   ` Kyle Meyer
@ 2020-11-15  3:03     ` Eric Wong
  0 siblings, 0 replies; 7+ messages in thread
From: Eric Wong @ 2020-11-15  3:03 UTC (permalink / raw)
  To: Kyle Meyer; +Cc: meta

Kyle Meyer <kyle@kyleam.com> wrote:
> [ superficial typo comments from a reader who is out of his depth :) ]

They're greatly appreciated.  My brain is working worse than
usual these days, so extra eyes anywhere are appreciated.  And
trying to get my mind off crap by trying something new caused me
to hit my head, so maybe I'm concussed, too :x

> Eric Wong writes:
> 
> > Since all.git (v2) and ALL.git (extindex) encompass every single
> > epoch or indexed inbox; and we is_ancestor() only uses
> 
> s/we is_ancestor/is_ancestor/?

yup

> > hexadecimal OIDs, there is no good reason to to use $unit->{git}
> > for an epoch-local $git->check avoids redundant long-lived
> > processes.
> 
> s/to to/to/
> 
> I'm having trouble parsing that last part.  Perhaps the "avoids ..." is
> covered by the next paragraph and should be dropped?

Agreed, dropped that bit.

> s/an several/several/

Thanks


end of thread, newest message: 2020-11-15  3:03 UTC

Thread overview: 7+ messages
2020-11-13 11:11 [PATCH 0/4] extindex: checkpoints, graceful shutdown, cleanups Eric Wong
2020-11-13 11:11 ` [PATCH 1/4] *index: checkpoints write last_commit metadata Eric Wong
2020-11-13 11:11 ` [PATCH 2/4] *index: avoid per-epoch --batch-check processes Eric Wong
2020-11-13 12:38   ` Kyle Meyer
2020-11-15  3:03     ` Eric Wong
2020-11-13 11:11 ` [PATCH 3/4] *index: discard sync->{todo} on iteration Eric Wong
2020-11-13 11:11 ` [PATCH 4/4] extindex: support graceful shutdown via QUIT/INT/TERM Eric Wong

Code repositories for project(s) associated with this public inbox

	https://80x24.org/public-inbox.git
