diff options
Diffstat (limited to 'lib/PublicInbox/ExtSearchIdx.pm')
-rw-r--r-- | lib/PublicInbox/ExtSearchIdx.pm | 117 |
1 files changed, 113 insertions, 4 deletions
diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm index ec86a7c0..c77fb197 100644 --- a/lib/PublicInbox/ExtSearchIdx.pm +++ b/lib/PublicInbox/ExtSearchIdx.pm @@ -395,6 +395,106 @@ DELETE FROM xref3 WHERE docid NOT IN (SELECT num FROM over) done($self); } +sub _reindex_finalize ($$$) { + my ($req, $smsg, $eml) = @_; + my $sync = $req->{sync}; + my $self = $sync->{self}; + my $by_chash = $req->{by_chash}; + my $nr = scalar(keys(%$by_chash)) or die 'BUG: no content hashes'; + my $orig_smsg = $req->{orig_smsg} // die 'BUG: no {orig_smsg}'; + my $docid = $smsg->{num} = $orig_smsg->{num}; + $self->{oidx}->add_overview($eml, $smsg); # may rethread + return if $nr == 1; # likely, all good + + warn "W: #$docid split into $nr due to deduplication change\n"; + my $chash0 = $smsg->{chash} // die "BUG: $smsg->{blob} no {chash}"; + delete($by_chash->{$chash0}) // die "BUG: $smsg->{blob} chash missing"; + for my $ary (values %$by_chash) { + for my $x (reverse @$ary) { + my $n = $self->{oidx}->remove_xref3($docid, $x->{blob}); + die "BUG: $x->{blob} invalidated #$docid" if $n == 0; + } + my $x = $ary->[-1] // die "BUG: #$docid {by_chash} empty"; + $x->{num} = delete($x->{xnum}) // die '{xnum} unset'; + my $ibx_id = delete($x->{ibx_id}) // die '{ibx_id} unset'; + my $pos = $sync->{id2pos}->{$ibx_id} // die "$ibx_id no pos"; + my $ibx = $self->{ibx_list}->[$pos] // + die "BUG: ibx for $x->{blob} not mapped"; + my $e = $ibx->over->get_art($x->{num}); + $e->{blob} eq $x->{blob} or die <<EOF; +$x->{blob} != $e->{blob} (${\$ibx->eidx_key}:$e->{num}); +EOF + reindex_unseen($self, $sync, $ibx, $e); + } +} + +sub _reindex_oid { # git->cat_async callback + my ($bref, $oid, $type, $size, $req) = @_; + my $sync = $req->{sync}; + my $self = $sync->{self}; + my $orig_smsg = $req->{orig_smsg} // die 'BUG: no {orig_smsg}'; + my $expect_oid = $req->{xr3}->[$req->{ix}]->[2]; + my $docid = $orig_smsg->{num}; + if (is_bad_blob($oid, $type, $size, $expect_oid)) { + my $remain = $self->{oidx}->remove_xref3($docid, $expect_oid); + if ($remain == 0) { + warn "W: #$docid gone or corrupted\n"; + $self->idx_shard($docid)->shard_remove($docid); + } elsif (my $next_oid = $req->{xr3}->[++$req->{ix}]->[2]) { + $self->git->cat_async($next_oid, \&_reindex_oid, $req); + } else { + warn "BUG: #$docid gone (UNEXPECTED)\n"; + $self->idx_shard($docid)->shard_remove($docid); + } + return; + } + my $ci = $self->{current_info}; + local $self->{current_info} = "$ci #$docid $oid"; + my $re_smsg = bless { blob => $oid }, 'PublicInbox::Smsg'; + $re_smsg->{bytes} = $size + crlf_adjust($$bref); + my $eml = PublicInbox::Eml->new($bref); + $re_smsg->populate($eml, { autime => $orig_smsg->{ds}, + cotime => $orig_smsg->{ts} }); + my $chash = content_hash($eml); + $re_smsg->{chash} = $chash; + $re_smsg->{xnum} = $req->{xr3}->[$req->{ix}]->[1]; + $re_smsg->{ibx_id} = $req->{xr3}->[$req->{ix}]->[0]; + push @{$req->{by_chash}->{$chash}}, $re_smsg; + if (my $next_oid = $req->{xr3}->[++$req->{ix}]->[2]) { + $self->git->cat_async($next_oid, \&_reindex_oid, $req); + } else { # last $re_smsg is the highest priority xref3 + local $self->{current_info} = "$ci #$docid"; + _reindex_finalize($req, $re_smsg, $eml); + } +} + +sub _reindex_smsg ($$$) { + my ($self, $sync, $smsg) = @_; + my $docid = $smsg->{num}; + my $xr3 = $self->{oidx}->get_xref3($docid, 1); + if (scalar(@$xr3) == 0) { # _reindex_check_stale should've covered this + warn <<""; +BUG? #$docid $smsg->{blob} is not referenced by inboxes during reindex + + $self->{oidx}->delete_by_num($docid); + $self->idx_shard($docid)->shard_remove($docid); + return; + } + + # we sort {xr3} in the reverse order of {ibx_list} so we can + # hit the common case in _reindex_finalize without rereading + # from git (or holding multiple messages in memory). + my $id2pos = $sync->{id2pos}; # index in {ibx_list} + @$xr3 = sort { + $id2pos->{$b->[0]} <=> $id2pos->{$a->[0]} + || + $b->[1] <=> $a->[1] # break ties with {xnum} + } @$xr3; + @$xr3 = map { [ $_->[0], $_->[1], unpack('H*', $_->[2]) ] } @$xr3; + my $req = { orig_smsg => $smsg, sync => $sync, xr3 => $xr3, ix => 0 }; + $self->git->cat_async($xr3->[$req->{ix}]->[2], \&_reindex_oid, $req); +} + sub eidxq_process ($$) { # for reindexing my ($self, $sync) = @_; @@ -410,19 +510,29 @@ sub eidxq_process ($$) { # for reindexing my $max = $dbh->selectrow_array('SELECT MAX(docid) FROM eidxq'); $pr->("Xapian indexing $min..$max (total=$tot)\n"); } + my %id2pos; + my $pos = 0; + $id2pos{$_->{-ibx_id}} = $pos++ for @{$self->{ibx_list}}; + $sync->{id2pos} = \%id2pos; my $del = $dbh->prepare('DELETE FROM eidxq WHERE docid = ?'); my $iter = $dbh->prepare('SELECT docid FROM eidxq ORDER BY docid ASC'); $iter->execute; while (defined(my $docid = $iter->fetchrow_array)) { - $self->idx_shard($docid)->shard_reindex_docid($docid); - $del->execute($docid); last if $sync->{quit}; + if (my $smsg = $self->{oidx}->get_art($docid)) { + _reindex_smsg($self, $sync, $smsg); + } else { + warn "E: #$docid does not exist in over\n"; + } + $del->execute($docid); + my $cur = ++${$sync->{nr}}; # shards flush on their own, just don't queue up too many # deletes if (($cur % 1000) == 0) { + $self->git->async_wait_all; $self->{oidx}->commit_lazy; $self->{oidx}->begin_lazy; $pr->("reindexed $cur/$tot\n") if $pr; @@ -430,9 +540,8 @@ sub eidxq_process ($$) { # for reindexing # this is only for SIGUSR1, shards do their own accounting: reindex_checkpoint($self, $sync) if ${$sync->{need_checkpoint}}; } + $self->git->async_wait_all; $pr->("reindexed ${$sync->{nr}}/$tot\n") if $pr; - $self->{oidx}->commit_lazy; - $self->{oidx}->begin_lazy; } sub _reindex_unseen { # git->cat_async callback |