From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id C7D6D1F934 for ; Sat, 16 Jan 2021 11:36:24 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 1/4] lei_to_mail: prepare for worker offload Date: Fri, 15 Jan 2021 23:36:21 -1200 Message-Id: <20210116113624.19930-2-e@80x24.org> In-Reply-To: <20210116113624.19930-1-e@80x24.org> References: <20210116113624.19930-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: We'll be doing most of the work in forked-off worker processes, so ensure some of it is fork- and serialization-friendly. --- lib/PublicInbox/LeiOverview.pm | 20 ++-- lib/PublicInbox/LeiToMail.pm | 175 +++++++++++++++++++-------------- t/lei_to_mail.t | 62 +++++++----- 3 files changed, 147 insertions(+), 110 deletions(-) diff --git a/lib/PublicInbox/LeiOverview.pm b/lib/PublicInbox/LeiOverview.pm index ef5f27c1..9846bc8a 100644 --- a/lib/PublicInbox/LeiOverview.pm +++ b/lib/PublicInbox/LeiOverview.pm @@ -25,7 +25,7 @@ sub _iso8601 ($) { strftime('%Y-%m-%dT%H:%M:%SZ', gmtime($_[0])) } sub ovv_out_lk_init ($) { my ($self) = @_; $self->{tmp_lk_id} = "$self.$$"; - my $tmp = File::Temp->new("lei-ovv.out.$$.lock-XXXXXX", + my $tmp = File::Temp->new("lei-ovv.dst.$$.lock-XXXXXX", TMPDIR => 1, UNLINK => 0); $self->{lock_path} = $tmp->filename; } @@ -39,32 +39,32 @@ sub ovv_out_lk_cancel ($) { sub new { my ($class, $lei) = @_; my $opt = $lei->{opt}; - my $out = $opt->{output} // '-'; - $out = '/dev/stdout' if $out eq '-'; + my $dst = $opt->{output} // '-'; + $dst = '/dev/stdout' if $dst eq '-'; my $fmt = $opt->{'format'}; $fmt = lc($fmt) if defined $fmt; - if ($out =~ s/\A([a-z]+)://is) { # e.g.
Maildir:/home/user/Mail/ + if ($dst =~ s/\A([a-z]+)://is) { # e.g. Maildir:/home/user/Mail/ my $ofmt = lc $1; $fmt //= $ofmt; return $lei->fail(<<"") if $fmt ne $ofmt; --format=$fmt and --output=$ofmt conflict } - $fmt //= 'json' if $out eq '/dev/stdout'; + $fmt //= 'json' if $dst eq '/dev/stdout'; $fmt //= 'maildir'; # TODO - if (index($out, '://') < 0) { # not a URL, so assume path - $out = File::Spec->canonpath($out); + if (index($dst, '://') < 0) { # not a URL, so assume path + $dst = File::Spec->canonpath($dst); } # else URL - my $self = bless { fmt => $fmt, out => $out }, $class; + my $self = bless { fmt => $fmt, dst => $dst }, $class; my $json; if ($fmt =~ /\A($JSONL|(?:concat)?json)\z/) { $json = $self->{json} = ref(PublicInbox::Config->json); } my ($isatty, $seekable); - if ($out eq '/dev/stdout') { + if ($dst eq '/dev/stdout') { $isatty = -t $lei->{1}; $lei->start_pager if $isatty; $opt->{pretty} //= $isatty; @@ -78,7 +78,7 @@ sub new { } elsif ($json) { return $lei->fail('JSON formats only output to stdout'); } else { - return $lei->fail("TODO: $out -f $fmt"); + return $lei->fail("TODO: $dst -f $fmt"); } $self; } diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm index 4c65dce2..5d4b7978 100644 --- a/lib/PublicInbox/LeiToMail.pm +++ b/lib/PublicInbox/LeiToMail.pm @@ -146,8 +146,7 @@ sub reap_compress { # dwaitpid callback # { foo => '' } means "--foo" is passed to the command-line, # otherwise { foo => '--bar' } passes "--bar" our %zsfx2cmd = ( - gz => [ qw(GZIP pigz gzip), { - rsyncable => '', threads => '-p' } ], + gz => [ qw(GZIP pigz gzip), { rsyncable => '', threads => '-p' } ], bz2 => [ 'bzip2', {} ], xz => [ 'xz', { threads => '-T' } ], # XXX does anybody care for these? 
I prefer zstd on entire FSes, @@ -189,24 +188,23 @@ sub zsfx2cmd ($$$) { } sub compress_dst { - my ($out, $zsfx, $lei) = @_; + my ($self, $zsfx, $lei) = @_; my $cmd = zsfx2cmd($zsfx, undef, $lei); pipe(my ($r, $w)) or die "pipe: $!"; - my $rdr = { 0 => $r, 1 => $out, 2 => $lei->{2} }; + my $rdr = { 0 => $r, 1 => $lei->{1}, 2 => $lei->{2} }; my $pid = spawn($cmd, $lei->{env}, $rdr); $lei->{"pid.$pid"} = $cmd; my $pp = gensym; tie *$pp, 'PublicInbox::ProcessPipe', $pid, $w, \&reap_compress, $lei; - my $pipe_lk = ($lei->{opt}->{jobs} // 0) > 1 ? - PublicInbox::Lock->new_tmp($zsfx) : undef; - ($pp, $pipe_lk); + $lei->{1} = $pp; + die 'BUG: unexpected {ovv}->{lock_path}' if $lei->{ovv}->{lock_path}; + $lei->{ovv}->ovv_out_lk_init if ($lei->{opt}->{jobs} // 2) > 1; } sub decompress_src ($$$) { my ($in, $zsfx, $lei) = @_; my $cmd = zsfx2cmd($zsfx, 1, $lei); - my $rdr = { 0 => $in, 2 => $lei->{2} }; - popen_rd($cmd, $lei->{env}, $rdr); + popen_rd($cmd, $lei->{env}, { 0 => $in, 2 => $lei->{2} }); } sub dup_src ($) { @@ -222,48 +220,22 @@ sub _augment { # MboxReader eml_cb $lei->{dedupe}->is_dup($eml); } -sub _mbox_write_cb ($$$$) { - my ($cls, $mbox, $dst, $lei) = @_; - my $m = "eml2$mbox"; - my $eml2mbox = $cls->can($m) or die "$cls->$m missing"; - my ($out, $pipe_lk, $seekable); - # XXX should we support /dev/stdout.gz ? - if ($dst eq '/dev/stdout') { - $out = $lei->{1}; - } else { # TODO: mbox locking (but mairix doesn't...) - my $mode = -p $dst ? '>' : '+>>'; - if (-f _ && !$lei->{opt}->{augment} and !unlink($dst)) { - die "unlink $dst: $!" if $! != ENOENT; - } - open $out, $mode, $dst or die "open $dst: $!"; - # Perl does SEEK_END even with O_APPEND :< - $seekable = seek($out, 0, SEEK_SET); - die "seek $dst: $!\n" if !$seekable && $! != ESPIPE; - } - my $jobs = $lei->{opt}->{jobs} // 0; - state $zsfx_allow = join('|', keys %zsfx2cmd); - my ($zsfx) = ($dst =~ /\.($zsfx_allow)\z/); - my $write = $jobs > 1 && !$zsfx ? 
\&atomic_append : \&_print_full; - my $dedupe = $lei->{dedupe} = PublicInbox::LeiDedupe->new($lei); - if ($lei->{opt}->{augment}) { - die "cannot augment $dst, not seekable\n" if !$seekable; - if (-s $out && $dedupe->prepare_dedupe) { - my $rd = $zsfx ? decompress_src($out, $zsfx, $lei) : - dup_src($out); - PublicInbox::MboxReader->$mbox($rd, \&_augment, $lei); - } - # maybe some systems don't honor O_APPEND, Perl does this: - seek($out, 0, SEEK_END) or die "seek $dst: $!"; - $dedupe->pause_dedupe if $jobs; # are we forking? - } - $dedupe->prepare_dedupe if !$jobs; - ($out, $pipe_lk) = compress_dst($out, $zsfx, $lei) if $zsfx; +sub _mbox_write_cb ($$) { + my ($self, $lei) = @_; + my $ovv = $lei->{ovv}; + my $m = 'eml2'.$ovv->{fmt}; + my $eml2mbox = $self->can($m) or die "$self->$m missing"; + my $out = $lei->{1} // die "no stdout ($m, $ovv->{dst})"; # redirected earlier + $out->autoflush(1); + my $write = $ovv->{lock_path} ? \&_print_full : \&atomic_append; + my $dedupe = $lei->{dedupe}; + $dedupe->prepare_dedupe; sub { # for git_to_mail my ($buf, $oid, $kw) = @_; my $eml = PublicInbox::Eml->new($buf); if (!$dedupe->is_dup($eml, $oid)) { $buf = $eml2mbox->($eml, $kw); - my $lock = $pipe_lk->lock_for_scope if $pipe_lk; + my $lk = $ovv->lock_for_scope; $write->($out, $buf); } } @@ -313,17 +285,55 @@ sub _buf2maildir { } } - sub _maildir_write_cb ($$) { - my ($dst, $lei) = @_; - $dst .= '/' unless substr($dst, -1) eq '/'; - my $dedupe = $lei->{dedupe} = PublicInbox::LeiDedupe->new($lei, $dst); - my $jobs = $lei->{opt}->{jobs} // 0; + my ($self, $lei) = @_; + my $dedupe = $lei->{dedupe}; + $dedupe->prepare_dedupe; + my $dst = $lei->{ovv}->{dst}; + sub { # for git_to_mail + my ($buf, $oid, $kw) = @_; + return _buf2maildir($dst, $buf, $oid, $kw) if !$dedupe; + my $eml = PublicInbox::Eml->new($$buf); # copy buf + return if $dedupe->is_dup($eml, $oid); + undef $eml; + _buf2maildir($dst, $buf, $oid, $kw); + } +} + +sub write_cb { # returns a callback for git_to_mail + my 
($self, $lei) = @_; + # _mbox_write_cb or _maildir_write_cb + my $m = "_$self->{base_type}_write_cb"; + $self->$m($lei); +} + +sub new { + my ($cls, $lei) = @_; + my $fmt = $lei->{ovv}->{fmt}; + my $dst = $lei->{ovv}->{dst}; + my $self = bless {}, $cls; + if ($fmt eq 'maildir') { + $self->{base_type} = 'maildir'; + $lei->{ovv}->{dst} = $dst .= '/' if substr($dst, -1) ne '/'; + } elsif (substr($fmt, 0, 4) eq 'mbox') { + $self->can("eml2$fmt") or die "bad mbox --format=$fmt\n"; + $self->{base_type} = 'mbox'; + } else { + die "bad mail --format=$fmt\n"; + } + my $dedupe = $lei->{dedupe} //= PublicInbox::LeiDedupe->new($lei, $dst); + $self; +} + +sub _prepare_maildir { + my ($self, $lei) = @_; + my $dst = $lei->{ovv}->{dst}; if ($lei->{opt}->{augment}) { + my $dedupe = $lei->{dedupe}; if ($dedupe && $dedupe->prepare_dedupe) { require PublicInbox::InboxWritable; # eml_from_path _maildir_each_file($dst, \&_augment_file, $lei); - $dedupe->pause_dedupe if $jobs; # are we forking? + $dedupe->pause_dedupe; } } else { # clobber existing Maildir _maildir_each_file($dst, \&_unlink); @@ -332,32 +342,51 @@ sub _maildir_write_cb ($$) { my $d = $dst.$x; next if -d $d; require File::Path; - if (!File::Path::mkpath($d) && !-d $d) { - die "failed to mkpath($d): $!\n"; - } - } - $dedupe->prepare_dedupe if $dedupe && !$jobs; - sub { # for git_to_mail - my ($buf, $oid, $kw) = @_; - return _buf2maildir($dst, $buf, $oid, $kw) if !$dedupe; - my $eml = PublicInbox::Eml->new($$buf); # copy buf - return if $dedupe->is_dup($eml, $oid); - undef $eml; - _buf2maildir($dst, $buf, $oid, $kw); + File::Path::mkpath($d) or die "mkpath($d): $!"; + -d $d or die "$d is not a directory"; } } -sub write_cb { # returns a callback for git_to_mail - my ($cls, $dst, $lei) = @_; - require PublicInbox::LeiDedupe; - if ($dst =~ s!\A(mbox(?:rd|cl|cl2|o))?:!!) { - _mbox_write_cb($cls, $1, $dst, $lei); - } elsif ($dst =~ s!\A[Mm]aildir:!!) 
{ # typically capitalized - _maildir_write_cb($dst, $lei); +sub _prepare_mbox { + my ($self, $lei) = @_; + my $dst = $lei->{ovv}->{dst}; + my ($out, $seekable); + if ($dst eq '/dev/stdout') { + $out = $lei->{1}; } else { - undef; + my $mode = -p $dst ? '>' : '+>>'; + if (-f _ && !$lei->{opt}->{augment} and !unlink($dst)) { + $! == ENOENT or die "unlink($dst): $!"; + } + open $out, $mode, $dst or die "open($dst): $!"; + # Perl does SEEK_END even with O_APPEND :< + $seekable = seek($out, 0, SEEK_SET); + die "seek($dst): $!\n" if !$seekable && $! != ESPIPE; + $lei->{1} = $out; + } + state $zsfx_allow = join('|', keys %zsfx2cmd); + my ($zsfx) = ($dst =~ /\.($zsfx_allow)\z/); + my $dedupe = $lei->{dedupe}; + if ($lei->{opt}->{augment}) { + die "cannot augment $dst, not seekable\n" if !$seekable; + if (-s $out && $dedupe && $dedupe->prepare_dedupe) { + my $rd = $zsfx ? decompress_src($out, $zsfx, $lei) : + dup_src($out); + my $fmt = $lei->{ovv}->{fmt}; + require PublicInbox::MboxReader; + PublicInbox::MboxReader->$fmt($rd, \&_augment, $lei); + } + # maybe some systems don't honor O_APPEND, Perl does this: + seek($out, 0, SEEK_END) or die "seek $dst: $!"; + $dedupe->pause_dedupe if $dedupe; } - # TODO: Maildir, MH, IMAP, JMAP ... 
+ compress_dst($self, $zsfx, $lei) if $zsfx; +} + +sub do_prepare { + my ($self, $lei) = @_; + my $m = "_prepare_$self->{base_type}"; + $self->$m($lei); } 1; diff --git a/t/lei_to_mail.t b/t/lei_to_mail.t index cb30fed5..d5beb3d2 100644 --- a/t/lei_to_mail.t +++ b/t/lei_to_mail.t @@ -11,6 +11,7 @@ use PublicInbox::Spawn qw(popen_rd which); use List::Util qw(shuffle); require_mods(qw(DBD::SQLite)); require PublicInbox::MboxReader; +require PublicInbox::LeiOverview; use_ok 'PublicInbox::LeiToMail'; my $from = "Content-Length: 10\nSubject: x\n\nFrom hell\n"; my $noeol = "Subject: x\n\nFrom hell"; @@ -80,8 +81,27 @@ blah EOM my $fn = "$tmpdir/x.mbox"; my ($mbox) = shuffle(@MBOX); # pick one, shouldn't matter +my $wcb_get = sub { + my ($fmt, $dst) = @_; + delete $lei->{dedupe}; + $lei->{ovv} = bless { + fmt => $fmt, + dst => $dst + }, 'PublicInbox::LeiOverview'; + my $l2m = PublicInbox::LeiToMail->new($lei); + SKIP: { + require_mods('Storable', 1); + my $dup = Storable::thaw(Storable::freeze($l2m)); + is_deeply($dup, $l2m, "$fmt round-trips through storable"); + } + $l2m->do_prepare($lei); + my $cb = $l2m->write_cb($lei); + delete $lei->{1}; + $cb; +}; + my $orig = do { - my $wcb = PublicInbox::LeiToMail->write_cb("$mbox:$fn", $lei); + my $wcb = $wcb_get->($mbox, $fn); is(ref $wcb, 'CODE', 'write_cb returned callback'); ok(-f $fn && !-s _, 'empty file created'); $wcb->(\(my $dup = $buf), 'deadbeef', [ qw(seen) ]); @@ -92,13 +112,12 @@ my $orig = do { unlink $fn or BAIL_OUT $!; local $lei->{opt} = { jobs => 2 }; - $wcb = PublicInbox::LeiToMail->write_cb("$mbox:$fn", $lei); + $wcb = $wcb_get->($mbox, $fn); ok(-f $fn && !-s _, 'truncated mbox destination'); - $lei->{dedupe}->prepare_dedupe; $wcb->(\($dup = $buf), 'deadbeef', [ qw(seen) ]); undef $wcb; open $fh, '<', $fn or BAIL_OUT $!; - is($raw, do { local $/; <$fh> }, 'jobs > 1'); + is(do { local $/; <$fh> }, $raw, 'jobs > 1'); $raw; }; for my $zsfx (qw(gz bz2 xz)) { # XXX should we support zst, zz, lzo, lzma? 
@@ -109,8 +128,7 @@ for my $zsfx (qw(gz bz2 xz)) { # XXX should we support zst, zz, lzo, lzma? my $dc_cmd = eval { $zsfx2cmd->($zsfx, 1, $lei) }; ok($dc_cmd, "decompressor for .$zsfx"); my $f = "$fn.$zsfx"; - my $dst = "$mbox:$f"; - my $wcb = PublicInbox::LeiToMail->write_cb($dst, $lei); + my $wcb = $wcb_get->($mbox, $f); $wcb->(\(my $dup = $buf), 'deadbeef', [ qw(seen) ]); undef $wcb; my $uncompressed = xqx([@$dc_cmd, $f]); @@ -118,15 +136,13 @@ for my $zsfx (qw(gz bz2 xz)) { # XXX should we support zst, zz, lzo, lzma? local $lei->{opt} = { jobs => 2 }; # for atomic writes unlink $f or BAIL_OUT "unlink $!"; - $wcb = PublicInbox::LeiToMail->write_cb($dst, $lei); - $lei->{dedupe}->prepare_dedupe; + $wcb = $wcb_get->($mbox, $f); $wcb->(\($dup = $buf), 'deadbeef', [ qw(seen) ]); undef $wcb; is(xqx([@$dc_cmd, $f]), $orig, "$zsfx matches with lock"); local $lei->{opt} = { augment => 1 }; - $wcb = PublicInbox::LeiToMail->write_cb($dst, $lei); - $lei->{dedupe}->prepare_dedupe; + $wcb = $wcb_get->($mbox, $f); $wcb->(\($dup = $buf . "\nx\n"), 'deadbeef', [ qw(seen) ]); undef $wcb; # commit @@ -138,8 +154,7 @@ for my $zsfx (qw(gz bz2 xz)) { # XXX should we support zst, zz, lzo, lzma? like($raw[0], qr/\nblah\n\z/s, "original preserved $zsfx"); local $lei->{opt} = { augment => 1, jobs => 2 }; - $wcb = PublicInbox::LeiToMail->write_cb($dst, $lei); - $lei->{dedupe}->prepare_dedupe; + $wcb = $wcb_get->($mbox, $f); $wcb->(\($dup = $buf . "\ny\n"), 'deadbeef', [ qw(seen) ]); undef $wcb; # commit @@ -155,7 +170,7 @@ for my $zsfx (qw(gz bz2 xz)) { # XXX should we support zst, zz, lzo, lzma? 
unlink $fn or BAIL_OUT $!; if ('default deduplication uses content_hash') { - my $wcb = PublicInbox::LeiToMail->write_cb("mboxo:$fn", $lei); + my $wcb = $wcb_get->('mboxo', $fn); $wcb->(\(my $x = $buf), 'deadbeef', []) for (1..2); undef $wcb; # undef to commit changes my $cmp = ''; @@ -164,7 +179,7 @@ if ('default deduplication uses content_hash') { is($cmp, $buf, 'only one message written'); local $lei->{opt} = { augment => 1 }; - $wcb = PublicInbox::LeiToMail->write_cb("mboxo:$fn", $lei); + $wcb = $wcb_get->('mboxo', $fn); $wcb->(\($x = $buf . "\nx\n"), 'deadbeef', []) for (1..2); undef $wcb; # undef to commit changes open $fh, '<', $fn or BAIL_OUT $!; @@ -178,7 +193,7 @@ if ('default deduplication uses content_hash') { { # stdout support open my $tmp, '+>', undef or BAIL_OUT $!; local $lei->{1} = $tmp; - my $wcb = PublicInbox::LeiToMail->write_cb("mboxrd:/dev/stdout", $lei); + my $wcb = $wcb_get->('mboxrd', '/dev/stdout'); $wcb->(\(my $x = $buf), 'deadbeef', []); undef $wcb; # commit seek($tmp, 0, SEEK_SET) or BAIL_OUT $!; @@ -192,7 +207,7 @@ SKIP: { # FIFO support my $fn = "$tmpdir/fifo"; mkfifo($fn, 0600) or skip("mkfifo not supported: $!", 1); my $cat = popen_rd([which('cat'), $fn]); - my $wcb = PublicInbox::LeiToMail->write_cb("mboxo:$fn", $lei); + my $wcb = $wcb_get->('mboxo', $fn); $wcb->(\(my $x = $buf), 'deadbeef', []); undef $wcb; # commit my $cmp = ''; @@ -202,22 +217,17 @@ SKIP: { # FIFO support { # Maildir support my $md = "$tmpdir/maildir/"; - my $wcb = PublicInbox::LeiToMail->write_cb("Maildir:$md", $lei); - ok($wcb, 'got Maildir callback'); - $lei->{dedupe}->prepare_dedupe; + my $wcb = $wcb_get->('maildir', $md); + is(ref($wcb), 'CODE', 'got Maildir callback'); $wcb->(\(my $x = $buf), 'badc0ffee', []); - undef $wcb; # commit my @f; PublicInbox::LeiToMail::_maildir_each_file($md, sub { push @f, shift }); - is(scalar(@f), 1, 'wrote one file'); open my $fh, $f[0] or BAIL_OUT $!; is(do { local $/; <$fh> }, $buf, 'wrote to Maildir'); - $wcb = 
PublicInbox::LeiToMail->write_cb("maildir:$md", $lei); - $lei->{dedupe}->prepare_dedupe; + $wcb = $wcb_get->('maildir', $md); $wcb->(\($x = $buf."\nx\n"), 'deadcafe', []); - undef $wcb; # commit my @x = (); PublicInbox::LeiToMail::_maildir_each_file($md, sub { push @x, shift }); @@ -227,11 +237,9 @@ SKIP: { # FIFO support is(do { local $/; <$fh> }, $buf."\nx\n", 'wrote new file to Maildir'); local $lei->{opt}->{augment} = 1; - $wcb = PublicInbox::LeiToMail->write_cb("maildir:$md", $lei); - $lei->{dedupe}->prepare_dedupe; + $wcb = $wcb_get->('maildir', $md); $wcb->(\($x = $buf."\ny\n"), 'deadcafe', []); $wcb->(\($x = $buf."\ny\n"), 'b4dc0ffee', []); # skipped by dedupe - undef $wcb; # commit @f = (); PublicInbox::LeiToMail::_maildir_each_file($md, sub { push @f, shift }); is(scalar grep(/\A\Q$x[0]\E\z/, @f), 1, 'old file still there');