diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/PublicInbox/LeiToMail.pm | 140 | ||||
-rw-r--r-- | lib/PublicInbox/Lock.pm | 7 | ||||
-rw-r--r-- | lib/PublicInbox/MboxReader.pm | 3 |
3 files changed, 117 insertions, 33 deletions
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm index ebb50c50..294291b2 100644 --- a/lib/PublicInbox/LeiToMail.pm +++ b/lib/PublicInbox/LeiToMail.pm @@ -8,10 +8,12 @@ use v5.10.1; use PublicInbox::Eml; use PublicInbox::Lock; use PublicInbox::ProcessPipe; -use PublicInbox::Spawn qw(which spawn); +use PublicInbox::SharedKV; +use PublicInbox::Spawn qw(which spawn popen_rd); +use PublicInbox::ContentHash qw(content_hash); use Symbol qw(gensym); -use File::Temp (); use IO::Handle; # ->autoflush +use Fcntl qw(SEEK_SET); my %kw2char = ( # Maildir characters draft => 'D', @@ -150,51 +152,123 @@ sub reap_compress { # dwaitpid callback $lei->fail("@$cmd failed", $? >> 8); } -sub compress_dst { - my ($out, $sfx, $lei) = @_; - my $cmd = []; - if ($sfx eq 'gz') { - $cmd->[0] = which($lei->{env}->{GZIP} // 'pigz') // - which('gzip') // - die "pigz or gzip missing for $sfx\n"; - # TODO: use IO::Compress::Gzip - push @$cmd, '-c'; # stdout - push @$cmd, '--rsyncable' if $lei->{opt}->{rsyncable}; - } else { - die "TODO $sfx" +# all of these support -c for stdout and -d for decompression, +# mutt is commonly distributed with hooks for gz, bz2 and xz, at least +# { foo => '' } means "--foo" is passed to the command-line, +# otherwise { foo => '--bar' } passes "--bar" +our %zsfx2cmd = ( + gz => [ qw(GZIP pigz gzip), { + rsyncable => '', threads => '-p' } ], + bz2 => [ 'bzip2', {} ], + xz => [ 'xz', { threads => '-T' } ], + # XXX does anybody care for these? I prefer zstd on entire FSes, + # so it's probably not necessary on a per-file basis + # zst => [ 'zstd', { -default => [ qw(-q) ], # it's noisy by default + # rsyncable => '', threads => '-T' } ], + # zz => [ 'pigz', { -default => [ '--zlib' ], + # rsyncable => '', threads => '-p' }], + # lzo => [ 'lzop', {} ], + # lzma => [ 'lzma', {} ], +); + +sub zsfx2cmd ($$$) { + my ($zsfx, $decompress, $lei) = @_; + my $x = $zsfx2cmd{$zsfx} // die "no support for suffix=.$zsfx"; + my @info = @$x; + my $cmd_opt = pop @info; + my @cmd = (undef, $decompress ? qw(-dc) : qw(-c)); + for my $exe (@info) { + # I think respecting client's ENV{GZIP} is OK, not sure + # about ENV overrides for other, less-common compressors + if ($exe eq uc($exe)) { + $exe = $lei->{env}->{$exe} or next; + } + $cmd[0] = which($exe) and last; + } + $cmd[0] // die join(' or ', @info)." missing for .$zsfx"; + # push @cmd, @{$cmd_opt->{-default}} if $cmd_opt->{-default}; + for my $bool (qw(rsyncable)) { + my $switch = $cmd_opt->{rsyncable} // next; + push @cmd, '--'.($switch || $bool); + } + for my $key (qw(threads)) { # support compression level? + my $switch = $cmd_opt->{$key} // next; + my $val = $lei->{opt}->{$key} // next; + push @cmd, $switch, $val; } + \@cmd; +} + +sub compress_dst { + my ($out, $zsfx, $lei) = @_; + my $cmd = zsfx2cmd($zsfx, undef, $lei); pipe(my ($r, $w)) or die "pipe: $!"; my $rdr = { 0 => $r, 1 => $out, 2 => $lei->{2} }; my $pid = spawn($cmd, $lei->{env}, $rdr); $lei->{"pid.$pid"} = $cmd; my $pp = gensym; tie *$pp, 'PublicInbox::ProcessPipe', $pid, $w, \&reap_compress, $lei; - my $tmp = File::Temp->new("$sfx.lock-XXXXXX", TMPDIR => 1); - my $pipe_lk = ($lei->{opt}->{jobs} // 0) > 1 ? bless({ - lock_path => $tmp->filename, - tmp => $tmp - }, 'PublicInbox::Lock') : undef; + my $pipe_lk = ($lei->{opt}->{jobs} // 0) > 1 ? + PublicInbox::Lock->new_tmp($zsfx) : undef; ($pp, $pipe_lk); } -sub write_cb { - my ($cls, $dst, $lei) = @_; - if ($dst =~ s!\A(mbox(?:rd|cl|cl2|o))?:!!) { - my $m = "eml2$1"; - my $eml2mbox = $cls->can($m) or die "$cls->$m missing"; - my ($out, $pipe_lk); - open $out, '>>', $dst or die "open $dst: $!"; - my $atomic = !!(($lei->{opt}->{jobs} // 0) > 1); - if ($dst =~ /\.(gz|bz2|xz)\z/) { - ($out, $pipe_lk) = compress_dst($out, $1, $lei); - } - sub { - my ($buf, $oid, $kw) = @_; - $buf = $eml2mbox->(PublicInbox::Eml->new($buf), $kw); +sub decompress_src ($$$) { + my ($in, $zsfx, $lei) = @_; + my $cmd = zsfx2cmd($zsfx, 1, $lei); + my $rdr = { 0 => $in, 2 => $lei->{2} }; + popen_rd($cmd, $lei->{env}, $rdr); +} + +sub dup_src ($) { + my ($in) = @_; + open my $dup, '+>>&', $in or die "dup: $!"; + $dup; +} + +# --augment existing output destination, without duplicating anything +sub _augment { # MboxReader eml_cb + my ($eml, $lei) = @_; + $lei->{skv}->set_maybe(content_hash($eml), ''); +} + +sub _mbox_write_cb ($$$$) { + my ($cls, $mbox, $dst, $lei) = @_; + my $m = "eml2$mbox"; + my $eml2mbox = $cls->can($m) or die "$cls->$m missing"; + my ($out, $pipe_lk); + open $out, '+>>', $dst or die "open $dst: $!"; + # Perl does SEEK_END even with O_APPEND :< + seek($out, 0, SEEK_SET) or die "seek $dst: $!"; + my $atomic = !!(($lei->{opt}->{jobs} // 0) > 1); + $lei->{skv} = PublicInbox::SharedKV->new; + $lei->{skv}->dbh; + state $zsfx_allow = join('|', keys %zsfx2cmd); + my ($zsfx) = ($dst =~ /\.($zsfx_allow)\z/); + if ($lei->{opt}->{augment}) { + my $rd = $zsfx ? decompress_src($out, $zsfx, $lei) : + dup_src($out); + PublicInbox::MboxReader->$mbox($rd, \&_augment, $lei); + } else { + truncate($out, 0) or die "truncate $dst: $!"; + } + ($out, $pipe_lk) = compress_dst($out, $zsfx, $lei) if $zsfx; + sub { + my ($buf, $oid, $kw) = @_; + my $eml = PublicInbox::Eml->new($buf); + if ($lei->{skv}->set_maybe(content_hash($eml), '')) { + $buf = $eml2mbox->($eml, $kw); my $lock = $pipe_lk->lock_for_scope if $pipe_lk; write_in_full($out, $buf, $atomic); } } } +sub write_cb { # returns a callback for git_to_mail + my ($cls, $dst, $lei) = @_; + if ($dst =~ s!\A(mbox(?:rd|cl|cl2|o))?:!!) { + _mbox_write_cb($cls, $1, $dst, $lei); + } +} + 1; diff --git a/lib/PublicInbox/Lock.pm b/lib/PublicInbox/Lock.pm index 7fd17745..f6eaa5ce 100644 --- a/lib/PublicInbox/Lock.pm +++ b/lib/PublicInbox/Lock.pm @@ -8,6 +8,7 @@ use v5.10.1; use Fcntl qw(:flock :DEFAULT); use Carp qw(croak); use PublicInbox::OnDestroy; +use File::Temp (); # we only acquire the flock if creating or reindexing; # PublicInbox::Import already has the lock on its own. @@ -40,4 +41,10 @@ sub lock_for_scope { PublicInbox::OnDestroy->new(\&lock_release, $self); } +sub new_tmp { + my ($cls, $ident) = @_; + my $tmp = File::Temp->new("$ident.lock-XXXXXX", TMPDIR => 1); + bless { lock_path => $tmp->filename, tmp => $tmp }, $cls; +} + 1; diff --git a/lib/PublicInbox/MboxReader.pm b/lib/PublicInbox/MboxReader.pm index e1944aaf..ac0c0f52 100644 --- a/lib/PublicInbox/MboxReader.pm +++ b/lib/PublicInbox/MboxReader.pm @@ -5,6 +5,7 @@ package PublicInbox::MboxReader; use strict; use v5.10.1; +use PublicInbox::DS (); # localize $in_loop for error detection :< use Data::Dumper; $Data::Dumper::Useqq = 1; # should've been the default, for bad data @@ -13,6 +14,7 @@ my $from_strict = sub _mbox_from { my ($mbfh, $from_re, $eml_cb, @arg) = @_; + local $PublicInbox::DS::in_loop; # disable dwaitpid my $buf = ''; my @raw; while (defined(my $r = read($mbfh, $buf, 65536, length($buf)))) { @@ -73,6 +75,7 @@ sub _extract_hdr { sub _mbox_cl ($$$;@) { my ($mbfh, $uxs_from, $eml_cb, @arg) = @_; + local $PublicInbox::DS::in_loop; # disable dwaitpid my $buf = ''; while (defined(my $r = read($mbfh, $buf, 65536, length($buf)))) { if ($r == 0) { # detect "curl --fail" |