user/dev discussion of public-inbox itself
 help / color / mirror / code / Atom feed
Search results ordered by [date|relevance]  view[summary|nested|Atom feed]
thread overview below | download mbox.gz: |
* [PATCH 6/9] import: switch to Unix stream socket for fast-import
  2023-10-11  7:20  7% [PATCH 0/9] lei + import-related updates Eric Wong
@ 2023-10-11  7:20  4% ` Eric Wong
  0 siblings, 0 replies; 2+ results
From: Eric Wong @ 2023-10-11  7:20 UTC (permalink / raw)
  To: meta

We use fewer file descriptors and fewer lines of code this way.
I'm not aware of any place we rely on POSIX pipe semantics with
`git fast-import', and sockets have bigger buffers by default
in most cases (even if Linux allows larger pipe buffers).
---
 lib/PublicInbox/Import.pm   | 132 +++++++++++++++++-------------------
 script/public-inbox-convert |  18 ++---
 2 files changed, 71 insertions(+), 79 deletions(-)

diff --git a/lib/PublicInbox/Import.pm b/lib/PublicInbox/Import.pm
index cd03da05..894ba818 100644
--- a/lib/PublicInbox/Import.pm
+++ b/lib/PublicInbox/Import.pm
@@ -8,7 +8,7 @@
 package PublicInbox::Import;
 use v5.12;
 use parent qw(PublicInbox::Lock);
-use PublicInbox::Spawn qw(run_die popen_rd);
+use PublicInbox::Spawn qw(run_die popen_rd spawn);
 use PublicInbox::MID qw(mids mid2path);
 use PublicInbox::Address;
 use PublicInbox::Smsg;
@@ -16,9 +16,11 @@ use PublicInbox::MsgTime qw(msg_datestamp);
 use PublicInbox::ContentHash qw(content_digest);
 use PublicInbox::MDA;
 use PublicInbox::Eml;
+use PublicInbox::ProcessIO;
 use POSIX qw(strftime);
-use autodie qw(read close);
+use autodie qw(read close socketpair);
 use Carp qw(croak);
