user/dev discussion of public-inbox itself
 help / color / mirror / code / Atom feed
From: Eric Wong <e@80x24.org>
To: meta@public-inbox.org
Subject: [PATCH 4/7] lei: less error-prone FD mapping
Date: Fri, 29 Jan 2021 12:42:57 +0500	[thread overview]
Message-ID: <20210129074300.14475-5-e@80x24.org> (raw)
In-Reply-To: <20210129074300.14475-1-e@80x24.org>

Keeping track of non-standard FDs gets tricky, so make it easier
by relying on st_dev/st_ino mapping in the transmitted objects.

We'll keep using numbers for the standard FDs since we need to
be able to easily redirect them in the producer (main daemon)
process for (gzip|bzip2|xz) if writing to a compressed mbox.
---
 lib/PublicInbox/LEI.pm         | 56 +++++++++++++++++++++-------
 lib/PublicInbox/LeiOverview.pm |  9 ++---
 lib/PublicInbox/LeiToMail.pm   |  8 +---
 lib/PublicInbox/LeiXSearch.pm  | 68 +++++++++++++++-------------------
 lib/PublicInbox/Spawn.pm       |  2 +-
 5 files changed, 77 insertions(+), 66 deletions(-)

diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index f5413aab..3ed330f9 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -335,14 +335,27 @@ sub atfork_prepare_wq {
 	}
 }
 
+sub io_restore ($$) {
+	my ($dst, $src) = @_;
+	for my $i (0..2) { # standard FDs
+		my $io = delete $src->{$i} or next;
+		$dst->{$i} = $io;
+	}
+	for my $i (3..9) { # named (non-standard) FDs
+		my $io = $src->{$i} or next;
+		my @st = stat($io) or die "stat $src.$i ($io): $!";
+		my $f = delete $dst->{"dev=$st[0],ino=$st[1]"} // next;
+		$dst->{$f} = $io;
+		delete $src->{$i};
+	}
+}
+
 # usage: my %sig = $lei->atfork_child_wq($wq);
 #	 local @SIG{keys %sig} = values %sig;
 sub atfork_child_wq {
 	my ($self, $wq) = @_;
-	my ($sock, $l2m_wq_s1);
-	(@$self{qw(0 1 2)}, $sock, $l2m_wq_s1) = delete(@$wq{0..4});
-	$self->{sock} = $sock if -S $sock;
-	$self->{l2m}->{-wq_s1} = $l2m_wq_s1 if $l2m_wq_s1 && -S $l2m_wq_s1;
+	io_restore($self, $wq);
+	io_restore($self->{l2m}, $wq);
 	%PATH2CFG = ();
 	undef $errors_log;
 	$quit = \&CORE::exit;
@@ -355,30 +368,45 @@ sub atfork_child_wq {
 			close(delete $self->{$i});
 		}
 		# trigger the LeiXSearch $done OpPipe:
-		syswrite($self->{0}, '!') if $self->{0} && -p $self->{0};
+		syswrite($self->{op_pipe}, '!') if $self->{op_pipe};
 		$SIG{PIPE} = 'DEFAULT';
 		die bless(\"$_[0]", 'PublicInbox::SIGPIPE'),
 	});
 }
 
