about summary refs log tree commit homepage
path: root/lib/PublicInbox/SearchIdx.pm
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2020-12-15 02:02:16 +0000
committerEric Wong <e@80x24.org>2020-12-17 19:12:53 +0000
commit7281c5c492f9d6bbd585da9f061d19819d952352 (patch)
tree947228cf3f08bc6ae8874d99936fcef457096282 /lib/PublicInbox/SearchIdx.pm
parentd26c2837f479b41182946a6540aad95d34b2b594 (diff)
downloadpublic-inbox-7281c5c492f9d6bbd585da9f061d19819d952352.tar.gz
--reindex allows us to catch missed and stale messages due to
-extindex vs -index races prior to commit 02b2fcc46f364b51
("extsearchidx: enforce -index before -extindex").

We'll also rely on reindex to internally deal with v1/v2 inbox
removals and partial-unindexing of messages which are only
removed from one inbox out of many.

This reindex design is completely different than how normal
v1/v2 inbox reindex operates due to extindex having multiple
histories to work with.  Instead of scanning git history, this
relies exclusively on comparing over.sqlite3 contents between
the v1/v2 inboxes and the extindex.

Changes to Xapian behavior also get picked up, now.  Xapian indexing
is handled by workers with minimal IPC to the parent process.
This results in more read I/O but fewer writes when dealing
with cross-posted messages.

Changes to $smsg->populate and --rethread still need further
work.
Diffstat (limited to 'lib/PublicInbox/SearchIdx.pm')
-rw-r--r--lib/PublicInbox/SearchIdx.pm77
1 files changed, 76 insertions, 1 deletions
diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm
index 0fbe6560..cd8f4dd7 100644
--- a/lib/PublicInbox/SearchIdx.pm
+++ b/lib/PublicInbox/SearchIdx.pm
@@ -352,8 +352,9 @@ sub index_ids ($$$$) {
         index_list_id($self, $doc, $hdr);
 }
 
-sub add_xapian ($$$$) {
+sub eml2doc ($$$;$) {
         my ($self, $eml, $smsg, $mids) = @_;
+        $mids //= mids_for_index($eml);
         my $doc = $X->{Document}->new;
         add_val($doc, PublicInbox::Search::TS(), $smsg->{ts});
         my @ds = gmtime($smsg->{ds});
@@ -396,6 +397,12 @@ sub add_xapian ($$$$) {
                         }
                 }
         }
+        $doc;
+}
+
+sub add_xapian ($$$$) {
+        my ($self, $eml, $smsg, $mids) = @_;
+        my $doc = eml2doc($self, $eml, $smsg, $mids);
         $self->{xdb}->replace_document($smsg->{num}, $doc);
 }
 
@@ -941,6 +948,10 @@ sub set_metadata_once {
 
 sub _commit_txn {
         my ($self) = @_;
+        if (my $eidx = $self->{eidx}) {
+                $eidx->git->async_wait_all;
+                $eidx->{transact_bytes} = 0;
+        }
         if (my $xdb = $self->{xdb}) {
                 set_metadata_once($self);
                 $xdb->commit_transaction;
@@ -997,4 +1008,68 @@ SELECT COUNT(*) FROM over WHERE num = ?
         }
 }
 
+sub reindex_xap { # git->cat_async callback
+        my ($bref, $oid, $type, $size, $ary) = @_;
+        my ($ibx_id, $oidhex, $req, $more) = @$ary;
+        my $self = $req->{self} // die 'BUG: {self} missing';
+        my $eidx = $self->{eidx} // die 'BUG: {eidx} missing';
+        my $eidx_key = $self->{-eidx_key_for}->{$ibx_id} //
+                        die "BUG: bad ibx_id=$ibx_id ($oid)";
+
+        my $docid = $req->{docid};
+        local $eidx->{current_info} = "#$docid $oid";
+        return if is_bad_blob($oid, $type, $size, $oidhex);
+        if (my $doc = $req->{doc}) { # modify existing doc
+                $req->{tg_isset} //= do { # for existing documents in {xdb}
+                        term_generator($self)->set_document($doc);
+                        1;
+                };
+                $doc->add_boolean_term('O'.$eidx_key);
+                index_list_id($self, $doc, PublicInbox::Eml->new($bref));
+        } else { # first time seeing this doc
+                my $smsg = $self->{eidx}->over->get_art($docid) //
+                        die "BUG: #$docid ($oid) not in over";
+                $smsg->{bytes} = $size + crlf_adjust($$bref);
+                $smsg->{eidx_key} = $eidx_key;
+                my $eml = PublicInbox::Eml->new($bref);
+                $req->{doc} = eml2doc($self, $eml, $smsg);
+                $req->{tg_isset} = 1; # eml2doc calls $tg->set_document
+        }
+        return if $more;
+        my $doc = delete($req->{doc}) or return; # all bad blobs!
+        $eidx->{transact_bytes} += $size;
+        $self->{xdb}->replace_document($req->{docid}, $doc);
+}
+
+sub reindex_docid {
+        my ($self, $docid) = @_;
+        my $eidx = $self->{eidx} // die 'BUG: {eidx} missing';
+        my $eidx_key_for = $self->{-eidx_key_for} //= do {
+                my %eidx_key_for = map {
+                        $_->[0] => $_->[1];
+                } @{$eidx->over->dbh->selectall_arrayref(<<'')};
+SELECT ibx_id,eidx_key FROM inboxes
+
+                \%eidx_key_for;
+        };
+
+        begin_txn_lazy($self);
+        my $doc = eval { $self->{xdb}->get_document($docid) };
+        my $req = { doc => $doc, self => $self, docid => $docid };
+        my $sth = $eidx->over->dbh->prepare_cached(<<'', undef, 1);
+SELECT ibx_id,oidbin FROM xref3 WHERE docid = ? ORDER BY ibx_id ASC
+
+        $sth->execute($docid);
+        my $rows = $sth->fetchall_arrayref;
+        while (my $row = shift(@$rows)) {
+                my ($ibx_id, $oidbin) = @$row;
+                my $oidhex = unpack('H*', $oidbin);
+                $eidx->git->cat_async($oidhex, \&reindex_xap,
+                                [ $ibx_id, $oidhex, $req, scalar(@$rows) ]);
+        }
+        if ($eidx->{transact_bytes} >= $eidx->{batch_bytes}) {
+                commit_txn_lazy($self);
+        }
+}
+
 1;