about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong (Contractor, The Linux Foundation) <e@80x24.org>2018-04-02 00:04:52 +0000
committerEric Wong (Contractor, The Linux Foundation) <e@80x24.org>2018-04-02 00:05:39 +0000
commit35ff6bb106909b1c1232666a9792156dfa398ea8 (patch)
tree000f656d3daf3a077fbfa02b5853523d66a89329
parent7503aeb540af5afd5cb1b554b3c29f35f5fc918d (diff)
downloadpublic-inbox-35ff6bb106909b1c1232666a9792156dfa398ea8.tar.gz
This ought to provide better performance and scalability
which is less dependent on inbox size.  Xapian does not
seem optimized for some queries used by the WWW homepage,
Atom feeds, XOVER and NEWNEWS NNTP commands.

This can actually make Xapian optional for NNTP usage,
and allow more functionality to work without Xapian
installed.

Indexing performance was extremely bad at first, but
DBI::Profile helped me optimize away problematic queries.
-rw-r--r--MANIFEST5
-rw-r--r--lib/PublicInbox/Inbox.pm15
-rw-r--r--lib/PublicInbox/Msgmap.pm1
-rw-r--r--lib/PublicInbox/NNTP.pm29
-rw-r--r--lib/PublicInbox/Over.pm119
-rw-r--r--lib/PublicInbox/OverIdx.pm370
-rw-r--r--lib/PublicInbox/OverIdxFork.pm (renamed from lib/PublicInbox/SearchIdxSkeleton.pm)128
-rw-r--r--lib/PublicInbox/Search.pm109
-rw-r--r--lib/PublicInbox/SearchIdx.pm214
-rw-r--r--lib/PublicInbox/SearchIdxPart.pm16
-rw-r--r--lib/PublicInbox/SearchMsg.pm26
-rw-r--r--lib/PublicInbox/V2Writable.pm78
-rwxr-xr-xscript/public-inbox-compact32
-rw-r--r--t/over.t38
-rw-r--r--t/psgi_search.t6
-rw-r--r--t/search-thr-index.t7
-rw-r--r--t/search.t14
17 files changed, 763 insertions, 444 deletions
diff --git a/MANIFEST b/MANIFEST
index 60e15f28..5fd8acf8 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -82,6 +82,9 @@ lib/PublicInbox/Msgmap.pm
 lib/PublicInbox/NNTP.pm
 lib/PublicInbox/NNTPD.pm
 lib/PublicInbox/NewsWWW.pm
+lib/PublicInbox/Over.pm
+lib/PublicInbox/OverIdx.pm
+lib/PublicInbox/OverIdxFork.pm
 lib/PublicInbox/ParentPipe.pm
 lib/PublicInbox/ProcessPipe.pm
 lib/PublicInbox/Qspawn.pm
@@ -90,7 +93,6 @@ lib/PublicInbox/SaPlugin/ListMirror.pm
 lib/PublicInbox/Search.pm
 lib/PublicInbox/SearchIdx.pm
 lib/PublicInbox/SearchIdxPart.pm
-lib/PublicInbox/SearchIdxSkeleton.pm
 lib/PublicInbox/SearchMsg.pm
 lib/PublicInbox/SearchThread.pm
 lib/PublicInbox/SearchView.pm
@@ -170,6 +172,7 @@ t/msg_iter.t
 t/msgmap.t
 t/nntp.t
 t/nntpd.t
+t/over.t
 t/plack.t
 t/precheck.t
 t/psgi_attach.t
