about summary refs log tree commit homepage
path: root/lib
diff options
context:
space:
mode:
author: Eric Wong <e@80x24.org> 2020-12-15 02:02:16 +0000
committer: Eric Wong <e@80x24.org> 2020-12-17 19:12:53 +0000
commit: 7281c5c492f9d6bbd585da9f061d19819d952352 (patch)
tree: 947228cf3f08bc6ae8874d99936fcef457096282 /lib
parent: d26c2837f479b41182946a6540aad95d34b2b594 (diff)
download: public-inbox-7281c5c492f9d6bbd585da9f061d19819d952352.tar.gz
--reindex allows us to catch missed and stale messages due to
-extindex vs -index races prior to commit 02b2fcc46f364b51
("extsearchidx: enforce -index before -extindex").

We'll also rely on reindex to internally deal with v1/v2 inbox
removals and partial-unindexing of messages which are only
removed from one inbox out of many.

This reindex design is completely different than how normal
v1/v2 inbox reindex operates due to extindex having multiple
histories to work with.  Instead of scanning git history, this
relies exclusively on comparing over.sqlite3 contents between
the v1/v2 inboxes and the extindex.

Changes to Xapian behavior also get picked up, now.  Xapian indexing
is handled by workers with minimal IPC to the parent process.
This results in more read I/O but fewer writes when dealing
with cross-posted messages.

Changes to $smsg->populate and --rethread still need further
work.
Diffstat (limited to 'lib')
-rw-r--r-- lib/PublicInbox/ExtSearchIdx.pm 195
-rw-r--r-- lib/PublicInbox/OverIdx.pm 23
-rw-r--r-- lib/PublicInbox/SearchIdx.pm 77
-rw-r--r-- lib/PublicInbox/SearchIdxShard.pm 11
-rw-r--r-- lib/PublicInbox/V2Writable.pm 6
5 files changed, 300 insertions, 12 deletions
diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm
index 84449cb4..394a89d4 100644
--- a/lib/PublicInbox/ExtSearchIdx.pm
+++ b/lib/PublicInbox/ExtSearchIdx.pm
@@ -29,6 +29,7 @@ use PublicInbox::InboxWritable;
 use PublicInbox::ContentHash qw(content_hash);
 use PublicInbox::Eml;
 use File::Spec;
+use DBI qw(:sql_types); # SQL_BLOB
 
 sub new {
         my (undef, $dir, $opt) = @_;
@@ -123,9 +124,11 @@ sub do_xpost ($$) {
                 my $nr = $self->{oidx}->remove_xref3($docid, $oid, $eidx_key,
                                                         \$rm_eidx_info);
                 if ($nr == 0) {
+                        $self->{oidx}->eidxq_del($docid);
                         $idx->shard_remove($docid);
                 } elsif ($rm_eidx_info) {
                         $idx->shard_remove_eidx_info($docid, $eidx_key, $eml);
+                        $self->{oidx}->eidxq_add($docid); # yes, add
                 }
         }
 }
