diff options
-rw-r--r-- | lib/PublicInbox/ExtSearchIdx.pm | 117 | ||||
-rw-r--r-- | lib/PublicInbox/Over.pm | 3 | ||||
-rw-r--r-- | t/extsearch.t | 71 |
3 files changed, 180 insertions, 11 deletions
diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm index ec86a7c0..c77fb197 100644 --- a/lib/PublicInbox/ExtSearchIdx.pm +++ b/lib/PublicInbox/ExtSearchIdx.pm @@ -395,6 +395,106 @@ DELETE FROM xref3 WHERE docid NOT IN (SELECT num FROM over) done($self); } +sub _reindex_finalize ($$$) { + my ($req, $smsg, $eml) = @_; + my $sync = $req->{sync}; + my $self = $sync->{self}; + my $by_chash = $req->{by_chash}; + my $nr = scalar(keys(%$by_chash)) or die 'BUG: no content hashes'; + my $orig_smsg = $req->{orig_smsg} // die 'BUG: no {orig_smsg}'; + my $docid = $smsg->{num} = $orig_smsg->{num}; + $self->{oidx}->add_overview($eml, $smsg); # may rethread + return if $nr == 1; # likely, all good + + warn "W: #$docid split into $nr due to deduplication change\n"; + my $chash0 = $smsg->{chash} // die "BUG: $smsg->{blob} no {chash}"; + delete($by_chash->{$chash0}) // die "BUG: $smsg->{blob} chash missing"; + for my $ary (values %$by_chash) { + for my $x (reverse @$ary) { + my $n = $self->{oidx}->remove_xref3($docid, $x->{blob}); + die "BUG: $x->{blob} invalidated #$docid" if $n == 0; + } + my $x = $ary->[-1] // die "BUG: #$docid {by_chash} empty"; + $x->{num} = delete($x->{xnum}) // die '{xnum} unset'; + my $ibx_id = delete($x->{ibx_id}) // die '{ibx_id} unset'; + my $pos = $sync->{id2pos}->{$ibx_id} // die "$ibx_id no pos"; + my $ibx = $self->{ibx_list}->[$pos] // + die "BUG: ibx for $x->{blob} not mapped"; + my $e = $ibx->over->get_art($x->{num}); + $e->{blob} eq $x->{blob} or die <<EOF; +$x->{blob} != $e->{blob} (${\$ibx->eidx_key}:$e->{num}); +EOF + reindex_unseen($self, $sync, $ibx, $e); + } +} + +sub _reindex_oid { # git->cat_async callback + my ($bref, $oid, $type, $size, $req) = @_; + my $sync = $req->{sync}; + my $self = $sync->{self}; + my $orig_smsg = $req->{orig_smsg} // die 'BUG: no {orig_smsg}'; + my $expect_oid = $req->{xr3}->[$req->{ix}]->[2]; + my $docid = $orig_smsg->{num}; + if (is_bad_blob($oid, $type, $size, $expect_oid)) { + my $remain = $self->{oidx}->remove_xref3($docid, $expect_oid); + if ($remain == 0) { + warn "W: #$docid gone or corrupted\n"; + $self->idx_shard($docid)->shard_remove($docid); + } elsif (my $next_oid = $req->{xr3}->[++$req->{ix}]->[2]) { + $self->git->cat_async($next_oid, \&_reindex_oid, $req); + } else { + warn "BUG: #$docid gone (UNEXPECTED)\n"; + $self->idx_shard($docid)->shard_remove($docid); + } + return; + } + my $ci = $self->{current_info}; + local $self->{current_info} = "$ci #$docid $oid"; + my $re_smsg = bless { blob => $oid }, 'PublicInbox::Smsg'; + $re_smsg->{bytes} = $size + crlf_adjust($$bref); + my $eml = PublicInbox::Eml->new($bref); + $re_smsg->populate($eml, { autime => $orig_smsg->{ds}, + cotime => $orig_smsg->{ts} }); + my $chash = content_hash($eml); + $re_smsg->{chash} = $chash; + $re_smsg->{xnum} = $req->{xr3}->[$req->{ix}]->[1]; + $re_smsg->{ibx_id} = $req->{xr3}->[$req->{ix}]->[0]; + push @{$req->{by_chash}->{$chash}}, $re_smsg; + if (my $next_oid = $req->{xr3}->[++$req->{ix}]->[2]) { + $self->git->cat_async($next_oid, \&_reindex_oid, $req); + } else { # last $re_smsg is the highest priority xref3 + local $self->{current_info} = "$ci #$docid"; + _reindex_finalize($req, $re_smsg, $eml); + } +} + +sub _reindex_smsg ($$$) { + my ($self, $sync, $smsg) = @_; + my $docid = $smsg->{num}; + my $xr3 = $self->{oidx}->get_xref3($docid, 1); + if (scalar(@$xr3) == 0) { # _reindex_check_stale should've covered this + warn <<""; +BUG? #$docid $smsg->{blob} is not referenced by inboxes during reindex + + $self->{oidx}->delete_by_num($docid); + $self->idx_shard($docid)->shard_remove($docid); + return; + } + + # we sort {xr3} in the reverse order of {ibx_list} so we can + # hit the common case in _reindex_finalize without rereading + # from git (or holding multiple messages in memory). + my $id2pos = $sync->{id2pos}; # index in {ibx_list} + @$xr3 = sort { + $id2pos->{$b->[0]} <=> $id2pos->{$a->[0]} + || + $b->[1] <=> $a->[1] # break ties with {xnum} + } @$xr3; + @$xr3 = map { [ $_->[0], $_->[1], unpack('H*', $_->[2]) ] } @$xr3; + my $req = { orig_smsg => $smsg, sync => $sync, xr3 => $xr3, ix => 0 }; + $self->git->cat_async($xr3->[$req->{ix}]->[2], \&_reindex_oid, $req); +} + sub eidxq_process ($$) { # for reindexing my ($self, $sync) = @_; @@ -410,19 +510,29 @@ sub eidxq_process ($$) { # for reindexing my $max = $dbh->selectrow_array('SELECT MAX(docid) FROM eidxq'); $pr->("Xapian indexing $min..$max (total=$tot)\n"); } + my %id2pos; + my $pos = 0; + $id2pos{$_->{-ibx_id}} = $pos++ for @{$self->{ibx_list}}; + $sync->{id2pos} = \%id2pos; my $del = $dbh->prepare('DELETE FROM eidxq WHERE docid = ?'); my $iter = $dbh->prepare('SELECT docid FROM eidxq ORDER BY docid ASC'); $iter->execute; while (defined(my $docid = $iter->fetchrow_array)) { - $self->idx_shard($docid)->shard_reindex_docid($docid); - $del->execute($docid); last if $sync->{quit}; + if (my $smsg = $self->{oidx}->get_art($docid)) { + _reindex_smsg($self, $sync, $smsg); + } else { + warn "E: #$docid does not exist in over\n"; + } + $del->execute($docid); + my $cur = ++${$sync->{nr}}; # shards flush on their own, just don't queue up too many # deletes if (($cur % 1000) == 0) { + $self->git->async_wait_all; $self->{oidx}->commit_lazy; $self->{oidx}->begin_lazy; $pr->("reindexed $cur/$tot\n") if $pr; @@ -430,9 +540,8 @@ sub eidxq_process ($$) { # for reindexing # this is only for SIGUSR1, shards do their own accounting: reindex_checkpoint($self, $sync) if ${$sync->{need_checkpoint}}; } + $self->git->async_wait_all; $pr->("reindexed ${$sync->{nr}}/$tot\n") if $pr; - $self->{oidx}->commit_lazy; - $self->{oidx}->begin_lazy; } sub _reindex_unseen { # git->cat_async callback diff --git a/lib/PublicInbox/Over.pm b/lib/PublicInbox/Over.pm index 62709468..49ba180b 100644 --- a/lib/PublicInbox/Over.pm +++ b/lib/PublicInbox/Over.pm @@ -261,13 +261,14 @@ SELECT num,tid,ds,ts,ddd FROM over WHERE num = ? LIMIT 1 } sub get_xref3 { - my ($self, $num) = @_; + my ($self, $num, $raw) = @_; my $dbh = dbh($self); my $sth = $dbh->prepare_cached(<<'', undef, 1); SELECT ibx_id,xnum,oidbin FROM xref3 WHERE docid = ? ORDER BY ibx_id,xnum ASC $sth->execute($num); my $rows = $sth->fetchall_arrayref; + return $rows if $raw; my $eidx_key_sth = $dbh->prepare_cached(<<'', undef, 1); SELECT eidx_key FROM inboxes WHERE ibx_id = ? diff --git a/t/extsearch.t b/t/extsearch.t index 4a8a9f49..fb31b0ab 100644 --- a/t/extsearch.t +++ b/t/extsearch.t @@ -6,12 +6,14 @@ use Test::More; use PublicInbox::TestCommon; use PublicInbox::Config; use PublicInbox::Search; +use PublicInbox::InboxWritable; use Fcntl qw(:seek); my $json = PublicInbox::Config::json() or plan skip_all => 'JSON missing'; require_git(2.6); require_mods(qw(DBD::SQLite Search::Xapian)); use_ok 'PublicInbox::ExtSearch'; use_ok 'PublicInbox::ExtSearchIdx'; +use_ok 'PublicInbox::OverIdx'; my $sock = tcp_server(); my $host_port = $sock->sockhost . ':' . $sock->sockport; my ($home, $for_destroy) = tmpdir(); @@ -179,6 +181,8 @@ like($it[1]->get_document->get_data, qr/v1test/, 'docdata matched v1'); my $cfg = PublicInbox::Config->new; my $schema_version = PublicInbox::Search::SCHEMA_VERSION(); +my $f = "$home/extindex/ei$schema_version/over.sqlite3"; +my $oidx = PublicInbox::OverIdx->new($f); if ('inject w/o indexing') { use PublicInbox::Import; my $v1ibx = $cfg->lookup_name('v1test'); @@ -232,8 +236,6 @@ if ('inject w/o indexing') { } if ('reindex catches missed messages') { - use PublicInbox::InboxWritable; - use PublicInbox::OverIdx; my $v2ibx = $cfg->lookup_name('v2test'); my $im = PublicInbox::InboxWritable->new($v2ibx)->importer(0); my $cmt_a = $v2ibx->mm->last_commit_xap($schema_version, 0); @@ -242,8 +244,6 @@ if ('reindex catches missed messages') { $im->done; my $cmt_b = $v2ibx->mm->last_commit_xap($schema_version, 0); isnt($cmt_a, $cmt_b, 'v2 0.git HEAD updated'); - my $f = "$home/extindex/ei$schema_version/over.sqlite3"; - my $oidx = PublicInbox::OverIdx->new($f); $oidx->dbh; my $uv = $v2ibx->uidvalidity; my $lc_key = "lc-v2:v2.example//$uv;0"; @@ -263,7 +263,7 @@ if ('reindex catches missed messages') { is($oidx->max, $max + 1, '->max bumped'); is($oidx->eidx_meta($lc_key), $cmt_b, 'lc-v2 stays unchanged'); my @err = split(/^/, $err); - is(scalar(@err), 1, 'only one warning'); + is(scalar(@err), 1, 'only one warning') or diag "err=$err"; like($err[0], qr/I: reindex_unseen/, 'got reindex_unseen message'); my $new = $oidx->get_art($max + 1); is($new->{subject}, $eml->header('Subject'), 'new message added'); @@ -283,7 +283,7 @@ if ('reindex catches missed messages') { $v2ibx->{inboxdir}], undef, $opt), '--reindex for stale'); @err = split(/^/, $err); - is(scalar(@err), 1, 'only one warning'); + is(scalar(@err), 1, 'only one warning') or diag "err=$err"; like($err[0], qr/\(#$new->{num}\): stale/, 'got stale message warning'); is($oidx->get_art($new->{num}), undef, 'stale message gone from over'); @@ -294,6 +294,65 @@ if ('reindex catches missed messages') { is($mset->size, 0, 'stale mid gone Xapian'); } +if ('reindex catches content bifurcation') { + use PublicInbox::MID qw(mids); + my $v2ibx = $cfg->lookup_name('v2test'); + my $im = PublicInbox::InboxWritable->new($v2ibx)->importer(0); + my $eml = eml_load('t/data/message_embed.eml'); + my $cmt_a = $v2ibx->mm->last_commit_xap($schema_version, 0); + $im->add($eml); + $im->done; + my $cmt_b = $v2ibx->mm->last_commit_xap($schema_version, 0); + my $uv = $v2ibx->uidvalidity; + my $lc_key = "lc-v2:v2.example//$uv;0"; + $oidx->dbh; + is($oidx->eidx_meta($lc_key, $cmt_b), $cmt_a, + 'update lc-v2 meta, old is as expected'); + my $mid = mids($eml)->[0]; + my $smsg = $v2ibx->over->next_by_mid($mid, \(my $id), \(my $prev)); + my $oldmax = $oidx->max; + my $x3_orig = $oidx->get_xref3(3); + is(scalar(@$x3_orig), 1, '#3 has one xref'); + $oidx->add_xref3(3, $smsg->{num}, $smsg->{blob}, 'v2.example'); + my $x3 = $oidx->get_xref3(3); + is(scalar(@$x3), 2, 'injected xref3'); + $oidx->commit_lazy; + my $opt = { 2 => \(my $err = '') }; + ok(run_script([qw(-extindex --all), "$home/extindex"], undef, $opt), + 'extindex --all is noop'); + is($err, '', 'no warnings in index'); + $oidx->dbh; + is($oidx->max, $oldmax, 'oidx->max unchanged'); + $oidx->dbh_close; + ok(run_script([qw(-extindex --reindex --all), "$home/extindex"], + undef, $opt), 'extindex --reindex'); + $oidx->dbh; + ok($oidx->max > $oldmax, 'oidx->max bumped'); + like($err, qr/split into 2 due to deduplication change/, + 'bifurcation noted'); + my $added = $oidx->get_art($oidx->max); + is($added->{blob}, $smsg->{blob}, 'new blob indexed'); + is_deeply(["v2.example:$smsg->{num}:$smsg->{blob}"], + $oidx->get_xref3($added->{num}), + 'xref3 corrected for bifurcated message'); + is_deeply($oidx->get_xref3(3), $x3_orig, 'xref3 restored for #3'); +} + +if ('--reindex --rethread') { + my $before = $oidx->dbh->selectrow_array(<<''); +SELECT MAX(tid) FROM over WHERE num > 0 + + my $opt = {}; + ok(run_script([qw(-extindex --reindex --rethread --all), + "$home/extindex"], undef, $opt), + '--rethread'); + my $after = $oidx->dbh->selectrow_array(<<''); +SELECT MIN(tid) FROM over WHERE num > 0 + + # actual rethread logic is identical to v1/v2 and tested elsewhere + ok($after > $before, '--rethread updates MIN(tid)'); +} + if ('remove v1test and test gc') { xsys([qw(git config --unset publicinbox.v1test.inboxdir)], { GIT_CONFIG => $cfg_path }); |