diff options
Diffstat (limited to 'lib/PublicInbox/LeiToMail.pm')
-rw-r--r-- | lib/PublicInbox/LeiToMail.pm | 28 |
1 files changed, 22 insertions, 6 deletions
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm index dfae29e9..5481b5e4 100644 --- a/lib/PublicInbox/LeiToMail.pm +++ b/lib/PublicInbox/LeiToMail.pm @@ -14,7 +14,7 @@ use PublicInbox::Import; use IO::Handle; # ->autoflush use Fcntl qw(SEEK_SET SEEK_END O_CREAT O_EXCL O_WRONLY); use PublicInbox::Syscall qw(rename_noreplace); -use autodie qw(open seek close); +use autodie qw(pipe open seek close); use Carp qw(croak); my %kw2char = ( # Maildir characters @@ -605,7 +605,7 @@ sub _pre_augment_mbox { $lei->{dedupe} && $lei->{dedupe}->can('reset_dedupe'); } if ($self->{zsfx} = PublicInbox::MboxReader::zsfx($dst)) { - pipe(my ($r, $w)) or die "pipe: $!"; + pipe(my $r, my $w); $lei->{zpipe} = [ $r, $w ]; $lei->{ovv}->{lock_path} and die 'BUG: unexpected {ovv}->{lock_path}'; @@ -719,16 +719,32 @@ sub do_augment { # slow, runs in wq worker $m->($self, $lei); } +sub post_augment_call ($$$$) { + my ($self, $lei, $m, $post_augment_done) = @_; + eval { $m->($self, $lei) }; + $lei->{post_augment_err} = $@ if $@; # for post_augment_done +} + # fast (spawn compressor or mkdir), runs in same process as pre_augment sub post_augment { - my ($self, $lei, @args) = @_; + my ($self, $lei, $post_augment_done) = @_; $self->{-au_noted}++ and $lei->qerr("# writing to $self->{dst} ..."); - my $wait = $lei->{opt}->{'import-before'} ? - $lei->{sto}->wq_do('checkpoint', 1) : 0; # _post_augment_mbox my $m = $self->can("_post_augment_$self->{base_type}") or return; - $m->($self, $lei, @args); + + # --import-before is only for lei-(q|lcat), not lei-convert + $lei->{opt}->{'import-before'} or + return post_augment_call $self, $lei, $m, $post_augment_done; + + # we can't deal with post_augment until import-before commits: + require PublicInbox::EOFpipe; + my @io = @$lei{qw(2 sock)}; + pipe(my $r, $io[2]); + PublicInbox::EOFpipe->new($r, \&post_augment_call, + $self, $lei, $m, $post_augment_done); + $lei->{sto}->wq_io_do('barrier', \@io); + # _post_augment_* && post_augment_done run when barrier is complete } # called by every single l2m worker process |