about summary refs log tree commit homepage
path: root/lib/PublicInbox/LeiToMail.pm
diff options
context:
space:
mode:
Diffstat (limited to 'lib/PublicInbox/LeiToMail.pm')
-rw-r--r--lib/PublicInbox/LeiToMail.pm89
1 files changed, 59 insertions, 30 deletions
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index 5d4b7978..744f331d 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -187,8 +187,9 @@ sub zsfx2cmd ($$$) {
         \@cmd;
 }
 
-sub compress_dst {
-        my ($self, $zsfx, $lei) = @_;
+sub _post_augment_mbox { # open a compressor process
+        my ($self, $lei) = @_;
+        my $zsfx = $self->{zsfx} or return;
         my $cmd = zsfx2cmd($zsfx, undef, $lei);
         pipe(my ($r, $w)) or die "pipe: $!";
         my $rdr = { 0 => $r, 1 => $lei->{1}, 2 => $lei->{2} };
@@ -209,7 +210,9 @@ sub decompress_src ($$$) {
 
 sub dup_src ($) {
         my ($in) = @_;
-        open my $dup, '+>>&', $in or die "dup: $!";
+        # fileno needed because wq_set_recv_modes only used ">&=" for {1}
+        # and Perl blindly trusts that to reject the '+' (readability flag)
+        open my $dup, '+>>&=', fileno($in) or die "dup: $!";
         $dup;
 }
 
@@ -321,11 +324,13 @@ sub new {
         } else {
                 die "bad mail --format=$fmt\n";
         }
-        my $dedupe = $lei->{dedupe} //= PublicInbox::LeiDedupe->new($lei, $dst);
+        $lei->{dedupe} = PublicInbox::LeiDedupe->new($lei);
         $self;
 }
 
-sub _prepare_maildir {
+sub _pre_augment_maildir {} # noop
+
+sub _do_augment_maildir {
         my ($self, $lei) = @_;
         my $dst = $lei->{ovv}->{dst};
         if ($lei->{opt}->{augment}) {
@@ -338,6 +343,11 @@ sub _prepare_maildir {
         } else { # clobber existing Maildir
                 _maildir_each_file($dst, \&_unlink);
         }
+}
+
+sub _post_augment_maildir {
+        my ($self, $lei) = @_;
+        my $dst = $lei->{ovv}->{dst};
         for my $x (qw(tmp new cur)) {
                 my $d = $dst.$x;
                 next if -d $d;
@@ -347,45 +357,64 @@ sub _prepare_maildir {
         }
 }
 
-sub _prepare_mbox {
+sub _pre_augment_mbox {
         my ($self, $lei) = @_;
         my $dst = $lei->{ovv}->{dst};
-        my ($out, $seekable);
-        if ($dst eq '/dev/stdout') {
-                $out = $lei->{1};
-        } else {
+        if ($dst ne '/dev/stdout') {
                 my $mode = -p $dst ? '>' : '+>>';
                 if (-f _ && !$lei->{opt}->{augment} and !unlink($dst)) {
                         $! == ENOENT or die "unlink($dst): $!";
                 }
-                open $out, $mode, $dst or die "open($dst): $!";
-                # Perl does SEEK_END even with O_APPEND :<
-                $seekable = seek($out, 0, SEEK_SET);
-                die "seek($dst): $!\n" if !$seekable && $! != ESPIPE;
+                open my $out, $mode, $dst or die "open($dst): $!";
                 $lei->{1} = $out;
         }
+        # Perl does SEEK_END even with O_APPEND :<
+        $self->{seekable} = seek($lei->{1}, 0, SEEK_SET);
+        if (!$self->{seekable} && $! != ESPIPE && $dst ne '/dev/stdout') {
+                die "seek($dst): $!\n";
+        }
         state $zsfx_allow = join('|', keys %zsfx2cmd);
-        my ($zsfx) = ($dst =~ /\.($zsfx_allow)\z/);
+        ($self->{zsfx}) = ($dst =~ /\.($zsfx_allow)\z/);
+}
+
+sub _do_augment_mbox {
+        my ($self, $lei) = @_;
+        return if !$lei->{opt}->{augment};
         my $dedupe = $lei->{dedupe};
-        if ($lei->{opt}->{augment}) {
-                die "cannot augment $dst, not seekable\n" if !$seekable;
-                if (-s $out && $dedupe && $dedupe->prepare_dedupe) {
-                        my $rd = $zsfx ? decompress_src($out, $zsfx, $lei) :
-                                        dup_src($out);
-                        my $fmt = $lei->{ovv}->{fmt};
-                        require PublicInbox::MboxReader;
-                        PublicInbox::MboxReader->$fmt($rd, \&_augment, $lei);
-                }
-                # maybe some systems don't honor O_APPEND, Perl does this:
-                seek($out, 0, SEEK_END) or die "seek $dst: $!";
-                $dedupe->pause_dedupe if $dedupe;
+        my $dst = $lei->{ovv}->{dst};
+        die "cannot augment $dst, not seekable\n" if !$self->{seekable};
+        my $out = $lei->{1};
+        if (-s $out && $dedupe && $dedupe->prepare_dedupe) {
+                my $zsfx = $self->{zsfx};
+                my $rd = $zsfx ? decompress_src($out, $zsfx, $lei) :
+                                dup_src($out);
+                my $fmt = $lei->{ovv}->{fmt};
+                require PublicInbox::MboxReader;
+                PublicInbox::MboxReader->$fmt($rd, \&_augment, $lei);
         }
-        compress_dst($self, $zsfx, $lei) if $zsfx;
+        # maybe some systems don't honor O_APPEND, Perl does this:
+        seek($out, 0, SEEK_END) or die "seek $dst: $!";
+        $dedupe->pause_dedupe if $dedupe;
+}
+
+sub pre_augment { # fast (1 disk seek), runs in main daemon
+        my ($self, $lei) = @_;
+        # _pre_augment_maildir, _pre_augment_mbox
+        my $m = "_pre_augment_$self->{base_type}";
+        $self->$m($lei);
+}
+
+sub do_augment { # slow, runs in wq worker
+        my ($self, $lei) = @_;
+        # _do_augment_maildir, _do_augment_mbox
+        my $m = "_do_augment_$self->{base_type}";
+        $self->$m($lei);
 }
 
-sub do_prepare {
+sub post_augment { # fast (spawn compressor or mkdir), runs in main daemon
         my ($self, $lei) = @_;
-        my $m = "_prepare_$self->{base_type}";
+        # _post_augment_maildir, _post_augment_mbox
+        my $m = "_post_augment_$self->{base_type}";
         $self->$m($lei);
 }