user/dev discussion of public-inbox itself
 help / color / mirror / code / Atom feed
From: Eric Wong <e@80x24.org>
To: meta@public-inbox.org
Subject: [PATCH 02/14] lei: test SIGPIPE, stop xsearch workers on client abort
Date: Wed, 13 Jan 2021 19:06:15 -1200	[thread overview]
Message-ID: <20210114070627.18195-3-e@80x24.org> (raw)
In-Reply-To: <20210114070627.18195-1-e@80x24.org>

The new test ensures consistency between oneshot and
client/daemon users.  Cancelling an in-progress result now also
stops xsearch workers to avoid wasted CPU and I/O.

Note the lei->atfork_child_wq usage changes; they work around
a bug in Perl 5: http://nntp.perl.org/group/perl.perl5.porters/258784
<CAHhgV8hPbcmkzWizp6Vijw921M5BOXixj4+zTh3nRS9vRBYk8w@mail.gmail.com>

This switches the internal protocol to use SOCK_SEQPACKET
AF_UNIX sockets so that messages sent from the daemon to the
client (to run the pager, or to kill/exit the client script)
are not merged together.
---
 MANIFEST                       |   1 +
 lib/PublicInbox/IPC.pm         |  45 ++++------
 lib/PublicInbox/LEI.pm         | 158 +++++++++++++++++----------------
 lib/PublicInbox/LeiOverview.pm |   5 +-
 lib/PublicInbox/LeiQuery.pm    |  22 ++---
 lib/PublicInbox/LeiXSearch.pm  |  34 +++++--
 script/lei                     |  74 ++++++++++-----
 t/lei.t                        |   2 +-
 xt/lei-sigpipe.t               |  32 +++++++
 9 files changed, 225 insertions(+), 148 deletions(-)
 create mode 100644 xt/lei-sigpipe.t

diff --git a/MANIFEST b/MANIFEST
index 810aec42..2ca240fc 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -429,6 +429,7 @@ xt/git_async_cmp.t
 xt/httpd-async-stream.t
 xt/imapd-mbsync-oimap.t
 xt/imapd-validate.t
