diff options
Diffstat (limited to 'lib/PublicInbox/ExtSearchIdx.pm')
-rw-r--r-- | lib/PublicInbox/ExtSearchIdx.pm | 58 |
1 files changed, 34 insertions, 24 deletions
diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm index 3b021a1b..d5295735 100644 --- a/lib/PublicInbox/ExtSearchIdx.pm +++ b/lib/PublicInbox/ExtSearchIdx.pm @@ -149,7 +149,7 @@ sub index_unseen ($) { my $oid = $new_smsg->{blob}; my $ibx = delete $req->{ibx} or die 'BUG: {ibx} unset'; $self->{oidx}->add_xref3($docid, $req->{xnum}, $oid, $ibx->eidx_key); - $idx->index_raw(undef, $eml, $new_smsg, $ibx); + $idx->index_raw(undef, $eml, $new_smsg, $ibx->eidx_key); check_batch_limit($req); } @@ -395,23 +395,39 @@ DELETE FROM xref3 WHERE docid NOT IN (SELECT num FROM over) done($self); } +sub _ibx_for ($$$) { + my ($self, $sync, $smsg) = @_; + my $ibx_id = delete($smsg->{ibx_id}) // die '{ibx_id} unset'; + my $pos = $sync->{id2pos}->{$ibx_id} // die "$ibx_id no pos"; + $self->{ibx_list}->[$pos] // die "BUG: ibx for $smsg->{blob} not mapped" +} + sub _reindex_finalize ($$$) { my ($req, $smsg, $eml) = @_; my $sync = $req->{sync}; my $self = $sync->{self}; - my $by_chash = $req->{by_chash}; + my $by_chash = delete $req->{by_chash} or die 'BUG: no {by_chash}'; my $nr = scalar(keys(%$by_chash)) or die 'BUG: no content hashes'; my $orig_smsg = $req->{orig_smsg} // die 'BUG: no {orig_smsg}'; my $docid = $smsg->{num} = $orig_smsg->{num}; $self->{oidx}->add_overview($eml, $smsg); # may rethread check_batch_limit({ %$sync, new_smsg => $smsg }); - if ($nr == 1) { # likely, all good - $self->idx_shard($docid)->shard_reindex_docid($docid); - return; + my $chash0 = $smsg->{chash} // die "BUG: $smsg->{blob} no {chash}"; + my $stable = delete($by_chash->{$chash0}) // + die "BUG: $smsg->{blob} chash missing"; + my $idx = $self->idx_shard($docid); + my $top_smsg = pop @$stable; + $top_smsg == $smsg or die 'BUG: top_smsg != smsg'; + my $ibx = _ibx_for($self, $sync, $smsg); + $idx->index_raw(undef, $eml, $smsg, $ibx->eidx_key); + for my $x (reverse @$stable) { + $ibx = _ibx_for($self, $sync, $x); + my $hdr = delete $x->{hdr} // die 'BUG: no {hdr}'; + $idx->shard_add_eidx_info($docid, $ibx->eidx_key, $hdr); } + return if $nr == 1; # likely, all good + warn "W: #$docid split into $nr due to deduplication change\n"; - my $chash0 = $smsg->{chash} // die "BUG: $smsg->{blob} no {chash}"; - delete($by_chash->{$chash0}) // die "BUG: $smsg->{blob} chash missing"; my @todo; for my $ary (values %$by_chash) { for my $x (reverse @$ary) { @@ -419,21 +435,16 @@ sub _reindex_finalize ($$$) { my $n = $self->{oidx}->remove_xref3($docid, $x->{blob}); die "BUG: $x->{blob} invalidated #$docid" if $n == 0; } - my $x = $ary->[-1] // die "BUG: #$docid {by_chash} empty"; + my $x = pop(@$ary) // die "BUG: #$docid {by_chash} empty"; $x->{num} = delete($x->{xnum}) // die '{xnum} unset'; - my $ibx_id = delete($x->{ibx_id}) // die '{ibx_id} unset'; - my $pos = $sync->{id2pos}->{$ibx_id} // die "$ibx_id no pos"; - my $ibx = $self->{ibx_list}->[$pos] // - die "BUG: ibx for $x->{blob} not mapped"; + $ibx = _ibx_for($self, $sync, $x); my $e = $ibx->over->get_art($x->{num}); $e->{blob} eq $x->{blob} or die <<EOF; $x->{blob} != $e->{blob} (${\$ibx->eidx_key}:$e->{num}); EOF push @todo, $ibx, $e; } - $self->{oidx}->commit_lazy; # ensure shard workers can see xref removals - $self->{oidx}->begin_lazy; - $self->idx_shard($docid)->shard_reindex_docid($docid); + undef $by_chash; while (my ($ibx, $e) = splice(@todo, 0, 2)) { reindex_unseen($self, $sync, $ibx, $e); } @@ -444,14 +455,14 @@ sub _reindex_oid { # git->cat_async callback my $sync = $req->{sync}; my $self = $sync->{self}; my $orig_smsg = $req->{orig_smsg} // die 'BUG: no {orig_smsg}'; - my $expect_oid = $req->{xr3}->[$req->{ix}]->[2]; + my $expect_oid = $req->{xr3r}->[$req->{ix}]->[2]; my $docid = $orig_smsg->{num}; if (is_bad_blob($oid, $type, $size, $expect_oid)) { my $remain = $self->{oidx}->remove_xref3($docid, $expect_oid); if ($remain == 0) { warn "W: #$docid gone or corrupted\n"; $self->idx_shard($docid)->shard_remove($docid); - } elsif (my $next_oid = $req->{xr3}->[++$req->{ix}]->[2]) { + } elsif (my $next_oid = $req->{xr3r}->[++$req->{ix}]->[2]) { $self->git->cat_async($next_oid, \&_reindex_oid, $req); } else { warn "BUG: #$docid gone (UNEXPECTED)\n"; @@ -468,10 +479,11 @@ sub _reindex_oid { # git->cat_async callback cotime => $orig_smsg->{ts} }); my $chash = content_hash($eml); $re_smsg->{chash} = $chash; - $re_smsg->{xnum} = $req->{xr3}->[$req->{ix}]->[1]; - $re_smsg->{ibx_id} = $req->{xr3}->[$req->{ix}]->[0]; + $re_smsg->{xnum} = $req->{xr3r}->[$req->{ix}]->[1]; + $re_smsg->{ibx_id} = $req->{xr3r}->[$req->{ix}]->[0]; + $re_smsg->{hdr} = $eml->header_obj; push @{$req->{by_chash}->{$chash}}, $re_smsg; - if (my $next_oid = $req->{xr3}->[++$req->{ix}]->[2]) { + if (my $next_oid = $req->{xr3r}->[++$req->{ix}]->[2]) { $self->git->cat_async($next_oid, \&_reindex_oid, $req); } else { # last $re_smsg is the highest priority xref3 local $self->{current_info} = "$ci #$docid"; @@ -492,7 +504,7 @@ BUG? #$docid $smsg->{blob} is not referenced by inboxes during reindex return; } - # we sort {xr3} in the reverse order of {ibx_list} so we can + # we sort {xr3r} in the reverse order of {ibx_list} so we can # hit the common case in _reindex_finalize without rereading # from git (or holding multiple messages in memory). my $id2pos = $sync->{id2pos}; # index in {ibx_list} @@ -502,15 +514,13 @@ BUG? #$docid $smsg->{blob} is not referenced by inboxes during reindex $b->[1] <=> $a->[1] # break ties with {xnum} } @$xr3; @$xr3 = map { [ $_->[0], $_->[1], unpack('H*', $_->[2]) ] } @$xr3; - my $req = { orig_smsg => $smsg, sync => $sync, xr3 => $xr3, ix => 0 }; + my $req = { orig_smsg => $smsg, sync => $sync, xr3r => $xr3, ix => 0 }; $self->git->cat_async($xr3->[$req->{ix}]->[2], \&_reindex_oid, $req); } sub eidxq_process ($$) { # for reindexing my ($self, $sync) = @_; - $self->{oidx}->commit_lazy; # ensure shard workers can see it - $self->{oidx}->begin_lazy; my $dbh = $self->{oidx}->dbh; my $tot = $dbh->selectrow_array('SELECT COUNT(*) FROM eidxq') or return; ${$sync->{nr}} = 0; |