about summary refs log tree commit homepage
diff options
context:
space:
mode:
-rw-r--r--MANIFEST5
-rw-r--r--lib/PublicInbox/Inbox.pm15
-rw-r--r--lib/PublicInbox/Msgmap.pm1
-rw-r--r--lib/PublicInbox/NNTP.pm29
-rw-r--r--lib/PublicInbox/Over.pm119
-rw-r--r--lib/PublicInbox/OverIdx.pm370
-rw-r--r--lib/PublicInbox/OverIdxFork.pm (renamed from lib/PublicInbox/SearchIdxSkeleton.pm)128
-rw-r--r--lib/PublicInbox/Search.pm109
-rw-r--r--lib/PublicInbox/SearchIdx.pm214
-rw-r--r--lib/PublicInbox/SearchIdxPart.pm16
-rw-r--r--lib/PublicInbox/SearchMsg.pm26
-rw-r--r--lib/PublicInbox/V2Writable.pm78
-rwxr-xr-xscript/public-inbox-compact32
-rw-r--r--t/over.t38
-rw-r--r--t/psgi_search.t6
-rw-r--r--t/search-thr-index.t7
-rw-r--r--t/search.t14
17 files changed, 763 insertions, 444 deletions
diff --git a/MANIFEST b/MANIFEST
index 60e15f28..5fd8acf8 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -82,6 +82,9 @@ lib/PublicInbox/Msgmap.pm
 lib/PublicInbox/NNTP.pm
 lib/PublicInbox/NNTPD.pm
 lib/PublicInbox/NewsWWW.pm
+lib/PublicInbox/Over.pm
+lib/PublicInbox/OverIdx.pm
+lib/PublicInbox/OverIdxFork.pm
 lib/PublicInbox/ParentPipe.pm
 lib/PublicInbox/ProcessPipe.pm
 lib/PublicInbox/Qspawn.pm
@@ -90,7 +93,6 @@ lib/PublicInbox/SaPlugin/ListMirror.pm
 lib/PublicInbox/Search.pm
 lib/PublicInbox/SearchIdx.pm
 lib/PublicInbox/SearchIdxPart.pm
-lib/PublicInbox/SearchIdxSkeleton.pm
 lib/PublicInbox/SearchMsg.pm
 lib/PublicInbox/SearchThread.pm
 lib/PublicInbox/SearchView.pm
@@ -170,6 +172,7 @@ t/msg_iter.t
 t/msgmap.t
 t/nntp.t
 t/nntpd.t
+t/over.t
 t/plack.t
 t/precheck.t
 t/psgi_attach.t
