about summary refs log tree commit homepage
path: root/lib/PublicInbox/LeiToMail.pm
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2020-12-31 13:51:28 +0000
committerEric Wong <e@80x24.org>2021-01-01 05:00:39 +0000
commitd2a7dcb58ffb9604b2023159431fcdc4871f368f (patch)
tree1895ef3c9f091dc14ef5654b179d66fe24f65a7e /lib/PublicInbox/LeiToMail.pm
parent7f17df5c6f1892ef53f149a0ab24a5d917cce7d9 (diff)
downloadpublic-inbox-d2a7dcb58ffb9604b2023159431fcdc4871f368f.tar.gz
For writing mboxes and Maildirs, users may wish to use
stricter or looser deduplication strategies.  This
gives them more control.
Diffstat (limited to 'lib/PublicInbox/LeiToMail.pm')
-rw-r--r--lib/PublicInbox/LeiToMail.pm26
1 files changed, 15 insertions, 11 deletions
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index 294291b2..ead00d1a 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -8,9 +8,8 @@ use v5.10.1;
 use PublicInbox::Eml;
 use PublicInbox::Lock;
 use PublicInbox::ProcessPipe;
-use PublicInbox::SharedKV;
 use PublicInbox::Spawn qw(which spawn popen_rd);
-use PublicInbox::ContentHash qw(content_hash);
+use PublicInbox::LeiDedupe;
 use Symbol qw(gensym);
 use IO::Handle; # ->autoflush
 use Fcntl qw(SEEK_SET);
@@ -226,10 +225,11 @@ sub dup_src ($) {
         $dup;
 }
 
-# --augment existing output destination, without duplicating anything
+# --augment existing output destination, with deduplication
 sub _augment { # MboxReader eml_cb
         my ($eml, $lei) = @_;
-        $lei->{skv}->set_maybe(content_hash($eml), '');
+        # ignore return value, just populate the skv
+        $lei->{dedupe_cb}->is_dup($eml);
 }
 
 sub _mbox_write_cb ($$$$) {
@@ -240,23 +240,27 @@ sub _mbox_write_cb ($$$$) {
         open $out, '+>>', $dst or die "open $dst: $!";
         # Perl does SEEK_END even with O_APPEND :<
         seek($out, 0, SEEK_SET) or die "seek $dst: $!";
-        my $atomic = !!(($lei->{opt}->{jobs} // 0) > 1);
-        $lei->{skv} = PublicInbox::SharedKV->new;
-        $lei->{skv}->dbh;
+        my $jobs = $lei->{opt}->{jobs} // 0;
+        my $atomic = $jobs > 1;
+        my $dedupe = $lei->{dedupe} = PublicInbox::LeiDedupe->new($lei);
         state $zsfx_allow = join('|', keys %zsfx2cmd);
         my ($zsfx) = ($dst =~ /\.($zsfx_allow)\z/);
         if ($lei->{opt}->{augment}) {
-                my $rd = $zsfx ? decompress_src($out, $zsfx, $lei) :
-                                dup_src($out);
-                PublicInbox::MboxReader->$mbox($rd, \&_augment, $lei);
+                if (-s $out && $dedupe->prepare_dedupe) {
+                        my $rd = $zsfx ? decompress_src($out, $zsfx, $lei) :
+                                        dup_src($out);
+                        PublicInbox::MboxReader->$mbox($rd, \&_augment, $lei);
+                }
+                $dedupe->pause_dedupe if $jobs; # are we forking?
         } else {
                 truncate($out, 0) or die "truncate $dst: $!";
+                $dedupe->prepare_dedupe if !$jobs;
         }
         ($out, $pipe_lk) = compress_dst($out, $zsfx, $lei) if $zsfx;
         sub {
                 my ($buf, $oid, $kw) = @_;
                 my $eml = PublicInbox::Eml->new($buf);
-                if ($lei->{skv}->set_maybe(content_hash($eml), '')) {
+                if (!$lei->{dedupe}->is_dup($eml, $oid)) {
                         $buf = $eml2mbox->($eml, $kw);
                         my $lock = $pipe_lk->lock_for_scope if $pipe_lk;
                         write_in_full($out, $buf, $atomic);