@@ -168,7 +171,7 @@ sub do_finalize ($) {
 
 sub do_step ($) { # main iterator for adding messages to the index
         my ($req) = @_;
-        my $self = $req->{self};
+        my $self = $req->{self} // die 'BUG: {self} missing';
         while (1) {
                 if (my $next_arg = $req->{next_arg}) {
                         if (my $smsg = $self->{oidx}->next_by_mid(@$next_arg)) {
@@ -311,7 +314,7 @@ sub _sync_inbox ($$$) {
         $ibx->git->cleanup; # done with this inbox, now
 }
 
-sub unref_doc ($$$$) {
+sub gc_unref_doc ($$$$) {
         my ($self, $ibx_id, $eidx_key, $docid) = @_;
         my $dbh = $self->{oidx}->dbh;
 
@@ -326,15 +329,14 @@ SELECT oidbin FROM xref3 WHERE docid = ? AND ibx_id = ?
 DELETE FROM xref3 WHERE docid = ? AND ibx_id = ?
 
         my $remain = $self->{oidx}->get_xref3($docid);
-        my $idx = $self->idx_shard($docid);
-        if (@$remain) {
+        if (scalar(@$remain)) {
+                $self->{oidx}->eidxq_add($docid); # enqueue for reindex
                 for my $oid (@oid) {
                         warn "I: unref #$docid $eidx_key $oid\n";
-                        $idx->shard_remove_eidx_info($docid, $eidx_key);
                 }
         } else {
                 warn "I: remove #$docid $eidx_key @oid\n";
-                $idx->shard_remove($docid);
+                $self->idx_shard($docid)->shard_remove($docid);
         }
 }
 
@@ -356,7 +358,7 @@ sub eidx_gc {
                 warn "I: deleting messages for $eidx_key...\n";
                 $x3_doc->execute($ibx_id);
                 while (defined(my $docid = $x3_doc->fetchrow_array)) {
-                        unref_doc($self, $ibx_id, $eidx_key, $docid);
+                        gc_unref_doc($self, $ibx_id, $eidx_key, $docid);
                 }
                 $dbh->prepare_cached(<<'')->execute($ibx_id);
 DELETE FROM inboxes WHERE ibx_id = ?
@@ -393,20 +395,187 @@ DELETE FROM xref3 WHERE docid NOT IN (SELECT num FROM over)
         done($self);
 }
 
+sub eidxq_process ($$) { # for reindexing
+        my ($self, $sync) = @_;
+
+        $self->{oidx}->commit_lazy; # ensure shard workers can see it
+        $self->{oidx}->begin_lazy;
+        my $dbh = $self->{oidx}->dbh;
+        my $tot = $dbh->selectrow_array('SELECT COUNT(*) FROM eidxq') or return;
+        ${$sync->{nr}} = 0;
+        $sync->{-regen_fmt} = "%u/$tot\n";
+        my $pr = $sync->{-opt}->{-progress};
+        if ($pr) {
+                my $min = $dbh->selectrow_array('SELECT MIN(docid) FROM eidxq');
+                my $max = $dbh->selectrow_array('SELECT MAX(docid) FROM eidxq');
+                $pr->("Xapian indexing $min..$max (total=$tot)\n");
+        }
+
+        my $del = $dbh->prepare('DELETE FROM eidxq WHERE docid = ?');
+        my $iter = $dbh->prepare('SELECT docid FROM eidxq ORDER BY docid ASC');
+        $iter->execute;
+        while (defined(my $docid = $iter->fetchrow_array)) {
+                $self->idx_shard($docid)->shard_reindex_docid($docid);
+                $del->execute($docid);
+                last if $sync->{quit};
+                my $cur = ++${$sync->{nr}};
+
+                # shards flush on their own, just don't queue up too many
+                # deletes
+                if (($cur % 1000) == 0) {
+                        $self->{oidx}->commit_lazy;
+                        $self->{oidx}->begin_lazy;
+                        $pr->("reindexed $cur/$tot\n") if $pr;
+                }
+                # this is only for SIGUSR1, shards do their own accounting:
+                reindex_checkpoint($self, $sync) if ${$sync->{need_checkpoint}};
+        }
+        $pr->("reindexed ${$sync->{nr}}/$tot\n") if $pr;
+        $self->{oidx}->commit_lazy;
+        $self->{oidx}->begin_lazy;
+}
+
+sub _reindex_unseen { # git->cat_async callback
+        my ($bref, $oid, $type, $size, $req) = @_;
+        return if is_bad_blob($oid, $type, $size, $req->{oid});
+        my $self = $req->{self} // die 'BUG: {self} unset';
+        local $self->{current_info} = "$self->{current_info} $oid";
+        my $new_smsg = bless { blob => $oid, }, 'PublicInbox::Smsg';
+        $new_smsg->{bytes} = $size + crlf_adjust($$bref);
+        my $eml = $req->{eml} = PublicInbox::Eml->new($bref);
+        $req->{new_smsg} = $new_smsg;
+        $req->{chash} = content_hash($eml);
+        $req->{mids} = mids($eml); # do_step iterates through this
+        do_step($req); # enter the normal indexing flow
+}
+
+# --reindex may catch totally unseen messages, this handles them
+sub reindex_unseen ($$$$) {
+        my ($self, $sync, $ibx, $xsmsg) = @_;
+        my $req = {
+                %$sync, # has {self}
+                autime => $xsmsg->{ds},
+                cotime => $xsmsg->{ts},
+                oid => $xsmsg->{blob},
+                ibx => $ibx,
+                xnum => $xsmsg->{num},
+                # {mids} and {chash} will be filled in at _reindex_unseen
+        };
+        warn "I: reindex_unseen ${\$ibx->eidx_key}:$req->{xnum}:$req->{oid}\n";
+        $self->git->cat_async($xsmsg->{blob}, \&_reindex_unseen, $req);
+}
+
+sub _reindex_check_unseen ($$$) {
+        my ($self, $sync, $ibx) = @_;
+        my $ibx_id = $ibx->{-ibx_id};
+        my ($beg, $end) = (1, 1000);
+
+        # first, check if we missed any messages in target $ibx
+        my $inx3 = $self->{oidx}->dbh->prepare(<<'');
+SELECT DISTINCT(docid) FROM xref3 WHERE ibx_id = ? AND xnum = ? AND oidbin = ?
+
+        my $msgs;
+        while (scalar(@{$msgs = $ibx->over->query_xover($beg, $end)})) {
+                $beg = $msgs->[-1]->{num} + 1;
+                $end = $beg + 1000;
+                for my $xsmsg (@$msgs) {
+                        my $oidbin = pack('H*', $xsmsg->{blob});
+                        $inx3->bind_param(1, $ibx_id);
+                        $inx3->bind_param(2, $xsmsg->{num});
+                        $inx3->bind_param(3, $oidbin, SQL_BLOB);
+                        $inx3->execute;
+                        my $docids = $inx3->fetchall_arrayref;
+                        # index messages which were totally missed
+                        # the first time around ASAP:
+                        if (scalar(@$docids) == 0) {
+                                reindex_unseen($self, $sync, $ibx, $xsmsg);
+                        } else { # already seen, reindex later
+                                for my $r (@$docids) {
+                                        $self->{oidx}->eidxq_add($r->[0]);
+                                }
+                        }
+                        last if $sync->{quit};
+                }
+                last if $sync->{quit};
+        }
+}
+
+sub _reindex_check_stale ($$$) {
+        my ($self, $sync, $ibx) = @_;
+
+        # now, check if there's stale xrefs
+        my $get_xnum = $self->{oidx}->dbh->prepare(<<'');
+SELECT docid,xnum,oidbin FROM xref3 WHERE ibx_id = ? ORDER BY docid ASC
+
+        $get_xnum->execute($ibx->{-ibx_id});
+        my $del_xref3 = $self->{oidx}->dbh->prepare(<<'');
+DELETE FROM xref3 WHERE ibx_id = ? AND xnum = ? AND oidbin = ?
+
+        while (my ($docid, $xnum, $oidbin) = $get_xnum->fetchrow_array) {
+                last if $sync->{quit};
+                my $smsg = $ibx->over->get_art($xnum);
+                my $oidhex = unpack('H*', $oidbin);
+                my $err;
+                if (!$smsg) {
+                        $err = 'stale';
+                } elsif ($smsg->{blob} ne $oidhex) {
+                        $err = "mismatch (!= $smsg->{blob})";
+                } else {
+                        next; # likely, all good
+                }
+                warn $ibx->eidx_key . ":$xnum:$oidhex (#$docid): $err\n";
+                $del_xref3->bind_param(1, $ibx->{-ibx_id});
+                $del_xref3->bind_param(2, $xnum);
+                $del_xref3->bind_param(3, $oidbin, SQL_BLOB);
+                $del_xref3->execute;
+
+                # get_xref3 over-fetches, but this is a rare path:
+                my $xr3 = $self->{oidx}->get_xref3($docid);
+                my $idx = $self->idx_shard($docid);
+                if (scalar(@$xr3) == 0) { # all gone
+                        $self->{oidx}->eidxq_del($docid);
+                        $idx->shard_remove($docid);
+                } else { # enqueue for reindex of remaining messages
+                        $idx->shard_remove_eidx_info($docid, $ibx->eidx_key);
+                        $self->{oidx}->eidxq_add($docid); # yes, add
+                }
+        }
+}
+
+sub _reindex_inbox ($$$) {
+        my ($self, $sync, $ibx) = @_;
+        _reindex_check_unseen($self, $sync, $ibx);
+        _reindex_check_stale($self, $sync, $ibx) unless $sync->{quit};
+        delete @$ibx{qw(over mm search git)}; # won't need these for a bit
+}
+
+sub eidx_reindex {
+        my ($self, $sync) = @_;
+
+        for my $ibx (@{$self->{ibx_list}}) {
+                _reindex_inbox($self, $sync, $ibx);
+                last if $sync->{quit};
+        }
+        $self->git->async_wait_all; # ensure eidxq gets filled completely
+        eidxq_process($self, $sync) unless $sync->{quit};
+}
+
 sub eidx_sync { # main entry point
         my ($self, $opt) = @_;
-        $self->idx_init($opt); # acquire lock via V2Writable::_idx_init
-        $self->{oidx}->rethread_prepare($opt);
 
         my $warn_cb = $SIG{__WARN__} || sub { print STDERR @_ };
         local $self->{current_info} = '';
         local $SIG{__WARN__} = sub {
                 $warn_cb->($self->{current_info}, ': ', @_);
         };
+        $self->idx_init($opt); # acquire lock via V2Writable::_idx_init
+        $self->{oidx}->rethread_prepare($opt);
         my $sync = {
                 need_checkpoint => \(my $need_checkpoint = 0),
-                reindex => $opt->{reindex},
                 -opt => $opt,
+                # DO NOT SET {reindex} here, it's incompatible with reused
+                # V2Writable code, reindex is totally different here
+                # compared to v1/v2 inboxes because we have multiple histories
                 self => $self,
                 -regen_fmt => "%u/?\n",
         };
@@ -415,6 +584,10 @@ sub eidx_sync { # main entry point
         local $SIG{QUIT} = $quit;
         local $SIG{INT} = $quit;
         local $SIG{TERM} = $quit;
+        for my $ibx (@{$self->{ibx_list}}) {
+                $ibx->{-ibx_id} //= $self->{oidx}->ibx_id($ibx->eidx_key);
+        }
+        eidx_reindex($self, $sync) if delete($opt->{reindex});
 
         # don't use $_ here, it'll get clobbered by reindex_checkpoint
         for my $ibx (@{$self->{ibx_list}}) {
@@ -422,6 +595,7 @@ sub eidx_sync { # main entry point
                 _sync_inbox($self, $sync, $ibx);
         }
         $self->{oidx}->rethread_done($opt) unless $sync->{quit};
+        eidxq_process($self, $sync) unless $sync->{quit};
 
         PublicInbox::V2Writable::done($self);
 }
@@ -522,5 +696,6 @@ no warnings 'once';
 *count_shards = \&PublicInbox::V2Writable::count_shards;
 *atfork_child = \&PublicInbox::V2Writable::atfork_child;
 *idx_shard = \&PublicInbox::V2Writable::idx_shard;
+*reindex_checkpoint = \&PublicInbox::V2Writable::reindex_checkpoint;
 
 1;
diff --git a/lib/PublicInbox/OverIdx.pm b/lib/PublicInbox/OverIdx.pm
index 38552247..4a39bf53 100644
--- a/lib/PublicInbox/OverIdx.pm
+++ b/lib/PublicInbox/OverIdx.pm
@@ -563,6 +563,15 @@ CREATE TABLE IF NOT EXISTS eidx_meta (
         val VARCHAR(255) NOT NULL
 )
 
+                # A queue of current docids which need reindexing.
+                # eidxq persists across aborted -extindex invocations
+                # Currently used for "-extindex --reindex" for Xapian
+                # data, but may be used in more places down the line.
+                $dbh->do(<<'');
+CREATE TABLE IF NOT EXISTS eidxq (
+        docid INTEGER PRIMARY KEY NOT NULL
+)
+
                 $dbh;
         };
 }
@@ -661,4 +670,18 @@ UPDATE over SET ddd = ? WHERE num = ?
         $sth->execute;
 }
 
+sub eidxq_add {
+        my ($self, $docid) = @_;
+        $self->dbh->prepare_cached(<<'')->execute($docid);
+INSERT OR IGNORE INTO eidxq (docid) VALUES (?)
+
+}
+
+sub eidxq_del {
+        my ($self, $docid) = @_;
+        $self->dbh->prepare_cached(<<'')->execute($docid);
+DELETE FROM eidxq WHERE docid = ?
+
+}
+
 1;
diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm
index 0fbe6560..cd8f4dd7 100644
--- a/lib/PublicInbox/SearchIdx.pm
+++ b/lib/PublicInbox/SearchIdx.pm
@@ -352,8 +352,9 @@ sub index_ids ($$$$) {
         index_list_id($self, $doc, $hdr);
 }
 
-sub add_xapian ($$$$) {
+sub eml2doc ($$$;$) {
         my ($self, $eml, $smsg, $mids) = @_;
+        $mids //= mids_for_index($eml);
         my $doc = $X->{Document}->new;
         add_val($doc, PublicInbox::Search::TS(), $smsg->{ts});
         my @ds = gmtime($smsg->{ds});
@@ -396,6 +397,12 @@ sub add_xapian ($$$$) {
                         }
                 }
         }
+        $doc;
+}
+
+sub add_xapian ($$$$) {
+        my ($self, $eml, $smsg, $mids) = @_;
+        my $doc = eml2doc($self, $eml, $smsg, $mids);
         $self->{xdb}->replace_document($smsg->{num}, $doc);
 }
 
@@ -941,6 +948,10 @@ sub set_metadata_once {
 
 sub _commit_txn {
         my ($self) = @_;
+        if (my $eidx = $self->{eidx}) {
+                $eidx->git->async_wait_all;
+                $eidx->{transact_bytes} = 0;
+        }
         if (my $xdb = $self->{xdb}) {
                 set_metadata_once($self);
                 $xdb->commit_transaction;
@@ -997,4 +1008,68 @@ SELECT COUNT(*) FROM over WHERE num = ?
         }
 }
 
+sub reindex_xap { # git->cat_async callback
+        my ($bref, $oid, $type, $size, $ary) = @_;
+        my ($ibx_id, $oidhex, $req, $more) = @$ary;
+        my $self = $req->{self} // die 'BUG: {self} missing';
+        my $eidx = $self->{eidx} // die 'BUG: {eidx} missing';
+        my $eidx_key = $self->{-eidx_key_for}->{$ibx_id} //
+                        die "BUG: bad ibx_id=$ibx_id ($oid)";
+
+        my $docid = $req->{docid};
+        local $eidx->{current_info} = "#$docid $oid";
+        return if is_bad_blob($oid, $type, $size, $oidhex);
+        if (my $doc = $req->{doc}) { # modify existing doc
+                $req->{tg_isset} //= do { # for existing documents in {xdb}
+                        term_generator($self)->set_document($doc);
+                        1;
+                };
+                $doc->add_boolean_term('O'.$eidx_key);
+                index_list_id($self, $doc, PublicInbox::Eml->new($bref));
+        } else { # first time seeing this doc
+                my $smsg = $self->{eidx}->over->get_art($docid) //
+                        die "BUG: #$docid ($oid) not in over";
+                $smsg->{bytes} = $size + crlf_adjust($$bref);
+                $smsg->{eidx_key} = $eidx_key;
+                my $eml = PublicInbox::Eml->new($bref);
+                $req->{doc} = eml2doc($self, $eml, $smsg);
+                $req->{tg_isset} = 1; # eml2doc calls $tg->set_document
+        }
+        return if $more;
+        my $doc = delete($req->{doc}) or return; # all bad blobs!
+        $eidx->{transact_bytes} += $size;
+        $self->{xdb}->replace_document($req->{docid}, $doc);
+}
+
+sub reindex_docid {
+        my ($self, $docid) = @_;
+        my $eidx = $self->{eidx} // die 'BUG: {eidx} missing';
+        my $eidx_key_for = $self->{-eidx_key_for} //= do {
+                my %eidx_key_for = map {
+                        $_->[0] => $_->[1];
+                } @{$eidx->over->dbh->selectall_arrayref(<<'')};
+SELECT ibx_id,eidx_key FROM inboxes
+
+                \%eidx_key_for;
+        };
+
+        begin_txn_lazy($self);
+        my $doc = eval { $self->{xdb}->get_document($docid) };
+        my $req = { doc => $doc, self => $self, docid => $docid };
+        my $sth = $eidx->over->dbh->prepare_cached(<<'', undef, 1);
+SELECT ibx_id,oidbin FROM xref3 WHERE docid = ? ORDER BY ibx_id ASC
+
+        $sth->execute($docid);
+        my $rows = $sth->fetchall_arrayref;
+        while (my $row = shift(@$rows)) {
+                my ($ibx_id, $oidbin) = @$row;
+                my $oidhex = unpack('H*', $oidbin);
+                $eidx->git->cat_async($oidhex, \&reindex_xap,
+                                [ $ibx_id, $oidhex, $req, scalar(@$rows) ]);
+        }
+        if ($eidx->{transact_bytes} >= $eidx->{batch_bytes}) {
+                commit_txn_lazy($self);
+        }
+}
+
 1;
diff --git a/lib/PublicInbox/SearchIdxShard.pm b/lib/PublicInbox/SearchIdxShard.pm
index 579ed196..b6eef6bd 100644
--- a/lib/PublicInbox/SearchIdxShard.pm
+++ b/lib/PublicInbox/SearchIdxShard.pm
@@ -94,6 +94,8 @@ sub shard_worker_loop ($$$$$) {
                         my $over_fn = $1;
                         $over_fn =~ tr/\0/\n/;
                         $self->over_check(PublicInbox::Over->new($over_fn));
+                } elsif ($line =~ /\AE ([0-9]+)\n/) {
+                        $self->reindex_docid($1 + 0);
                 } else {
                         chomp $line;
                         my $eidx_key;
@@ -223,4 +225,13 @@ sub shard_over_check {
         }
 }
 
+sub shard_reindex_docid {
+        my ($self, $docid) = @_;
+        if (my $w = $self->{w}) {
+                print $w "E $docid\n" or die "failed to write to shard: $!";
+        } else {
+                $self->reindex_docid($docid);
+        }
+}
+
 1;
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index bef3a67a..572eb418 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -275,13 +275,17 @@ sub _idx_init { # with_umask callback
         $self->{shards} = $nshards if $nshards && $nshards != $self->{shards};
         $self->{batch_bytes} = $opt->{batch_size} //
                                 $PublicInbox::SearchIdx::BATCH_BYTES;
-        $self->{batch_bytes} *= $self->{shards} if $self->{parallel};
 
         # need to create all shards before initializing msgmap FD
         # idx_shards must be visible to all forked processes
         my $max = $self->{shards} - 1;
         my $idx = $self->{idx_shards} = [];
         push @$idx, PublicInbox::SearchIdxShard->new($self, $_) for (0..$max);
+
+        # SearchIdxShard may do their own flushing, so don't scale
+        # until after forking
+        $self->{batch_bytes} *= $self->{shards} if $self->{parallel};
+
         my $ibx = $self->{ibx} or return; # ExtIdxSearch
 
         # Now that all subprocesses are up, we can open the FDs