about summary refs log tree commit homepage
path: root/lib/PublicInbox/LeiToMail.pm
diff options
context:
space:
mode:
Diffstat (limited to 'lib/PublicInbox/LeiToMail.pm')
-rw-r--r--lib/PublicInbox/LeiToMail.pm210
1 files changed, 126 insertions, 84 deletions
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index 2aa3977e..5481b5e4 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -7,12 +7,15 @@ use strict;
 use v5.10.1;
 use parent qw(PublicInbox::IPC);
 use PublicInbox::Eml;
-use PublicInbox::ProcessPipe;
+use PublicInbox::IO;
+use PublicInbox::Git;
 use PublicInbox::Spawn qw(spawn);
-use Symbol qw(gensym);
+use PublicInbox::Import;
 use IO::Handle; # ->autoflush
 use Fcntl qw(SEEK_SET SEEK_END O_CREAT O_EXCL O_WRONLY);
 use PublicInbox::Syscall qw(rename_noreplace);
+use autodie qw(pipe open seek close);
+use Carp qw(croak);
 
 my %kw2char = ( # Maildir characters
         draft => 'D',
@@ -54,8 +57,7 @@ sub _mbox_hdr_buf ($$$) {
         }
         my $buf = delete $eml->{hdr};
 
-        # fixup old bug from import (pre-a0c07cba0e5d8b6a)
-        $$buf =~ s/\A[\r\n]*From [^\r\n]*\r?\n//s;
+        PublicInbox::Eml::strip_from($$buf);
         my $ident = $smsg->{blob} // 'lei';
         if (defined(my $pct = $smsg->{pct})) { $ident .= "=$pct" }
 
@@ -132,40 +134,41 @@ sub eml2mboxcl2 {
 }
 
 sub git_to_mail { # git->cat_async callback
-        my ($bref, $oid, $type, $size, $arg) = @_;
-        $type // return; # called by git->async_abort
-        my ($write_cb, $smsg) = @$arg;
-        if ($type eq 'missing' && $smsg->{-lms_rw}) {
-                if ($bref = $smsg->{-lms_rw}->local_blob($oid, 1)) {
+        my ($bref, $oid, $type, $size, $smsg) = @_;
+        $type // return; # called by PublicInbox::Git::close
+        return if $PublicInbox::Git::in_cleanup;
+        my $self = delete $smsg->{l2m} // croak "BUG: no l2m (type=$type)";
+        $self->{lei} // croak "BUG: no {lei} (type=$type)";
+        eval {
+                if ($type eq 'missing' &&
+                          ($bref = $self->{-lms_rw}->local_blob($oid, 1))) {
                         $type = 'blob';
                         $size = length($$bref);
                 }
-        }
-        return warn("W: $oid is $type (!= blob)\n") if $type ne 'blob';
-        return warn("E: $oid is empty\n") unless $size;
-        die "BUG: expected=$smsg->{blob} got=$oid" if $smsg->{blob} ne $oid;
-        $write_cb->($bref, $smsg);
+                $type eq 'blob' or return $self->{lei}->child_error(0,
+                                                "W: $oid is $type (!= blob)");
+                $size or return $self->{lei}->child_error(0,"E: $oid is empty");
+                $smsg->{blob} eq $oid or die "BUG: expected=$smsg->{blob}";
+                $smsg->{bytes} ||= $size;
+                $self->{wcb}->($bref, $smsg);
+        };
+        $self->{lei}->fail("$@ (oid=$oid)") if $@;
 }
 
-sub reap_compress { # dwaitpid callback
-        my ($lei, $pid) = @_;
-        my $cmd = delete $lei->{"pid.$pid"};
-        return if $? == 0;
-        $lei->fail("@$cmd failed", $? >> 8);
+sub reap_compress { # awaitpid callback
+        my ($pid, $lei, $cmd, $old_out) = @_;
+        $lei->{1} = $old_out;
+        $lei->fail($?, "@$cmd failed") if $?;
 }
 
-sub _post_augment_mbox { # open a compressor process from top-level process
+sub _post_augment_mbox { # open a compressor process from top-level lei-daemon
         my ($self, $lei) = @_;
         my $zsfx = $self->{zsfx} or return;
         my $cmd = PublicInbox::MboxReader::zsfx2cmd($zsfx, undef, $lei);
         my ($r, $w) = @{delete $lei->{zpipe}};
         my $rdr = { 0 => $r, 1 => $lei->{1}, 2 => $lei->{2}, pgid => 0 };
-        my $pid = spawn($cmd, undef, $rdr);
-        my $pp = gensym;
-        my $dup = bless { "pid.$pid" => $cmd }, ref($lei);
-        $dup->{$_} = $lei->{$_} for qw(2 sock);
-        tie *$pp, 'PublicInbox::ProcessPipe', $pid, $w, \&reap_compress, $dup;
-        $lei->{1} = $pp;
+        $lei->{1} = PublicInbox::IO::attach_pid($w, spawn($cmd, undef, $rdr),
+                                \&reap_compress, $lei, $cmd, $lei->{1});
 }
 
 # --augment existing output destination, with deduplication
@@ -197,7 +200,7 @@ sub _mbox_write_cb ($$) {
         sub { # for git_to_mail
                 my ($buf, $smsg, $eml) = @_;
                 $eml //= PublicInbox::Eml->new($buf);
-                ++$lei->{-nr_seen};
+                ++$self->{-nr_seen};
                 return if $dedupe->is_dup($eml, $smsg);
                 $lse->xsmsg_vmd($smsg) if $lse;
                 $smsg->{-recent} = 1 if $set_recent;
@@ -208,7 +211,7 @@ sub _mbox_write_cb ($$) {
                         my $lk = $ovv->lock_for_scope;
                         $lei->out($$buf);
                 }
-                ++$lei->{-nr_write};
+                ++$self->{-nr_write};
         }
 }
 
@@ -259,7 +262,7 @@ sub _buf2maildir ($$$$) {
                 $tmp = $dst.'tmp/'.$rand.$common;
         } while (!($ok = sysopen($fh, $tmp, O_CREAT|O_EXCL|O_WRONLY)) &&
                 $!{EEXIST} && ($rand = _rand.','));
-        if ($ok && print $fh $$buf and close($fh)) {
+        if ($ok && print $fh $$buf and $fh->close) {
                 $dst .= $dir; # 'new/' or 'cur/'
                 $rand = '';
                 do {
@@ -293,7 +296,7 @@ sub _maildir_write_cb ($$) {
                 my ($bref, $smsg, $eml) = @_;
                 $dst // return $lei->fail; # dst may be undef-ed in last run
 
-                ++$lei->{-nr_seen};
+                ++$self->{-nr_seen};
                 return if $dedupe && $dedupe->is_dup($eml //
                                                 PublicInbox::Eml->new($$bref),
                                                 $smsg);
@@ -301,7 +304,7 @@ sub _maildir_write_cb ($$) {
                 my $n = _buf2maildir($dst, $bref // \($eml->as_string),
                                         $smsg, $dir);
                 $lms->set_src($smsg->oidbin, $out, $n) if $lms;
-                ++$lei->{-nr_write};
+                ++$self->{-nr_write};
         }
 }
 
@@ -310,8 +313,11 @@ sub _imap_write_cb ($$) {
         my $dedupe = $lei->{dedupe};
         $dedupe->prepare_dedupe if $dedupe;
         my $append = $lei->{net}->can('imap_append');
-        my $uri = $self->{uri};
-        my $mic = $lei->{net}->mic_get($uri);
+        my $uri = $self->{uri} // die 'BUG: no {uri}';
+        my $mic = $lei->{net}->mic_get($uri) // die <<EOM;
+E: $uri connection failed.
+E: Consider using `--jobs ,1' to limit IMAP connections
+EOM
         my $folder = $uri->mailbox;
         $uri->uidvalidity($mic->uidvalidity($folder));
         my $lse = $lei->{lse}; # may be undef
@@ -321,7 +327,7 @@ sub _imap_write_cb ($$) {
                 my ($bref, $smsg, $eml) = @_;
                 $mic // return $lei->fail; # mic may be undef-ed in last run
 
-                ++$lei->{-nr_seen};
+                ++$self->{-nr_seen};
                 return if $dedupe && $dedupe->is_dup($eml //
                                                 PublicInbox::Eml->new($$bref),
                                                 $smsg);
@@ -334,7 +340,7 @@ sub _imap_write_cb ($$) {
                 # imap_append returns UID if IMAP server has UIDPLUS extension
                 ($lms && $uid =~ /\A[0-9]+\z/) and
                         $lms->set_src($smsg->oidbin, $$uri, $uid + 0);
-                ++$lei->{-nr_write};
+                ++$self->{-nr_write};
         }
 }
 
@@ -362,13 +368,14 @@ sub _v2_write_cb ($$) {
         my ($self, $lei) = @_;
         my $dedupe = $lei->{dedupe};
         $dedupe->prepare_dedupe if $dedupe;
+        # only call in worker
+        $PublicInbox::Import::DROP_UNIQUE_UNSUB = $lei->{-drop_unique_unsub};
         sub { # for git_to_mail
                 my ($bref, $smsg, $eml) = @_;
                 $eml //= PublicInbox::Eml->new($bref);
-                ++$lei->{-nr_seen};
+                ++$self->{-nr_seen};
                 return if $dedupe && $dedupe->is_dup($eml, $smsg);
-                $lei->{v2w}->wq_do('add', $eml); # V2Writable->add
-                ++$lei->{-nr_write};
+                $lei->{v2w}->add($eml) and ++$self->{-nr_write};
         }
 }
 
@@ -392,9 +399,16 @@ sub new {
                                 "$dst exists and is not a directory\n";
                 $lei->{ovv}->{dst} = $dst .= '/' if substr($dst, -1) ne '/';
                 $lei->{opt}->{save} //= \1 if $lei->{cmd} eq 'q';
+        } elsif ($fmt eq 'mh') {
+                -e $dst && !-d _ and die
+                                "$dst exists and is not a directory\n";
+                $lei->{ovv}->{dst} = $dst .= '/' if substr($dst, -1) ne '/';
+                $lei->{opt}->{save} //= \1 if $lei->{cmd} eq 'q';
         } elsif (substr($fmt, 0, 4) eq 'mbox') {
                 require PublicInbox::MboxReader;
-                $self->can("eml2$fmt") or die "bad mbox format: $fmt\n";
+                $self->can("eml2$fmt") or die <<EOM;
+E: bad mbox format: $fmt (did you mean: mboxrd, mboxo, mboxcl, or mboxcl2?)
+EOM
                 $self->{base_type} = 'mbox';
                 if ($lei->{cmd} eq 'q' &&
                                 (($lei->path_to_fd($dst) // -1) < 0) &&
@@ -428,7 +442,7 @@ sub new {
                         ($lei->{opt}->{dedupe}//'') eq 'oid';
                 $self->{base_type} = 'v2';
                 $self->{-wq_nr_workers} = 1; # v2 has shards
-                $lei->{opt}->{save} = \1;
+                $lei->{opt}->{save} //= \1 if $lei->{cmd} eq 'q';
                 $dst = $lei->{ovv}->{dst} = $lei->abs_path($dst);
                 @conflict = qw(mua sort);
         } else {
@@ -438,6 +452,8 @@ sub new {
                 (-d $dst || (-e _ && !-w _)) and die
                         "$dst exists and is not a writable file\n";
         }
+        $lei->{input_opt} and # lei_convert sets this
+                @conflict = grep { !$lei->{input_opt}->{$_} } @conflict;
         my @err = map { defined($lei->{opt}->{$_}) ? "--$_" : () } @conflict;
         die "@err incompatible with $fmt\n" if @err;
         $self->{dst} = $dst;
@@ -456,15 +472,10 @@ sub new {
 sub _pre_augment_maildir {
         my ($self, $lei) = @_;
         my $dst = $lei->{ovv}->{dst};
-        for my $x (qw(tmp new cur)) {
-                my $d = $dst.$x;
-                next if -d $d;
-                require File::Path;
-                File::Path::mkpath($d);
-                -d $d or die "$d is not a directory";
-        }
+        require File::Path;
+        File::Path::make_path(map { $dst.$_ } qw(tmp new cur));
         # for utime, so no opendir
-        open $self->{poke_dh}, '<', "${dst}cur" or die "open ${dst}cur: $!";
+        open $self->{poke_dh}, '<', "${dst}cur";
 }
 
 sub clobber_dst_prepare ($;$) {
@@ -544,11 +555,11 @@ sub _pre_augment_text {
                 $out = $lei->{$devfd};
         } else { # normal-looking path
                 if (-p $dst) {
-                        open $out, '>', $dst or die "open($dst): $!";
+                        open $out, '>', $dst;
                 } elsif (-f _ || !-e _) {
                         # text allows augment, HTML/Atom won't
                         my $mode = $lei->{opt}->{augment} ? '>>' : '>';
-                        open $out, $mode, $dst or die "open($mode, $dst): $!";
+                        open $out, $mode, $dst;
                 } else {
                         die "$dst is not a file or FIFO\n";
                 }
@@ -567,7 +578,7 @@ sub _pre_augment_mbox {
                 $out = $lei->{$devfd};
         } else { # normal-looking path
                 if (-p $dst) {
-                        open $out, '>', $dst or die "open($dst): $!";
+                        open $out, '>', $dst;
                 } elsif (-f _ || !-e _) {
                         require PublicInbox::MboxLock;
                         my $m = $lei->{opt}->{'lock'} //
@@ -580,7 +591,7 @@ sub _pre_augment_mbox {
                 $lei->{old_1} = $lei->{1}; # keep for spawning MUA
         }
         # Perl does SEEK_END even with O_APPEND :<
-        $self->{seekable} = seek($out, 0, SEEK_SET);
+        $self->{seekable} = $out->seek(0, SEEK_SET);
         if (!$self->{seekable} && !$!{ESPIPE} && !defined($devfd)) {
                 die "seek($dst): $!\n";
         }
@@ -594,7 +605,7 @@ sub _pre_augment_mbox {
                         $lei->{dedupe} && $lei->{dedupe}->can('reset_dedupe');
         }
         if ($self->{zsfx} = PublicInbox::MboxReader::zsfx($dst)) {
-                pipe(my ($r, $w)) or die "pipe: $!";
+                pipe(my $r, my $w);
                 $lei->{zpipe} = [ $r, $w ];
                 $lei->{ovv}->{lock_path} and
                         die 'BUG: unexpected {ovv}->{lock_path}';
@@ -606,6 +617,17 @@ sub _pre_augment_mbox {
         undef;
 }
 
+sub finish_output {
+        my ($self, $lei) = @_;
+        my $out = delete $lei->{1} // die 'BUG: no lei->{1}';
+        my $old = delete $lei->{old_1} or return; # path only
+        $lei->{1} = $old;
+        return if $out->close; # reaps gzip|pigz|xz|bzip2
+        my $msg = "E: Error closing $lei->{ovv}->{dst}";
+        $? ? $lei->child_error($?) : ($msg .= " ($!)");
+        die $msg;
+}
+
 sub _do_augment_mbox {
         my ($self, $lei) = @_;
         return unless $self->{seekable};
@@ -622,7 +644,7 @@ sub _do_augment_mbox {
         if (my $zsfx = $self->{zsfx}) {
                 $rd = PublicInbox::MboxReader::zsfxcat($out, $zsfx, $lei);
         } else {
-                open($rd, '+>>&', $out) or die "dup: $!";
+                open($rd, '+>>&', $out);
         }
         my $dedupe;
         if ($opt->{augment}) {
@@ -642,16 +664,10 @@ sub _do_augment_mbox {
                 PublicInbox::MboxReader->$fmt($rd, \&_augment, $lei);
         }
         # maybe some systems don't honor O_APPEND, Perl does this:
-        seek($out, 0, SEEK_END) or die "seek $dst: $!";
+        seek($out, 0, SEEK_END);
         $dedupe->pause_dedupe if $dedupe;
 }
 
-sub v2w_done_wait { # dwaitpid callback
-        my ($arg, $pid) = @_;
-        my ($v2w, $lei) = @$arg;
-        $lei->child_error($?, "error for $v2w->{ibx}->{inboxdir}") if $?;
-}
-
 sub _pre_augment_v2 {
         my ($self, $lei) = @_;
         my $dir = $self->{dst};
@@ -672,19 +688,21 @@ sub _pre_augment_v2 {
                 });
         }
         PublicInbox::InboxWritable->new($ibx, @creat);
+        local $PublicInbox::Import::DROP_UNIQUE_UNSUB; # only for workers
+        PublicInbox::Import::load_config(PublicInbox::Config->new, sub {
+                $lei->x_it(shift);
+                die "E: can't write v2 inbox with broken config\n";
+        });
+        $lei->{-drop_unique_unsub} = $PublicInbox::Import::DROP_UNIQUE_UNSUB;
         $ibx->init_inbox if @creat;
-        my $v2w = $ibx->importer;
-        $v2w->wq_workers_start("lei/v2w $dir", 1, $lei->oldset, {lei => $lei});
-        $v2w->wq_wait_async(\&v2w_done_wait, $lei);
-        $lei->{v2w} = $v2w;
+        $lei->{v2w} = $ibx->importer;
         return if !$lei->{opt}->{shared};
         my $d = "$lei->{ale}->{git}->{git_dir}/objects";
-        my $al = "$dir/git/0.git/objects/info/alternates";
-        open my $fh, '+>>', $al or die "open($al): $!";
-        seek($fh, 0, SEEK_SET) or die "seek($al): $!";
-        grep(/\A\Q$d\E\n/, <$fh>) and return;
-        print $fh "$d\n" or die "print($al): $!";
-        close $fh or die "close($al): $!";
+        open my $fh, '+>>', my $f = "$dir/git/0.git/objects/info/alternates";
+        seek($fh, 0, SEEK_SET); # Perl did SEEK_END when it saw '>>'
+        my $seen = grep /\A\Q$d\E\n/, PublicInbox::IO::read_all $fh;
+        print $fh "$d\n" if !$seen;
+        close $fh;
 }
 
 sub pre_augment { # fast (1 disk seek), runs in same process as post_augment
@@ -701,16 +719,32 @@ sub do_augment { # slow, runs in wq worker
         $m->($self, $lei);
 }
 
+sub post_augment_call ($$$$) {
+        my ($self, $lei, $m, $post_augment_done) = @_;
+        eval { $m->($self, $lei) };
+        $lei->{post_augment_err} = $@ if $@; # for post_augment_done
+}
+
 # fast (spawn compressor or mkdir), runs in same process as pre_augment
 sub post_augment {
-        my ($self, $lei, @args) = @_;
+        my ($self, $lei, $post_augment_done) = @_;
         $self->{-au_noted}++ and $lei->qerr("# writing to $self->{dst} ...");
 
-        my $wait = $lei->{opt}->{'import-before'} ?
-                        $lei->{sto}->wq_do('checkpoint', 1) : 0;
         # _post_augment_mbox
         my $m = $self->can("_post_augment_$self->{base_type}") or return;
-        $m->($self, $lei, @args);
+
+        # --import-before is only for lei-(q|lcat), not lei-convert
+        $lei->{opt}->{'import-before'} or
+                return post_augment_call $self, $lei, $m, $post_augment_done;
+
+        # we can't deal with post_augment until import-before commits:
+        require PublicInbox::EOFpipe;
+        my @io = @$lei{qw(2 sock)};
+        pipe(my $r, $io[2]);
+        PublicInbox::EOFpipe->new($r, \&post_augment_call,
+                                $self, $lei, $m, $post_augment_done);
+        $lei->{sto}->wq_io_do('barrier', \@io);
+        # _post_augment_* && post_augment_done run when barrier is complete
 }
 
 # called by every single l2m worker process
@@ -749,7 +783,8 @@ sub do_post_auth {
                 $au_peers->[1] = undef;
                 sysread($au_peers->[0], my $barrier1, 1);
         }
-        $self->{wcb} = $self->write_cb($lei);
+        eval { $self->{wcb} = $self->write_cb($lei) };
+        $lei->fail($@) if $@;
         if ($au_peers) { # wait for peer l2m to set write_cb
                 $au_peers->[3] = undef;
                 sysread($au_peers->[2], my $barrier2, 1);
@@ -786,22 +821,29 @@ sub poke_dst {
 
 sub write_mail { # via ->wq_io_do
         my ($self, $smsg, $eml) = @_;
-        return $self->{wcb}->(undef, $smsg, $eml) if $eml;
-        $smsg->{-lms_rw} = $self->{-lms_rw};
-        $self->{git}->cat_async($smsg->{blob}, \&git_to_mail,
-                                [$self->{wcb}, $smsg]);
+        if ($eml) {
+                eval { $self->{wcb}->(undef, $smsg, $eml) };
+                $self->{lei}->fail("blob=$smsg->{blob} $@") if $@;
+        } else {
+                $smsg->{l2m} = $self;
+                $self->{git}->cat_async($smsg->{blob}, \&git_to_mail, $smsg);
+        }
 }
 
 sub wq_atexit_child {
         my ($self) = @_;
         local $PublicInbox::DS::in_loop = 0; # waitpid synchronously
         my $lei = $self->{lei};
-        delete $self->{wcb};
         $lei->{ale}->git->async_wait_all;
-        my ($nr_w, $nr_s) = delete(@$lei{qw(-nr_write -nr_seen)});
-        $nr_s or return;
+        my ($nr_w, $nr_s) = delete(@$self{qw(-nr_write -nr_seen)});
+        if (my $v2w = delete $lei->{v2w}) {
+                eval { $v2w->done };
+                $lei->child_error($?, "E: $@ ($v2w->{ibx}->{inboxdir})") if $@;
+        }
+        delete $self->{wcb};
+        (($nr_w //= 0) + ($nr_s //= 0)) or return;
         return if $lei->{early_mua} || !$lei->{-progress} || !$lei->{pkt_op_p};
-        $lei->{pkt_op_p}->pkt_do('l2m_progress', $nr_w, $nr_s);
+        $lei->{pkt_op_p}->pkt_do('incr', -nr_write => $nr_w, -nr_seen => $nr_s)
 }
 
 # runs on a 1s timer in lei-daemon