about summary refs log tree commit homepage
diff options
context:
space:
mode:
-rw-r--r--lib/PublicInbox/ExtSearchIdx.pm96
-rw-r--r--lib/PublicInbox/OverIdx.pm20
-rwxr-xr-xscript/public-inbox-extindex13
-rw-r--r--t/extsearch.t11
4 files changed, 137 insertions, 3 deletions
diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm
index 29414e4a..495579a2 100644
--- a/lib/PublicInbox/ExtSearchIdx.pm
+++ b/lib/PublicInbox/ExtSearchIdx.pm
@@ -844,6 +844,98 @@ sub sync_inbox {
         warn $err, "\n" if defined($err);
 }
 
+sub dd_smsg { # git->cat_async callback
+        my ($bref, $oid, $type, $size, $dd) = @_;
+        my $smsg = $dd->{smsg} // die 'BUG: dd->{smsg} missing';
+        my $self = $dd->{self} // die 'BUG: {self} missing';
+        my $per_mid = $dd->{per_mid} // die 'BUG: {per_mid} missing';
+        if ($type eq 'missing') {
+                _blob_missing($dd, $smsg);
+        } elsif (!is_bad_blob($oid, $type, $size, $smsg->{blob})) {
+                local $self->{current_info} = "$self->{current_info} $oid";
+                my $chash = content_hash(PublicInbox::Eml->new($bref));
+                push(@{$per_mid->{dd_chash}->{$chash}}, $smsg);
+        }
+        return if $per_mid->{last_smsg} != $smsg;
+        while (my ($chash, $ary) = each %{$per_mid->{dd_chash}}) {
+                my $keep = shift @$ary;
+                next if !scalar(@$ary);
+                $per_mid->{sync}->{dedupe_cull} += scalar(@$ary);
+                print STDERR
+                        "# <$keep->{mid}> keeping #$keep->{num}, dropping ",
+                        join(', ', map { "#$_->{num}" } @$ary),"\n";
+                next if $per_mid->{sync}->{-opt}->{'dry-run'};
+                my $oidx = $self->{oidx};
+                for my $smsg (@$ary) {
+                        my $gone = $smsg->{num};
+                        $oidx->merge_xref3($keep->{num}, $gone, $smsg->{blob});
+                        $self->idx_shard($gone)->ipc_do('xdb_remove', $gone);
+                        $oidx->delete_by_num($gone);
+                }
+        }
+}
+
+sub eidx_dedupe ($$) {
+        my ($self, $sync) = @_;
+        $sync->{dedupe_cull} = 0;
+        my $candidates = 0;
+        my $nr_mid = 0;
+        return unless eidxq_lock_acquire($self);
+        my $iter;
+        my $min_id = 0;
+        local $sync->{-regen_fmt} = "dedupe %u/".$self->{oidx}->max."\n";
+dedupe_restart:
+        $iter = $self->{oidx}->dbh->prepare(<<EOS);
+SELECT DISTINCT(mid),id FROM msgid WHERE id IN
+(SELECT id FROM id2num WHERE id > ? GROUP BY num HAVING COUNT(num) > 1)
+ORDER BY id
+EOS
+        $iter->execute($min_id);
+        local $SIG{__WARN__} = sub {
+                return if PublicInbox::Eml::warn_ignore(@_);
+                warn @_;
+        };
+        while (my ($mid, $id) = $iter->fetchrow_array) {
+                last if $sync->{quit};
+                $self->{current_info} = "dedupe $mid";
+                ${$sync->{nr}} = $min_id = $id;
+                my ($n, $prv, @smsg);
+                while (my $x = $self->{oidx}->next_by_mid($mid, \$n, \$prv)) {
+                        push @smsg, $x;
+                }
+                next if scalar(@smsg) < 2;
+                my $per_mid = {
+                        dd_chash => {}, # chash => [ary of smsgs]
+                        last_smsg => $smsg[-1],
+                        sync => $sync
+                };
+                $nr_mid++;
+                $candidates += scalar(@smsg) - 1;
+                for my $smsg (@smsg) {
+                        my $dd = {
+                                per_mid => $per_mid,
+                                smsg => $smsg,
+                                self => $self,
+                        };
+                        $self->git->cat_async($smsg->{blob}, \&dd_smsg, $dd);
+                }
+                # need to wait on every single one
+                $self->git->async_wait_all;
+
+                # is checkpoint needed? $iter is a very expensive query to restart
+                if (0 && checkpoint_due($sync)) {
+                        undef $iter;
+                        reindex_checkpoint($self, $sync);
+                        goto dedupe_restart;
+                }
+        }
+        my $n = delete $sync->{dedupe_cull};
+        if (my $pr = $sync->{-opt}->{-progress}) {
+                $pr->("culled $n/$candidates candidates ($nr_mid msgids)\n");
+        }
+        ${$sync->{nr}} = 0;
+}
+
 sub eidx_sync { # main entry point
         my ($self, $opt) = @_;
 
@@ -873,6 +965,10 @@ sub eidx_sync { # main entry point
         for my $ibx (@{$self->{ibx_list}}) {
                 $ibx->{-ibx_id} //= $self->{oidx}->ibx_id($ibx->eidx_key);
         }
+        if (delete($opt->{dedupe})) {
+                local $sync->{checkpoint_unlocks} = 1;
+                eidx_dedupe($self, $sync);
+        }
         if (delete($opt->{reindex})) {
                 local $sync->{checkpoint_unlocks} = 1;
                 eidx_reindex($self, $sync);
diff --git a/lib/PublicInbox/OverIdx.pm b/lib/PublicInbox/OverIdx.pm
index 5f96a5b0..8f7cf2bb 100644
--- a/lib/PublicInbox/OverIdx.pm
+++ b/lib/PublicInbox/OverIdx.pm
@@ -656,6 +656,26 @@ UPDATE over SET ddd = ? WHERE num = ?
         $sth->execute;
 }
 
+sub merge_xref3 { # used for "-extindex --dedupe"
+        my ($self, $keep_docid, $drop_docid, $oidhex) = @_;
+        my $oidbin = pack('H*', $oidhex);
+        my $sth = $self->{dbh}->prepare_cached(<<'');
+UPDATE OR IGNORE xref3 SET docid = ? WHERE docid = ? AND oidbin = ?
+
+        $sth->bind_param(1, $keep_docid);
+        $sth->bind_param(2, $drop_docid);
+        $sth->bind_param(3, $oidbin, SQL_BLOB);
+        $sth->execute;
+
+        # drop anything that conflicted
+        $sth = $self->{dbh}->prepare_cached(<<'');
+DELETE FROM xref3 WHERE docid = ? AND oidbin = ?
+
+        $sth->bind_param(1, $drop_docid);
+        $sth->bind_param(2, $oidbin, SQL_BLOB);
+        $sth->execute;
+}
+
 sub eidxq_add {
         my ($self, $docid) = @_;
         $self->dbh->prepare_cached(<<'')->execute($docid);
diff --git a/script/public-inbox-extindex b/script/public-inbox-extindex
index 771486c4..dcb12e5a 100755
--- a/script/public-inbox-extindex
+++ b/script/public-inbox-extindex
@@ -17,7 +17,9 @@ usage: public-inbox-extindex [options] [EXTINDEX_DIR] [INBOX_DIR...]
   --batch-size=BYTES  flush changes to OS after a given number of bytes
   --max-size=BYTES    do not index messages larger than the given size
   --gc                perform garbage collection instead of indexing
+  --dedupe            fix prior deduplication errors
   --verbose | -v      increase verbosity (may be repeated)
+  --dry-run | -n      dry-run on --dedupe
 
 BYTES may use `k', `m', and `g' suffixes (e.g. `10m' for 10 megabytes)
 See public-inbox-extindex(1) man page for full documentation.
@@ -27,7 +29,7 @@ GetOptions($opt, qw(verbose|v+ reindex rethread compact|c+ jobs|j=i
                 fsync|sync!
                 indexlevel|index-level|L=s max_size|max-size=s
                 batch_size|batch-size=s
-                gc commit-interval=i watch scan!
+                dedupe gc commit-interval=i watch scan! dry-run|n
                 all help|h))
         or die $help;
 if ($opt->{help}) { print $help; exit 0 };
@@ -50,11 +52,16 @@ unless (defined $eidx_dir) {
 my @ibxs;
 if ($opt->{gc}) {
         die "E: inbox paths must not be specified with --gc\n" if @ARGV;
-        die "E: --all not compatible with --gc\n" if $opt->{all};
-        die "E: --watch is not compatible with --gc\n" if $opt->{watch};
+        for my $sw (qw(all watch dry-run dedupe)) {
+                die "E: --$sw is not compatible with --gc\n" if $opt->{$sw};
+        }
 } else {
         @ibxs = PublicInbox::Admin::resolve_inboxes(\@ARGV, $opt, $cfg);
 }
+if ($opt->{'dry-run'} && !$opt->{dedupe}) {
+        die "E: --dry-run only affects --dedupe\n";
+}
+
 PublicInbox::Admin::require_or_die(qw(-search));
 PublicInbox::Config::json() or die "Cpanel::JSON::XS or similar missing\n";
 PublicInbox::Admin::progress_prepare($opt);
diff --git a/t/extsearch.t b/t/extsearch.t
index ae889ac6..5f0cd866 100644
--- a/t/extsearch.t
+++ b/t/extsearch.t
@@ -368,4 +368,15 @@ if ('remove v1test and test gc') {
         is(scalar(@it), 1, 'only one inbox left');
 }
 
+if ('dedupe + dry-run') {
+        my @cmd = ('-extindex', "$home/extindex");
+        my $opt = { 2 => \(my $err = '') };
+        ok(run_script([@cmd, '--dedupe'], undef, $opt), '--dedupe');
+        ok(run_script([@cmd, qw(--dedupe --dry-run)], undef, $opt),
+                '--dry-run --dedupe');
+        is $err, '', 'no errors';
+        ok(!run_script([@cmd, qw(--dry-run)], undef, $opt),
+                '--dry-run alone fails');
+}
+
 done_testing;