* [PATCH 6/9] import: switch to Unix stream socket for fast-import
2023-10-11 7:20 7% [PATCH 0/9] lei + import-related updates Eric Wong
@ 2023-10-11 7:20 4% ` Eric Wong
0 siblings, 0 replies; 2+ results
From: Eric Wong @ 2023-10-11 7:20 UTC (permalink / raw)
To: meta
We use fewer file descriptors and fewer lines of code this way.
I'm not aware of any place we rely on POSIX pipe semantics with
`git fast-import', and sockets have bigger buffers by default
in most cases (even if Linux allows larger pipe buffers).
---
lib/PublicInbox/Import.pm | 132 +++++++++++++++++-------------------
script/public-inbox-convert | 18 ++---
2 files changed, 71 insertions(+), 79 deletions(-)
diff --git a/lib/PublicInbox/Import.pm b/lib/PublicInbox/Import.pm
index cd03da05..894ba818 100644
--- a/lib/PublicInbox/Import.pm
+++ b/lib/PublicInbox/Import.pm
@@ -8,7 +8,7 @@
package PublicInbox::Import;
use v5.12;
use parent qw(PublicInbox::Lock);
-use PublicInbox::Spawn qw(run_die popen_rd);
+use PublicInbox::Spawn qw(run_die popen_rd spawn);
use PublicInbox::MID qw(mids mid2path);
use PublicInbox::Address;
use PublicInbox::Smsg;
@@ -16,9 +16,11 @@ use PublicInbox::MsgTime qw(msg_datestamp);
use PublicInbox::ContentHash qw(content_digest);
use PublicInbox::MDA;
use PublicInbox::Eml;
+use PublicInbox::ProcessIO;
use POSIX qw(strftime);
-use autodie qw(read close);
+use autodie qw(read close socketpair);
use Carp qw(croak);
+use Socket qw(AF_UNIX SOCK_STREAM);
sub default_branch () {
state $default_branch = do {
@@ -56,11 +58,10 @@ sub new {
# idempotent start function
sub gfi_start {
my ($self) = @_;
-
- return ($self->{in}, $self->{out}) if $self->{in};
-
- my ($in_r, $out_r, $out_w);
- pipe($out_r, $out_w) or die "pipe failed: $!";
+ my $io = $self->{io};
+ return $io if $io;
+ socketpair($io, my $s2, AF_UNIX, SOCK_STREAM, 0);
+ $io->autoflush(1);
$self->lock_acquire;
eval {
@@ -73,18 +74,17 @@ sub gfi_start {
die "fatal: ls-tree -r -z --name-only $ref: \$?=$?" if $?;
$self->{-tree} = { map { $_ => 1 } split(/\0/, $t) };
}
- $in_r = $self->{in} = $git->popen(qw(fast-import
- --quiet --done --date-format=raw),
- undef, { 0 => $out_r });
- $out_w->autoflush(1);
- $self->{out} = $out_w;
+ my $gfi = [ 'git', "--git-dir=$git->{git_dir}", qw(fast-import
+ --quiet --done --date-format=raw) ];
+ my $pid = spawn($gfi, undef, { 0 => $s2, 1 => $s2 });
$self->{nchg} = 0;
+ $self->{io} = PublicInbox::ProcessIO->maybe_new($pid, $io);
};
if ($@) {
$self->lock_release;
die $@;
}
- ($in_r, $out_w);
+ $self->{io};
}
sub wfail () { die "write to fast-import failed: $!" }
@@ -99,22 +99,22 @@ sub norm_body ($) {
}
# only used for v1 (ssoma) inboxes
-sub _check_path ($$$$) {
- my ($r, $w, $tip, $path) = @_;
+sub _check_path ($$$) {
+ my ($io, $tip, $path) = @_;
return if $tip eq '';
- print $w "ls $tip $path\n" or wfail;
+ print $io "ls $tip $path\n" or wfail;
local $/ = "\n";
- my $info = <$r> // die "EOF from fast-import: $!";
+ my $info = <$io> // die "EOF from fast-import: $!";
$info =~ /\Amissing / ? undef : $info;
}
-sub _cat_blob ($$$) {
- my ($r, $w, $oid) = @_;
- print $w "cat-blob $oid\n" or wfail;
+sub _cat_blob ($$) {
+ my ($io, $oid) = @_;
+ print $io "cat-blob $oid\n" or wfail;
local $/ = "\n";
- my $info = <$r> // die "EOF from fast-import / cat-blob: $!";
+ my $info = <$io> // die "EOF from fast-import / cat-blob: $!";
$info =~ /\A[a-f0-9]{40,} blob ([0-9]+)\n\z/ or return;
- my $n = read($r, my $buf, my $len = $1 + 1);
+ my $n = read($io, my $buf, my $len = $1 + 1);
$n == $len or croak "cat-blob: short read: $n < $len";
my $lf = chop $buf;
croak "bad read on final byte: <$lf>" if $lf ne "\n";
@@ -123,17 +123,16 @@ sub _cat_blob ($$$) {
sub cat_blob {
my ($self, $oid) = @_;
- my ($r, $w) = $self->gfi_start;
- _cat_blob($r, $w, $oid);
+ _cat_blob(gfi_start($self), $oid);
}
sub check_remove_v1 {
- my ($r, $w, $tip, $path, $mime) = @_;
+ my ($io, $tip, $path, $mime) = @_;
- my $info = _check_path($r, $w, $tip, $path) or return ('MISSING',undef);
+ my $info = _check_path($io, $tip, $path) or return ('MISSING',undef);
$info =~ m!\A100644 blob ([a-f0-9]{40,})\t!s or die "not blob: $info";
my $oid = $1;
- my $bref = _cat_blob($r, $w, $oid) or die "BUG: cat-blob $1 failed";
+ my $bref = _cat_blob($io, $oid) or die "BUG: cat-blob $1 failed";
PublicInbox::Eml::strip_from($$bref);
my $cur = PublicInbox::Eml->new($bref);
my $cur_s = $cur->header('Subject') // '';
@@ -146,16 +145,15 @@ sub check_remove_v1 {
sub checkpoint {
my ($self) = @_;
- return unless $self->{in};
- print { $self->{out} } "checkpoint\n" or wfail;
+ print { $self->{io} // return } "checkpoint\n" or wfail;
undef;
}
sub progress {
my ($self, $msg) = @_;
- return unless $self->{in};
- print { $self->{out} } "progress $msg\n" or wfail;
- readline($self->{in}) eq "progress $msg\n" or die
+ my $io = $self->{io} or return;
+ print $io "progress $msg\n" or wfail;
+ readline($io) eq "progress $msg\n" or die
"progress $msg not received\n";
undef;
}
@@ -205,10 +203,9 @@ sub barrier {
# used for v2
sub get_mark {
my ($self, $mark) = @_;
- die "not active\n" unless $self->{in};
- my ($r, $w) = $self->gfi_start;
- print $w "get-mark $mark\n" or wfail;
- my $oid = <$r> // die "get-mark failed, need git 2.6.0+\n";
+ my $io = $self->{io} or croak "not active\n";
+ print $io "get-mark $mark\n" or wfail;
+ my $oid = <$io> // die "get-mark failed, need git 2.6.0+\n";
chomp($oid);
$oid;
}
@@ -225,11 +222,11 @@ sub remove {
my $path_type = $self->{path_type};
my ($path, $err, $cur, $blob);
- my ($r, $w) = $self->gfi_start;
+ my $io = gfi_start($self);
my $tip = $self->{tip};
if ($path_type eq '2/38') {
$path = mid2path(v1_mid0($mime));
- ($err, $cur) = check_remove_v1($r, $w, $tip, $path, $mime);
+ ($err, $cur) = check_remove_v1($io, $tip, $path, $mime);
return ($err, $cur) if $err;
} else {
my $sref;
@@ -241,7 +238,7 @@ sub remove {
}
my $len = length($$sref);
$blob = $self->{mark}++;
- print $w "blob\nmark :$blob\ndata $len\n",
+ print $io "blob\nmark :$blob\ndata $len\n",
$$sref, "\n" or wfail;
}
@@ -249,22 +246,22 @@ sub remove {
my $commit = $self->{mark}++;
my $parent = $tip =~ /\A:/ ? $tip : undef;
unless ($parent) {
- print $w "reset $ref\n" or wfail;
+ print $io "reset $ref\n" or wfail;
}
my $ident = $self->{ident};
my $now = now_raw();
$msg //= 'rm';
my $len = length($msg) + 1;
- print $w "commit $ref\nmark :$commit\n",
+ print $io "commit $ref\nmark :$commit\n",
"author $ident $now\n",
"committer $ident $now\n",
"data $len\n$msg\n\n",
'from ', ($parent ? $parent : $tip), "\n" or wfail;
if (defined $path) {
- print $w "D $path\n\n" or wfail;
+ print $io "D $path\n\n" or wfail;
} else {
- clean_tree_v2($self, $w, 'd');
- print $w "M 100644 :$blob d\n\n" or wfail;
+ clean_tree_v2($self, $io, 'd');
+ print $io "M 100644 :$blob d\n\n" or wfail;
}
$self->{nchg}++;
(($self->{tip} = ":$commit"), $cur);
@@ -354,11 +351,11 @@ sub v1_mid0 ($) {
$mids->[0];
}
sub clean_tree_v2 ($$$) {
- my ($self, $w, $keep) = @_;
+ my ($self, $io, $keep) = @_;
my $tree = $self->{-tree} or return; #v2 only
delete $tree->{$keep};
foreach (keys %$tree) {
- print $w "D $_\n" or wfail;
+ print $io "D $_\n" or wfail;
}
%$tree = ($keep => 1);
}
@@ -377,10 +374,10 @@ sub add {
$path = 'm';
}
- my ($r, $w) = $self->gfi_start;
+ my $io = gfi_start($self);
my $tip = $self->{tip};
if ($path_type eq '2/38') {
- _check_path($r, $w, $tip, $path) and return;
+ _check_path($io, $tip, $path) and return;
}
drop_unwanted_headers($mime);
@@ -394,8 +391,7 @@ sub add {
my $raw_email = $mime->{-public_inbox_raw} // $mime->as_string;
my $n = length($raw_email);
$self->{bytes_added} += $n;
- print $w "blob\nmark :$blob\ndata ", $n, "\n" or wfail;
- print $w $raw_email, "\n" or wfail;
+ print $io "blob\nmark :$blob\ndata $n\n", $raw_email, "\n" or wfail;
# v2: we need this for Xapian
if ($smsg) {
@@ -422,19 +418,19 @@ sub add {
my $parent = $tip =~ /\A:/ ? $tip : undef;
unless ($parent) {
- print $w "reset $ref\n" or wfail;
+ print $io "reset $ref\n" or wfail;
}
- print $w "commit $ref\nmark :$commit\n",
+ print $io "commit $ref\nmark :$commit\n",
"author $author $at\n",
- "committer $self->{ident} $ct\n" or wfail;
- print $w "data ", (length($subject) + 1), "\n",
+ "committer $self->{ident} $ct\n",
+ "data ", (length($subject) + 1), "\n",
$subject, "\n\n" or wfail;
if ($tip ne '') {
- print $w 'from ', ($parent ? $parent : $tip), "\n" or wfail;
+ print $io 'from ', ($parent ? $parent : $tip), "\n" or wfail;
}
- clean_tree_v2($self, $w, $path);
- print $w "M 100644 :$blob $path\n\n" or wfail;
+ clean_tree_v2($self, $io, $path);
+ print $io "M 100644 :$blob $path\n\n" or wfail;
$self->{nchg}++;
$self->{tip} = ":$commit";
}
@@ -475,15 +471,14 @@ EOM
}
# true if locked and active
-sub active { !!$_[0]->{out} }
+sub active { !!$_[0]->{io} }
sub done {
my ($self) = @_;
- my $w = delete $self->{out} or return;
+ my $io = delete $self->{io} or return;
eval {
- my $r = delete $self->{in} or die 'BUG: missing {in} when done';
- print $w "done\n" or wfail;
- close $r;
+ print $io "done\n" or wfail;
+ close $io; # reaps and dies on error
};
my $wait_err = $@;
my $nchg = delete $self->{nchg};
@@ -496,10 +491,7 @@ sub done {
die $wait_err if $wait_err;
}
-sub atfork_child {
- my ($self) = @_;
- close($_) for (grep defined, delete(@$self{qw(in out)}));
-}
+sub atfork_child { close(delete($_[0]->{io}) // return) }
sub digest2mid ($$;$) {
my ($dig, $hdr, $fallback_time) = @_;
@@ -552,7 +544,7 @@ sub replace_oids {
my $git = $self->{git};
my @export = (qw(fast-export --no-data --use-done-feature), $old);
my $rd = $git->popen(@export);
- my ($r, $w) = $self->gfi_start;
+ my $io = gfi_start($self);
my @buf;
my $nreplace = 0;
my @oids;
@@ -563,7 +555,7 @@ sub replace_oids {
push @buf, "reset $tmp\n";
} elsif (/^commit (?:.+)/) {
if (@buf) {
- print $w @buf or wfail;
+ print $io @buf or wfail;
@buf = ();
}
push @buf, "commit $tmp\n";
@@ -599,7 +591,7 @@ sub replace_oids {
rewrite_commit($self, \@oids, \@buf, $mime);
$nreplace++;
}
- print $w @buf, "\n" or wfail;
+ print $io @buf, "\n" or wfail;
@buf = ();
} elsif ($_ eq "done\n") {
$done = 1;
@@ -612,7 +604,7 @@ sub replace_oids {
}
close $rd;
if (@buf) {
- print $w @buf or wfail;
+ print $io @buf or wfail;
}
die 'done\n not seen from fast-export' unless $done;
chomp(my $cmt = $self->get_mark(":$mark")) if $nreplace;
diff --git a/script/public-inbox-convert b/script/public-inbox-convert
index 780f7194..0cc52777 100755
--- a/script/public-inbox-convert
+++ b/script/public-inbox-convert
@@ -120,7 +120,7 @@ my $head = $old->{ref_head} || 'HEAD';
my $rd = $old->git->popen(qw(fast-export --use-done-feature), $head);
$v2w->idx_init($opt);
my $im = $v2w->importer;
-my ($r, $w) = $im->gfi_start;
+my $io = $im->gfi_start;
my $h = '[0-9a-f]';
my %D;
my $last;
@@ -131,12 +131,12 @@ while (<$rd>) {
$state = 'commit';
} elsif (/^data ([0-9]+)/) {
my $len = $1;
- print $w $_ or $im->wfail;
+ print $io $_ or $im->wfail;
while ($len) {
my $n = read($rd, my $tmp, $len) or die "read: $!";
warn "$n != $len\n" if $n != $len;
$len -= $n;
- print $w $tmp or $im->wfail;
+ print $io $tmp or $im->wfail;
}
next;
} elsif ($state eq 'commit') {
@@ -144,9 +144,9 @@ while (<$rd>) {
my ($mark, $path) = ($1, $2);
$D{$path} = $mark;
if ($last && $last ne 'm') {
- print $w "D $last\n" or $im->wfail;
+ print $io "D $last\n" or $im->wfail;
}
- print $w "M 100644 :$mark m\n" or $im->wfail;
+ print $io "M 100644 :$mark m\n" or $im->wfail;
$last = 'm';
next;
}
@@ -154,18 +154,18 @@ while (<$rd>) {
my $mark = delete $D{$1};
defined $mark or die "undeleted path: $1\n";
if ($last && $last ne 'd') {
- print $w "D $last\n" or $im->wfail;
+ print $io "D $last\n" or $im->wfail;
}
- print $w "M 100644 :$mark d\n" or $im->wfail;
+ print $io "M 100644 :$mark d\n" or $im->wfail;
$last = 'd';
next;
}
}
last if $_ eq "done\n";
- print $w $_ or $im->wfail;
+ print $io $_ or $im->wfail;
}
close $rd or die "fast-export: \$?=$? \$!=$!\n";
-$r = $w = undef; # v2w->done does the actual close and error checking
+$io = undef;
$v2w->done;
if (my $old_mm = $old->mm) {
$old->cleanup;
^ permalink raw reply related [relevance 4%]
* [PATCH 0/9] lei + import-related updates
@ 2023-10-11 7:20 7% Eric Wong
2023-10-11 7:20 4% ` [PATCH 6/9] import: switch to Unix stream socket for fast-import Eric Wong
0 siblings, 1 reply; 2+ results
From: Eric Wong @ 2023-10-11 7:20 UTC (permalink / raw)
To: meta
A few more ProcessIO conversions to start with, and then
cleanups while I started working on import-related stuff.
Some of this will tie in nicely for FUSE, too...
I've realized the msgtime warnings were pointless anyway, since
there's nothing anybody can really do about bad messages that
get through various upstream spam filters.
5/9 is a long-overdue cleanup I noticed while going
over Import.pm.
9/9 ought to fix the fragile t/lei-store-fail.t test
by using new features.
Eric Wong (9):
lei rediff: use ProcessIO for --drq support
lei_xsearch: improve curl progress reporting
msgtime: quiet warnings we can do nothing about
msgtime: simplify msg_timestamp and msg_datestamp
treewide: consolidate "From " line removal
import: switch to Unix stream socket for fast-import
import: cat_blob is a no-op w/o live fast-import
lei blob: run cat_blob on lei/store for pending blobs
lei import|tag|rm: support --commit-delay=SECONDS
lib/PublicInbox/Eml.pm | 6 ++
lib/PublicInbox/IMAP.pm | 2 +-
lib/PublicInbox/Import.pm | 138 ++++++++++++++++------------------
lib/PublicInbox/LEI.pm | 23 +++---
lib/PublicInbox/LeiBlob.pm | 16 ++--
lib/PublicInbox/LeiInput.pm | 5 +-
lib/PublicInbox/LeiInspect.pm | 2 +-
lib/PublicInbox/LeiRediff.pm | 33 ++++----
lib/PublicInbox/LeiStore.pm | 11 +++
lib/PublicInbox/LeiToMail.pm | 3 +-
lib/PublicInbox/LeiXSearch.pm | 34 +++++----
lib/PublicInbox/Mbox.pm | 16 ++--
lib/PublicInbox/MboxReader.pm | 2 +-
lib/PublicInbox/MsgTime.pm | 49 +++++-------
lib/PublicInbox/NNTP.pm | 3 +-
lib/PublicInbox/ProcessIO.pm | 18 ++---
lib/PublicInbox/Spawn.pm | 1 +
script/public-inbox-convert | 18 ++---
script/public-inbox-edit | 5 +-
script/public-inbox-learn | 2 +-
script/public-inbox-mda | 4 +-
script/public-inbox-purge | 4 +-
t/lei-import.t | 13 ++++
t/lei-store-fail.t | 20 +++--
t/lei-tag.t | 15 +++-
25 files changed, 230 insertions(+), 213 deletions(-)
^ permalink raw reply [relevance 7%]
Results 1-2 of 2 | reverse | options above
-- pct% links below jump to the message on this page, permalinks otherwise --
2023-10-11 7:20 7% [PATCH 0/9] lei + import-related updates Eric Wong
2023-10-11 7:20 4% ` [PATCH 6/9] import: switch to Unix stream socket for fast-import Eric Wong
Code repositories for project(s) associated with this public inbox
https://80x24.org/public-inbox.git
This is a public inbox; see the mirroring instructions
for how to clone and mirror all data and code used for this inbox,
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).