diff options
Diffstat (limited to 'lib/PublicInbox')
-rw-r--r-- | lib/PublicInbox/ExtSearchIdx.pm | 58 | ||||
-rw-r--r-- | lib/PublicInbox/SearchIdx.pm | 64 | ||||
-rw-r--r-- | lib/PublicInbox/SearchIdxShard.pm | 17 |
3 files changed, 37 insertions, 102 deletions
diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm index 3b021a1b..d5295735 100644 --- a/lib/PublicInbox/ExtSearchIdx.pm +++ b/lib/PublicInbox/ExtSearchIdx.pm @@ -149,7 +149,7 @@ sub index_unseen ($) { my $oid = $new_smsg->{blob}; my $ibx = delete $req->{ibx} or die 'BUG: {ibx} unset'; $self->{oidx}->add_xref3($docid, $req->{xnum}, $oid, $ibx->eidx_key); - $idx->index_raw(undef, $eml, $new_smsg, $ibx); + $idx->index_raw(undef, $eml, $new_smsg, $ibx->eidx_key); check_batch_limit($req); } @@ -395,23 +395,39 @@ DELETE FROM xref3 WHERE docid NOT IN (SELECT num FROM over) done($self); } +sub _ibx_for ($$$) { + my ($self, $sync, $smsg) = @_; + my $ibx_id = delete($smsg->{ibx_id}) // die '{ibx_id} unset'; + my $pos = $sync->{id2pos}->{$ibx_id} // die "$ibx_id no pos"; + $self->{ibx_list}->[$pos] // die "BUG: ibx for $smsg->{blob} not mapped" +} + sub _reindex_finalize ($$$) { my ($req, $smsg, $eml) = @_; my $sync = $req->{sync}; my $self = $sync->{self}; - my $by_chash = $req->{by_chash}; + my $by_chash = delete $req->{by_chash} or die 'BUG: no {by_chash}'; my $nr = scalar(keys(%$by_chash)) or die 'BUG: no content hashes'; my $orig_smsg = $req->{orig_smsg} // die 'BUG: no {orig_smsg}'; my $docid = $smsg->{num} = $orig_smsg->{num}; $self->{oidx}->add_overview($eml, $smsg); # may rethread check_batch_limit({ %$sync, new_smsg => $smsg }); - if ($nr == 1) { # likely, all good - $self->idx_shard($docid)->shard_reindex_docid($docid); - return; + my $chash0 = $smsg->{chash} // die "BUG: $smsg->{blob} no {chash}"; + my $stable = delete($by_chash->{$chash0}) // + die "BUG: $smsg->{blob} chash missing"; + my $idx = $self->idx_shard($docid); + my $top_smsg = pop @$stable; + $top_smsg == $smsg or die 'BUG: top_smsg != smsg'; + my $ibx = _ibx_for($self, $sync, $smsg); + $idx->index_raw(undef, $eml, $smsg, $ibx->eidx_key); + for my $x (reverse @$stable) { + $ibx = _ibx_for($self, $sync, $x); + my $hdr = delete $x->{hdr} // die 'BUG: no {hdr}'; + $idx->shard_add_eidx_info($docid, $ibx->eidx_key, $hdr); } + return if $nr == 1; # likely, all good + warn "W: #$docid split into $nr due to deduplication change\n"; - my $chash0 = $smsg->{chash} // die "BUG: $smsg->{blob} no {chash}"; - delete($by_chash->{$chash0}) // die "BUG: $smsg->{blob} chash missing"; my @todo; for my $ary (values %$by_chash) { for my $x (reverse @$ary) { @@ -419,21 +435,16 @@ sub _reindex_finalize ($$$) { my $n = $self->{oidx}->remove_xref3($docid, $x->{blob}); die "BUG: $x->{blob} invalidated #$docid" if $n == 0; } - my $x = $ary->[-1] // die "BUG: #$docid {by_chash} empty"; + my $x = pop(@$ary) // die "BUG: #$docid {by_chash} empty"; $x->{num} = delete($x->{xnum}) // die '{xnum} unset'; - my $ibx_id = delete($x->{ibx_id}) // die '{ibx_id} unset'; - my $pos = $sync->{id2pos}->{$ibx_id} // die "$ibx_id no pos"; - my $ibx = $self->{ibx_list}->[$pos] // - die "BUG: ibx for $x->{blob} not mapped"; + $ibx = _ibx_for($self, $sync, $x); my $e = $ibx->over->get_art($x->{num}); $e->{blob} eq $x->{blob} or die <<EOF; $x->{blob} != $e->{blob} (${\$ibx->eidx_key}:$e->{num}); EOF push @todo, $ibx, $e; } - $self->{oidx}->commit_lazy; # ensure shard workers can see xref removals - $self->{oidx}->begin_lazy; - $self->idx_shard($docid)->shard_reindex_docid($docid); + undef $by_chash; while (my ($ibx, $e) = splice(@todo, 0, 2)) { reindex_unseen($self, $sync, $ibx, $e); } @@ -444,14 +455,14 @@ sub _reindex_oid { # git->cat_async callback my $sync = $req->{sync}; my $self = $sync->{self}; my $orig_smsg = $req->{orig_smsg} // die 'BUG: no {orig_smsg}'; - my $expect_oid = $req->{xr3}->[$req->{ix}]->[2]; + my $expect_oid = $req->{xr3r}->[$req->{ix}]->[2]; my $docid = $orig_smsg->{num}; if (is_bad_blob($oid, $type, $size, $expect_oid)) { my $remain = $self->{oidx}->remove_xref3($docid, $expect_oid); if ($remain == 0) { warn "W: #$docid gone or corrupted\n"; $self->idx_shard($docid)->shard_remove($docid); - } elsif (my $next_oid = $req->{xr3}->[++$req->{ix}]->[2]) { + } elsif (my $next_oid = $req->{xr3r}->[++$req->{ix}]->[2]) { $self->git->cat_async($next_oid, \&_reindex_oid, $req); } else { warn "BUG: #$docid gone (UNEXPECTED)\n"; @@ -468,10 +479,11 @@ sub _reindex_oid { # git->cat_async callback cotime => $orig_smsg->{ts} }); my $chash = content_hash($eml); $re_smsg->{chash} = $chash; - $re_smsg->{xnum} = $req->{xr3}->[$req->{ix}]->[1]; - $re_smsg->{ibx_id} = $req->{xr3}->[$req->{ix}]->[0]; + $re_smsg->{xnum} = $req->{xr3r}->[$req->{ix}]->[1]; + $re_smsg->{ibx_id} = $req->{xr3r}->[$req->{ix}]->[0]; + $re_smsg->{hdr} = $eml->header_obj; push @{$req->{by_chash}->{$chash}}, $re_smsg; - if (my $next_oid = $req->{xr3}->[++$req->{ix}]->[2]) { + if (my $next_oid = $req->{xr3r}->[++$req->{ix}]->[2]) { $self->git->cat_async($next_oid, \&_reindex_oid, $req); } else { # last $re_smsg is the highest priority xref3 local $self->{current_info} = "$ci #$docid"; @@ -492,7 +504,7 @@ BUG? #$docid $smsg->{blob} is not referenced by inboxes during reindex return; } - # we sort {xr3} in the reverse order of {ibx_list} so we can + # we sort {xr3r} in the reverse order of {ibx_list} so we can # hit the common case in _reindex_finalize without rereading # from git (or holding multiple messages in memory). my $id2pos = $sync->{id2pos}; # index in {ibx_list} @@ -502,15 +514,13 @@ BUG? #$docid $smsg->{blob} is not referenced by inboxes during reindex $b->[1] <=> $a->[1] # break ties with {xnum} } @$xr3; @$xr3 = map { [ $_->[0], $_->[1], unpack('H*', $_->[2]) ] } @$xr3; - my $req = { orig_smsg => $smsg, sync => $sync, xr3 => $xr3, ix => 0 }; + my $req = { orig_smsg => $smsg, sync => $sync, xr3r => $xr3, ix => 0 }; $self->git->cat_async($xr3->[$req->{ix}]->[2], \&_reindex_oid, $req); } sub eidxq_process ($$) { # for reindexing my ($self, $sync) = @_; - $self->{oidx}->commit_lazy; # ensure shard workers can see it - $self->{oidx}->begin_lazy; my $dbh = $self->{oidx}->dbh; my $tot = $dbh->selectrow_array('SELECT COUNT(*) FROM eidxq') or return; ${$sync->{nr}} = 0; diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm index cd8f4dd7..c6d2a0e8 100644 --- a/lib/PublicInbox/SearchIdx.pm +++ b/lib/PublicInbox/SearchIdx.pm @@ -1008,68 +1008,4 @@ SELECT COUNT(*) FROM over WHERE num = ? } } -sub reindex_xap { # git->cat_async callback - my ($bref, $oid, $type, $size, $ary) = @_; - my ($ibx_id, $oidhex, $req, $more) = @$ary; - my $self = $req->{self} // die 'BUG: {self} missing'; - my $eidx = $self->{eidx} // die 'BUG: {eidx} missing'; - my $eidx_key = $self->{-eidx_key_for}->{$ibx_id} // - die "BUG: bad ibx_id=$ibx_id ($oid)"; - - my $docid = $req->{docid}; - local $eidx->{current_info} = "#$docid $oid"; - return if is_bad_blob($oid, $type, $size, $oidhex); - if (my $doc = $req->{doc}) { # modify existing doc - $req->{tg_isset} //= do { # for existing documents in {xdb} - term_generator($self)->set_document($doc); - 1; - }; - $doc->add_boolean_term('O'.$eidx_key); - index_list_id($self, $doc, PublicInbox::Eml->new($bref)); - } else { # first time seeing this doc - my $smsg = $self->{eidx}->over->get_art($docid) // - die "BUG: #$docid ($oid) not in over"; - $smsg->{bytes} = $size + crlf_adjust($$bref); - $smsg->{eidx_key} = $eidx_key; - my $eml = PublicInbox::Eml->new($bref); - $req->{doc} = eml2doc($self, $eml, $smsg); - $req->{tg_isset} = 1; # eml2doc calls $tg->set_document - } - return if $more; - my $doc = delete($req->{doc}) or return; # all bad blobs! - $eidx->{transact_bytes} += $size; - $self->{xdb}->replace_document($req->{docid}, $doc); -} - -sub reindex_docid { - my ($self, $docid) = @_; - my $eidx = $self->{eidx} // die 'BUG: {eidx} missing'; - my $eidx_key_for = $self->{-eidx_key_for} //= do { - my %eidx_key_for = map { - $_->[0] => $_->[1]; - } @{$eidx->over->dbh->selectall_arrayref(<<'')}; -SELECT ibx_id,eidx_key FROM inboxes - - \%eidx_key_for; - }; - - begin_txn_lazy($self); - my $doc = eval { $self->{xdb}->get_document($docid) }; - my $req = { doc => $doc, self => $self, docid => $docid }; - my $sth = $eidx->over->dbh->prepare_cached(<<'', undef, 1); -SELECT ibx_id,oidbin FROM xref3 WHERE docid = ? ORDER BY ibx_id ASC - - $sth->execute($docid); - my $rows = $sth->fetchall_arrayref; - while (my $row = shift(@$rows)) { - my ($ibx_id, $oidbin) = @$row; - my $oidhex = unpack('H*', $oidbin); - $eidx->git->cat_async($oidhex, \&reindex_xap, - [ $ibx_id, $oidhex, $req, scalar(@$rows) ]); - } - if ($eidx->{transact_bytes} >= $eidx->{batch_bytes}) { - commit_txn_lazy($self); - } -} - 1; diff --git a/lib/PublicInbox/SearchIdxShard.pm b/lib/PublicInbox/SearchIdxShard.pm index b6eef6bd..ee00858b 100644 --- a/lib/PublicInbox/SearchIdxShard.pm +++ b/lib/PublicInbox/SearchIdxShard.pm @@ -94,8 +94,6 @@ sub shard_worker_loop ($$$$$) { my $over_fn = $1; $over_fn =~ tr/\0/\n/; $self->over_check(PublicInbox::Over->new($over_fn)); - } elsif ($line =~ /\AE ([0-9]+)\n/) { - $self->reindex_docid($1 + 0); } else { chomp $line; my $eidx_key; @@ -124,9 +122,9 @@ sub shard_worker_loop ($$$$$) { } sub index_raw { - my ($self, $msgref, $eml, $smsg, $ibx) = @_; + my ($self, $msgref, $eml, $smsg, $eidx_key) = @_; if (my $w = $self->{w}) { - my @ekey = $ibx ? ('X='.$ibx->eidx_key."\0") : (); + my @ekey = defined($eidx_key) ? ("X=$eidx_key\0") : (); $msgref //= \($eml->as_string); $smsg->{raw_bytes} //= length($$msgref); # mid must be last, it can contain spaces (but not LF) @@ -140,7 +138,7 @@ sub index_raw { $eml = PublicInbox::Eml->new($msgref); } $self->begin_txn_lazy; - $smsg->{eidx_key} = $ibx->eidx_key if $ibx; + $smsg->{eidx_key} = $eidx_key if defined $eidx_key; $self->add_message($eml, $smsg); } } @@ -225,13 +223,4 @@ sub shard_over_check { } } -sub shard_reindex_docid { - my ($self, $docid) = @_; - if (my $w = $self->{w}) { - print $w "E $docid\n" or die "failed to write to shard: $!"; - } else { - $self->reindex_docid($docid); - } -} - 1; |