diff options
Diffstat (limited to 'lib/PublicInbox/ExtSearchIdx.pm')
-rw-r--r-- | lib/PublicInbox/ExtSearchIdx.pm | 195 |
1 files changed, 185 insertions, 10 deletions
diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm index 84449cb4..394a89d4 100644 --- a/lib/PublicInbox/ExtSearchIdx.pm +++ b/lib/PublicInbox/ExtSearchIdx.pm @@ -29,6 +29,7 @@ use PublicInbox::InboxWritable; use PublicInbox::ContentHash qw(content_hash); use PublicInbox::Eml; use File::Spec; +use DBI qw(:sql_types); # SQL_BLOB sub new { my (undef, $dir, $opt) = @_; @@ -123,9 +124,11 @@ sub do_xpost ($$) { my $nr = $self->{oidx}->remove_xref3($docid, $oid, $eidx_key, \$rm_eidx_info); if ($nr == 0) { + $self->{oidx}->eidxq_del($docid); $idx->shard_remove($docid); } elsif ($rm_eidx_info) { $idx->shard_remove_eidx_info($docid, $eidx_key, $eml); + $self->{oidx}->eidxq_add($docid); # yes, add } } } @@ -168,7 +171,7 @@ sub do_finalize ($) { sub do_step ($) { # main iterator for adding messages to the index my ($req) = @_; - my $self = $req->{self}; + my $self = $req->{self} // die 'BUG: {self} missing'; while (1) { if (my $next_arg = $req->{next_arg}) { if (my $smsg = $self->{oidx}->next_by_mid(@$next_arg)) { @@ -311,7 +314,7 @@ sub _sync_inbox ($$$) { $ibx->git->cleanup; # done with this inbox, now } -sub unref_doc ($$$$) { +sub gc_unref_doc ($$$$) { my ($self, $ibx_id, $eidx_key, $docid) = @_; my $dbh = $self->{oidx}->dbh; @@ -326,15 +329,14 @@ SELECT oidbin FROM xref3 WHERE docid = ? AND ibx_id = ? DELETE FROM xref3 WHERE docid = ? AND ibx_id = ? my $remain = $self->{oidx}->get_xref3($docid); - my $idx = $self->idx_shard($docid); - if (@$remain) { + if (scalar(@$remain)) { + $self->{oidx}->eidxq_add($docid); # enqueue for reindex for my $oid (@oid) { warn "I: unref #$docid $eidx_key $oid\n"; - $idx->shard_remove_eidx_info($docid, $eidx_key); } } else { warn "I: remove #$docid $eidx_key @oid\n"; - $idx->shard_remove($docid); + $self->idx_shard($docid)->shard_remove($docid); } } @@ -356,7 +358,7 @@ sub eidx_gc { warn "I: deleting messages for $eidx_key...\n"; $x3_doc->execute($ibx_id); while (defined(my $docid = $x3_doc->fetchrow_array)) { - unref_doc($self, $ibx_id, $eidx_key, $docid); + gc_unref_doc($self, $ibx_id, $eidx_key, $docid); } $dbh->prepare_cached(<<'')->execute($ibx_id); DELETE FROM inboxes WHERE ibx_id = ? @@ -393,20 +395,187 @@ DELETE FROM xref3 WHERE docid NOT IN (SELECT num FROM over) done($self); } +sub eidxq_process ($$) { # for reindexing + my ($self, $sync) = @_; + + $self->{oidx}->commit_lazy; # ensure shard workers can see it + $self->{oidx}->begin_lazy; + my $dbh = $self->{oidx}->dbh; + my $tot = $dbh->selectrow_array('SELECT COUNT(*) FROM eidxq') or return; + ${$sync->{nr}} = 0; + $sync->{-regen_fmt} = "%u/$tot\n"; + my $pr = $sync->{-opt}->{-progress}; + if ($pr) { + my $min = $dbh->selectrow_array('SELECT MIN(docid) FROM eidxq'); + my $max = $dbh->selectrow_array('SELECT MAX(docid) FROM eidxq'); + $pr->("Xapian indexing $min..$max (total=$tot)\n"); + } + + my $del = $dbh->prepare('DELETE FROM eidxq WHERE docid = ?'); + my $iter = $dbh->prepare('SELECT docid FROM eidxq ORDER BY docid ASC'); + $iter->execute; + while (defined(my $docid = $iter->fetchrow_array)) { + $self->idx_shard($docid)->shard_reindex_docid($docid); + $del->execute($docid); + last if $sync->{quit}; + my $cur = ++${$sync->{nr}}; + + # shards flush on their own, just don't queue up too many + # deletes + if (($cur % 1000) == 0) { + $self->{oidx}->commit_lazy; + $self->{oidx}->begin_lazy; + $pr->("reindexed $cur/$tot\n") if $pr; + } + # this is only for SIGUSR1, shards do their own accounting: + reindex_checkpoint($self, $sync) if ${$sync->{need_checkpoint}}; + } + $pr->("reindexed ${$sync->{nr}}/$tot\n") if $pr; + $self->{oidx}->commit_lazy; + $self->{oidx}->begin_lazy; +} + +sub _reindex_unseen { # git->cat_async callback + my ($bref, $oid, $type, $size, $req) = @_; + return if is_bad_blob($oid, $type, $size, $req->{oid}); + my $self = $req->{self} // die 'BUG: {self} unset'; + local $self->{current_info} = "$self->{current_info} $oid"; + my $new_smsg = bless { blob => $oid, }, 'PublicInbox::Smsg'; + $new_smsg->{bytes} = $size + crlf_adjust($$bref); + my $eml = $req->{eml} = PublicInbox::Eml->new($bref); + $req->{new_smsg} = $new_smsg; + $req->{chash} = content_hash($eml); + $req->{mids} = mids($eml); # do_step iterates through this + do_step($req); # enter the normal indexing flow +} + +# --reindex may catch totally unseen messages, this handles them +sub reindex_unseen ($$$$) { + my ($self, $sync, $ibx, $xsmsg) = @_; + my $req = { + %$sync, # has {self} + autime => $xsmsg->{ds}, + cotime => $xsmsg->{ts}, + oid => $xsmsg->{blob}, + ibx => $ibx, + xnum => $xsmsg->{num}, + # {mids} and {chash} will be filled in at _reindex_unseen + }; + warn "I: reindex_unseen ${\$ibx->eidx_key}:$req->{xnum}:$req->{oid}\n"; + $self->git->cat_async($xsmsg->{blob}, \&_reindex_unseen, $req); +} + +sub _reindex_check_unseen ($$$) { + my ($self, $sync, $ibx) = @_; + my $ibx_id = $ibx->{-ibx_id}; + my ($beg, $end) = (1, 1000); + + # first, check if we missed any messages in target $ibx + my $inx3 = $self->{oidx}->dbh->prepare(<<''); +SELECT DISTINCT(docid) FROM xref3 WHERE ibx_id = ? AND xnum = ? AND oidbin = ? + + my $msgs; + while (scalar(@{$msgs = $ibx->over->query_xover($beg, $end)})) { + $beg = $msgs->[-1]->{num} + 1; + $end = $beg + 1000; + for my $xsmsg (@$msgs) { + my $oidbin = pack('H*', $xsmsg->{blob}); + $inx3->bind_param(1, $ibx_id); + $inx3->bind_param(2, $xsmsg->{num}); + $inx3->bind_param(3, $oidbin, SQL_BLOB); + $inx3->execute; + my $docids = $inx3->fetchall_arrayref; + # index messages which were totally missed + # the first time around ASAP: + if (scalar(@$docids) == 0) { + reindex_unseen($self, $sync, $ibx, $xsmsg); + } else { # already seen, reindex later + for my $r (@$docids) { + $self->{oidx}->eidxq_add($r->[0]); + } + } + last if $sync->{quit}; + } + last if $sync->{quit}; + } +} + +sub _reindex_check_stale ($$$) { + my ($self, $sync, $ibx) = @_; + + # now, check if there's stale xrefs + my $get_xnum = $self->{oidx}->dbh->prepare(<<''); +SELECT docid,xnum,oidbin FROM xref3 WHERE ibx_id = ? ORDER BY docid ASC + + $get_xnum->execute($ibx->{-ibx_id}); + my $del_xref3 = $self->{oidx}->dbh->prepare(<<''); +DELETE FROM xref3 WHERE ibx_id = ? AND xnum = ? AND oidbin = ? + + while (my ($docid, $xnum, $oidbin) = $get_xnum->fetchrow_array) { + last if $sync->{quit}; + my $smsg = $ibx->over->get_art($xnum); + my $oidhex = unpack('H*', $oidbin); + my $err; + if (!$smsg) { + $err = 'stale'; + } elsif ($smsg->{blob} ne $oidhex) { + $err = "mismatch (!= $smsg->{blob})"; + } else { + next; # likely, all good + } + warn $ibx->eidx_key . ":$xnum:$oidhex (#$docid): $err\n"; + $del_xref3->bind_param(1, $ibx->{-ibx_id}); + $del_xref3->bind_param(2, $xnum); + $del_xref3->bind_param(3, $oidbin, SQL_BLOB); + $del_xref3->execute; + + # get_xref3 over-fetches, but this is a rare path: + my $xr3 = $self->{oidx}->get_xref3($docid); + my $idx = $self->idx_shard($docid); + if (scalar(@$xr3) == 0) { # all gone + $self->{oidx}->eidxq_del($docid); + $idx->shard_remove($docid); + } else { # enqueue for reindex of remaining messages + $idx->shard_remove_eidx_info($docid, $ibx->eidx_key); + $self->{oidx}->eidxq_add($docid); # yes, add + } + } +} + +sub _reindex_inbox ($$$) { + my ($self, $sync, $ibx) = @_; + _reindex_check_unseen($self, $sync, $ibx); + _reindex_check_stale($self, $sync, $ibx) unless $sync->{quit}; + delete @$ibx{qw(over mm search git)}; # won't need these for a bit +} + +sub eidx_reindex { + my ($self, $sync) = @_; + + for my $ibx (@{$self->{ibx_list}}) { + _reindex_inbox($self, $sync, $ibx); + last if $sync->{quit}; + } + $self->git->async_wait_all; # ensure eidxq gets filled completely + eidxq_process($self, $sync) unless $sync->{quit}; +} + sub eidx_sync { # main entry point my ($self, $opt) = @_; - $self->idx_init($opt); # acquire lock via V2Writable::_idx_init - $self->{oidx}->rethread_prepare($opt); my $warn_cb = $SIG{__WARN__} || sub { print STDERR @_ }; local $self->{current_info} = ''; local $SIG{__WARN__} = sub { $warn_cb->($self->{current_info}, ': ', @_); }; + $self->idx_init($opt); # acquire lock via V2Writable::_idx_init + $self->{oidx}->rethread_prepare($opt); my $sync = { need_checkpoint => \(my $need_checkpoint = 0), - reindex => $opt->{reindex}, -opt => $opt, + # DO NOT SET {reindex} here, it's incompatible with reused + # V2Writable code, reindex is totally different here + # compared to v1/v2 inboxes because we have multiple histories self => $self, -regen_fmt => "%u/?\n", }; @@ -415,6 +584,10 @@ sub eidx_sync { # main entry point local $SIG{QUIT} = $quit; local $SIG{INT} = $quit; local $SIG{TERM} = $quit; + for my $ibx (@{$self->{ibx_list}}) { + $ibx->{-ibx_id} //= $self->{oidx}->ibx_id($ibx->eidx_key); + } + eidx_reindex($self, $sync) if delete($opt->{reindex}); # don't use $_ here, it'll get clobbered by reindex_checkpoint for my $ibx (@{$self->{ibx_list}}) { @@ -422,6 +595,7 @@ sub eidx_sync { # main entry point _sync_inbox($self, $sync, $ibx); } $self->{oidx}->rethread_done($opt) unless $sync->{quit}; + eidxq_process($self, $sync) unless $sync->{quit}; PublicInbox::V2Writable::done($self); } @@ -522,5 +696,6 @@ no warnings 'once'; *count_shards = \&PublicInbox::V2Writable::count_shards; *atfork_child = \&PublicInbox::V2Writable::atfork_child; *idx_shard = \&PublicInbox::V2Writable::idx_shard; +*reindex_checkpoint = \&PublicInbox::V2Writable::reindex_checkpoint; 1; |