+use Socket qw(AF_UNIX SOCK_STREAM);
 
 sub default_branch () {
 	state $default_branch = do {
@@ -56,11 +58,10 @@ sub new {
 # idempotent start function
 sub gfi_start {
 	my ($self) = @_;
-
-	return ($self->{in}, $self->{out}) if $self->{in};
-
-	my ($in_r, $out_r, $out_w);
-	pipe($out_r, $out_w) or die "pipe failed: $!";
+	my $io = $self->{io};
+	return $io if $io;
+	socketpair($io, my $s2, AF_UNIX, SOCK_STREAM, 0);
+	$io->autoflush(1);
 
 	$self->lock_acquire;
 	eval {
@@ -73,18 +74,17 @@ sub gfi_start {
 			die "fatal: ls-tree -r -z --name-only $ref: \$?=$?" if $?;
 			$self->{-tree} = { map { $_ => 1 } split(/\0/, $t) };
 		}
-		$in_r = $self->{in} = $git->popen(qw(fast-import
-					--quiet --done --date-format=raw),
-					undef, { 0 => $out_r });
-		$out_w->autoflush(1);
-		$self->{out} = $out_w;
+		my $gfi = [ 'git', "--git-dir=$git->{git_dir}", qw(fast-import
+				--quiet --done --date-format=raw) ];
+		my $pid = spawn($gfi, undef, { 0 => $s2, 1 => $s2 });
 		$self->{nchg} = 0;
+		$self->{io} = PublicInbox::ProcessIO->maybe_new($pid, $io);
 	};
 	if ($@) {
 		$self->lock_release;
 		die $@;
 	}
-	($in_r, $out_w);
+	$self->{io};
 }
 
 sub wfail () { die "write to fast-import failed: $!" }
@@ -99,22 +99,22 @@ sub norm_body ($) {
 }
 
 # only used for v1 (ssoma) inboxes
-sub _check_path ($$$$) {
-	my ($r, $w, $tip, $path) = @_;
+sub _check_path ($$$) {
+	my ($io, $tip, $path) = @_;
 	return if $tip eq '';
-	print $w "ls $tip $path\n" or wfail;
+	print $io "ls $tip $path\n" or wfail;
 	local $/ = "\n";
-	my $info = <$r> // die "EOF from fast-import: $!";
+	my $info = <$io> // die "EOF from fast-import: $!";
 	$info =~ /\Amissing / ? undef : $info;
 }
 
-sub _cat_blob ($$$) {
-	my ($r, $w, $oid) = @_;
-	print $w "cat-blob $oid\n" or wfail;
+sub _cat_blob ($$) {
+	my ($io, $oid) = @_;
+	print $io "cat-blob $oid\n" or wfail;
 	local $/ = "\n";
-	my $info = <$r> // die "EOF from fast-import / cat-blob: $!";
+	my $info = <$io> // die "EOF from fast-import / cat-blob: $!";
 	$info =~ /\A[a-f0-9]{40,} blob ([0-9]+)\n\z/ or return;
-	my $n = read($r, my $buf, my $len = $1 + 1);
+	my $n = read($io, my $buf, my $len = $1 + 1);
 	$n == $len or croak "cat-blob: short read: $n < $len";
 	my $lf = chop $buf;
 	croak "bad read on final byte: <$lf>" if $lf ne "\n";
@@ -123,17 +123,16 @@ sub _cat_blob ($$$) {
 
 sub cat_blob {
 	my ($self, $oid) = @_;
-	my ($r, $w) = $self->gfi_start;
-	_cat_blob($r, $w, $oid);
+	_cat_blob(gfi_start($self), $oid);
 }
 
 sub check_remove_v1 {
-	my ($r, $w, $tip, $path, $mime) = @_;
+	my ($io, $tip, $path, $mime) = @_;
 
-	my $info = _check_path($r, $w, $tip, $path) or return ('MISSING',undef);
+	my $info = _check_path($io, $tip, $path) or return ('MISSING',undef);
 	$info =~ m!\A100644 blob ([a-f0-9]{40,})\t!s or die "not blob: $info";
 	my $oid = $1;
-	my $bref = _cat_blob($r, $w, $oid) or die "BUG: cat-blob $1 failed";
+	my $bref = _cat_blob($io, $oid) or die "BUG: cat-blob $1 failed";
 	PublicInbox::Eml::strip_from($$bref);
 	my $cur = PublicInbox::Eml->new($bref);
 	my $cur_s = $cur->header('Subject') // '';
@@ -146,16 +145,15 @@ sub check_remove_v1 {
 
 sub checkpoint {
 	my ($self) = @_;
-	return unless $self->{in};
-	print { $self->{out} } "checkpoint\n" or wfail;
+	print { $self->{io} // return } "checkpoint\n" or wfail;
 	undef;
 }
 
 sub progress {
 	my ($self, $msg) = @_;
-	return unless $self->{in};
-	print { $self->{out} } "progress $msg\n" or wfail;
-	readline($self->{in}) eq "progress $msg\n" or die
+	my $io = $self->{io} or return;
+	print $io "progress $msg\n" or wfail;
+	readline($io) eq "progress $msg\n" or die
 		"progress $msg not received\n";
 	undef;
 }
@@ -205,10 +203,9 @@ sub barrier {
 # used for v2
 sub get_mark {
 	my ($self, $mark) = @_;
-	die "not active\n" unless $self->{in};
-	my ($r, $w) = $self->gfi_start;
-	print $w "get-mark $mark\n" or wfail;
-	my $oid = <$r> // die "get-mark failed, need git 2.6.0+\n";
+	my $io = $self->{io} or croak "not active\n";
+	print $io "get-mark $mark\n" or wfail;
+	my $oid = <$io> // die "get-mark failed, need git 2.6.0+\n";
 	chomp($oid);
 	$oid;
 }
@@ -225,11 +222,11 @@ sub remove {
 	my $path_type = $self->{path_type};
 	my ($path, $err, $cur, $blob);
 
-	my ($r, $w) = $self->gfi_start;
+	my $io = gfi_start($self);
 	my $tip = $self->{tip};
 	if ($path_type eq '2/38') {
 		$path = mid2path(v1_mid0($mime));
-		($err, $cur) = check_remove_v1($r, $w, $tip, $path, $mime);
+		($err, $cur) = check_remove_v1($io, $tip, $path, $mime);
 		return ($err, $cur) if $err;
 	} else {
 		my $sref;
@@ -241,7 +238,7 @@ sub remove {
 		}
 		my $len = length($$sref);
 		$blob = $self->{mark}++;
-		print $w "blob\nmark :$blob\ndata $len\n",
+		print $io "blob\nmark :$blob\ndata $len\n",
 			$$sref, "\n" or wfail;
 	}
 
@@ -249,22 +246,22 @@ sub remove {
 	my $commit = $self->{mark}++;
 	my $parent = $tip =~ /\A:/ ? $tip : undef;
 	unless ($parent) {
-		print $w "reset $ref\n" or wfail;
+		print $io "reset $ref\n" or wfail;
 	}
 	my $ident = $self->{ident};
 	my $now = now_raw();
 	$msg //= 'rm';
 	my $len = length($msg) + 1;
-	print $w "commit $ref\nmark :$commit\n",
+	print $io "commit $ref\nmark :$commit\n",
 		"author $ident $now\n",
 		"committer $ident $now\n",
 		"data $len\n$msg\n\n",
 		'from ', ($parent ? $parent : $tip), "\n" or wfail;
 	if (defined $path) {
-		print $w "D $path\n\n" or wfail;
+		print $io "D $path\n\n" or wfail;
 	} else {
-		clean_tree_v2($self, $w, 'd');
-		print $w "M 100644 :$blob d\n\n" or wfail;
+		clean_tree_v2($self, $io, 'd');
+		print $io "M 100644 :$blob d\n\n" or wfail;
 	}
 	$self->{nchg}++;
 	(($self->{tip} = ":$commit"), $cur);
@@ -354,11 +351,11 @@ sub v1_mid0 ($) {
 	$mids->[0];
 }
 sub clean_tree_v2 ($$$) {
-	my ($self, $w, $keep) = @_;
+	my ($self, $io, $keep) = @_;
 	my $tree = $self->{-tree} or return; #v2 only
 	delete $tree->{$keep};
 	foreach (keys %$tree) {
-		print $w "D $_\n" or wfail;
+		print $io "D $_\n" or wfail;
 	}
 	%$tree = ($keep => 1);
 }
@@ -377,10 +374,10 @@ sub add {
 		$path = 'm';
 	}
 
-	my ($r, $w) = $self->gfi_start;
+	my $io = gfi_start($self);
 	my $tip = $self->{tip};
 	if ($path_type eq '2/38') {
-		_check_path($r, $w, $tip, $path) and return;
+		_check_path($io, $tip, $path) and return;
 	}
 
 	drop_unwanted_headers($mime);
@@ -394,8 +391,7 @@ sub add {
 	my $raw_email = $mime->{-public_inbox_raw} // $mime->as_string;
 	my $n = length($raw_email);
 	$self->{bytes_added} += $n;
-	print $w "blob\nmark :$blob\ndata ", $n, "\n" or wfail;
-	print $w $raw_email, "\n" or wfail;
+	print $io "blob\nmark :$blob\ndata $n\n", $raw_email, "\n" or wfail;
 
 	# v2: we need this for Xapian
 	if ($smsg) {
@@ -422,19 +418,19 @@ sub add {
 	my $parent = $tip =~ /\A:/ ? $tip : undef;
 
 	unless ($parent) {
-		print $w "reset $ref\n" or wfail;
+		print $io "reset $ref\n" or wfail;
 	}
 
-	print $w "commit $ref\nmark :$commit\n",
+	print $io "commit $ref\nmark :$commit\n",
 		"author $author $at\n",
-		"committer $self->{ident} $ct\n" or wfail;
-	print $w "data ", (length($subject) + 1), "\n",
+		"committer $self->{ident} $ct\n",
+		"data ", (length($subject) + 1), "\n",
 		$subject, "\n\n" or wfail;
 	if ($tip ne '') {
-		print $w 'from ', ($parent ? $parent : $tip), "\n" or wfail;
+		print $io 'from ', ($parent ? $parent : $tip), "\n" or wfail;
 	}
-	clean_tree_v2($self, $w, $path);
-	print $w "M 100644 :$blob $path\n\n" or wfail;
+	clean_tree_v2($self, $io, $path);
+	print $io "M 100644 :$blob $path\n\n" or wfail;
 	$self->{nchg}++;
 	$self->{tip} = ":$commit";
 }
@@ -475,15 +471,14 @@ EOM
 }
 
 # true if locked and active
-sub active { !!$_[0]->{out} }
+sub active { !!$_[0]->{io} }
 
 sub done {
 	my ($self) = @_;
-	my $w = delete $self->{out} or return;
+	my $io = delete $self->{io} or return;
 	eval {
-		my $r = delete $self->{in} or die 'BUG: missing {in} when done';
-		print $w "done\n" or wfail;
-		close $r;
+		print $io "done\n" or wfail;
+		close $io; # reaps and dies on error
 	};
 	my $wait_err = $@;
 	my $nchg = delete $self->{nchg};
@@ -496,10 +491,7 @@ sub done {
 	die $wait_err if $wait_err;
 }
 
-sub atfork_child {
-	my ($self) = @_;
-	close($_) for (grep defined, delete(@$self{qw(in out)}));
-}
+sub atfork_child { close(delete($_[0]->{io}) // return) }
 
 sub digest2mid ($$;$) {
 	my ($dig, $hdr, $fallback_time) = @_;
@@ -552,7 +544,7 @@ sub replace_oids {
 	my $git = $self->{git};
 	my @export = (qw(fast-export --no-data --use-done-feature), $old);
 	my $rd = $git->popen(@export);
-	my ($r, $w) = $self->gfi_start;
+	my $io = gfi_start($self);
 	my @buf;
 	my $nreplace = 0;
 	my @oids;
@@ -563,7 +555,7 @@ sub replace_oids {
 			push @buf, "reset $tmp\n";
 		} elsif (/^commit (?:.+)/) {
 			if (@buf) {
-				print $w @buf or wfail;
+				print $io @buf or wfail;
 				@buf = ();
 			}
 			push @buf, "commit $tmp\n";
@@ -599,7 +591,7 @@ sub replace_oids {
 				rewrite_commit($self, \@oids, \@buf, $mime);
 				$nreplace++;
 			}
-			print $w @buf, "\n" or wfail;
+			print $io @buf, "\n" or wfail;
 			@buf = ();
 		} elsif ($_ eq "done\n") {
 			$done = 1;
@@ -612,7 +604,7 @@ sub replace_oids {
 	}
 	close $rd;
 	if (@buf) {
-		print $w @buf or wfail;
+		print $io @buf or wfail;
 	}
 	die 'done\n not seen from fast-export' unless $done;
 	chomp(my $cmt = $self->get_mark(":$mark")) if $nreplace;
diff --git a/script/public-inbox-convert b/script/public-inbox-convert
index 780f7194..0cc52777 100755
--- a/script/public-inbox-convert
+++ b/script/public-inbox-convert
@@ -120,7 +120,7 @@ my $head = $old->{ref_head} || 'HEAD';
 my $rd = $old->git->popen(qw(fast-export --use-done-feature), $head);
 $v2w->idx_init($opt);
 my $im = $v2w->importer;
-my ($r, $w) = $im->gfi_start;
+my $io = $im->gfi_start;
 my $h = '[0-9a-f]';
 my %D;
 my $last;
@@ -131,12 +131,12 @@ while (<$rd>) {
 		$state = 'commit';
 	} elsif (/^data ([0-9]+)/) {
 		my $len = $1;
-		print $w $_ or $im->wfail;
+		print $io $_ or $im->wfail;
 		while ($len) {
 			my $n = read($rd, my $tmp, $len) or die "read: $!";
 			warn "$n != $len\n" if $n != $len;
 			$len -= $n;
-			print $w $tmp or $im->wfail;
+			print $io $tmp or $im->wfail;
 		}
 		next;
 	} elsif ($state eq 'commit') {
@@ -144,9 +144,9 @@ while (<$rd>) {
 			my ($mark, $path) = ($1, $2);
 			$D{$path} = $mark;
 			if ($last && $last ne 'm') {
-				print $w "D $last\n" or $im->wfail;
+				print $io "D $last\n" or $im->wfail;
 			}
-			print $w "M 100644 :$mark m\n" or $im->wfail;
+			print $io "M 100644 :$mark m\n" or $im->wfail;
 			$last = 'm';
 			next;
 		}
@@ -154,18 +154,18 @@ while (<$rd>) {
 			my $mark = delete $D{$1};
 			defined $mark or die "undeleted path: $1\n";
 			if ($last && $last ne 'd') {
-				print $w "D $last\n" or $im->wfail;
+				print $io "D $last\n" or $im->wfail;
 			}
-			print $w "M 100644 :$mark d\n" or $im->wfail;
+			print $io "M 100644 :$mark d\n" or $im->wfail;
 			$last = 'd';
 			next;
 		}
 	}
 	last if $_ eq "done\n";
-	print $w $_ or $im->wfail;
+	print $io $_ or $im->wfail;
 }
 close $rd or die "fast-export: \$?=$? \$!=$!\n";
-$r = $w = undef; # v2w->done does the actual close and error checking
+$io = undef;
 $v2w->done;
 if (my $old_mm = $old->mm) {
 	$old->cleanup;

^ permalink raw reply related	[relevance 4%]

* [PATCH 0/9] lei + import-related updates
@ 2023-10-11  7:20  7% Eric Wong
  2023-10-11  7:20  4% ` [PATCH 6/9] import: switch to Unix stream socket for fast-import Eric Wong
  0 siblings, 1 reply; 2+ results
From: Eric Wong @ 2023-10-11  7:20 UTC (permalink / raw)
  To: meta

A few more ProcessIO conversions to start with, followed by
cleanups I made while working on import-related stuff.
Some of this will tie in nicely for FUSE, too...

I've realized the msgtime warnings were pointless anyway, since
there's nothing anybody can really do about bad messages that
get through various upstream spam filters.

5/9 is a long-overdue cleanup I noticed was needed while going
over Import.pm.

9/9 ought to fix the fragile t/lei-store-fail.t test
by using new features.

Eric Wong (9):
  lei rediff: use ProcessIO for --drq support
  lei_xsearch: improve curl progress reporting
  msgtime: quiet warnings we can do nothing about
  msgtime: simplify msg_timestamp and msg_datestamp
  treewide: consolidate "From " line removal
  import: switch to Unix stream socket for fast-import
  import: cat_blob is a no-op w/o live fast-import
  lei blob: run cat_blob on lei/store for pending blobs
  lei import|tag|rm: support --commit-delay=SECONDS

 lib/PublicInbox/Eml.pm        |   6 ++
 lib/PublicInbox/IMAP.pm       |   2 +-
 lib/PublicInbox/Import.pm     | 138 ++++++++++++++++------------------
 lib/PublicInbox/LEI.pm        |  23 +++---
 lib/PublicInbox/LeiBlob.pm    |  16 ++--
 lib/PublicInbox/LeiInput.pm   |   5 +-
 lib/PublicInbox/LeiInspect.pm |   2 +-
 lib/PublicInbox/LeiRediff.pm  |  33 ++++----
 lib/PublicInbox/LeiStore.pm   |  11 +++
 lib/PublicInbox/LeiToMail.pm  |   3 +-
 lib/PublicInbox/LeiXSearch.pm |  34 +++++----
 lib/PublicInbox/Mbox.pm       |  16 ++--
 lib/PublicInbox/MboxReader.pm |   2 +-
 lib/PublicInbox/MsgTime.pm    |  49 +++++-------
 lib/PublicInbox/NNTP.pm       |   3 +-
 lib/PublicInbox/ProcessIO.pm  |  18 ++---
 lib/PublicInbox/Spawn.pm      |   1 +
 script/public-inbox-convert   |  18 ++---
 script/public-inbox-edit      |   5 +-
 script/public-inbox-learn     |   2 +-
 script/public-inbox-mda       |   4 +-
 script/public-inbox-purge     |   4 +-
 t/lei-import.t                |  13 ++++
 t/lei-store-fail.t            |  20 +++--
 t/lei-tag.t                   |  15 +++-
 25 files changed, 230 insertions(+), 213 deletions(-)


^ permalink raw reply	[relevance 7%]

Results 1-2 of 2 | reverse | options above
-- pct% links below jump to the message on this page, permalinks otherwise --
2023-10-11  7:20  7% [PATCH 0/9] lei + import-related updates Eric Wong
2023-10-11  7:20  4% ` [PATCH 6/9] import: switch to Unix stream socket for fast-import Eric Wong

Code repositories for project(s) associated with this public inbox

	https://80x24.org/public-inbox.git

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).