about summary refs log tree commit homepage
path: root/lib/PublicInbox/ExtSearchIdx.pm
diff options
context:
space:
mode:
Diffstat (limited to 'lib/PublicInbox/ExtSearchIdx.pm')
-rw-r--r--lib/PublicInbox/ExtSearchIdx.pm117
1 files changed, 113 insertions, 4 deletions
diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm
index ec86a7c0..c77fb197 100644
--- a/lib/PublicInbox/ExtSearchIdx.pm
+++ b/lib/PublicInbox/ExtSearchIdx.pm
@@ -395,6 +395,106 @@ DELETE FROM xref3 WHERE docid NOT IN (SELECT num FROM over)
         done($self);
 }
 
+sub _reindex_finalize ($$$) {
+        my ($req, $smsg, $eml) = @_;
+        my $sync = $req->{sync};
+        my $self = $sync->{self};
+        my $by_chash = $req->{by_chash};
+        my $nr = scalar(keys(%$by_chash)) or die 'BUG: no content hashes';
+        my $orig_smsg = $req->{orig_smsg} // die 'BUG: no {orig_smsg}';
+        my $docid = $smsg->{num} = $orig_smsg->{num};
+        $self->{oidx}->add_overview($eml, $smsg); # may rethread
+        return if $nr == 1; # likely, all good
+
+        warn "W: #$docid split into $nr due to deduplication change\n";
+        my $chash0 = $smsg->{chash} // die "BUG: $smsg->{blob} no {chash}";
+        delete($by_chash->{$chash0}) // die "BUG: $smsg->{blob} chash missing";
+        for my $ary (values %$by_chash) {
+                for my $x (reverse @$ary) {
+                        my $n = $self->{oidx}->remove_xref3($docid, $x->{blob});
+                        die "BUG: $x->{blob} invalidated #$docid" if $n == 0;
+                }
+                my $x = $ary->[-1] // die "BUG: #$docid {by_chash} empty";
+                $x->{num} = delete($x->{xnum}) // die '{xnum} unset';
+                my $ibx_id = delete($x->{ibx_id}) // die '{ibx_id} unset';
+                my $pos = $sync->{id2pos}->{$ibx_id} // die "$ibx_id no pos";
+                my $ibx = $self->{ibx_list}->[$pos] //
+                        die "BUG: ibx for $x->{blob} not mapped";
+                my $e = $ibx->over->get_art($x->{num});
+                $e->{blob} eq $x->{blob} or die <<EOF;
+$x->{blob} != $e->{blob} (${\$ibx->eidx_key}:$e->{num});
+EOF
+                reindex_unseen($self, $sync, $ibx, $e);
+        }
+}
+
+sub _reindex_oid { # git->cat_async callback
+        my ($bref, $oid, $type, $size, $req) = @_;
+        my $sync = $req->{sync};
+        my $self = $sync->{self};
+        my $orig_smsg = $req->{orig_smsg} // die 'BUG: no {orig_smsg}';
+        my $expect_oid = $req->{xr3}->[$req->{ix}]->[2];
+        my $docid = $orig_smsg->{num};
+        if (is_bad_blob($oid, $type, $size, $expect_oid)) {
+                my $remain = $self->{oidx}->remove_xref3($docid, $expect_oid);
+                if ($remain == 0) {
+                        warn "W: #$docid gone or corrupted\n";
+                        $self->idx_shard($docid)->shard_remove($docid);
+                } elsif (my $next_oid = $req->{xr3}->[++$req->{ix}]->[2]) {
+                        $self->git->cat_async($next_oid, \&_reindex_oid, $req);
+                } else {
+                        warn "BUG: #$docid gone (UNEXPECTED)\n";
+                        $self->idx_shard($docid)->shard_remove($docid);
+                }
+                return;
+        }
+        my $ci = $self->{current_info};
+        local $self->{current_info} = "$ci #$docid $oid";
+        my $re_smsg = bless { blob => $oid }, 'PublicInbox::Smsg';
+        $re_smsg->{bytes} = $size + crlf_adjust($$bref);
+        my $eml = PublicInbox::Eml->new($bref);
+        $re_smsg->populate($eml, { autime => $orig_smsg->{ds},
+                                cotime => $orig_smsg->{ts} });
+        my $chash = content_hash($eml);
+        $re_smsg->{chash} = $chash;
+        $re_smsg->{xnum} = $req->{xr3}->[$req->{ix}]->[1];
+        $re_smsg->{ibx_id} = $req->{xr3}->[$req->{ix}]->[0];
+        push @{$req->{by_chash}->{$chash}}, $re_smsg;
+        if (my $next_oid = $req->{xr3}->[++$req->{ix}]->[2]) {
+                $self->git->cat_async($next_oid, \&_reindex_oid, $req);
+        } else { # last $re_smsg is the highest priority xref3
+                local $self->{current_info} = "$ci #$docid";
+                _reindex_finalize($req, $re_smsg, $eml);
+        }
+}
+
+sub _reindex_smsg ($$$) {
+        my ($self, $sync, $smsg) = @_;
+        my $docid = $smsg->{num};
+        my $xr3 = $self->{oidx}->get_xref3($docid, 1);
+        if (scalar(@$xr3) == 0) { # _reindex_check_stale should've covered this
+                warn <<"";
+BUG? #$docid $smsg->{blob} is not referenced by inboxes during reindex
+
+                $self->{oidx}->delete_by_num($docid);
+                $self->idx_shard($docid)->shard_remove($docid);
+                return;
+        }
+
+        # we sort {xr3} in the reverse order of {ibx_list} so we can
+        # hit the common case in _reindex_finalize without rereading
+        # from git (or holding multiple messages in memory).
+        my $id2pos = $sync->{id2pos}; # index in {ibx_list}
+        @$xr3 = sort {
+                $id2pos->{$b->[0]} <=> $id2pos->{$a->[0]}
+                                ||
+                $b->[1] <=> $a->[1] # break ties with {xnum}
+        } @$xr3;
+        @$xr3 = map { [ $_->[0], $_->[1], unpack('H*', $_->[2]) ] } @$xr3;
+        my $req = { orig_smsg => $smsg, sync => $sync, xr3 => $xr3, ix => 0 };
+        $self->git->cat_async($xr3->[$req->{ix}]->[2], \&_reindex_oid, $req);
+}
+
 sub eidxq_process ($$) { # for reindexing
         my ($self, $sync) = @_;
 
@@ -410,19 +510,29 @@ sub eidxq_process ($$) { # for reindexing
                 my $max = $dbh->selectrow_array('SELECT MAX(docid) FROM eidxq');
                 $pr->("Xapian indexing $min..$max (total=$tot)\n");
         }
+        my %id2pos;
+        my $pos = 0;
+        $id2pos{$_->{-ibx_id}} = $pos++ for @{$self->{ibx_list}};
+        $sync->{id2pos} = \%id2pos;
 
         my $del = $dbh->prepare('DELETE FROM eidxq WHERE docid = ?');
         my $iter = $dbh->prepare('SELECT docid FROM eidxq ORDER BY docid ASC');
         $iter->execute;
         while (defined(my $docid = $iter->fetchrow_array)) {
-                $self->idx_shard($docid)->shard_reindex_docid($docid);
-                $del->execute($docid);
                 last if $sync->{quit};
+                if (my $smsg = $self->{oidx}->get_art($docid)) {
+                        _reindex_smsg($self, $sync, $smsg);
+                } else {
+                        warn "E: #$docid does not exist in over\n";
+                }
+                $del->execute($docid);
+
                 my $cur = ++${$sync->{nr}};
 
                 # shards flush on their own, just don't queue up too many
                 # deletes
                 if (($cur % 1000) == 0) {
+                        $self->git->async_wait_all;
                         $self->{oidx}->commit_lazy;
                         $self->{oidx}->begin_lazy;
                         $pr->("reindexed $cur/$tot\n") if $pr;
@@ -430,9 +540,8 @@ sub eidxq_process ($$) { # for reindexing
                 # this is only for SIGUSR1, shards do their own accounting:
                 reindex_checkpoint($self, $sync) if ${$sync->{need_checkpoint}};
         }
+        $self->git->async_wait_all;
         $pr->("reindexed ${$sync->{nr}}/$tot\n") if $pr;
-        $self->{oidx}->commit_lazy;
-        $self->{oidx}->begin_lazy;
 }
 
 sub _reindex_unseen { # git->cat_async callback