diff options
-rw-r--r-- | lib/PublicInbox/ExtSearchIdx.pm | 147 | ||||
-rw-r--r-- | lib/PublicInbox/V2Writable.pm | 3 |
2 files changed, 100 insertions, 50 deletions
diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm index d5295735..b5024823 100644 --- a/lib/PublicInbox/ExtSearchIdx.pm +++ b/lib/PublicInbox/ExtSearchIdx.pm @@ -29,6 +29,7 @@ use PublicInbox::InboxWritable; use PublicInbox::ContentHash qw(content_hash); use PublicInbox::Eml; use File::Spec; +use PublicInbox::DS qw(now); use DBI qw(:sql_types); # SQL_BLOB sub new { @@ -518,6 +519,11 @@ BUG? #$docid $smsg->{blob} is not referenced by inboxes during reindex $self->git->cat_async($xr3->[$req->{ix}]->[2], \&_reindex_oid, $req); } +sub checkpoint_due ($) { + my ($sync) = @_; + ${$sync->{need_checkpoint}} || (now() > $sync->{next_check}); +} + sub eidxq_process ($$) { # for reindexing my ($self, $sync) = @_; @@ -531,13 +537,16 @@ sub eidxq_process ($$) { # for reindexing my $max = $dbh->selectrow_array('SELECT MAX(docid) FROM eidxq'); $pr->("Xapian indexing $min..$max (total=$tot)\n"); } - my %id2pos; - my $pos = 0; - $id2pos{$_->{-ibx_id}} = $pos++ for @{$self->{ibx_list}}; - $sync->{id2pos} = \%id2pos; - - my $del = $dbh->prepare('DELETE FROM eidxq WHERE docid = ?'); - my $iter = $dbh->prepare('SELECT docid FROM eidxq ORDER BY docid ASC'); + $sync->{id2pos} //= do { + my %id2pos; + my $pos = 0; + $id2pos{$_->{-ibx_id}} = $pos++ for @{$self->{ibx_list}}; + \%id2pos; + }; + my ($del, $iter); +restart: + $del = $dbh->prepare('DELETE FROM eidxq WHERE docid = ?'); + $iter = $dbh->prepare('SELECT docid FROM eidxq ORDER BY docid ASC'); $iter->execute; while (defined(my $docid = $iter->fetchrow_array)) { last if $sync->{quit}; @@ -549,8 +558,12 @@ sub eidxq_process ($$) { # for reindexing $del->execute($docid); ++${$sync->{nr}}; - # this is only for SIGUSR1, shards do their own accounting: - reindex_checkpoint($self, $sync) if ${$sync->{need_checkpoint}}; + if (checkpoint_due($sync)) { + $dbh = $del = $iter = undef; + reindex_checkpoint($self, $sync); # release lock + $dbh = $self->{oidx}->dbh; + goto restart; + } } $self->git->async_wait_all; $pr->("reindexed ${$sync->{nr}}/$tot\n") if $pr; @@ -589,16 +602,28 @@ sub reindex_unseen ($$$$) { sub _reindex_check_unseen ($$$) { my ($self, $sync, $ibx) = @_; my $ibx_id = $ibx->{-ibx_id}; - my ($beg, $end) = (1, 1000); + my $slice = 1000; + my ($beg, $end) = (1, $slice); # first, check if we missed any messages in target $ibx - my $inx3 = $self->{oidx}->dbh->prepare(<<''); -SELECT DISTINCT(docid) FROM xref3 WHERE ibx_id = ? AND xnum = ? AND oidbin = ? - my $msgs; + my $pr = $sync->{-opt}->{-progress}; + my $ekey = $ibx->eidx_key; + $sync->{-regen_fmt} = "$ekey checking unseen %u/".$ibx->over->max."\n"; + ${$sync->{nr}} = 0; + while (scalar(@{$msgs = $ibx->over->query_xover($beg, $end)})) { + ${$sync->{nr}} = $beg; $beg = $msgs->[-1]->{num} + 1; - $end = $beg + 1000; + $end = $beg + $slice; + if (checkpoint_due($sync)) { + reindex_checkpoint($self, $sync); # release lock + } + + my $inx3 = $self->{oidx}->dbh->prepare_cached(<<'', undef, 1); +SELECT DISTINCT(docid) FROM xref3 WHERE +ibx_id = ? AND xnum = ? AND oidbin = ? + for my $xsmsg (@$msgs) { my $oidbin = pack('H*', $xsmsg->{blob}); $inx3->bind_param(1, $ibx_id); @@ -623,49 +648,69 @@ SELECT DISTINCT(docid) FROM xref3 WHERE ibx_id = ? AND xnum = ? AND oidbin = ? sub _reindex_check_stale ($$$) { my ($self, $sync, $ibx) = @_; - - # now, check if there's stale xrefs - my $get_xnum = $self->{oidx}->dbh->prepare(<<''); -SELECT docid,xnum,oidbin FROM xref3 WHERE ibx_id = ? ORDER BY docid ASC - - $get_xnum->execute($ibx->{-ibx_id}); - my $del_xref3 = $self->{oidx}->dbh->prepare(<<''); + my $min = 0; + my $pr = $sync->{-opt}->{-progress}; + my $fetching; + my $ekey = $ibx->eidx_key; + $sync->{-regen_fmt} = + "$ekey check stale/missing %u/".$ibx->over->max."\n"; + ${$sync->{nr}} = 0; + do { + if (checkpoint_due($sync)) { + reindex_checkpoint($self, $sync); # release lock + } + # now, check if there's stale xrefs + my $iter = $self->{oidx}->dbh->prepare_cached(<<'', undef, 1); +SELECT docid,xnum,oidbin FROM xref3 WHERE ibx_id = ? AND docid > ? +ORDER BY docid,xnum ASC LIMIT 10000 + + $iter->execute($ibx->{-ibx_id}, $min); + $fetching = undef; + + while (my ($docid, $xnum, $oidbin) = $iter->fetchrow_array) { + return if $sync->{quit}; + ${$sync->{nr}} = $xnum; + + $fetching = $min = $docid; + my $smsg = $ibx->over->get_art($xnum); + my $oidhex = unpack('H*', $oidbin); + my $err; + if (!$smsg) { + $err = 'stale'; + } elsif ($smsg->{blob} ne $oidhex) { + $err = "mismatch (!= $smsg->{blob})"; + } else { + next; # likely, all good + } + # current_info already has eidx_key + warn "$xnum:$oidhex (#$docid): $err\n"; + my $del = $self->{oidx}->dbh->prepare_cached(<<''); DELETE FROM xref3 WHERE ibx_id = ? AND xnum = ? AND oidbin = ? - while (my ($docid, $xnum, $oidbin) = $get_xnum->fetchrow_array) { - last if $sync->{quit}; - my $smsg = $ibx->over->get_art($xnum); - my $oidhex = unpack('H*', $oidbin); - my $err; - if (!$smsg) { - $err = 'stale'; - } elsif ($smsg->{blob} ne $oidhex) { - $err = "mismatch (!= $smsg->{blob})"; - } else { - next; # likely, all good - } - warn $ibx->eidx_key . ":$xnum:$oidhex (#$docid): $err\n"; - $del_xref3->bind_param(1, $ibx->{-ibx_id}); - $del_xref3->bind_param(2, $xnum); - $del_xref3->bind_param(3, $oidbin, SQL_BLOB); - $del_xref3->execute; - - # get_xref3 over-fetches, but this is a rare path: - my $xr3 = $self->{oidx}->get_xref3($docid); - my $idx = $self->idx_shard($docid); - if (scalar(@$xr3) == 0) { # all gone - $self->{oidx}->delete_by_num($docid); - $self->{oidx}->eidxq_del($docid); - $idx->shard_remove($docid); - } else { # enqueue for reindex of remaining messages - $idx->shard_remove_eidx_info($docid, $ibx->eidx_key); - $self->{oidx}->eidxq_add($docid); # yes, add + $del->bind_param(1, $ibx->{-ibx_id}); + $del->bind_param(2, $xnum); + $del->bind_param(3, $oidbin, SQL_BLOB); + $del->execute; + + # get_xref3 over-fetches, but this is a rare path: + my $xr3 = $self->{oidx}->get_xref3($docid); + my $idx = $self->idx_shard($docid); + if (scalar(@$xr3) == 0) { # all gone + $self->{oidx}->delete_by_num($docid); + $self->{oidx}->eidxq_del($docid); + $idx->shard_remove($docid); + } else { # enqueue for reindex of remaining messages + $idx->shard_remove_eidx_info($docid, + $ibx->eidx_key); + $self->{oidx}->eidxq_add($docid); # yes, add + } } - } + } while (defined $fetching); } sub _reindex_inbox ($$$) { my ($self, $sync, $ibx) = @_; + local $self->{current_info} = $ibx->eidx_key; _reindex_check_unseen($self, $sync, $ibx); _reindex_check_stale($self, $sync, $ibx) unless $sync->{quit}; delete @$ibx{qw(over mm search git)}; # won't need these for a bit @@ -694,6 +739,8 @@ sub eidx_sync { # main entry point $self->{oidx}->rethread_prepare($opt); my $sync = { need_checkpoint => \(my $need_checkpoint = 0), + check_intvl => 10, + next_check => now() + 10, -opt => $opt, # DO NOT SET {reindex} here, it's incompatible with reused # V2Writable code, reindex is totally different here diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm index 97dbf328..992305c5 100644 --- a/lib/PublicInbox/V2Writable.pm +++ b/lib/PublicInbox/V2Writable.pm @@ -896,6 +896,9 @@ sub reindex_checkpoint ($$) { # allow -watch or -mda to write... $self->idx_init($sync->{-opt}); # reacquire lock + if (my $intvl = $sync->{check_intvl}) { # eidx + $sync->{next_check} = PublicInbox::DS::now() + $intvl; + } $mm_tmp->atfork_parent if $mm_tmp; } |