From dda8237aeb5722b3a48c31896d9b7398e50823f1 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Fri, 15 Jan 2021 23:36:21 -1200 Subject: lei_to_mail: prepare for worker offload We'll be doing most of the work in forked off worker processes, so ensure some of it is fork and serialization-friendly. --- t/lei_to_mail.t | 62 ++++++++++++++++++++++++++++++++------------------------- 1 file changed, 35 insertions(+), 27 deletions(-) (limited to 't') diff --git a/t/lei_to_mail.t b/t/lei_to_mail.t index cb30fed5..d5beb3d2 100644 --- a/t/lei_to_mail.t +++ b/t/lei_to_mail.t @@ -11,6 +11,7 @@ use PublicInbox::Spawn qw(popen_rd which); use List::Util qw(shuffle); require_mods(qw(DBD::SQLite)); require PublicInbox::MboxReader; +require PublicInbox::LeiOverview; use_ok 'PublicInbox::LeiToMail'; my $from = "Content-Length: 10\nSubject: x\n\nFrom hell\n"; my $noeol = "Subject: x\n\nFrom hell"; @@ -80,8 +81,27 @@ blah EOM my $fn = "$tmpdir/x.mbox"; my ($mbox) = shuffle(@MBOX); # pick one, shouldn't matter +my $wcb_get = sub { + my ($fmt, $dst) = @_; + delete $lei->{dedupe}; + $lei->{ovv} = bless { + fmt => $fmt, + dst => $dst + }, 'PublicInbox::LeiOverview'; + my $l2m = PublicInbox::LeiToMail->new($lei); + SKIP: { + require_mods('Storable', 1); + my $dup = Storable::thaw(Storable::freeze($l2m)); + is_deeply($dup, $l2m, "$fmt round-trips through storable"); + } + $l2m->do_prepare($lei); + my $cb = $l2m->write_cb($lei); + delete $lei->{1}; + $cb; +}; + my $orig = do { - my $wcb = PublicInbox::LeiToMail->write_cb("$mbox:$fn", $lei); + my $wcb = $wcb_get->($mbox, $fn); is(ref $wcb, 'CODE', 'write_cb returned callback'); ok(-f $fn && !-s _, 'empty file created'); $wcb->(\(my $dup = $buf), 'deadbeef', [ qw(seen) ]); @@ -92,13 +112,12 @@ my $orig = do { unlink $fn or BAIL_OUT $!; local $lei->{opt} = { jobs => 2 }; - $wcb = PublicInbox::LeiToMail->write_cb("$mbox:$fn", $lei); + $wcb = $wcb_get->($mbox, $fn); ok(-f $fn && !-s _, 'truncated mbox destination'); - $lei->{dedupe}->prepare_dedupe; $wcb->(\($dup = $buf), 'deadbeef', [ qw(seen) ]); undef $wcb; open $fh, '<', $fn or BAIL_OUT $!; - is($raw, do { local $/; <$fh> }, 'jobs > 1'); + is(do { local $/; <$fh> }, $raw, 'jobs > 1'); $raw; }; for my $zsfx (qw(gz bz2 xz)) { # XXX should we support zst, zz, lzo, lzma? @@ -109,8 +128,7 @@ for my $zsfx (qw(gz bz2 xz)) { # XXX should we support zst, zz, lzo, lzma? my $dc_cmd = eval { $zsfx2cmd->($zsfx, 1, $lei) }; ok($dc_cmd, "decompressor for .$zsfx"); my $f = "$fn.$zsfx"; - my $dst = "$mbox:$f"; - my $wcb = PublicInbox::LeiToMail->write_cb($dst, $lei); + my $wcb = $wcb_get->($mbox, $f); $wcb->(\(my $dup = $buf), 'deadbeef', [ qw(seen) ]); undef $wcb; my $uncompressed = xqx([@$dc_cmd, $f]); @@ -118,15 +136,13 @@ for my $zsfx (qw(gz bz2 xz)) { # XXX should we support zst, zz, lzo, lzma? local $lei->{opt} = { jobs => 2 }; # for atomic writes unlink $f or BAIL_OUT "unlink $!"; - $wcb = PublicInbox::LeiToMail->write_cb($dst, $lei); - $lei->{dedupe}->prepare_dedupe; + $wcb = $wcb_get->($mbox, $f); $wcb->(\($dup = $buf), 'deadbeef', [ qw(seen) ]); undef $wcb; is(xqx([@$dc_cmd, $f]), $orig, "$zsfx matches with lock"); local $lei->{opt} = { augment => 1 }; - $wcb = PublicInbox::LeiToMail->write_cb($dst, $lei); - $lei->{dedupe}->prepare_dedupe; + $wcb = $wcb_get->($mbox, $f); $wcb->(\($dup = $buf . "\nx\n"), 'deadbeef', [ qw(seen) ]); undef $wcb; # commit @@ -138,8 +154,7 @@ for my $zsfx (qw(gz bz2 xz)) { # XXX should we support zst, zz, lzo, lzma? like($raw[0], qr/\nblah\n\z/s, "original preserved $zsfx"); local $lei->{opt} = { augment => 1, jobs => 2 }; - $wcb = PublicInbox::LeiToMail->write_cb($dst, $lei); - $lei->{dedupe}->prepare_dedupe; + $wcb = $wcb_get->($mbox, $f); $wcb->(\($dup = $buf . "\ny\n"), 'deadbeef', [ qw(seen) ]); undef $wcb; # commit @@ -155,7 +170,7 @@ for my $zsfx (qw(gz bz2 xz)) { # XXX should we support zst, zz, lzo, lzma? unlink $fn or BAIL_OUT $!; if ('default deduplication uses content_hash') { - my $wcb = PublicInbox::LeiToMail->write_cb("mboxo:$fn", $lei); + my $wcb = $wcb_get->('mboxo', $fn); $wcb->(\(my $x = $buf), 'deadbeef', []) for (1..2); undef $wcb; # undef to commit changes my $cmp = ''; @@ -164,7 +179,7 @@ if ('default deduplication uses content_hash') { is($cmp, $buf, 'only one message written'); local $lei->{opt} = { augment => 1 }; - $wcb = PublicInbox::LeiToMail->write_cb("mboxo:$fn", $lei); + $wcb = $wcb_get->('mboxo', $fn); $wcb->(\($x = $buf . "\nx\n"), 'deadbeef', []) for (1..2); undef $wcb; # undef to commit changes open $fh, '<', $fn or BAIL_OUT $!; @@ -178,7 +193,7 @@ if ('default deduplication uses content_hash') { { # stdout support open my $tmp, '+>', undef or BAIL_OUT $!; local $lei->{1} = $tmp; - my $wcb = PublicInbox::LeiToMail->write_cb("mboxrd:/dev/stdout", $lei); + my $wcb = $wcb_get->('mboxrd', '/dev/stdout'); $wcb->(\(my $x = $buf), 'deadbeef', []); undef $wcb; # commit seek($tmp, 0, SEEK_SET) or BAIL_OUT $!; @@ -192,7 +207,7 @@ SKIP: { # FIFO support my $fn = "$tmpdir/fifo"; mkfifo($fn, 0600) or skip("mkfifo not supported: $!", 1); my $cat = popen_rd([which('cat'), $fn]); - my $wcb = PublicInbox::LeiToMail->write_cb("mboxo:$fn", $lei); + my $wcb = $wcb_get->('mboxo', $fn); $wcb->(\(my $x = $buf), 'deadbeef', []); undef $wcb; # commit my $cmp = ''; @@ -202,22 +217,17 @@ SKIP: { # FIFO support { # Maildir support my $md = "$tmpdir/maildir/"; - my $wcb = PublicInbox::LeiToMail->write_cb("Maildir:$md", $lei); - ok($wcb, 'got Maildir callback'); - $lei->{dedupe}->prepare_dedupe; + my $wcb = $wcb_get->('maildir', $md); + is(ref($wcb), 'CODE', 'got Maildir callback'); $wcb->(\(my $x = $buf), 'badc0ffee', []); - undef $wcb; # commit my @f; PublicInbox::LeiToMail::_maildir_each_file($md, sub { push @f, shift }); - is(scalar(@f), 1, 'wrote one file'); open my $fh, $f[0] or BAIL_OUT $!; is(do { local $/; <$fh> }, $buf, 'wrote to Maildir'); - $wcb = PublicInbox::LeiToMail->write_cb("maildir:$md", $lei); - $lei->{dedupe}->prepare_dedupe; + $wcb = $wcb_get->('maildir', $md); $wcb->(\($x = $buf."\nx\n"), 'deadcafe', []); - undef $wcb; # commit my @x = (); PublicInbox::LeiToMail::_maildir_each_file($md, sub { push @x, shift }); @@ -227,11 +237,9 @@ SKIP: { # FIFO support is(do { local $/; <$fh> }, $buf."\nx\n", 'wrote new file to Maildir'); local $lei->{opt}->{augment} = 1; - $wcb = PublicInbox::LeiToMail->write_cb("maildir:$md", $lei); - $lei->{dedupe}->prepare_dedupe; + $wcb = $wcb_get->('maildir', $md); $wcb->(\($x = $buf."\ny\n"), 'deadcafe', []); $wcb->(\($x = $buf."\ny\n"), 'b4dc0ffee', []); # skipped by dedupe - undef $wcb; # commit @f = (); PublicInbox::LeiToMail::_maildir_each_file($md, sub { push @f, shift }); is(scalar grep(/\A\Q$x[0]\E\z/, @f), 1, 'old file still there'); -- cgit v1.2.3-24-ge0c7