diff options
Diffstat (limited to 'lib/PublicInbox/LeiToMail.pm')
-rw-r--r-- | lib/PublicInbox/LeiToMail.pm | 175 |
1 files changed, 102 insertions, 73 deletions
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm index 4c65dce2..5d4b7978 100644 --- a/lib/PublicInbox/LeiToMail.pm +++ b/lib/PublicInbox/LeiToMail.pm @@ -146,8 +146,7 @@ sub reap_compress { # dwaitpid callback # { foo => '' } means "--foo" is passed to the command-line, # otherwise { foo => '--bar' } passes "--bar" our %zsfx2cmd = ( - gz => [ qw(GZIP pigz gzip), { - rsyncable => '', threads => '-p' } ], + gz => [ qw(GZIP pigz gzip), { rsyncable => '', threads => '-p' } ], bz2 => [ 'bzip2', {} ], xz => [ 'xz', { threads => '-T' } ], # XXX does anybody care for these? I prefer zstd on entire FSes, @@ -189,24 +188,23 @@ sub zsfx2cmd ($$$) { } sub compress_dst { - my ($out, $zsfx, $lei) = @_; + my ($self, $zsfx, $lei) = @_; my $cmd = zsfx2cmd($zsfx, undef, $lei); pipe(my ($r, $w)) or die "pipe: $!"; - my $rdr = { 0 => $r, 1 => $out, 2 => $lei->{2} }; + my $rdr = { 0 => $r, 1 => $lei->{1}, 2 => $lei->{2} }; my $pid = spawn($cmd, $lei->{env}, $rdr); $lei->{"pid.$pid"} = $cmd; my $pp = gensym; tie *$pp, 'PublicInbox::ProcessPipe', $pid, $w, \&reap_compress, $lei; - my $pipe_lk = ($lei->{opt}->{jobs} // 0) > 1 ? - PublicInbox::Lock->new_tmp($zsfx) : undef; - ($pp, $pipe_lk); + $lei->{1} = $pp; + die 'BUG: unexpected {ovv}->{lock_path}' if $lei->{ovv}->{lock_path}; + $lei->{ovv}->ovv_out_lk_init if ($lei->{opt}->{jobs} // 2) > 1; } sub decompress_src ($$$) { my ($in, $zsfx, $lei) = @_; my $cmd = zsfx2cmd($zsfx, 1, $lei); - my $rdr = { 0 => $in, 2 => $lei->{2} }; - popen_rd($cmd, $lei->{env}, $rdr); + popen_rd($cmd, $lei->{env}, { 0 => $in, 2 => $lei->{2} }); } sub dup_src ($) { @@ -222,48 +220,22 @@ sub _augment { # MboxReader eml_cb $lei->{dedupe}->is_dup($eml); } -sub _mbox_write_cb ($$$$) { - my ($cls, $mbox, $dst, $lei) = @_; - my $m = "eml2$mbox"; - my $eml2mbox = $cls->can($m) or die "$cls->$m missing"; - my ($out, $pipe_lk, $seekable); - # XXX should we support /dev/stdout.gz ? - if ($dst eq '/dev/stdout') { - $out = $lei->{1}; - } else { # TODO: mbox locking (but mairix doesn't...) - my $mode = -p $dst ? '>' : '+>>'; - if (-f _ && !$lei->{opt}->{augment} and !unlink($dst)) { - die "unlink $dst: $!" if $! != ENOENT; - } - open $out, $mode, $dst or die "open $dst: $!"; - # Perl does SEEK_END even with O_APPEND :< - $seekable = seek($out, 0, SEEK_SET); - die "seek $dst: $!\n" if !$seekable && $! != ESPIPE; - } - my $jobs = $lei->{opt}->{jobs} // 0; - state $zsfx_allow = join('|', keys %zsfx2cmd); - my ($zsfx) = ($dst =~ /\.($zsfx_allow)\z/); - my $write = $jobs > 1 && !$zsfx ? \&atomic_append : \&_print_full; - my $dedupe = $lei->{dedupe} = PublicInbox::LeiDedupe->new($lei); - if ($lei->{opt}->{augment}) { - die "cannot augment $dst, not seekable\n" if !$seekable; - if (-s $out && $dedupe->prepare_dedupe) { - my $rd = $zsfx ? decompress_src($out, $zsfx, $lei) : - dup_src($out); - PublicInbox::MboxReader->$mbox($rd, \&_augment, $lei); - } - # maybe some systems don't honor O_APPEND, Perl does this: - seek($out, 0, SEEK_END) or die "seek $dst: $!"; - $dedupe->pause_dedupe if $jobs; # are we forking? - } - $dedupe->prepare_dedupe if !$jobs; - ($out, $pipe_lk) = compress_dst($out, $zsfx, $lei) if $zsfx; +sub _mbox_write_cb ($$) { + my ($self, $lei) = @_; + my $ovv = $lei->{ovv}; + my $m = 'eml2'.$ovv->{fmt}; + my $eml2mbox = $self->can($m) or die "$self->$m missing"; + my $out = $lei->{1} // die "no stdout ($m, $ovv->{dst})"; # redirected earlier + $out->autoflush(1); + my $write = $ovv->{lock_path} ? \&_print_full : \&atomic_append; + my $dedupe = $lei->{dedupe}; + $dedupe->prepare_dedupe; sub { # for git_to_mail my ($buf, $oid, $kw) = @_; my $eml = PublicInbox::Eml->new($buf); if (!$dedupe->is_dup($eml, $oid)) { $buf = $eml2mbox->($eml, $kw); - my $lock = $pipe_lk->lock_for_scope if $pipe_lk; + my $lk = $ovv->lock_for_scope; $write->($out, $buf); } } @@ -313,17 +285,55 @@ sub _buf2maildir { } } - sub _maildir_write_cb ($$) { - my ($dst, $lei) = @_; - $dst .= '/' unless substr($dst, -1) eq '/'; - my $dedupe = $lei->{dedupe} = PublicInbox::LeiDedupe->new($lei, $dst); - my $jobs = $lei->{opt}->{jobs} // 0; + my ($self, $lei) = @_; + my $dedupe = $lei->{dedupe}; + $dedupe->prepare_dedupe; + my $dst = $lei->{ovv}->{dst}; + sub { # for git_to_mail + my ($buf, $oid, $kw) = @_; + return _buf2maildir($dst, $buf, $oid, $kw) if !$dedupe; + my $eml = PublicInbox::Eml->new($$buf); # copy buf + return if $dedupe->is_dup($eml, $oid); + undef $eml; + _buf2maildir($dst, $buf, $oid, $kw); + } +} + +sub write_cb { # returns a callback for git_to_mail + my ($self, $lei) = @_; + # _mbox_write_cb or _maildir_write_cb + my $m = "_$self->{base_type}_write_cb"; + $self->$m($lei); +} + +sub new { + my ($cls, $lei) = @_; + my $fmt = $lei->{ovv}->{fmt}; + my $dst = $lei->{ovv}->{dst}; + my $self = bless {}, $cls; + if ($fmt eq 'maildir') { + $self->{base_type} = 'maildir'; + $lei->{ovv}->{dst} = $dst .= '/' if substr($dst, -1) ne '/'; + } elsif (substr($fmt, 0, 4) eq 'mbox') { + $self->can("eml2$fmt") or die "bad mbox --format=$fmt\n"; + $self->{base_type} = 'mbox'; + } else { + die "bad mail --format=$fmt\n"; + } + my $dedupe = $lei->{dedupe} //= PublicInbox::LeiDedupe->new($lei, $dst); + $self; +} + +sub _prepare_maildir { + my ($self, $lei) = @_; + my $dst = $lei->{ovv}->{dst}; if ($lei->{opt}->{augment}) { + my $dedupe = $lei->{dedupe}; if ($dedupe && $dedupe->prepare_dedupe) { require PublicInbox::InboxWritable; # eml_from_path _maildir_each_file($dst, \&_augment_file, $lei); - $dedupe->pause_dedupe if $jobs; # are we forking? + $dedupe->pause_dedupe; } } else { # clobber existing Maildir _maildir_each_file($dst, \&_unlink); @@ -332,32 +342,51 @@ sub _maildir_write_cb ($$) { my $d = $dst.$x; next if -d $d; require File::Path; - if (!File::Path::mkpath($d) && !-d $d) { - die "failed to mkpath($d): $!\n"; - } - } - $dedupe->prepare_dedupe if $dedupe && !$jobs; - sub { # for git_to_mail - my ($buf, $oid, $kw) = @_; - return _buf2maildir($dst, $buf, $oid, $kw) if !$dedupe; - my $eml = PublicInbox::Eml->new($$buf); # copy buf - return if $dedupe->is_dup($eml, $oid); - undef $eml; - _buf2maildir($dst, $buf, $oid, $kw); + File::Path::mkpath($d) or die "mkpath($d): $!"; + -d $d or die "$d is not a directory"; } } -sub write_cb { # returns a callback for git_to_mail - my ($cls, $dst, $lei) = @_; - require PublicInbox::LeiDedupe; - if ($dst =~ s!\A(mbox(?:rd|cl|cl2|o))?:!!) { - _mbox_write_cb($cls, $1, $dst, $lei); - } elsif ($dst =~ s!\A[Mm]aildir:!!) { # typically capitalized - _maildir_write_cb($dst, $lei); +sub _prepare_mbox { + my ($self, $lei) = @_; + my $dst = $lei->{ovv}->{dst}; + my ($out, $seekable); + if ($dst eq '/dev/stdout') { + $out = $lei->{1}; } else { - undef; + my $mode = -p $dst ? '>' : '+>>'; + if (-f _ && !$lei->{opt}->{augment} and !unlink($dst)) { + $! == ENOENT or die "unlink($dst): $!"; + } + open $out, $mode, $dst or die "open($dst): $!"; + # Perl does SEEK_END even with O_APPEND :< + $seekable = seek($out, 0, SEEK_SET); + die "seek($dst): $!\n" if !$seekable && $! != ESPIPE; + $lei->{1} = $out; + } + state $zsfx_allow = join('|', keys %zsfx2cmd); + my ($zsfx) = ($dst =~ /\.($zsfx_allow)\z/); + my $dedupe = $lei->{dedupe}; + if ($lei->{opt}->{augment}) { + die "cannot augment $dst, not seekable\n" if !$seekable; + if (-s $out && $dedupe && $dedupe->prepare_dedupe) { + my $rd = $zsfx ? decompress_src($out, $zsfx, $lei) : + dup_src($out); + my $fmt = $lei->{ovv}->{fmt}; + require PublicInbox::MboxReader; + PublicInbox::MboxReader->$fmt($rd, \&_augment, $lei); + } + # maybe some systems don't honor O_APPEND, Perl does this: + seek($out, 0, SEEK_END) or die "seek $dst: $!"; + $dedupe->pause_dedupe if $dedupe; } - # TODO: Maildir, MH, IMAP, JMAP ... + compress_dst($self, $zsfx, $lei) if $zsfx; +} + +sub do_prepare { + my ($self, $lei) = @_; + my $m = "_prepare_$self->{base_type}"; + $self->$m($lei); } 1; |