about summary refs log tree commit homepage
diff options
context:
space:
mode:
-rw-r--r-- lib/PublicInbox/ExtSearchIdx.pm 117
-rw-r--r-- lib/PublicInbox/Over.pm 3
-rw-r--r-- t/extsearch.t 71
3 files changed, 180 insertions, 11 deletions
diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm
index ec86a7c0..c77fb197 100644
--- a/lib/PublicInbox/ExtSearchIdx.pm
+++ b/lib/PublicInbox/ExtSearchIdx.pm
@@ -395,6 +395,106 @@ DELETE FROM xref3 WHERE docid NOT IN (SELECT num FROM over)
         done($self);
 }
 
+sub _reindex_finalize ($$$) { # called by _reindex_oid once the last xref3 row has been read
+        my ($req, $smsg, $eml) = @_;
+        my $sync = $req->{sync};
+        my $self = $sync->{self};
+        my $by_chash = $req->{by_chash}; # chash => [ smsgs pushed by _reindex_oid ]
+        my $nr = scalar(keys(%$by_chash)) or die 'BUG: no content hashes';
+        my $orig_smsg = $req->{orig_smsg} // die 'BUG: no {orig_smsg}';
+        my $docid = $smsg->{num} = $orig_smsg->{num}; # $smsg takes over the existing docid
+        $self->{oidx}->add_overview($eml, $smsg); # may rethread
+        return if $nr == 1; # likely, all good
+
+        warn "W: #$docid split into $nr due to deduplication change\n";
+        my $chash0 = $smsg->{chash} // die "BUG: $smsg->{blob} no {chash}";
+        delete($by_chash->{$chash0}) // die "BUG: $smsg->{blob} chash missing"; # what remains differs in content from $smsg
+        for my $ary (values %$by_chash) { # one group per differing content hash
+                for my $x (reverse @$ary) { # detach each differing blob from $docid
+                        my $n = $self->{oidx}->remove_xref3($docid, $x->{blob});
+                        die "BUG: $x->{blob} invalidated #$docid" if $n == 0; # NOTE(review): $n is presumably the xref3 count left for $docid; confirm in remove_xref3
+                }
+                my $x = $ary->[-1] // die "BUG: #$docid {by_chash} empty"; # most recently pushed smsg of this group
+                $x->{num} = delete($x->{xnum}) // die '{xnum} unset'; # {xnum} was copied from the xref3 row in _reindex_oid
+                my $ibx_id = delete($x->{ibx_id}) // die '{ibx_id} unset';
+                my $pos = $sync->{id2pos}->{$ibx_id} // die "$ibx_id no pos"; # index into {ibx_list}
+                my $ibx = $self->{ibx_list}->[$pos] //
+                        die "BUG: ibx for $x->{blob} not mapped";
+                my $e = $ibx->over->get_art($x->{num}); # look up article $x->{num} via that inbox's ->over
+                $e->{blob} eq $x->{blob} or die <<EOF;
+$x->{blob} != $e->{blob} (${\$ibx->eidx_key}:$e->{num});
+EOF
+                reindex_unseen($self, $sync, $ibx, $e); # NOTE(review): reindex_unseen is outside this view; presumably indexes $e under a new docid
+        }
+}
+
+sub _reindex_oid { # git->cat_async callback
+        my ($bref, $oid, $type, $size, $req) = @_;
+        my $sync = $req->{sync};
+        my $self = $sync->{self};
+        my $orig_smsg = $req->{orig_smsg} // die 'BUG: no {orig_smsg}';
+        my $expect_oid = $req->{xr3}->[$req->{ix}]->[2]; # hex OID of the xref3 row being processed
+        my $docid = $orig_smsg->{num};
+        if (is_bad_blob($oid, $type, $size, $expect_oid)) { # is_bad_blob is defined outside this view
+                my $remain = $self->{oidx}->remove_xref3($docid, $expect_oid); # drop the xref3 row for this blob
+                if ($remain == 0) {
+                        warn "W: #$docid gone or corrupted\n";
+                        $self->idx_shard($docid)->shard_remove($docid);
+                } elsif (my $next_oid = $req->{xr3}->[++$req->{ix}]->[2]) { # advance {ix} to the next xref3 row
+                        $self->git->cat_async($next_oid, \&_reindex_oid, $req);
+                } else {
+                        warn "BUG: #$docid gone (UNEXPECTED)\n";
+                        $self->idx_shard($docid)->shard_remove($docid);
+                }
+                return;
+        }
+        my $ci = $self->{current_info};
+        local $self->{current_info} = "$ci #$docid $oid"; # local: previous value is restored when this sub returns
+        my $re_smsg = bless { blob => $oid }, 'PublicInbox::Smsg';
+        $re_smsg->{bytes} = $size + crlf_adjust($$bref);
+        my $eml = PublicInbox::Eml->new($bref);
+        $re_smsg->populate($eml, { autime => $orig_smsg->{ds}, # pass the original {ds}/{ts} as autime/cotime
+                                cotime => $orig_smsg->{ts} });
+        my $chash = content_hash($eml);
+        $re_smsg->{chash} = $chash;
+        $re_smsg->{xnum} = $req->{xr3}->[$req->{ix}]->[1]; # xref3 row layout is [ ibx_id, xnum, oid ]
+        $re_smsg->{ibx_id} = $req->{xr3}->[$req->{ix}]->[0];
+        push @{$req->{by_chash}->{$chash}}, $re_smsg; # group re-read messages by content hash
+        if (my $next_oid = $req->{xr3}->[++$req->{ix}]->[2]) { # more xref3 rows: fetch the next blob
+                $self->git->cat_async($next_oid, \&_reindex_oid, $req);
+        } else { # last $re_smsg is the highest priority xref3
+                local $self->{current_info} = "$ci #$docid";
+                _reindex_finalize($req, $re_smsg, $eml);
+        }
+}
+
+sub _reindex_smsg ($$$) { # queue async re-reading of the blobs referenced by one docid
+        my ($self, $sync, $smsg) = @_;
+        my $docid = $smsg->{num};
+        my $xr3 = $self->{oidx}->get_xref3($docid, 1); # raw rows: [ ibx_id, xnum, oidbin ]
+        if (scalar(@$xr3) == 0) { # _reindex_check_stale should've covered this
+                warn <<"";
+BUG? #$docid $smsg->{blob} is not referenced by inboxes during reindex
+
+                $self->{oidx}->delete_by_num($docid);
+                $self->idx_shard($docid)->shard_remove($docid);
+                return;
+        }
+
+        # we sort {xr3} in the reverse order of {ibx_list} so we can
+        # hit the common case in _reindex_finalize without rereading
+        # from git (or holding multiple messages in memory).
+        my $id2pos = $sync->{id2pos}; # index in {ibx_list}
+        @$xr3 = sort {
+                $id2pos->{$b->[0]} <=> $id2pos->{$a->[0]}
+                                ||
+                $b->[1] <=> $a->[1] # break ties with {xnum}
+        } @$xr3;
+        @$xr3 = map { [ $_->[0], $_->[1], unpack('H*', $_->[2]) ] } @$xr3; # binary OID -> hex string
+        my $req = { orig_smsg => $smsg, sync => $sync, xr3 => $xr3, ix => 0 }; # {ix} indexes the xref3 row in flight
+        $self->git->cat_async($xr3->[$req->{ix}]->[2], \&_reindex_oid, $req);
+}
+
 sub eidxq_process ($$) { # for reindexing
         my ($self, $sync) = @_;
 
@@ -410,19 +510,29 @@ sub eidxq_process ($$) { # for reindexing
                 my $max = $dbh->selectrow_array('SELECT MAX(docid) FROM eidxq');
                 $pr->("Xapian indexing $min..$max (total=$tot)\n");
         }
+        my %id2pos;
+        my $pos = 0;
+        $id2pos{$_->{-ibx_id}} = $pos++ for @{$self->{ibx_list}};
+        $sync->{id2pos} = \%id2pos;
 
         my $del = $dbh->prepare('DELETE FROM eidxq WHERE docid = ?');
         my $iter = $dbh->prepare('SELECT docid FROM eidxq ORDER BY docid ASC');
         $iter->execute;
         while (defined(my $docid = $iter->fetchrow_array)) {
-                $self->idx_shard($docid)->shard_reindex_docid($docid);
-                $del->execute($docid);
                 last if $sync->{quit};
+                if (my $smsg = $self->{oidx}->get_art($docid)) {
+                        _reindex_smsg($self, $sync, $smsg);
+                } else {
+                        warn "E: #$docid does not exist in over\n";
+                }
+                $del->execute($docid);
+
                 my $cur = ++${$sync->{nr}};
 
                 # shards flush on their own, just don't queue up too many
                 # deletes
                 if (($cur % 1000) == 0) {
+                        $self->git->async_wait_all;
                         $self->{oidx}->commit_lazy;
                         $self->{oidx}->begin_lazy;
                         $pr->("reindexed $cur/$tot\n") if $pr;
@@ -430,9 +540,8 @@ sub eidxq_process ($$) { # for reindexing
                 # this is only for SIGUSR1, shards do their own accounting:
                 reindex_checkpoint($self, $sync) if ${$sync->{need_checkpoint}};
         }
+        $self->git->async_wait_all;
         $pr->("reindexed ${$sync->{nr}}/$tot\n") if $pr;
-        $self->{oidx}->commit_lazy;
-        $self->{oidx}->begin_lazy;
 }
 
 sub _reindex_unseen { # git->cat_async callback
diff --git a/lib/PublicInbox/Over.pm b/lib/PublicInbox/Over.pm
index 62709468..49ba180b 100644
--- a/lib/PublicInbox/Over.pm
+++ b/lib/PublicInbox/Over.pm
@@ -261,13 +261,14 @@ SELECT num,tid,ds,ts,ddd FROM over WHERE num = ? LIMIT 1
 }
 
 sub get_xref3 {
-        my ($self, $num) = @_;
+        my ($self, $num, $raw) = @_;
         my $dbh = dbh($self);
         my $sth = $dbh->prepare_cached(<<'', undef, 1);
 SELECT ibx_id,xnum,oidbin FROM xref3 WHERE docid = ? ORDER BY ibx_id,xnum ASC
 
         $sth->execute($num);
         my $rows = $sth->fetchall_arrayref;
+        return $rows if $raw;
         my $eidx_key_sth = $dbh->prepare_cached(<<'', undef, 1);
 SELECT eidx_key FROM inboxes WHERE ibx_id = ?
 
diff --git a/t/extsearch.t b/t/extsearch.t
index 4a8a9f49..fb31b0ab 100644
--- a/t/extsearch.t
+++ b/t/extsearch.t
@@ -6,12 +6,14 @@ use Test::More;
 use PublicInbox::TestCommon;
 use PublicInbox::Config;
 use PublicInbox::Search;
+use PublicInbox::InboxWritable;
 use Fcntl qw(:seek);
 my $json = PublicInbox::Config::json() or plan skip_all => 'JSON missing';
 require_git(2.6);
 require_mods(qw(DBD::SQLite Search::Xapian));
 use_ok 'PublicInbox::ExtSearch';
 use_ok 'PublicInbox::ExtSearchIdx';
+use_ok 'PublicInbox::OverIdx';
 my $sock = tcp_server();
 my $host_port = $sock->sockhost . ':' . $sock->sockport;
 my ($home, $for_destroy) = tmpdir();
@@ -179,6 +181,8 @@ like($it[1]->get_document->get_data, qr/v1test/, 'docdata matched v1');
 
 my $cfg = PublicInbox::Config->new;
 my $schema_version = PublicInbox::Search::SCHEMA_VERSION();
+my $f = "$home/extindex/ei$schema_version/over.sqlite3";
+my $oidx = PublicInbox::OverIdx->new($f);
 if ('inject w/o indexing') {
         use PublicInbox::Import;
         my $v1ibx = $cfg->lookup_name('v1test');
@@ -232,8 +236,6 @@ if ('inject w/o indexing') {
 }
 
 if ('reindex catches missed messages') {
-        use PublicInbox::InboxWritable;
-        use PublicInbox::OverIdx;
         my $v2ibx = $cfg->lookup_name('v2test');
         my $im = PublicInbox::InboxWritable->new($v2ibx)->importer(0);
         my $cmt_a = $v2ibx->mm->last_commit_xap($schema_version, 0);
@@ -242,8 +244,6 @@ if ('reindex catches missed messages') {
         $im->done;
         my $cmt_b = $v2ibx->mm->last_commit_xap($schema_version, 0);
         isnt($cmt_a, $cmt_b, 'v2 0.git HEAD updated');
-        my $f = "$home/extindex/ei$schema_version/over.sqlite3";
-        my $oidx = PublicInbox::OverIdx->new($f);
         $oidx->dbh;
         my $uv = $v2ibx->uidvalidity;
         my $lc_key = "lc-v2:v2.example//$uv;0";
@@ -263,7 +263,7 @@ if ('reindex catches missed messages') {
         is($oidx->max, $max + 1, '->max bumped');
         is($oidx->eidx_meta($lc_key), $cmt_b, 'lc-v2 stays unchanged');
         my @err = split(/^/, $err);
-        is(scalar(@err), 1, 'only one warning');
+        is(scalar(@err), 1, 'only one warning') or diag "err=$err";
         like($err[0], qr/I: reindex_unseen/, 'got reindex_unseen message');
         my $new = $oidx->get_art($max + 1);
         is($new->{subject}, $eml->header('Subject'), 'new message added');
@@ -283,7 +283,7 @@ if ('reindex catches missed messages') {
                         $v2ibx->{inboxdir}], undef, $opt),
                         '--reindex for stale');
         @err = split(/^/, $err);
-        is(scalar(@err), 1, 'only one warning');
+        is(scalar(@err), 1, 'only one warning') or diag "err=$err";
         like($err[0], qr/\(#$new->{num}\): stale/, 'got stale message warning');
         is($oidx->get_art($new->{num}), undef,
                 'stale message gone from over');
@@ -294,6 +294,65 @@ if ('reindex catches missed messages') {
         is($mset->size, 0, 'stale mid gone Xapian');
 }
 
+if ('reindex catches content bifurcation') { # non-empty string is always true; it only labels the block
+        use PublicInbox::MID qw(mids);
+        my $v2ibx = $cfg->lookup_name('v2test');
+        my $im = PublicInbox::InboxWritable->new($v2ibx)->importer(0);
+        my $eml = eml_load('t/data/message_embed.eml');
+        my $cmt_a = $v2ibx->mm->last_commit_xap($schema_version, 0);
+        $im->add($eml);
+        $im->done;
+        my $cmt_b = $v2ibx->mm->last_commit_xap($schema_version, 0);
+        my $uv = $v2ibx->uidvalidity;
+        my $lc_key = "lc-v2:v2.example//$uv;0";
+        $oidx->dbh;
+        is($oidx->eidx_meta($lc_key, $cmt_b), $cmt_a,
+                'update lc-v2 meta, old is as expected');
+        my $mid = mids($eml)->[0];
+        my $smsg = $v2ibx->over->next_by_mid($mid, \(my $id), \(my $prev));
+        my $oldmax = $oidx->max;
+        my $x3_orig = $oidx->get_xref3(3);
+        is(scalar(@$x3_orig), 1, '#3 has one xref');
+        $oidx->add_xref3(3, $smsg->{num}, $smsg->{blob}, 'v2.example'); # attach the new message's blob to existing docid #3
+        my $x3 = $oidx->get_xref3(3);
+        is(scalar(@$x3), 2, 'injected xref3');
+        $oidx->commit_lazy;
+        my $opt = { 2 => \(my $err = '') }; # capture the script's fd 2 into $err
+        ok(run_script([qw(-extindex --all), "$home/extindex"], undef, $opt),
+                'extindex --all is noop');
+        is($err, '', 'no warnings in index');
+        $oidx->dbh;
+        is($oidx->max, $oldmax, 'oidx->max unchanged');
+        $oidx->dbh_close;
+        ok(run_script([qw(-extindex --reindex --all), "$home/extindex"],
+                undef, $opt), 'extindex --reindex');
+        $oidx->dbh;
+        ok($oidx->max > $oldmax, 'oidx->max bumped');
+        like($err, qr/split into 2 due to deduplication change/,
+                'bifurcation noted');
+        my $added = $oidx->get_art($oidx->max); # article stored under the highest docid
+        is($added->{blob}, $smsg->{blob}, 'new blob indexed');
+        is_deeply(["v2.example:$smsg->{num}:$smsg->{blob}"],
+                $oidx->get_xref3($added->{num}),
+                'xref3 corrected for bifurcated message');
+        is_deeply($oidx->get_xref3(3), $x3_orig, 'xref3 restored for #3');
+}
+
+if ('--reindex --rethread') { # non-empty string is always true; it only labels the block
+        my $before = $oidx->dbh->selectrow_array(<<''); # MAX(tid) before --rethread
+SELECT MAX(tid) FROM over WHERE num > 0
+
+        my $opt = {};
+        ok(run_script([qw(-extindex --reindex --rethread --all),
+                        "$home/extindex"], undef, $opt),
+                        '--rethread');
+        my $after = $oidx->dbh->selectrow_array(<<''); # MIN(tid) after --rethread
+SELECT MIN(tid) FROM over WHERE num > 0
+
+        # actual rethread logic is identical to v1/v2 and tested elsewhere
+        ok($after > $before, '--rethread updates MIN(tid)'); # passes only if every tid now exceeds the old maximum
+}
+
 if ('remove v1test and test gc') {
         xsys([qw(git config --unset publicinbox.v1test.inboxdir)],
                 { GIT_CONFIG => $cfg_path });