+sub io_extract ($;@) {
+	my ($obj, @fields) = @_;
+	my @io;
+	for my $f (@fields) {
+		my $io = delete $obj->{$f} or next;
+		my @st = stat($io) or die "W: stat $obj.$f ($io): $!";
+		$obj->{"dev=$st[0],ino=$st[1]"} = $f;
+		push @io, $io;
+	}
+	@io
+}
+
 # usage: ($lei, @io) = $lei->atfork_parent_wq($wq);
 sub atfork_parent_wq {
 	my ($self, $wq) = @_;
 	my $env = delete $self->{env}; # env is inherited at fork
-	my $ret = bless { %$self }, ref($self);
-	if (my $dedupe = delete $ret->{dedupe}) {
-		$ret->{dedupe} = $wq->deep_clone($dedupe);
+	my $lei = bless { %$self }, ref($self);
+	if (my $dedupe = delete $lei->{dedupe}) {
+		$lei->{dedupe} = $wq->deep_clone($dedupe);
 	}
 	$self->{env} = $env;
-	delete @$ret{qw(3 -lei_store cfg old_1 pgr lxs)}; # keep l2m
-	my @io = delete @$ret{0..2};
-	$io[3] = delete($ret->{sock}) // $io[2];
-	my $l2m = $ret->{l2m};
+	delete @$lei{qw(3 -lei_store cfg old_1 pgr lxs)}; # keep l2m
+	my @io = (delete(@$lei{qw(0 1 2)}),
+			io_extract($lei, qw(sock op_pipe startq)));
+	my $l2m = $lei->{l2m};
 	if ($l2m && $l2m != $wq) { # $wq == lxs
-		$io[4] = $l2m->{-wq_s1} if $l2m->{-wq_s1};
+		if (my $wq_s1 = $l2m->{-wq_s1}) {
+			push @io, io_extract($l2m, '-wq_s1');
+			$l2m->{-wq_s1} = $wq_s1;
+		}
 		$l2m->wq_close(1);
 	}
-	($ret, @io);
+	($lei, @io);
 }
 
 sub _help ($;$) {
diff --git a/lib/PublicInbox/LeiOverview.pm b/lib/PublicInbox/LeiOverview.pm
index f9a28138..c67e2747 100644
--- a/lib/PublicInbox/LeiOverview.pm
+++ b/lib/PublicInbox/LeiOverview.pm
@@ -220,14 +220,13 @@ sub ovv_each_smsg_cb { # runs in wq worker usually
 		};
 	} elsif ($l2m && $l2m->{-wq_s1}) {
 		my ($lei_ipc, @io) = $lei->atfork_parent_wq($l2m);
-		# n.b. $io[0] = qry_status_wr, $io[1] = mbox|stdout,
-		# $io[4] becomes a notification pipe that triggers EOF
+		# $io[-1] becomes a notification pipe that triggers EOF
 		# in this wq worker when all outstanding ->write_mail
 		# calls are complete
-		die "BUG: \$io[4] $io[4] unexpected" if $io[4];
-		pipe($l2m->{each_smsg_done}, $io[4]) or die "pipe: $!";
-		fcntl($io[4], 1031, 4096) if $^O eq 'linux';
+		pipe($l2m->{each_smsg_done}, $io[$#io + 1]) or die "pipe: $!";
+		fcntl($io[-1], 1031, 4096) if $^O eq 'linux'; # F_SETPIPE_SZ
 		delete @$lei_ipc{qw(l2m opt mset_opt cmd)};
+		$lei_ipc->{each_smsg_not_done} = $#io;
 		my $git = $ibxish->git; # (LeiXSearch|Inbox|ExtSearch)->git
 		$self->{git} = $git;
 		my $git_dir = $git->{git_dir};
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index 08a1570d..61b546b5 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -460,7 +460,7 @@ sub post_augment { # fast (spawn compressor or mkdir), runs in main daemon
 
 sub write_mail { # via ->wq_do
 	my ($self, $git_dir, $smsg, $lei) = @_;
-	my $not_done = delete $self->{4}; # write end of {each_smsg_done}
+	my $not_done = delete $self->{$lei->{each_smsg_not_done}};
 	my $wcb = $self->{wcb} //= do { # first message
 		my %sig = $lei->atfork_child_wq($self);
 		@SIG{keys %sig} = values %sig; # not local
@@ -471,12 +471,6 @@ sub write_mail { # via ->wq_do
 	$git->cat_async($smsg->{blob}, \&git_to_mail, [$wcb, $smsg, $not_done]);
 }
 
-sub ipc_atfork_prepare {
-	my ($self) = @_;
-	# FDs: (done_wr, stdout|mbox, stderr, 3: sock, 4: each_smsg_done_wr)
-	$self->SUPER::ipc_atfork_prepare; # PublicInbox::IPC
-}
-
 # We rely on OnDestroy to run this before ->DESTROY, since ->DESTROY
 # ordering is unstable at worker exit and may cause segfaults
 sub reap_gits {
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index 9ea2b5f3..e69b637c 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -109,9 +109,9 @@ sub wait_startq ($) {
 sub query_thread_mset { # for --thread
 	my ($self, $lei, $ibxish) = @_;
 	local $0 = "$0 query_thread_mset";
-	my $startq = delete $self->{5};
 	my %sig = $lei->atfork_child_wq($self);
 	local @SIG{keys %sig} = values %sig;
+	my $startq = delete $lei->{startq};
 
 	my ($srch, $over) = ($ibxish->search, $ibxish->over);
 	unless ($srch && $over) {
@@ -145,9 +145,9 @@ sub query_thread_mset { # for --thread
 sub query_mset { # non-parallel for non-"--thread" users
 	my ($self, $lei) = @_;
 	local $0 = "$0 query_mset";
-	my $startq = delete $self->{5};
 	my %sig = $lei->atfork_child_wq($self);
 	local @SIG{keys %sig} = values %sig;
+	my $startq = delete $lei->{startq};
 	my $mo = { %{$lei->{mset_opt}} };
 	my $mset;
 	for my $loc (locals($self)) {
@@ -173,7 +173,7 @@ sub each_eml { # callback for MboxReader->mboxrd
 	$smsg->parse_references($eml, mids($eml));
 	$smsg->{$_} //= '' for qw(from to cc ds subject references mid);
 	delete @$smsg{qw(From Subject -ds -ts)};
-	if (my $startq = delete($self->{5})) { wait_startq($startq) }
+	if (my $startq = delete($lei->{startq})) { wait_startq($startq) }
 	$each_smsg->($smsg, undef, $eml);
 }
 
@@ -352,11 +352,12 @@ sub query_prepare { # called by wq_do
 	my ($self, $lei) = @_;
 	local $0 = "$0 query_prepare";
 	my %sig = $lei->atfork_child_wq($self);
-	-p $lei->{0} or die "BUG: \$done pipe expected";
+	-p $lei->{op_pipe} or die "BUG: \$done pipe expected";
 	local @SIG{keys %sig} = values %sig;
+	delete $lei->{l2m}->{-wq_s1};
 	eval { $lei->{l2m}->do_augment($lei) };
 	$lei->fail($@) if $@;
-	syswrite($lei->{0}, '.') == 1 or die "do_post_augment trigger: $!";
+	syswrite($lei->{op_pipe}, '.') == 1 or die "do_post_augment trigger: $!"
 }
 
 sub sigpipe_handler { # handles SIGPIPE from l2m/lxs workers
@@ -370,56 +371,45 @@ sub sigpipe_handler { # handles SIGPIPE from l2m/lxs workers
 }
 
 sub do_query {
-	my ($self, $lei_orig) = @_;
-	my ($lei, @io) = $lei_orig->atfork_parent_wq($self);
-	$io[0] = undef;
-	pipe(my $done, $io[0]) or die "pipe $!";
-	$lei_orig->{1}->autoflush(1);
+	my ($self, $lei) = @_;
+	$lei->{1}->autoflush(1);
+	my ($au_done, $zpipe);
+	my $l2m = $lei->{l2m};
+	if ($l2m) {
+		pipe($lei->{startq}, $au_done) or die "pipe: $!";
+		# 1031: F_SETPIPE_SZ
+		fcntl($lei->{startq}, 1031, 4096) if $^O eq 'linux';
+		$zpipe = $l2m->pre_augment($lei);
+	}
+	pipe(my $done, $lei->{op_pipe}) or die "pipe $!";
+	my ($lei_ipc, @io) = $lei->atfork_parent_wq($self);
+	delete($lei->{op_pipe});
 
-	$lei_orig->event_step_init; # wait for shutdowns
+	$lei->event_step_init; # wait for shutdowns
 	my $done_op = {
-		'' => [ \&query_done, $lei_orig ],
-		'!' => [ \&sigpipe_handler, $lei_orig ]
+		'' => [ \&query_done, $lei ],
+		'!' => [ \&sigpipe_handler, $lei ]
 	};
-	my $in_loop = exists $lei_orig->{sock};
+	my $in_loop = exists $lei->{sock};
 	$done = PublicInbox::OpPipe->new($done, $done_op, $in_loop);
-	my $l2m = $lei->{l2m};
 	if ($l2m) {
-		# may redirect $lei->{1} for mbox
-		my $zpipe = $l2m->pre_augment($lei_orig);
-		$io[1] = $lei_orig->{1};
-		pipe(my ($startq, $au_done)) or die "pipe: $!";
-		$done_op->{'.'} = [ \&do_post_augment, $lei_orig,
-					$zpipe, $au_done ];
-		local $io[4] = *STDERR{GLOB}; # don't send l2m->{-wq_s1}
-		die "BUG: unexpected \$io[5]: $io[5]" if $io[5];
-		$self->wq_do('query_prepare', \@io, $lei);
-		fcntl($startq, 1031, 4096) if $^O eq 'linux'; # F_SETPIPE_SZ
-		$io[5] = $startq;
+		$done_op->{'.'} = [ \&do_post_augment, $lei, $zpipe, $au_done ];
+		$self->wq_do('query_prepare', \@io, $lei_ipc);
 		$io[1] = $zpipe->[1] if $zpipe;
 	}
-	start_query($self, \@io, $lei);
+	start_query($self, \@io, $lei_ipc);
 	$self->wq_close(1);
 	unless ($in_loop) {
-		# for the $lei->atfork_child_wq PIPE handler:
+		# for the $lei_ipc->atfork_child_wq PIPE handler:
 		while ($done->{sock}) { $done->event_step }
 	}
 }
 
-sub ipc_atfork_prepare {
-	my ($self) = @_;
-	if (exists $self->{remotes}) {
-		require PublicInbox::MboxReader;
-		require IO::Uncompress::Gunzip;
-	}
-	# FDS: (0: done_wr, 1: stdout|mbox, 2: stderr,
-	#       3: sock, 4: $l2m->{-wq_s1}, 5: $startq)
-	$self->SUPER::ipc_atfork_prepare; # PublicInbox::IPC
-}
-
 sub add_uri {
 	my ($self, $uri) = @_;
 	if (my $curl = $self->{curl} //= which('curl') // 0) {
+		require PublicInbox::MboxReader;
+		require IO::Uncompress::Gunzip;
 		push @{$self->{remotes}}, $uri;
 	} else {
 		warn "curl missing, ignoring $uri\n";
diff --git a/lib/PublicInbox/Spawn.pm b/lib/PublicInbox/Spawn.pm
index ef4885c1..1842899c 100644
--- a/lib/PublicInbox/Spawn.pm
+++ b/lib/PublicInbox/Spawn.pm
@@ -209,7 +209,7 @@ my $fdpass = <<'FDPASS';
 #include <sys/socket.h>
 
 #if defined(CMSG_SPACE) && defined(CMSG_LEN)
-#define SEND_FD_CAPA 6
+#define SEND_FD_CAPA 10
 #define SEND_FD_SPACE (SEND_FD_CAPA * sizeof(int))
 union my_cmsg {
 	struct cmsghdr hdr;

  parent reply	other threads:[~2021-01-29  7:43 UTC|newest]

Thread overview: 8+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2021-01-29  7:42 [PATCH 0/7] lei: more half-baked updates Eric Wong
2021-01-29  7:42 ` [PATCH 1/7] ipc: wq: support passing fields to workers Eric Wong
2021-01-29  7:42 ` [PATCH 2/7] lei_xsearch: drop repeated "Xapian" in error message Eric Wong
2021-01-29  7:42 ` [PATCH 3/7] ipc: more consistent behavior between worker types Eric Wong
2021-01-29  7:42 ` Eric Wong [this message]
2021-01-29  7:42 ` [PATCH 5/7] git: synchronous cat_file may return type and OID Eric Wong
2021-01-29  7:42 ` [PATCH 6/7] ipc: move on_destroy scope to inside the eval Eric Wong
2021-01-29  7:43 ` [PATCH 7/7] shared_kv: simplify PID+object guard for cleanup Eric Wong

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

  List information: https://public-inbox.org/README

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20210129074300.14475-5-e@80x24.org \
    --to=e@80x24.org \
    --cc=meta@public-inbox.org \
    --subject='Re: [PATCH 4/7] lei: less error-prone FD mapping' \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

user/dev discussion of public-inbox itself

This inbox may be cloned and mirrored by anyone:

	git clone --mirror https://public-inbox.org/meta
	git clone --mirror http://czquwvybam4bgbro.onion/meta
	git clone --mirror http://hjrcffqmbrq6wope.onion/meta
	git clone --mirror http://ou63pmih66umazou.onion/meta

	# If you have public-inbox 1.1+ installed, you may
	# initialize and index your mirror using the following commands:
	public-inbox-init -V1 meta meta/ https://public-inbox.org/meta \
		meta@public-inbox.org
	public-inbox-index meta

Example config snippet for mirrors.
Newsgroups are available over NNTP:
	nntp://news.public-inbox.org/inbox.comp.mail.public-inbox.meta
	nntp://7fh6tueqddpjyxjmgtdiueylzoqt6pt7hec3pukyptlmohoowvhde4yd.onion/inbox.comp.mail.public-inbox.meta
	nntp://ie5yzdi7fg72h7s4sdcztq5evakq23rdt33mfyfcddc5u3ndnw24ogqd.onion/inbox.comp.mail.public-inbox.meta
	nntp://4uok3hntl7oi7b4uf4rtfwefqeexfzil2w6kgk2jn5z2f764irre7byd.onion/inbox.comp.mail.public-inbox.meta
	nntp://news.gmane.io/gmane.mail.public-inbox.general
 note: .onion URLs require Tor: https://www.torproject.org/

code repositories for project(s) associated with this inbox:

	https://80x24.org/public-inbox.git

AGPL code for this site: git clone https://public-inbox.org/public-inbox.git