* [PATCH 0/4] extindex: checkpoints, graceful shutdown, cleanups
@ 2020-11-13 11:11 7% Eric Wong
2020-11-13 11:11 4% ` [PATCH 1/4] *index: checkpoints write last_commit metadata Eric Wong
0 siblings, 1 reply; 2+ results
From: Eric Wong @ 2020-11-13 11:11 UTC (permalink / raw)
To: meta
Patches 1 and 4 should make long indexing runs more
user-friendly by being interrupt-friendly (via SIGINT, SIGQUIT,
or SIGTERM, just like read-only daemons).
I would've found this feature useful when dealing with unplanned
emergency shutdowns due to power outages. I may continue to
find it useful in the future, since the power grid is falling to
pieces and we may see more power outages.
Stealing a UI idea from git, I've also made SIGUSR1 trigger a
checkpoint during indexing.
Patches 2 and 3 are just cleanups I noticed along the way.
Eric Wong (4):
*index: checkpoints write last_commit metadata
*index: avoid per-epoch --batch-check processes
*index: discard sync->{todo} on iteration
extindex: support graceful shutdown via QUIT/INT/TERM
lib/PublicInbox/ExtSearchIdx.pm | 23 +++++++++----
lib/PublicInbox/IdxStack.pm | 18 +++++++---
lib/PublicInbox/SearchIdx.pm | 56 +++++++++++++++++--------------
lib/PublicInbox/SearchIdxShard.pm | 6 ++++
lib/PublicInbox/V2Writable.pm | 49 +++++++++++++++++++++------
t/idx_stack.t | 20 ++++++-----
6 files changed, 115 insertions(+), 57 deletions(-)
^ permalink raw reply [relevance 7%]
* [PATCH 1/4] *index: checkpoints write last_commit metadata
2020-11-13 11:11 7% [PATCH 0/4] extindex: checkpoints, graceful shutdown, cleanups Eric Wong
@ 2020-11-13 11:11 4% ` Eric Wong
0 siblings, 0 replies; 2+ results
From: Eric Wong @ 2020-11-13 11:11 UTC (permalink / raw)
To: meta
This will set us up for supporting graceful shutdown
on -index without repeating any work.
---
lib/PublicInbox/ExtSearchIdx.pm | 12 ++++---
lib/PublicInbox/IdxStack.pm | 16 +++++++---
lib/PublicInbox/SearchIdx.pm | 56 ++++++++++++++++++---------------
lib/PublicInbox/V2Writable.pm | 28 ++++++++++++-----
t/idx_stack.t | 20 ++++++------
5 files changed, 81 insertions(+), 51 deletions(-)
diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm
index 7aaf8291..14ffdadb 100644
--- a/lib/PublicInbox/ExtSearchIdx.pm
+++ b/lib/PublicInbox/ExtSearchIdx.pm
@@ -168,6 +168,10 @@ sub do_finalize ($) {
# `d' message was already unindexed in the v1/v2 inboxes,
# so it's too noisy to warn, here.
}
+ # cur_cmt may be undef for unindex_oid, set by V2Writable::index_todo
+ if (defined(my $cur_cmt = $req->{cur_cmt})) {
+ ${$req->{latest_cmt}} = $cur_cmt;
+ }
}
sub do_step ($) { # main iterator for adding messages to the index
@@ -337,10 +341,10 @@ sub eidx_sync { # main entry point
}
sub update_last_commit { # overrides V2Writable
- my ($self, $sync, $unit, $latest_cmt) = @_;
- return unless defined $latest_cmt;
-
- $self->git->async_wait_all;
+ my ($self, $sync, $stk) = @_;
+ my $unit = $sync->{unit} // return;
+ my $latest_cmt = $stk ? $stk->{latest_cmt} : ${$sync->{latest_cmt}};
+ defined($latest_cmt) or return;
my $ibx = $sync->{ibx} or die 'BUG: {ibx} missing';
my $ekey = $ibx->eidx_key;
my $uv = $ibx->uidvalidity;
diff --git a/lib/PublicInbox/IdxStack.pm b/lib/PublicInbox/IdxStack.pm
index ce75b46a..e7e10de9 100644
--- a/lib/PublicInbox/IdxStack.pm
+++ b/lib/PublicInbox/IdxStack.pm
@@ -6,7 +6,7 @@ package PublicInbox::IdxStack;
use v5.10.1;
use strict;
use Fcntl qw(:seek);
-use constant FMT => eval { pack('Q', 1) } ? 'A1QQH*' : 'A1IIH*';
+use constant PACK_FMT => eval { pack('Q', 1) } ? 'A1QQH*H*' : 'A1IIH*H*';
# start off in write-only mode
sub new {
@@ -16,9 +16,15 @@ sub new {
# file_char = [d|m]
sub push_rec {
- my ($self, $file_char, $at, $ct, $blob_oid) = @_;
- my $rec = pack(FMT, $file_char, $at, $ct, $blob_oid);
- $self->{rec_size} //= length($rec);
+ my ($self, $file_char, $at, $ct, $blob_oid, $cmt_oid) = @_;
+ my $rec = pack(PACK_FMT, $file_char, $at, $ct, $blob_oid, $cmt_oid);
+ $self->{unpack_fmt} //= do {
+ my $len = length($cmt_oid);
+ my $fmt = PACK_FMT;
+ $fmt =~ s/H\*/H$len/g;
+ $self->{rec_size} = length($rec);
+ $fmt;
+ };
print { $self->{wr} } $rec or die "print: $!";
$self->{tot_size} += length($rec);
}
@@ -46,7 +52,7 @@ sub pop_rec {
my $r = read($io, my $buf, $sz);
defined($r) or die "read: $!";
$r == $sz or die "read($r != $sz)";
- unpack(FMT, $buf);
+ unpack($self->{unpack_fmt}, $buf);
}
1;
diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm
index 662055c6..90d8c8b3 100644
--- a/lib/PublicInbox/SearchIdx.pm
+++ b/lib/PublicInbox/SearchIdx.pm
@@ -608,11 +608,17 @@ sub index_both { # git->cat_async callback
$smsg->{num} = index_mm($self, $eml, $oid, $sync) or
die "E: could not generate NNTP article number for $oid";
add_message($self, $eml, $smsg, $sync);
+ my $cur_cmt = $sync->{cur_cmt} // die 'BUG: {cur_cmt} missing';
+ ${$sync->{latest_cmt}} = $cur_cmt;
}
sub unindex_both { # git->cat_async callback
- my ($bref, $oid, $type, $size, $self) = @_;
- unindex_eml($self, $oid, PublicInbox::Eml->new($bref));
+ my ($bref, $oid, $type, $size, $sync) = @_;
+ unindex_eml($sync->{sidx}, $oid, PublicInbox::Eml->new($bref));
+ # may be undef if leftover
+ if (defined(my $cur_cmt = $sync->{cur_cmt})) {
+ ${$sync->{latest_cmt}} = $cur_cmt;
+ }
}
sub with_umask {
@@ -646,34 +652,33 @@ sub v1_checkpoint ($$;$) {
my ($self, $sync, $stk) = @_;
$self->{ibx}->git->async_wait_all;
- # latest_cmt may be undef
- my $newest = $stk ? $stk->{latest_cmt} : undef;
- if ($newest) {
+ # $newest may be undef
+ my $newest = $stk ? $stk->{latest_cmt} : ${$sync->{latest_cmt}};
+ if (defined($newest)) {
my $cur = $self->{mm}->last_commit || '';
if (need_update($self, $cur, $newest)) {
$self->{mm}->last_commit($newest);
}
- } else {
- ${$sync->{max}} = $self->{batch_bytes};
}
+ ${$sync->{max}} = $self->{batch_bytes};
$self->{mm}->{dbh}->commit;
- if ($newest && need_xapian($self)) {
- my $xdb = $self->{xdb};
+ my $xdb = need_xapian($self) ? $self->{xdb} : undef;
+ if ($newest && $xdb) {
my $cur = $xdb->get_metadata('last_commit');
if (need_update($self, $cur, $newest)) {
$xdb->set_metadata('last_commit', $newest);
}
-
+ }
+ if ($stk) { # all done if $stk is passed
# let SearchView know a full --reindex was done so it can
# generate ->has_threadid-dependent links
- if ($sync->{reindex} && !ref($sync->{reindex})) {
+ if ($xdb && $sync->{reindex} && !ref($sync->{reindex})) {
my $n = $xdb->get_metadata('has_threadid');
$xdb->set_metadata('has_threadid', '1') if $n ne '1';
}
+ $self->{oidx}->rethread_done($sync->{-opt}); # all done
}
-
- $self->{oidx}->rethread_done($sync->{-opt}) if $newest; # all done
commit_txn_lazy($self);
$sync->{ibx}->git->cleanup;
my $nr = ${$sync->{nr}};
@@ -697,21 +702,24 @@ sub process_stack {
$sync->{nr} = \$nr;
$sync->{max} = \$max;
$sync->{sidx} = $self;
+ $sync->{latest_cmt} = \(my $latest_cmt);
$self->{mm}->{dbh}->begin_work;
if (my @leftovers = keys %{delete($sync->{D}) // {}}) {
warn('W: unindexing '.scalar(@leftovers)." leftovers\n");
for my $oid (@leftovers) {
$oid = unpack('H*', $oid);
- $git->cat_async($oid, \&unindex_both, $self);
+ $git->cat_async($oid, \&unindex_both, $sync);
}
}
if ($sync->{max_size} = $sync->{-opt}->{max_size}) {
$sync->{index_oid} = \&index_both;
}
- while (my ($f, $at, $ct, $oid) = $stk->pop_rec) {
+ while (my ($f, $at, $ct, $oid, $cur_cmt) = $stk->pop_rec) {
+ my $arg = { %$sync, cur_cmt => $cur_cmt };
if ($f eq 'm') {
- my $arg = { %$sync, autime => $at, cotime => $ct };
+ $arg->{autime} = $at;
+ $arg->{cotime} = $ct;
if ($sync->{max_size}) {
$git->check_async($oid, \&check_size, $arg);
} else {
@@ -719,7 +727,7 @@ sub process_stack {
}
v1_checkpoint($self, $sync) if $max <= 0;
} elsif ($f eq 'd') {
- $git->cat_async($oid, \&unindex_both, $self);
+ $git->cat_async($oid, \&unindex_both, $arg);
}
}
v1_checkpoint($self, $sync, $stk);
@@ -743,17 +751,17 @@ sub log2stack ($$$) {
my $fh = $git->popen(qw(log --raw -r --pretty=tformat:%at-%ct-%H
--no-notes --no-color --no-renames --no-abbrev),
$range);
- my ($at, $ct, $stk);
+ my ($at, $ct, $stk, $cmt);
while (<$fh>) {
if (/\A([0-9]+)-([0-9]+)-($OID)$/o) {
- ($at, $ct) = ($1 + 0, $2 + 0);
- $stk //= PublicInbox::IdxStack->new($3);
+ ($at, $ct, $cmt) = ($1 + 0, $2 + 0, $3);
+ $stk //= PublicInbox::IdxStack->new($cmt);
} elsif (/$del/) {
my $oid = $1;
if ($D) { # reindex case
$D->{pack('H*', $oid)}++;
} else { # non-reindex case:
- $stk->push_rec('d', $at, $ct, $oid);
+ $stk->push_rec('d', $at, $ct, $oid, $cmt);
}
} elsif (/$add/) {
my $oid = $1;
@@ -761,12 +769,10 @@ sub log2stack ($$$) {
my $oid_bin = pack('H*', $oid);
my $nr = --$D->{$oid_bin};
delete($D->{$oid_bin}) if $nr <= 0;
-
# nr < 0 (-1) means it never existed
- $stk->push_rec('m', $at, $ct, $oid) if $nr < 0;
- } else {
- $stk->push_rec('m', $at, $ct, $oid);
+ next if $nr >= 0;
}
+ $stk->push_rec('m', $at, $ct, $oid, $cmt);
}
}
close $fh or die "git log failed: \$?=$?";
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index 18f33655..87b76501 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -861,6 +861,7 @@ sub reindex_checkpoint ($$) {
my ($self, $sync) = @_;
$self->git->async_wait_all;
+ $self->update_last_commit($sync);
${$sync->{need_checkpoint}} = 0;
my $mm_tmp = $sync->{mm_tmp};
$mm_tmp->atfork_prepare if $mm_tmp;
@@ -955,19 +956,22 @@ sub index_oid { # cat_async callback
if (do_idx($self, $bref, $eml, $smsg)) {
${$arg->{need_checkpoint}} = 1;
}
+ ${$arg->{latest_cmt}} = $arg->{cur_cmt} // die 'BUG: {cur_cmt} missing';
}
# only update last_commit for $i on reindex iff newer than current
-# $sync will be used by subclasses
sub update_last_commit {
- my ($self, $sync, $unit, $cmt) = @_;
+ my ($self, $sync, $stk) = @_;
+ my $unit = $sync->{unit} // return;
+ my $latest_cmt = $stk ? $stk->{latest_cmt} : ${$sync->{latest_cmt}};
+ defined($latest_cmt) or return;
my $last = last_epoch_commit($self, $unit->{epoch});
- if (defined $last && is_ancestor($unit->{git}, $last, $cmt)) {
- my @cmd = (qw(rev-list --count), "$last..$cmt");
+ if (defined $last && is_ancestor($unit->{git}, $last, $latest_cmt)) {
+ my @cmd = (qw(rev-list --count), "$last..$latest_cmt");
chomp(my $n = $unit->{git}->qx(@cmd));
return if $n ne '' && $n == 0;
}
- last_epoch_commit($self, $unit->{epoch}, $cmt);
+ last_epoch_commit($self, $unit->{epoch}, $latest_cmt);
}
sub last_commits {
@@ -1245,8 +1249,16 @@ sub index_todo ($$$) {
$pfx //= $unit->{git}->{git_dir};
}
local $self->{current_info} = "$pfx ";
- while (my ($f, $at, $ct, $oid) = $stk->pop_rec) {
- my $req = { %$sync, autime => $at, cotime => $ct, oid => $oid };
+ local $sync->{latest_cmt} = \(my $latest_cmt);
+ local $sync->{unit} = $unit;
+ while (my ($f, $at, $ct, $oid, $cmt) = $stk->pop_rec) {
+ my $req = {
+ %$sync,
+ autime => $at,
+ cotime => $ct,
+ oid => $oid,
+ cur_cmt => $cmt
+ };
if ($f eq 'm') {
if ($sync->{max_size}) {
$all->check_async($oid, \&check_size, $req);
@@ -1261,7 +1273,7 @@ sub index_todo ($$$) {
}
}
$all->async_wait_all;
- $self->update_last_commit($sync, $unit, $stk->{latest_cmt});
+ $self->update_last_commit($sync, $stk);
}
sub xapian_only {
diff --git a/t/idx_stack.t b/t/idx_stack.t
index 35aff37b..e0474fa4 100644
--- a/t/idx_stack.t
+++ b/t/idx_stack.t
@@ -6,6 +6,8 @@ use Test::More;
use_ok 'PublicInbox::IdxStack';
my $oid_a = '03c21563cf15c241687966b5b2a3f37cdc193316';
my $oid_b = '963caad026055ab9bcbe3ee9550247f9d8840feb';
+my $cmt_a = 'df8e4a0612545d53672036641e9f076efc94c2f6';
+my $cmt_b = '3ba7c9fa4a083c439e768882c571c2026a981ca5';
my $stk = PublicInbox::IdxStack->new;
is($stk->read_prepare, $stk, 'nothing');
@@ -13,19 +15,19 @@ is($stk->num_records, 0, 'no records');
is($stk->pop_rec, undef, 'undef on empty');
$stk = PublicInbox::IdxStack->new;
-$stk->push_rec('m', 1234, 5678, $oid_a);
+$stk->push_rec('m', 1234, 5678, $oid_a, $cmt_a);
is($stk->read_prepare, $stk, 'read_prepare');
is($stk->num_records, 1, 'num_records');
-is_deeply([$stk->pop_rec], ['m', 1234, 5678, $oid_a], 'pop once');
+is_deeply([$stk->pop_rec], ['m', 1234, 5678, $oid_a, $cmt_a], 'pop once');
is($stk->pop_rec, undef, 'undef on empty');
$stk = PublicInbox::IdxStack->new;
-$stk->push_rec('m', 1234, 5678, $oid_a);
-$stk->push_rec('d', 1234, 5678, $oid_b);
+$stk->push_rec('m', 1234, 5678, $oid_a, $cmt_a);
+$stk->push_rec('d', 1234, 5678, $oid_b, $cmt_b);
is($stk->read_prepare, $stk, 'read_prepare');
is($stk->num_records, 2, 'num_records');
-is_deeply([$stk->pop_rec], ['d', 1234, 5678, $oid_b], 'pop');
-is_deeply([$stk->pop_rec], ['m', 1234, 5678, $oid_a], 'pop-pop');
+is_deeply([$stk->pop_rec], ['d', 1234, 5678, $oid_b, $cmt_b], 'pop');
+is_deeply([$stk->pop_rec], ['m', 1234, 5678, $oid_a, $cmt_a], 'pop-pop');
is($stk->pop_rec, undef, 'empty');
SKIP: {
@@ -37,11 +39,11 @@ SKIP: {
while (<$fh>) {
chomp;
my ($at, $ct, $H) = split(/\./);
- $stk //= PublicInbox::IdxStack->new($H);
+ $stk //= PublicInbox::IdxStack->new;
# not bothering to parse blobs here, just using commit OID
# as a blob OID since they're the same size + format
- $stk->push_rec('m', $at + 0, $ct + 0, $H);
- push(@expect, [ 'm', $at, $ct, $H ]);
+ $stk->push_rec('m', $at + 0, $ct + 0, $H, $H);
+ push(@expect, [ 'm', $at, $ct, $H, $H ]);
}
$stk or skip('nothing from git log', 3);
is($stk->read_prepare, $stk, 'read_prepare');
^ permalink raw reply related [relevance 4%]
Results 1-2 of 2 | reverse | options above
-- pct% links below jump to the message on this page, permalinks otherwise --
2020-11-13 11:11 7% [PATCH 0/4] extindex: checkpoints, graceful shutdown, cleanups Eric Wong
2020-11-13 11:11 4% ` [PATCH 1/4] *index: checkpoints write last_commit metadata Eric Wong
Code repositories for project(s) associated with this public inbox
https://80x24.org/public-inbox.git
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).