about summary refs log tree commit homepage
path: root/lib
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2020-12-15 02:02:19 +0000
committerEric Wong <e@80x24.org>2020-12-17 19:13:10 +0000
commit7cc8cb8a8554e1eae817ce2f21dd28b413d3bafc (patch)
tree859eb7a59e00d60982f52e336f0477487e8e6094 /lib
parent4c434e62a115c1158a0dc31468f109616a4ed088 (diff)
downloadpublic-inbox-7cc8cb8a8554e1eae817ce2f21dd28b413d3bafc.tar.gz
--rethread is useful for dealing with bugs and behaves
just like it does with current inboxes.

This is in case our content deduplication logic changes for
whatever reason and causes previously merged messages to be
considered "different".  As with v2, this won't allow us to
merge messages in a way that allows deduplicating messages which
were previously considered different, but v2 inboxes do not
allow that, either.

In other words, this makes the --reindex and --rethread
switches of -extindex match the behavior of v2 -index.
Diffstat (limited to 'lib')
-rw-r--r--lib/PublicInbox/ExtSearchIdx.pm117
-rw-r--r--lib/PublicInbox/Over.pm3
2 files changed, 115 insertions, 5 deletions
diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm
index ec86a7c0..c77fb197 100644
--- a/lib/PublicInbox/ExtSearchIdx.pm
+++ b/lib/PublicInbox/ExtSearchIdx.pm
@@ -395,6 +395,106 @@ DELETE FROM xref3 WHERE docid NOT IN (SELECT num FROM over)
         done($self);
 }
 
