From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 8C0031F670 for ; Fri, 13 Nov 2020 11:11:44 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 1/4] *index: checkpoints write last_commit metadata Date: Fri, 13 Nov 2020 11:11:41 +0000 Message-Id: <20201113111144.23038-2-e@80x24.org> In-Reply-To: <20201113111144.23038-1-e@80x24.org> References: <20201113111144.23038-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: This will set us up for supporting graceful shutdown on -index without repeating any work. --- lib/PublicInbox/ExtSearchIdx.pm | 12 ++++--- lib/PublicInbox/IdxStack.pm | 16 +++++++--- lib/PublicInbox/SearchIdx.pm | 56 ++++++++++++++++++--------------- lib/PublicInbox/V2Writable.pm | 28 ++++++++++++----- t/idx_stack.t | 20 ++++++------ 5 files changed, 81 insertions(+), 51 deletions(-) diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm index 7aaf8291..14ffdadb 100644 --- a/lib/PublicInbox/ExtSearchIdx.pm +++ b/lib/PublicInbox/ExtSearchIdx.pm @@ -168,6 +168,10 @@ sub do_finalize ($) { # `d' message was already unindexed in the v1/v2 inboxes, # so it's too noisy to warn, here. } + # cur_cmt may be undef for unindex_oid, set by V2Writable::index_todo + if (defined(my $cur_cmt = $req->{cur_cmt})) { + ${$req->{latest_cmt}} = $cur_cmt; + } } sub do_step ($) { # main iterator for adding messages to the index @@ -337,10 +341,10 @@ sub eidx_sync { # main entry point } sub update_last_commit { # overrides V2Writable - my ($self, $sync, $unit, $latest_cmt) = @_; - return unless defined $latest_cmt; - - $self->git->async_wait_all; + my ($self, $sync, $stk) = @_; + my $unit = $sync->{unit} // return; + my $latest_cmt = $stk ? $stk->{latest_cmt} : ${$sync->{latest_cmt}}; + defined($latest_cmt) or return; my $ibx = $sync->{ibx} or die 'BUG: {ibx} missing'; my $ekey = $ibx->eidx_key; my $uv = $ibx->uidvalidity; diff --git a/lib/PublicInbox/IdxStack.pm b/lib/PublicInbox/IdxStack.pm index ce75b46a..e7e10de9 100644 --- a/lib/PublicInbox/IdxStack.pm +++ b/lib/PublicInbox/IdxStack.pm @@ -6,7 +6,7 @@ package PublicInbox::IdxStack; use v5.10.1; use strict; use Fcntl qw(:seek); -use constant FMT => eval { pack('Q', 1) } ? 'A1QQH*' : 'A1IIH*'; +use constant PACK_FMT => eval { pack('Q', 1) } ? 'A1QQH*H*' : 'A1IIH*H*'; # start off in write-only mode sub new { @@ -16,9 +16,15 @@ sub new { # file_char = [d|m] sub push_rec { - my ($self, $file_char, $at, $ct, $blob_oid) = @_; - my $rec = pack(FMT, $file_char, $at, $ct, $blob_oid); - $self->{rec_size} //= length($rec); + my ($self, $file_char, $at, $ct, $blob_oid, $cmt_oid) = @_; + my $rec = pack(PACK_FMT, $file_char, $at, $ct, $blob_oid, $cmt_oid); + $self->{unpack_fmt} //= do { + my $len = length($cmt_oid); + my $fmt = PACK_FMT; + $fmt =~ s/H\*/H$len/g; + $self->{rec_size} = length($rec); + $fmt; + }; print { $self->{wr} } $rec or die "print: $!"; $self->{tot_size} += length($rec); } @@ -46,7 +52,7 @@ sub pop_rec { my $r = read($io, my $buf, $sz); defined($r) or die "read: $!"; $r == $sz or die "read($r != $sz)"; - unpack(FMT, $buf); + unpack($self->{unpack_fmt}, $buf); } 1; diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm index 662055c6..90d8c8b3 100644 --- a/lib/PublicInbox/SearchIdx.pm +++ b/lib/PublicInbox/SearchIdx.pm @@ -608,11 +608,17 @@ sub index_both { # git->cat_async callback $smsg->{num} = index_mm($self, $eml, $oid, $sync) or die "E: could not generate NNTP article number for $oid"; add_message($self, $eml, $smsg, $sync); + my $cur_cmt = $sync->{cur_cmt} // die 'BUG: {cur_cmt} missing'; + ${$sync->{latest_cmt}} = $cur_cmt; } sub unindex_both { # git->cat_async callback - my ($bref, $oid, $type, $size, $self) = @_; - unindex_eml($self, $oid, PublicInbox::Eml->new($bref)); + my ($bref, $oid, $type, $size, $sync) = @_; + unindex_eml($sync->{sidx}, $oid, PublicInbox::Eml->new($bref)); + # may be undef if leftover + if (defined(my $cur_cmt = $sync->{cur_cmt})) { + ${$sync->{latest_cmt}} = $cur_cmt; + } } sub with_umask { @@ -646,34 +652,33 @@ sub v1_checkpoint ($$;$) { my ($self, $sync, $stk) = @_; $self->{ibx}->git->async_wait_all; - # latest_cmt may be undef - my $newest = $stk ? $stk->{latest_cmt} : undef; - if ($newest) { + # $newest may be undef + my $newest = $stk ? $stk->{latest_cmt} : ${$sync->{latest_cmt}}; + if (defined($newest)) { my $cur = $self->{mm}->last_commit || ''; if (need_update($self, $cur, $newest)) { $self->{mm}->last_commit($newest); } - } else { - ${$sync->{max}} = $self->{batch_bytes}; } + ${$sync->{max}} = $self->{batch_bytes}; $self->{mm}->{dbh}->commit; - if ($newest && need_xapian($self)) { - my $xdb = $self->{xdb}; + my $xdb = need_xapian($self) ? $self->{xdb} : undef; + if ($newest && $xdb) { my $cur = $xdb->get_metadata('last_commit'); if (need_update($self, $cur, $newest)) { $xdb->set_metadata('last_commit', $newest); } - + } + if ($stk) { # all done if $stk is passed # let SearchView know a full --reindex was done so it can # generate ->has_threadid-dependent links - if ($sync->{reindex} && !ref($sync->{reindex})) { + if ($xdb && $sync->{reindex} && !ref($sync->{reindex})) { my $n = $xdb->get_metadata('has_threadid'); $xdb->set_metadata('has_threadid', '1') if $n ne '1'; } + $self->{oidx}->rethread_done($sync->{-opt}); # all done } - - $self->{oidx}->rethread_done($sync->{-opt}) if $newest; # all done commit_txn_lazy($self); $sync->{ibx}->git->cleanup; my $nr = ${$sync->{nr}}; @@ -697,21 +702,24 @@ sub process_stack { $sync->{nr} = \$nr; $sync->{max} = \$max; $sync->{sidx} = $self; + $sync->{latest_cmt} = \(my $latest_cmt); $self->{mm}->{dbh}->begin_work; if (my @leftovers = keys %{delete($sync->{D}) // {}}) { warn('W: unindexing '.scalar(@leftovers)." leftovers\n"); for my $oid (@leftovers) { $oid = unpack('H*', $oid); - $git->cat_async($oid, \&unindex_both, $self); + $git->cat_async($oid, \&unindex_both, $sync); } } if ($sync->{max_size} = $sync->{-opt}->{max_size}) { $sync->{index_oid} = \&index_both; } - while (my ($f, $at, $ct, $oid) = $stk->pop_rec) { + while (my ($f, $at, $ct, $oid, $cur_cmt) = $stk->pop_rec) { + my $arg = { %$sync, cur_cmt => $cur_cmt }; if ($f eq 'm') { - my $arg = { %$sync, autime => $at, cotime => $ct }; + $arg->{autime} = $at; + $arg->{cotime} = $ct; if ($sync->{max_size}) { $git->check_async($oid, \&check_size, $arg); } else { @@ -719,7 +727,7 @@ sub process_stack { } v1_checkpoint($self, $sync) if $max <= 0; } elsif ($f eq 'd') { - $git->cat_async($oid, \&unindex_both, $self); + $git->cat_async($oid, \&unindex_both, $arg); } } v1_checkpoint($self, $sync, $stk); @@ -743,17 +751,17 @@ sub log2stack ($$$) { my $fh = $git->popen(qw(log --raw -r --pretty=tformat:%at-%ct-%H --no-notes --no-color --no-renames --no-abbrev), $range); - my ($at, $ct, $stk); + my ($at, $ct, $stk, $cmt); while (<$fh>) { if (/\A([0-9]+)-([0-9]+)-($OID)$/o) { - ($at, $ct) = ($1 + 0, $2 + 0); - $stk //= PublicInbox::IdxStack->new($3); + ($at, $ct, $cmt) = ($1 + 0, $2 + 0, $3); + $stk //= PublicInbox::IdxStack->new($cmt); } elsif (/$del/) { my $oid = $1; if ($D) { # reindex case $D->{pack('H*', $oid)}++; } else { # non-reindex case: - $stk->push_rec('d', $at, $ct, $oid); + $stk->push_rec('d', $at, $ct, $oid, $cmt); } } elsif (/$add/) { my $oid = $1; @@ -761,12 +769,10 @@ sub log2stack ($$$) { my $oid_bin = pack('H*', $oid); my $nr = --$D->{$oid_bin}; delete($D->{$oid_bin}) if $nr <= 0; - # nr < 0 (-1) means it never existed - $stk->push_rec('m', $at, $ct, $oid) if $nr < 0; - } else { - $stk->push_rec('m', $at, $ct, $oid); + next if $nr >= 0; } + $stk->push_rec('m', $at, $ct, $oid, $cmt); } } close $fh or die "git log failed: \$?=$?"; diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm index 18f33655..87b76501 100644 --- a/lib/PublicInbox/V2Writable.pm +++ b/lib/PublicInbox/V2Writable.pm @@ -861,6 +861,7 @@ sub reindex_checkpoint ($$) { my ($self, $sync) = @_; $self->git->async_wait_all; + $self->update_last_commit($sync); ${$sync->{need_checkpoint}} = 0; my $mm_tmp = $sync->{mm_tmp}; $mm_tmp->atfork_prepare if $mm_tmp; @@ -955,19 +956,22 @@ sub index_oid { # cat_async callback if (do_idx($self, $bref, $eml, $smsg)) { ${$arg->{need_checkpoint}} = 1; } + ${$arg->{latest_cmt}} = $arg->{cur_cmt} // die 'BUG: {cur_cmt} missing'; } # only update last_commit for $i on reindex iff newer than current -# $sync will be used by subclasses sub update_last_commit { - my ($self, $sync, $unit, $cmt) = @_; + my ($self, $sync, $stk) = @_; + my $unit = $sync->{unit} // return; + my $latest_cmt = $stk ? $stk->{latest_cmt} : ${$sync->{latest_cmt}}; + defined($latest_cmt) or return; my $last = last_epoch_commit($self, $unit->{epoch}); - if (defined $last && is_ancestor($unit->{git}, $last, $cmt)) { - my @cmd = (qw(rev-list --count), "$last..$cmt"); + if (defined $last && is_ancestor($unit->{git}, $last, $latest_cmt)) { + my @cmd = (qw(rev-list --count), "$last..$latest_cmt"); chomp(my $n = $unit->{git}->qx(@cmd)); return if $n ne '' && $n == 0; } - last_epoch_commit($self, $unit->{epoch}, $cmt); + last_epoch_commit($self, $unit->{epoch}, $latest_cmt); } sub last_commits { @@ -1245,8 +1249,16 @@ sub index_todo ($$$) { $pfx //= $unit->{git}->{git_dir}; } local $self->{current_info} = "$pfx "; - while (my ($f, $at, $ct, $oid) = $stk->pop_rec) { - my $req = { %$sync, autime => $at, cotime => $ct, oid => $oid }; + local $sync->{latest_cmt} = \(my $latest_cmt); + local $sync->{unit} = $unit; + while (my ($f, $at, $ct, $oid, $cmt) = $stk->pop_rec) { + my $req = { + %$sync, + autime => $at, + cotime => $ct, + oid => $oid, + cur_cmt => $cmt + }; if ($f eq 'm') { if ($sync->{max_size}) { $all->check_async($oid, \&check_size, $req); @@ -1261,7 +1273,7 @@ sub index_todo ($$$) { } } $all->async_wait_all; - $self->update_last_commit($sync, $unit, $stk->{latest_cmt}); + $self->update_last_commit($sync, $stk); } sub xapian_only { diff --git a/t/idx_stack.t b/t/idx_stack.t index 35aff37b..e0474fa4 100644 --- a/t/idx_stack.t +++ b/t/idx_stack.t @@ -6,6 +6,8 @@ use Test::More; use_ok 'PublicInbox::IdxStack'; my $oid_a = '03c21563cf15c241687966b5b2a3f37cdc193316'; my $oid_b = '963caad026055ab9bcbe3ee9550247f9d8840feb'; +my $cmt_a = 'df8e4a0612545d53672036641e9f076efc94c2f6'; +my $cmt_b = '3ba7c9fa4a083c439e768882c571c2026a981ca5'; my $stk = PublicInbox::IdxStack->new; is($stk->read_prepare, $stk, 'nothing'); @@ -13,19 +15,19 @@ is($stk->num_records, 0, 'no records'); is($stk->pop_rec, undef, 'undef on empty'); $stk = PublicInbox::IdxStack->new; -$stk->push_rec('m', 1234, 5678, $oid_a); +$stk->push_rec('m', 1234, 5678, $oid_a, $cmt_a); is($stk->read_prepare, $stk, 'read_prepare'); is($stk->num_records, 1, 'num_records'); -is_deeply([$stk->pop_rec], ['m', 1234, 5678, $oid_a], 'pop once'); +is_deeply([$stk->pop_rec], ['m', 1234, 5678, $oid_a, $cmt_a], 'pop once'); is($stk->pop_rec, undef, 'undef on empty'); $stk = PublicInbox::IdxStack->new; -$stk->push_rec('m', 1234, 5678, $oid_a); -$stk->push_rec('d', 1234, 5678, $oid_b); +$stk->push_rec('m', 1234, 5678, $oid_a, $cmt_a); +$stk->push_rec('d', 1234, 5678, $oid_b, $cmt_b); is($stk->read_prepare, $stk, 'read_prepare'); is($stk->num_records, 2, 'num_records'); -is_deeply([$stk->pop_rec], ['d', 1234, 5678, $oid_b], 'pop'); -is_deeply([$stk->pop_rec], ['m', 1234, 5678, $oid_a], 'pop-pop'); +is_deeply([$stk->pop_rec], ['d', 1234, 5678, $oid_b, $cmt_b], 'pop'); +is_deeply([$stk->pop_rec], ['m', 1234, 5678, $oid_a, $cmt_a], 'pop-pop'); is($stk->pop_rec, undef, 'empty'); SKIP: { @@ -37,11 +39,11 @@ SKIP: { while (<$fh>) { chomp; my ($at, $ct, $H) = split(/\./); - $stk //= PublicInbox::IdxStack->new($H); + $stk //= PublicInbox::IdxStack->new; # not bothering to parse blobs here, just using commit OID # as a blob OID since they're the same size + format - $stk->push_rec('m', $at + 0, $ct + 0, $H); - push(@expect, [ 'm', $at, $ct, $H ]); + $stk->push_rec('m', $at + 0, $ct + 0, $H, $H); + push(@expect, [ 'm', $at, $ct, $H, $H ]); } $stk or skip('nothing from git log', 3); is($stk->read_prepare, $stk, 'read_prepare');