about summary refs log tree commit homepage
path: root/lib
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2020-11-13 11:11:41 +0000
committerEric Wong <e@80x24.org>2020-11-15 02:52:24 +0000
commit58a964c3c8a2f1699065358e4041b529d3ee531c (patch)
tree9183d171c3f0053fcdd8193a58b692e837148bef /lib
parent0246654927fd9ddc9718ca0d7d5c7e29ce004d5c (diff)
downloadpublic-inbox-58a964c3c8a2f1699065358e4041b529d3ee531c.tar.gz
This will set us up for supporting graceful shutdown
on -index without repeating any work.
Diffstat (limited to 'lib')
-rw-r--r--lib/PublicInbox/ExtSearchIdx.pm12
-rw-r--r--lib/PublicInbox/IdxStack.pm16
-rw-r--r--lib/PublicInbox/SearchIdx.pm56
-rw-r--r--lib/PublicInbox/V2Writable.pm28
4 files changed, 70 insertions, 42 deletions
diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm
index 7aaf8291..14ffdadb 100644
--- a/lib/PublicInbox/ExtSearchIdx.pm
+++ b/lib/PublicInbox/ExtSearchIdx.pm
@@ -168,6 +168,10 @@ sub do_finalize ($) {
                 # `d' message was already unindexed in the v1/v2 inboxes,
                 # so it's too noisy to warn, here.
         }
