about summary refs log tree commit homepage
diff options
context:
space:
mode:
-rw-r--r-- lib/PublicInbox/Import.pm 132
-rwxr-xr-x script/public-inbox-convert 18
2 files changed, 71 insertions, 79 deletions
diff --git a/lib/PublicInbox/Import.pm b/lib/PublicInbox/Import.pm
index cd03da05..894ba818 100644
--- a/lib/PublicInbox/Import.pm
+++ b/lib/PublicInbox/Import.pm
@@ -8,7 +8,7 @@
 package PublicInbox::Import;
 use v5.12;
 use parent qw(PublicInbox::Lock);
-use PublicInbox::Spawn qw(run_die popen_rd);
+use PublicInbox::Spawn qw(run_die popen_rd spawn);
 use PublicInbox::MID qw(mids mid2path);
 use PublicInbox::Address;
 use PublicInbox::Smsg;
@@ -16,9 +16,11 @@ use PublicInbox::MsgTime qw(msg_datestamp);
 use PublicInbox::ContentHash qw(content_digest);
 use PublicInbox::MDA;
 use PublicInbox::Eml;
+use PublicInbox::ProcessIO;
 use POSIX qw(strftime);
-use autodie qw(read close);
+use autodie qw(read close socketpair);
 use Carp qw(croak);
