about summary refs log tree commit homepage
path: root/lib/PublicInbox/LeiToMail.pm
diff options
context:
space:
mode:
Diffstat (limited to 'lib/PublicInbox/LeiToMail.pm')
-rw-r--r--lib/PublicInbox/LeiToMail.pm29
1 files changed, 22 insertions, 7 deletions
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index 593547f6..5481b5e4 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -14,7 +14,7 @@ use PublicInbox::Import;
 use IO::Handle; # ->autoflush
 use Fcntl qw(SEEK_SET SEEK_END O_CREAT O_EXCL O_WRONLY);
 use PublicInbox::Syscall qw(rename_noreplace);
-use autodie qw(open seek close);
+use autodie qw(pipe open seek close);
 use Carp qw(croak);
 
 my %kw2char = ( # Maildir characters
@@ -605,7 +605,7 @@ sub _pre_augment_mbox {
                         $lei->{dedupe} && $lei->{dedupe}->can('reset_dedupe');
         }
         if ($self->{zsfx} = PublicInbox::MboxReader::zsfx($dst)) {
-                pipe(my ($r, $w)) or die "pipe: $!";
+                pipe(my $r, my $w);
                 $lei->{zpipe} = [ $r, $w ];
                 $lei->{ovv}->{lock_path} and
                         die 'BUG: unexpected {ovv}->{lock_path}';
@@ -719,17 +719,32 @@ sub do_augment { # slow, runs in wq worker
         $m->($self, $lei);
 }
 
+sub post_augment_call ($$$$) {
+        my ($self, $lei, $m, $post_augment_done) = @_;
+        eval { $m->($self, $lei) };
+        $lei->{post_augment_err} = $@ if $@; # for post_augment_done
+}
+
 # fast (spawn compressor or mkdir), runs in same process as pre_augment
 sub post_augment {
-        my ($self, $lei, @args) = @_;
+        my ($self, $lei, $post_augment_done) = @_;
         $self->{-au_noted}++ and $lei->qerr("# writing to $self->{dst} ...");
 
-        # FIXME: this synchronous wait can be slow w/ parallel callers
-        my $wait = $lei->{opt}->{'import-before'} ?
-                        $lei->{sto}->wq_do('barrier') : 0;
         # _post_augment_mbox
         my $m = $self->can("_post_augment_$self->{base_type}") or return;
-        $m->($self, $lei, @args);
+
+        # --import-before is only for lei-(q|lcat), not lei-convert
+        $lei->{opt}->{'import-before'} or
+                return post_augment_call $self, $lei, $m, $post_augment_done;
+
+        # we can't deal with post_augment until import-before commits:
+        require PublicInbox::EOFpipe;
+        my @io = @$lei{qw(2 sock)};
+        pipe(my $r, $io[2]);
+        PublicInbox::EOFpipe->new($r, \&post_augment_call,
+                                $self, $lei, $m, $post_augment_done);
+        $lei->{sto}->wq_io_do('barrier', \@io);
+        # _post_augment_* && post_augment_done run when barrier is complete
 }
 
 # called by every single l2m worker process