From 7281c5c492f9d6bbd585da9f061d19819d952352 Mon Sep 17 00:00:00 2001
From: Eric Wong
Date: Tue, 15 Dec 2020 02:02:16 +0000
Subject: extindex: preliminary --reindex support

--reindex allows us to catch missed and stale messages due to
-extindex vs -index races prior to commit 02b2fcc46f364b51
("extsearchidx: enforce -index before -extindex").

We'll also rely on reindex to internally deal with v1/v2 inbox
removals and partial-unindexing of messages which are only
removed from one inbox out of many.

This reindex design is completely different from how normal
v1/v2 inbox reindex operates due to extindex having multiple
histories to work with.

Instead of scanning git history, this relies exclusively on
comparing over.sqlite3 contents between the v1/v2 inboxes and
the extindex.  Changes to Xapian behavior are now picked up as
well.

Xapian indexing is handled by workers with minimal IPC to the
parent process.  This results in more read I/O but fewer writes
when dealing with cross-posted messages.

Changes to $smsg->populate and --rethread still need further
work.
--- lib/PublicInbox/SearchIdx.pm | 77 +++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 76 insertions(+), 1 deletion(-) (limited to 'lib/PublicInbox/SearchIdx.pm') diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm index 0fbe6560..cd8f4dd7 100644 --- a/lib/PublicInbox/SearchIdx.pm +++ b/lib/PublicInbox/SearchIdx.pm @@ -352,8 +352,9 @@ sub index_ids ($$$$) { index_list_id($self, $doc, $hdr); } -sub add_xapian ($$$$) { +sub eml2doc ($$$;$) { my ($self, $eml, $smsg, $mids) = @_; + $mids //= mids_for_index($eml); my $doc = $X->{Document}->new; add_val($doc, PublicInbox::Search::TS(), $smsg->{ts}); my @ds = gmtime($smsg->{ds}); @@ -396,6 +397,12 @@ sub add_xapian ($$$$) { } } } + $doc; +} + +sub add_xapian ($$$$) { + my ($self, $eml, $smsg, $mids) = @_; + my $doc = eml2doc($self, $eml, $smsg, $mids); $self->{xdb}->replace_document($smsg->{num}, $doc); } @@ -941,6 +948,10 @@ sub set_metadata_once { sub _commit_txn { my ($self) = @_; + if (my $eidx = $self->{eidx}) { + $eidx->git->async_wait_all; + $eidx->{transact_bytes} = 0; + } if (my $xdb = $self->{xdb}) { set_metadata_once($self); $xdb->commit_transaction; @@ -997,4 +1008,68 @@ SELECT COUNT(*) FROM over WHERE num = ? 
} } +sub reindex_xap { # git->cat_async callback + my ($bref, $oid, $type, $size, $ary) = @_; + my ($ibx_id, $oidhex, $req, $more) = @$ary; + my $self = $req->{self} // die 'BUG: {self} missing'; + my $eidx = $self->{eidx} // die 'BUG: {eidx} missing'; + my $eidx_key = $self->{-eidx_key_for}->{$ibx_id} // + die "BUG: bad ibx_id=$ibx_id ($oid)"; + + my $docid = $req->{docid}; + local $eidx->{current_info} = "#$docid $oid"; + return if is_bad_blob($oid, $type, $size, $oidhex); + if (my $doc = $req->{doc}) { # modify existing doc + $req->{tg_isset} //= do { # for existing documents in {xdb} + term_generator($self)->set_document($doc); + 1; + }; + $doc->add_boolean_term('O'.$eidx_key); + index_list_id($self, $doc, PublicInbox::Eml->new($bref)); + } else { # first time seeing this doc + my $smsg = $self->{eidx}->over->get_art($docid) // + die "BUG: #$docid ($oid) not in over"; + $smsg->{bytes} = $size + crlf_adjust($$bref); + $smsg->{eidx_key} = $eidx_key; + my $eml = PublicInbox::Eml->new($bref); + $req->{doc} = eml2doc($self, $eml, $smsg); + $req->{tg_isset} = 1; # eml2doc calls $tg->set_document + } + return if $more; + my $doc = delete($req->{doc}) or return; # all bad blobs! + $eidx->{transact_bytes} += $size; + $self->{xdb}->replace_document($req->{docid}, $doc); +} + +sub reindex_docid { + my ($self, $docid) = @_; + my $eidx = $self->{eidx} // die 'BUG: {eidx} missing'; + my $eidx_key_for = $self->{-eidx_key_for} //= do { + my %eidx_key_for = map { + $_->[0] => $_->[1]; + } @{$eidx->over->dbh->selectall_arrayref(<<'')}; +SELECT ibx_id,eidx_key FROM inboxes + + \%eidx_key_for; + }; + + begin_txn_lazy($self); + my $doc = eval { $self->{xdb}->get_document($docid) }; + my $req = { doc => $doc, self => $self, docid => $docid }; + my $sth = $eidx->over->dbh->prepare_cached(<<'', undef, 1); +SELECT ibx_id,oidbin FROM xref3 WHERE docid = ? 
ORDER BY ibx_id ASC + + $sth->execute($docid); + my $rows = $sth->fetchall_arrayref; + while (my $row = shift(@$rows)) { + my ($ibx_id, $oidbin) = @$row; + my $oidhex = unpack('H*', $oidbin); + $eidx->git->cat_async($oidhex, \&reindex_xap, + [ $ibx_id, $oidhex, $req, scalar(@$rows) ]); + } + if ($eidx->{transact_bytes} >= $eidx->{batch_bytes}) { + commit_txn_lazy($self); + } +} + 1; -- cgit v1.2.3-24-ge0c7