diff --git a/lib/PublicInbox/Inbox.pm b/lib/PublicInbox/Inbox.pm
index 43cf15ba..142b5c89 100644
--- a/lib/PublicInbox/Inbox.pm
+++ b/lib/PublicInbox/Inbox.pm
@@ -319,20 +319,7 @@ sub msg_by_mid ($$;$) {
 
 sub recent {
         my ($self, $opts) = @_;
-        my $qs = '';
-        my $srch = search($self);
-        if (!$opts->{offset}) {
-                # this complicated bit cuts /$INBOX/ loading time by
-                # over 400ms on my system:
-                my ($min, $max) = mm($self)->minmax;
-                my $n = $max - $opts->{limit};
-                $n = $min if $n < $min;
-                for (; $qs eq '' && $n >= $min; --$n) {
-                        my $smsg = $srch->lookup_article($n) or next;
-                        $qs = strftime('d:%Y%m%d..', gmtime($smsg->ts));
-                }
-        }
-        $srch->query($qs, $opts);
+        search($self)->query('', $opts);
 }
 
 1;
diff --git a/lib/PublicInbox/Msgmap.pm b/lib/PublicInbox/Msgmap.pm
index 12833050..dea95731 100644
--- a/lib/PublicInbox/Msgmap.pm
+++ b/lib/PublicInbox/Msgmap.pm
@@ -39,6 +39,7 @@ sub dbh_new {
 
 sub new_file {
         my ($class, $f, $writable) = @_;
+        return if !$writable && !-r $f;
 
         my $dbh = dbh_new($f, $writable);
         my $self = bless { dbh => $dbh }, $class;
diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm
index fb65ddc0..48ab7fc2 100644
--- a/lib/PublicInbox/NNTP.pm
+++ b/lib/PublicInbox/NNTP.pm
@@ -331,13 +331,11 @@ sub cmd_newnews ($$$$;$$) {
         };
         return '.' unless @srch;
 
-        $ts .= '..';
-        my $opts = { asc => 1, limit => 1000, offset => 0 };
+        my $opts = { limit => 1000, offset => 0 };
         long_response($self, 0, long_response_limit, sub {
                 my ($i) = @_;
                 my $srch = $srch[0];
-                my $res = $srch->query_ts($ts, $opts);
-                my $msgs = $res->{msgs};
+                my $msgs = $srch->query_ts($ts, $opts);
                 if (my $nr = scalar @$msgs) {
                         more($self, '<' .
                                 join(">\r\n<", map { $_->mid } @$msgs ).
@@ -463,7 +461,7 @@ find_mid:
                 defined $mid or return $err;
         }
 found:
-        my $smsg = $ng->search->lookup_article($n) or return $err;
+        my $smsg = $ng->search->{over_ro}->get_art($n) or return $err;
         my $msg = $ng->msg_by_smsg($smsg) or return $err;
         my $s = Email::Simple->new($msg);
         if ($set_headers) {
@@ -692,8 +690,9 @@ sub hdr_xref ($$$) { # optimize XHDR Xref [range] for rtin
 
 sub search_header_for {
         my ($srch, $num, $field) = @_;
-        my $smsg = $srch->lookup_article($num) or return;
-        $smsg->$field;
+        my $smsg = $srch->{over_ro}->get_art($num) or return;
+        return PublicInbox::SearchMsg::date($smsg) if $field eq 'date';
+        $smsg->{$field};
 }
 
 sub hdr_searchmsg ($$$$) {
@@ -714,8 +713,7 @@ sub hdr_searchmsg ($$$$) {
                 my $off = 0;
                 long_response($self, $beg, $end, sub {
                         my ($i) = @_;
-                        my $res = $srch->query_xover($beg, $end, $off);
-                        my $msgs = $res->{msgs};
+                        my $msgs = $srch->query_xover($beg, $end, $off);
                         my $nr = scalar @$msgs or return;
                         $off += $nr;
                         my $tmp = '';
@@ -816,10 +814,10 @@ sub over_line ($$) {
                 $smsg->{subject},
                 $smsg->{from},
                 PublicInbox::SearchMsg::date($smsg),
-                '<'.PublicInbox::SearchMsg::mid($smsg).'>',
+                "<$smsg->{mid}>",
                 $smsg->{references},
-                PublicInbox::SearchMsg::bytes($smsg),
-                PublicInbox::SearchMsg::lines($smsg));
+                $smsg->{bytes},
+                $smsg->{lines});
         utf8::encode($s);
         $s
 }
@@ -829,7 +827,7 @@ sub cmd_over ($;$) {
         if ($range && $range =~ /\A<(.+)>\z/) {
                 my ($ng, $n) = mid_lookup($self, $1);
                 defined $n or return r430;
-                my $smsg = $ng->search->lookup_article($n) or return r430;
+                my $smsg = $ng->search->{over_ro}->get_art($n) or return r430;
                 more($self, '224 Overview information follows (multi-line)');
 
                 # Only set article number column if it's the current group
@@ -853,14 +851,13 @@ sub cmd_xover ($;$) {
         my $off = 0;
         long_response($self, $beg, $end, sub {
                 my ($i) = @_;
-                my $res = $srch->query_xover($beg, $end, $off);
-                my $msgs = $res->{msgs};
+                my $msgs = $srch->query_xover($beg, $end, $off);
                 my $nr = scalar @$msgs or return;
                 $off += $nr;
 
                 # OVERVIEW.FMT
                 more($self, join("\r\n", map {
-                        over_line(PublicInbox::SearchMsg::num($_), $_);
+                        over_line($_->{num}, $_);
                         } @$msgs));
 
                 # -1 to adjust for implicit increment in long_response
diff --git a/lib/PublicInbox/Over.pm b/lib/PublicInbox/Over.pm
new file mode 100644
index 00000000..cf7a8849
--- /dev/null
+++ b/lib/PublicInbox/Over.pm
@@ -0,0 +1,119 @@
+# Copyright (C) 2018 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# for XOVER, OVER in NNTP, and feeds/homepage/threads in PSGI
+# Unlike Msgmap, this is an _UNSTABLE_ database which can be
+# tweaked/updated over time and rebuilt.
+package PublicInbox::Over;
+use strict;
+use warnings;
+use DBI;
+use DBD::SQLite;
+use PublicInbox::SearchMsg;
+use Compress::Zlib qw(uncompress);
+
+sub dbh_new {
+        my ($self) = @_;
+        my $ro = ref($self) eq 'PublicInbox::Over';
+        my $dbh = DBI->connect("dbi:SQLite:dbname=$self->{filename}",'','', {
+                AutoCommit => 1,
+                RaiseError => 1,
+                PrintError => 0,
+                ReadOnly => $ro,
+                sqlite_use_immediate_transaction => 1,
+        });
+        $dbh->{sqlite_unicode} = 1;
+        $dbh;
+}
+
+sub new {
+        my ($class, $f) = @_;
+        bless { filename => $f }, $class;
+}
+
+sub disconnect { $_[0]->{dbh} = undef }
+
+sub connect { $_[0]->{dbh} ||= $_[0]->dbh_new }
+
+sub load_from_row {
+        my ($smsg) = @_;
+        bless $smsg, 'PublicInbox::SearchMsg';
+        if (defined(my $data = delete $smsg->{ddd})) {
+                $data = uncompress($data);
+                utf8::decode($data);
+                $smsg->load_from_data($data);
+        }
+        $smsg
+}
+
+sub do_get {
+        my ($self, $sql, $opts, @args) = @_;
+        my $dbh = $self->connect;
+        my $lim = (($opts->{limit} || 0) + 0) || 1000;
+        my $off = (($opts->{offset} || 0) + 0) || 0;
+        $sql .= "LIMIT $lim OFFSET $off";
+        my $msgs = $dbh->selectall_arrayref($sql, { Slice => {} }, @args);
+        load_from_row($_) for @$msgs;
+        $msgs
+}
+
+sub query_xover {
+        my ($self, $beg, $end, $off) = @_;
+        do_get($self, <<'', { offset => $off }, $beg, $end);
+SELECT * FROM over WHERE num >= ? AND num <= ?
+ORDER BY num ASC
+
+}
+
+sub query_ts {
+        my ($self, $ts, $opts) = @_;
+        do_get($self, <<'', $opts, $ts);
+SELECT * FROM over WHERE num > 0 AND ts >= ?
+ORDER BY ts ASC
+
+}
+
+sub get_thread {
+        my ($self, $mid, $opts) = @_;
+        my $dbh = $self->connect;
+        my ($tid, $sid) = $dbh->selectrow_array(<<'', undef, $mid);
+SELECT tid,sid FROM over
+LEFT JOIN id2num ON over.num = id2num.num
+LEFT JOIN msgid ON id2num.id = msgid.id
+WHERE msgid.mid = ? AND over.num > 0
+LIMIT 1
+
+        my $cond = 'FROM over WHERE (tid = ? OR sid = ?) AND num > 0';
+        my $msgs = do_get($self, <<"", $opts, $tid, $sid);
+SELECT * $cond
+ORDER BY ts ASC
+
+        my $nr = $dbh->selectrow_array(<<"", undef, $tid, $sid);
+SELECT COUNT(num) $cond
+
+        { total => $nr, msgs => $msgs };
+}
+
+sub recent {
+        my ($self, $opts) = @_;
+        my $msgs = do_get($self, <<'', $opts);
+SELECT * FROM over WHERE num > 0
+ORDER BY ts DESC
+
+        my $nr = $self->{dbh}->selectrow_array(<<'');
+SELECT COUNT(num) FROM over WHERE num > 0
+
+        { total => $nr, msgs => $msgs };
+}
+
+sub get_art {
+        my ($self, $num) = @_;
+        my $dbh = $self->connect;
+        my $smsg = $dbh->selectrow_hashref(<<'', undef, $num);
+SELECT * from OVER where num = ? LIMIT 1
+
+        return load_from_row($smsg) if $smsg;
+        undef;
+}
+
+1;
diff --git a/lib/PublicInbox/OverIdx.pm b/lib/PublicInbox/OverIdx.pm
new file mode 100644
index 00000000..0e43aabc
--- /dev/null
+++ b/lib/PublicInbox/OverIdx.pm
@@ -0,0 +1,370 @@
+# Copyright (C) 2018 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# for XOVER, OVER in NNTP, and feeds/homepage/threads in PSGI
+# Unlike Msgmap, this is an _UNSTABLE_ database which can be
+# tweaked/updated over time and rebuilt.
+package PublicInbox::OverIdx;
+use strict;
+use warnings;
+use base qw(PublicInbox::Over);
+use IO::Handle;
+use DBI qw(:sql_types); # SQL_BLOB
+
+sub dbh_new {
+        my ($self) = @_;
+        my $dbh = $self->SUPER::dbh_new;
+        $dbh->do('PRAGMA synchronous = OFF'); # commit_fsync instead
+        $dbh->do('PRAGMA journal_mode = TRUNCATE');
+        $dbh->do('PRAGMA cache_size = 80000');
+        create_tables($dbh);
+        $dbh;
+}
+
+sub commit_fsync {
+        my $fn = $_[0]->{filename};
+        if (open my $fh, '+<', $fn) {
+                $fh->sync;
+                close $fh;
+        }
+}
+
+sub get_counter ($$) {
+        my ($dbh, $key) = @_;
+        my $sth = $dbh->prepare_cached(<<'', undef, 1);
+SELECT val FROM counter WHERE key = ? LIMIT 1
+
+        $sth->execute($key);
+        $sth->fetchrow_array;
+}
+
+sub adj_counter ($$$) {
+        my ($self, $key, $op) = @_;
+        my $dbh = $self->{dbh};
+        my $sth = $dbh->prepare_cached(<<"");
+UPDATE counter SET val = val $op 1 WHERE key = ?
+
+        $sth->execute($key);
+
+        get_counter($dbh, $key);
+}
+
+sub next_tid { adj_counter($_[0], 'thread', '+') }
+sub next_ghost_num { adj_counter($_[0], 'ghost', '-') }
+
+sub id_for ($$$$$) {
+        my ($self, $tbl, $id_col, $val_col, $val) = @_;
+        my $dbh = $self->{dbh};
+        my $in = $dbh->prepare_cached(<<"")->execute($val);
+INSERT OR IGNORE INTO $tbl ($val_col) VALUES (?)
+
+        if ($in == 0) {
+                my $sth = $dbh->prepare_cached(<<"", undef, 1);
+SELECT $id_col FROM $tbl WHERE $val_col = ? LIMIT 1
+
+                $sth->execute($val);
+                $sth->fetchrow_array;
+        } else {
+                $dbh->last_insert_id(undef, undef, $tbl, $id_col);
+        }
+}
+
+sub sid {
+        my ($self, $path) = @_;
+        return unless defined $path && $path ne '';
+        id_for($self, 'subject', 'sid', 'path' => $path);
+}
+
+sub mid2id {
+        my ($self, $mid) = @_;
+        id_for($self, 'msgid', 'id', 'mid' => $mid);
+}
+
+sub delete_by_num {
+        my ($self, $num) = @_;
+        my $dbh = $self->{dbh};
+        foreach (qw(over id2num)) {
+                $dbh->prepare_cached(<<"")->execute($num);
+DELETE FROM $_ WHERE num = ?
+
+        }
+}
+
+# this includes ghosts
+sub each_by_mid {
+        my ($self, $mid, $cols, $cb) = @_;
+        my $dbh = $self->{dbh};
+
+=over
+        I originally wanted to stuff everything into a single query:
+
+        SELECT over.* FROM over
+        LEFT JOIN id2num ON over.num = id2num.num
+        LEFT JOIN msgid ON msgid.id = id2num.id
+        WHERE msgid.mid = ? AND over.num >= ?
+        ORDER BY over.num ASC
+        LIMIT 1000
+
+        But it's faster broken out (and we're always in a
+        transaction for subroutines in this file)
+=cut
+
+        my $sth = $dbh->prepare_cached(<<'', undef, 1);
+SELECT id FROM msgid WHERE mid = ? LIMIT 1
+
+        $sth->execute($mid);
+        my $id = $sth->fetchrow_array;
+        defined $id or return;
+
+        push(@$cols, 'num');
+        $cols = join(',', map { $_ } @$cols);
+        my $lim = 10;
+        my $prev = get_counter($dbh, 'ghost');
+        while (1) {
+                $sth = $dbh->prepare_cached(<<"", undef, 1);
+SELECT num FROM id2num WHERE id = ? AND num >= ?
+ORDER BY num ASC
+LIMIT $lim
+
+                $sth->execute($id, $prev);
+                my $nums = $sth->fetchall_arrayref;
+                my $nr = scalar(@$nums) or return;
+                $prev = $nums->[-1]->[0];
+
+                $sth = $dbh->prepare_cached(<<"", undef, 1);
+SELECT $cols FROM over WHERE over.num = ? LIMIT 1
+
+                foreach (@$nums) {
+                        $sth->execute($_->[0]);
+                        my $smsg = $sth->fetchrow_hashref;
+                        $cb->(PublicInbox::Over::load_from_row($smsg)) or
+                                return;
+                }
+                return if $nr != $lim;
+        }
+}
+
+# this will create a ghost as necessary
+sub resolve_mid_to_tid {
+        my ($self, $mid) = @_;
+        my $tid;
+        each_by_mid($self, $mid, ['tid'], sub {
+                my ($smsg) = @_;
+                my $cur_tid = $smsg->{tid};
+                if (defined $tid) {
+                        merge_threads($self, $tid, $cur_tid);
+                } else {
+                        $tid = $cur_tid;
+                }
+                1;
+        });
+        defined $tid ? $tid : create_ghost($self, $mid);
+}
+
+sub create_ghost {
+        my ($self, $mid) = @_;
+        my $id = $self->mid2id($mid);
+        my $num = $self->next_ghost_num;
+        $num < 0 or die "ghost num is non-negative: $num\n";
+        my $tid = $self->next_tid;
+        my $dbh = $self->{dbh};
+        $dbh->prepare_cached(<<'')->execute($num, $tid);
+INSERT INTO over (num, tid) VALUES (?,?)
+
+        $dbh->prepare_cached(<<'')->execute($id, $num);
+INSERT INTO id2num (id, num) VALUES (?,?)
+
+        $tid;
+}
+
+sub merge_threads {
+        my ($self, $winner_tid, $loser_tid) = @_;
+        return if $winner_tid == $loser_tid;
+        my $dbh = $self->{dbh};
+        $dbh->prepare_cached(<<'')->execute($winner_tid, $loser_tid);
+UPDATE over SET tid = ? WHERE tid = ?
+
+}
+
+sub link_refs {
+        my ($self, $refs, $old_tid) = @_;
+        my $tid;
+
+        if (@$refs) {
+                # first ref *should* be the thread root,
+                # but we can never trust clients to do the right thing
+                my $ref = $refs->[0];
+                $tid = resolve_mid_to_tid($self, $ref);
+                merge_threads($self, $tid, $old_tid) if defined $old_tid;
+
+                # the rest of the refs should point to this tid:
+                foreach my $i (1..$#$refs) {
+                        $ref = $refs->[$i];
+                        my $ptid = resolve_mid_to_tid($self, $ref);
+                        merge_threads($self, $tid, $ptid);
+                }
+        } else {
+                $tid = defined $old_tid ? $old_tid : $self->next_tid;
+        }
+        $tid;
+}
+
+sub add_over {
+        my ($self, $values) = @_;
+        my ($ts, $num, $mids, $refs, $xpath, $ddd) = @$values;
+        my $old_tid;
+        my $vivified = 0;
+
+        $self->begin_lazy;
+        $self->delete_by_num($num);
+        foreach my $mid (@$mids) {
+                my $v = 0;
+                each_by_mid($self, $mid, ['tid'], sub {
+                        my ($cur) = @_;
+                        my $cur_tid = $cur->{tid};
+                        my $n = $cur->{num};
+                        die "num must not be zero for $mid" if !$n;
+                        $old_tid = $cur_tid unless defined $old_tid;
+                        if ($n > 0) { # regular mail
+                                merge_threads($self, $old_tid, $cur_tid);
+                        } elsif ($n < 0) { # ghost
+                                link_refs($self, $refs, $old_tid);
+                                $self->delete_by_num($n);
+                                $v++;
+                        }
+                        1;
+                });
+                $v > 1 and warn "BUG: vivified multiple ($v) ghosts for $mid\n";
+                $vivified += $v;
+        }
+        my $tid = $vivified ? $old_tid : link_refs($self, $refs, $old_tid);
+        my $sid = $self->sid($xpath);
+        my $dbh = $self->{dbh};
+        my $sth = $dbh->prepare_cached(<<'');
+INSERT INTO over (num, tid, sid, ts, ddd)
+VALUES (?,?,?,?,?)
+
+        my $n = 0;
+        my @v = ($num, $tid, $sid, $ts);
+        foreach (@v) { $sth->bind_param(++$n, $_) }
+        $sth->bind_param(++$n, $ddd, SQL_BLOB);
+        $sth->execute;
+        $sth = $dbh->prepare_cached(<<'');
+INSERT INTO id2num (id, num) VALUES (?,?)
+
+        foreach my $mid (@$mids) {
+                my $id = $self->mid2id($mid);
+                $sth->execute($id, $num);
+        }
+}
+
+sub delete_articles {
+        my ($self, $nums) = @_;
+        my $dbh = $self->connect;
+        $self->delete_by_num($_) foreach @$nums;
+}
+
+sub remove_oid {
+        my ($self, $oid, $mid) = @_;
+        $self->begin_lazy;
+        each_by_mid($self, $mid, ['ddd'], sub {
+                my ($smsg) = @_;
+                $self->delete_by_num($smsg->{num}) if $smsg->{blob} eq $oid;
+                1;
+        });
+}
+
+sub create_tables {
+        my ($dbh) = @_;
+
+        $dbh->do(<<'');
+CREATE TABLE IF NOT EXISTS over (
+        num INTEGER NOT NULL,
+        tid INTEGER NOT NULL,
+        sid INTEGER,
+        ts INTEGER,
+        ddd VARBINARY, /* doc-data-deflated */
+        UNIQUE (num)
+)
+
+        $dbh->do('CREATE INDEX IF NOT EXISTS idx_tid ON over (tid)');
+        $dbh->do('CREATE INDEX IF NOT EXISTS idx_sid ON over (sid)');
+        $dbh->do('CREATE INDEX IF NOT EXISTS idx_ts ON over (ts)');
+
+        $dbh->do(<<'');
+CREATE TABLE IF NOT EXISTS counter (
+        key VARCHAR(8) PRIMARY KEY NOT NULL,
+        val INTEGER DEFAULT 0,
+        UNIQUE (key)
+)
+
+        $dbh->do("INSERT OR IGNORE INTO counter (key) VALUES ('thread')");
+        $dbh->do("INSERT OR IGNORE INTO counter (key) VALUES ('ghost')");
+
+        $dbh->do(<<'');
+CREATE TABLE IF NOT EXISTS subject (
+        sid INTEGER PRIMARY KEY AUTOINCREMENT,
+        path VARCHAR(40) NOT NULL,
+        UNIQUE (path)
+)
+
+        $dbh->do(<<'');
+CREATE TABLE IF NOT EXISTS id2num (
+        id INTEGER NOT NULL,
+        num INTEGER NOT NULL,
+        UNIQUE (id, num)
+)
+
+        # performance critical:
+        $dbh->do('CREATE INDEX IF NOT EXISTS idx_inum ON id2num (num)');
+        $dbh->do('CREATE INDEX IF NOT EXISTS idx_id ON id2num (id)');
+
+        $dbh->do(<<'');
+CREATE TABLE IF NOT EXISTS msgid (
+        id INTEGER PRIMARY KEY AUTOINCREMENT,
+        mid VARCHAR(244) NOT NULL,
+        UNIQUE (mid)
+)
+
+}
+
+sub commit_lazy {
+        my ($self) = @_;
+        delete $self->{txn} or return;
+        $self->{dbh}->commit;
+}
+
+sub begin_lazy {
+        my ($self) = @_;
+        return if $self->{txn};
+        my $dbh = $self->connect or return;
+        $dbh->begin_work;
+        # $dbh->{Profile} = 2;
+        $self->{txn} = 1;
+}
+
+sub rollback_lazy {
+        my ($self) = @_;
+        delete $self->{txn} or return;
+        $self->{dbh}->rollback;
+}
+
+sub disconnect {
+        my ($self) = @_;
+        die "in transaction" if $self->{txn};
+        $self->{dbh} = undef;
+}
+
+sub create {
+        my ($self) = @_;
+        unless (-r $self->{filename}) {
+                require File::Path;
+                require File::Basename;
+                File::Path::mkpath(File::Basename::dirname($self->{filename}));
+        }
+        # create the DB:
+        PublicInbox::Over::connect($self);
+        $self->disconnect;
+}
+
+1;
diff --git a/lib/PublicInbox/SearchIdxSkeleton.pm b/lib/PublicInbox/OverIdxFork.pm
index 2be64960..f4f7cddd 100644
--- a/lib/PublicInbox/SearchIdxSkeleton.pm
+++ b/lib/PublicInbox/OverIdxFork.pm
@@ -1,19 +1,16 @@
 # Copyright (C) 2018 all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
-package PublicInbox::SearchIdxSkeleton;
+package PublicInbox::OverIdxFork;
 use strict;
 use warnings;
-use base qw(PublicInbox::SearchIdx);
+use base qw(PublicInbox::OverIdx PublicInbox::Lock);
 use Storable qw(freeze thaw);
+use IO::Handle;
 
-sub new {
-        my ($class, $v2writable) = @_;
-        my $self = $class->SUPER::new($v2writable->{-inbox}, 1, 'skel');
-        # create the DB:
-        $self->_xdb_acquire;
-        $self->_xdb_release;
+sub create {
+        my ($self, $v2writable) = @_;
+        $self->SUPER::create();
         $self->spawn_worker($v2writable) if $v2writable->{parallel};
-        $self
 }
 
 sub spawn_worker {
@@ -30,33 +27,35 @@ sub spawn_worker {
                 $v2writable = undef;
                 close $w;
                 close $barrier_wait;
-                eval { skeleton_worker_loop($self, $r, $barrier_note) };
-                die "skeleton worker died: $@\n" if $@;
+
+                # F_SETPIPE_SZ = 1031 on Linux; increasing the pipe size here
+                # speeds V2Writable batch imports across 8 cores by nearly 20%
+                fcntl($r, 1031, 1048576) if $^O eq 'linux';
+
+                eval { over_worker_loop($self, $r, $barrier_note) };
+                die "over worker died: $@\n" if $@;
                 exit;
         }
         $self->{w} = $w;
         $self->{pid} = $pid;
+        $self->{lock_path} = "$self->{filename}.pipe.lock";
         close $r;
         close $barrier_note;
         $self->{barrier_wait} = $barrier_wait;
-
         $w->autoflush(1);
-
-        # lock on only exists in parent, not in worker
-        $self->{lock_path} = $self->xdir . '/pi-v2-skeleton.lock';
 }
 
-sub skeleton_worker_loop {
+sub over_worker_loop {
         my ($self, $r, $barrier_note) = @_;
         $barrier_note->autoflush(1);
-        $0 = 'pi-v2-skeleton';
-        $self->begin_txn_lazy;
+        $0 = 'pi-v2-overview';
+        $self->begin_lazy;
         my $barrier = undef;
         while (my $line = $r->getline) {
                 if ($line eq "commit\n") {
-                        $self->commit_txn_lazy;
+                        $self->commit_lazy;
                 } elsif ($line eq "close\n") {
-                        $self->_xdb_release;
+                        $self->disconnect;
                 } elsif ($line =~ /\Abarrier_init (\d+)\n\z/) {
                         my $n = $1 - 1;
                         die "barrier in-progress\n" if defined $barrier;
@@ -67,30 +66,30 @@ sub skeleton_worker_loop {
                         delete $barrier->{$1} or die "unknown barrier: $part\n";
                         if ((scalar keys %$barrier) == 0) {
                                 $barrier = undef;
-                                $self->commit_txn_lazy;
+                                $self->commit_lazy;
                                 print $barrier_note "barrier_done\n" or die
                                         "print failed to barrier note: $!";
                         }
                 } elsif ($line =~ /\AD ([a-f0-9]{40,}) (.*)\n\z/s) {
                         my ($oid, $mid) = ($1, $2);
-                        $self->begin_txn_lazy;
-                        $self->remove_by_oid($oid, $mid);
+                        $self->remove_oid($oid, $mid);
                 } else {
                         my $len = int($line);
                         my $n = read($r, my $msg, $len) or die "read: $!\n";
                         $n == $len or die "short read: $n != $len\n";
                         $msg = thaw($msg); # should raise on error
                         defined $msg or die "failed to thaw buffer\n";
-                        $self->begin_txn_lazy;
-                        eval { index_skeleton_real($self, $msg) };
+                        eval { add_over($self, $msg) };
                         warn "failed to index message <$msg->[-1]>: $@\n" if $@;
                 }
         }
-        $self->worker_done;
+        die "$$ $0 dbh not released\n" if $self->{dbh};
+        die "$$ $0 still in transaction\n" if $self->{txn};
 }
 
 # called by a partition worker
-sub index_skeleton {
+# values: [ DS, NUM, BYTES, LINES, TS, MIDS, XPATH, doc_data ]
+sub add_over {
         my ($self, $values) = @_;
         if (my $w = $self->{w}) {
                 my $err;
@@ -106,32 +105,21 @@ sub index_skeleton {
 
                 die "print failed: $err\n" if $err;
         } else {
-                $self->begin_txn_lazy;
-                index_skeleton_real($self, $values);
+                $self->SUPER::add_over($values);
         }
 }
 
-sub remote_remove {
+sub remove_oid {
         my ($self, $oid, $mid) = @_;
-        my $err;
-        $self->lock_acquire;
-        eval { $self->SUPER::remote_remove($oid, $mid) };
-        $err = $@;
-        $self->lock_release;
-        die $err if $err;
-}
-
-sub index_skeleton_real ($$) {
-        my ($self, $values) = @_;
-        my ($ts, $num, $mids, $xpath, $doc_data) = @$values;
-        my $smsg = PublicInbox::SearchMsg->new(undef);
-        $smsg->load_from_data($doc_data);
-        my $doc = $smsg->{doc};
-        $doc->set_data($doc_data);
-        PublicInbox::SearchIdx::add_values($doc, $ts, $smsg->ds, $num);
-        my @refs = ($smsg->references =~ /<([^>]+)>/g);
-        $self->delete_article($num) if defined $num; # for reindexing
-        $self->link_and_save($doc, $mids, \@refs, $num, $xpath);
+        if (my $w = $self->{w}) {
+                my $err;
+                $self->lock_acquire;
+                print $w "D $oid $mid\n" or $err = $!;
+                $self->lock_release;
+                die $err if $err;
+        } else {
+                $self->SUPER::remove_oid($oid, $mid); # OverIdx
+        }
 }
 
 # write to the subprocess
@@ -140,7 +128,7 @@ sub barrier_init {
         my $w = $self->{w} or return;
         my $err;
         $self->lock_acquire;
-        print $w "barrier_init $nparts\n" or $err = "failed to write: $!\n";
+        print $w "barrier_init $nparts\n" or $err = $!;
         $self->lock_release;
         die $err if $err;
 }
@@ -152,4 +140,44 @@ sub barrier_wait {
         $l eq "barrier_done\n" or die "bad response from barrier_wait: $l\n";
 }
 
+sub remote_commit {
+        my ($self) = @_;
+        if (my $w = $self->{w}) {
+                my $err;
+                $self->lock_acquire;
+                print $w "commit\n" or $err = $!;
+                $self->lock_release;
+                die $err if $err;
+        } else {
+                $self->commit_lazy;
+        }
+}
+
+# prevent connections when using forked subprocesses
+sub connect {
+        my ($self) = @_;
+        return if $self->{w};
+        $self->SUPER::connect;
+}
+
+sub remote_close {
+        my ($self) = @_;
+        if (my $w = delete $self->{w}) {
+                my $pid = delete $self->{pid} or die "no process to wait on\n";
+                print $w "close\n" or die "failed to write to pid:$pid: $!\n";
+                close $w or die "failed to close pipe for pid:$pid: $!\n";
+                waitpid($pid, 0) == $pid or die "remote process did not finish";
+                $? == 0 or die ref($self)." pid:$pid exited with: $?";
+        } else {
+                die "transaction in progress $self\n" if $self->{txn};
+                $self->disconnect;
+        }
+}
+
+sub commit_fsync {
+        my ($self) = @_;
+        return if $self->{w}; # don't bother; main parent can also call this
+        $self->SUPER::commit_fsync;
+}
+
 1;
diff --git a/lib/PublicInbox/Search.pm b/lib/PublicInbox/Search.pm
index ca389e32..91251246 100644
--- a/lib/PublicInbox/Search.pm
+++ b/lib/PublicInbox/Search.pm
@@ -10,12 +10,12 @@ use warnings;
 # values for searching
 use constant TS => 0;  # Received: header in Unix time
 use constant YYYYMMDD => 1; # for searching in the WWW UI
-use constant NUM => 2; # NNTP article number
 
 use Search::Xapian qw/:standard/;
 use PublicInbox::SearchMsg;
 use PublicInbox::MIME;
 use PublicInbox::MID qw/id_compress/;
+use PublicInbox::Over;
 
 # This is English-only, everything else is non-standard and may be confused as
 # a prefix common in patch emails
@@ -40,19 +40,13 @@ use constant {
         # 13 - fix threading for empty References/In-Reply-To
         #      (commit 83425ef12e4b65cdcecd11ddcb38175d4a91d5a0)
         # 14 - fix ghost root vivification
-        SCHEMA_VERSION => 14,
+        SCHEMA_VERSION => 15,
 
         # n.b. FLAG_PURE_NOT is expensive not suitable for a public website
         # as it could become a denial-of-service vector
         QP_FLAGS => FLAG_PHRASE|FLAG_BOOLEAN|FLAG_LOVEHATE|FLAG_WILDCARD,
 };
 
-# setup prefixes
-my %bool_pfx_internal = (
-        type => 'T', # "mail" or "ghost"
-        thread => 'G', # newsGroup (or similar entity - e.g. a web forum name)
-);
-
 my %bool_pfx_external = (
         mid => 'Q', # Message-ID (full/exact), this is mostly uniQue
 );
@@ -116,8 +110,6 @@ EOF
 );
 chomp @HELP;
 
-my $mail_query = Search::Xapian::Query->new('T' . 'mail');
-
 sub xdir {
         my ($self) = @_;
         if ($self->{version} == 1) {
@@ -143,8 +135,9 @@ sub new {
                 altid => $altid,
                 version => $version,
         }, $class;
+        my $dir;
         if ($version >= 2) {
-                my $dir = "$self->{mainrepo}/xap" . SCHEMA_VERSION;
+                $dir = "$self->{mainrepo}/xap" . SCHEMA_VERSION;
                 my $xdb;
                 my $parts = 0;
                 foreach my $part (<$dir/*>) {
@@ -158,55 +151,36 @@ sub new {
                         }
                 }
                 $self->{xdb} = $xdb;
-                $self->{skel} = Search::Xapian::Database->new("$dir/skel");
         } else {
-                $self->{xdb} = Search::Xapian::Database->new($self->xdir);
+                $dir = $self->xdir;
+                $self->{xdb} = Search::Xapian::Database->new($dir);
         }
+        $self->{over_ro} = PublicInbox::Over->new("$dir/over.sqlite3");
         $self;
 }
 
 sub reopen {
         my ($self) = @_;
         $self->{xdb}->reopen;
-        if (my $skel = $self->{skel}) {
-                $skel->reopen;
-        }
         $self; # make chaining easier
 }
 
 # read-only
 sub query {
         my ($self, $query_string, $opts) = @_;
-        my $query;
-
         $opts ||= {};
-        unless ($query_string eq '') {
-                $query = $self->qp->parse_query($query_string, QP_FLAGS);
+        if ($query_string eq '' && !$opts->{mset}) {
+                $self->{over_ro}->recent($opts);
+        } else {
+                my $query = $self->qp->parse_query($query_string, QP_FLAGS);
                 $opts->{relevance} = 1 unless exists $opts->{relevance};
+                _do_enquire($self, $query, $opts);
         }
-
-        _do_enquire($self, $query, $opts);
 }
 
 sub get_thread {
         my ($self, $mid, $opts) = @_;
-        my $smsg = first_smsg_by_mid($self, $mid) or
-                        return { total => 0, msgs => [] };
-        my $qtid = Search::Xapian::Query->new('G' . $smsg->thread_id);
-        my $path = $smsg->path;
-        if (defined $path && $path ne '') {
-                my $path = id_compress($smsg->path);
-                my $qsub = Search::Xapian::Query->new('XPATH' . $path);
-                $qtid = Search::Xapian::Query->new(OP_OR, $qtid, $qsub);
-        }
-        $opts ||= {};
-        $opts->{limit} ||= 1000;
-
-        # always sort threads by timestamp, this makes life easier
-        # for the threading algorithm (in SearchThread.pm)
-        $opts->{asc} = 1;
-        $opts->{enquire} = enquire_skel($self);
-        _do_enquire($self, $qtid, $opts);
+        $self->{over_ro}->get_thread($mid, $opts);
 }
 
 sub retry_reopen {
@@ -235,19 +209,13 @@ sub _do_enquire {
 
 sub _enquire_once {
         my ($self, $query, $opts) = @_;
-        my $enquire = $opts->{enquire} || enquire($self);
-        if (defined $query) {
-                $query = Search::Xapian::Query->new(OP_AND,$query,$mail_query);
-        } else {
-                $query = $mail_query;
-        }
+        my $enquire = enquire($self);
+        $query = Search::Xapian::Query->new(OP_AND,$query);
         $enquire->set_query($query);
         $opts ||= {};
         my $desc = !$opts->{asc};
         if ($opts->{relevance}) {
                 $enquire->set_sort_by_relevance_then_value(TS, $desc);
-        } elsif ($opts->{num}) {
-                $enquire->set_sort_by_value(NUM, 0);
         } else {
                 $enquire->set_sort_by_value_then_relevance(TS, $desc);
         }
@@ -309,39 +277,15 @@ EOF
         $self->{query_parser} = $qp;
 }
 
-sub num_range_processor {
-        $_[0]->{nrp} ||= Search::Xapian::NumberValueRangeProcessor->new(NUM);
-}
-
 # only used for NNTP server
 sub query_xover {
         my ($self, $beg, $end, $offset) = @_;
-        my $qp = Search::Xapian::QueryParser->new;
-        $qp->set_database($self->{skel} || $self->{xdb});
-        $qp->add_valuerangeprocessor($self->num_range_processor);
-        my $query = $qp->parse_query("$beg..$end", QP_FLAGS);
-
-        my $opts = {
-                enquire => enquire_skel($self),
-                num => 1,
-                limit => 200,
-                offset => $offset,
-        };
-        _do_enquire($self, $query, $opts);
+        $self->{over_ro}->query_xover($beg, $end, $offset);
 }
 
 sub query_ts {
-        my ($self, $ts, $opts) = @_;
-        my $qp = $self->{qp_ts} ||= eval {
-                my $q = Search::Xapian::QueryParser->new;
-                $q->set_database($self->{skel} || $self->{xdb});
-                $q->add_valuerangeprocessor(
-                        Search::Xapian::NumberValueRangeProcessor->new(TS));
-                $q
-        };
-        my $query = $qp->parse_query($ts, QP_FLAGS);
-        $opts->{enquire} = enquire_skel($self);
-        _do_enquire($self, $query, $opts);
+        my ($self, $ts, $offset) = @_;
+        $self->{over_ro}->query_ts($ts, $offset);
 }
 
 sub first_smsg_by_mid {
@@ -356,7 +300,7 @@ sub first_smsg_by_mid {
 sub lookup_article {
         my ($self, $num) = @_;
         my $term = 'XNUM'.$num;
-        my $db = $self->{skel} || $self->{xdb};
+        my $db = $self->{xdb};
         retry_reopen($self, sub {
                 my $head = $db->postlist_begin($term);
                 my $tail = $db->postlist_end($term);
@@ -365,9 +309,7 @@ sub lookup_article {
                 return unless defined $doc_id;
                 $head->inc;
                 if ($head->nequal($tail)) {
-                        my $loc= $self->{mainrepo} .
-                                ($self->{skel} ? 'skel' : 'xdb');
-                        warn "article #$num is not unique in $loc\n";
+                        warn "article #$num is not unique\n";
                 }
                 # raises on error:
                 my $doc = $db->get_document($doc_id);
@@ -381,7 +323,7 @@ sub each_smsg_by_mid {
         my ($self, $mid, $cb) = @_;
         # XXX retry_reopen isn't necessary for V2Writable, but the PSGI
         # interface will need it...
-        my $db = $self->{skel} || $self->{xdb};
+        my $db = $self->{xdb};
         my $term = 'Q' . $mid;
         my $head = $db->postlist_begin($term);
         my $tail = $db->postlist_end($term);
@@ -424,15 +366,6 @@ sub enquire {
         $self->{enquire} ||= Search::Xapian::Enquire->new($self->{xdb});
 }
 
-sub enquire_skel {
-        my ($self) = @_;
-        if (my $skel = $self->{skel}) {
-                $self->{enquire_skel} ||= Search::Xapian::Enquire->new($skel);
-        } else {
-                enquire($self);
-        }
-}
-
 sub help {
         my ($self) = @_;
         $self->qp; # parse altids
diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm
index 2e0b9a43..3412a615 100644
--- a/lib/PublicInbox/SearchIdx.pm
+++ b/lib/PublicInbox/SearchIdx.pm
@@ -16,10 +16,12 @@ use PublicInbox::MID qw/mid_clean id_compress mid_mime mids references/;
 use PublicInbox::MsgIter;
 use Carp qw(croak);
 use POSIX qw(strftime);
+use PublicInbox::OverIdx;
 require PublicInbox::Git;
+use Compress::Zlib qw(compress);
 
 use constant {
-        BATCH_BYTES => 1_000_000,
+        BATCH_BYTES => 10_000_000,
         DEBUG => !!$ENV{DEBUG},
 };
 
@@ -73,12 +75,13 @@ sub new {
         $ibx->umask_prepare;
         if ($version == 1) {
                 $self->{lock_path} = "$mainrepo/ssoma.lock";
+                my $dir = $self->xdir;
+                $self->{over} = PublicInbox::OverIdx->new("$dir/over.sqlite3");
         } elsif ($version == 2) {
                 defined $part or die "partition is required for v2\n";
-                # partition is a number or "all"
+                # partition is a number
                 $self->{partition} = $part;
                 $self->{lock_path} = undef;
-                $self->{msgmap_path} = "$mainrepo/msgmap.sqlite3";
         } else {
                 die "unsupported inbox version=$version\n";
         }
@@ -114,14 +117,6 @@ sub add_val ($$$) {
         $doc->add_value($col, $num);
 }
 
-sub add_values {
-        my ($doc, $ts, $ds, $num) = @_;
-        add_val($doc, PublicInbox::Search::TS, $ts);
-        my $yyyymmdd = strftime('%Y%m%d', gmtime($ds));
-        add_val($doc, PublicInbox::Search::YYYYMMDD, $yyyymmdd);
-        defined($num) and add_val($doc, PublicInbox::Search::NUM, $num);
-}
-
 sub index_users ($$) {
         my ($tg, $smsg) = @_;
 
@@ -269,8 +264,11 @@ sub add_message {
         my ($self, $mime, $bytes, $num, $oid, $mid0) = @_;
         my $doc_id;
         my $mids = mids($mime->header_obj);
-        my $skel = $self->{skeleton};
-
+        $mid0 = $mids->[0] unless defined $mid0; # v1 compatibility
+        unless (defined $num) { # v1
+                my $mm = $self->_msgmap_init;
+                $num = $mm->mid_insert($mid0) || $mm->num_for($mid0);
+        }
         eval {
                 my $smsg = PublicInbox::SearchMsg->new($mime);
                 my $doc = $smsg->{doc};
@@ -281,11 +279,12 @@ sub add_message {
                         $xpath = id_compress($xpath);
                 }
 
-                my $lines = $mime->body_raw =~ tr!\n!\n!;
                 $smsg->{lines} = $mime->body_raw =~ tr!\n!\n!;
                 defined $bytes or $bytes = length($mime->as_string);
                 $smsg->{bytes} = $bytes;
-                add_values($doc, $smsg->ts, $smsg->ds, $num);
+                add_val($doc, PublicInbox::Search::TS(), $smsg->ts);
+                my $yyyymmdd = strftime('%Y%m%d', gmtime($smsg->ds));
+                add_val($doc, PublicInbox::Search::YYYYMMDD, $yyyymmdd);
 
                 my $tg = $self->term_generator;
 
@@ -336,7 +335,6 @@ sub add_message {
 
                 # populates smsg->references for smsg->to_doc_data
                 my $refs = parse_references($smsg);
-                $mid0 = $mids->[0] unless defined $mid0; # v1 compatibility
                 my $data = $smsg->to_doc_data($oid, $mid0);
                 foreach my $mid (@$mids) {
                         $tg->index_text($mid, 1, 'XM');
@@ -354,16 +352,14 @@ sub add_message {
                 }
 
                 $self->delete_article($num) if defined $num; # for reindexing
-                if ($skel) {
-                        my @vals = ($smsg->ts, $num, $mids, $xpath, $data);
-                        $skel->index_skeleton(\@vals);
-                        $doc->add_boolean_term('Q' . $_) foreach @$mids;
-                        $doc->add_boolean_term('XNUM' . $num) if defined $num;
-                        $doc_id = $self->{xdb}->add_document($doc);
-                } else {
-                        $doc_id = link_and_save($self, $doc, $mids, $refs,
-                                                $num, $xpath);
-                }
+
+                utf8::encode($data);
+                $data = compress($data);
+                my @vals = ($smsg->ts, $num, $mids, $refs, $xpath, $data);
+                $self->{over}->add_over(\@vals);
+                $doc->add_boolean_term('Q' . $_) foreach @$mids;
+                $doc->add_boolean_term('XNUM' . $num) if defined $num;
+                $doc_id = $self->{xdb}->add_document($doc);
         };
 
         if ($@) {
@@ -439,14 +435,19 @@ sub remove_by_oid {
         # there is only ONE element in @delete unless we
         # have bugs in our v2writable deduplication check
         my @delete;
+        my @over_del;
         for (; $head != $tail; $head->inc) {
                 my $docid = $head->get_docid;
                 my $doc = $db->get_document($docid);
                 my $smsg = PublicInbox::SearchMsg->wrap($doc, $mid);
                 $smsg->load_expand;
-                push(@delete, $docid) if $smsg->{blob} eq $oid;
+                if ($smsg->{blob} eq $oid) {
+                        push(@delete, $docid);
+                        push(@over_del, $smsg->num);
+                }
         }
         $db->delete_document($_) foreach @delete;
+        $self->{over}->remove_oid($oid, $mid);
         scalar(@delete);
 }
 
@@ -462,18 +463,6 @@ sub term_generator { # write-only
         $self->{term_generator} = $tg;
 }
 
-# increments last_thread_id counter
-# returns a 64-bit integer represented as a decimal string
-sub next_thread_id {
-        my ($self) = @_;
-        my $db = $self->{xdb};
-        my $last_thread_id = int($db->get_metadata('last_thread_id') || 0);
-
-        $db->set_metadata('last_thread_id', ++$last_thread_id);
-
-        $last_thread_id;
-}
-
 sub parse_references ($) {
         my ($smsg) = @_;
         my $mime = $smsg->{mime};
@@ -496,71 +485,6 @@ sub parse_references ($) {
         \@keep;
 }
 
-sub link_doc {
-        my ($self, $doc, $refs, $old_tid) = @_;
-        my $tid;
-
-        if (@$refs) {
-                # first ref *should* be the thread root,
-                # but we can never trust clients to do the right thing
-                my $ref = shift @$refs;
-                $tid = resolve_mid_to_tid($self, $ref);
-                merge_threads($self, $tid, $old_tid) if defined $old_tid;
-
-                # the rest of the refs should point to this tid:
-                foreach $ref (@$refs) {
-                        my $ptid = resolve_mid_to_tid($self, $ref);
-                        merge_threads($self, $tid, $ptid);
-                }
-        } else {
-                $tid = defined $old_tid ? $old_tid : $self->next_thread_id;
-        }
-        $doc->add_boolean_term('G' . $tid);
-        $tid;
-}
-
-sub link_and_save {
-        my ($self, $doc, $mids, $refs, $num, $xpath) = @_;
-        my $db = $self->{xdb};
-        my $old_tid;
-        my $doc_id;
-        $doc->add_boolean_term('XNUM' . $num) if defined $num;
-        $doc->add_boolean_term('XPATH' . $xpath) if defined $xpath;
-        $doc->add_boolean_term('Q' . $_) foreach @$mids;
-
-        $self->{skel} and die "Should not have read-only skel here\n";;
-        foreach my $mid (@$mids) {
-                my $vivified = 0;
-                $self->each_smsg_by_mid($mid, sub {
-                        my ($cur) = @_;
-                        my $type = $cur->type;
-                        my $cur_tid = $cur->thread_id;
-                        $old_tid = $cur_tid unless defined $old_tid;
-                        if ($type eq 'mail') {
-                                # do not break existing mail messages,
-                                # just merge the threads
-                                merge_threads($self, $old_tid, $cur_tid);
-                                return 1;
-                        }
-                        if ($type ne 'ghost') {
-                                die "<$mid> has a bad type: $type\n";
-                        }
-                        my $tid = link_doc($self, $doc, $refs, $old_tid);
-                        $old_tid = $tid unless defined $old_tid;
-                        $doc_id = $cur->{doc_id};
-                        $self->{xdb}->replace_document($doc_id, $doc);
-                        ++$vivified;
-                        1;
-                });
-                $vivified > 1 and warn
-                        "BUG: vivified multiple ($vivified) ghosts for $mid\n";
-        }
-        # not really important, but we return any vivified ghost docid, here:
-        return $doc_id if defined $doc_id;
-        link_doc($self, $doc, $refs, $old_tid);
-        $self->{xdb}->add_document($doc);
-}
-
 sub index_git_blob_id {
         my ($doc, $pfx, $objid) = @_;
 
@@ -675,14 +599,10 @@ sub rlog {
 
 sub _msgmap_init {
         my ($self) = @_;
+        die "BUG: _msgmap_init is only for v1\n" if $self->{version} != 1;
         $self->{mm} ||= eval {
                 require PublicInbox::Msgmap;
-                my $msgmap_path = $self->{msgmap_path};
-                if (defined $msgmap_path) { # v2
-                        PublicInbox::Msgmap->new_file($msgmap_path, 1);
-                } else {
-                        PublicInbox::Msgmap->new($self->{mainrepo}, 1);
-                }
+                PublicInbox::Msgmap->new($self->{mainrepo}, 1);
         };
 }
 
@@ -699,8 +619,7 @@ sub _index_sync {
         my $reindex = $opts->{reindex};
         my ($mkey, $last_commit, $lx, $xlog);
         $self->{git}->batch_prepare;
-        my $xdb = _xdb_acquire($self);
-        $xdb->begin_transaction;
+        my $xdb = $self->begin_txn_lazy;
         do {
                 $xlog = undef;
                 $mkey = 'last_commit';
@@ -710,6 +629,9 @@ sub _index_sync {
                         $lx = '';
                         $mkey = undef if $last_commit ne '';
                 }
+                $self->{over}->rollback_lazy;
+                $self->{over}->disconnect;
+                delete $self->{txn};
                 $xdb->cancel_transaction;
                 $xdb = _xdb_release($self);
 
@@ -717,8 +639,7 @@ sub _index_sync {
                 my $range = $lx eq '' ? $tip : "$lx..$tip";
                 $xlog = _git_log($self, $range);
 
-                $xdb = _xdb_acquire($self);
-                $xdb->begin_transaction;
+                $xdb = $self->begin_txn_lazy;
         } while ($xdb->get_metadata('last_commit') ne $last_commit);
 
         my $mm = _msgmap_init($self);
@@ -732,14 +653,12 @@ sub _index_sync {
                 }
                 if (!$mm_only) {
                         $xdb->set_metadata($mkey, $commit) if $mkey && $commit;
-                        $xdb->commit_transaction;
-                        $xdb = _xdb_release($self);
+                        $self->commit_txn_lazy;
                 }
                 # let another process do some work... <
                 if ($more) {
                         if (!$mm_only) {
-                                $xdb = _xdb_acquire($self);
-                                $xdb->begin_transaction;
+                                $xdb = $self->begin_txn_lazy;
                         }
                         $dbh->begin_work if $dbh;
                 }
@@ -779,71 +698,19 @@ sub _index_sync {
         }
 }
 
-# this will create a ghost as necessary
-sub resolve_mid_to_tid {
-        my ($self, $mid) = @_;
-        my $tid;
-        $self->each_smsg_by_mid($mid, sub {
-                my ($smsg) = @_;
-                my $cur_tid = $smsg->thread_id;
-                if (defined $tid) {
-                        merge_threads($self, $tid, $cur_tid);
-                } else {
-                        $tid = $smsg->thread_id;
-                }
-                1;
-        });
-        return $tid if defined $tid;
-
-        $self->create_ghost($mid)->thread_id;
-}
-
-sub create_ghost {
-        my ($self, $mid) = @_;
-
-        my $tid = $self->next_thread_id;
-        my $doc = Search::Xapian::Document->new;
-        $doc->add_boolean_term('Q' . $mid);
-        $doc->add_boolean_term('G' . $tid);
-        $doc->add_boolean_term('T' . 'ghost');
-
-        my $smsg = PublicInbox::SearchMsg->wrap($doc, $mid);
-        $self->{xdb}->add_document($doc);
-
-        $smsg;
-}
-
-sub merge_threads {
-        my ($self, $winner_tid, $loser_tid) = @_;
-        return if $winner_tid == $loser_tid;
-        my $db = $self->{xdb};
-        batch_do($self, 'G' . $loser_tid, sub {
-                my ($ids) = @_;
-                foreach my $docid (@$ids) {
-                        my $doc = $db->get_document($docid);
-                        $doc->remove_term('G' . $loser_tid);
-                        $doc->add_boolean_term('G' . $winner_tid);
-                        $db->replace_document($docid, $doc);
-                }
-        });
-}
-
 sub DESTROY {
         # order matters for unlocking
         $_[0]->{xdb} = undef;
         $_[0]->{lockfh} = undef;
 }
 
-# remote_* subs are only used by SearchIdxPart and SearchIdxSkeleton
+# remote_* subs are only used by SearchIdxPart
 sub remote_commit {
         my ($self) = @_;
         if (my $w = $self->{w}) {
                 print $w "commit\n" or die "failed to write commit: $!";
         } else {
                 $self->commit_txn_lazy;
-                if (my $skel = $self->{skeleton}) {
-                        $skel->commit_txn_lazy;
-                }
         }
 }
 
@@ -864,7 +731,7 @@ sub remote_close {
 sub remote_remove {
         my ($self, $oid, $mid) = @_;
         if (my $w = $self->{w}) {
-                # triggers remove_by_oid in partition or skeleton
+                # triggers remove_by_oid in a partition
                 print $w "D $oid $mid\n" or die "failed to write remove $!";
         } else {
                 $self->begin_txn_lazy;
@@ -876,14 +743,17 @@ sub begin_txn_lazy {
         my ($self) = @_;
         return if $self->{txn};
         my $xdb = $self->{xdb} || $self->_xdb_acquire;
+        $self->{over}->begin_lazy;
         $xdb->begin_transaction;
         $self->{txn} = 1;
+        $xdb;
 }
 
 sub commit_txn_lazy {
         my ($self) = @_;
         delete $self->{txn} or return;
         $self->{xdb}->commit_transaction;
+        $self->{over}->commit_lazy;
 }
 
 sub worker_done {
diff --git a/lib/PublicInbox/SearchIdxPart.pm b/lib/PublicInbox/SearchIdxPart.pm
index 82f5c1bc..e5766a82 100644
--- a/lib/PublicInbox/SearchIdxPart.pm
+++ b/lib/PublicInbox/SearchIdxPart.pm
@@ -6,12 +6,12 @@ use warnings;
 use base qw(PublicInbox::SearchIdx);
 
 sub new {
-        my ($class, $v2writable, $part, $skel) = @_;
+        my ($class, $v2writable, $part) = @_;
         my $self = $class->SUPER::new($v2writable->{-inbox}, 1, $part);
-        $self->{skeleton} = $skel;
-        # create the DB:
+        # create the DB before forking:
         $self->_xdb_acquire;
         $self->_xdb_release;
+        $self->{over} = $v2writable->{over};
         $self->spawn_worker($v2writable, $part) if $v2writable->{parallel};
         $self;
 }
@@ -27,7 +27,7 @@ sub spawn_worker {
         if ($pid == 0) {
                 $v2writable->atfork_child;
                 $v2writable = undef;
-                close $w;
+                close $w or die "failed to close: $!";
 
                 # F_SETPIPE_SZ = 1031 on Linux; increasing the pipe size here
                 # speeds V2Writable batch imports across 8 cores by nearly 20%
@@ -40,7 +40,7 @@ sub spawn_worker {
         }
         $self->{pid} = $pid;
         $self->{w} = $w;
-        close $r;
+        close $r or die "failed to close: $!";
 }
 
 sub partition_worker_loop ($$$) {
@@ -50,13 +50,12 @@ sub partition_worker_loop ($$$) {
         while (my $line = $r->getline) {
                 if ($line eq "commit\n") {
                         $self->commit_txn_lazy;
-                        $self->{skeleton}->remote_commit;
                 } elsif ($line eq "close\n") {
                         $self->_xdb_release;
                 } elsif ($line eq "barrier\n") {
                         $self->commit_txn_lazy;
-                        print { $self->{skeleton}->{w} } "barrier $part\n" or
-                                        die "write failed to skeleton: $!\n";
+                        print { $self->{over}->{w} } "barrier $part\n" or
+                                        die "write failed to overview $!\n";
                 } elsif ($line =~ /\AD ([a-f0-9]{40,}) (.+)\n\z/s) {
                         my ($oid, $mid) = ($1, $2);
                         $self->begin_txn_lazy;
@@ -101,7 +100,6 @@ sub remote_barrier {
                 $w->flush or die "failed to flush: $!";
         } else {
                 $self->commit_txn_lazy;
-                $self->{skeleton}->remote_commit;
         }
 }
 
diff --git a/lib/PublicInbox/SearchMsg.pm b/lib/PublicInbox/SearchMsg.pm
index f5510b8e..6c0780e5 100644
--- a/lib/PublicInbox/SearchMsg.pm
+++ b/lib/PublicInbox/SearchMsg.pm
@@ -13,9 +13,7 @@ use PublicInbox::MsgTime qw(msg_timestamp msg_datestamp);
 sub new {
         my ($class, $mime) = @_;
         my $doc = Search::Xapian::Document->new;
-        $doc->add_boolean_term('T' . 'mail');
-
-        bless { type => 'mail', doc => $doc, mime => $mime }, $class;
+        bless { doc => $doc, mime => $mime }, $class;
 }
 
 sub wrap {
@@ -52,6 +50,7 @@ sub to_doc_data {
         );
 }
 
+
 sub load_from_data ($$) {
         my ($self) = $_[0]; # data = $_[1]
         (
@@ -187,26 +186,9 @@ sub mid ($;$) {
 
 sub _extract_mid { mid_clean(mid_mime($_[0]->{mime})) }
 
-sub thread_id {
-        my ($self) = @_;
-        my $tid = $self->{thread};
-        return $tid if defined $tid;
-        $self->{thread} = _get_term_val($self, 'G', qr/\AG/); # *G*roup
-}
+sub tid { $_[0]->{tid} }
 
 # XXX: consider removing this, we can phrase match subject
-sub path {
-        my ($self) = @_;
-        my $path = $self->{path};
-        return $path if defined $path;
-        $self->{path} = _get_term_val($self, 'XPATH', qr/\AXPATH/); # path
-}
-
-sub type {
-        my ($self) = @_;
-        my $type = $self->{type};
-        return $type if defined $type;
-        $self->{type} = _get_term_val($self, 'T', qr/\AT/);
-}
+sub path { $_[0]->{path} }
 
 1;
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index 51723e55..8e3122ab 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -7,13 +7,15 @@ use strict;
 use warnings;
 use base qw(PublicInbox::Lock);
 use PublicInbox::SearchIdxPart;
-use PublicInbox::SearchIdxSkeleton;
 use PublicInbox::MIME;
 use PublicInbox::Git;
 use PublicInbox::Import;
 use PublicInbox::MID qw(mids);
 use PublicInbox::ContentId qw(content_id content_digest);
 use PublicInbox::Inbox;
+use PublicInbox::OverIdxFork;
+use PublicInbox::Msgmap;
+use IO::Handle;
 
 # an estimate of the post-packed size to the raw uncompressed size
 my $PACKING_FACTOR = 0.4;
@@ -57,6 +59,7 @@ sub new {
                 partitions => $nparts,
                 parallel => 1,
                 transact_bytes => 0,
+                over => PublicInbox::OverIdxFork->new("$xpfx/over.sqlite3"),
                 lock_path => "$dir/inbox.lock",
                 # limit each repo to 1GB or so
                 rotate_bytes => int((1024 * 1024 * 1024) / $PACKING_FACTOR),
@@ -117,7 +120,7 @@ sub num_for {
         my $mids = mids($mime->header_obj);
         if (@$mids) {
                 my $mid = $mids->[0];
-                my $num = $self->{skel}->{mm}->mid_insert($mid);
+                my $num = $self->{mm}->mid_insert($mid);
                 if (defined $num) { # common case
                         $$mid0 = $mid;
                         return $num;
@@ -140,7 +143,7 @@ sub num_for {
                 # try the rest of the mids
                 for(my $i = $#$mids; $i >= 1; $i--) {
                         my $m = $mids->[$i];
-                        $num = $self->{skel}->{mm}->mid_insert($m);
+                        $num = $self->{mm}->mid_insert($m);
                         if (defined $num) {
                                 warn "alternative <$m> for <$mid> found\n";
                                 $$mid0 = $m;
@@ -158,20 +161,20 @@ sub num_for_harder {
         my $hdr = $mime->header_obj;
         my $dig = content_digest($mime);
         $$mid0 = PublicInbox::Import::digest2mid($dig);
-        my $num = $self->{skel}->{mm}->mid_insert($$mid0);
+        my $num = $self->{mm}->mid_insert($$mid0);
         unless (defined $num) {
                 # it's hard to spoof the last Received: header
                 my @recvd = $hdr->header_raw('Received');
                 $dig->add("Received: $_") foreach (@recvd);
                 $$mid0 = PublicInbox::Import::digest2mid($dig);
-                $num = $self->{skel}->{mm}->mid_insert($$mid0);
+                $num = $self->{mm}->mid_insert($$mid0);
 
                 # fall back to a random Message-ID and give up determinism:
                 until (defined($num)) {
                         $dig->add(rand);
                         $$mid0 = PublicInbox::Import::digest2mid($dig);
                         warn "using random Message-ID <$$mid0> as fallback\n";
-                        $num = $self->{skel}->{mm}->mid_insert($$mid0);
+                        $num = $self->{mm}->mid_insert($$mid0);
                 }
         }
         PublicInbox::Import::append_mid($hdr, $$mid0);
@@ -194,13 +197,11 @@ sub idx_init {
         # frequently activated.
         delete $ibx->{$_} foreach (qw(git mm search));
 
+        my $over = $self->{over};
         $ibx->umask_prepare;
         $ibx->with_umask(sub {
                 $self->lock_acquire;
-
-                # first time initialization, first we create the skeleton pipe:
-                my $skel = PublicInbox::SearchIdxSkeleton->new($self);
-                $self->{skel} = $skel;
+                $over->create($self);
 
                 # need to create all parts before initializing msgmap FD
                 my $max = $self->{partitions} - 1;
@@ -208,12 +209,14 @@ sub idx_init {
                 # idx_parts must be visible to all forked processes
                 my $idx = $self->{idx_parts} = [];
                 for my $i (0..$max) {
-                        push @$idx,
-                             PublicInbox::SearchIdxPart->new($self, $i, $skel);
+                        push @$idx, PublicInbox::SearchIdxPart->new($self, $i);
                 }
 
-                # Now that all subprocesses are up, we can open the FD for SQLite:
-                $skel->_msgmap_init->{dbh}->begin_work;
+                # Now that all subprocesses are up, we can open the FDs
+                # for SQLite:
+                my $mm = $self->{mm} = PublicInbox::Msgmap->new_file(
+                        "$self->{-inbox}->{mainrepo}/msgmap.sqlite3", 1);
+                $mm->{dbh}->begin_work;
         });
 }
 
@@ -236,9 +239,8 @@ sub remove_internal {
         my $ibx = $self->{-inbox};
         my $srch = $ibx->search;
         my $cid = content_id($mime);
-        my $skel = $self->{skel};
         my $parts = $self->{idx_parts};
-        my $mm = $skel->{mm};
+        my $mm = $self->{mm};
         my $removed;
         my $mids = mids($mime->header_obj);
 
@@ -273,9 +275,10 @@ sub remove_internal {
                                 $orig = undef;
                                 $removed->num; # memoize this for callers
 
-                                foreach my $idx (@$parts, $skel) {
+                                foreach my $idx (@$parts) {
                                         $idx->remote_remove($oid, $mid);
                                 }
+                                $self->{over}->remove_oid($oid, $mid);
                         }
                         1; # continue
                 });
@@ -322,18 +325,20 @@ sub barrier {
         if (my $im = $self->{im}) {
                 $im->barrier;
         }
-        my $skel = $self->{skel};
         my $parts = $self->{idx_parts};
-        if ($parts && $skel) {
-                my $dbh = $skel->{mm}->{dbh};
-                $dbh->commit; # SQLite data is second in importance
+        if ($parts) {
+                my $dbh = $self->{mm}->{dbh};
+                $dbh->commit; # SQLite msgmap data is second in importance
+
+                my $over = $self->{over};
 
-                # Now deal with Xapian
-                $skel->barrier_init(scalar(@$parts));
-                # each partition needs to issue a barrier command to skel:
+                # Now deal with Xapian and overview DB
+                $over->barrier_init(scalar(@$parts));
+
+                # each partition needs to issue a barrier command to over
                 $_->remote_barrier foreach @$parts;
 
-                $skel->barrier_wait; # wait for each Xapian partition
+                $over->barrier_wait; # wait for each Xapian partition
 
                 $dbh->begin_work;
         }
@@ -343,26 +348,30 @@ sub barrier {
 sub searchidx_checkpoint {
         my ($self, $more) = @_;
 
-        # order matters, we can only close {skel} after all partitions
-        # are done because the partitions also write to {skel}
+        # order matters, we can only close {over} after all partitions
+        # are done because the partitions also write to {over}
         if (my $parts = $self->{idx_parts}) {
                 foreach my $idx (@$parts) {
-                        $idx->remote_commit; # propagates commit to skel
+                        $idx->remote_commit; # propagates commit to over
                         $idx->remote_close unless $more;
                 }
                 delete $self->{idx_parts} unless $more;
         }
 
-        if (my $skel = $self->{skel}) {
-                my $dbh = $skel->{mm}->{dbh};
+        if (my $mm = $self->{mm}) {
+                my $dbh = $mm->{dbh};
                 $dbh->commit;
                 if ($more) {
                         $dbh->begin_work;
                 } else {
-                        $skel->remote_close;
-                        delete $self->{skel};
+                        delete $self->{mm};
                 }
         }
+        my $over = $self->{over};
+        $over->remote_commit;
+        if (!$more) {
+                $over->remote_close;
+        }
         $self->{transact_bytes} = 0;
 }
 
@@ -522,6 +531,7 @@ sub atfork_child {
         if (my $im = $self->{im}) {
                 $im->atfork_child;
         }
+        die "unexpected mm" if $self->{mm};
 }
 
 sub mark_deleted {
@@ -559,7 +569,7 @@ sub reindex_oid {
         if (!defined($mid0) && $regen && !$del) {
                 $num = $$regen--;
                 die "BUG: ran out of article numbers\n" if $num <= 0;
-                my $mm = $self->{skel}->{mm};
+                my $mm = $self->{mm};
                 foreach my $mid (reverse @$mids) {
                         if ($mm->mid_set($num, $mid) == 1) {
                                 $mid0 = $mid;
@@ -620,7 +630,7 @@ sub reindex {
         my $head = $ibx->{ref_head} || 'refs/heads/master';
         $self->idx_init; # acquire lock
         my $x40 = qr/[a-f0-9]{40}/;
-        my $mm_tmp = $self->{skel}->{mm}->tmp_clone;
+        my $mm_tmp = $self->{mm}->tmp_clone;
         if (!$regen) {
                 my (undef, $max) = $mm_tmp->minmax;
                 unless (defined $max) {
diff --git a/script/public-inbox-compact b/script/public-inbox-compact
index 79cd039b..e6977165 100755
--- a/script/public-inbox-compact
+++ b/script/public-inbox-compact
@@ -10,7 +10,6 @@ use PublicInbox::Config;
 use Cwd 'abs_path';
 use File::Temp qw(tempdir);
 use File::Path qw(remove_tree);
-use PublicInbox::Spawn qw(spawn);
 my $usage = "Usage: public-inbox-compact REPO_DIR\n";
 my $dir = shift or die $usage;
 my $config = PublicInbox::Config->new;
@@ -36,6 +35,8 @@ $ibx->umask_prepare;
 sub commit_changes ($$$) {
         my ($im, $old, $new) = @_;
         my @st = stat($old) or die "failed to stat($old): $!\n";
+        link("$old/over.sqlite3", "$new/over.sqlite3") or die
+                "failed to link {$old => $new}/over.sqlite3: $!\n";
         rename($old, "$new/old") or die "rename $old => $new/old: $!\n";
         chmod($st[2] & 07777, $new) or die "chmod $old: $!\n";
         rename($new, $old) or die "rename $new => $old: $!\n";
@@ -53,41 +54,18 @@ if ($v == 2) {
         $ibx->with_umask(sub {
                 $v2w->lock_acquire;
                 my @parts;
-                my $skel;
                 while (defined(my $dn = readdir($dh))) {
                         if ($dn =~ /\A\d+\z/) {
                                 push @parts, "$old/$dn";
-                        } elsif ($dn eq 'skel') {
-                                $skel = "$old/$dn";
                         } elsif ($dn eq '.' || $dn eq '..') {
                         } else {
                                 warn "W: skipping unknown Xapian DB: $old/$dn\n"
                         }
                 }
                 close $dh;
-                my %pids;
-
-                if (@parts) {
-                        my $pid = spawn(['xapian-compact', @parts, "$new/0" ]);
-                        defined $pid or die "compact failed: $?\n";
-                        $pids{$pid} = 'xapian-compact (parts)';
-                } else {
-                        warn "No parts found in $old\n";
-                }
-                if (defined $skel) {
-                        my $pid = spawn(['xapian-compact', $skel, "$new/skel"]);
-                        defined $pid or die "compact failed: $?\n";
-                        $pids{$pid} = 'xapian-compact (skel)';
-                } else {
-                        warn "$old/skel missing\n";
-                }
-                scalar keys %pids or
-                        die "No xapian-compact processes running\n";
-                while (scalar keys %pids) {
-                        my $pid = waitpid(-1, 0);
-                        my $desc = delete $pids{$pid};
-                        die "$desc failed: $?\n" if $?;
-                }
+                die "No Xapian parts found in $old\n" unless @parts;
+                my $cmd = ['xapian-compact', @parts, "$new/0" ];
+                PublicInbox::Import::run_die($cmd);
                 commit_changes($v2w, $old, $new);
         });
 } elsif ($v == 1) {
diff --git a/t/over.t b/t/over.t
new file mode 100644
index 00000000..1d3f9b37
--- /dev/null
+++ b/t/over.t
@@ -0,0 +1,38 @@
+# Copyright (C) 2018 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+use strict;
+use warnings;
+use Test::More;
+use File::Temp qw/tempdir/;
+foreach my $mod (qw(DBD::SQLite)) {
+        eval "require $mod";
+        plan skip_all => "$mod missing for over.t" if $@;
+}
+
+use_ok 'PublicInbox::OverIdx';
+my $tmpdir = tempdir('pi-over-XXXXXX', TMPDIR => 1, CLEANUP => 1);
+my $over = PublicInbox::OverIdx->new("$tmpdir/over.sqlite3");
+$over->connect;
+my $x = $over->next_tid;
+is(int($x), $x, 'integer tid');
+my $y = $over->next_tid;
+is($y, $x+1, 'tid increases');
+
+$x = $over->sid('hello-world');
+is(int($x), $x, 'integer sid');
+$y = $over->sid('hello-WORLD');
+is($y, $x+1, 'sid ncreases');
+is($over->sid('hello-world'), $x, 'idempotent');
+$over->disconnect;
+
+$over = PublicInbox::OverIdx->new("$tmpdir/over.sqlite3");
+$over->connect;
+is($over->sid('hello-world'), $x, 'idempotent across reopen');
+$over->each_by_mid('never', sub { fail('should not be called') });
+
+$x = $over->create_ghost('never');
+is(int($x), $x, 'integer tid for ghost');
+$y = $over->create_ghost('NEVAR');
+is($y, $x + 1, 'integer tid for ghost increases');
+
+done_testing();
diff --git a/t/psgi_search.t b/t/psgi_search.t
index 1df38691..60a44bde 100644
--- a/t/psgi_search.t
+++ b/t/psgi_search.t
@@ -30,8 +30,7 @@ EOF
 
 my $num = 0;
 # nb. using internal API, fragile!
-my $xdb = $rw->_xdb_acquire;
-$xdb->begin_transaction;
+$rw->begin_txn_lazy;
 
 foreach (reverse split(/\n\n/, $data)) {
         $_ .= "\n";
@@ -42,8 +41,7 @@ foreach (reverse split(/\n\n/, $data)) {
         ok($doc_id, 'message added: '. $mid);
 }
 
-$xdb->commit_transaction;
-$rw = undef;
+$rw->commit_txn_lazy;
 
 my $cfgpfx = "publicinbox.test";
 my $config = PublicInbox::Config->new({
diff --git a/t/search-thr-index.t b/t/search-thr-index.t
index 9549976d..3ddef809 100644
--- a/t/search-thr-index.t
+++ b/t/search-thr-index.t
@@ -32,8 +32,7 @@ EOF
 
 my $num = 0;
 # nb. using internal API, fragile!
-my $xdb = $rw->_xdb_acquire;
-$xdb->begin_transaction;
+my $xdb = $rw->begin_txn_lazy;
 my @mids;
 
 foreach (reverse split(/\n\n/, $data)) {
@@ -50,10 +49,12 @@ foreach (reverse split(/\n\n/, $data)) {
 
 my $prev;
 foreach my $mid (@mids) {
-        my $res = $rw->get_thread($mid);
+        my $res = $rw->{over}->get_thread($mid);
         is(3, $res->{total}, "got all messages from $mid");
 }
 
+$rw->commit_txn_lazy;
+
 done_testing();
 
 1;
diff --git a/t/search.t b/t/search.t
index 9ab15f77..51adb9fb 100644
--- a/t/search.t
+++ b/t/search.t
@@ -22,9 +22,9 @@ my $ibx = $rw->{-inbox};
 $rw = undef;
 my $ro = PublicInbox::Search->new($git_dir);
 my $rw_commit = sub {
-        $rw->{xdb}->commit_transaction if $rw && $rw->{xdb};
+        $rw->commit_txn_lazy if $rw;
         $rw = PublicInbox::SearchIdx->new($git_dir, 1);
-        $rw->_xdb_acquire->begin_transaction;
+        $rw->begin_txn_lazy;
 };
 
 {
@@ -93,7 +93,6 @@ sub filter_mids {
         ok($found, "message found");
         is($root_id, $found->{doc_id}, 'doc_id set correctly');
         is($found->mid, 'root@s', 'mid set correctly');
-        ok(int($found->thread_id) > 0, 'thread_id is an integer');
 
         my ($res, @res);
         my @exp = sort qw(root@s last@s);
@@ -148,7 +147,13 @@ sub filter_mids {
 
         my $ghost_id = $rw->add_message($was_ghost);
         is($ghost_id, int($ghost_id), "ghost_id is an integer: $ghost_id");
-        ok($ghost_id < $reply_id, "ghost vivified from earlier message");
+        my $msgs = $rw->{over}->get_thread('ghost-message@s')->{msgs};
+        is(scalar(@$msgs), 2, 'got both messages in ghost thread');
+        foreach (qw(sid tid)) {
+                is($msgs->[0]->{$_}, $msgs->[1]->{$_}, "{$_} match");
+        }
+        isnt($msgs->[0]->{num}, $msgs->[1]->{num}, "num do not match");
+        ok($_->{num} > 0, 'positive art num') foreach @$msgs
 }
 
 # search thread on ghost
@@ -400,6 +405,7 @@ sub filter_mids {
         is($txt->{msgs}->[0]->mid, $res->{msgs}->[0]->mid,
                 'search inside text attachments works');
 }
+$rw->commit_txn_lazy;
 
 done_testing();