From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 98C111FB07 for ; Thu, 31 Dec 2020 13:51:55 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 10/36] lei: implement various deduplication strategies Date: Thu, 31 Dec 2020 13:51:28 +0000 Message-Id: <20201231135154.6070-11-e@80x24.org> In-Reply-To: <20201231135154.6070-1-e@80x24.org> References: <20201231135154.6070-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: For writing mboxes and Maildirs, users may wish to use stricter or looser deduplication strategies. This gives them more control. --- MANIFEST | 2 + lib/PublicInbox/LEI.pm | 2 +- lib/PublicInbox/LeiDedupe.pm | 96 ++++++++++++++++++++++++++++++++++++ lib/PublicInbox/LeiToMail.pm | 26 +++++----- t/lei_dedupe.t | 59 ++++++++++++++++++++++ t/lei_to_mail.t | 3 ++ 6 files changed, 176 insertions(+), 12 deletions(-) create mode 100644 lib/PublicInbox/LeiDedupe.pm create mode 100644 t/lei_dedupe.t diff --git a/MANIFEST b/MANIFEST index 1fb1e181..7ce2075e 100644 --- a/MANIFEST +++ b/MANIFEST @@ -162,6 +162,7 @@ lib/PublicInbox/InboxWritable.pm lib/PublicInbox/Isearch.pm lib/PublicInbox/KQNotify.pm lib/PublicInbox/LEI.pm +lib/PublicInbox/LeiDedupe.pm lib/PublicInbox/LeiExtinbox.pm lib/PublicInbox/LeiSearch.pm lib/PublicInbox/LeiStore.pm @@ -330,6 +331,7 @@ t/iso-2202-jp.eml t/kqnotify.t t/lei-oneshot.t t/lei.t +t/lei_dedupe.t t/lei_store.t t/lei_to_mail.t t/lei_xsearch.t diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index 7002a1f7..9aa4d95a 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -172,7 +172,7 @@ my %OPTDESC = ( 'type=s' => [ 'any|mid|git', 'disambiguate type' ], 
-'dedupe|d=s' => ['STRAT|content|oid|mid', +'dedupe|d=s' => ['STRAT|content|oid|mid|none', 'deduplication strategy'], 'show thread|t' => 'display entire thread a message belongs to', 'q thread|t' => diff --git a/lib/PublicInbox/LeiDedupe.pm b/lib/PublicInbox/LeiDedupe.pm new file mode 100644 index 00000000..c6eb7196 --- /dev/null +++ b/lib/PublicInbox/LeiDedupe.pm @@ -0,0 +1,96 @@ +# Copyright (C) 2020 all contributors +# License: AGPL-3.0+ +package PublicInbox::LeiDedupe; +use strict; +use v5.10.1; +use PublicInbox::SharedKV; +use PublicInbox::ContentHash qw(content_hash); + +# n.b. mutt sets most of these headers not sure about Bytes +our @OID_IGNORE = qw(Status X-Status Content-Length Lines Bytes); + +# best-effort regeneration of OID when augmenting existing results +sub _regen_oid ($) { + my ($eml) = @_; + my @stash; # stash away headers we shouldn't have in git + for my $k (@OID_IGNORE) { + my @v = $eml->header_raw($k) or next; + push @stash, [ $k, \@v ]; + $eml->header_set($k); # restore below + } + my $dig = Digest::SHA->new(1); # XXX SHA256 later + my $buf = $eml->as_string; + $dig->add('blob '.length($buf)."\0"); + $dig->add($buf); + undef $buf; + + for my $kv (@stash) { # restore stashed headers + my ($k, @v) = @$kv; + $eml->header_set($k, @v); + } + $dig->digest; +} + +sub _oidbin ($) { defined($_[0]) ? 
pack('H*', $_[0]) : undef } + +# the paranoid option +sub dedupe_oid () { + my $skv = PublicInbox::SharedKV->new; + ($skv, sub { # may be called in a child process + my ($eml, $oid) = @_; + $skv->set_maybe(_oidbin($oid) // _regen_oid($eml), ''); + }); +} + +# dangerous if there's duplicate messages with different Message-IDs +sub dedupe_mid () { + my $skv = PublicInbox::SharedKV->new; + ($skv, sub { # may be called in a child process + my ($eml, $oid) = @_; + # TODO: lei will support non-public messages w/o Message-ID + my $mid = $eml->header_raw('Message-ID') // _oidbin($oid) // + content_hash($eml); + $skv->set_maybe($mid, ''); + }); +} + +# our default deduplication strategy (used by v2, also) +sub dedupe_content () { + my $skv = PublicInbox::SharedKV->new; + ($skv, sub { # may be called in a child process + my ($eml) = @_; # oid = $_[1], ignored + $skv->set_maybe(content_hash($eml), ''); + }); +} + +# no deduplication at all +sub dedupe_none () { (undef, sub { 1 }) } + +sub new { + my ($cls, $lei) = @_; + my $dd = $lei->{opt}->{dedupe} // 'content'; + my $dd_new = $cls->can("dedupe_$dd") // + die "unsupported dedupe strategy: $dd\n"; + bless [ $dd_new->() ], $cls; # [ $skv, $cb ] +} + +# returns true if the message was already seen according to the deduplication +# strategy, returns false on unseen messages +sub is_dup { + my ($self, $eml, $oid) = @_; + !$self->[1]->($eml, $oid); +} + +sub prepare_dedupe { + my ($self) = @_; + my $skv = $self->[0]; + $skv ? 
$skv->dbh : undef; +} + +sub pause_dedupe { + my ($self) = @_; + my $skv = $self->[0]; + delete($skv->{dbh}) if $skv; +} + +1; diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm index 294291b2..ead00d1a 100644 --- a/lib/PublicInbox/LeiToMail.pm +++ b/lib/PublicInbox/LeiToMail.pm @@ -8,9 +8,8 @@ use v5.10.1; use PublicInbox::Eml; use PublicInbox::Lock; use PublicInbox::ProcessPipe; -use PublicInbox::SharedKV; use PublicInbox::Spawn qw(which spawn popen_rd); -use PublicInbox::ContentHash qw(content_hash); +use PublicInbox::LeiDedupe; use Symbol qw(gensym); use IO::Handle; # ->autoflush use Fcntl qw(SEEK_SET); @@ -226,10 +225,11 @@ sub dup_src ($) { $dup; } -# --augment existing output destination, without duplicating anything +# --augment existing output destination, with deduplication sub _augment { # MboxReader eml_cb my ($eml, $lei) = @_; - $lei->{skv}->set_maybe(content_hash($eml), ''); + # ignore return value, just populate the skv + $lei->{dedupe_cb}->is_dup($eml); } sub _mbox_write_cb ($$$$) { @@ -240,23 +240,27 @@ sub _mbox_write_cb ($$$$) { open $out, '+>>', $dst or die "open $dst: $!"; # Perl does SEEK_END even with O_APPEND :< seek($out, 0, SEEK_SET) or die "seek $dst: $!"; - my $atomic = !!(($lei->{opt}->{jobs} // 0) > 1); - $lei->{skv} = PublicInbox::SharedKV->new; - $lei->{skv}->dbh; + my $jobs = $lei->{opt}->{jobs} // 0; + my $atomic = $jobs > 1; + my $dedupe = $lei->{dedupe} = PublicInbox::LeiDedupe->new($lei); state $zsfx_allow = join('|', keys %zsfx2cmd); my ($zsfx) = ($dst =~ /\.($zsfx_allow)\z/); if ($lei->{opt}->{augment}) { - my $rd = $zsfx ? decompress_src($out, $zsfx, $lei) : - dup_src($out); - PublicInbox::MboxReader->$mbox($rd, \&_augment, $lei); + if (-s $out && $dedupe->prepare_dedupe) { + my $rd = $zsfx ? decompress_src($out, $zsfx, $lei) : + dup_src($out); + PublicInbox::MboxReader->$mbox($rd, \&_augment, $lei); + } + $dedupe->pause_dedupe if $jobs; # are we forking? 
} else { truncate($out, 0) or die "truncate $dst: $!"; + $dedupe->prepare_dedupe if !$jobs; } ($out, $pipe_lk) = compress_dst($out, $zsfx, $lei) if $zsfx; sub { my ($buf, $oid, $kw) = @_; my $eml = PublicInbox::Eml->new($buf); - if ($lei->{skv}->set_maybe(content_hash($eml), '')) { + if (!$lei->{dedupe}->is_dup($eml, $oid)) { $buf = $eml2mbox->($eml, $kw); my $lock = $pipe_lk->lock_for_scope if $pipe_lk; write_in_full($out, $buf, $atomic); diff --git a/t/lei_dedupe.t b/t/lei_dedupe.t new file mode 100644 index 00000000..08f38aa0 --- /dev/null +++ b/t/lei_dedupe.t @@ -0,0 +1,59 @@ +#!perl -w +# Copyright (C) 2020 all contributors +# License: AGPL-3.0+ +use strict; +use v5.10.1; +use Test::More; +use PublicInbox::TestCommon; +use PublicInbox::Eml; +require_mods(qw(DBD::SQLite)); +use_ok 'PublicInbox::LeiDedupe'; +my $eml = eml_load('t/plack-qp.eml'); +my $mid = $eml->header_raw('Message-ID'); +my $different = eml_load('t/msg_iter-order.eml'); +$different->header_set('Message-ID', $mid); + +my $lei = { opt => { dedupe => 'none' } }; +my $dd = PublicInbox::LeiDedupe->new($lei); +$dd->prepare_dedupe; +ok(!$dd->is_dup($eml), '1st is_dup w/o dedupe'); +ok(!$dd->is_dup($eml), '2nd is_dup w/o dedupe'); +ok(!$dd->is_dup($different), 'different is_dup w/o dedupe'); + +for my $strat (undef, 'content') { + $lei->{opt}->{dedupe} = $strat; + $dd = PublicInbox::LeiDedupe->new($lei); + $dd->prepare_dedupe; + my $desc = $strat // 'default'; + ok(!$dd->is_dup($eml), "1st is_dup with $desc dedupe"); + ok($dd->is_dup($eml), "2nd seen with $desc dedupe"); + ok(!$dd->is_dup($different), "different is_dup with $desc dedupe"); +} +$lei->{opt}->{dedupe} = 'bogus'; +eval { PublicInbox::LeiDedupe->new($lei) }; +like($@, qr/unsupported.*bogus/, 'died on bogus strategy'); + +$lei->{opt}->{dedupe} = 'mid'; +$dd = PublicInbox::LeiDedupe->new($lei); +$dd->prepare_dedupe; +ok(!$dd->is_dup($eml), '1st is_dup with mid dedupe'); +ok($dd->is_dup($eml), '2nd seen with mid dedupe'); 
+ok($dd->is_dup($different), 'different seen with mid dedupe'); + +$lei->{opt}->{dedupe} = 'oid'; +$dd = PublicInbox::LeiDedupe->new($lei); +$dd->prepare_dedupe; + +# --augment won't have OIDs: +ok(!$dd->is_dup($eml), '1st is_dup with oid dedupe (augment)'); +ok($dd->is_dup($eml), '2nd seen with oid dedupe (augment)'); +ok(!$dd->is_dup($different), 'different is_dup with mid dedupe (augment)'); +$different->header_set('Status', 'RO'); +ok($dd->is_dup($different), 'different seen with oid dedupe Status removed'); + +ok(!$dd->is_dup($eml, '01d'), '1st is_dup with oid dedupe'); +ok($dd->is_dup($different, '01d'), 'different content ignored if oid matches'); +ok($dd->is_dup($eml, '01D'), 'case insensitive oid comparison :P'); +ok(!$dd->is_dup($eml, '01dbad'), 'case insensitive oid comparison :P'); + +done_testing; diff --git a/t/lei_to_mail.t b/t/lei_to_mail.t index e4551e69..5be4e285 100644 --- a/t/lei_to_mail.t +++ b/t/lei_to_mail.t @@ -6,6 +6,7 @@ use v5.10.1; use Test::More; use PublicInbox::TestCommon; use PublicInbox::Eml; +require_mods(qw(DBD::SQLite)); use_ok 'PublicInbox::LeiToMail'; my $from = "Content-Length: 10\nSubject: x\n\nFrom hell\n"; my $noeol = "Subject: x\n\nFrom hell"; @@ -86,6 +87,7 @@ my $orig = do { local $lei->{opt} = { jobs => 2 }; $wcb = PublicInbox::LeiToMail->write_cb("mboxcl2:$fn", $lei); + $lei->{dedupe}->prepare_dedupe; $wcb->(\($dup = $buf), 'deadbeef', [ qw(seen) ]); undef $wcb; open $fh, '<', $fn or BAIL_OUT $!; @@ -110,6 +112,7 @@ for my $zsfx (qw(gz bz2 xz)) { # XXX should we support zst, zz, lzo, lzma? local $lei->{opt} = { jobs => 2 }; # for atomic writes unlink $f or BAIL_OUT "unlink $!"; $wcb = PublicInbox::LeiToMail->write_cb($dst, $lei); + $lei->{dedupe}->prepare_dedupe; $wcb->(\($dup = $buf), 'deadbeef', [ qw(seen) ]); undef $wcb; is(xqx([@$dc_cmd, $f]), $orig, "$zsfx matches with lock");