about summary refs log tree commit homepage
path: root/lib/PublicInbox/SearchIdx.pm
diff options
context:
space:
mode:
Diffstat (limited to 'lib/PublicInbox/SearchIdx.pm')
-rw-r--r-- lib/PublicInbox/SearchIdx.pm 214
1 files changed, 42 insertions, 172 deletions
diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm
index 2e0b9a43..3412a615 100644
--- a/lib/PublicInbox/SearchIdx.pm
+++ b/lib/PublicInbox/SearchIdx.pm
@@ -16,10 +16,12 @@ use PublicInbox::MID qw/mid_clean id_compress mid_mime mids references/;
 use PublicInbox::MsgIter;
 use Carp qw(croak);
 use POSIX qw(strftime);
+use PublicInbox::OverIdx;
 require PublicInbox::Git;
+use Compress::Zlib qw(compress);
 
 use constant {
-        BATCH_BYTES => 1_000_000,
+        BATCH_BYTES => 10_000_000,
         DEBUG => !!$ENV{DEBUG},
 };
 
@@ -73,12 +75,13 @@ sub new {
         $ibx->umask_prepare;
         if ($version == 1) {
                 $self->{lock_path} = "$mainrepo/ssoma.lock";
+                my $dir = $self->xdir;
+                $self->{over} = PublicInbox::OverIdx->new("$dir/over.sqlite3");
         } elsif ($version == 2) {
                 defined $part or die "partition is required for v2\n";
-                # partition is a number or "all"
+                # partition is a number
                 $self->{partition} = $part;
                 $self->{lock_path} = undef;
-                $self->{msgmap_path} = "$mainrepo/msgmap.sqlite3";
         } else {
                 die "unsupported inbox version=$version\n";
         }
@@ -114,14 +117,6 @@ sub add_val ($$$) {
         $doc->add_value($col, $num);
 }
 
-sub add_values {
-        my ($doc, $ts, $ds, $num) = @_;
-        add_val($doc, PublicInbox::Search::TS, $ts);
-        my $yyyymmdd = strftime('%Y%m%d', gmtime($ds));
-        add_val($doc, PublicInbox::Search::YYYYMMDD, $yyyymmdd);
-        defined($num) and add_val($doc, PublicInbox::Search::NUM, $num);
-}
-
 sub index_users ($$) {
         my ($tg, $smsg) = @_;
 
@@ -269,8 +264,11 @@ sub add_message {
         my ($self, $mime, $bytes, $num, $oid, $mid0) = @_;
         my $doc_id;
         my $mids = mids($mime->header_obj);
-        my $skel = $self->{skeleton};
-
+        $mid0 = $mids->[0] unless defined $mid0; # v1 compatibility
+        unless (defined $num) { # v1
+                my $mm = $self->_msgmap_init;
+                $num = $mm->mid_insert($mid0) || $mm->num_for($mid0);
+        }
         eval {
                 my $smsg = PublicInbox::SearchMsg->new($mime);
                 my $doc = $smsg->{doc};
@@ -281,11 +279,12 @@ sub add_message {
                         $xpath = id_compress($xpath);
                 }
 
-                my $lines = $mime->body_raw =~ tr!\n!\n!;
                 $smsg->{lines} = $mime->body_raw =~ tr!\n!\n!;
                 defined $bytes or $bytes = length($mime->as_string);
                 $smsg->{bytes} = $bytes;
-                add_values($doc, $smsg->ts, $smsg->ds, $num);
+                add_val($doc, PublicInbox::Search::TS(), $smsg->ts);
+                my $yyyymmdd = strftime('%Y%m%d', gmtime($smsg->ds));
+                add_val($doc, PublicInbox::Search::YYYYMMDD, $yyyymmdd);
 
                 my $tg = $self->term_generator;
 
@@ -336,7 +335,6 @@ sub add_message {
 
                 # populates smsg->references for smsg->to_doc_data
                 my $refs = parse_references($smsg);
-                $mid0 = $mids->[0] unless defined $mid0; # v1 compatibility
                 my $data = $smsg->to_doc_data($oid, $mid0);
                 foreach my $mid (@$mids) {
                         $tg->index_text($mid, 1, 'XM');
@@ -354,16 +352,14 @@ sub add_message {
                 }
 
                 $self->delete_article($num) if defined $num; # for reindexing
-                if ($skel) {
-                        my @vals = ($smsg->ts, $num, $mids, $xpath, $data);
-                        $skel->index_skeleton(\@vals);
-                        $doc->add_boolean_term('Q' . $_) foreach @$mids;
-                        $doc->add_boolean_term('XNUM' . $num) if defined $num;
-                        $doc_id = $self->{xdb}->add_document($doc);
-                } else {
-                        $doc_id = link_and_save($self, $doc, $mids, $refs,
-                                                $num, $xpath);
-                }
+
+                utf8::encode($data);
+                $data = compress($data);
+                my @vals = ($smsg->ts, $num, $mids, $refs, $xpath, $data);
+                $self->{over}->add_over(\@vals);
+                $doc->add_boolean_term('Q' . $_) foreach @$mids;
+                $doc->add_boolean_term('XNUM' . $num) if defined $num;
+                $doc_id = $self->{xdb}->add_document($doc);
         };
 
         if ($@) {
@@ -439,14 +435,19 @@ sub remove_by_oid {
         # there is only ONE element in @delete unless we
         # have bugs in our v2writable deduplication check
         my @delete;
+        my @over_del;
         for (; $head != $tail; $head->inc) {
                 my $docid = $head->get_docid;
                 my $doc = $db->get_document($docid);
                 my $smsg = PublicInbox::SearchMsg->wrap($doc, $mid);
                 $smsg->load_expand;
-                push(@delete, $docid) if $smsg->{blob} eq $oid;
+                if ($smsg->{blob} eq $oid) {
+                        push(@delete, $docid);
+                        push(@over_del, $smsg->num);
+                }
         }
         $db->delete_document($_) foreach @delete;
+        $self->{over}->remove_oid($oid, $mid);
         scalar(@delete);
 }
 
@@ -462,18 +463,6 @@ sub term_generator { # write-only
         $self->{term_generator} = $tg;
 }
 
-# increments last_thread_id counter
-# returns a 64-bit integer represented as a decimal string
-sub next_thread_id {
-        my ($self) = @_;
-        my $db = $self->{xdb};
-        my $last_thread_id = int($db->get_metadata('last_thread_id') || 0);
-
-        $db->set_metadata('last_thread_id', ++$last_thread_id);
-
-        $last_thread_id;
-}
-
 sub parse_references ($) {
         my ($smsg) = @_;
         my $mime = $smsg->{mime};
@@ -496,71 +485,6 @@ sub parse_references ($) {
         \@keep;
 }
 
-sub link_doc {
-        my ($self, $doc, $refs, $old_tid) = @_;
-        my $tid;
-
-        if (@$refs) {
-                # first ref *should* be the thread root,
-                # but we can never trust clients to do the right thing
-                my $ref = shift @$refs;
-                $tid = resolve_mid_to_tid($self, $ref);
-                merge_threads($self, $tid, $old_tid) if defined $old_tid;
-
-                # the rest of the refs should point to this tid:
-                foreach $ref (@$refs) {
-                        my $ptid = resolve_mid_to_tid($self, $ref);
-                        merge_threads($self, $tid, $ptid);
-                }
-        } else {
-                $tid = defined $old_tid ? $old_tid : $self->next_thread_id;
-        }
-        $doc->add_boolean_term('G' . $tid);
-        $tid;
-}
-
-sub link_and_save {
-        my ($self, $doc, $mids, $refs, $num, $xpath) = @_;
-        my $db = $self->{xdb};
-        my $old_tid;
-        my $doc_id;
-        $doc->add_boolean_term('XNUM' . $num) if defined $num;
-        $doc->add_boolean_term('XPATH' . $xpath) if defined $xpath;
-        $doc->add_boolean_term('Q' . $_) foreach @$mids;
-
-        $self->{skel} and die "Should not have read-only skel here\n";;
-        foreach my $mid (@$mids) {
-                my $vivified = 0;
-                $self->each_smsg_by_mid($mid, sub {
-                        my ($cur) = @_;
-                        my $type = $cur->type;
-                        my $cur_tid = $cur->thread_id;
-                        $old_tid = $cur_tid unless defined $old_tid;
-                        if ($type eq 'mail') {
-                                # do not break existing mail messages,
-                                # just merge the threads
-                                merge_threads($self, $old_tid, $cur_tid);
-                                return 1;
-                        }
-                        if ($type ne 'ghost') {
-                                die "<$mid> has a bad type: $type\n";
-                        }
-                        my $tid = link_doc($self, $doc, $refs, $old_tid);
-                        $old_tid = $tid unless defined $old_tid;
-                        $doc_id = $cur->{doc_id};
-                        $self->{xdb}->replace_document($doc_id, $doc);
-                        ++$vivified;
-                        1;
-                });
-                $vivified > 1 and warn
-                        "BUG: vivified multiple ($vivified) ghosts for $mid\n";
-        }
-        # not really important, but we return any vivified ghost docid, here:
-        return $doc_id if defined $doc_id;
-        link_doc($self, $doc, $refs, $old_tid);
-        $self->{xdb}->add_document($doc);
-}
-
 sub index_git_blob_id {
         my ($doc, $pfx, $objid) = @_;
 
@@ -675,14 +599,10 @@ sub rlog {
 
 sub _msgmap_init {
         my ($self) = @_;
+        die "BUG: _msgmap_init is only for v1\n" if $self->{version} != 1;
         $self->{mm} ||= eval {
                 require PublicInbox::Msgmap;
-                my $msgmap_path = $self->{msgmap_path};
-                if (defined $msgmap_path) { # v2
-                        PublicInbox::Msgmap->new_file($msgmap_path, 1);
-                } else {
-                        PublicInbox::Msgmap->new($self->{mainrepo}, 1);
-                }
+                PublicInbox::Msgmap->new($self->{mainrepo}, 1);
         };
 }
 
@@ -699,8 +619,7 @@ sub _index_sync {
         my $reindex = $opts->{reindex};
         my ($mkey, $last_commit, $lx, $xlog);
         $self->{git}->batch_prepare;
-        my $xdb = _xdb_acquire($self);
-        $xdb->begin_transaction;
+        my $xdb = $self->begin_txn_lazy;
         do {
                 $xlog = undef;
                 $mkey = 'last_commit';
@@ -710,6 +629,9 @@ sub _index_sync {
                         $lx = '';
                         $mkey = undef if $last_commit ne '';
                 }
+                $self->{over}->rollback_lazy;
+                $self->{over}->disconnect;
+                delete $self->{txn};
                 $xdb->cancel_transaction;
                 $xdb = _xdb_release($self);
 
@@ -717,8 +639,7 @@ sub _index_sync {
                 my $range = $lx eq '' ? $tip : "$lx..$tip";
                 $xlog = _git_log($self, $range);
 
-                $xdb = _xdb_acquire($self);
-                $xdb->begin_transaction;
+                $xdb = $self->begin_txn_lazy;
         } while ($xdb->get_metadata('last_commit') ne $last_commit);
 
         my $mm = _msgmap_init($self);
@@ -732,14 +653,12 @@ sub _index_sync {
                 }
                 if (!$mm_only) {
                         $xdb->set_metadata($mkey, $commit) if $mkey && $commit;
-                        $xdb->commit_transaction;
-                        $xdb = _xdb_release($self);
+                        $self->commit_txn_lazy;
                 }
                 # let another process do some work... <
                 if ($more) {
                         if (!$mm_only) {
-                                $xdb = _xdb_acquire($self);
-                                $xdb->begin_transaction;
+                                $xdb = $self->begin_txn_lazy;
                         }
                         $dbh->begin_work if $dbh;
                 }
@@ -779,71 +698,19 @@ sub _index_sync {
         }
 }
 
-# this will create a ghost as necessary
-sub resolve_mid_to_tid {
-        my ($self, $mid) = @_;
-        my $tid;
-        $self->each_smsg_by_mid($mid, sub {
-                my ($smsg) = @_;
-                my $cur_tid = $smsg->thread_id;
-                if (defined $tid) {
-                        merge_threads($self, $tid, $cur_tid);
-                } else {
-                        $tid = $smsg->thread_id;
-                }
-                1;
-        });
-        return $tid if defined $tid;
-
-        $self->create_ghost($mid)->thread_id;
-}
-
-sub create_ghost {
-        my ($self, $mid) = @_;
-
-        my $tid = $self->next_thread_id;
-        my $doc = Search::Xapian::Document->new;
-        $doc->add_boolean_term('Q' . $mid);
-        $doc->add_boolean_term('G' . $tid);
-        $doc->add_boolean_term('T' . 'ghost');
-
-        my $smsg = PublicInbox::SearchMsg->wrap($doc, $mid);
-        $self->{xdb}->add_document($doc);
-
-        $smsg;
-}
-
-sub merge_threads {
-        my ($self, $winner_tid, $loser_tid) = @_;
-        return if $winner_tid == $loser_tid;
-        my $db = $self->{xdb};
-        batch_do($self, 'G' . $loser_tid, sub {
-                my ($ids) = @_;
-                foreach my $docid (@$ids) {
-                        my $doc = $db->get_document($docid);
-                        $doc->remove_term('G' . $loser_tid);
-                        $doc->add_boolean_term('G' . $winner_tid);
-                        $db->replace_document($docid, $doc);
-                }
-        });
-}
-
 sub DESTROY {
         # order matters for unlocking
         $_[0]->{xdb} = undef;
         $_[0]->{lockfh} = undef;
 }
 
-# remote_* subs are only used by SearchIdxPart and SearchIdxSkeleton
+# remote_* subs are only used by SearchIdxPart
 sub remote_commit {
         my ($self) = @_;
         if (my $w = $self->{w}) {
                 print $w "commit\n" or die "failed to write commit: $!";
         } else {
                 $self->commit_txn_lazy;
-                if (my $skel = $self->{skeleton}) {
-                        $skel->commit_txn_lazy;
-                }
         }
 }
 
@@ -864,7 +731,7 @@ sub remote_close {
 sub remote_remove {
         my ($self, $oid, $mid) = @_;
         if (my $w = $self->{w}) {
-                # triggers remove_by_oid in partition or skeleton
+                # triggers remove_by_oid in a partition
                 print $w "D $oid $mid\n" or die "failed to write remove $!";
         } else {
                 $self->begin_txn_lazy;
@@ -876,14 +743,17 @@ sub begin_txn_lazy {
         my ($self) = @_;
         return if $self->{txn};
         my $xdb = $self->{xdb} || $self->_xdb_acquire;
+        $self->{over}->begin_lazy;
         $xdb->begin_transaction;
         $self->{txn} = 1;
+        $xdb;
 }
 
 sub commit_txn_lazy {
         my ($self) = @_;
         delete $self->{txn} or return;
         $self->{xdb}->commit_transaction;
+        $self->{over}->commit_lazy;
 }
 
 sub worker_done {