diff --git a/lib/PublicInbox/Inbox.pm b/lib/PublicInbox/Inbox.pm
index 43cf15ba..142b5c89 100644
--- a/lib/PublicInbox/Inbox.pm
+++ b/lib/PublicInbox/Inbox.pm
@@ -319,20 +319,7 @@ sub msg_by_mid ($$;$) {
 
 sub recent {
         my ($self, $opts) = @_;
-        my $qs = '';
-        my $srch = search($self);
-        if (!$opts->{offset}) {
-                # this complicated bit cuts /$INBOX/ loading time by
-                # over 400ms on my system:
-                my ($min, $max) = mm($self)->minmax;
-                my $n = $max - $opts->{limit};
-                $n = $min if $n < $min;
-                for (; $qs eq '' && $n >= $min; --$n) {
-                        my $smsg = $srch->lookup_article($n) or next;
-                        $qs = strftime('d:%Y%m%d..', gmtime($smsg->ts));
-                }
-        }
-        $srch->query($qs, $opts);
+        search($self)->query('', $opts);
 }
 
 1;
diff --git a/lib/PublicInbox/Msgmap.pm b/lib/PublicInbox/Msgmap.pm
index 12833050..dea95731 100644
--- a/lib/PublicInbox/Msgmap.pm
+++ b/lib/PublicInbox/Msgmap.pm
@@ -39,6 +39,7 @@ sub dbh_new {
 
 sub new_file {
         my ($class, $f, $writable) = @_;
+        return if !$writable && !-r $f;
 
         my $dbh = dbh_new($f, $writable);
         my $self = bless { dbh => $dbh }, $class;
diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm
index fb65ddc0..48ab7fc2 100644
--- a/lib/PublicInbox/NNTP.pm
+++ b/lib/PublicInbox/NNTP.pm
@@ -331,13 +331,11 @@ sub cmd_newnews ($$$$;$$) {
         };
         return '.' unless @srch;
 
-        $ts .= '..';
-        my $opts = { asc => 1, limit => 1000, offset => 0 };
+        my $opts = { limit => 1000, offset => 0 };
         long_response($self, 0, long_response_limit, sub {
                 my ($i) = @_;
                 my $srch = $srch[0];
-                my $res = $srch->query_ts($ts, $opts);
-                my $msgs = $res->{msgs};
+                my $msgs = $srch->query_ts($ts, $opts);
                 if (my $nr = scalar @$msgs) {
                         more($self, '<' .
                                 join(">\r\n<", map { $_->mid } @$msgs ).
@@ -463,7 +461,7 @@ find_mid:
                 defined $mid or return $err;
         }
 found:
-        my $smsg = $ng->search->lookup_article($n) or return $err;
+        my $smsg = $ng->search->{over_ro}->get_art($n) or return $err;
         my $msg = $ng->msg_by_smsg($smsg) or return $err;
         my $s = Email::Simple->new($msg);
         if ($set_headers) {
@@ -692,8 +690,9 @@ sub hdr_xref ($$$) { # optimize XHDR Xref [range] for rtin
 
 sub search_header_for {
         my ($srch, $num, $field) = @_;
-        my $smsg = $srch->lookup_article($num) or return;
-        $smsg->$field;
+        my $smsg = $srch->{over_ro}->get_art($num) or return;
+        return PublicInbox::SearchMsg::date($smsg) if $field eq 'date';
+        $smsg->{$field};
 }
 
 sub hdr_searchmsg ($$$$) {
@@ -714,8 +713,7 @@ sub hdr_searchmsg ($$$$) {
                 my $off = 0;
                 long_response($self, $beg, $end, sub {
                         my ($i) = @_;
-                        my $res = $srch->query_xover($beg, $end, $off);
-                        my $msgs = $res->{msgs};
+                        my $msgs = $srch->query_xover($beg, $end, $off);
                         my $nr = scalar @$msgs or return;
                         $off += $nr;
                         my $tmp = '';
@@ -816,10 +814,10 @@ sub over_line ($$) {
                 $smsg->{subject},
                 $smsg->{from},
                 PublicInbox::SearchMsg::date($smsg),
-                '<'.PublicInbox::SearchMsg::mid($smsg).'>',
+                "<$smsg->{mid}>",
                 $smsg->{references},
-                PublicInbox::SearchMsg::bytes($smsg),
-                PublicInbox::SearchMsg::lines($smsg));
+                $smsg->{bytes},
+                $smsg->{lines});
         utf8::encode($s);
         $s
 }
@@ -829,7 +827,7 @@ sub cmd_over ($;$) {
         if ($range && $range =~ /\A<(.+)>\z/) {
                 my ($ng, $n) = mid_lookup($self, $1);
                 defined $n or return r430;
-                my $smsg = $ng->search->lookup_article($n) or return r430;
+                my $smsg = $ng->search->{over_ro}->get_art($n) or return r430;
                 more($self, '224 Overview information follows (multi-line)');
 
                 # Only set article number column if it's the current group
@@ -853,14 +851,13 @@ sub cmd_xover ($;$) {
         my $off = 0;
         long_response($self, $beg, $end, sub {
                 my ($i) = @_;
-                my $res = $srch->query_xover($beg, $end, $off);
-                my $msgs = $res->{msgs};
+                my $msgs = $srch->query_xover($beg, $end, $off);
                 my $nr = scalar @$msgs or return;
                 $off += $nr;
 
                 # OVERVIEW.FMT
                 more($self, join("\r\n", map {
-                        over_line(PublicInbox::SearchMsg::num($_), $_);
+                        over_line($_->{num}, $_);
                         } @$msgs));
 
                 # -1 to adjust for implicit increment in long_response
diff --git a/lib/PublicInbox/Over.pm b/lib/PublicInbox/Over.pm
new file mode 100644
index 00000000..cf7a8849
--- /dev/null
+++ b/lib/PublicInbox/Over.pm
@@ -0,0 +1,119 @@
+# Copyright (C) 2018 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# for XOVER, OVER in NNTP, and feeds/homepage/threads in PSGI
+# Unlike Msgmap, this is an _UNSTABLE_ database which can be
+# tweaked/updated over time and rebuilt.
+package PublicInbox::Over;
+use strict;
+use warnings;
+use DBI;
+use DBD::SQLite;
+use PublicInbox::SearchMsg;
+use Compress::Zlib qw(uncompress);
+
+sub dbh_new {
+        my ($self) = @_;
+        my $ro = ref($self) eq 'PublicInbox::Over';
+        my $dbh = DBI->connect("dbi:SQLite:dbname=$self->{filename}",'','', {
+                AutoCommit => 1,
+                RaiseError => 1,
+                PrintError => 0,
+                ReadOnly => $ro,
+                sqlite_use_immediate_transaction => 1,
+        });
+        $dbh->{sqlite_unicode} = 1;
+        $dbh;
+}
+
+sub new {
+        my ($class, $f) = @_;
+        bless { filename => $f }, $class;
+}
+
+sub disconnect { $_[0]->{dbh} = undef }
+
+sub connect { $_[0]->{dbh} ||= $_[0]->dbh_new }
+
+sub load_from_row {
+        my ($smsg) = @_;
+        bless $smsg, 'PublicInbox::SearchMsg';
+        if (defined(my $data = delete $smsg->{ddd})) {
+                $data = uncompress($data);
+                utf8::decode($data);
+                $smsg->load_from_data($data);
+        }
+        $smsg
+}
+
+sub do_get {
+        my ($self, $sql, $opts, @args) = @_;
+        my $dbh = $self->connect;
+        my $lim = (($opts->{limit} || 0) + 0) || 1000;
+        my $off = (($opts->{offset} || 0) + 0) || 0;
+        $sql .= "LIMIT $lim OFFSET $off";
+        my $msgs = $dbh->selectall_arrayref($sql, { Slice => {} }, @args);
+        load_from_row($_) for @$msgs;
+        $msgs
+}
+
+sub query_xover {
+        my ($self, $beg, $end, $off) = @_;
+        do_get($self, <<'', { offset => $off }, $beg, $end);
+SELECT * FROM over WHERE num >= ? AND num <= ?
+ORDER BY num ASC
+
+}
+
+sub query_ts {
+        my ($self, $ts, $opts) = @_;
+        do_get($self, <<'', $opts, $ts);
+SELECT * FROM over WHERE num > 0 AND ts >= ?
+ORDER BY ts ASC
+
+}
+
+sub get_thread {
+        my ($self, $mid, $opts) = @_;
+        my $dbh = $self->connect;
+        my ($tid, $sid) = $dbh->selectrow_array(<<'', undef, $mid);
+SELECT tid,sid FROM over
+LEFT JOIN id2num ON over.num = id2num.num
+LEFT JOIN msgid ON id2num.id = msgid.id
+WHERE msgid.mid = ? AND over.num > 0
+LIMIT 1
+
+        my $cond = 'FROM over WHERE (tid = ? OR sid = ?) AND num > 0';
+        my $msgs = do_get($self, <<"", $opts, $tid, $sid);
+SELECT * $cond
+ORDER BY ts ASC
+
+        my $nr = $dbh->selectrow_array(<<"", undef, $tid, $sid);
+SELECT COUNT(num) $cond
+
+        { total => $nr, msgs => $msgs };
+}
+
+sub recent {
+        my ($self, $opts) = @_;
+        my $msgs = do_get($self, <<'', $opts);
+SELECT * FROM over WHERE num > 0
+ORDER BY ts DESC
+
+        my $nr = $self->{dbh}->selectrow_array(<<'');
+SELECT COUNT(num) FROM over WHERE num > 0
+
+        { total => $nr, msgs => $msgs };
+}
+
+sub get_art {
+        my ($self, $num) = @_;
+        my $dbh = $self->connect;
+        my $smsg = $dbh->selectrow_hashref(<<'', undef, $num);
+SELECT * from OVER where num = ? LIMIT 1
+
+        return load_from_row($smsg) if $smsg;
+        undef;
+}
+
+1;
diff --git a/lib/PublicInbox/OverIdx.pm b/lib/PublicInbox/OverIdx.pm
new file mode 100644
index 00000000..0e43aabc
--- /dev/null
+++ b/lib/PublicInbox/OverIdx.pm
@@ -0,0 +1,370 @@
+# Copyright (C) 2018 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# for XOVER, OVER in NNTP, and feeds/homepage/threads in PSGI
+# Unlike Msgmap, this is an _UNSTABLE_ database which can be
+# tweaked/updated over time and rebuilt.
+package PublicInbox::OverIdx;
+use strict;
+use warnings;
+use base qw(PublicInbox::Over);
+use IO::Handle;
+use DBI qw(:sql_types); # SQL_BLOB
+
+sub dbh_new {
+        my ($self) = @_;
+        my $dbh = $self->SUPER::dbh_new;
+        $dbh->do('PRAGMA synchronous = OFF'); # commit_fsync instead
+        $dbh->do('PRAGMA journal_mode = TRUNCATE');
+        $dbh->do('PRAGMA cache_size = 80000');
+        create_tables($dbh);
+        $dbh;
+}
+
+sub commit_fsync {
+        my $fn = $_[0]->{filename};
+        if (open my $fh, '+<', $fn) {
+                $fh->sync;
+                close $fh;
+        }
+}
+
+sub get_counter ($$) {
+        my ($dbh, $key) = @_;
+        my $sth = $dbh->prepare_cached(<<'', undef, 1);
+SELECT val FROM counter WHERE key = ? LIMIT 1
+
+        $sth->execute($key);
+        $sth->fetchrow_array;
+}
+
+sub adj_counter ($$$) {
+        my ($self, $key, $op) = @_;
+        my $dbh = $self->{dbh};
+        my $sth = $dbh->prepare_cached(<<"");
+UPDATE counter SET val = val $op 1 WHERE key = ?
+
+        $sth->execute($key);
+
+        get_counter($dbh, $key);
+}
+
+sub next_tid { adj_counter($_[0], 'thread', '+') }
+sub next_ghost_num { adj_counter($_[0], 'ghost', '-') }
+
+sub id_for ($$$$$) {
+        my ($self, $tbl, $id_col, $val_col, $val) = @_;
+        my $dbh = $self->{dbh};
+        my $in = $dbh->prepare_cached(<<"")->execute($val);
+INSERT OR IGNORE INTO $tbl ($val_col) VALUES (?)
+
+        if ($in == 0) {
+                my $sth = $dbh->prepare_cached(<<"", undef, 1);
+SELECT $id_col FROM $tbl WHERE $val_col = ? LIMIT 1
+
+                $sth->execute($val);
+                $sth->fetchrow_array;
+        } else {
+                $dbh->last_insert_id(undef, undef, $tbl, $id_col);
+        }
+}
+
+sub sid {
+        my ($self, $path) = @_;
+        return unless defined $path && $path ne '';
+        id_for($self, 'subject', 'sid', 'path' => $path);
+}
+
+sub mid2id {
+        my ($self, $mid) = @_;
+        id_for($self, 'msgid', 'id', 'mid' => $mid);
+}
+
+sub delete_by_num {
+        my ($self, $num) = @_;
+        my $dbh = $self->{dbh};
+        foreach (qw(over id2num)) {
+                $dbh->prepare_cached(<<"")->execute($num);
+DELETE FROM $_ WHERE num = ?
+
+        }
+}
+
+# this includes ghosts
+sub each_by_mid {
+        my ($self, $mid, $cols, $cb) = @_;
+        my $dbh = $self->{dbh};
+
+=over
+        I originally wanted to stuff everything into a single query:
+
+        SELECT over.* FROM over
+        LEFT JOIN id2num ON over.num = id2num.num
+        LEFT JOIN msgid ON msgid.id = id2num.id
+        WHERE msgid.mid = ? AND over.num >= ?
+        ORDER BY over.num ASC
+        LIMIT 1000
+
+        But it's faster broken out (and we're always in a
+        transaction for subroutines in this file)
+=cut
+
+        my $sth = $dbh->prepare_cached(<<'', undef, 1);
+SELECT id FROM msgid WHERE mid = ? LIMIT 1
+
+        $sth->execute($mid);
+        my $id = $sth->fetchrow_array;
+        defined $id or return;
+
+        push(@$cols, 'num');
+        $cols = join(',', map { $_ } @$cols);
+        my $lim = 10;
+        my $prev = get_counter($dbh, 'ghost');
+        while (1) {
+                $sth = $dbh->prepare_cached(<<"", undef, 1);
+SELECT num FROM id2num WHERE id = ? AND num >= ?
+ORDER BY num ASC
+LIMIT $lim
+
+                $sth->execute($id, $prev);
+                my $nums = $sth->fetchall_arrayref;
+                my $nr = scalar(@$nums) or return;
+                $prev = $nums->[-1]->[0];
+
+                $sth = $dbh->prepare_cached(<<"", undef, 1);
+SELECT $cols FROM over WHERE over.num = ? LIMIT 1
+
+                foreach (@$nums) {
+                        $sth->execute($_->[0]);
+                        my $smsg = $sth->fetchrow_hashref;
+                        $cb->(PublicInbox::Over::load_from_row($smsg)) or
+                                return;
+                }
+                return if $nr != $lim;
+        }
+}
+
+# this will create a ghost as necessary
+sub resolve_mid_to_tid {
+        my ($self, $mid) = @_;
+        my $tid;
+        each_by_mid($self, $mid, ['tid'], sub {
+                my ($smsg) = @_;
+                my $cur_tid = $smsg->{tid};
+                if (defined $tid) {
+                        merge_threads($self, $tid, $cur_tid);
+                } else {
+                        $tid = $cur_tid;
+                }
+                1;
+        });
+        defined $tid ? $tid : create_ghost($self, $mid);
+}
+
+sub create_ghost {
+        my ($self, $mid) = @_;
+        my $id = $self->mid2id($mid);
+        my $num = $self->next_ghost_num;
+        $num < 0 or die "ghost num is non-negative: $num\n";
+        my $tid = $self->next_tid;
+        my $dbh = $self->{dbh};
+        $dbh->prepare_cached(<<'')->execute($num, $tid);
+INSERT INTO over (num, tid) VALUES (?,?)
+
+        $dbh->prepare_cached(<<'')->execute($id, $num);
+INSERT INTO id2num (id, num) VALUES (?,?)
+
+        $tid;
+}
+
+sub merge_threads {
+        my ($self, $winner_tid, $loser_tid) = @_;
+        return if $winner_tid == $loser_tid;
+        my $dbh = $self->{dbh};
+        $dbh->prepare_cached(<<'')->execute($winner_tid, $loser_tid);
+UPDATE over SET tid = ? WHERE tid = ?
+
+}
+
+sub link_refs {
+        my ($self, $refs, $old_tid) = @_;
+        my $tid;
+
+        if (@$refs) {
+                # first ref *should* be the thread root,
+                # but we can never trust clients to do the right thing
+                my $ref = $refs->[0];
+                $tid = resolve_mid_to_tid($self, $ref);
+                merge_threads($self, $tid, $old_tid) if defined $old_tid;
+
+                # the rest of the refs should point to this tid:
+                foreach my $i (1..$#$refs) {
+                        $ref = $refs->[$i];
+                        my $ptid = resolve_mid_to_tid($self, $ref);
+                        merge_threads($self, $tid, $ptid);
+                }
+        } else {
+                $tid = defined $old_tid ? $old_tid : $self->next_tid;
+        }
+        $tid;
+}
+
+sub add_over {
+        my ($self, $values) = @_;
+        my ($ts, $num, $mids, $refs, $xpath, $ddd) = @$values;
+        my $old_tid;
+        my $vivified = 0;
+
+        $self->begin_lazy;
+        $self->delete_by_num($num);
+        foreach my $mid (@$mids) {
+                my $v = 0;
+                each_by_mid($self, $mid, ['tid'], sub {
+                        my ($cur) = @_;
+                        my $cur_tid = $cur->{tid};
+                        my $n = $cur->{num};
+                        die "num must not be zero for $mid" if !$n;
+                        $old_tid = $cur_tid unless defined $old_tid;
+                        if ($n > 0) { # regular mail
+                                merge_threads($self, $old_tid, $cur_tid);
+                        } elsif ($n < 0) { # ghost
+                                link_refs($self, $refs, $old_tid);
+                                $self->delete_by_num($n);
+                                $v++;
+                        }
+                        1;
+                });
+                $v > 1 and warn "BUG: vivified multiple ($v) ghosts for $mid\n";
+                $vivified += $v;
+        }
+        my $tid = $vivified ? $old_tid : link_refs($self, $refs, $old_tid);
+        my $sid = $self->sid($xpath);
+        my $dbh = $self->{dbh};
+        my $sth = $dbh->prepare_cached(<<'');
+INSERT INTO over (num, tid, sid, ts, ddd)
+VALUES (?,?,?,?,?)
+
+        my $n = 0;
+        my @v = ($num, $tid, $sid, $ts);
+        foreach (@v) { $sth->bind_param(++$n, $_) }
+        $sth->bind_param(++$n, $ddd, SQL_BLOB);
+        $sth->execute;
+        $sth = $dbh->prepare_cached(<<'');
+INSERT INTO id2num (id, num) VALUES (?,?)
+
+        foreach my $mid (@$mids) {
+                my $id = $self->mid2id($mid);
+                $sth->execute($id, $num);
+        }
+}
+
+sub delete_articles {
+        my ($self, $nums) = @_;
+        my $dbh = $self->connect;
+        $self->delete_by_num($_) foreach @$nums;
+}
+
+sub remove_oid {
+        my ($self, $oid, $mid) = @_;
+        $self->begin_lazy;
+        each_by_mid($self, $mid, ['ddd'], sub {
+                my ($smsg) = @_;
+                $self->delete_by_num($smsg->{num}) if $smsg->{blob} eq $oid;
+                1;
+        });
+}
+
+sub create_tables {
+        my ($dbh) = @_;
+
+        $dbh->do(<<'');
+CREATE TABLE IF NOT EXISTS over (
+        num INTEGER NOT NULL,
+        tid INTEGER NOT NULL,
+        sid INTEGER,
+        ts INTEGER,
+        ddd VARBINARY, /* doc-data-deflated */
+        UNIQUE (num)
+)
+
+        $dbh->do('CREATE INDEX IF NOT EXISTS idx_tid ON over (tid)');
+        $dbh->do('CREATE INDEX IF NOT EXISTS idx_sid ON over (sid)');
+        $dbh->do('CREATE INDEX IF NOT EXISTS idx_ts ON over (ts)');
+
+        $dbh->do(<<'');
+CREATE TABLE IF NOT EXISTS counter (
+        key VARCHAR(8) PRIMARY KEY NOT NULL,
+        val INTEGER DEFAULT 0,
+        UNIQUE (key)
+)
+
+        $dbh->do("INSERT OR IGNORE INTO counter (key) VALUES ('thread')");
+        $dbh->do("INSERT OR IGNORE INTO counter (key) VALUES ('ghost')");
+
+        $dbh->do(<<'');
+CREATE TABLE IF NOT EXISTS subject (
+        sid INTEGER PRIMARY KEY AUTOINCREMENT,
+        path VARCHAR(40) NOT NULL,
+        UNIQUE (path)
+)
+
+        $dbh->do(<<'');
+CREATE TABLE IF NOT EXISTS id2num (
+        id INTEGER NOT NULL,
+        num INTEGER NOT NULL,
+        UNIQUE (id, num)
+)
+
+        # performance critical:
+        $dbh->do('CREATE INDEX IF NOT EXISTS idx_inum ON id2num (num)');
+        $dbh->do('CREATE INDEX IF NOT EXISTS idx_id ON id2num (id)');
+
+        $dbh->do(<<'');
+CREATE TABLE IF NOT EXISTS msgid (
+        id INTEGER PRIMARY KEY AUTOINCREMENT,
+        mid VARCHAR(244) NOT NULL,
+        UNIQUE (mid)
+)
+
+}
+
+sub commit_lazy {
+        my ($self) = @_;
+        delete $self->{txn} or return;
+        $self->{dbh}->commit;
+}
+
+sub begin_lazy {
+        my ($self) = @_;
+        return if $self->{txn};
+        my $dbh = $self->connect or return;
+        $dbh->begin_work;
+        # $dbh->{Profile} = 2;
+        $self->{txn} = 1;
+}
+
+sub rollback_lazy {
+        my ($self) = @_;
+        delete $self->{txn} or return;
+        $self->{dbh}->rollback;
+}
+
+sub disconnect {
+        my ($self) = @_;
+        die "in transaction" if $self->{txn};
+        $self->{dbh} = undef;
+}
+
+sub create {
+        my ($self) = @_;
+        unless (-r $self->{filename}) {
+                require File::Path;
+                require File::Basename;
+                File::Path::mkpath(File::Basename::dirname($self->{filename}));
+        }
+        # create the DB:
+        PublicInbox::Over::connect($self);
+        $self->disconnect;
+}
+
+1;
diff --git a/lib/PublicInbox/SearchIdxSkeleton.pm b/lib/PublicInbox/OverIdxFork.pm
index 2be64960..f4f7cddd 100644
--- a/lib/PublicInbox/SearchIdxSkeleton.pm
+++ b/lib/PublicInbox/OverIdxFork.pm
@@ -1,19 +1,16 @@
 # Copyright (C) 2018 all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
-package PublicInbox::SearchIdxSkeleton;
+package PublicInbox::OverIdxFork;
 use strict;
 use warnings;
-use base qw(PublicInbox::SearchIdx);
+use base qw(PublicInbox::OverIdx PublicInbox::Lock);
 use Storable qw(freeze thaw);
+use IO::Handle;
 
-sub new {
-        my ($class, $v2writable) = @_;
-        my $self = $class->SUPER::new($v2writable->{-inbox}, 1, 'skel');
-        # create the DB:
-        $self->_xdb_acquire;
-        $self->_xdb_release;
+sub create {
+        my ($self, $v2writable) = @_;
+        $self->SUPER::create();
         $self->spawn_worker($v2writable) if $v2writable->{parallel};
-        $self
 }
 
 sub spawn_worker {
@@ -30,33 +27,35 @@ sub spawn_worker {
                 $v2writable = undef;
                 close $w;
                 close $barrier_wait;
-                eval { skeleton_worker_loop($self, $r, $barrier_note) };
-                die "skeleton worker died: $@\n" if $@;
+
+                # F_SETPIPE_SZ = 1031 on Linux; increasing the pipe size here
+                # speeds V2Writable batch imports across 8 cores by nearly 20%
+                fcntl($r, 1031, 1048576) if $^O eq 'linux';
+
+                eval { over_worker_loop($self, $r, $barrier_note) };
+                die "over worker died: $@\n" if $@;
                 exit;
         }
         $self->{w} = $w;
         $self->{pid} = $pid;
+        $self->{lock_path} = "$self->{filename}.pipe.lock";
         close $r;
         close $barrier_note;
         $self->{barrier_wait} = $barrier_wait;
-
         $w->autoflush(1);
-
-        # lock on only exists in parent, not in worker
-        $self->{lock_path} = $self->xdir . '/pi-v2-skeleton.lock';
 }
 
-sub skeleton_worker_loop {
+sub over_worker_loop {
         my ($self, $r, $barrier_note) = @_;
         $barrier_note->autoflush(1);
-        $0 = 'pi-v2-skeleton';
-        $self->begin_txn_lazy;
+        $0 = 'pi-v2-overview';
+        $self->begin_lazy;
         my $barrier = undef;
         while (my $line = $r->getline) {
                 if ($line eq "commit\n") {
-                        $self->commit_txn_lazy;
+                        $self->commit_lazy;
                 } elsif ($line eq "close\n") {
-                        $self->_xdb_release;
+                        $self->disconnect;
                 } elsif ($line =~ /\Abarrier_init (\d+)\n\z/) {
                         my $n = $1 - 1;
                         die "barrier in-progress\n" if defined $barrier;
@@ -67,30 +66,30 @@ sub skeleton_worker_loop {
                         delete $barrier->{$1} or die "unknown barrier: $part\n";
                         if ((scalar keys %$barrier) == 0) {
                                 $barrier = undef;
-                                $self->commit_txn_lazy;
+                                $self->commit_lazy;
                                 print $barrier_note "barrier_done\n" or die
                                         "print failed to barrier note: $!";
                         }
                 } elsif ($line =~ /\AD ([a-f0-9]{40,}) (.*)\n\z/s) {
                         my ($oid, $mid) = ($1, $2);
-                        $self->begin_txn_lazy;
-                        $self->remove_by_oid($oid, $mid);
+                        $self->remove_oid($oid, $mid);
                 } else {
                         my $len = int($line);
                         my $n = read($r, my $msg, $len) or die "read: $!\n";
                         $n == $len or die "short read: $n != $len\n";
                         $msg = thaw($msg); # should raise on error
                         defined $msg or die "failed to thaw buffer\n";
-                        $self->begin_txn_lazy;
-                        eval { index_skeleton_real($self, $msg) };
+                        eval { add_over($self, $msg) };
                         warn "failed to index message <$msg->[-1]>: $@\n" if $@;
                 }
         }
-        $self->worker_done;
+        die "$$ $0 dbh not released\n" if $self->{dbh};
+        die "$$ $0 still in transaction\n" if $self->{txn};
 }
 
 # called by a partition worker
-sub index_skeleton {
+# values: [ DS, NUM, BYTES, LINES, TS, MIDS, XPATH, doc_data ]
+sub add_over {
         my ($self, $values) = @_;
         if (my $w = $self->{w}) {
                 my $err;
@@ -106,32 +105,21 @@ sub index_skeleton {
 
                 die "print failed: $err\n" if $err;
         } else {
-                $self->begin_txn_lazy;
-                index_skeleton_real($self, $values);
+                $self->SUPER::add_over($values);
         }
 }
 
-sub remote_remove {
+sub remove_oid {
         my ($self, $oid, $mid) = @_;
-        my $err;
-        $self->lock_acquire;
-        eval { $self->SUPER::remote_remove($oid, $mid) };
-        $err = $@;
-        $self->lock_release;
-        die $err if $err;
-}
-
-sub index_skeleton_real ($$) {
-        my ($self, $values) = @_;
-        my ($ts, $num, $mids, $xpath, $doc_data) = @$values;
-        my $smsg = PublicInbox::SearchMsg->new(undef);
-        $smsg->load_from_data($doc_data);
-        my $doc = $smsg->{doc};
-        $doc->set_data($doc_data);
-        PublicInbox::SearchIdx::add_values($doc, $ts, $smsg->ds, $num);
-        my @refs = ($smsg->references =~ /<([^>]+)>/g);
-        $self->delete_article($num) if defined $num; # for reindexing
-        $self->link_and_save($doc, $mids, \@refs, $num, $xpath);
+        if (my $w = $self->{w}) {
+                my $err;
+                $self->lock_acquire;
+                print $w "D $oid $mid\n" or $err = $!;
+                $self->lock_release;
+                die $err if $err;
+        } else {
+                $self->SUPER::remove_oid($oid, $mid); # OverIdx
+        }
 }
 
 # write to the subprocess
@@ -140,7 +128,7 @@ sub barrier_init {
         my $w = $self->{w} or return;
         my $err;
         $self->lock_acquire;
-        print $w "barrier_init $nparts\n" or $err = "failed to write: $!\n";
+        print $w "barrier_init $nparts\n" or $err = $!;
         $self->lock_release;
         die $err if $err;
 }
@@ -152,4 +140,44 @@ sub barrier_wait {
         $l eq "barrier_done\n" or die "bad response from barrier_wait: $l\n";
 }
 
+sub remote_commit {
+        my ($self) = @_;
+        if (my $w = $self->{w}) {
+                my $err;
+                $self->lock_acquire;
+                print $w "commit\n" or $err = $!;
+                $self->lock_release;
+                die $err if $err;
+        } else {
+                $self->commit_lazy;
+        }
+}
+
+# prevent connections when using forked subprocesses
+sub connect {
+        my ($self) = @_;
+        return if $self->{w};
+        $self->SUPER::connect;
+}
+
+sub remote_close {
+        my ($self) = @_;
+        if (my $w = delete $self->{w}) {
+                my $pid = delete $self->{pid} or die "no process to wait on\n";
+                print $w "close\n" or die "failed to write to pid:$pid: $!\n";
+                close $w or die "failed to close pipe for pid:$pid: $!\n";
+                waitpid($pid, 0) == $pid or die "remote process did not finish";
+                $? == 0 or die ref($self)." pid:$pid exited with: $?";
+        } else {
+                die "transaction in progress $self\n" if $self->{txn};
+                $self->disconnect;
+        }
+}
+
+sub commit_fsync {
+        my ($self) = @_;
+        return if $self->{w}; # don't bother; main parent can also call this
+        $self->SUPER::commit_fsync;
+}
+
 1;
diff --git a/lib/PublicInbox/Search.pm b/lib/PublicInbox/Search.pm
index ca389e32..91251246 100644
--- a/lib/PublicInbox/Search.pm
+++ b/lib/PublicInbox/Search.pm
@@ -10,12 +10,12 @@ use warnings;
 # values for searching
 use constant TS => 0;  # Received: header in Unix time
 use constant YYYYMMDD => 1; # for searching in the WWW UI
-use constant NUM => 2; # NNTP article number
 
 use Search::Xapian qw/:standard/;
 use PublicInbox::SearchMsg;
 use PublicInbox::MIME;
 use PublicInbox::MID qw/id_compress/;
+use PublicInbox::Over;
 
 # This is English-only, everything else is non-standard and may be confused as
 # a prefix common in patch emails
@@ -40,19 +40,13 @@ use constant {
         # 13 - fix threading for empty References/In-Reply-To
         #      (commit 83425ef12e4b65cdcecd11ddcb38175d4a91d5a0)
         # 14 - fix ghost root vivification
-        SCHEMA_VERSION => 14,
+        SCHEMA_VERSION => 15,
 
         # n.b. FLAG_PURE_NOT is expensive not suitable for a public website
         # as it could become a denial-of-service vector
         QP_FLAGS => FLAG_PHRASE|FLAG_BOOLEAN|FLAG_LOVEHATE|FLAG_WILDCARD,
 };
 
-# setup prefixes
-my %bool_pfx_internal = (
-        type => 'T', # "mail" or "ghost"
-        thread => 'G', # newsGroup (or similar entity - e.g. a web forum name)
-);
-
 my %bool_pfx_external = (
         mid => 'Q', # Message-ID (full/exact), this is mostly uniQue
 );
@@ -116,8 +110,6 @@ EOF
 );
 chomp @HELP;
 
-my $mail_query = Search::Xapian::Query->new('T' . 'mail');
-
 sub xdir {
         my ($self) = @_;
         if ($self->{version} == 1) {
@@ -143,8 +135,9 @@ sub new {
                 altid => $altid,
                 version => $version,
         }, $class;
+        my $dir;
         if ($version >= 2) {
-                my $dir = "$self->{mainrepo}/xap" . SCHEMA_VERSION;
+                $dir = "$self->{mainrepo}/xap" . SCHEMA_VERSION;
                 my $xdb;
                 my $parts = 0;
                 foreach my $part (<$dir/*>) {
@@ -158,55 +151,36 @@ sub new {
                         }
                 }
                 $self->{xdb} = $xdb;
-                $self->{skel} = Search::Xapian::Database->new("$dir/skel");
         } else {
-                $self->{xdb} = Search::Xapian::Database->new($self->xdir);
+                $dir = $self->xdir;
+                $self->{xdb} = Search::Xapian::Database->new($dir);
         }
+        $self->{over_ro} = PublicInbox::Over->new("$dir/over.sqlite3");
         $self;
 }
 
 sub reopen {
         my ($self) = @_;
         $self->{xdb}->reopen;
-        if (my $skel = $self->{skel}) {
-                $skel->reopen;
-        }
         $self; # make chaining easier
 }
 
 # read-only
 sub query {
         my ($self, $query_string, $opts) = @_;
-        my $query;
-
         $opts ||= {};
-        unless ($query_string eq '') {
-                $query = $self->qp->parse_query($query_string, QP_FLAGS);
+        if ($query_string eq '' && !$opts->{mset}) {
+                $self->{over_ro}->recent($opts);
+        } else {
+                my $query = $self->qp->parse_query($query_string, QP_FLAGS);
                 $opts->{relevance} = 1 unless exists $opts->{relevance};
+                _do_enquire($self, $query, $opts);
         }
-
-        _do_enquire($self, $query, $opts);
 }
 
 sub get_thread {
         my ($self, $mid, $opts) = @_;
-        my $smsg = first_smsg_by_mid($self, $mid) or
-                        return { total => 0, msgs => [] };
-        my $qtid = Search::Xapian::Query->new('G' . $smsg->thread_id);
-        my $path = $smsg->path;
-        if (defined $path && $path ne '') {
-                my $path = id_compress($smsg->path);
-                my $qsub = Search::Xapian::Query->new('XPATH' . $path);
-                $qtid = Search::Xapian::Query->new(OP_OR, $qtid, $qsub);
-        }
-        $opts ||= {};
-        $opts->{limit} ||= 1000;
-
-        # always sort threads by timestamp, this makes life easier
-        # for the threading algorithm (in SearchThread.pm)
-        $opts->{asc} = 1;
-        $opts->{enquire} = enquire_skel($self);
-        _do_enquire($self, $qtid, $opts);
+        $self->{over_ro}->get_thread($mid, $opts);
 }
 
 sub retry_reopen {
@@ -235,19 +209,13 @@ sub _do_enquire {
 
 sub _enquire_once {
         my ($self, $query, $opts) = @_;
-        my $enquire = $opts->{enquire} || enquire($self);
-        if (defined $query) {
-                $query = Search::Xapian::Query->new(OP_AND,$query,$mail_query);
-        } else {
-                $query = $mail_query;
-        }
+        my $enquire = enquire($self);
+        $query = Search::Xapian::Query->new(OP_AND,$query);
         $enquire->set_query($query);
         $opts ||= {};
         my $desc = !$opts->{asc};
         if ($opts->{relevance}) {
                 $enquire->set_sort_by_relevance_then_value(TS, $desc);
-        } elsif ($opts->{num}) {
-                $enquire->set_sort_by_value(NUM, 0);
         } else {
                 $enquire->set_sort_by_value_then_relevance(TS, $desc);
         }
@@ -309,39 +277,15 @@ EOF
         $self->{query_parser} = $qp;
 }
 
-sub num_range_processor {
-        $_[0]->{nrp} ||= Search::Xapian::NumberValueRangeProcessor->new(NUM);
-}
-
 # only used for NNTP server
 sub query_xover {
         my ($self, $beg, $end, $offset) = @_;
-        my $qp = Search::Xapian::QueryParser->new;
-        $qp->set_database($self->{skel} || $self->{xdb});
-        $qp->add_valuerangeprocessor($self->num_range_processor);
-        my $query = $qp->parse_query("$beg..$end", QP_FLAGS);
-
-        my $opts = {
-                enquire => enquire_skel($self),
-                num => 1,
-                limit => 200,
-                offset => $offset,
-        };
-        _do_enquire($self, $query, $opts);
+        $self->{over_ro}->query_xover($beg, $end, $offset);
 }
 
 sub query_ts {
-        my ($self, $ts, $opts) = @_;
-        my $qp = $self->{qp_ts} ||= eval {
-                my $q = Search::Xapian::QueryParser->new;
-                $q->set_database($self->{skel} || $self->{xdb});
-                $q->add_valuerangeprocessor(
-                        Search::Xapian::NumberValueRangeProcessor->new(TS));
-                $q
-        };
-        my $query = $qp->parse_query($ts, QP_FLAGS);
-        $opts->{enquire} = enquire_skel($self);
-        _do_enquire($self, $query, $opts);
+        my ($self, $ts, $offset) = @_;
+        $self->{over_ro}->query_ts($ts, $offset);
 }
 
 sub first_smsg_by_mid {
@@ -356,7 +300,7 @@ sub first_smsg_by_mid {
 sub lookup_article {
         my ($self, $num) = @_;
         my $term = 'XNUM'.$num;
-        my $db = $self->{skel} || $self->{xdb};
+        my $db = $self->{xdb};
         retry_reopen($self, sub {
                 my $head = $db->postlist_begin($term);
                 my $tail = $db->postlist_end($term);
@@ -365,9 +309,7 @@ sub lookup_article {
                 return unless defined $doc_id;
                 $head->inc;
                 if ($head->nequal($tail)) {
-                        my $loc= $self->{mainrepo} .
-                                ($self->{skel} ? 'skel' : 'xdb');
-                        warn "article #$num is not unique in $loc\n";
+                        warn "article #$num is not unique\n";
                 }
                 # raises on error:
                 my $doc = $db->get_document($doc_id);
@@ -381,7 +323,7 @@ sub each_smsg_by_mid {
         my ($self, $mid, $cb) = @_;
         # XXX retry_reopen isn't necessary for V2Writable, but the PSGI
         # interface will need it...
-        my $db = $self->{skel} || $self->{xdb};
+        my $db = $self->{xdb};
         my $term = 'Q' . $mid;
         my $head = $db->postlist_begin($term);
         my $tail = $db->postlist_end($term);
@@ -424,15 +366,6 @@ sub enquire {
         $self->{enquire} ||= Search::Xapian::Enquire->new($self->{xdb});
 }
 
-sub enquire_skel {
-        my ($self) = @_;
-        if (my $skel = $self->{skel}) {
-                $self->{enquire_skel} ||= Search::Xapian::Enquire->new($skel);
-        } else {
-                enquire($self);
-        }
-}
-
 sub help {
         my ($self) = @_;
         $self->qp; # parse altids
diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm
index 2e0b9a43..3412a615 100644
--- a/lib/PublicInbox/SearchIdx.pm
+++ b/lib/PublicInbox/SearchIdx.pm
@@ -16,10 +16,12 @@ use PublicInbox::MID qw/mid_clean id_compress mid_mime mids references/;
 use PublicInbox::MsgIter;
 use Carp qw(croak);
 use POSIX qw(strftime);
+use PublicInbox::OverIdx;
 require PublicInbox::Git;
+use Compress::Zlib qw(compress);
 
 use constant {
-        BATCH_BYTES => 1_000_000,
+        BATCH_BYTES => 10_000_000,
         DEBUG => !!$ENV{DEBUG},
 };
 
@@ -73,12 +75,13 @@ sub new {
         $ibx->umask_prepare;
         if ($version == 1) {
                 $self->{lock_path} = "$mainrepo/ssoma.lock";
+                my $dir = $self->xdir;
+                $self->{over} = PublicInbox::OverIdx->new("$dir/over.sqlite3");
         } elsif ($version == 2) {
                 defined $part or die "partition is required for v2\n";
-                # partition is a number or "all"
+                # partition is a number
                 $self->{partition} = $part;
                 $self->{lock_path} = undef;
-                $self->{msgmap_path} = "$mainrepo/msgmap.sqlite3";
         } else {
                 die "unsupported inbox version=$version\n";
         }
@@ -114,14 +117,6 @@ sub add_val ($$$) {
         $doc->add_value($col, $num);
 }
 
-sub add_values {
-        my ($doc, $ts, $ds, $num) = @_;
-        add_val($doc, PublicInbox::Search::TS, $ts);
-        my $yyyymmdd = strftime('%Y%m%d', gmtime($ds));
-        add_val($doc, PublicInbox::Search::YYYYMMDD, $yyyymmdd);
-        defined($num) and add_val($doc, PublicInbox::Search::NUM, $num);
-}
-
 sub index_users ($$) {
         my ($tg, $smsg) = @_;
 
@@ -269,8 +264,11 @@ sub add_message {
         my ($self, $mime, $bytes, $num, $oid, $mid0) = @_;
         my $doc_id;
         my $mids = mids($mime->header_obj);
-        my $skel = $self->{skeleton};
-
+        $mid0 = $mids->[0] unless defined $mid0; # v1 compatibility
+        unless (defined $num) { # v1
+                my $mm = $self->_msgmap_init;
+                $num = $mm->mid_insert($mid0) || $mm->num_for($mid0);
+        }
         eval {
                 my $smsg = PublicInbox::SearchMsg->new($mime);
                 my $doc = $smsg->{doc};
@@ -281,11 +279,12 @@ sub add_message {
                         $xpath = id_compress($xpath);
                 }
 
-                my $lines = $mime->body_raw =~ tr!\n!\n!;
                 $smsg->{lines} = $mime->body_raw =~ tr!\n!\n!;
                 defined $bytes or $bytes = length($mime->as_string);
                 $smsg->{bytes} = $bytes;
-                add_values($doc, $smsg->ts, $smsg->ds, $num);
+                add_val($doc, PublicInbox::Search::TS(), $smsg->ts);
+                my $yyyymmdd = strftime('%Y%m%d', gmtime($smsg->ds));
+                add_val($doc, PublicInbox::Search::YYYYMMDD, $yyyymmdd);
 
                 my $tg = $self->term_generator;
 
@@ -336,7 +335,6 @@ sub add_message {
 
                 # populates smsg->references for smsg->to_doc_data
                 my $refs = parse_references($smsg);
-                $mid0 = $mids->[0] unless defined $mid0; # v1 compatibility
                 my $data = $smsg->to_doc_data($oid, $mid0);
                 foreach my $mid (@$mids) {
                         $tg->index_text($mid, 1, 'XM');
@@ -354,16 +352,14 @@ sub add_message {
                 }
 
                 $self->delete_article($num) if defined $num; # for reindexing
-                if ($skel) {
-                        my @vals = ($smsg->ts, $num, $mids, $xpath, $data);
-                        $skel->index_skeleton(\@vals);
-                        $doc->add_boolean_term('Q' . $_) foreach @$mids;
-                        $doc->add_boolean_term('XNUM' . $num) if defined $num;
-                        $doc_id = $self->{xdb}->add_document($doc);
-                } else {
-                        $doc_id = link_and_save($self, $doc, $mids, $refs,
-                                                $num, $xpath);
-                }
+
+                utf8::encode($data);
+                $data = compress($data);
+                my @vals = ($smsg->ts, $num, $mids, $refs, $xpath, $data);
+                $self->{over}->add_over(\@vals);
+                $doc->add_boolean_term('Q' . $_) foreach @$mids;
+                $doc->add_boolean_term('XNUM' . $num) if defined $num;
+                $doc_id = $self->{xdb}->add_document($doc);
         };
 
         if ($@) {
@@ -439,14 +435,19 @@ sub remove_by_oid {
         # there is only ONE element in @delete unless we
         # have bugs in our v2writable deduplication check
         my @delete;
+        my @over_del;
         for (; $head != $tail; $head->inc) {
                 my $docid = $head->get_docid;
                 my $doc = $db->get_document($docid);
                 my $smsg = PublicInbox::SearchMsg->wrap($doc, $mid);
                 $smsg->load_expand;
-                push(@delete, $docid) if $smsg->{blob} eq $oid;
+                if ($smsg->{blob} eq $oid) {
+                        push(@delete, $docid);
+                        push(@over_del, $smsg->num);
+                }
         }
         $db->delete_document($_) foreach @delete;
+        $self->{over}->remove_oid($oid, $mid);
         scalar(@delete);
 }
 
@@ -462,18 +463,6 @@ sub term_generator { # write-only
         $self->{term_generator} = $tg;
 }
 
-# increments last_thread_id counter
-# returns a 64-bit integer represented as a decimal string
-sub next_thread_id {
-        my ($self) = @_;
-        my $db = $self->{xdb};
-        my $last_thread_id = int($db->get_metadata('last_thread_id') || 0);
-
-        $db->set_metadata('last_thread_id', ++$last_thread_id);
-
-        $last_thread_id;
-}
-
 sub parse_references ($) {
         my ($smsg) = @_;
         my $mime = $smsg->{mime};
@@ -496,71 +485,6 @@ sub parse_references ($) {
         \@keep;
 }
 
-sub link_doc {
-        my ($self, $doc, $refs, $old_tid) = @_;
-        my $tid;
-
-        if (@$refs) {
-                # first ref *should* be the thread root,
-                # but we can never trust clients to do the right thing
-                my $ref = shift @$refs;
-                $tid = resolve_mid_to_tid($self, $ref);
-                merge_threads($self, $tid, $old_tid) if defined $old_tid;
-
-                # the rest of the refs should point to this tid:
-                foreach $ref (@$refs) {
-                        my $ptid = resolve_mid_to_tid($self, $ref);
-                        merge_threads($self, $tid, $ptid);
-                }
-        } else {
-                $tid = defined $old_tid ? $old_tid : $self->next_thread_id;
-        }
-        $doc->add_boolean_term('G' . $tid);
-        $tid;
-}
-
-sub link_and_save {
-        my ($self, $doc, $mids, $refs, $num, $xpath) = @_;
-        my $db = $self->{xdb};
-        my $old_tid;
-        my $doc_id;
-        $doc->add_boolean_term('XNUM' . $num) if defined $num;
-        $doc->add_boolean_term('XPATH' . $xpath) if defined $xpath;
-        $doc->add_boolean_term('Q' . $_) foreach @$mids;
-
-        $self->{skel} and die "Should not have read-only skel here\n";;
-        foreach my $mid (@$mids) {
-                my $vivified = 0;
-                $self->each_smsg_by_mid($mid, sub {
-                        my ($cur) = @_;
-                        my $type = $cur->type;
-                        my $cur_tid = $cur->thread_id;
-                        $old_tid = $cur_tid unless defined $old_tid;
-                        if ($type eq 'mail') {
-                                # do not break existing mail messages,
-                                # just merge the threads
-                                merge_threads($self, $old_tid, $cur_tid);
-                                return 1;
-                        }
-                        if ($type ne 'ghost') {
-                                die "<$mid> has a bad type: $type\n";
-                        }
-                        my $tid = link_doc($self, $doc, $refs, $old_tid);
-                        $old_tid = $tid unless defined $old_tid;
-                        $doc_id = $cur->{doc_id};
-                        $self->{xdb}->replace_document($doc_id, $doc);
-                        ++$vivified;
-                        1;
-                });
-                $vivified > 1 and warn
-                        "BUG: vivified multiple ($vivified) ghosts for $mid\n";
-        }
-        # not really important, but we return any vivified ghost docid, here:
-        return $doc_id if defined $doc_id;
-        link_doc($self, $doc, $refs, $old_tid);
-        $self->{xdb}->add_document($doc);
-}
-
 sub index_git_blob_id {
         my ($doc, $pfx, $objid) = @_;
 
@@ -675,14 +599,10 @@ sub rlog {
 
 sub _msgmap_init {
         my ($self) = @_;
+        die "BUG: _msgmap_init is only for v1\n" if $self->{version} != 1;
         $self->{mm} ||= eval {
                 require PublicInbox::Msgmap;
-                my $msgmap_path = $self->{msgmap_path};
-                if (defined $msgmap_path) { # v2
-                        PublicInbox::Msgmap->new_file($msgmap_path, 1);
-                } else {
-                        PublicInbox::Msgmap->new($self->{mainrepo}, 1);
-                }
+                PublicInbox::Msgmap->new($self->{mainrepo}, 1);
         };
 }
 
@@ -699,8 +619,7 @@ sub _index_sync {
         my $reindex = $opts->{reindex};
         my ($mkey, $last_commit, $lx, $xlog);
         $self->{git}->batch_prepare;
-        my $xdb = _xdb_acquire($self);
-        $xdb->begin_transaction;
+        my $xdb = $self->begin_txn_lazy;
         do {
                 $xlog = undef;
                 $mkey = 'last_commit';
@@ -710,6 +629,9 @@ sub _index_sync {
                         $lx = '';
                         $mkey = undef if $last_commit ne '';
                 }
+                $self->{over}->rollback_lazy;
+                $self->{over}->disconnect;
+                delete $self->{txn};
                 $xdb->cancel_transaction;
                 $xdb = _xdb_release($self);
 
@@ -717,8 +639,7 @@ sub _index_sync {
                 my $range = $lx eq '' ? $tip : "$lx..$tip";
                 $xlog = _git_log($self, $range);
 
-                $xdb = _xdb_acquire($self);
-                $xdb->begin_transaction;
+                $xdb = $self->begin_txn_lazy;
         } while ($xdb->get_metadata('last_commit') ne $last_commit);
 
         my $mm = _msgmap_init($self);
@@ -732,14 +653,12 @@ sub _index_sync {
                 }
                 if (!$mm_only) {
                         $xdb->set_metadata($mkey, $commit) if $mkey && $commit;
-                        $xdb->commit_transaction;
-                        $xdb = _xdb_release($self);
+                        $self->commit_txn_lazy;
                 }
                 # let another process do some work... <
                 if ($more) {
                         if (!$mm_only) {
-                                $xdb = _xdb_acquire($self);
-                                $xdb->begin_transaction;
+                                $xdb = $self->begin_txn_lazy;
                         }
                         $dbh->begin_work if $dbh;
                 }
@@ -779,71 +698,19 @@ sub _index_sync {
         }
 }
 
-# this will create a ghost as necessary
-sub resolve_mid_to_tid {
-        my ($self, $mid) = @_;
-        my $tid;
-        $self->each_smsg_by_mid($mid, sub {
-                my ($smsg) = @_;
-                my $cur_tid = $smsg->thread_id;
-                if (defined $tid) {
-                        merge_threads($self, $tid, $cur_tid);
-                } else {
-                        $tid = $smsg->thread_id;
-                }
-                1;
-        });
-        return $tid if defined $tid;
-
-        $self->create_ghost($mid)->thread_id;
-}
-
-sub create_ghost {
-        my ($self, $mid) = @_;
-
-        my $tid = $self->next_thread_id;
-        my $doc = Search::Xapian::Document->new;
-        $doc->add_boolean_term('Q' . $mid);
-        $doc->add_boolean_term('G' . $tid);
-        $doc->add_boolean_term('T' . 'ghost');
-
-        my $smsg = PublicInbox::SearchMsg->wrap($doc, $mid);
-        $self->{xdb}->add_document($doc);
-
-        $smsg;
-}
-
-sub merge_threads {
-        my ($self, $winner_tid, $loser_tid) = @_;
-        return if $winner_tid == $loser_tid;
-        my $db = $self->{xdb};
-        batch_do($self, 'G' . $loser_tid, sub {
-                my ($ids) = @_;
-                foreach my $docid (@$ids) {
-                        my $doc = $db->get_document($docid);
-                        $doc->remove_term('G' . $loser_tid);
-                        $doc->add_boolean_term('G' . $winner_tid);
-                        $db->replace_document($docid, $doc);
-                }
-        });
-}
-
 sub DESTROY {
         # order matters for unlocking
         $_[0]->{xdb} = undef;
         $_[0]->{lockfh} = undef;
 }
 
-# remote_* subs are only used by SearchIdxPart and SearchIdxSkeleton
+# remote_* subs are only used by SearchIdxPart
 sub remote_commit {
         my ($self) = @_;
         if (my $w = $self->{w}) {
                 print $w "commit\n" or die "failed to write commit: $!";
         } else {
                 $self->commit_txn_lazy;
-                if (my $skel = $self->{skeleton}) {
-                        $skel->commit_txn_lazy;
-                }
         }
 }
 
@@ -864,7 +731,7 @@ sub remote_close {
 sub remote_remove {
         my ($self, $oid, $mid) = @_;
         if (my $w = $self->{w}) {
-                # triggers remove_by_oid in partition or skeleton
+                # triggers remove_by_oid in a partition
                 print $w "D $oid $mid\n" or die "failed to write remove $!";
         } else {
                 $self->begin_txn_lazy;
@@ -876,14 +743,17 @@ sub begin_txn_lazy {
         my ($self) = @_;
         return if $self->{txn};
         my $xdb = $self->{xdb} || $self->_xdb_acquire;
+        $self->{over}->begin_lazy;
         $xdb->begin_transaction;
         $self->{txn} = 1;
+        $xdb;
 }
 
 sub commit_txn_lazy {
         my ($self) = @_;
         delete $self->{txn} or return;
         $self->{xdb}->commit_transaction;
+        $self->{over}->commit_lazy;
 }
 
 sub worker_done {
diff --git a/lib/PublicInbox/SearchIdxPart.pm b/lib/PublicInbox/SearchIdxPart.pm
index 82f5c1bc..e5766a82 100644
--- a/lib/PublicInbox/SearchIdxPart.pm
+++ b/lib/PublicInbox/SearchIdxPart.pm
@@ -6,12 +6,12 @@ use warnings;
 use base qw(PublicInbox::SearchIdx);
 
 sub new {
-        my ($class, $v2writable, $part, $skel) = @_;
+        my ($class, $v2writable, $part) = @_;
         my $self = $class->SUPER::new($v2writable->{-inbox}, 1, $part);
-        $self->{skeleton} = $skel;
-        # create the DB:
+        # create the DB before forking:
         $self->_xdb_acquire;
         $self->_xdb_release;
+        $self->{over} = $v2writable->{over};
         $self->spawn_worker($v2writable, $part) if $v2writable->{parallel};
         $self;
 }
@@ -27,7 +27,7 @@ sub spawn_worker {
         if ($pid == 0) {
                 $v2writable->atfork_child;
                 $v2writable = undef;
-                close $w;
+                close $w or die "failed to close: $!";
 
                 # F_SETPIPE_SZ = 1031 on Linux; increasing the pipe size here
                 # speeds V2Writable batch imports across 8 cores by nearly 20%
@@ -40,7 +40,7 @@ sub spawn_worker {
         }
         $self->{pid} = $pid;
         $self->{w} = $w;
-        close $r;
+        close $r or die "failed to close: $!";
 }
 
 sub partition_worker_loop ($$$) {
@@ -50,13 +50,12 @@ sub partition_worker_loop ($$$) {
         while (my $line = $r->getline) {
                 if ($line eq "commit\n") {
                         $self->commit_txn_lazy;
-                        $self->{skeleton}->remote_commit;
                 } elsif ($line eq "close\n") {
                         $self->_xdb_release;
                 } elsif ($line eq "barrier\n") {
                         $self->commit_txn_lazy;
-                        print { $self->{skeleton}->{w} } "barrier $part\n" or
-                                        die "write failed to skeleton: $!\n";
+                        print { $self->{over}->{w} } "barrier $part\n" or
+                                        die "write failed to overview $!\n";
                 } elsif ($line =~ /\AD ([a-f0-9]{40,}) (.+)\n\z/s) {
                         my ($oid, $mid) = ($1, $2);
                         $self->begin_txn_lazy;
@@ -101,7 +100,6 @@ sub remote_barrier {
                 $w->flush or die "failed to flush: $!";
         } else {
                 $self->commit_txn_lazy;
-                $self->{skeleton}->remote_commit;
         }
 }
 
diff --git a/lib/PublicInbox/SearchMsg.pm b/lib/PublicInbox/SearchMsg.pm
index f5510b8e..6c0780e5 100644
--- a/lib/PublicInbox/SearchMsg.pm
+++ b/lib/PublicInbox/SearchMsg.pm
@@ -13,9 +13,7 @@ use PublicInbox::MsgTime qw(msg_timestamp msg_datestamp);
 sub new {
         my ($class, $mime) = @_;
         my $doc = Search::Xapian::Document->new;
-        $doc->add_boolean_term('T' . 'mail');
-
-        bless { type => 'mail', doc => $doc, mime => $mime }, $class;
+        bless { doc => $doc, mime => $mime }, $class;
 }
 
 sub wrap {
@@ -52,6 +50,7 @@ sub to_doc_data {
         );
 }
 
+
 sub load_from_data ($$) {
         my ($self) = $_[0]; # data = $_[1]
         (
@@ -187,26 +186,9 @@ sub mid ($;$) {
 
 sub _extract_mid { mid_clean(mid_mime($_[0]->{mime})) }
 
-sub thread_id {
-        my ($self) = @_;
-        my $tid = $self->{thread};
-        return $tid if defined $tid;
-        $self->{thread} = _get_term_val($self, 'G', qr/\AG/); # *G*roup
-}
+sub tid { $_[0]->{tid} }
 
 # XXX: consider removing this, we can phrase match subject
-sub path {
-        my ($self) = @_;
-        my $path = $self->{path};
-        return $path if defined $path;
-        $self->{path} = _get_term_val($self, 'XPATH', qr/\AXPATH/); # path
-}
-
-sub type {
-        my ($self) = @_;
-        my $type = $self->{type};
-        return $type if defined $type;
-        $self->{type} = _get_term_val($self, 'T', qr/\AT/);
-}
+sub path { $_[0]->{path} }
 
 1;
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index 51723e55..8e3122ab 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -7,13 +7,15 @@ use strict;
 use warnings;
 use base qw(PublicInbox::Lock);
 use PublicInbox::SearchIdxPart;
-use PublicInbox::SearchIdxSkeleton;
 use PublicInbox::MIME;
 use PublicInbox::Git;
 use PublicInbox::Import;
 use PublicInbox::MID qw(mids);
 use PublicInbox::ContentId qw(content_id content_digest);
 use PublicInbox::Inbox;
+use PublicInbox::OverIdxFork;
+use PublicInbox::Msgmap;
+use IO::Handle;
 
 # an estimate of the post-packed size to the raw uncompressed size
 my $PACKING_FACTOR = 0.4;
@@ -57,6 +59,7 @@ sub new {
                 partitions => $nparts,
                 parallel => 1,
                 transact_bytes => 0,
+                over => PublicInbox::OverIdxFork->new("$xpfx/over.sqlite3"),
                 lock_path => "$dir/inbox.lock",
                 # limit each repo to 1GB or so
                 rotate_bytes => int((1024 * 1024 * 1024) / $PACKING_FACTOR),
@@ -117,7 +120,7 @@ sub num_for {
         my $mids = mids($mime->header_obj);
         if (@$mids) {
                 my $mid = $mids->[0];
-                my $num = $self->{skel}->{mm}->mid_insert($mid);
+                my $num = $self->{mm}->mid_insert($mid);
                 if (defined $num) { # common case
                         $$mid0 = $mid;
                         return $num;
@@ -140,7 +143,7 @@ sub num_for {
                 # try the rest of the mids
                 for(my $i = $#$mids; $i >= 1; $i--) {
                         my $m = $mids->[$i];
-                        $num = $self->{skel}->{mm}->mid_insert($m);
+                        $num = $self->{mm}->mid_insert($m);
                         if (defined $num) {
                                 warn "alternative <$m> for <$mid> found\n";
                                 $$mid0 = $m;
@@ -158,20 +161,20 @@ sub num_for_harder {
         my $hdr = $mime->header_obj;
         my $dig = content_digest($mime);
         $$mid0 = PublicInbox::Import::digest2mid($dig);
-        my $num = $self->{skel}->{mm}->mid_insert($$mid0);
+        my $num = $self->{mm}->mid_insert($$mid0);
         unless (defined $num) {
                 # it's hard to spoof the last Received: header
                 my @recvd = $hdr->header_raw('Received');
                 $dig->add("Received: $_") foreach (@recvd);
                 $$mid0 = PublicInbox::Import::digest2mid($dig);
-                $num = $self->{skel}->{mm}->mid_insert($$mid0);
+                $num = $self->{mm}->mid_insert($$mid0);
 
                 # fall back to a random Message-ID and give up determinism:
                 until (defined($num)) {
                         $dig->add(rand);
                         $$mid0 = PublicInbox::Import::digest2mid($dig);
                         warn "using random Message-ID <$$mid0> as fallback\n";
-                        $num = $self->{skel}->{mm}->mid_insert($$mid0);
+                        $num = $self->{mm}->mid_insert($$mid0);
                 }
         }
         PublicInbox::Import::append_mid($hdr, $$mid0);
@@ -194,13 +197,11 @@ sub idx_init {
         # frequently activated.
         delete $ibx->{$_} foreach (qw(git mm search));
 
+        my $over = $self->{over};
         $ibx->umask_prepare;
         $ibx->with_umask(sub {
                 $self->lock_acquire;
-
-                # first time initialization, first we create the skeleton pipe:
-                my $skel = PublicInbox::SearchIdxSkeleton->new($self);
-                $self->{skel} = $skel;
+                $over->create($self);
 
                 # need to create all parts before initializing msgmap FD
                 my $max = $self->{partitions} - 1;
@@ -208,12 +209,14 @@ sub idx_init {
                 # idx_parts must be visible to all forked processes
                 my $idx = $self->{idx_parts} = [];
                 for my $i (0..$max) {
-                        push @$idx,
-                             PublicInbox::SearchIdxPart->new($self, $i, $skel);
+                        push @$idx, PublicInbox::SearchIdxPart->new($self, $i);
                 }
 
-                # Now that all subprocesses are up, we can open the FD for SQLite:
-                $skel->_msgmap_init->{dbh}->begin_work;
+                # Now that all subprocesses are up, we can open the FDs
+                # for SQLite:
+                my $mm = $self->{mm} = PublicInbox::Msgmap->new_file(
+                        "$self->{-inbox}->{mainrepo}/msgmap.sqlite3", 1);
+                $mm->{dbh}->begin_work;
         });
 }
 
@@ -236,9 +239,8 @@ sub remove_internal {
         my $ibx = $self->{-inbox};
         my $srch = $ibx->search;
         my $cid = content_id($mime);
-        my $skel = $self->{skel};
         my $parts = $self->{idx_parts};
-        my $mm = $skel->{mm};
+        my $mm = $self->{mm};
         my $removed;
         my $mids = mids($mime->header_obj);
 
@@ -273,9 +275,10 @@ sub remove_internal {
                                 $orig = undef;
                                 $removed->num; # memoize this for callers
 
-                                foreach my $idx (@$parts, $skel) {
+                                foreach my $idx (@$parts) {
                                         $idx->remote_remove($oid, $mid);
                                 }
+                                $self->{over}->remove_oid($oid, $mid);
                         }
                         1; # continue
                 });
@@ -322,18 +325,20 @@ sub barrier {
         if (my $im = $self->{im}) {
                 $im->barrier;
         }
-        my $skel = $self->{skel};
         my $parts = $self->{idx_parts};
-        if ($parts && $skel) {
-                my $dbh = $skel->{mm}->{dbh};
-                $dbh->commit; # SQLite data is second in importance
+        if ($parts) {
+                my $dbh = $self->{mm}->{dbh};
+                $dbh->commit; # SQLite msgmap data is second in importance
+
+                my $over = $self->{over};
 
-                # Now deal with Xapian
-                $skel->barrier_init(scalar(@$parts));
-                # each partition needs to issue a barrier command to skel:
+                # Now deal with Xapian and overview DB
+                $over->barrier_init(scalar(@$parts));
+
+                # each partition needs to issue a barrier command to over
                 $_->remote_barrier foreach @$parts;
 
-                $skel->barrier_wait; # wait for each Xapian partition
+                $over->barrier_wait; # wait for each Xapian partition
 
                 $dbh->begin_work;
         }
@@ -343,26 +348,30 @@ sub barrier {
 sub searchidx_checkpoint {
         my ($self, $more) = @_;
 
-        # order matters, we can only close {skel} after all partitions
-        # are done because the partitions also write to {skel}
+        # order matters, we can only close {over} after all partitions
+        # are done because the partitions also write to {over}
         if (my $parts = $self->{idx_parts}) {
                 foreach my $idx (@$parts) {
-                        $idx->remote_commit; # propagates commit to skel
+                        $idx->remote_commit; # propagates commit to over
                         $idx->remote_close unless $more;
                 }
                 delete $self->{idx_parts} unless $more;
         }
 
-        if (my $skel = $self->{skel}) {
-                my $dbh = $skel->{mm}->{dbh};
+        if (my $mm = $self->{mm}) {
+                my $dbh = $mm->{dbh};
                 $dbh->commit;
                 if ($more) {
                         $dbh->begin_work;
                 } else {
-                        $skel->remote_close;
-                        delete $self->{skel};
+                        delete $self->{mm};
                 }
         }
+        my $over = $self->{over};
+        $over->remote_commit;
+        if (!$more) {
+                $over->remote_close;
+        }
         $self->{transact_bytes} = 0;
 }
 
@@ -522,6 +531,7 @@ sub atfork_child {
         if (my $im = $self->{im}) {
                 $im->atfork_child;
         }
+        die "unexpected mm" if $self->{mm};
 }
 
 sub mark_deleted {
@@ -559,7 +569,7 @@ sub reindex_oid {
         if (!defined($mid0) && $regen && !$del) {
                 $num = $$regen--;
                 die "BUG: ran out of article numbers\n" if $num <= 0;
-                my $mm = $self->{skel}->{mm};
+                my $mm = $self->{mm};
                 foreach my $mid (reverse @$mids) {
                         if ($mm->mid_set($num, $mid) == 1) {
                                 $mid0 = $mid;
@@ -620,7 +630,7 @@ sub reindex {
         my $head = $ibx->{ref_head} || 'refs/heads/master';
         $self->idx_init; # acquire lock
         my $x40 = qr/[a-f0-9]{40}/;
-        my $mm_tmp = $self->{skel}->{mm}->tmp_clone;
+        my $mm_tmp = $self->{mm}->tmp_clone;
         if (!$regen) {
                 my (undef, $max) = $mm_tmp->minmax;
                 unless (defined $max) {
diff --git a/script/public-inbox-compact b/script/public-inbox-compact
index 79cd039b..e6977165 100755
--- a/script/public-inbox-compact
+++ b/script/public-inbox-compact
@@ -10,7 +10,6 @@ use PublicInbox::Config;
 use Cwd 'abs_path';
 use File::Temp qw(tempdir);
 use File::Path qw(remove_tree);
-use PublicInbox::Spawn qw(spawn);
 my $usage = "Usage: public-inbox-compact REPO_DIR\n";
 my $dir = shift or die $usage;
 my $config = PublicInbox::Config->new;
@@ -36,6 +35,8 @@ $ibx->umask_prepare;
 sub commit_changes ($$$) {
         my ($im, $old, $new) = @_;
         my @st = stat($old) or die "failed to stat($old): $!\n";
+        link("$old/over.sqlite3", "$new/over.sqlite3") or die
+                "failed to link {$old => $new}/over.sqlite3: $!\n";
         rename($old, "$new/old") or die "rename $old => $new/old: $!\n";
         chmod($st[2] & 07777, $new) or die "chmod $old: $!\n";
         rename($new, $old) or die "rename $new => $old: $!\n";
@@ -53,41 +54,18 @@ if ($v == 2) {
         $ibx->with_umask(sub {
                 $v2w->lock_acquire;
                 my @parts;
-                my $skel;
                 while (defined(my $dn = readdir($dh))) {
                         if ($dn =~ /\A\d+\z/) {
                                 push @parts, "$old/$dn";
-                        } elsif ($dn eq 'skel') {
-                                $skel = "$old/$dn";
                         } elsif ($dn eq '.' || $dn eq '..') {
                         } else {
                                 warn "W: skipping unknown Xapian DB: $old/$dn\n"
                         }
                 }
                 close $dh;
-                my %pids;
-
-                if (@parts) {
-                        my $pid = spawn(['xapian-compact', @parts, "$new/0" ]);
-                        defined $pid or die "compact failed: $?\n";
-                        $pids{$pid} = 'xapian-compact (parts)';
-                } else {
-                        warn "No parts found in $old\n";
-                }
-                if (defined $skel) {
-                        my $pid = spawn(['xapian-compact', $skel, "$new/skel"]);
-                        defined $pid or die "compact failed: $?\n";
-                        $pids{$pid} = 'xapian-compact (skel)';
-                } else {
-                        warn "$old/skel missing\n";
-                }
-                scalar keys %pids or
-                        die "No xapian-compact processes running\n";
-                while (scalar keys %pids) {
-                        my $pid = waitpid(-1, 0);
-                        my $desc = delete $pids{$pid};
-                        die "$desc failed: $?\n" if $?;
-                }
+                die "No Xapian parts found in $old\n" unless @parts;
+                my $cmd = ['xapian-compact', @parts, "$new/0" ];
+                PublicInbox::Import::run_die($cmd);
                 commit_changes($v2w, $old, $new);
         });
 } elsif ($v == 1) {
diff --git a/t/over.t b/t/over.t
new file mode 100644
index 00000000..1d3f9b37
--- /dev/null
+++ b/t/over.t
@@ -0,0 +1,38 @@
+# Copyright (C) 2018 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+use strict;
+use warnings;
+use Test::More;
+use File::Temp qw/tempdir/;
+foreach my $mod (qw(DBD::SQLite)) {
+        eval "require $mod";
+        plan skip_all => "$mod missing for over.t" if $@;
+}
+
+use_ok 'PublicInbox::OverIdx';
+my $tmpdir = tempdir('pi-over-XXXXXX', TMPDIR => 1, CLEANUP => 1);
+my $over = PublicInbox::OverIdx->new("$tmpdir/over.sqlite3");
+$over->connect;
+my $x = $over->next_tid;
+is(int($x), $x, 'integer tid');
+my $y = $over->next_tid;
+is($y, $x+1, 'tid increases');
+
+$x = $over->sid('hello-world');
+is(int($x), $x, 'integer sid');
+$y = $over->sid('hello-WORLD');
+is($y, $x+1, 'sid ncreases');
+is($over->sid('hello-world'), $x, 'idempotent');
+$over->disconnect;
+
+$over = PublicInbox::OverIdx->new("$tmpdir/over.sqlite3");
+$over->connect;
+is($over->sid('hello-world'), $x, 'idempotent across reopen');
+$over->each_by_mid('never', sub { fail('should not be called') });
+
+$x = $over->create_ghost('never');
+is(int($x), $x, 'integer tid for ghost');
+$y = $over->create_ghost('NEVAR');
+is($y, $x + 1, 'integer tid for ghost increases');
+
+done_testing();
diff --git a/t/psgi_search.t b/t/psgi_search.t
index 1df38691..60a44bde 100644
--- a/t/psgi_search.t
+++ b/t/psgi_search.t
@@ -30,8 +30,7 @@ EOF
 
 my $num = 0;
 # nb. using internal API, fragile!
-my $xdb = $rw->_xdb_acquire;
-$xdb->begin_transaction;
+$rw->begin_txn_lazy;
 
 foreach (reverse split(/\n\n/, $data)) {
         $_ .= "\n";
@@ -42,8 +41,7 @@ foreach (reverse split(/\n\n/, $data)) {
         ok($doc_id, 'message added: '. $mid);
 }
 
-$xdb->commit_transaction;
-$rw = undef;
+$rw->commit_txn_lazy;
 
 my $cfgpfx = "publicinbox.test";
 my $config = PublicInbox::Config->new({
diff --git a/t/search-thr-index.t b/t/search-thr-index.t
index 9549976d..3ddef809 100644
--- a/t/search-thr-index.t
+++ b/t/search-thr-index.t
@@ -32,8 +32,7 @@ EOF
 
 my $num = 0;
 # nb. using internal API, fragile!
-my $xdb = $rw->_xdb_acquire;
-$xdb->begin_transaction;
+my $xdb = $rw->begin_txn_lazy;
 my @mids;
 
 foreach (reverse split(/\n\n/, $data)) {
@@ -50,10 +49,12 @@ foreach (reverse split(/\n\n/, $data)) {
 
 my $prev;
 foreach my $mid (@mids) {
-        my $res = $rw->get_thread($mid);
+        my $res = $rw->{over}->get_thread($mid);
         is(3, $res->{total}, "got all messages from $mid");
 }
 
+$rw->commit_txn_lazy;
+
 done_testing();
 
 1;
diff --git a/t/search.t b/t/search.t
index 9ab15f77..51adb9fb 100644
--- a/t/search.t
+++ b/t/search.t
@@ -22,9 +22,9 @@ my $ibx = $rw->{-inbox};
 $rw = undef;
 my $ro = PublicInbox::Search->new($git_dir);
 my $rw_commit = sub {
-        $rw->{xdb}->commit_transaction if $rw && $rw->{xdb};
+        $rw->commit_txn_lazy if $rw;
         $rw = PublicInbox::SearchIdx->new($git_dir, 1);
-        $rw->_xdb_acquire->begin_transaction;
+        $rw->begin_txn_lazy;
 };
 
 {
@@ -93,7 +93,6 @@ sub filter_mids {
         ok($found, "message found");
         is($root_id, $found->{doc_id}, 'doc_id set correctly');
         is($found->mid, 'root@s', 'mid set correctly');
-        ok(int($found->thread_id) > 0, 'thread_id is an integer');
 
         my ($res, @res);
         my @exp = sort qw(root@s last@s);
@@ -148,7 +147,13 @@ sub filter_mids {
 
         my $ghost_id = $rw->add_message($was_ghost);
         is($ghost_id, int($ghost_id), "ghost_id is an integer: $ghost_id");
-        ok($ghost_id < $reply_id, "ghost vivified from earlier message");
+        my $msgs = $rw->{over}->get_thread('ghost-message@s')->{msgs};
+        is(scalar(@$msgs), 2, 'got both messages in ghost thread');
+        foreach (qw(sid tid)) {
+                is($msgs->[0]->{$_}, $msgs->[1]->{$_}, "{$_} match");
+        }
+        isnt($msgs->[0]->{num}, $msgs->[1]->{num}, "num do not match");
+        ok($_->{num} > 0, 'positive art num') foreach @$msgs
 }
 
 # search thread on ghost
@@ -400,6 +405,7 @@ sub filter_mids {
         is($txt->{msgs}->[0]->mid, $res->{msgs}->[0]->mid,
                 'search inside text attachments works');
 }
+$rw->commit_txn_lazy;
 
 done_testing();