+        # cur_cmt may be undef for unindex_oid, set by V2Writable::index_todo
+        if (defined(my $cur_cmt = $req->{cur_cmt})) {
+                ${$req->{latest_cmt}} = $cur_cmt;
+        }
 }
 
 sub do_step ($) { # main iterator for adding messages to the index
@@ -337,10 +341,10 @@ sub eidx_sync { # main entry point
 }
 
 sub update_last_commit { # overrides V2Writable
-        my ($self, $sync, $unit, $latest_cmt) = @_;
-        return unless defined $latest_cmt;
-
-        $self->git->async_wait_all;
+        my ($self, $sync, $stk) = @_;
+        my $unit = $sync->{unit} // return;
+        my $latest_cmt = $stk ? $stk->{latest_cmt} : ${$sync->{latest_cmt}};
+        defined($latest_cmt) or return;
         my $ibx = $sync->{ibx} or die 'BUG: {ibx} missing';
         my $ekey = $ibx->eidx_key;
         my $uv = $ibx->uidvalidity;
diff --git a/lib/PublicInbox/IdxStack.pm b/lib/PublicInbox/IdxStack.pm
index ce75b46a..e7e10de9 100644
--- a/lib/PublicInbox/IdxStack.pm
+++ b/lib/PublicInbox/IdxStack.pm
@@ -6,7 +6,7 @@ package PublicInbox::IdxStack;
 use v5.10.1;
 use strict;
 use Fcntl qw(:seek);
-use constant FMT => eval { pack('Q', 1) } ? 'A1QQH*' : 'A1IIH*';
+use constant PACK_FMT => eval { pack('Q', 1) } ? 'A1QQH*H*' : 'A1IIH*H*';
 
 # start off in write-only mode
 sub new {
@@ -16,9 +16,15 @@ sub new {
 
 # file_char = [d|m]
 sub push_rec {
-        my ($self, $file_char, $at, $ct, $blob_oid) = @_;
-        my $rec = pack(FMT, $file_char, $at, $ct, $blob_oid);
-        $self->{rec_size} //= length($rec);
+        my ($self, $file_char, $at, $ct, $blob_oid, $cmt_oid) = @_;
+        my $rec = pack(PACK_FMT, $file_char, $at, $ct, $blob_oid, $cmt_oid);
+        $self->{unpack_fmt} //= do {
+                my $len = length($cmt_oid);
+                my $fmt = PACK_FMT;
+                $fmt =~ s/H\*/H$len/g;
+                $self->{rec_size} = length($rec);
+                $fmt;
+        };
         print { $self->{wr} } $rec or die "print: $!";
         $self->{tot_size} += length($rec);
 }
@@ -46,7 +52,7 @@ sub pop_rec {
         my $r = read($io, my $buf, $sz);
         defined($r) or die "read: $!";
         $r == $sz or die "read($r != $sz)";
-        unpack(FMT, $buf);
+        unpack($self->{unpack_fmt}, $buf);
 }
 
 1;
diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm
index 662055c6..90d8c8b3 100644
--- a/lib/PublicInbox/SearchIdx.pm
+++ b/lib/PublicInbox/SearchIdx.pm
@@ -608,11 +608,17 @@ sub index_both { # git->cat_async callback
         $smsg->{num} = index_mm($self, $eml, $oid, $sync) or
                 die "E: could not generate NNTP article number for $oid";
         add_message($self, $eml, $smsg, $sync);
+        my $cur_cmt = $sync->{cur_cmt} // die 'BUG: {cur_cmt} missing';
+        ${$sync->{latest_cmt}} = $cur_cmt;
 }
 
 sub unindex_both { # git->cat_async callback
-        my ($bref, $oid, $type, $size, $self) = @_;
-        unindex_eml($self, $oid, PublicInbox::Eml->new($bref));
+        my ($bref, $oid, $type, $size, $sync) = @_;
+        unindex_eml($sync->{sidx}, $oid, PublicInbox::Eml->new($bref));
+        # may be undef if leftover
+        if (defined(my $cur_cmt = $sync->{cur_cmt})) {
+                ${$sync->{latest_cmt}} = $cur_cmt;
+        }
 }
 
 sub with_umask {
@@ -646,34 +652,33 @@ sub v1_checkpoint ($$;$) {
         my ($self, $sync, $stk) = @_;
         $self->{ibx}->git->async_wait_all;
 
-        # latest_cmt may be undef
-        my $newest = $stk ? $stk->{latest_cmt} : undef;
-        if ($newest) {
+        # $newest may be undef
+        my $newest = $stk ? $stk->{latest_cmt} : ${$sync->{latest_cmt}};
+        if (defined($newest)) {
                 my $cur = $self->{mm}->last_commit || '';
                 if (need_update($self, $cur, $newest)) {
                         $self->{mm}->last_commit($newest);
                 }
-        } else {
-                ${$sync->{max}} = $self->{batch_bytes};
         }
+        ${$sync->{max}} = $self->{batch_bytes};
 
         $self->{mm}->{dbh}->commit;
-        if ($newest && need_xapian($self)) {
-                my $xdb = $self->{xdb};
+        my $xdb = need_xapian($self) ? $self->{xdb} : undef;
+        if ($newest && $xdb) {
                 my $cur = $xdb->get_metadata('last_commit');
                 if (need_update($self, $cur, $newest)) {
                         $xdb->set_metadata('last_commit', $newest);
                 }
-
+        }
+        if ($stk) { # all done if $stk is passed
                 # let SearchView know a full --reindex was done so it can
                 # generate ->has_threadid-dependent links
-                if ($sync->{reindex} && !ref($sync->{reindex})) {
+                if ($xdb && $sync->{reindex} && !ref($sync->{reindex})) {
                         my $n = $xdb->get_metadata('has_threadid');
                         $xdb->set_metadata('has_threadid', '1') if $n ne '1';
                 }
+                $self->{oidx}->rethread_done($sync->{-opt}); # all done
         }
-
-        $self->{oidx}->rethread_done($sync->{-opt}) if $newest; # all done
         commit_txn_lazy($self);
         $sync->{ibx}->git->cleanup;
         my $nr = ${$sync->{nr}};
@@ -697,21 +702,24 @@ sub process_stack {
         $sync->{nr} = \$nr;
         $sync->{max} = \$max;
         $sync->{sidx} = $self;
+        $sync->{latest_cmt} = \(my $latest_cmt);
 
         $self->{mm}->{dbh}->begin_work;
         if (my @leftovers = keys %{delete($sync->{D}) // {}}) {
                 warn('W: unindexing '.scalar(@leftovers)." leftovers\n");
                 for my $oid (@leftovers) {
                         $oid = unpack('H*', $oid);
-                        $git->cat_async($oid, \&unindex_both, $self);
+                        $git->cat_async($oid, \&unindex_both, $sync);
                 }
         }
         if ($sync->{max_size} = $sync->{-opt}->{max_size}) {
                 $sync->{index_oid} = \&index_both;
         }
-        while (my ($f, $at, $ct, $oid) = $stk->pop_rec) {
+        while (my ($f, $at, $ct, $oid, $cur_cmt) = $stk->pop_rec) {
+                my $arg = { %$sync, cur_cmt => $cur_cmt };
                 if ($f eq 'm') {
-                        my $arg = { %$sync, autime => $at, cotime => $ct };
+                        $arg->{autime} = $at;
+                        $arg->{cotime} = $ct;
                         if ($sync->{max_size}) {
                                 $git->check_async($oid, \&check_size, $arg);
                         } else {
@@ -719,7 +727,7 @@ sub process_stack {
                         }
                         v1_checkpoint($self, $sync) if $max <= 0;
                 } elsif ($f eq 'd') {
-                        $git->cat_async($oid, \&unindex_both, $self);
+                        $git->cat_async($oid, \&unindex_both, $arg);
                 }
         }
         v1_checkpoint($self, $sync, $stk);
@@ -743,17 +751,17 @@ sub log2stack ($$$) {
         my $fh = $git->popen(qw(log --raw -r --pretty=tformat:%at-%ct-%H
                                 --no-notes --no-color --no-renames --no-abbrev),
                                 $range);
-        my ($at, $ct, $stk);
+        my ($at, $ct, $stk, $cmt);
         while (<$fh>) {
                 if (/\A([0-9]+)-([0-9]+)-($OID)$/o) {
-                        ($at, $ct) = ($1 + 0, $2 + 0);
-                        $stk //= PublicInbox::IdxStack->new($3);
+                        ($at, $ct, $cmt) = ($1 + 0, $2 + 0, $3);
+                        $stk //= PublicInbox::IdxStack->new($cmt);
                 } elsif (/$del/) {
                         my $oid = $1;
                         if ($D) { # reindex case
                                 $D->{pack('H*', $oid)}++;
                         } else { # non-reindex case:
-                                $stk->push_rec('d', $at, $ct, $oid);
+                                $stk->push_rec('d', $at, $ct, $oid, $cmt);
                         }
                 } elsif (/$add/) {
                         my $oid = $1;
@@ -761,12 +769,10 @@ sub log2stack ($$$) {
                                 my $oid_bin = pack('H*', $oid);
                                 my $nr = --$D->{$oid_bin};
                                 delete($D->{$oid_bin}) if $nr <= 0;
-
                                 # nr < 0 (-1) means it never existed
-                                $stk->push_rec('m', $at, $ct, $oid) if $nr < 0;
-                        } else {
-                                $stk->push_rec('m', $at, $ct, $oid);
+                                next if $nr >= 0;
                         }
+                        $stk->push_rec('m', $at, $ct, $oid, $cmt);
                 }
         }
         close $fh or die "git log failed: \$?=$?";
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index 18f33655..87b76501 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -861,6 +861,7 @@ sub reindex_checkpoint ($$) {
         my ($self, $sync) = @_;
 
         $self->git->async_wait_all;
+        $self->update_last_commit($sync);
         ${$sync->{need_checkpoint}} = 0;
         my $mm_tmp = $sync->{mm_tmp};
         $mm_tmp->atfork_prepare if $mm_tmp;
@@ -955,19 +956,22 @@ sub index_oid { # cat_async callback
         if (do_idx($self, $bref, $eml, $smsg)) {
                 ${$arg->{need_checkpoint}} = 1;
         }
+        ${$arg->{latest_cmt}} = $arg->{cur_cmt} // die 'BUG: {cur_cmt} missing';
 }
 
 # only update last_commit for $i on reindex iff newer than current
-# $sync will be used by subclasses
 sub update_last_commit {
-        my ($self, $sync, $unit, $cmt) = @_;
+        my ($self, $sync, $stk) = @_;
+        my $unit = $sync->{unit} // return;
+        my $latest_cmt = $stk ? $stk->{latest_cmt} : ${$sync->{latest_cmt}};
+        defined($latest_cmt) or return;
         my $last = last_epoch_commit($self, $unit->{epoch});
-        if (defined $last && is_ancestor($unit->{git}, $last, $cmt)) {
-                my @cmd = (qw(rev-list --count), "$last..$cmt");
+        if (defined $last && is_ancestor($unit->{git}, $last, $latest_cmt)) {
+                my @cmd = (qw(rev-list --count), "$last..$latest_cmt");
                 chomp(my $n = $unit->{git}->qx(@cmd));
                 return if $n ne '' && $n == 0;
         }
-        last_epoch_commit($self, $unit->{epoch}, $cmt);
+        last_epoch_commit($self, $unit->{epoch}, $latest_cmt);
 }
 
 sub last_commits {
@@ -1245,8 +1249,16 @@ sub index_todo ($$$) {
                 $pfx //= $unit->{git}->{git_dir};
         }
         local $self->{current_info} = "$pfx ";
-        while (my ($f, $at, $ct, $oid) = $stk->pop_rec) {
-                my $req = { %$sync, autime => $at, cotime => $ct, oid => $oid };
+        local $sync->{latest_cmt} = \(my $latest_cmt);
+        local $sync->{unit} = $unit;
+        while (my ($f, $at, $ct, $oid, $cmt) = $stk->pop_rec) {
+                my $req = {
+                        %$sync,
+                        autime => $at,
+                        cotime => $ct,
+                        oid => $oid,
+                        cur_cmt => $cmt
+                };
                 if ($f eq 'm') {
                         if ($sync->{max_size}) {
                                 $all->check_async($oid, \&check_size, $req);
@@ -1261,7 +1273,7 @@ sub index_todo ($$$) {
                 }
         }
         $all->async_wait_all;
-        $self->update_last_commit($sync, $unit, $stk->{latest_cmt});
+        $self->update_last_commit($sync, $stk);
 }
 
 sub xapian_only {