about summary refs log tree commit homepage
path: root/lib/PublicInbox/ExtSearchIdx.pm
diff options
context:
space:
mode:
Diffstat (limited to 'lib/PublicInbox/ExtSearchIdx.pm')
-rw-r--r-- lib/PublicInbox/ExtSearchIdx.pm 147
1 files changed, 97 insertions, 50 deletions
diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm
index d5295735..b5024823 100644
--- a/lib/PublicInbox/ExtSearchIdx.pm
+++ b/lib/PublicInbox/ExtSearchIdx.pm
@@ -29,6 +29,7 @@ use PublicInbox::InboxWritable;
 use PublicInbox::ContentHash qw(content_hash);
 use PublicInbox::Eml;
 use File::Spec;
+use PublicInbox::DS qw(now);
 use DBI qw(:sql_types); # SQL_BLOB
 
 sub new {
@@ -518,6 +519,11 @@ BUG? #$docid $smsg->{blob} is not referenced by inboxes during reindex
         $self->git->cat_async($xr3->[$req->{ix}]->[2], \&_reindex_oid, $req);
 }
 
+sub checkpoint_due ($) {
+        my ($sync) = @_;
+        ${$sync->{need_checkpoint}} || (now() > $sync->{next_check});
+}
+
 sub eidxq_process ($$) { # for reindexing
         my ($self, $sync) = @_;
 
@@ -531,13 +537,16 @@ sub eidxq_process ($$) { # for reindexing
                 my $max = $dbh->selectrow_array('SELECT MAX(docid) FROM eidxq');
                 $pr->("Xapian indexing $min..$max (total=$tot)\n");
         }
-        my %id2pos;
-        my $pos = 0;
-        $id2pos{$_->{-ibx_id}} = $pos++ for @{$self->{ibx_list}};
-        $sync->{id2pos} = \%id2pos;
-
-        my $del = $dbh->prepare('DELETE FROM eidxq WHERE docid = ?');
-        my $iter = $dbh->prepare('SELECT docid FROM eidxq ORDER BY docid ASC');
+        $sync->{id2pos} //= do {
+                my %id2pos;
+                my $pos = 0;
+                $id2pos{$_->{-ibx_id}} = $pos++ for @{$self->{ibx_list}};
+                \%id2pos;
+        };
+        my ($del, $iter);
+restart:
+        $del = $dbh->prepare('DELETE FROM eidxq WHERE docid = ?');
+        $iter = $dbh->prepare('SELECT docid FROM eidxq ORDER BY docid ASC');
         $iter->execute;
         while (defined(my $docid = $iter->fetchrow_array)) {
                 last if $sync->{quit};
@@ -549,8 +558,12 @@ sub eidxq_process ($$) { # for reindexing
                 $del->execute($docid);
                 ++${$sync->{nr}};
 
-                # this is only for SIGUSR1, shards do their own accounting:
-                reindex_checkpoint($self, $sync) if ${$sync->{need_checkpoint}};
+                if (checkpoint_due($sync)) {
+                        $dbh = $del = $iter = undef;
+                        reindex_checkpoint($self, $sync); # release lock
+                        $dbh = $self->{oidx}->dbh;
+                        goto restart;
+                }
         }
         $self->git->async_wait_all;
         $pr->("reindexed ${$sync->{nr}}/$tot\n") if $pr;
@@ -589,16 +602,28 @@ sub reindex_unseen ($$$$) {
 sub _reindex_check_unseen ($$$) {
         my ($self, $sync, $ibx) = @_;
         my $ibx_id = $ibx->{-ibx_id};
-        my ($beg, $end) = (1, 1000);
+        my $slice = 1000;
+        my ($beg, $end) = (1, $slice);
 
         # first, check if we missed any messages in target $ibx
-        my $inx3 = $self->{oidx}->dbh->prepare(<<'');
-SELECT DISTINCT(docid) FROM xref3 WHERE ibx_id = ? AND xnum = ? AND oidbin = ?
-
         my $msgs;
+        my $pr = $sync->{-opt}->{-progress};
+        my $ekey = $ibx->eidx_key;
+        $sync->{-regen_fmt} = "$ekey checking unseen %u/".$ibx->over->max."\n";
+        ${$sync->{nr}} = 0;
+
         while (scalar(@{$msgs = $ibx->over->query_xover($beg, $end)})) {
+                ${$sync->{nr}} = $beg;
                 $beg = $msgs->[-1]->{num} + 1;
-                $end = $beg + 1000;
+                $end = $beg + $slice;
+                if (checkpoint_due($sync)) {
+                        reindex_checkpoint($self, $sync); # release lock
+                }
+
+                my $inx3 = $self->{oidx}->dbh->prepare_cached(<<'', undef, 1);
+SELECT DISTINCT(docid) FROM xref3 WHERE
+ibx_id = ? AND xnum = ? AND oidbin = ?
+
                 for my $xsmsg (@$msgs) {
                         my $oidbin = pack('H*', $xsmsg->{blob});
                         $inx3->bind_param(1, $ibx_id);
@@ -623,49 +648,69 @@ SELECT DISTINCT(docid) FROM xref3 WHERE ibx_id = ? AND xnum = ? AND oidbin = ?
 
 sub _reindex_check_stale ($$$) {
         my ($self, $sync, $ibx) = @_;
-
-        # now, check if there's stale xrefs
-        my $get_xnum = $self->{oidx}->dbh->prepare(<<'');
-SELECT docid,xnum,oidbin FROM xref3 WHERE ibx_id = ? ORDER BY docid ASC
-
-        $get_xnum->execute($ibx->{-ibx_id});
-        my $del_xref3 = $self->{oidx}->dbh->prepare(<<'');
+        my $min = 0;
+        my $pr = $sync->{-opt}->{-progress};
+        my $fetching;
+        my $ekey = $ibx->eidx_key;
+        $sync->{-regen_fmt} =
+                        "$ekey check stale/missing %u/".$ibx->over->max."\n";
+        ${$sync->{nr}} = 0;
+        do {
+                if (checkpoint_due($sync)) {
+                        reindex_checkpoint($self, $sync); # release lock
+                }
+                # now, check if there's stale xrefs
+                my $iter = $self->{oidx}->dbh->prepare_cached(<<'', undef, 1);
+SELECT docid,xnum,oidbin FROM xref3 WHERE ibx_id = ? AND docid > ?
+ORDER BY docid,xnum ASC LIMIT 10000
+
+                $iter->execute($ibx->{-ibx_id}, $min);
+                $fetching = undef;
+
+                while (my ($docid, $xnum, $oidbin) = $iter->fetchrow_array) {
+                        return if $sync->{quit};
+                        ${$sync->{nr}} = $xnum;
+
+                        $fetching = $min = $docid;
+                        my $smsg = $ibx->over->get_art($xnum);
+                        my $oidhex = unpack('H*', $oidbin);
+                        my $err;
+                        if (!$smsg) {
+                                $err = 'stale';
+                        } elsif ($smsg->{blob} ne $oidhex) {
+                                $err = "mismatch (!= $smsg->{blob})";
+                        } else {
+                                next; # likely, all good
+                        }
+                        # current_info already has eidx_key
+                        warn "$xnum:$oidhex (#$docid): $err\n";
+                        my $del = $self->{oidx}->dbh->prepare_cached(<<'');
 DELETE FROM xref3 WHERE ibx_id = ? AND xnum = ? AND oidbin = ?
 
-        while (my ($docid, $xnum, $oidbin) = $get_xnum->fetchrow_array) {
-                last if $sync->{quit};
-                my $smsg = $ibx->over->get_art($xnum);
-                my $oidhex = unpack('H*', $oidbin);
-                my $err;
-                if (!$smsg) {
-                        $err = 'stale';
-                } elsif ($smsg->{blob} ne $oidhex) {
-                        $err = "mismatch (!= $smsg->{blob})";
-                } else {
-                        next; # likely, all good
-                }
-                warn $ibx->eidx_key . ":$xnum:$oidhex (#$docid): $err\n";
-                $del_xref3->bind_param(1, $ibx->{-ibx_id});
-                $del_xref3->bind_param(2, $xnum);
-                $del_xref3->bind_param(3, $oidbin, SQL_BLOB);
-                $del_xref3->execute;
-
-                # get_xref3 over-fetches, but this is a rare path:
-                my $xr3 = $self->{oidx}->get_xref3($docid);
-                my $idx = $self->idx_shard($docid);
-                if (scalar(@$xr3) == 0) { # all gone
-                        $self->{oidx}->delete_by_num($docid);
-                        $self->{oidx}->eidxq_del($docid);
-                        $idx->shard_remove($docid);
-                } else { # enqueue for reindex of remaining messages
-                        $idx->shard_remove_eidx_info($docid, $ibx->eidx_key);
-                        $self->{oidx}->eidxq_add($docid); # yes, add
+                        $del->bind_param(1, $ibx->{-ibx_id});
+                        $del->bind_param(2, $xnum);
+                        $del->bind_param(3, $oidbin, SQL_BLOB);
+                        $del->execute;
+
+                        # get_xref3 over-fetches, but this is a rare path:
+                        my $xr3 = $self->{oidx}->get_xref3($docid);
+                        my $idx = $self->idx_shard($docid);
+                        if (scalar(@$xr3) == 0) { # all gone
+                                $self->{oidx}->delete_by_num($docid);
+                                $self->{oidx}->eidxq_del($docid);
+                                $idx->shard_remove($docid);
+                        } else { # enqueue for reindex of remaining messages
+                                $idx->shard_remove_eidx_info($docid,
+                                                        $ibx->eidx_key);
+                                $self->{oidx}->eidxq_add($docid); # yes, add
+                        }
                 }
-        }
+        } while (defined $fetching);
 }
 
 sub _reindex_inbox ($$$) {
         my ($self, $sync, $ibx) = @_;
+        local $self->{current_info} = $ibx->eidx_key;
         _reindex_check_unseen($self, $sync, $ibx);
         _reindex_check_stale($self, $sync, $ibx) unless $sync->{quit};
         delete @$ibx{qw(over mm search git)}; # won't need these for a bit
@@ -694,6 +739,8 @@ sub eidx_sync { # main entry point
         $self->{oidx}->rethread_prepare($opt);
         my $sync = {
                 need_checkpoint => \(my $need_checkpoint = 0),
+                check_intvl => 10,
+                next_check => now() + 10,
                 -opt => $opt,
                 # DO NOT SET {reindex} here, it's incompatible with reused
                 # V2Writable code, reindex is totally different here