From 8ef622d408d2e4d98ad3aada8466f539c9ac61ba Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Tue, 6 Jul 2021 12:42:02 +0000 Subject: extindex: implement --dedupe to fix old extindices This is intended to fix older indices that had deduplication bugs for matching content. It'll also make dealing with future changes to ContentHash easier since that's never guaranteed stable. It also supports --dry-run to print changes only without making them. --- lib/PublicInbox/ExtSearchIdx.pm | 96 +++++++++++++++++++++++++++++++++++++++++ lib/PublicInbox/OverIdx.pm | 20 +++++++++ 2 files changed, 116 insertions(+) (limited to 'lib/PublicInbox') diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm index 29414e4a..495579a2 100644 --- a/lib/PublicInbox/ExtSearchIdx.pm +++ b/lib/PublicInbox/ExtSearchIdx.pm @@ -844,6 +844,98 @@ sub sync_inbox { warn $err, "\n" if defined($err); } +sub dd_smsg { # git->cat_async callback + my ($bref, $oid, $type, $size, $dd) = @_; + my $smsg = $dd->{smsg} // die 'BUG: dd->{smsg} missing'; + my $self = $dd->{self} // die 'BUG: {self} missing'; + my $per_mid = $dd->{per_mid} // die 'BUG: {per_mid} missing'; + if ($type eq 'missing') { + _blob_missing($dd, $smsg); + } elsif (!is_bad_blob($oid, $type, $size, $smsg->{blob})) { + local $self->{current_info} = "$self->{current_info} $oid"; + my $chash = content_hash(PublicInbox::Eml->new($bref)); + push(@{$per_mid->{dd_chash}->{$chash}}, $smsg); + } + return if $per_mid->{last_smsg} != $smsg; + while (my ($chash, $ary) = each %{$per_mid->{dd_chash}}) { + my $keep = shift @$ary; + next if !scalar(@$ary); + $per_mid->{sync}->{dedupe_cull} += scalar(@$ary); + print STDERR + "# <$keep->{mid}> keeping #$keep->{num}, dropping ", + join(', ', map { "#$_->{num}" } @$ary),"\n"; + next if $per_mid->{sync}->{-opt}->{'dry-run'}; + my $oidx = $self->{oidx}; + for my $smsg (@$ary) { + my $gone = $smsg->{num}; + $oidx->merge_xref3($keep->{num}, $gone, $smsg->{blob}); + 
$self->idx_shard($gone)->ipc_do('xdb_remove', $gone); + $oidx->delete_by_num($gone); + } + } +} + +sub eidx_dedupe ($$) { + my ($self, $sync) = @_; + $sync->{dedupe_cull} = 0; + my $candidates = 0; + my $nr_mid = 0; + return unless eidxq_lock_acquire($self); + my $iter; + my $min_id = 0; + local $sync->{-regen_fmt} = "dedupe %u/".$self->{oidx}->max."\n"; +dedupe_restart: + $iter = $self->{oidx}->dbh->prepare(<<EOS); +SELECT DISTINCT(mid),id FROM msgid WHERE id IN +(SELECT id FROM id2num WHERE id > ? GROUP BY num HAVING COUNT(num) > 1) +ORDER BY id +EOS + $iter->execute($min_id); + local $SIG{__WARN__} = sub { + return if PublicInbox::Eml::warn_ignore(@_); + warn @_; + }; + while (my ($mid, $id) = $iter->fetchrow_array) { + last if $sync->{quit}; + $self->{current_info} = "dedupe $mid"; + ${$sync->{nr}} = $min_id = $id; + my ($n, $prv, @smsg); + while (my $x = $self->{oidx}->next_by_mid($mid, \$n, \$prv)) { + push @smsg, $x; + } + next if scalar(@smsg) < 2; + my $per_mid = { + dd_chash => {}, # chash => [ary of smsgs] + last_smsg => $smsg[-1], + sync => $sync + }; + $nr_mid++; + $candidates += scalar(@smsg) - 1; + for my $smsg (@smsg) { + my $dd = { + per_mid => $per_mid, + smsg => $smsg, + self => $self, + }; + $self->git->cat_async($smsg->{blob}, \&dd_smsg, $dd); + } + # need to wait on every single one + $self->git->async_wait_all; + + # is checkpoint needed? 
$iter is a very expensive query to restart + if (0 && checkpoint_due($sync)) { + undef $iter; + reindex_checkpoint($self, $sync); + goto dedupe_restart; + } + } + my $n = delete $sync->{dedupe_cull}; + if (my $pr = $sync->{-opt}->{-progress}) { + $pr->("culled $n/$candidates candidates ($nr_mid msgids)\n"); + } + ${$sync->{nr}} = 0; +} + sub eidx_sync { # main entry point my ($self, $opt) = @_; @@ -873,6 +965,10 @@ sub eidx_sync { # main entry point for my $ibx (@{$self->{ibx_list}}) { $ibx->{-ibx_id} //= $self->{oidx}->ibx_id($ibx->eidx_key); } + if (delete($opt->{dedupe})) { + local $sync->{checkpoint_unlocks} = 1; + eidx_dedupe($self, $sync); + } if (delete($opt->{reindex})) { local $sync->{checkpoint_unlocks} = 1; eidx_reindex($self, $sync); diff --git a/lib/PublicInbox/OverIdx.pm b/lib/PublicInbox/OverIdx.pm index 5f96a5b0..8f7cf2bb 100644 --- a/lib/PublicInbox/OverIdx.pm +++ b/lib/PublicInbox/OverIdx.pm @@ -656,6 +656,26 @@ UPDATE over SET ddd = ? WHERE num = ? $sth->execute; } +sub merge_xref3 { # used for "-extindex --dedupe" + my ($self, $keep_docid, $drop_docid, $oidhex) = @_; + my $oidbin = pack('H*', $oidhex); + my $sth = $self->{dbh}->prepare_cached(<<''); +UPDATE OR IGNORE xref3 SET docid = ? WHERE docid = ? AND oidbin = ? + + $sth->bind_param(1, $keep_docid); + $sth->bind_param(2, $drop_docid); + $sth->bind_param(3, $oidbin, SQL_BLOB); + $sth->execute; + + # drop anything that conflicted + $sth = $self->{dbh}->prepare_cached(<<''); +DELETE FROM xref3 WHERE docid = ? AND oidbin = ? + + $sth->bind_param(1, $drop_docid); + $sth->bind_param(2, $oidbin, SQL_BLOB); + $sth->execute; +} + sub eidxq_add { my ($self, $docid) = @_; $self->dbh->prepare_cached(<<'')->execute($docid); -- cgit v1.2.3-24-ge0c7