+xt/lei-sigpipe.t
 xt/mem-imapd-tls.t
 xt/mem-msgview.t
 xt/msgtime_cmp.t
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index c54fcc64..fbc91f6f 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -130,7 +130,8 @@ sub ipc_worker_spawn {
 
 sub ipc_worker_reap { # dwaitpid callback
 	my ($self, $pid) = @_;
-	warn "PID:$pid died with \$?=$?\n" if $?;
+	# SIGTERM (15) is our default exit signal
+	warn "PID:$pid died with \$?=$?\n" if $? && ($? & 127) != 15;
 }
 
 # for base class, override in sub classes
@@ -236,50 +237,31 @@ sub ipc_sibling_atfork_child {
 	$pid == $$ and die "BUG: $$ ipc_atfork_child called on itself";
 }
 
-sub _close_recvd ($) {
-	my ($self) = @_;
-	my $x = $self->{-wq_recv_modes};
-	my $end = $x ? $#$x : 2;
-	close($_) for (grep { defined } (delete @$self{0..$end}));
-}
-
 sub wq_worker_loop ($) {
 	my ($self) = @_;
-	my $buf;
 	my $len = $self->{wq_req_len} // (4096 * 33);
-	my ($sub, $args);
 	my $s2 = $self->{-wq_s2} // die 'BUG: no -wq_s2';
-	local $SIG{PIPE} = sub {
-		my $cur_sub = $sub;
-		_close_recvd($self);
-		die(bless(\$cur_sub, 'PublicInbox::SIGPIPE')) if $cur_sub;
-	};
 	while (1) {
-		my (@fds) = $recv_cmd->($s2, $buf, $len) or return; # EOF
-		my $i = 0;
+		my @fds = $recv_cmd->($s2, my $buf, $len) or return; # EOF
 		my @m = @{$self->{-wq_recv_modes} // [qw( +<&= >&= >&= )]};
+		my $nfd = 0;
 		for my $fd (@fds) {
 			my $mode = shift(@m);
 			if (open(my $cmdfh, $mode, $fd)) {
-				$self->{$i++} = $cmdfh;
+				$self->{$nfd++} = $cmdfh;
 				$cmdfh->autoflush(1);
 			} else {
-				die "$$ open($mode$fd) (FD:$i): $!";
+				die "$$ open($mode$fd) (FD:$nfd): $!";
 			}
 		}
 		# Sereal dies on truncated data, Storable returns undef
-		$args = thaw($buf) //
+		my $args = thaw($buf) //
 			die "thaw error on buffer of size:".length($buf);
-		eval {
-			$sub = shift @$args;
-			eval { $self->$sub(@$args) };
-			undef $sub; # quiet SIG{PIPE} handler
-			die $@ if $@;
-		};
+		my $sub = shift @$args;
+		eval { $self->$sub(@$args) };
 		warn "$$ wq_worker: $@" if $@ &&
 					ref($@) ne 'PublicInbox::SIGPIPE';
-		# need to close explicitly to avoid warnings after SIGPIPE
-		_close_recvd($self);
+		delete @$self{0..($nfd-1)};
 	}
 }
 
@@ -400,9 +382,16 @@ sub wq_close {
 	}
 }
 
+sub wq_kill {
+	my ($self, $sig) = @_;
+	my $workers = $self->{-wq_workers} or return;
+	kill($sig // 'TERM', keys %$workers);
+}
+
 sub WQ_MAX_WORKERS { $WQ_MAX_WORKERS }
 
 sub DESTROY {
+	wq_kill($_[0]);
 	wq_close($_[0]);
 	ipc_worker_stop($_[0]);
 }
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 7313738e..2889fa76 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -11,13 +11,13 @@ use v5.10.1;
 use parent qw(PublicInbox::DS PublicInbox::LeiExternal
 	PublicInbox::LeiQuery);
 use Getopt::Long ();
-use Socket qw(AF_UNIX SOCK_STREAM pack_sockaddr_un);
-use Errno qw(EAGAIN ECONNREFUSED ENOENT);
+use Socket qw(AF_UNIX SOCK_SEQPACKET MSG_EOR pack_sockaddr_un);
+use Errno qw(EAGAIN EINTR ECONNREFUSED ENOENT ECONNRESET);
 use POSIX ();
 use IO::Handle ();
 use Sys::Syslog qw(syslog openlog);
 use PublicInbox::Config;
-use PublicInbox::Syscall qw(SFD_NONBLOCK EPOLLIN EPOLLONESHOT);
+use PublicInbox::Syscall qw(SFD_NONBLOCK EPOLLIN EPOLLET);
 use PublicInbox::Sigfd;
 use PublicInbox::DS qw(now dwaitpid);
 use PublicInbox::Spawn qw(spawn run_die popen_rd);
@@ -238,16 +238,15 @@ my %CONFIG_KEYS = (
 	'leistore.dir' => 'top-level storage location',
 );
 
-sub x_it ($$) { # pronounced "exit"
+# pronounced "exit": x_it(1 << 8) => exit(1); x_it(13) => SIGPIPE
+sub x_it ($$) {
 	my ($self, $code) = @_;
-	$self->{1}->autoflush(1); # make sure client sees stdout before exit
-	my $sig = ($code & 127);
-	$code >>= 8 unless $sig;
+	# make sure client sees stdout before exit
+	$self->{1}->autoflush(1) if $self->{1};
 	if (my $sock = $self->{sock}) {
-		my $fds = [ map { fileno($_) } @$self{0..2} ];
-		$send_cmd->($sock, $fds, "exit=$code\n", 0);
-	} else { # for oneshot
-		$quit->($code);
+		send($sock, "x_it $code", MSG_EOR);
+	} elsif (!($code & 127)) { # oneshot, ignore signals
+		$quit->($code >> 8);
 	}
 }
 
@@ -274,22 +273,20 @@ sub atfork_prepare_wq {
 				grep { defined } @$self{qw(0 1 2 sock)}
 }
 
-# usage: local %SIG = (%SIG, $lei->atfork_child_wq($wq));
+# usage: my %sig = $lei->atfork_child_wq($wq);
+#	 local @SIG{keys %sig} = values %sig;
 sub atfork_child_wq {
 	my ($self, $wq) = @_;
-	return () if $self->{0}; # did not fork
-	$self->{$_} = $wq->{$_} for (0..2);
-	$self->{sock} = $wq->{3} // die 'BUG: no {sock}'; # may be undef
-	my $oldpipe = $SIG{PIPE};
+	@$self{qw(0 1 2 sock)} = delete(@$wq{0..3});
 	%PATH2CFG = ();
 	@TO_CLOSE_ATFORK_CHILD = ();
-	(
-		__WARN__ => sub { err($self, @_) },
-		PIPE => sub {
-			$self->x_it(141);
-			$oldpipe->() if ref($oldpipe) eq 'CODE';
-		}
-	);
+	(__WARN__ => sub { err($self, @_) },
+	PIPE => sub {
+		$self->x_it(13); # SIGPIPE = 13
+		# we need to close explicitly to avoid Perl warning on SIGPIPE
+		close($_) for (delete @$self{1..2});
+		die bless(\"$_[0]", 'PublicInbox::SIGPIPE'),
+	});
 }
 
 # usage: ($lei, @io) = $lei->atfork_parent_wq($wq);
@@ -300,9 +297,9 @@ sub atfork_parent_wq {
 		my $ret = bless { %$self }, ref($self);
 		$self->{env} = $env;
 		delete @$ret{qw(-lei_store cfg pgr)};
-		($ret, delete @$ret{qw(0 1 2 sock)});
+		($ret, delete @$ret{0..2}, delete($ret->{sock}) // ());
 	} else {
-		($self, @$self{qw(0 1 2 sock)});
+		($self, @$self{0..2}, $self->{sock} // ());
 	}
 }
 
@@ -647,7 +644,7 @@ sub start_pager {
 		my $buf = "exec 1\0".$pager;
 		while (my ($k, $v) = each %new_env) { $buf .= "\0$k=$v" };
 		my $fds = [ map { fileno($_) } @$rdr{0..2} ];
-		$send_cmd->($sock, $fds, $buf .= "\n", 0);
+		$send_cmd->($sock, $fds, $buf, MSG_EOR);
 	} else {
 		$pgr->[0] = spawn([$pager], $env, $rdr);
 	}
@@ -660,50 +657,39 @@ sub start_pager {
 sub stop_pager {
 	my ($self) = @_;
 	my $pgr = delete($self->{pgr}) or return;
-	my $pid = $pgr->[0];
-	close $self->{1};
-	# {2} may not be redirected
-	$self->{1} = $pgr->[1];
 	$self->{2} = $pgr->[2];
+	# do not restore original stdout, just close it so we error out
+	close(delete($self->{1})) if $self->{1};
+	my $pid = $pgr->[0];
 	dwaitpid($pid, undef, $self->{sock}) if $pid;
 }
 
 sub accept_dispatch { # Listener {post_accept} callback
 	my ($sock) = @_; # ignore other
-	$sock->blocking(1);
 	$sock->autoflush(1);
 	my $self = bless { sock => $sock }, __PACKAGE__;
-	vec(my $rin = '', fileno($sock), 1) = 1;
-	# `say $sock' triggers "die" in lei(1)
-	my $buf;
-	if (select(my $rout = $rin, undef, undef, 1)) {
-		my @fds = $recv_cmd->($sock, $buf, 4096 * 33); # >MAX_ARG_STRLEN
-		if (scalar(@fds) == 3) {
-			my $i = 0;
-			for my $rdr (qw(<&= >&= >&=)) {
-				my $fd = shift(@fds);
-				if (open(my $fh, $rdr, $fd)) {
-					$self->{$i++} = $fh;
-				}  else {
-					say $sock "open($rdr$fd) (FD=$i): $!";
-					return;
-				}
+	vec(my $rvec, fileno($sock), 1) = 1;
+	select($rvec, undef, undef, 1) or
+		return send($sock, 'timed out waiting to recv FDs', MSG_EOR);
+	my @fds = $recv_cmd->($sock, my $buf, 4096 * 33); # >MAX_ARG_STRLEN
+	if (scalar(@fds) == 3) {
+		my $i = 0;
+		for my $rdr (qw(<&= >&= >&=)) {
+			my $fd = shift(@fds);
+			if (open(my $fh, $rdr, $fd)) {
+				$self->{$i++} = $fh;
+				next;
 			}
-		} else {
-			say $sock "recv_cmd failed: $!";
-			return;
+			return send($sock, "open($rdr$fd) (FD=$i): $!", MSG_EOR);
 		}
 	} else {
-		say $sock "timed out waiting to recv FDs";
-		return;
+		return send($sock, "recv_cmd failed: $!", MSG_EOR);
 	}
 	$self->{2}->autoflush(1); # keep stdout buffered until x_it|DESTROY
 	# $ENV_STR = join('', map { "\0$_=$ENV{$_}" } keys %ENV);
 	# $buf = "$$\0$argc\0".join("\0", @ARGV).$ENV_STR."\0\0";
-	if (substr($buf, -2, 2, '') ne "\0\0") { # s/\0\0\z//
-		say $sock "request command truncated";
-		return;
-	}
+	substr($buf, -2, 2, '') eq "\0\0" or  # s/\0\0\z//
+		return send($sock, 'request command truncated', MSG_EOR);
 	my ($argc, @argv) = split(/\0/, $buf, -1);
 	undef $buf;
 	my %env = map { split(/=/, $_, 2) } splice(@argv, $argc);
@@ -711,23 +697,50 @@ sub accept_dispatch { # Listener {post_accept} callback
 		local %ENV = %env;
 		$self->{env} = \%env;
 		eval { dispatch($self, @argv) };
-		say $sock $@ if $@;
+		send($sock, $@, MSG_EOR) if $@;
 	} else {
-		say $sock "chdir($env{PWD}): $!"; # implicit close
+		send($sock, "chdir($env{PWD}): $!", MSG_EOR); # implicit close
 	}
 }
 
+sub dclose {
+	my ($self) = @_;
+	delete $self->{lxs}; # stops LeiXSearch queries
+	$self->close; # PublicInbox::DS::close
+}
+
 # for long-running results
 sub event_step {
 	my ($self) = @_;
 	local %ENV = %{$self->{env}};
-	eval {}; # TODO
-	if ($@) {
-		say { $self->{sock} } $@;
-		$self->close; # PublicInbox::DS::close
+	my $sock = $self->{sock};
+	eval {
+		while (my @fds = $recv_cmd->($sock, my $buf, 4096)) {
+			if (scalar(@fds) == 1 && !defined($fds[0])) {
+				return if $! == EAGAIN;
+				next if $! == EINTR;
+				last if $! == ECONNRESET;
+				die "recvmsg: $!";
+			}
+			for my $fd (@fds) {
+				open my $rfh, '+<&=', $fd;
+			}
+			die "unrecognized client signal: $buf";
+		}
+		dclose($self);
+	};
+	if (my $err = $@) {
+		eval { $self->fail($err) };
+		dclose($self);
 	}
 }
 
+sub event_step_init {
+	my ($self) = @_;
+	$self->{sock}->blocking(0);
+	$self->SUPER::new($self->{sock}, EPOLLIN|EPOLLET);
+}
+
 sub noop {}
 
 our $oldset; sub oldset { $oldset }
@@ -742,7 +755,7 @@ sub lazy_start {
 		die "connect($path): $!";
 	}
 	umask(077) // die("umask(077): $!");
-	socket(my $l, AF_UNIX, SOCK_STREAM, 0) or die "socket: $!";
+	socket(my $l, AF_UNIX, SOCK_SEQPACKET, 0) or die "socket: $!";
 	bind($l, pack_sockaddr_un($path)) or die "bind($path): $!";
 	listen($l, 1024) or die "listen: $!";
 	my @st = stat($path) or die "stat($path): $!";
@@ -793,7 +806,7 @@ sub lazy_start {
 		USR2 => \&noop,
 	};
 	my $sigfd = PublicInbox::Sigfd->new($sig, SFD_NONBLOCK);
-	local %SIG = (%SIG, %$sig) if !$sigfd;
+	local @SIG{keys %$sig} = values(%$sig) unless $sigfd;
 	local $SIG{PIPE} = 'IGNORE';
 	if ($sigfd) { # TODO: use inotify/kqueue to detect unlinked sockets
 		push @TO_CLOSE_ATFORK_CHILD, $sigfd->{sock};
@@ -853,24 +866,19 @@ sub oneshot {
 	local $quit = $exit if $exit;
 	local %PATH2CFG;
 	umask(077) // die("umask(077): $!");
-	local $SIG{PIPE} = sub { die(bless(\"$_[0]", 'PublicInbox::SIGPIPE')) };
-	eval {
-		my $self = bless {
-			0 => *STDIN{GLOB},
-			1 => *STDOUT{GLOB},
-			2 => *STDERR{GLOB},
-			env => \%ENV
-		}, __PACKAGE__;
-		dispatch($self, @ARGV);
-	};
-	die $@ if $@ && ref($@) ne 'PublicInbox::SIGPIPE';
+	dispatch((bless {
+		0 => *STDIN{GLOB},
+		1 => *STDOUT{GLOB},
+		2 => *STDERR{GLOB},
+		env => \%ENV
+	}, __PACKAGE__), @ARGV);
 }
 
 # ensures stdout hits the FS before sock disconnects so a client
 # can immediately reread it
 sub DESTROY {
 	my ($self) = @_;
-	$self->{1}->autoflush(1);
+	$self->{1}->autoflush(1) if $self->{1};
 	stop_pager($self);
 }
 
diff --git a/lib/PublicInbox/LeiOverview.pm b/lib/PublicInbox/LeiOverview.pm
index 8a1f4f82..194c5e28 100644
--- a/lib/PublicInbox/LeiOverview.pm
+++ b/lib/PublicInbox/LeiOverview.pm
@@ -108,8 +108,9 @@ sub _unbless_smsg {
 
 sub ovv_atexit_child {
 	my ($self, $lei) = @_;
-	my $bref = delete $lei->{ovv_buf} or return;
-	print { $lei->{1} } $$bref;
+	if (my $bref = delete $lei->{ovv_buf}) {
+		print { $lei->{1} } $$bref;
+	}
 }
 
 # JSON module ->pretty output wastes too much vertical white space,
diff --git a/lib/PublicInbox/LeiQuery.pm b/lib/PublicInbox/LeiQuery.pm
index 7ca01454..1a3e1193 100644
--- a/lib/PublicInbox/LeiQuery.pm
+++ b/lib/PublicInbox/LeiQuery.pm
@@ -40,14 +40,13 @@ sub lei_q {
 	if ($opt->{external} // 1) {
 		$self->_externals_each(\&_vivify_external, \@srcs);
 	}
-	my $j = $opt->{jobs} // scalar(@srcs) > 3 ? 3 : scalar(@srcs);
+	my $j = $opt->{jobs} // (scalar(@srcs) > 3 ? 3 : scalar(@srcs));
 	$j = 1 if !$opt->{thread};
 	$j++ if $opt->{'local'}; # for sto->search below
-	if ($self->{sock}) {
-		$self->atfork_prepare_wq($lxs);
-		$lxs->wq_workers_start('lei_xsearch', $j, $self->oldset)
-			// $lxs->wq_workers($j);
-	}
+	$self->atfork_prepare_wq($lxs);
+	$lxs->wq_workers_start('lei_xsearch', $j, $self->oldset)
+		// $lxs->wq_workers($j);
+
 	unshift(@srcs, $sto->search) if $opt->{'local'};
 	# no forking workers after this
 	require PublicInbox::LeiOverview;
@@ -77,16 +76,7 @@ sub lei_q {
 	# my $wcb = PublicInbox::LeiToMail->write_cb($out, $self);
 	$self->{mset_opt} = \%mset_opt;
 	$self->{ovv}->ovv_begin($self);
-	pipe(my ($eof_wait, $qry_done)) or die "pipe $!";
-	require PublicInbox::EOFpipe;
-	my $eof = PublicInbox::EOFpipe->new($eof_wait, \&query_done, $self);
-	$lxs->do_query($self, $qry_done, \@srcs);
-	$eof->event_step unless $self->{sock};
-}
-
-sub query_done { # PublicInbox::EOFpipe callback
-	my ($self) = @_;
-	$self->{ovv}->ovv_end($self);
+	$lxs->do_query($self, \@srcs);
 }
 
 1;
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index c030b2b2..d06b6f1d 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -92,7 +92,9 @@ sub _mset_more ($$) {
 
 sub query_thread_mset { # for --thread
 	my ($self, $lei, $ibxish) = @_;
-	local %SIG = (%SIG, $lei->atfork_child_wq($self));
+	my %sig = $lei->atfork_child_wq($self);
+	local @SIG{keys %sig} = values %sig;
+
 	my ($srch, $over) = ($ibxish->search, $ibxish->over);
 	unless ($srch && $over) {
 		my $desc = $ibxish->{inboxdir} // $ibxish->{topdir};
@@ -125,9 +127,10 @@ sub query_thread_mset { # for --thread
 
 sub query_mset { # non-parallel for non-"--thread" users
 	my ($self, $lei, $srcs) = @_;
+	my %sig = $lei->atfork_child_wq($self);
+	local @SIG{keys %sig} = values %sig;
 	my $mo = { %{$lei->{mset_opt}} };
 	my $mset;
-	local %SIG = (%SIG, $lei->atfork_child_wq($self));
 	$self->attach_external($_) for @$srcs;
 	my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei);
 	do {
@@ -143,9 +146,17 @@ sub query_mset { # non-parallel for non-"--thread" users
 	$lei->{ovv}->ovv_atexit_child($lei);
 }
 
+sub query_done { # PublicInbox::EOFpipe callback
+	my ($lei) = @_;
+	$lei->{ovv}->ovv_end($lei);
+	$lei->dclose;
+}
+
 sub do_query {
-	my ($self, $lei_orig, $qry_done, $srcs) = @_;
+	my ($self, $lei_orig, $srcs) = @_;
 	my ($lei, @io) = $lei_orig->atfork_parent_wq($self);
+
+	pipe(my ($eof_wait, $qry_done)) or die "pipe $!";
 	$io[0] = $qry_done; # don't need stdin
 	$io[1]->autoflush(1);
 	$io[2]->autoflush(1);
@@ -160,9 +171,20 @@ sub do_query {
 	for my $rmt (@{$self->{remotes} // []}) {
 		$self->wq_do('query_thread_mbox', \@io, $lei, $rmt);
 	}
-
-	# sent off to children, they will drop remaining references to it
-	close $qry_done;
+	@io = ();
+	close $qry_done; # fully closed when children are done
+
+	# query_done will run when query_*mset close $qry_done
+	if ($lei_orig->{sock}) { # watch for client premature exit
+		require PublicInbox::EOFpipe;
+		PublicInbox::EOFpipe->new($eof_wait, \&query_done, $lei_orig);
+		$lei_orig->{lxs} = $self;
+		$lei_orig->event_step_init;
+	} else {
+		$self->wq_close;
+		read($eof_wait, my $buf, 1); # wait for close($lei->{0})
+		query_done($lei_orig); # may SIGPIPE
+	}
 }
 
 sub ipc_atfork_child {
diff --git a/script/lei b/script/lei
index 5c32ab88..9610a876 100755
--- a/script/lei
+++ b/script/lei
@@ -3,32 +3,47 @@
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
 use strict;
 use v5.10.1;
-use Socket qw(AF_UNIX SOCK_STREAM pack_sockaddr_un);
+use Socket qw(AF_UNIX SOCK_SEQPACKET MSG_EOR pack_sockaddr_un);
+use Errno qw(EINTR ECONNRESET);
 use PublicInbox::CmdIPC4;
 my $narg = 4;
+my ($sock, $pwd);
 my $recv_cmd = PublicInbox::CmdIPC4->can('recv_cmd4');
 my $send_cmd = PublicInbox::CmdIPC4->can('send_cmd4') // do {
 	require PublicInbox::Spawn; # takes ~50ms even if built *sigh*
-	$narg = 4;
 	$recv_cmd = PublicInbox::Spawn->can('recv_cmd4');
 	PublicInbox::Spawn->can('send_cmd4');
 };
 
+sub sigchld {
+	my ($sig) = @_;
+	my $flags = $sig ? POSIX::WNOHANG() : 0;
+	while (waitpid(-1, $flags) > 0) {}
+}
+
 sub exec_cmd {
 	my ($fds, $argc, @argv) = @_;
-	my %env = map { split(/=/, $_, 2) } splice(@argv, $argc);
-	my @m = (*STDIN{IO}, '<&=',  *STDOUT{IO}, '>&=',
-		*STDERR{IO}, '>&=');
+	my @m = (*STDIN{IO}, '<&=',  *STDOUT{IO}, '>&=', *STDERR{IO}, '>&=');
+	my @rdr;
 	for my $fd (@$fds) {
 		my ($old_io, $mode) = splice(@m, 0, 2);
-		open($old_io, $mode, $fd) or die "open $mode$fd: $!";
+		open(my $tmpfh, $mode, $fd) or die "open $mode$fd: $!";
+		push @rdr, $old_io, $mode, $tmpfh;
+	}
+	require POSIX; # WNOHANG
+	$SIG{CHLD} = \&sigchld;
+	my $pid = fork // die "fork: $!";
+	if ($pid == 0) {
+		my %env = map { split(/=/, $_, 2) } splice(@argv, $argc);
+		while (my ($old_io, $mode, $tmpfh) = splice(@rdr, 0, 3)) {
+			open $old_io, $mode, $tmpfh or die "open $mode: $!";
+		}
+		%ENV = (%ENV, %env);
+		exec(@argv);
+		die "exec: @argv: $!";
 	}
-	%ENV = (%ENV, %env);
-	exec(@argv);
-	die "exec: @argv: $!";
 }
 
-my ($sock, $pwd);
 if ($send_cmd && eval {
 	my $path = do {
 		my $runtime_dir = ($ENV{XDG_RUNTIME_DIR} // '') . '/lei';
@@ -40,10 +55,10 @@ if ($send_cmd && eval {
 			require File::Path;
 			File::Path::mkpath($runtime_dir, 0, 0700);
 		}
-		"$runtime_dir/$narg.sock";
+		"$runtime_dir/$narg.seq.sock";
 	};
 	my $addr = pack_sockaddr_un($path);
-	socket($sock, AF_UNIX, SOCK_STREAM, 0) or die "socket: $!";
+	socket($sock, AF_UNIX, SOCK_SEQPACKET, 0) or die "socket: $!";
 	unless (connect($sock, $addr)) { # start the daemon if not started
 		local $ENV{PERL5LIB} = join(':', @INC);
 		open(my $daemon, '-|', $^X, qw[-MPublicInbox::LEI
@@ -73,22 +88,41 @@ Falling back to (slow) one-shot mode
 	}
 	1;
 }) { # (Socket::MsgHdr|Inline::C), $sock, $pwd are all available:
-	local $ENV{PWD} = $pwd;
+	$ENV{PWD} = $pwd;
 	my $buf = join("\0", scalar(@ARGV), @ARGV);
 	while (my ($k, $v) = each %ENV) { $buf .= "\0$k=$v" }
 	$buf .= "\0\0";
-	select $sock;
-	$| = 1; # unbuffer selected $sock
-	$send_cmd->($sock, [ 0, 1, 2 ], $buf, 0);
-	while (my (@fds) = $recv_cmd->($sock, $buf, 4096 * 33)) {
-		if ($buf =~ /\Aexit=([0-9]+)\n\z/) {
-			exit($1);
-		} elsif ($buf =~ /\Aexec (.+)\n\z/) {
+	$send_cmd->($sock, [ 0, 1, 2 ], $buf, MSG_EOR);
+	$SIG{TERM} = $SIG{INT} = $SIG{QUIT} = sub {
+		my ($sig) = @_; # 'TERM', not an integer :<
+		$SIG{$sig} = 'DEFAULT';
+		kill($sig, $$); # exit($signo + 128)
+	};
+	my $x_it_code = 0;
+	while (1) {
+		my (@fds) = $recv_cmd->($sock, $buf, 4096 * 33);
+		if (scalar(@fds) == 1 && !defined($fds[0])) {
+			last if $! == ECONNRESET;
+			next if $! == EINTR;
+			die "recvmsg: $!";
+		}
+		last if $buf eq '';
+		if ($buf =~ /\Ax_it ([0-9]+)\z/) {
+			$x_it_code = $1 + 0;
+			last;
+		} elsif ($buf =~ /\Aexec (.+)\z/) {
 			exec_cmd(\@fds, split(/\0/, $1));
 		} else {
+			sigchld();
 			die $buf;
 		}
 	}
+	sigchld();
+	if (my $sig = ($x_it_code & 127)) {
+		kill $sig, $$;
+		sleep;
+	}
+	exit($x_it_code >> 8);
 } else { # for systems lacking Socket::MsgHdr or Inline::C
 	warn $@ if $@;
 	require PublicInbox::LEI;
diff --git a/t/lei.t b/t/lei.t
index 6819f182..3ebaade6 100644
--- a/t/lei.t
+++ b/t/lei.t
@@ -215,7 +215,7 @@ SKIP: { # real socket
 	skip 'Socket::MsgHdr or Inline::C missing or unconfigured', $nr;
 
 	local $ENV{XDG_RUNTIME_DIR} = "$home/xdg_run";
-	my $sock = "$ENV{XDG_RUNTIME_DIR}/lei/$nfd.sock";
+	my $sock = "$ENV{XDG_RUNTIME_DIR}/lei/$nfd.seq.sock";
 
 	ok($lei->('daemon-pid'), 'daemon-pid');
 	is($err, '', 'no error from daemon-pid');
diff --git a/xt/lei-sigpipe.t b/xt/lei-sigpipe.t
new file mode 100644
index 00000000..4d35bbb3
--- /dev/null
+++ b/xt/lei-sigpipe.t
@@ -0,0 +1,32 @@
+#!perl -w
+# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+use strict;
+use v5.10.1;
+use Test::More;
+use PublicInbox::TestCommon;
+use POSIX qw(WTERMSIG WIFSIGNALED SIGPIPE);
+require_mods(qw(json DBD::SQLite Search::Xapian));
+# XXX this needs an already configured lei instance with many messages
+
+my $do_test = sub {
+	my $env = shift // {};
+	pipe(my ($r, $w)) or BAIL_OUT $!;
+	open my $err, '+>', undef or BAIL_OUT $!;
+	my $opt = { run_mode => 0, 1 => $w, 2 => $err };
+	my $tp = start_script([qw(lei q -t), 'bytes:1..'], $env, $opt);
+	close $w;
+	sysread($r, my $buf, 1);
+	close $r; # trigger SIGPIPE
+	$tp->join;
+	ok(WIFSIGNALED($?), 'signaled');
+	is(WTERMSIG($?), SIGPIPE, 'got SIGPIPE');
+	seek($err, 0, 0);
+	my @err = grep(!m{mkdir /dev/null\b}, <$err>);
+	is_deeply(\@err, [], 'no errors');
+};
+
+$do_test->();
+$do_test->({XDG_RUNTIME_DIR => '/dev/null'});
+
+done_testing;

  parent reply	other threads:[~2021-01-14  7:06 UTC|newest]

Thread overview: 16+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2021-01-14  7:06 [PATCH 00/14] lei: another pile of changes Eric Wong
2021-01-14  7:06 ` [PATCH 01/14] cmd_ipc: support + test EINTR + EAGAIN, no FDs Eric Wong
2021-01-14  7:06 ` Eric Wong [this message]
2021-01-14  7:06 ` [PATCH 03/14] daemon+watch: fix localization of %SIG for non-signalfd users Eric Wong
2021-01-14  7:06 ` [PATCH 04/14] lei: do not unlink socket path at exit Eric Wong
2021-01-14  7:06 ` [PATCH 05/14] lei: reduce live FD references in wq child Eric Wong
2021-01-14  7:06 ` [PATCH 06/14] lei: rely on localized $current_lei for warnings Eric Wong
2021-01-14  7:06 ` [PATCH 07/14] lei_dedupe+shared_kv: ensure round-tripping serialization Eric Wong
2021-01-14  7:06 ` [PATCH 08/14] lei q: reinstate smsg dedupe Eric Wong
2021-01-14  7:06 ` [PATCH 09/14] search: rename "ts:" prefix to "rt:" Eric Wong
2021-01-14  7:06 ` [PATCH 10/14] lei_overview: rename "references" to "refs" Eric Wong
2021-01-14  7:06 ` [PATCH 11/14] lei: q: lock stdout on overview output Eric Wong
2021-01-15  0:18   ` Eric Wong
2021-01-14  7:06 ` [PATCH 12/14] leixsearch: remove some commented out code Eric Wong
2021-01-14  7:06 ` [PATCH 13/14] lei: remove temporary var on open Eric Wong
2021-01-14  7:06 ` [PATCH 14/14] lei: pass FD to CWD via cmsg, use fchdir on server Eric Wong

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

  List information: http://public-inbox.org/README

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20210114070627.18195-3-e@80x24.org \
    --to=e@80x24.org \
    --cc=meta@public-inbox.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

  Be sure your reply has a Subject: header at the top and a blank line
  before the message body.
Code repositories for project(s) associated with this public inbox

	https://80x24.org/public-inbox.git

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).