diff options
-rw-r--r-- | MANIFEST | 5 | ||||
-rw-r--r-- | lib/PublicInbox/Inbox.pm | 15 | ||||
-rw-r--r-- | lib/PublicInbox/Msgmap.pm | 1 | ||||
-rw-r--r-- | lib/PublicInbox/NNTP.pm | 29 | ||||
-rw-r--r-- | lib/PublicInbox/Over.pm | 119 | ||||
-rw-r--r-- | lib/PublicInbox/OverIdx.pm | 370 | ||||
-rw-r--r-- | lib/PublicInbox/OverIdxFork.pm (renamed from lib/PublicInbox/SearchIdxSkeleton.pm) | 128 | ||||
-rw-r--r-- | lib/PublicInbox/Search.pm | 109 | ||||
-rw-r--r-- | lib/PublicInbox/SearchIdx.pm | 214 | ||||
-rw-r--r-- | lib/PublicInbox/SearchIdxPart.pm | 16 | ||||
-rw-r--r-- | lib/PublicInbox/SearchMsg.pm | 26 | ||||
-rw-r--r-- | lib/PublicInbox/V2Writable.pm | 78 | ||||
-rwxr-xr-x | script/public-inbox-compact | 32 | ||||
-rw-r--r-- | t/over.t | 38 | ||||
-rw-r--r-- | t/psgi_search.t | 6 | ||||
-rw-r--r-- | t/search-thr-index.t | 7 | ||||
-rw-r--r-- | t/search.t | 14 |
17 files changed, 763 insertions, 444 deletions
@@ -82,6 +82,9 @@ lib/PublicInbox/Msgmap.pm lib/PublicInbox/NNTP.pm lib/PublicInbox/NNTPD.pm lib/PublicInbox/NewsWWW.pm +lib/PublicInbox/Over.pm +lib/PublicInbox/OverIdx.pm +lib/PublicInbox/OverIdxFork.pm lib/PublicInbox/ParentPipe.pm lib/PublicInbox/ProcessPipe.pm lib/PublicInbox/Qspawn.pm @@ -90,7 +93,6 @@ lib/PublicInbox/SaPlugin/ListMirror.pm lib/PublicInbox/Search.pm lib/PublicInbox/SearchIdx.pm lib/PublicInbox/SearchIdxPart.pm -lib/PublicInbox/SearchIdxSkeleton.pm lib/PublicInbox/SearchMsg.pm lib/PublicInbox/SearchThread.pm lib/PublicInbox/SearchView.pm @@ -170,6 +172,7 @@ t/msg_iter.t t/msgmap.t t/nntp.t t/nntpd.t +t/over.t t/plack.t t/precheck.t t/psgi_attach.t diff --git a/lib/PublicInbox/Inbox.pm b/lib/PublicInbox/Inbox.pm index 43cf15ba..142b5c89 100644 --- a/lib/PublicInbox/Inbox.pm +++ b/lib/PublicInbox/Inbox.pm @@ -319,20 +319,7 @@ sub msg_by_mid ($$;$) { sub recent { my ($self, $opts) = @_; - my $qs = ''; - my $srch = search($self); - if (!$opts->{offset}) { - # this complicated bit cuts /$INBOX/ loading time by - # over 400ms on my system: - my ($min, $max) = mm($self)->minmax; - my $n = $max - $opts->{limit}; - $n = $min if $n < $min; - for (; $qs eq '' && $n >= $min; --$n) { - my $smsg = $srch->lookup_article($n) or next; - $qs = strftime('d:%Y%m%d..', gmtime($smsg->ts)); - } - } - $srch->query($qs, $opts); + search($self)->query('', $opts); } 1; diff --git a/lib/PublicInbox/Msgmap.pm b/lib/PublicInbox/Msgmap.pm index 12833050..dea95731 100644 --- a/lib/PublicInbox/Msgmap.pm +++ b/lib/PublicInbox/Msgmap.pm @@ -39,6 +39,7 @@ sub dbh_new { sub new_file { my ($class, $f, $writable) = @_; + return if !$writable && !-r $f; my $dbh = dbh_new($f, $writable); my $self = bless { dbh => $dbh }, $class; diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm index fb65ddc0..48ab7fc2 100644 --- a/lib/PublicInbox/NNTP.pm +++ b/lib/PublicInbox/NNTP.pm @@ -331,13 +331,11 @@ sub cmd_newnews ($$$$;$$) { }; return '.' unless @srch; - $ts .= '..'; - my $opts = { asc => 1, limit => 1000, offset => 0 }; + my $opts = { limit => 1000, offset => 0 }; long_response($self, 0, long_response_limit, sub { my ($i) = @_; my $srch = $srch[0]; - my $res = $srch->query_ts($ts, $opts); - my $msgs = $res->{msgs}; + my $msgs = $srch->query_ts($ts, $opts); if (my $nr = scalar @$msgs) { more($self, '<' . join(">\r\n<", map { $_->mid } @$msgs ). @@ -463,7 +461,7 @@ find_mid: defined $mid or return $err; } found: - my $smsg = $ng->search->lookup_article($n) or return $err; + my $smsg = $ng->search->{over_ro}->get_art($n) or return $err; my $msg = $ng->msg_by_smsg($smsg) or return $err; my $s = Email::Simple->new($msg); if ($set_headers) { @@ -692,8 +690,9 @@ sub hdr_xref ($$$) { # optimize XHDR Xref [range] for rtin sub search_header_for { my ($srch, $num, $field) = @_; - my $smsg = $srch->lookup_article($num) or return; - $smsg->$field; + my $smsg = $srch->{over_ro}->get_art($num) or return; + return PublicInbox::SearchMsg::date($smsg) if $field eq 'date'; + $smsg->{$field}; } sub hdr_searchmsg ($$$$) { @@ -714,8 +713,7 @@ sub hdr_searchmsg ($$$$) { my $off = 0; long_response($self, $beg, $end, sub { my ($i) = @_; - my $res = $srch->query_xover($beg, $end, $off); - my $msgs = $res->{msgs}; + my $msgs = $srch->query_xover($beg, $end, $off); my $nr = scalar @$msgs or return; $off += $nr; my $tmp = ''; @@ -816,10 +814,10 @@ sub over_line ($$) { $smsg->{subject}, $smsg->{from}, PublicInbox::SearchMsg::date($smsg), - '<'.PublicInbox::SearchMsg::mid($smsg).'>', + "<$smsg->{mid}>", $smsg->{references}, - PublicInbox::SearchMsg::bytes($smsg), - PublicInbox::SearchMsg::lines($smsg)); + $smsg->{bytes}, + $smsg->{lines}); utf8::encode($s); $s } @@ -829,7 +827,7 @@ sub cmd_over ($;$) { if ($range && $range =~ /\A<(.+)>\z/) { my ($ng, $n) = mid_lookup($self, $1); defined $n or return r430; - my $smsg = $ng->search->lookup_article($n) or return r430; + my $smsg = $ng->search->{over_ro}->get_art($n) or return r430; more($self, '224 Overview information follows (multi-line)'); # Only set article number column if it's the current group @@ -853,14 +851,13 @@ sub cmd_xover ($;$) { my $off = 0; long_response($self, $beg, $end, sub { my ($i) = @_; - my $res = $srch->query_xover($beg, $end, $off); - my $msgs = $res->{msgs}; + my $msgs = $srch->query_xover($beg, $end, $off); my $nr = scalar @$msgs or return; $off += $nr; # OVERVIEW.FMT more($self, join("\r\n", map { - over_line(PublicInbox::SearchMsg::num($_), $_); + over_line($_->{num}, $_); } @$msgs)); # -1 to adjust for implicit increment in long_response diff --git a/lib/PublicInbox/Over.pm b/lib/PublicInbox/Over.pm new file mode 100644 index 00000000..cf7a8849 --- /dev/null +++ b/lib/PublicInbox/Over.pm @@ -0,0 +1,119 @@ +# Copyright (C) 2018 all contributors <meta@public-inbox.org> +# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> + +# for XOVER, OVER in NNTP, and feeds/homepage/threads in PSGI +# Unlike Msgmap, this is an _UNSTABLE_ database which can be +# tweaked/updated over time and rebuilt. +package PublicInbox::Over; +use strict; +use warnings; +use DBI; +use DBD::SQLite; +use PublicInbox::SearchMsg; +use Compress::Zlib qw(uncompress); + +sub dbh_new { + my ($self) = @_; + my $ro = ref($self) eq 'PublicInbox::Over'; + my $dbh = DBI->connect("dbi:SQLite:dbname=$self->{filename}",'','', { + AutoCommit => 1, + RaiseError => 1, + PrintError => 0, + ReadOnly => $ro, + sqlite_use_immediate_transaction => 1, + }); + $dbh->{sqlite_unicode} = 1; + $dbh; +} + +sub new { + my ($class, $f) = @_; + bless { filename => $f }, $class; +} + +sub disconnect { $_[0]->{dbh} = undef } + +sub connect { $_[0]->{dbh} ||= $_[0]->dbh_new } + +sub load_from_row { + my ($smsg) = @_; + bless $smsg, 'PublicInbox::SearchMsg'; + if (defined(my $data = delete $smsg->{ddd})) { + $data = uncompress($data); + utf8::decode($data); + $smsg->load_from_data($data); + } + $smsg +} + +sub do_get { + my ($self, $sql, $opts, @args) = @_; + my $dbh = $self->connect; + my $lim = (($opts->{limit} || 0) + 0) || 1000; + my $off = (($opts->{offset} || 0) + 0) || 0; + $sql .= "LIMIT $lim OFFSET $off"; + my $msgs = $dbh->selectall_arrayref($sql, { Slice => {} }, @args); + load_from_row($_) for @$msgs; + $msgs +} + +sub query_xover { + my ($self, $beg, $end, $off) = @_; + do_get($self, <<'', { offset => $off }, $beg, $end); +SELECT * FROM over WHERE num >= ? AND num <= ? +ORDER BY num ASC + +} + +sub query_ts { + my ($self, $ts, $opts) = @_; + do_get($self, <<'', $opts, $ts); +SELECT * FROM over WHERE num > 0 AND ts >= ? +ORDER BY ts ASC + +} + +sub get_thread { + my ($self, $mid, $opts) = @_; + my $dbh = $self->connect; + my ($tid, $sid) = $dbh->selectrow_array(<<'', undef, $mid); +SELECT tid,sid FROM over +LEFT JOIN id2num ON over.num = id2num.num +LEFT JOIN msgid ON id2num.id = msgid.id +WHERE msgid.mid = ? AND over.num > 0 +LIMIT 1 + + my $cond = 'FROM over WHERE (tid = ? OR sid = ?) AND num > 0'; + my $msgs = do_get($self, <<"", $opts, $tid, $sid); +SELECT * $cond +ORDER BY ts ASC + + my $nr = $dbh->selectrow_array(<<"", undef, $tid, $sid); +SELECT COUNT(num) $cond + + { total => $nr, msgs => $msgs }; +} + +sub recent { + my ($self, $opts) = @_; + my $msgs = do_get($self, <<'', $opts); +SELECT * FROM over WHERE num > 0 +ORDER BY ts DESC + + my $nr = $self->{dbh}->selectrow_array(<<''); +SELECT COUNT(num) FROM over WHERE num > 0 + + { total => $nr, msgs => $msgs }; +} + +sub get_art { + my ($self, $num) = @_; + my $dbh = $self->connect; + my $smsg = $dbh->selectrow_hashref(<<'', undef, $num); +SELECT * from OVER where num = ? LIMIT 1 + + return load_from_row($smsg) if $smsg; + undef; +} + +1; diff --git a/lib/PublicInbox/OverIdx.pm b/lib/PublicInbox/OverIdx.pm new file mode 100644 index 00000000..0e43aabc --- /dev/null +++ b/lib/PublicInbox/OverIdx.pm @@ -0,0 +1,370 @@ +# Copyright (C) 2018 all contributors <meta@public-inbox.org> +# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> + +# for XOVER, OVER in NNTP, and feeds/homepage/threads in PSGI +# Unlike Msgmap, this is an _UNSTABLE_ database which can be +# tweaked/updated over time and rebuilt. +package PublicInbox::OverIdx; +use strict; +use warnings; +use base qw(PublicInbox::Over); +use IO::Handle; +use DBI qw(:sql_types); # SQL_BLOB + +sub dbh_new { + my ($self) = @_; + my $dbh = $self->SUPER::dbh_new; + $dbh->do('PRAGMA synchronous = OFF'); # commit_fsync instead + $dbh->do('PRAGMA journal_mode = TRUNCATE'); + $dbh->do('PRAGMA cache_size = 80000'); + create_tables($dbh); + $dbh; +} + +sub commit_fsync { + my $fn = $_[0]->{filename}; + if (open my $fh, '+<', $fn) { + $fh->sync; + close $fh; + } +} + +sub get_counter ($$) { + my ($dbh, $key) = @_; + my $sth = $dbh->prepare_cached(<<'', undef, 1); +SELECT val FROM counter WHERE key = ? LIMIT 1 + + $sth->execute($key); + $sth->fetchrow_array; +} + +sub adj_counter ($$$) { + my ($self, $key, $op) = @_; + my $dbh = $self->{dbh}; + my $sth = $dbh->prepare_cached(<<""); +UPDATE counter SET val = val $op 1 WHERE key = ? + + $sth->execute($key); + + get_counter($dbh, $key); +} + +sub next_tid { adj_counter($_[0], 'thread', '+') } +sub next_ghost_num { adj_counter($_[0], 'ghost', '-') } + +sub id_for ($$$$$) { + my ($self, $tbl, $id_col, $val_col, $val) = @_; + my $dbh = $self->{dbh}; + my $in = $dbh->prepare_cached(<<"")->execute($val); +INSERT OR IGNORE INTO $tbl ($val_col) VALUES (?) + + if ($in == 0) { + my $sth = $dbh->prepare_cached(<<"", undef, 1); +SELECT $id_col FROM $tbl WHERE $val_col = ? LIMIT 1 + + $sth->execute($val); + $sth->fetchrow_array; + } else { + $dbh->last_insert_id(undef, undef, $tbl, $id_col); + } +} + +sub sid { + my ($self, $path) = @_; + return unless defined $path && $path ne ''; + id_for($self, 'subject', 'sid', 'path' => $path); +} + +sub mid2id { + my ($self, $mid) = @_; + id_for($self, 'msgid', 'id', 'mid' => $mid); +} + +sub delete_by_num { + my ($self, $num) = @_; + my $dbh = $self->{dbh}; + foreach (qw(over id2num)) { + $dbh->prepare_cached(<<"")->execute($num); +DELETE FROM $_ WHERE num = ? + + } +} + +# this includes ghosts +sub each_by_mid { + my ($self, $mid, $cols, $cb) = @_; + my $dbh = $self->{dbh}; + +=over + I originally wanted to stuff everything into a single query: + + SELECT over.* FROM over + LEFT JOIN id2num ON over.num = id2num.num + LEFT JOIN msgid ON msgid.id = id2num.id + WHERE msgid.mid = ? AND over.num >= ? + ORDER BY over.num ASC + LIMIT 1000 + + But it's faster broken out (and we're always in a + transaction for subroutines in this file) +=cut + + my $sth = $dbh->prepare_cached(<<'', undef, 1); +SELECT id FROM msgid WHERE mid = ? LIMIT 1 + + $sth->execute($mid); + my $id = $sth->fetchrow_array; + defined $id or return; + + push(@$cols, 'num'); + $cols = join(',', map { $_ } @$cols); + my $lim = 10; + my $prev = get_counter($dbh, 'ghost'); + while (1) { + $sth = $dbh->prepare_cached(<<"", undef, 1); +SELECT num FROM id2num WHERE id = ? AND num >= ? +ORDER BY num ASC +LIMIT $lim + + $sth->execute($id, $prev); + my $nums = $sth->fetchall_arrayref; + my $nr = scalar(@$nums) or return; + $prev = $nums->[-1]->[0]; + + $sth = $dbh->prepare_cached(<<"", undef, 1); +SELECT $cols FROM over WHERE over.num = ? LIMIT 1 + + foreach (@$nums) { + $sth->execute($_->[0]); + my $smsg = $sth->fetchrow_hashref; + $cb->(PublicInbox::Over::load_from_row($smsg)) or + return; + } + return if $nr != $lim; + } +} + +# this will create a ghost as necessary +sub resolve_mid_to_tid { + my ($self, $mid) = @_; + my $tid; + each_by_mid($self, $mid, ['tid'], sub { + my ($smsg) = @_; + my $cur_tid = $smsg->{tid}; + if (defined $tid) { + merge_threads($self, $tid, $cur_tid); + } else { + $tid = $cur_tid; + } + 1; + }); + defined $tid ? $tid : create_ghost($self, $mid); +} + +sub create_ghost { + my ($self, $mid) = @_; + my $id = $self->mid2id($mid); + my $num = $self->next_ghost_num; + $num < 0 or die "ghost num is non-negative: $num\n"; + my $tid = $self->next_tid; + my $dbh = $self->{dbh}; + $dbh->prepare_cached(<<'')->execute($num, $tid); +INSERT INTO over (num, tid) VALUES (?,?) + + $dbh->prepare_cached(<<'')->execute($id, $num); +INSERT INTO id2num (id, num) VALUES (?,?) + + $tid; +} + +sub merge_threads { + my ($self, $winner_tid, $loser_tid) = @_; + return if $winner_tid == $loser_tid; + my $dbh = $self->{dbh}; + $dbh->prepare_cached(<<'')->execute($winner_tid, $loser_tid); +UPDATE over SET tid = ? WHERE tid = ? + +} + +sub link_refs { + my ($self, $refs, $old_tid) = @_; + my $tid; + + if (@$refs) { + # first ref *should* be the thread root, + # but we can never trust clients to do the right thing + my $ref = $refs->[0]; + $tid = resolve_mid_to_tid($self, $ref); + merge_threads($self, $tid, $old_tid) if defined $old_tid; + + # the rest of the refs should point to this tid: + foreach my $i (1..$#$refs) { + $ref = $refs->[$i]; + my $ptid = resolve_mid_to_tid($self, $ref); + merge_threads($self, $tid, $ptid); + } + } else { + $tid = defined $old_tid ? $old_tid : $self->next_tid; + } + $tid; +} + +sub add_over { + my ($self, $values) = @_; + my ($ts, $num, $mids, $refs, $xpath, $ddd) = @$values; + my $old_tid; + my $vivified = 0; + + $self->begin_lazy; + $self->delete_by_num($num); + foreach my $mid (@$mids) { + my $v = 0; + each_by_mid($self, $mid, ['tid'], sub { + my ($cur) = @_; + my $cur_tid = $cur->{tid}; + my $n = $cur->{num}; + die "num must not be zero for $mid" if !$n; + $old_tid = $cur_tid unless defined $old_tid; + if ($n > 0) { # regular mail + merge_threads($self, $old_tid, $cur_tid); + } elsif ($n < 0) { # ghost + link_refs($self, $refs, $old_tid); + $self->delete_by_num($n); + $v++; + } + 1; + }); + $v > 1 and warn "BUG: vivified multiple ($v) ghosts for $mid\n"; + $vivified += $v; + } + my $tid = $vivified ? $old_tid : link_refs($self, $refs, $old_tid); + my $sid = $self->sid($xpath); + my $dbh = $self->{dbh}; + my $sth = $dbh->prepare_cached(<<''); +INSERT INTO over (num, tid, sid, ts, ddd) +VALUES (?,?,?,?,?) + + my $n = 0; + my @v = ($num, $tid, $sid, $ts); + foreach (@v) { $sth->bind_param(++$n, $_) } + $sth->bind_param(++$n, $ddd, SQL_BLOB); + $sth->execute; + $sth = $dbh->prepare_cached(<<''); +INSERT INTO id2num (id, num) VALUES (?,?) + + foreach my $mid (@$mids) { + my $id = $self->mid2id($mid); + $sth->execute($id, $num); + } +} + +sub delete_articles { + my ($self, $nums) = @_; + my $dbh = $self->connect; + $self->delete_by_num($_) foreach @$nums; +} + +sub remove_oid { + my ($self, $oid, $mid) = @_; + $self->begin_lazy; + each_by_mid($self, $mid, ['ddd'], sub { + my ($smsg) = @_; + $self->delete_by_num($smsg->{num}) if $smsg->{blob} eq $oid; + 1; + }); +} + +sub create_tables { + my ($dbh) = @_; + + $dbh->do(<<''); +CREATE TABLE IF NOT EXISTS over ( + num INTEGER NOT NULL, + tid INTEGER NOT NULL, + sid INTEGER, + ts INTEGER, + ddd VARBINARY, /* doc-data-deflated */ + UNIQUE (num) +) + + $dbh->do('CREATE INDEX IF NOT EXISTS idx_tid ON over (tid)'); + $dbh->do('CREATE INDEX IF NOT EXISTS idx_sid ON over (sid)'); + $dbh->do('CREATE INDEX IF NOT EXISTS idx_ts ON over (ts)'); + + $dbh->do(<<''); +CREATE TABLE IF NOT EXISTS counter ( + key VARCHAR(8) PRIMARY KEY NOT NULL, + val INTEGER DEFAULT 0, + UNIQUE (key) +) + + $dbh->do("INSERT OR IGNORE INTO counter (key) VALUES ('thread')"); + $dbh->do("INSERT OR IGNORE INTO counter (key) VALUES ('ghost')"); + + $dbh->do(<<''); +CREATE TABLE IF NOT EXISTS subject ( + sid INTEGER PRIMARY KEY AUTOINCREMENT, + path VARCHAR(40) NOT NULL, + UNIQUE (path) +) + + $dbh->do(<<''); +CREATE TABLE IF NOT EXISTS id2num ( + id INTEGER NOT NULL, + num INTEGER NOT NULL, + UNIQUE (id, num) +) + + # performance critical: + $dbh->do('CREATE INDEX IF NOT EXISTS idx_inum ON id2num (num)'); + $dbh->do('CREATE INDEX IF NOT EXISTS idx_id ON id2num (id)'); + + $dbh->do(<<''); +CREATE TABLE IF NOT EXISTS msgid ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + mid VARCHAR(244) NOT NULL, + UNIQUE (mid) +) + +} + +sub commit_lazy { + my ($self) = @_; + delete $self->{txn} or return; + $self->{dbh}->commit; +} + +sub begin_lazy { + my ($self) = @_; + return if $self->{txn}; + my $dbh = $self->connect or return; + $dbh->begin_work; + # $dbh->{Profile} = 2; + $self->{txn} = 1; +} + +sub rollback_lazy { + my ($self) = @_; + delete $self->{txn} or return; + $self->{dbh}->rollback; +} + +sub disconnect { + my ($self) = @_; + die "in transaction" if $self->{txn}; + $self->{dbh} = undef; +} + +sub create { + my ($self) = @_; + unless (-r $self->{filename}) { + require File::Path; + require File::Basename; + File::Path::mkpath(File::Basename::dirname($self->{filename})); + } + # create the DB: + PublicInbox::Over::connect($self); + $self->disconnect; +} + +1; diff --git a/lib/PublicInbox/SearchIdxSkeleton.pm b/lib/PublicInbox/OverIdxFork.pm index 2be64960..f4f7cddd 100644 --- a/lib/PublicInbox/SearchIdxSkeleton.pm +++ b/lib/PublicInbox/OverIdxFork.pm @@ -1,19 +1,16 @@ # Copyright (C) 2018 all contributors <meta@public-inbox.org> # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> -package PublicInbox::SearchIdxSkeleton; +package PublicInbox::OverIdxFork; use strict; use warnings; -use base qw(PublicInbox::SearchIdx); +use base qw(PublicInbox::OverIdx PublicInbox::Lock); use Storable qw(freeze thaw); +use IO::Handle; -sub new { - my ($class, $v2writable) = @_; - my $self = $class->SUPER::new($v2writable->{-inbox}, 1, 'skel'); - # create the DB: - $self->_xdb_acquire; - $self->_xdb_release; +sub create { + my ($self, $v2writable) = @_; + $self->SUPER::create(); $self->spawn_worker($v2writable) if $v2writable->{parallel}; - $self } sub spawn_worker { @@ -30,33 +27,35 @@ sub spawn_worker { $v2writable = undef; close $w; close $barrier_wait; - eval { skeleton_worker_loop($self, $r, $barrier_note) }; - die "skeleton worker died: $@\n" if $@; + + # F_SETPIPE_SZ = 1031 on Linux; increasing the pipe size here + # speeds V2Writable batch imports across 8 cores by nearly 20% + fcntl($r, 1031, 1048576) if $^O eq 'linux'; + + eval { over_worker_loop($self, $r, $barrier_note) }; + die "over worker died: $@\n" if $@; exit; } $self->{w} = $w; $self->{pid} = $pid; + $self->{lock_path} = "$self->{filename}.pipe.lock"; close $r; close $barrier_note; $self->{barrier_wait} = $barrier_wait; - $w->autoflush(1); - - # lock on only exists in parent, not in worker - $self->{lock_path} = $self->xdir . '/pi-v2-skeleton.lock'; } -sub skeleton_worker_loop { +sub over_worker_loop { my ($self, $r, $barrier_note) = @_; $barrier_note->autoflush(1); - $0 = 'pi-v2-skeleton'; - $self->begin_txn_lazy; + $0 = 'pi-v2-overview'; + $self->begin_lazy; my $barrier = undef; while (my $line = $r->getline) { if ($line eq "commit\n") { - $self->commit_txn_lazy; + $self->commit_lazy; } elsif ($line eq "close\n") { - $self->_xdb_release; + $self->disconnect; } elsif ($line =~ /\Abarrier_init (\d+)\n\z/) { my $n = $1 - 1; die "barrier in-progress\n" if defined $barrier; @@ -67,30 +66,30 @@ sub skeleton_worker_loop { delete $barrier->{$1} or die "unknown barrier: $part\n"; if ((scalar keys %$barrier) == 0) { $barrier = undef; - $self->commit_txn_lazy; + $self->commit_lazy; print $barrier_note "barrier_done\n" or die "print failed to barrier note: $!"; } } elsif ($line =~ /\AD ([a-f0-9]{40,}) (.*)\n\z/s) { my ($oid, $mid) = ($1, $2); - $self->begin_txn_lazy; - $self->remove_by_oid($oid, $mid); + $self->remove_oid($oid, $mid); } else { my $len = int($line); my $n = read($r, my $msg, $len) or die "read: $!\n"; $n == $len or die "short read: $n != $len\n"; $msg = thaw($msg); # should raise on error defined $msg or die "failed to thaw buffer\n"; - $self->begin_txn_lazy; - eval { index_skeleton_real($self, $msg) }; + eval { add_over($self, $msg) }; warn "failed to index message <$msg->[-1]>: $@\n" if $@; } } - $self->worker_done; + die "$$ $0 dbh not released\n" if $self->{dbh}; + die "$$ $0 still in transaction\n" if $self->{txn}; } # called by a partition worker -sub index_skeleton { +# values: [ DS, NUM, BYTES, LINES, TS, MIDS, XPATH, doc_data ] +sub add_over { my ($self, $values) = @_; if (my $w = $self->{w}) { my $err; @@ -106,32 +105,21 @@ sub index_skeleton { die "print failed: $err\n" if $err; } else { - $self->begin_txn_lazy; - index_skeleton_real($self, $values); + $self->SUPER::add_over($values); } } -sub remote_remove { +sub remove_oid { my ($self, $oid, $mid) = @_; - my $err; - $self->lock_acquire; - eval { $self->SUPER::remote_remove($oid, $mid) }; - $err = $@; - $self->lock_release; - die $err if $err; -} - -sub index_skeleton_real ($$) { - my ($self, $values) = @_; - my ($ts, $num, $mids, $xpath, $doc_data) = @$values; - my $smsg = PublicInbox::SearchMsg->new(undef); - $smsg->load_from_data($doc_data); - my $doc = $smsg->{doc}; - $doc->set_data($doc_data); - PublicInbox::SearchIdx::add_values($doc, $ts, $smsg->ds, $num); - my @refs = ($smsg->references =~ /<([^>]+)>/g); - $self->delete_article($num) if defined $num; # for reindexing - $self->link_and_save($doc, $mids, \@refs, $num, $xpath); + if (my $w = $self->{w}) { + my $err; + $self->lock_acquire; + print $w "D $oid $mid\n" or $err = $!; + $self->lock_release; + die $err if $err; + } else { + $self->SUPER::remove_oid($oid, $mid); # OverIdx + } } # write to the subprocess @@ -140,7 +128,7 @@ sub barrier_init { my $w = $self->{w} or return; my $err; $self->lock_acquire; - print $w "barrier_init $nparts\n" or $err = "failed to write: $!\n"; + print $w "barrier_init $nparts\n" or $err = $!; $self->lock_release; die $err if $err; } @@ -152,4 +140,44 @@ sub barrier_wait { $l eq "barrier_done\n" or die "bad response from barrier_wait: $l\n"; } +sub remote_commit { + my ($self) = @_; + if (my $w = $self->{w}) { + my $err; + $self->lock_acquire; + print $w "commit\n" or $err = $!; + $self->lock_release; + die $err if $err; + } else { + $self->commit_lazy; + } +} + +# prevent connections when using forked subprocesses +sub connect { + my ($self) = @_; + return if $self->{w}; + $self->SUPER::connect; +} + +sub remote_close { + my ($self) = @_; + if (my $w = delete $self->{w}) { + my $pid = delete $self->{pid} or die "no process to wait on\n"; + print $w "close\n" or die "failed to write to pid:$pid: $!\n"; + close $w or die "failed to close pipe for pid:$pid: $!\n"; + waitpid($pid, 0) == $pid or die "remote process did not finish"; + $? == 0 or die ref($self)." pid:$pid exited with: $?"; + } else { + die "transaction in progress $self\n" if $self->{txn}; + $self->disconnect; + } +} + +sub commit_fsync { + my ($self) = @_; + return if $self->{w}; # don't bother; main parent can also call this + $self->SUPER::commit_fsync; +} + 1; diff --git a/lib/PublicInbox/Search.pm b/lib/PublicInbox/Search.pm index ca389e32..91251246 100644 --- a/lib/PublicInbox/Search.pm +++ b/lib/PublicInbox/Search.pm @@ -10,12 +10,12 @@ use warnings; # values for searching use constant TS => 0; # Received: header in Unix time use constant YYYYMMDD => 1; # for searching in the WWW UI -use constant NUM => 2; # NNTP article number use Search::Xapian qw/:standard/; use PublicInbox::SearchMsg; use PublicInbox::MIME; use PublicInbox::MID qw/id_compress/; +use PublicInbox::Over; # This is English-only, everything else is non-standard and may be confused as # a prefix common in patch emails @@ -40,19 +40,13 @@ use constant { # 13 - fix threading for empty References/In-Reply-To # (commit 83425ef12e4b65cdcecd11ddcb38175d4a91d5a0) # 14 - fix ghost root vivification - SCHEMA_VERSION => 14, + SCHEMA_VERSION => 15, # n.b. FLAG_PURE_NOT is expensive not suitable for a public website # as it could become a denial-of-service vector QP_FLAGS => FLAG_PHRASE|FLAG_BOOLEAN|FLAG_LOVEHATE|FLAG_WILDCARD, }; -# setup prefixes -my %bool_pfx_internal = ( - type => 'T', # "mail" or "ghost" - thread => 'G', # newsGroup (or similar entity - e.g. a web forum name) -); - my %bool_pfx_external = ( mid => 'Q', # Message-ID (full/exact), this is mostly uniQue ); @@ -116,8 +110,6 @@ EOF ); chomp @HELP; -my $mail_query = Search::Xapian::Query->new('T' . 'mail'); - sub xdir { my ($self) = @_; if ($self->{version} == 1) { @@ -143,8 +135,9 @@ sub new { altid => $altid, version => $version, }, $class; + my $dir; if ($version >= 2) { - my $dir = "$self->{mainrepo}/xap" . SCHEMA_VERSION; + $dir = "$self->{mainrepo}/xap" . SCHEMA_VERSION; my $xdb; my $parts = 0; foreach my $part (<$dir/*>) { @@ -158,55 +151,36 @@ sub new { } } $self->{xdb} = $xdb; - $self->{skel} = Search::Xapian::Database->new("$dir/skel"); } else { - $self->{xdb} = Search::Xapian::Database->new($self->xdir); + $dir = $self->xdir; + $self->{xdb} = Search::Xapian::Database->new($dir); } + $self->{over_ro} = PublicInbox::Over->new("$dir/over.sqlite3"); $self; } sub reopen { my ($self) = @_; $self->{xdb}->reopen; - if (my $skel = $self->{skel}) { - $skel->reopen; - } $self; # make chaining easier } # read-only sub query { my ($self, $query_string, $opts) = @_; - my $query; - $opts ||= {}; - unless ($query_string eq '') { - $query = $self->qp->parse_query($query_string, QP_FLAGS); + if ($query_string eq '' && !$opts->{mset}) { + $self->{over_ro}->recent($opts); + } else { + my $query = $self->qp->parse_query($query_string, QP_FLAGS); $opts->{relevance} = 1 unless exists $opts->{relevance}; + _do_enquire($self, $query, $opts); } - - _do_enquire($self, $query, $opts); } sub get_thread { my ($self, $mid, $opts) = @_; - my $smsg = first_smsg_by_mid($self, $mid) or - return { total => 0, msgs => [] }; - my $qtid = Search::Xapian::Query->new('G' . $smsg->thread_id); - my $path = $smsg->path; - if (defined $path && $path ne '') { - my $path = id_compress($smsg->path); - my $qsub = Search::Xapian::Query->new('XPATH' . $path); - $qtid = Search::Xapian::Query->new(OP_OR, $qtid, $qsub); - } - $opts ||= {}; - $opts->{limit} ||= 1000; - - # always sort threads by timestamp, this makes life easier - # for the threading algorithm (in SearchThread.pm) - $opts->{asc} = 1; - $opts->{enquire} = enquire_skel($self); - _do_enquire($self, $qtid, $opts); + $self->{over_ro}->get_thread($mid, $opts); } sub retry_reopen { @@ -235,19 +209,13 @@ sub _do_enquire { sub _enquire_once { my ($self, $query, $opts) = @_; - my $enquire = $opts->{enquire} || enquire($self); - if (defined $query) { - $query = Search::Xapian::Query->new(OP_AND,$query,$mail_query); - } else { - $query = $mail_query; - } + my $enquire = enquire($self); + $query = Search::Xapian::Query->new(OP_AND,$query); $enquire->set_query($query); $opts ||= {}; my $desc = !$opts->{asc}; if ($opts->{relevance}) { $enquire->set_sort_by_relevance_then_value(TS, $desc); - } elsif ($opts->{num}) { - $enquire->set_sort_by_value(NUM, 0); } else { $enquire->set_sort_by_value_then_relevance(TS, $desc); } @@ -309,39 +277,15 @@ EOF $self->{query_parser} = $qp; } -sub num_range_processor { - $_[0]->{nrp} ||= Search::Xapian::NumberValueRangeProcessor->new(NUM); -} - # only used for NNTP server sub query_xover { my ($self, $beg, $end, $offset) = @_; - my $qp = Search::Xapian::QueryParser->new; - $qp->set_database($self->{skel} || $self->{xdb}); - $qp->add_valuerangeprocessor($self->num_range_processor); - my $query = $qp->parse_query("$beg..$end", QP_FLAGS); - - my $opts = { - enquire => enquire_skel($self), - num => 1, - limit => 200, - offset => $offset, - }; - _do_enquire($self, $query, $opts); + $self->{over_ro}->query_xover($beg, $end, $offset); } sub query_ts { - my ($self, $ts, $opts) = @_; - my $qp = $self->{qp_ts} ||= eval { - my $q = Search::Xapian::QueryParser->new; - $q->set_database($self->{skel} || $self->{xdb}); - $q->add_valuerangeprocessor( - Search::Xapian::NumberValueRangeProcessor->new(TS)); - $q - }; - my $query = $qp->parse_query($ts, QP_FLAGS); - $opts->{enquire} = enquire_skel($self); - _do_enquire($self, $query, $opts); + my ($self, $ts, $offset) = @_; + $self->{over_ro}->query_ts($ts, $offset); } sub first_smsg_by_mid { @@ -356,7 +300,7 @@ sub first_smsg_by_mid { sub lookup_article { my ($self, $num) = @_; my $term = 'XNUM'.$num; - my $db = $self->{skel} || $self->{xdb}; + my $db = $self->{xdb}; retry_reopen($self, sub { my $head = $db->postlist_begin($term); my $tail = $db->postlist_end($term); @@ -365,9 +309,7 @@ sub lookup_article { return unless defined $doc_id; $head->inc; if ($head->nequal($tail)) { - my $loc= $self->{mainrepo} . - ($self->{skel} ? 'skel' : 'xdb'); - warn "article #$num is not unique in $loc\n"; + warn "article #$num is not unique\n"; } # raises on error: my $doc = $db->get_document($doc_id); @@ -381,7 +323,7 @@ sub each_smsg_by_mid { my ($self, $mid, $cb) = @_; # XXX retry_reopen isn't necessary for V2Writable, but the PSGI # interface will need it... - my $db = $self->{skel} || $self->{xdb}; + my $db = $self->{xdb}; my $term = 'Q' . $mid; my $head = $db->postlist_begin($term); my $tail = $db->postlist_end($term); @@ -424,15 +366,6 @@ sub enquire { $self->{enquire} ||= Search::Xapian::Enquire->new($self->{xdb}); } -sub enquire_skel { - my ($self) = @_; - if (my $skel = $self->{skel}) { - $self->{enquire_skel} ||= Search::Xapian::Enquire->new($skel); - } else { - enquire($self); - } -} - sub help { my ($self) = @_; $self->qp; # parse altids diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm index 2e0b9a43..3412a615 100644 --- a/lib/PublicInbox/SearchIdx.pm +++ b/lib/PublicInbox/SearchIdx.pm @@ -16,10 +16,12 @@ use PublicInbox::MID qw/mid_clean id_compress mid_mime mids references/; use PublicInbox::MsgIter; use Carp qw(croak); use POSIX qw(strftime); +use PublicInbox::OverIdx; require PublicInbox::Git; +use Compress::Zlib qw(compress); use constant { - BATCH_BYTES => 1_000_000, + BATCH_BYTES => 10_000_000, DEBUG => !!$ENV{DEBUG}, }; @@ -73,12 +75,13 @@ sub new { $ibx->umask_prepare; if ($version == 1) { $self->{lock_path} = "$mainrepo/ssoma.lock"; + my $dir = $self->xdir; + $self->{over} = PublicInbox::OverIdx->new("$dir/over.sqlite3"); } elsif ($version == 2) { defined $part or die "partition is required for v2\n"; - # partition is a number or "all" + # partition is a number $self->{partition} = $part; $self->{lock_path} = undef; - $self->{msgmap_path} = "$mainrepo/msgmap.sqlite3"; } else { die "unsupported inbox version=$version\n"; } @@ -114,14 +117,6 @@ sub add_val ($$$) { $doc->add_value($col, $num); } -sub add_values { - my ($doc, $ts, $ds, $num) = @_; - add_val($doc, PublicInbox::Search::TS, $ts); - my $yyyymmdd = strftime('%Y%m%d', gmtime($ds)); - add_val($doc, PublicInbox::Search::YYYYMMDD, $yyyymmdd); - defined($num) and add_val($doc, PublicInbox::Search::NUM, $num); -} - sub index_users ($$) { my ($tg, $smsg) = @_; @@ -269,8 +264,11 @@ sub add_message { my ($self, $mime, $bytes, $num, $oid, $mid0) = @_; my $doc_id; my $mids = mids($mime->header_obj); - my $skel = $self->{skeleton}; - + $mid0 = $mids->[0] unless defined $mid0; # v1 compatibility + unless (defined $num) { # v1 + my $mm = $self->_msgmap_init; + $num = $mm->mid_insert($mid0) || $mm->num_for($mid0); + } eval { my $smsg = PublicInbox::SearchMsg->new($mime); my $doc = $smsg->{doc}; @@ -281,11 +279,12 @@ sub add_message { $xpath = id_compress($xpath); } - my $lines = $mime->body_raw =~ tr!\n!\n!; $smsg->{lines} = $mime->body_raw =~ tr!\n!\n!; defined $bytes or $bytes = length($mime->as_string); $smsg->{bytes} = $bytes; - add_values($doc, $smsg->ts, $smsg->ds, $num); + add_val($doc, PublicInbox::Search::TS(), $smsg->ts); + my $yyyymmdd = strftime('%Y%m%d', gmtime($smsg->ds)); + add_val($doc, PublicInbox::Search::YYYYMMDD, $yyyymmdd); my $tg = $self->term_generator; @@ -336,7 +335,6 @@ sub add_message { # populates smsg->references for smsg->to_doc_data my $refs = parse_references($smsg); - $mid0 = $mids->[0] unless defined $mid0; # v1 compatibility my $data = $smsg->to_doc_data($oid, $mid0); foreach my $mid (@$mids) { $tg->index_text($mid, 1, 'XM'); @@ -354,16 +352,14 @@ sub add_message { } $self->delete_article($num) if defined $num; # for reindexing - if ($skel) { - my @vals = ($smsg->ts, $num, $mids, $xpath, $data); - $skel->index_skeleton(\@vals); - $doc->add_boolean_term('Q' . $_) foreach @$mids; - $doc->add_boolean_term('XNUM' . $num) if defined $num; - $doc_id = $self->{xdb}->add_document($doc); - } else { - $doc_id = link_and_save($self, $doc, $mids, $refs, - $num, $xpath); - } + + utf8::encode($data); + $data = compress($data); + my @vals = ($smsg->ts, $num, $mids, $refs, $xpath, $data); + $self->{over}->add_over(\@vals); + $doc->add_boolean_term('Q' . $_) foreach @$mids; + $doc->add_boolean_term('XNUM' . $num) if defined $num; + $doc_id = $self->{xdb}->add_document($doc); }; if ($@) { @@ -439,14 +435,19 @@ sub remove_by_oid { # there is only ONE element in @delete unless we # have bugs in our v2writable deduplication check my @delete; + my @over_del; for (; $head != $tail; $head->inc) { my $docid = $head->get_docid; my $doc = $db->get_document($docid); my $smsg = PublicInbox::SearchMsg->wrap($doc, $mid); $smsg->load_expand; - push(@delete, $docid) if $smsg->{blob} eq $oid; + if ($smsg->{blob} eq $oid) { + push(@delete, $docid); + push(@over_del, $smsg->num); + } } $db->delete_document($_) foreach @delete; + $self->{over}->remove_oid($oid, $mid); scalar(@delete); } @@ -462,18 +463,6 @@ sub term_generator { # write-only $self->{term_generator} = $tg; } -# increments last_thread_id counter -# returns a 64-bit integer represented as a decimal string -sub next_thread_id { - my ($self) = @_; - my $db = $self->{xdb}; - my $last_thread_id = int($db->get_metadata('last_thread_id') || 0); - - $db->set_metadata('last_thread_id', ++$last_thread_id); - - $last_thread_id; -} - sub parse_references ($) { my ($smsg) = @_; my $mime = $smsg->{mime}; @@ -496,71 +485,6 @@ sub parse_references ($) { \@keep; } -sub link_doc { - my ($self, $doc, $refs, $old_tid) = @_; - my $tid; - - if (@$refs) { - # first ref *should* be the thread root, - # but we can never trust clients to do the right thing - my $ref = shift @$refs; - $tid = resolve_mid_to_tid($self, $ref); - merge_threads($self, $tid, $old_tid) if defined $old_tid; - - # the rest of the refs should point to this tid: - foreach $ref (@$refs) { - my $ptid = resolve_mid_to_tid($self, $ref); - merge_threads($self, $tid, $ptid); - } - } else { - $tid = defined $old_tid ? $old_tid : $self->next_thread_id; - } - $doc->add_boolean_term('G' . $tid); - $tid; -} - -sub link_and_save { - my ($self, $doc, $mids, $refs, $num, $xpath) = @_; - my $db = $self->{xdb}; - my $old_tid; - my $doc_id; - $doc->add_boolean_term('XNUM' . $num) if defined $num; - $doc->add_boolean_term('XPATH' . $xpath) if defined $xpath; - $doc->add_boolean_term('Q' . $_) foreach @$mids; - - $self->{skel} and die "Should not have read-only skel here\n";; - foreach my $mid (@$mids) { - my $vivified = 0; - $self->each_smsg_by_mid($mid, sub { - my ($cur) = @_; - my $type = $cur->type; - my $cur_tid = $cur->thread_id; - $old_tid = $cur_tid unless defined $old_tid; - if ($type eq 'mail') { - # do not break existing mail messages, - # just merge the threads - merge_threads($self, $old_tid, $cur_tid); - return 1; - } - if ($type ne 'ghost') { - die "<$mid> has a bad type: $type\n"; - } - my $tid = link_doc($self, $doc, $refs, $old_tid); - $old_tid = $tid unless defined $old_tid; - $doc_id = $cur->{doc_id}; - $self->{xdb}->replace_document($doc_id, $doc); - ++$vivified; - 1; - }); - $vivified > 1 and warn - "BUG: vivified multiple ($vivified) ghosts for $mid\n"; - } - # not really important, but we return any vivified ghost docid, here: - return $doc_id if defined $doc_id; - link_doc($self, $doc, $refs, $old_tid); - $self->{xdb}->add_document($doc); -} - sub index_git_blob_id { my ($doc, $pfx, $objid) = @_; @@ -675,14 +599,10 @@ sub rlog { sub _msgmap_init { my ($self) = @_; + die "BUG: _msgmap_init is only for v1\n" if $self->{version} != 1; $self->{mm} ||= eval { require PublicInbox::Msgmap; - my $msgmap_path = $self->{msgmap_path}; - if (defined $msgmap_path) { # v2 - PublicInbox::Msgmap->new_file($msgmap_path, 1); - } else { - PublicInbox::Msgmap->new($self->{mainrepo}, 1); - } + PublicInbox::Msgmap->new($self->{mainrepo}, 1); }; } @@ -699,8 +619,7 @@ sub _index_sync { my $reindex = $opts->{reindex}; my ($mkey, $last_commit, $lx, $xlog); $self->{git}->batch_prepare; - my $xdb = _xdb_acquire($self); - $xdb->begin_transaction; + my $xdb = $self->begin_txn_lazy; do { $xlog = undef; $mkey = 'last_commit'; @@ -710,6 +629,9 @@ sub _index_sync { $lx = ''; $mkey = undef if $last_commit ne ''; } + $self->{over}->rollback_lazy; + $self->{over}->disconnect; + delete $self->{txn}; $xdb->cancel_transaction; $xdb = _xdb_release($self); @@ -717,8 +639,7 @@ sub _index_sync { my $range = $lx eq '' ? $tip : "$lx..$tip"; $xlog = _git_log($self, $range); - $xdb = _xdb_acquire($self); - $xdb->begin_transaction; + $xdb = $self->begin_txn_lazy; } while ($xdb->get_metadata('last_commit') ne $last_commit); my $mm = _msgmap_init($self); @@ -732,14 +653,12 @@ sub _index_sync { } if (!$mm_only) { $xdb->set_metadata($mkey, $commit) if $mkey && $commit; - $xdb->commit_transaction; - $xdb = _xdb_release($self); + $self->commit_txn_lazy; } # let another process do some work... < if ($more) { if (!$mm_only) { - $xdb = _xdb_acquire($self); - $xdb->begin_transaction; + $xdb = $self->begin_txn_lazy; } $dbh->begin_work if $dbh; } @@ -779,71 +698,19 @@ sub _index_sync { } } -# this will create a ghost as necessary -sub resolve_mid_to_tid { - my ($self, $mid) = @_; - my $tid; - $self->each_smsg_by_mid($mid, sub { - my ($smsg) = @_; - my $cur_tid = $smsg->thread_id; - if (defined $tid) { - merge_threads($self, $tid, $cur_tid); - } else { - $tid = $smsg->thread_id; - } - 1; - }); - return $tid if defined $tid; - - $self->create_ghost($mid)->thread_id; -} - -sub create_ghost { - my ($self, $mid) = @_; - - my $tid = $self->next_thread_id; - my $doc = Search::Xapian::Document->new; - $doc->add_boolean_term('Q' . $mid); - $doc->add_boolean_term('G' . $tid); - $doc->add_boolean_term('T' . 'ghost'); - - my $smsg = PublicInbox::SearchMsg->wrap($doc, $mid); - $self->{xdb}->add_document($doc); - - $smsg; -} - -sub merge_threads { - my ($self, $winner_tid, $loser_tid) = @_; - return if $winner_tid == $loser_tid; - my $db = $self->{xdb}; - batch_do($self, 'G' . $loser_tid, sub { - my ($ids) = @_; - foreach my $docid (@$ids) { - my $doc = $db->get_document($docid); - $doc->remove_term('G' . $loser_tid); - $doc->add_boolean_term('G' . $winner_tid); - $db->replace_document($docid, $doc); - } - }); -} - sub DESTROY { # order matters for unlocking $_[0]->{xdb} = undef; $_[0]->{lockfh} = undef; } -# remote_* subs are only used by SearchIdxPart and SearchIdxSkeleton +# remote_* subs are only used by SearchIdxPart sub remote_commit { my ($self) = @_; if (my $w = $self->{w}) { print $w "commit\n" or die "failed to write commit: $!"; } else { $self->commit_txn_lazy; - if (my $skel = $self->{skeleton}) { - $skel->commit_txn_lazy; - } } } @@ -864,7 +731,7 @@ sub remote_close { sub remote_remove { my ($self, $oid, $mid) = @_; if (my $w = $self->{w}) { - # triggers remove_by_oid in partition or skeleton + # triggers remove_by_oid in a partition print $w "D $oid $mid\n" or die "failed to write remove $!"; } else { $self->begin_txn_lazy; @@ -876,14 +743,17 @@ sub begin_txn_lazy { my ($self) = @_; return if $self->{txn}; my $xdb = $self->{xdb} || $self->_xdb_acquire; + $self->{over}->begin_lazy; $xdb->begin_transaction; $self->{txn} = 1; + $xdb; } sub commit_txn_lazy { my ($self) = @_; delete $self->{txn} or return; $self->{xdb}->commit_transaction; + $self->{over}->commit_lazy; } sub worker_done { diff --git a/lib/PublicInbox/SearchIdxPart.pm b/lib/PublicInbox/SearchIdxPart.pm index 82f5c1bc..e5766a82 100644 --- a/lib/PublicInbox/SearchIdxPart.pm +++ b/lib/PublicInbox/SearchIdxPart.pm @@ -6,12 +6,12 @@ use warnings; use base qw(PublicInbox::SearchIdx); sub new { - my ($class, $v2writable, $part, $skel) = @_; + my ($class, $v2writable, $part) = @_; my $self = $class->SUPER::new($v2writable->{-inbox}, 1, $part); - $self->{skeleton} = $skel; - # create the DB: + # create the DB before forking: $self->_xdb_acquire; $self->_xdb_release; + $self->{over} = $v2writable->{over}; $self->spawn_worker($v2writable, $part) if $v2writable->{parallel}; $self; } @@ -27,7 +27,7 @@ sub spawn_worker { if ($pid == 0) { $v2writable->atfork_child; $v2writable = undef; - close $w; + close $w or die "failed to close: $!"; # F_SETPIPE_SZ = 1031 on Linux; increasing the pipe size here # speeds V2Writable batch imports across 8 cores by nearly 20% @@ -40,7 +40,7 @@ sub spawn_worker { } $self->{pid} = $pid; $self->{w} = $w; - close $r; + close $r or die "failed to close: $!"; } sub partition_worker_loop ($$$) { @@ -50,13 +50,12 @@ sub partition_worker_loop ($$$) { while (my $line = $r->getline) { if ($line eq "commit\n") { $self->commit_txn_lazy; - $self->{skeleton}->remote_commit; } elsif ($line eq "close\n") { $self->_xdb_release; } elsif ($line eq "barrier\n") { $self->commit_txn_lazy; - print { $self->{skeleton}->{w} } "barrier $part\n" or - die "write failed to skeleton: $!\n"; + print { $self->{over}->{w} } "barrier $part\n" or + die "write failed to overview $!\n"; } elsif ($line =~ /\AD ([a-f0-9]{40,}) (.+)\n\z/s) { my ($oid, $mid) = ($1, $2); $self->begin_txn_lazy; @@ -101,7 +100,6 @@ sub remote_barrier { $w->flush or die "failed to flush: $!"; } else { $self->commit_txn_lazy; - $self->{skeleton}->remote_commit; } } diff --git a/lib/PublicInbox/SearchMsg.pm b/lib/PublicInbox/SearchMsg.pm index f5510b8e..6c0780e5 100644 --- a/lib/PublicInbox/SearchMsg.pm +++ b/lib/PublicInbox/SearchMsg.pm @@ -13,9 +13,7 @@ use PublicInbox::MsgTime qw(msg_timestamp msg_datestamp); sub new { my ($class, $mime) = @_; my $doc = Search::Xapian::Document->new; - $doc->add_boolean_term('T' . 'mail'); - - bless { type => 'mail', doc => $doc, mime => $mime }, $class; + bless { doc => $doc, mime => $mime }, $class; } sub wrap { @@ -52,6 +50,7 @@ sub to_doc_data { ); } + sub load_from_data ($$) { my ($self) = $_[0]; # data = $_[1] ( @@ -187,26 +186,9 @@ sub mid ($;$) { sub _extract_mid { mid_clean(mid_mime($_[0]->{mime})) } -sub thread_id { - my ($self) = @_; - my $tid = $self->{thread}; - return $tid if defined $tid; - $self->{thread} = _get_term_val($self, 'G', qr/\AG/); # *G*roup -} +sub tid { $_[0]->{tid} } # XXX: consider removing this, we can phrase match subject -sub path { - my ($self) = @_; - my $path = $self->{path}; - return $path if defined $path; - $self->{path} = _get_term_val($self, 'XPATH', qr/\AXPATH/); # path -} - -sub type { - my ($self) = @_; - my $type = $self->{type}; - return $type if defined $type; - $self->{type} = _get_term_val($self, 'T', qr/\AT/); -} +sub path { $_[0]->{path} } 1; diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm index 51723e55..8e3122ab 100644 --- a/lib/PublicInbox/V2Writable.pm +++ b/lib/PublicInbox/V2Writable.pm @@ -7,13 +7,15 @@ use strict; use warnings; use base qw(PublicInbox::Lock); use PublicInbox::SearchIdxPart; -use PublicInbox::SearchIdxSkeleton; use PublicInbox::MIME; use PublicInbox::Git; use PublicInbox::Import; use PublicInbox::MID qw(mids); use PublicInbox::ContentId qw(content_id content_digest); use PublicInbox::Inbox; +use PublicInbox::OverIdxFork; +use PublicInbox::Msgmap; +use IO::Handle; # an estimate of the post-packed size to the raw uncompressed size my $PACKING_FACTOR = 0.4; @@ -57,6 +59,7 @@ sub new { partitions => $nparts, parallel => 1, transact_bytes => 0, + over => PublicInbox::OverIdxFork->new("$xpfx/over.sqlite3"), lock_path => "$dir/inbox.lock", # limit each repo to 1GB or so rotate_bytes => int((1024 * 1024 * 1024) / $PACKING_FACTOR), @@ -117,7 +120,7 @@ sub num_for { my $mids = mids($mime->header_obj); if (@$mids) { my $mid = $mids->[0]; - my $num = $self->{skel}->{mm}->mid_insert($mid); + my $num = $self->{mm}->mid_insert($mid); if (defined $num) { # common case $$mid0 = $mid; return $num; @@ -140,7 +143,7 @@ sub num_for { # try the rest of the mids for(my $i = $#$mids; $i >= 1; $i--) { my $m = $mids->[$i]; - $num = $self->{skel}->{mm}->mid_insert($m); + $num = $self->{mm}->mid_insert($m); if (defined $num) { warn "alternative <$m> for <$mid> found\n"; $$mid0 = $m; @@ -158,20 +161,20 @@ sub num_for_harder { my $hdr = $mime->header_obj; my $dig = content_digest($mime); $$mid0 = PublicInbox::Import::digest2mid($dig); - my $num = $self->{skel}->{mm}->mid_insert($$mid0); + my $num = $self->{mm}->mid_insert($$mid0); unless (defined $num) { # it's hard to spoof the last Received: header my @recvd = $hdr->header_raw('Received'); $dig->add("Received: $_") foreach (@recvd); $$mid0 = PublicInbox::Import::digest2mid($dig); - $num = $self->{skel}->{mm}->mid_insert($$mid0); + $num = $self->{mm}->mid_insert($$mid0); # fall back to a random Message-ID and give up determinism: until (defined($num)) { $dig->add(rand); $$mid0 = PublicInbox::Import::digest2mid($dig); warn "using random Message-ID <$$mid0> as fallback\n"; - $num = $self->{skel}->{mm}->mid_insert($$mid0); + $num = $self->{mm}->mid_insert($$mid0); } } PublicInbox::Import::append_mid($hdr, $$mid0); @@ -194,13 +197,11 @@ sub idx_init { # frequently activated. delete $ibx->{$_} foreach (qw(git mm search)); + my $over = $self->{over}; $ibx->umask_prepare; $ibx->with_umask(sub { $self->lock_acquire; - - # first time initialization, first we create the skeleton pipe: - my $skel = PublicInbox::SearchIdxSkeleton->new($self); - $self->{skel} = $skel; + $over->create($self); # need to create all parts before initializing msgmap FD my $max = $self->{partitions} - 1; @@ -208,12 +209,14 @@ sub idx_init { # idx_parts must be visible to all forked processes my $idx = $self->{idx_parts} = []; for my $i (0..$max) { - push @$idx, - PublicInbox::SearchIdxPart->new($self, $i, $skel); + push @$idx, PublicInbox::SearchIdxPart->new($self, $i); } - # Now that all subprocesses are up, we can open the FD for SQLite: - $skel->_msgmap_init->{dbh}->begin_work; + # Now that all subprocesses are up, we can open the FDs + # for SQLite: + my $mm = $self->{mm} = PublicInbox::Msgmap->new_file( + "$self->{-inbox}->{mainrepo}/msgmap.sqlite3", 1); + $mm->{dbh}->begin_work; }); } @@ -236,9 +239,8 @@ sub remove_internal { my $ibx = $self->{-inbox}; my $srch = $ibx->search; my $cid = content_id($mime); - my $skel = $self->{skel}; my $parts = $self->{idx_parts}; - my $mm = $skel->{mm}; + my $mm = $self->{mm}; my $removed; my $mids = mids($mime->header_obj); @@ -273,9 +275,10 @@ sub remove_internal { $orig = undef; $removed->num; # memoize this for callers - foreach my $idx (@$parts, $skel) { + foreach my $idx (@$parts) { $idx->remote_remove($oid, $mid); } + $self->{over}->remove_oid($oid, $mid); } 1; # continue }); @@ -322,18 +325,20 @@ sub barrier { if (my $im = $self->{im}) { $im->barrier; } - my $skel = $self->{skel}; my $parts = $self->{idx_parts}; - if ($parts && $skel) { - my $dbh = $skel->{mm}->{dbh}; - $dbh->commit; # SQLite data is second in importance + if ($parts) { + my $dbh = $self->{mm}->{dbh}; + $dbh->commit; # SQLite msgmap data is second in importance + + my $over = $self->{over}; - # Now deal with Xapian - $skel->barrier_init(scalar(@$parts)); - # each partition needs to issue a barrier command to skel: + # Now deal with Xapian and overview DB + $over->barrier_init(scalar(@$parts)); + + # each partition needs to issue a barrier command to over $_->remote_barrier foreach @$parts; - $skel->barrier_wait; # wait for each Xapian partition + $over->barrier_wait; # wait for each Xapian partition $dbh->begin_work; } @@ -343,26 +348,30 @@ sub barrier { sub searchidx_checkpoint { my ($self, $more) = @_; - # order matters, we can only close {skel} after all partitions - # are done because the partitions also write to {skel} + # order matters, we can only close {over} after all partitions + # are done because the partitions also write to {over} if (my $parts = $self->{idx_parts}) { foreach my $idx (@$parts) { - $idx->remote_commit; # propagates commit to skel + $idx->remote_commit; # propagates commit to over $idx->remote_close unless $more; } delete $self->{idx_parts} unless $more; } - if (my $skel = $self->{skel}) { - my $dbh = $skel->{mm}->{dbh}; + if (my $mm = $self->{mm}) { + my $dbh = $mm->{dbh}; $dbh->commit; if ($more) { $dbh->begin_work; } else { - $skel->remote_close; - delete $self->{skel}; + delete $self->{mm}; } } + my $over = $self->{over}; + $over->remote_commit; + if (!$more) { + $over->remote_close; + } $self->{transact_bytes} = 0; } @@ -522,6 +531,7 @@ sub atfork_child { if (my $im = $self->{im}) { $im->atfork_child; } + die "unexpected mm" if $self->{mm}; } sub mark_deleted { @@ -559,7 +569,7 @@ sub reindex_oid { if (!defined($mid0) && $regen && !$del) { $num = $$regen--; die "BUG: ran out of article numbers\n" if $num <= 0; - my $mm = $self->{skel}->{mm}; + my $mm = $self->{mm}; foreach my $mid (reverse @$mids) { if ($mm->mid_set($num, $mid) == 1) { $mid0 = $mid; @@ -620,7 +630,7 @@ sub reindex { my $head = $ibx->{ref_head} || 'refs/heads/master'; $self->idx_init; # acquire lock my $x40 = qr/[a-f0-9]{40}/; - my $mm_tmp = $self->{skel}->{mm}->tmp_clone; + my $mm_tmp = $self->{mm}->tmp_clone; if (!$regen) { my (undef, $max) = $mm_tmp->minmax; unless (defined $max) { diff --git a/script/public-inbox-compact b/script/public-inbox-compact index 79cd039b..e6977165 100755 --- a/script/public-inbox-compact +++ b/script/public-inbox-compact @@ -10,7 +10,6 @@ use PublicInbox::Config; use Cwd 'abs_path'; use File::Temp qw(tempdir); use File::Path qw(remove_tree); -use PublicInbox::Spawn qw(spawn); my $usage = "Usage: public-inbox-compact REPO_DIR\n"; my $dir = shift or die $usage; my $config = PublicInbox::Config->new; @@ -36,6 +35,8 @@ $ibx->umask_prepare; sub commit_changes ($$$) { my ($im, $old, $new) = @_; my @st = stat($old) or die "failed to stat($old): $!\n"; + link("$old/over.sqlite3", "$new/over.sqlite3") or die + "failed to link {$old => $new}/over.sqlite3: $!\n"; rename($old, "$new/old") or die "rename $old => $new/old: $!\n"; chmod($st[2] & 07777, $new) or die "chmod $old: $!\n"; rename($new, $old) or die "rename $new => $old: $!\n"; @@ -53,41 +54,18 @@ if ($v == 2) { $ibx->with_umask(sub { $v2w->lock_acquire; my @parts; - my $skel; while (defined(my $dn = readdir($dh))) { if ($dn =~ /\A\d+\z/) { push @parts, "$old/$dn"; - } elsif ($dn eq 'skel') { - $skel = "$old/$dn"; } elsif ($dn eq '.' || $dn eq '..') { } else { warn "W: skipping unknown Xapian DB: $old/$dn\n" } } close $dh; - my %pids; - - if (@parts) { - my $pid = spawn(['xapian-compact', @parts, "$new/0" ]); - defined $pid or die "compact failed: $?\n"; - $pids{$pid} = 'xapian-compact (parts)'; - } else { - warn "No parts found in $old\n"; - } - if (defined $skel) { - my $pid = spawn(['xapian-compact', $skel, "$new/skel"]); - defined $pid or die "compact failed: $?\n"; - $pids{$pid} = 'xapian-compact (skel)'; - } else { - warn "$old/skel missing\n"; - } - scalar keys %pids or - die "No xapian-compact processes running\n"; - while (scalar keys %pids) { - my $pid = waitpid(-1, 0); - my $desc = delete $pids{$pid}; - die "$desc failed: $?\n" if $?; - } + die "No Xapian parts found in $old\n" unless @parts; + my $cmd = ['xapian-compact', @parts, "$new/0" ]; + PublicInbox::Import::run_die($cmd); commit_changes($v2w, $old, $new); }); } elsif ($v == 1) { diff --git a/t/over.t b/t/over.t new file mode 100644 index 00000000..1d3f9b37 --- /dev/null +++ b/t/over.t @@ -0,0 +1,38 @@ +# Copyright (C) 2018 all contributors <meta@public-inbox.org> +# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> +use strict; +use warnings; +use Test::More; +use File::Temp qw/tempdir/; +foreach my $mod (qw(DBD::SQLite)) { + eval "require $mod"; + plan skip_all => "$mod missing for over.t" if $@; +} + +use_ok 'PublicInbox::OverIdx'; +my $tmpdir = tempdir('pi-over-XXXXXX', TMPDIR => 1, CLEANUP => 1); +my $over = PublicInbox::OverIdx->new("$tmpdir/over.sqlite3"); +$over->connect; +my $x = $over->next_tid; +is(int($x), $x, 'integer tid'); +my $y = $over->next_tid; +is($y, $x+1, 'tid increases'); + +$x = $over->sid('hello-world'); +is(int($x), $x, 'integer sid'); +$y = $over->sid('hello-WORLD'); +is($y, $x+1, 'sid ncreases'); +is($over->sid('hello-world'), $x, 'idempotent'); +$over->disconnect; + +$over = PublicInbox::OverIdx->new("$tmpdir/over.sqlite3"); +$over->connect; +is($over->sid('hello-world'), $x, 'idempotent across reopen'); +$over->each_by_mid('never', sub { fail('should not be called') }); + +$x = $over->create_ghost('never'); +is(int($x), $x, 'integer tid for ghost'); +$y = $over->create_ghost('NEVAR'); +is($y, $x + 1, 'integer tid for ghost increases'); + +done_testing(); diff --git a/t/psgi_search.t b/t/psgi_search.t index 1df38691..60a44bde 100644 --- a/t/psgi_search.t +++ b/t/psgi_search.t @@ -30,8 +30,7 @@ EOF my $num = 0; # nb. using internal API, fragile! -my $xdb = $rw->_xdb_acquire; -$xdb->begin_transaction; +$rw->begin_txn_lazy; foreach (reverse split(/\n\n/, $data)) { $_ .= "\n"; @@ -42,8 +41,7 @@ foreach (reverse split(/\n\n/, $data)) { ok($doc_id, 'message added: '. $mid); } -$xdb->commit_transaction; -$rw = undef; +$rw->commit_txn_lazy; my $cfgpfx = "publicinbox.test"; my $config = PublicInbox::Config->new({ diff --git a/t/search-thr-index.t b/t/search-thr-index.t index 9549976d..3ddef809 100644 --- a/t/search-thr-index.t +++ b/t/search-thr-index.t @@ -32,8 +32,7 @@ EOF my $num = 0; # nb. using internal API, fragile! -my $xdb = $rw->_xdb_acquire; -$xdb->begin_transaction; +my $xdb = $rw->begin_txn_lazy; my @mids; foreach (reverse split(/\n\n/, $data)) { @@ -50,10 +49,12 @@ foreach (reverse split(/\n\n/, $data)) { my $prev; foreach my $mid (@mids) { - my $res = $rw->get_thread($mid); + my $res = $rw->{over}->get_thread($mid); is(3, $res->{total}, "got all messages from $mid"); } +$rw->commit_txn_lazy; + done_testing(); 1; @@ -22,9 +22,9 @@ my $ibx = $rw->{-inbox}; $rw = undef; my $ro = PublicInbox::Search->new($git_dir); my $rw_commit = sub { - $rw->{xdb}->commit_transaction if $rw && $rw->{xdb}; + $rw->commit_txn_lazy if $rw; $rw = PublicInbox::SearchIdx->new($git_dir, 1); - $rw->_xdb_acquire->begin_transaction; + $rw->begin_txn_lazy; }; { @@ -93,7 +93,6 @@ sub filter_mids { ok($found, "message found"); is($root_id, $found->{doc_id}, 'doc_id set correctly'); is($found->mid, 'root@s', 'mid set correctly'); - ok(int($found->thread_id) > 0, 'thread_id is an integer'); my ($res, @res); my @exp = sort qw(root@s last@s); @@ -148,7 +147,13 @@ sub filter_mids { my $ghost_id = $rw->add_message($was_ghost); is($ghost_id, int($ghost_id), "ghost_id is an integer: $ghost_id"); - ok($ghost_id < $reply_id, "ghost vivified from earlier message"); + my $msgs = $rw->{over}->get_thread('ghost-message@s')->{msgs}; + is(scalar(@$msgs), 2, 'got both messages in ghost thread'); + foreach (qw(sid tid)) { + is($msgs->[0]->{$_}, $msgs->[1]->{$_}, "{$_} match"); + } + isnt($msgs->[0]->{num}, $msgs->[1]->{num}, "num do not match"); + ok($_->{num} > 0, 'positive art num') foreach @$msgs } # search thread on ghost @@ -400,6 +405,7 @@ sub filter_mids { is($txt->{msgs}->[0]->mid, $res->{msgs}->[0]->mid, 'search inside text attachments works'); } +$rw->commit_txn_lazy; done_testing(); |