+use Socket qw(AF_UNIX SOCK_STREAM);
 
 sub default_branch () {
         state $default_branch = do {
@@ -56,11 +58,10 @@ sub new {
 # idempotent start function
 sub gfi_start {
         my ($self) = @_;
-
-        return ($self->{in}, $self->{out}) if $self->{in};
-
-        my ($in_r, $out_r, $out_w);
-        pipe($out_r, $out_w) or die "pipe failed: $!";
+        my $io = $self->{io};
+        return $io if $io;
+        socketpair($io, my $s2, AF_UNIX, SOCK_STREAM, 0);
+        $io->autoflush(1);
 
         $self->lock_acquire;
         eval {
@@ -73,18 +74,17 @@ sub gfi_start {
                         die "fatal: ls-tree -r -z --name-only $ref: \$?=$?" if $?;
                         $self->{-tree} = { map { $_ => 1 } split(/\0/, $t) };
                 }
-                $in_r = $self->{in} = $git->popen(qw(fast-import
-                                        --quiet --done --date-format=raw),
-                                        undef, { 0 => $out_r });
-                $out_w->autoflush(1);
-                $self->{out} = $out_w;
+                my $gfi = [ 'git', "--git-dir=$git->{git_dir}", qw(fast-import
+                                --quiet --done --date-format=raw) ];
+                my $pid = spawn($gfi, undef, { 0 => $s2, 1 => $s2 });
                 $self->{nchg} = 0;
+                $self->{io} = PublicInbox::ProcessIO->maybe_new($pid, $io);
         };
         if ($@) {
                 $self->lock_release;
                 die $@;
         }
-        ($in_r, $out_w);
+        $self->{io};
 }
 
 sub wfail () { die "write to fast-import failed: $!" }
@@ -99,22 +99,22 @@ sub norm_body ($) {
 }
 
 # only used for v1 (ssoma) inboxes
-sub _check_path ($$$$) {
-        my ($r, $w, $tip, $path) = @_;
+sub _check_path ($$$) {
+        my ($io, $tip, $path) = @_;
         return if $tip eq '';
-        print $w "ls $tip $path\n" or wfail;
+        print $io "ls $tip $path\n" or wfail;
         local $/ = "\n";
-        my $info = <$r> // die "EOF from fast-import: $!";
+        my $info = <$io> // die "EOF from fast-import: $!";
         $info =~ /\Amissing / ? undef : $info;
 }
 
-sub _cat_blob ($$$) {
-        my ($r, $w, $oid) = @_;
-        print $w "cat-blob $oid\n" or wfail;
+sub _cat_blob ($$) {
+        my ($io, $oid) = @_;
+        print $io "cat-blob $oid\n" or wfail;
         local $/ = "\n";
-        my $info = <$r> // die "EOF from fast-import / cat-blob: $!";
+        my $info = <$io> // die "EOF from fast-import / cat-blob: $!";
         $info =~ /\A[a-f0-9]{40,} blob ([0-9]+)\n\z/ or return;
-        my $n = read($r, my $buf, my $len = $1 + 1);
+        my $n = read($io, my $buf, my $len = $1 + 1);
         $n == $len or croak "cat-blob: short read: $n < $len";
         my $lf = chop $buf;
         croak "bad read on final byte: <$lf>" if $lf ne "\n";
@@ -123,17 +123,16 @@ sub _cat_blob ($$$) {
 
 sub cat_blob {
         my ($self, $oid) = @_;
-        my ($r, $w) = $self->gfi_start;
-        _cat_blob($r, $w, $oid);
+        _cat_blob(gfi_start($self), $oid);
 }
 
 sub check_remove_v1 {
-        my ($r, $w, $tip, $path, $mime) = @_;
+        my ($io, $tip, $path, $mime) = @_;
 
-        my $info = _check_path($r, $w, $tip, $path) or return ('MISSING',undef);
+        my $info = _check_path($io, $tip, $path) or return ('MISSING',undef);
         $info =~ m!\A100644 blob ([a-f0-9]{40,})\t!s or die "not blob: $info";
         my $oid = $1;
-        my $bref = _cat_blob($r, $w, $oid) or die "BUG: cat-blob $1 failed";
+        my $bref = _cat_blob($io, $oid) or die "BUG: cat-blob $1 failed";
         PublicInbox::Eml::strip_from($$bref);
         my $cur = PublicInbox::Eml->new($bref);
         my $cur_s = $cur->header('Subject') // '';
@@ -146,16 +145,15 @@ sub check_remove_v1 {
 
 sub checkpoint {
         my ($self) = @_;
-        return unless $self->{in};
-        print { $self->{out} } "checkpoint\n" or wfail;
+        print { $self->{io} // return } "checkpoint\n" or wfail;
         undef;
 }
 
 sub progress {
         my ($self, $msg) = @_;
-        return unless $self->{in};
-        print { $self->{out} } "progress $msg\n" or wfail;
-        readline($self->{in}) eq "progress $msg\n" or die
+        my $io = $self->{io} or return;
+        print $io "progress $msg\n" or wfail;
+        readline($io) eq "progress $msg\n" or die
                 "progress $msg not received\n";
         undef;
 }
@@ -205,10 +203,9 @@ sub barrier {
 # used for v2
 sub get_mark {
         my ($self, $mark) = @_;
-        die "not active\n" unless $self->{in};
-        my ($r, $w) = $self->gfi_start;
-        print $w "get-mark $mark\n" or wfail;
-        my $oid = <$r> // die "get-mark failed, need git 2.6.0+\n";
+        my $io = $self->{io} or croak "not active\n";
+        print $io "get-mark $mark\n" or wfail;
+        my $oid = <$io> // die "get-mark failed, need git 2.6.0+\n";
         chomp($oid);
         $oid;
 }
@@ -225,11 +222,11 @@ sub remove {
         my $path_type = $self->{path_type};
         my ($path, $err, $cur, $blob);
 
-        my ($r, $w) = $self->gfi_start;
+        my $io = gfi_start($self);
         my $tip = $self->{tip};
         if ($path_type eq '2/38') {
                 $path = mid2path(v1_mid0($mime));
-                ($err, $cur) = check_remove_v1($r, $w, $tip, $path, $mime);
+                ($err, $cur) = check_remove_v1($io, $tip, $path, $mime);
                 return ($err, $cur) if $err;
         } else {
                 my $sref;
@@ -241,7 +238,7 @@ sub remove {
                 }
                 my $len = length($$sref);
                 $blob = $self->{mark}++;
-                print $w "blob\nmark :$blob\ndata $len\n",
+                print $io "blob\nmark :$blob\ndata $len\n",
                         $$sref, "\n" or wfail;
         }
 
@@ -249,22 +246,22 @@ sub remove {
         my $commit = $self->{mark}++;
         my $parent = $tip =~ /\A:/ ? $tip : undef;
         unless ($parent) {
-                print $w "reset $ref\n" or wfail;
+                print $io "reset $ref\n" or wfail;
         }
         my $ident = $self->{ident};
         my $now = now_raw();
         $msg //= 'rm';
         my $len = length($msg) + 1;
-        print $w "commit $ref\nmark :$commit\n",
+        print $io "commit $ref\nmark :$commit\n",
                 "author $ident $now\n",
                 "committer $ident $now\n",
                 "data $len\n$msg\n\n",
                 'from ', ($parent ? $parent : $tip), "\n" or wfail;
         if (defined $path) {
-                print $w "D $path\n\n" or wfail;
+                print $io "D $path\n\n" or wfail;
         } else {
-                clean_tree_v2($self, $w, 'd');
-                print $w "M 100644 :$blob d\n\n" or wfail;
+                clean_tree_v2($self, $io, 'd');
+                print $io "M 100644 :$blob d\n\n" or wfail;
         }
         $self->{nchg}++;
         (($self->{tip} = ":$commit"), $cur);
@@ -354,11 +351,11 @@ sub v1_mid0 ($) {
         $mids->[0];
 }
 sub clean_tree_v2 ($$$) {
-        my ($self, $w, $keep) = @_;
+        my ($self, $io, $keep) = @_;
         my $tree = $self->{-tree} or return; #v2 only
         delete $tree->{$keep};
         foreach (keys %$tree) {
-                print $w "D $_\n" or wfail;
+                print $io "D $_\n" or wfail;
         }
         %$tree = ($keep => 1);
 }
@@ -377,10 +374,10 @@ sub add {
                 $path = 'm';
         }
 
-        my ($r, $w) = $self->gfi_start;
+        my $io = gfi_start($self);
         my $tip = $self->{tip};
         if ($path_type eq '2/38') {
-                _check_path($r, $w, $tip, $path) and return;
+                _check_path($io, $tip, $path) and return;
         }
 
         drop_unwanted_headers($mime);
@@ -394,8 +391,7 @@ sub add {
         my $raw_email = $mime->{-public_inbox_raw} // $mime->as_string;
         my $n = length($raw_email);
         $self->{bytes_added} += $n;
-        print $w "blob\nmark :$blob\ndata ", $n, "\n" or wfail;
-        print $w $raw_email, "\n" or wfail;
+        print $io "blob\nmark :$blob\ndata $n\n", $raw_email, "\n" or wfail;
 
         # v2: we need this for Xapian
         if ($smsg) {
@@ -422,19 +418,19 @@ sub add {
         my $parent = $tip =~ /\A:/ ? $tip : undef;
 
         unless ($parent) {
-                print $w "reset $ref\n" or wfail;
+                print $io "reset $ref\n" or wfail;
         }
 
-        print $w "commit $ref\nmark :$commit\n",
+        print $io "commit $ref\nmark :$commit\n",
                 "author $author $at\n",
-                "committer $self->{ident} $ct\n" or wfail;
-        print $w "data ", (length($subject) + 1), "\n",
+                "committer $self->{ident} $ct\n",
+                "data ", (length($subject) + 1), "\n",
                 $subject, "\n\n" or wfail;
         if ($tip ne '') {
-                print $w 'from ', ($parent ? $parent : $tip), "\n" or wfail;
+                print $io 'from ', ($parent ? $parent : $tip), "\n" or wfail;
         }
-        clean_tree_v2($self, $w, $path);
-        print $w "M 100644 :$blob $path\n\n" or wfail;
+        clean_tree_v2($self, $io, $path);
+        print $io "M 100644 :$blob $path\n\n" or wfail;
         $self->{nchg}++;
         $self->{tip} = ":$commit";
 }
@@ -475,15 +471,14 @@ EOM
 }
 
 # true if locked and active
-sub active { !!$_[0]->{out} }
+sub active { !!$_[0]->{io} }
 
 sub done {
         my ($self) = @_;
-        my $w = delete $self->{out} or return;
+        my $io = delete $self->{io} or return;
         eval {
-                my $r = delete $self->{in} or die 'BUG: missing {in} when done';
-                print $w "done\n" or wfail;
-                close $r;
+                print $io "done\n" or wfail;
+                close $io; # reaps and dies on error
         };
         my $wait_err = $@;
         my $nchg = delete $self->{nchg};
@@ -496,10 +491,7 @@ sub done {
         die $wait_err if $wait_err;
 }
 
-sub atfork_child {
-        my ($self) = @_;
-        close($_) for (grep defined, delete(@$self{qw(in out)}));
-}
+sub atfork_child { close(delete($_[0]->{io}) // return) }
 
 sub digest2mid ($$;$) {
         my ($dig, $hdr, $fallback_time) = @_;
@@ -552,7 +544,7 @@ sub replace_oids {
         my $git = $self->{git};
         my @export = (qw(fast-export --no-data --use-done-feature), $old);
         my $rd = $git->popen(@export);
-        my ($r, $w) = $self->gfi_start;
+        my $io = gfi_start($self);
         my @buf;
         my $nreplace = 0;
         my @oids;
@@ -563,7 +555,7 @@ sub replace_oids {
                         push @buf, "reset $tmp\n";
                 } elsif (/^commit (?:.+)/) {
                         if (@buf) {
-                                print $w @buf or wfail;
+                                print $io @buf or wfail;
                                 @buf = ();
                         }
                         push @buf, "commit $tmp\n";
@@ -599,7 +591,7 @@ sub replace_oids {
                                 rewrite_commit($self, \@oids, \@buf, $mime);
                                 $nreplace++;
                         }
-                        print $w @buf, "\n" or wfail;
+                        print $io @buf, "\n" or wfail;
                         @buf = ();
                 } elsif ($_ eq "done\n") {
                         $done = 1;
@@ -612,7 +604,7 @@ sub replace_oids {
         }
         close $rd;
         if (@buf) {
-                print $w @buf or wfail;
+                print $io @buf or wfail;
         }
         die 'done\n not seen from fast-export' unless $done;
         chomp(my $cmt = $self->get_mark(":$mark")) if $nreplace;
diff --git a/script/public-inbox-convert b/script/public-inbox-convert
index 780f7194..0cc52777 100755
--- a/script/public-inbox-convert
+++ b/script/public-inbox-convert
@@ -120,7 +120,7 @@ my $head = $old->{ref_head} || 'HEAD';
 my $rd = $old->git->popen(qw(fast-export --use-done-feature), $head);
 $v2w->idx_init($opt);
 my $im = $v2w->importer;
-my ($r, $w) = $im->gfi_start;
+my $io = $im->gfi_start;
 my $h = '[0-9a-f]';
 my %D;
 my $last;
@@ -131,12 +131,12 @@ while (<$rd>) {
                 $state = 'commit';
         } elsif (/^data ([0-9]+)/) {
                 my $len = $1;
-                print $w $_ or $im->wfail;
+                print $io $_ or $im->wfail;
                 while ($len) {
                         my $n = read($rd, my $tmp, $len) or die "read: $!";
                         warn "$n != $len\n" if $n != $len;
                         $len -= $n;
-                        print $w $tmp or $im->wfail;
+                        print $io $tmp or $im->wfail;
                 }
                 next;
         } elsif ($state eq 'commit') {
@@ -144,9 +144,9 @@ while (<$rd>) {
                         my ($mark, $path) = ($1, $2);
                         $D{$path} = $mark;
                         if ($last && $last ne 'm') {
-                                print $w "D $last\n" or $im->wfail;
+                                print $io "D $last\n" or $im->wfail;
                         }
-                        print $w "M 100644 :$mark m\n" or $im->wfail;
+                        print $io "M 100644 :$mark m\n" or $im->wfail;
                         $last = 'm';
                         next;
                 }
@@ -154,18 +154,18 @@ while (<$rd>) {
                         my $mark = delete $D{$1};
                         defined $mark or die "undeleted path: $1\n";
                         if ($last && $last ne 'd') {
-                                print $w "D $last\n" or $im->wfail;
+                                print $io "D $last\n" or $im->wfail;
                         }
-                        print $w "M 100644 :$mark d\n" or $im->wfail;
+                        print $io "M 100644 :$mark d\n" or $im->wfail;
                         $last = 'd';
                         next;
                 }
         }
         last if $_ eq "done\n";
-        print $w $_ or $im->wfail;
+        print $io $_ or $im->wfail;
 }
 close $rd or die "fast-export: \$?=$? \$!=$!\n";
-$r = $w = undef; # v2w->done does the actual close and error checking
+$io = undef;
 $v2w->done;
 if (my $old_mm = $old->mm) {
         $old->cleanup;