From d2a7dcb58ffb9604b2023159431fcdc4871f368f Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Thu, 31 Dec 2020 13:51:28 +0000 Subject: lei: implement various deduplication strategies For writing mboxes and Maildirs, users may wish to use stricter or looser deduplication strategies. This gives them more control. --- lib/PublicInbox/LeiToMail.pm | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) (limited to 'lib/PublicInbox/LeiToMail.pm') diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm index 294291b2..ead00d1a 100644 --- a/lib/PublicInbox/LeiToMail.pm +++ b/lib/PublicInbox/LeiToMail.pm @@ -8,9 +8,8 @@ use v5.10.1; use PublicInbox::Eml; use PublicInbox::Lock; use PublicInbox::ProcessPipe; -use PublicInbox::SharedKV; use PublicInbox::Spawn qw(which spawn popen_rd); -use PublicInbox::ContentHash qw(content_hash); +use PublicInbox::LeiDedupe; use Symbol qw(gensym); use IO::Handle; # ->autoflush use Fcntl qw(SEEK_SET); @@ -226,10 +225,11 @@ sub dup_src ($) { $dup; } -# --augment existing output destination, without duplicating anything +# --augment existing output destination, with deduplication sub _augment { # MboxReader eml_cb my ($eml, $lei) = @_; - $lei->{skv}->set_maybe(content_hash($eml), ''); + # ignore return value, just populate the skv + $lei->{dedupe_cb}->is_dup($eml); } sub _mbox_write_cb ($$$$) { @@ -240,23 +240,27 @@ sub _mbox_write_cb ($$$$) { open $out, '+>>', $dst or die "open $dst: $!"; # Perl does SEEK_END even with O_APPEND :< seek($out, 0, SEEK_SET) or die "seek $dst: $!"; - my $atomic = !!(($lei->{opt}->{jobs} // 0) > 1); - $lei->{skv} = PublicInbox::SharedKV->new; - $lei->{skv}->dbh; + my $jobs = $lei->{opt}->{jobs} // 0; + my $atomic = $jobs > 1; + my $dedupe = $lei->{dedupe} = PublicInbox::LeiDedupe->new($lei); state $zsfx_allow = join('|', keys %zsfx2cmd); my ($zsfx) = ($dst =~ /\.($zsfx_allow)\z/); if ($lei->{opt}->{augment}) { - my $rd = $zsfx ? decompress_src($out, $zsfx, $lei) : - dup_src($out); - PublicInbox::MboxReader->$mbox($rd, \&_augment, $lei); + if (-s $out && $dedupe->prepare_dedupe) { + my $rd = $zsfx ? decompress_src($out, $zsfx, $lei) : + dup_src($out); + PublicInbox::MboxReader->$mbox($rd, \&_augment, $lei); + } + $dedupe->pause_dedupe if $jobs; # are we forking? } else { truncate($out, 0) or die "truncate $dst: $!"; + $dedupe->prepare_dedupe if !$jobs; } ($out, $pipe_lk) = compress_dst($out, $zsfx, $lei) if $zsfx; sub { my ($buf, $oid, $kw) = @_; my $eml = PublicInbox::Eml->new($buf); - if ($lei->{skv}->set_maybe(content_hash($eml), '')) { + if (!$lei->{dedupe}->is_dup($eml, $oid)) { $buf = $eml2mbox->($eml, $kw); my $lock = $pipe_lk->lock_for_scope if $pipe_lk; write_in_full($out, $buf, $atomic); -- cgit v1.2.3-24-ge0c7