about summary refs log tree commit homepage
path: root/lib/PublicInbox/ExtSearchIdx.pm
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2021-07-06 12:42:02 +0000
committerEric Wong <e@80x24.org>2021-07-06 13:36:54 +0000
commit8ef622d408d2e4d98ad3aada8466f539c9ac61ba (patch)
tree5a66df2970f98c9cb591b615d0f94e28a7f9b32c /lib/PublicInbox/ExtSearchIdx.pm
parentf1f2464064af3840f2f1a697b638e5b769f111af (diff)
downloadpublic-inbox-8ef622d408d2e4d98ad3aada8466f539c9ac61ba.tar.gz
This is intended to fix older indices that had deduplication
bugs for matching content.  It'll also make dealing with
future changes to ContentHash easier since that's never
guaranteed stable.

It also supports --dry-run to print changes only without
making them.
Diffstat (limited to 'lib/PublicInbox/ExtSearchIdx.pm')
-rw-r--r--lib/PublicInbox/ExtSearchIdx.pm96
1 files changed, 96 insertions, 0 deletions
diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm
index 29414e4a..495579a2 100644
--- a/lib/PublicInbox/ExtSearchIdx.pm
+++ b/lib/PublicInbox/ExtSearchIdx.pm
@@ -844,6 +844,98 @@ sub sync_inbox {
         warn $err, "\n" if defined($err);
 }
 
+sub dd_smsg { # git->cat_async callback
+        my ($bref, $oid, $type, $size, $dd) = @_;
+        my $smsg = $dd->{smsg} // die 'BUG: dd->{smsg} missing';
+        my $self = $dd->{self} // die 'BUG: {self} missing';
+        my $per_mid = $dd->{per_mid} // die 'BUG: {per_mid} missing';
+        if ($type eq 'missing') {
+                _blob_missing($dd, $smsg);
+        } elsif (!is_bad_blob($oid, $type, $size, $smsg->{blob})) {
+                local $self->{current_info} = "$self->{current_info} $oid";
+                my $chash = content_hash(PublicInbox::Eml->new($bref));
+                push(@{$per_mid->{dd_chash}->{$chash}}, $smsg);
+        }
+        return if $per_mid->{last_smsg} != $smsg;
+        while (my ($chash, $ary) = each %{$per_mid->{dd_chash}}) {
+                my $keep = shift @$ary;
+                next if !scalar(@$ary);
+                $per_mid->{sync}->{dedupe_cull} += scalar(@$ary);
+                print STDERR
+                        "# <$keep->{mid}> keeping #$keep->{num}, dropping ",
+                        join(', ', map { "#$_->{num}" } @$ary),"\n";
+                next if $per_mid->{sync}->{-opt}->{'dry-run'};
+                my $oidx = $self->{oidx};
+                for my $smsg (@$ary) {
+                        my $gone = $smsg->{num};
+                        $oidx->merge_xref3($keep->{num}, $gone, $smsg->{blob});
+                        $self->idx_shard($gone)->ipc_do('xdb_remove', $gone);
+                        $oidx->delete_by_num($gone);
+                }
+        }
+}
+
+sub eidx_dedupe ($$) {
+        my ($self, $sync) = @_;
+        $sync->{dedupe_cull} = 0;
+        my $candidates = 0;
+        my $nr_mid = 0;
+        return unless eidxq_lock_acquire($self);
+        my $iter;
+        my $min_id = 0;
+        local $sync->{-regen_fmt} = "dedupe %u/".$self->{oidx}->max."\n";
+dedupe_restart:
+        $iter = $self->{oidx}->dbh->prepare(<<EOS);
+SELECT DISTINCT(mid),id FROM msgid WHERE id IN
+(SELECT id FROM id2num WHERE id > ? GROUP BY num HAVING COUNT(num) > 1)
+ORDER BY id
+EOS
+        $iter->execute($min_id);
+        local $SIG{__WARN__} = sub {
+                return if PublicInbox::Eml::warn_ignore(@_);
+                warn @_;
+        };
+        while (my ($mid, $id) = $iter->fetchrow_array) {
+                last if $sync->{quit};
+                $self->{current_info} = "dedupe $mid";
+                ${$sync->{nr}} = $min_id = $id;
+                my ($n, $prv, @smsg);
+                while (my $x = $self->{oidx}->next_by_mid($mid, \$n, \$prv)) {
+                        push @smsg, $x;
+                }
+                next if scalar(@smsg) < 2;
+                my $per_mid = {
+                        dd_chash => {}, # chash => [ary of smsgs]
+                        last_smsg => $smsg[-1],
+                        sync => $sync
+                };
+                $nr_mid++;
+                $candidates += scalar(@smsg) - 1;
+                for my $smsg (@smsg) {
+                        my $dd = {
+                                per_mid => $per_mid,
+                                smsg => $smsg,
+                                self => $self,
+                        };
+                        $self->git->cat_async($smsg->{blob}, \&dd_smsg, $dd);
+                }
+                # need to wait on every single one
+                $self->git->async_wait_all;
+
+                # is checkpoint needed? $iter is a very expensive query to restart
+                if (0 && checkpoint_due($sync)) {
+                        undef $iter;
+                        reindex_checkpoint($self, $sync);
+                        goto dedupe_restart;
+                }
+        }
+        my $n = delete $sync->{dedupe_cull};
+        if (my $pr = $sync->{-opt}->{-progress}) {
+                $pr->("culled $n/$candidates candidates ($nr_mid msgids)\n");
+        }
+        ${$sync->{nr}} = 0;
+}
+
 sub eidx_sync { # main entry point
         my ($self, $opt) = @_;
 
@@ -873,6 +965,10 @@ sub eidx_sync { # main entry point
         for my $ibx (@{$self->{ibx_list}}) {
                 $ibx->{-ibx_id} //= $self->{oidx}->ibx_id($ibx->eidx_key);
         }
+        if (delete($opt->{dedupe})) {
+                local $sync->{checkpoint_unlocks} = 1;
+                eidx_dedupe($self, $sync);
+        }
         if (delete($opt->{reindex})) {
                 local $sync->{checkpoint_unlocks} = 1;
                 eidx_reindex($self, $sync);