+sub _reindex_finalize ($$$) {
+        my ($req, $smsg, $eml) = @_;
+        my $sync = $req->{sync};
+        my $self = $sync->{self};
+        my $by_chash = $req->{by_chash};
+        my $nr = scalar(keys(%$by_chash)) or die 'BUG: no content hashes';
+        my $orig_smsg = $req->{orig_smsg} // die 'BUG: no {orig_smsg}';
+        my $docid = $smsg->{num} = $orig_smsg->{num};
+        $self->{oidx}->add_overview($eml, $smsg); # may rethread
+        return if $nr == 1; # likely, all good
+
+        warn "W: #$docid split into $nr due to deduplication change\n";
+        my $chash0 = $smsg->{chash} // die "BUG: $smsg->{blob} no {chash}";
+        delete($by_chash->{$chash0}) // die "BUG: $smsg->{blob} chash missing";
+        for my $ary (values %$by_chash) {
+                for my $x (reverse @$ary) {
+                        my $n = $self->{oidx}->remove_xref3($docid, $x->{blob});
+                        die "BUG: $x->{blob} invalidated #$docid" if $n == 0;
+                }
+                my $x = $ary->[-1] // die "BUG: #$docid {by_chash} empty";
+                $x->{num} = delete($x->{xnum}) // die '{xnum} unset';
+                my $ibx_id = delete($x->{ibx_id}) // die '{ibx_id} unset';
+                my $pos = $sync->{id2pos}->{$ibx_id} // die "$ibx_id no pos";
+                my $ibx = $self->{ibx_list}->[$pos] //
+                        die "BUG: ibx for $x->{blob} not mapped";
+                my $e = $ibx->over->get_art($x->{num});
+                $e->{blob} eq $x->{blob} or die <<EOF;
+$x->{blob} != $e->{blob} (${\$ibx->eidx_key}:$e->{num});
+EOF
+                reindex_unseen($self, $sync, $ibx, $e);
+        }
+}
+
+sub _reindex_oid { # git->cat_async callback
+        my ($bref, $oid, $type, $size, $req) = @_;
+        my $sync = $req->{sync};
+        my $self = $sync->{self};
+        my $orig_smsg = $req->{orig_smsg} // die 'BUG: no {orig_smsg}';
+        my $expect_oid = $req->{xr3}->[$req->{ix}]->[2];
+        my $docid = $orig_smsg->{num};
+        if (is_bad_blob($oid, $type, $size, $expect_oid)) {
+                my $remain = $self->{oidx}->remove_xref3($docid, $expect_oid);
+                if ($remain == 0) {
+                        warn "W: #$docid gone or corrupted\n";
+                        $self->idx_shard($docid)->shard_remove($docid);
+                } elsif (my $next_oid = $req->{xr3}->[++$req->{ix}]->[2]) {
+                        $self->git->cat_async($next_oid, \&_reindex_oid, $req);
+                } else {
+                        warn "BUG: #$docid gone (UNEXPECTED)\n";
+                        $self->idx_shard($docid)->shard_remove($docid);
+                }
+                return;
+        }
+        my $ci = $self->{current_info};
+        local $self->{current_info} = "$ci #$docid $oid";
+        my $re_smsg = bless { blob => $oid }, 'PublicInbox::Smsg';
+        $re_smsg->{bytes} = $size + crlf_adjust($$bref);
+        my $eml = PublicInbox::Eml->new($bref);
+        $re_smsg->populate($eml, { autime => $orig_smsg->{ds},
+                                cotime => $orig_smsg->{ts} });
+        my $chash = content_hash($eml);
+        $re_smsg->{chash} = $chash;
+        $re_smsg->{xnum} = $req->{xr3}->[$req->{ix}]->[1];
+        $re_smsg->{ibx_id} = $req->{xr3}->[$req->{ix}]->[0];
+        push @{$req->{by_chash}->{$chash}}, $re_smsg;
+        if (my $next_oid = $req->{xr3}->[++$req->{ix}]->[2]) {
+                $self->git->cat_async($next_oid, \&_reindex_oid, $req);
+        } else { # last $re_smsg is the highest priority xref3
+                local $self->{current_info} = "$ci #$docid";
+                _reindex_finalize($req, $re_smsg, $eml);
+        }
+}
+
+sub _reindex_smsg ($$$) {
+        my ($self, $sync, $smsg) = @_;
+        my $docid = $smsg->{num};
+        my $xr3 = $self->{oidx}->get_xref3($docid, 1);
+        if (scalar(@$xr3) == 0) { # _reindex_check_stale should've covered this
+                warn <<"";
+BUG? #$docid $smsg->{blob} is not referenced by inboxes during reindex
+
+                $self->{oidx}->delete_by_num($docid);
+                $self->idx_shard($docid)->shard_remove($docid);
+                return;
+        }
+
+        # we sort {xr3} in the reverse order of {ibx_list} so we can
+        # hit the common case in _reindex_finalize without rereading
+        # from git (or holding multiple messages in memory).
+        my $id2pos = $sync->{id2pos}; # index in {ibx_list}
+        @$xr3 = sort {
+                $id2pos->{$b->[0]} <=> $id2pos->{$a->[0]}
+                                ||
+                $b->[1] <=> $a->[1] # break ties with {xnum}
+        } @$xr3;
+        @$xr3 = map { [ $_->[0], $_->[1], unpack('H*', $_->[2]) ] } @$xr3;
+        my $req = { orig_smsg => $smsg, sync => $sync, xr3 => $xr3, ix => 0 };
+        $self->git->cat_async($xr3->[$req->{ix}]->[2], \&_reindex_oid, $req);
+}
+
 sub eidxq_process ($$) { # for reindexing
         my ($self, $sync) = @_;
 
@@ -410,19 +510,29 @@ sub eidxq_process ($$) { # for reindexing
                 my $max = $dbh->selectrow_array('SELECT MAX(docid) FROM eidxq');
                 $pr->("Xapian indexing $min..$max (total=$tot)\n");
         }
+        my %id2pos;
+        my $pos = 0;
+        $id2pos{$_->{-ibx_id}} = $pos++ for @{$self->{ibx_list}};
+        $sync->{id2pos} = \%id2pos;
 
         my $del = $dbh->prepare('DELETE FROM eidxq WHERE docid = ?');
         my $iter = $dbh->prepare('SELECT docid FROM eidxq ORDER BY docid ASC');
         $iter->execute;
         while (defined(my $docid = $iter->fetchrow_array)) {
-                $self->idx_shard($docid)->shard_reindex_docid($docid);
-                $del->execute($docid);
                 last if $sync->{quit};
+                if (my $smsg = $self->{oidx}->get_art($docid)) {
+                        _reindex_smsg($self, $sync, $smsg);
+                } else {
+                        warn "E: #$docid does not exist in over\n";
+                }
+                $del->execute($docid);
+
                 my $cur = ++${$sync->{nr}};
 
                 # shards flush on their own, just don't queue up too many
                 # deletes
                 if (($cur % 1000) == 0) {
+                        $self->git->async_wait_all;
                         $self->{oidx}->commit_lazy;
                         $self->{oidx}->begin_lazy;
                         $pr->("reindexed $cur/$tot\n") if $pr;
@@ -430,9 +540,8 @@ sub eidxq_process ($$) { # for reindexing
                 # this is only for SIGUSR1, shards do their own accounting:
                 reindex_checkpoint($self, $sync) if ${$sync->{need_checkpoint}};
         }
+        $self->git->async_wait_all;
         $pr->("reindexed ${$sync->{nr}}/$tot\n") if $pr;
-        $self->{oidx}->commit_lazy;
-        $self->{oidx}->begin_lazy;
 }
 
 sub _reindex_unseen { # git->cat_async callback
diff --git a/lib/PublicInbox/Over.pm b/lib/PublicInbox/Over.pm
index 62709468..49ba180b 100644
--- a/lib/PublicInbox/Over.pm
+++ b/lib/PublicInbox/Over.pm
@@ -261,13 +261,14 @@ SELECT num,tid,ds,ts,ddd FROM over WHERE num = ? LIMIT 1
 }
 
 sub get_xref3 {
-        my ($self, $num) = @_;
+        my ($self, $num, $raw) = @_;
         my $dbh = dbh($self);
         my $sth = $dbh->prepare_cached(<<'', undef, 1);
 SELECT ibx_id,xnum,oidbin FROM xref3 WHERE docid = ? ORDER BY ibx_id,xnum ASC
 
         $sth->execute($num);
         my $rows = $sth->fetchall_arrayref;
+        return $rows if $raw;
         my $eidx_key_sth = $dbh->prepare_cached(<<'', undef, 1);
 SELECT eidx_key FROM inboxes WHERE ibx_id = ?