From 7281c5c492f9d6bbd585da9f061d19819d952352 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Tue, 15 Dec 2020 02:02:16 +0000 Subject: extindex: preliminary --reindex support --reindex allows us to catch missed and stale messages due to -extindex vs -index races prior to commit 02b2fcc46f364b51 ("extsearchidx: enforce -index before -extindex"). We'll also rely on reindex to internally deal with v1/v2 inbox removals and partial-unindexing of messages which are only removed from one inbox out of many. This reindex design is completely different than how normal v1/v2 inbox reindex operates due to extindex having multiple histories to work with. Instead of scanning git history, this relies exclusively on comparing over.sqlite3 contents between the v1/v2 inboxes and the extindex. Changes to Xapian behavior also get picked up, now. Xapian indexing is handled by workers with minimal IPC to the parent process. This results in more read I/O but fewer writes when dealing with cross-posted messages. Changes to $smsg->populate and --rethread still need further work. --- lib/PublicInbox/ExtSearchIdx.pm | 195 +++++++++++++++++++++++++++++++++++++--- 1 file changed, 185 insertions(+), 10 deletions(-) (limited to 'lib/PublicInbox/ExtSearchIdx.pm') diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm index 84449cb4..394a89d4 100644 --- a/lib/PublicInbox/ExtSearchIdx.pm +++ b/lib/PublicInbox/ExtSearchIdx.pm @@ -29,6 +29,7 @@ use PublicInbox::InboxWritable; use PublicInbox::ContentHash qw(content_hash); use PublicInbox::Eml; use File::Spec; +use DBI qw(:sql_types); # SQL_BLOB sub new { my (undef, $dir, $opt) = @_; @@ -123,9 +124,11 @@ sub do_xpost ($$) { my $nr = $self->{oidx}->remove_xref3($docid, $oid, $eidx_key, \$rm_eidx_info); if ($nr == 0) { + $self->{oidx}->eidxq_del($docid); $idx->shard_remove($docid); } elsif ($rm_eidx_info) { $idx->shard_remove_eidx_info($docid, $eidx_key, $eml); + $self->{oidx}->eidxq_add($docid); # yes, add } } } @@ -168,7 +171,7 @@ sub do_finalize ($) { sub do_step ($) { # main iterator for adding messages to the index my ($req) = @_; - my $self = $req->{self}; + my $self = $req->{self} // die 'BUG: {self} missing'; while (1) { if (my $next_arg = $req->{next_arg}) { if (my $smsg = $self->{oidx}->next_by_mid(@$next_arg)) { @@ -311,7 +314,7 @@ sub _sync_inbox ($$$) { $ibx->git->cleanup; # done with this inbox, now } -sub unref_doc ($$$$) { +sub gc_unref_doc ($$$$) { my ($self, $ibx_id, $eidx_key, $docid) = @_; my $dbh = $self->{oidx}->dbh; @@ -326,15 +329,14 @@ SELECT oidbin FROM xref3 WHERE docid = ? AND ibx_id = ? DELETE FROM xref3 WHERE docid = ? AND ibx_id = ? my $remain = $self->{oidx}->get_xref3($docid); - my $idx = $self->idx_shard($docid); - if (@$remain) { + if (scalar(@$remain)) { + $self->{oidx}->eidxq_add($docid); # enqueue for reindex for my $oid (@oid) { warn "I: unref #$docid $eidx_key $oid\n"; - $idx->shard_remove_eidx_info($docid, $eidx_key); } } else { warn "I: remove #$docid $eidx_key @oid\n"; - $idx->shard_remove($docid); + $self->idx_shard($docid)->shard_remove($docid); } } @@ -356,7 +358,7 @@ sub eidx_gc { warn "I: deleting messages for $eidx_key...\n"; $x3_doc->execute($ibx_id); while (defined(my $docid = $x3_doc->fetchrow_array)) { - unref_doc($self, $ibx_id, $eidx_key, $docid); + gc_unref_doc($self, $ibx_id, $eidx_key, $docid); } $dbh->prepare_cached(<<'')->execute($ibx_id); DELETE FROM inboxes WHERE ibx_id = ? @@ -393,20 +395,187 @@ DELETE FROM xref3 WHERE docid NOT IN (SELECT num FROM over) done($self); } +sub eidxq_process ($$) { # for reindexing + my ($self, $sync) = @_; + + $self->{oidx}->commit_lazy; # ensure shard workers can see it + $self->{oidx}->begin_lazy; + my $dbh = $self->{oidx}->dbh; + my $tot = $dbh->selectrow_array('SELECT COUNT(*) FROM eidxq') or return; + ${$sync->{nr}} = 0; + $sync->{-regen_fmt} = "%u/$tot\n"; + my $pr = $sync->{-opt}->{-progress}; + if ($pr) { + my $min = $dbh->selectrow_array('SELECT MIN(docid) FROM eidxq'); + my $max = $dbh->selectrow_array('SELECT MAX(docid) FROM eidxq'); + $pr->("Xapian indexing $min..$max (total=$tot)\n"); + } + + my $del = $dbh->prepare('DELETE FROM eidxq WHERE docid = ?'); + my $iter = $dbh->prepare('SELECT docid FROM eidxq ORDER BY docid ASC'); + $iter->execute; + while (defined(my $docid = $iter->fetchrow_array)) { + $self->idx_shard($docid)->shard_reindex_docid($docid); + $del->execute($docid); + last if $sync->{quit}; + my $cur = ++${$sync->{nr}}; + + # shards flush on their own, just don't queue up too many + # deletes + if (($cur % 1000) == 0) { + $self->{oidx}->commit_lazy; + $self->{oidx}->begin_lazy; + $pr->("reindexed $cur/$tot\n") if $pr; + } + # this is only for SIGUSR1, shards do their own accounting: + reindex_checkpoint($self, $sync) if ${$sync->{need_checkpoint}}; + } + $pr->("reindexed ${$sync->{nr}}/$tot\n") if $pr; + $self->{oidx}->commit_lazy; + $self->{oidx}->begin_lazy; +} + +sub _reindex_unseen { # git->cat_async callback + my ($bref, $oid, $type, $size, $req) = @_; + return if is_bad_blob($oid, $type, $size, $req->{oid}); + my $self = $req->{self} // die 'BUG: {self} unset'; + local $self->{current_info} = "$self->{current_info} $oid"; + my $new_smsg = bless { blob => $oid, }, 'PublicInbox::Smsg'; + $new_smsg->{bytes} = $size + crlf_adjust($$bref); + my $eml = $req->{eml} = PublicInbox::Eml->new($bref); + $req->{new_smsg} = $new_smsg; + $req->{chash} = content_hash($eml); + $req->{mids} = mids($eml); # do_step iterates through this + do_step($req); # enter the normal indexing flow +} + +# --reindex may catch totally unseen messages, this handles them +sub reindex_unseen ($$$$) { + my ($self, $sync, $ibx, $xsmsg) = @_; + my $req = { + %$sync, # has {self} + autime => $xsmsg->{ds}, + cotime => $xsmsg->{ts}, + oid => $xsmsg->{blob}, + ibx => $ibx, + xnum => $xsmsg->{num}, + # {mids} and {chash} will be filled in at _reindex_unseen + }; + warn "I: reindex_unseen ${\$ibx->eidx_key}:$req->{xnum}:$req->{oid}\n"; + $self->git->cat_async($xsmsg->{blob}, \&_reindex_unseen, $req); +} + +sub _reindex_check_unseen ($$$) { + my ($self, $sync, $ibx) = @_; + my $ibx_id = $ibx->{-ibx_id}; + my ($beg, $end) = (1, 1000); + + # first, check if we missed any messages in target $ibx + my $inx3 = $self->{oidx}->dbh->prepare(<<''); +SELECT DISTINCT(docid) FROM xref3 WHERE ibx_id = ? AND xnum = ? AND oidbin = ? + + my $msgs; + while (scalar(@{$msgs = $ibx->over->query_xover($beg, $end)})) { + $beg = $msgs->[-1]->{num} + 1; + $end = $beg + 1000; + for my $xsmsg (@$msgs) { + my $oidbin = pack('H*', $xsmsg->{blob}); + $inx3->bind_param(1, $ibx_id); + $inx3->bind_param(2, $xsmsg->{num}); + $inx3->bind_param(3, $oidbin, SQL_BLOB); + $inx3->execute; + my $docids = $inx3->fetchall_arrayref; + # index messages which were totally missed + # the first time around ASAP: + if (scalar(@$docids) == 0) { + reindex_unseen($self, $sync, $ibx, $xsmsg); + } else { # already seen, reindex later + for my $r (@$docids) { + $self->{oidx}->eidxq_add($r->[0]); + } + } + last if $sync->{quit}; + } + last if $sync->{quit}; + } +} + +sub _reindex_check_stale ($$$) { + my ($self, $sync, $ibx) = @_; + + # now, check if there's stale xrefs + my $get_xnum = $self->{oidx}->dbh->prepare(<<''); +SELECT docid,xnum,oidbin FROM xref3 WHERE ibx_id = ? ORDER BY docid ASC + + $get_xnum->execute($ibx->{-ibx_id}); + my $del_xref3 = $self->{oidx}->dbh->prepare(<<''); +DELETE FROM xref3 WHERE ibx_id = ? AND xnum = ? AND oidbin = ? + + while (my ($docid, $xnum, $oidbin) = $get_xnum->fetchrow_array) { + last if $sync->{quit}; + my $smsg = $ibx->over->get_art($xnum); + my $oidhex = unpack('H*', $oidbin); + my $err; + if (!$smsg) { + $err = 'stale'; + } elsif ($smsg->{blob} ne $oidhex) { + $err = "mismatch (!= $smsg->{blob})"; + } else { + next; # likely, all good + } + warn $ibx->eidx_key . ":$xnum:$oidhex (#$docid): $err\n"; + $del_xref3->bind_param(1, $ibx->{-ibx_id}); + $del_xref3->bind_param(2, $xnum); + $del_xref3->bind_param(3, $oidbin, SQL_BLOB); + $del_xref3->execute; + + # get_xref3 over-fetches, but this is a rare path: + my $xr3 = $self->{oidx}->get_xref3($docid); + my $idx = $self->idx_shard($docid); + if (scalar(@$xr3) == 0) { # all gone + $self->{oidx}->eidxq_del($docid); + $idx->shard_remove($docid); + } else { # enqueue for reindex of remaining messages + $idx->shard_remove_eidx_info($docid, $ibx->eidx_key); + $self->{oidx}->eidxq_add($docid); # yes, add + } + } +} + +sub _reindex_inbox ($$$) { + my ($self, $sync, $ibx) = @_; + _reindex_check_unseen($self, $sync, $ibx); + _reindex_check_stale($self, $sync, $ibx) unless $sync->{quit}; + delete @$ibx{qw(over mm search git)}; # won't need these for a bit +} + +sub eidx_reindex { + my ($self, $sync) = @_; + + for my $ibx (@{$self->{ibx_list}}) { + _reindex_inbox($self, $sync, $ibx); + last if $sync->{quit}; + } + $self->git->async_wait_all; # ensure eidxq gets filled completely + eidxq_process($self, $sync) unless $sync->{quit}; +} + sub eidx_sync { # main entry point my ($self, $opt) = @_; - $self->idx_init($opt); # acquire lock via V2Writable::_idx_init - $self->{oidx}->rethread_prepare($opt); my $warn_cb = $SIG{__WARN__} || sub { print STDERR @_ }; local $self->{current_info} = ''; local $SIG{__WARN__} = sub { $warn_cb->($self->{current_info}, ': ', @_); }; + $self->idx_init($opt); # acquire lock via V2Writable::_idx_init + $self->{oidx}->rethread_prepare($opt); my $sync = { need_checkpoint => \(my $need_checkpoint = 0), - reindex => $opt->{reindex}, -opt => $opt, + # DO NOT SET {reindex} here, it's incompatible with reused + # V2Writable code, reindex is totally different here + # compared to v1/v2 inboxes because we have multiple histories self => $self, -regen_fmt => "%u/?\n", }; @@ -415,6 +584,10 @@ sub eidx_sync { # main entry point local $SIG{QUIT} = $quit; local $SIG{INT} = $quit; local $SIG{TERM} = $quit; + for my $ibx (@{$self->{ibx_list}}) { + $ibx->{-ibx_id} //= $self->{oidx}->ibx_id($ibx->eidx_key); + } + eidx_reindex($self, $sync) if delete($opt->{reindex}); # don't use $_ here, it'll get clobbered by reindex_checkpoint for my $ibx (@{$self->{ibx_list}}) { @@ -422,6 +595,7 @@ sub eidx_sync { # main entry point _sync_inbox($self, $sync, $ibx); } $self->{oidx}->rethread_done($opt) unless $sync->{quit}; + eidxq_process($self, $sync) unless $sync->{quit}; PublicInbox::V2Writable::done($self); } @@ -522,5 +696,6 @@ no warnings 'once'; *count_shards = \&PublicInbox::V2Writable::count_shards; *atfork_child = \&PublicInbox::V2Writable::atfork_child; *idx_shard = \&PublicInbox::V2Writable::idx_shard; +*reindex_checkpoint = \&PublicInbox::V2Writable::reindex_checkpoint; 1; -- cgit v1.2.3-24-ge0c7