* [PATCH 02/14] lei: test SIGPIPE, stop xsearch workers on client abort
2021-01-14 7:06 7% [PATCH 00/14] lei: another pile of changes Eric Wong
@ 2021-01-14 7:06 3% ` Eric Wong
0 siblings, 0 replies; 2+ results
From: Eric Wong @ 2021-01-14 7:06 UTC (permalink / raw)
To: meta
The new test ensures consistency between oneshot and
client/daemon users. Cancelling an in-progress result now also
stops xsearch workers to avoid wasted CPU and I/O.
Note the lei->atfork_child_wq usage changes; they work around
a bug in Perl 5: http://nntp.perl.org/group/perl.perl5.porters/258784
<CAHhgV8hPbcmkzWizp6Vijw921M5BOXixj4+zTh3nRS9vRBYk8w@mail.gmail.com>
This switches the internal protocol to use SOCK_SEQPACKET
AF_UNIX sockets, which preserve message boundaries and thus
prevent merging of the messages sent from the daemon to the
client to run the pager and to kill/exit the client script.
---
MANIFEST | 1 +
lib/PublicInbox/IPC.pm | 45 ++++------
lib/PublicInbox/LEI.pm | 158 +++++++++++++++++----------------
lib/PublicInbox/LeiOverview.pm | 5 +-
lib/PublicInbox/LeiQuery.pm | 22 ++---
lib/PublicInbox/LeiXSearch.pm | 34 +++++--
script/lei | 74 ++++++++++-----
t/lei.t | 2 +-
xt/lei-sigpipe.t | 32 +++++++
9 files changed, 225 insertions(+), 148 deletions(-)
create mode 100644 xt/lei-sigpipe.t
diff --git a/MANIFEST b/MANIFEST
index 810aec42..2ca240fc 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -429,6 +429,7 @@ xt/git_async_cmp.t
xt/httpd-async-stream.t
xt/imapd-mbsync-oimap.t
xt/imapd-validate.t
+xt/lei-sigpipe.t
xt/mem-imapd-tls.t
xt/mem-msgview.t
xt/msgtime_cmp.t
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index c54fcc64..fbc91f6f 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -130,7 +130,8 @@ sub ipc_worker_spawn {
sub ipc_worker_reap { # dwaitpid callback
my ($self, $pid) = @_;
- warn "PID:$pid died with \$?=$?\n" if $?;
+ # SIGTERM (15) is our default exit signal
+ warn "PID:$pid died with \$?=$?\n" if $? && ($? & 127) != 15;
}
# for base class, override in sub classes
@@ -236,50 +237,31 @@ sub ipc_sibling_atfork_child {
$pid == $$ and die "BUG: $$ ipc_atfork_child called on itself";
}
-sub _close_recvd ($) {
- my ($self) = @_;
- my $x = $self->{-wq_recv_modes};
- my $end = $x ? $#$x : 2;
- close($_) for (grep { defined } (delete @$self{0..$end}));
-}
-
sub wq_worker_loop ($) {
my ($self) = @_;
- my $buf;
my $len = $self->{wq_req_len} // (4096 * 33);
- my ($sub, $args);
my $s2 = $self->{-wq_s2} // die 'BUG: no -wq_s2';
- local $SIG{PIPE} = sub {
- my $cur_sub = $sub;
- _close_recvd($self);
- die(bless(\$cur_sub, 'PublicInbox::SIGPIPE')) if $cur_sub;
- };
while (1) {
- my (@fds) = $recv_cmd->($s2, $buf, $len) or return; # EOF
- my $i = 0;
+ my @fds = $recv_cmd->($s2, my $buf, $len) or return; # EOF
my @m = @{$self->{-wq_recv_modes} // [qw( +<&= >&= >&= )]};
+ my $nfd = 0;
for my $fd (@fds) {
my $mode = shift(@m);
if (open(my $cmdfh, $mode, $fd)) {
- $self->{$i++} = $cmdfh;
+ $self->{$nfd++} = $cmdfh;
$cmdfh->autoflush(1);
} else {
- die "$$ open($mode$fd) (FD:$i): $!";
+ die "$$ open($mode$fd) (FD:$nfd): $!";
}
}
# Sereal dies on truncated data, Storable returns undef
- $args = thaw($buf) //
+ my $args = thaw($buf) //
die "thaw error on buffer of size:".length($buf);
- eval {
- $sub = shift @$args;
- eval { $self->$sub(@$args) };
- undef $sub; # quiet SIG{PIPE} handler
- die $@ if $@;
- };
+ my $sub = shift @$args;
+ eval { $self->$sub(@$args) };
warn "$$ wq_worker: $@" if $@ &&
ref($@) ne 'PublicInbox::SIGPIPE';
- # need to close explicitly to avoid warnings after SIGPIPE
- _close_recvd($self);
+ delete @$self{0..($nfd-1)};
}
}
@@ -400,9 +382,16 @@ sub wq_close {
}
}
+sub wq_kill {
+ my ($self, $sig) = @_;
+ my $workers = $self->{-wq_workers} or return;
+ kill($sig // 'TERM', keys %$workers);
+}
+
sub WQ_MAX_WORKERS { $WQ_MAX_WORKERS }
sub DESTROY {
+ wq_kill($_[0]);
wq_close($_[0]);
ipc_worker_stop($_[0]);
}
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 7313738e..2889fa76 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -11,13 +11,13 @@ use v5.10.1;
use parent qw(PublicInbox::DS PublicInbox::LeiExternal
PublicInbox::LeiQuery);
use Getopt::Long ();
-use Socket qw(AF_UNIX SOCK_STREAM pack_sockaddr_un);
-use Errno qw(EAGAIN ECONNREFUSED ENOENT);
+use Socket qw(AF_UNIX SOCK_SEQPACKET MSG_EOR pack_sockaddr_un);
+use Errno qw(EAGAIN EINTR ECONNREFUSED ENOENT ECONNRESET);
use POSIX ();
use IO::Handle ();
use Sys::Syslog qw(syslog openlog);
use PublicInbox::Config;
-use PublicInbox::Syscall qw(SFD_NONBLOCK EPOLLIN EPOLLONESHOT);
+use PublicInbox::Syscall qw(SFD_NONBLOCK EPOLLIN EPOLLET);
use PublicInbox::Sigfd;
use PublicInbox::DS qw(now dwaitpid);
use PublicInbox::Spawn qw(spawn run_die popen_rd);
@@ -238,16 +238,15 @@ my %CONFIG_KEYS = (
'leistore.dir' => 'top-level storage location',
);
-sub x_it ($$) { # pronounced "exit"
+# pronounced "exit": x_it(1 << 8) => exit(1); x_it(13) => SIGPIPE
+sub x_it ($$) {
my ($self, $code) = @_;
- $self->{1}->autoflush(1); # make sure client sees stdout before exit
- my $sig = ($code & 127);
- $code >>= 8 unless $sig;
+ # make sure client sees stdout before exit
+ $self->{1}->autoflush(1) if $self->{1};
if (my $sock = $self->{sock}) {
- my $fds = [ map { fileno($_) } @$self{0..2} ];
- $send_cmd->($sock, $fds, "exit=$code\n", 0);
- } else { # for oneshot
- $quit->($code);
+ send($sock, "x_it $code", MSG_EOR);
+ } elsif (!($code & 127)) { # oneshot, ignore signals
+ $quit->($code >> 8);
}
}
@@ -274,22 +273,20 @@ sub atfork_prepare_wq {
grep { defined } @$self{qw(0 1 2 sock)}
}
-# usage: local %SIG = (%SIG, $lei->atfork_child_wq($wq));
+# usage: my %sig = $lei->atfork_child_wq($wq);
+# local @SIG{keys %sig} = values %sig;
sub atfork_child_wq {
my ($self, $wq) = @_;
- return () if $self->{0}; # did not fork
- $self->{$_} = $wq->{$_} for (0..2);
- $self->{sock} = $wq->{3} // die 'BUG: no {sock}'; # may be undef
- my $oldpipe = $SIG{PIPE};
+ @$self{qw(0 1 2 sock)} = delete(@$wq{0..3});
%PATH2CFG = ();
@TO_CLOSE_ATFORK_CHILD = ();
- (
- __WARN__ => sub { err($self, @_) },
- PIPE => sub {
- $self->x_it(141);
- $oldpipe->() if ref($oldpipe) eq 'CODE';
- }
- );
+ (__WARN__ => sub { err($self, @_) },
+ PIPE => sub {
+ $self->x_it(13); # SIGPIPE = 13
+ # we need to close explicitly to avoid Perl warning on SIGPIPE
+ close($_) for (delete @$self{1..2});
+ die bless(\"$_[0]", 'PublicInbox::SIGPIPE'),
+ });
}
# usage: ($lei, @io) = $lei->atfork_parent_wq($wq);
@@ -300,9 +297,9 @@ sub atfork_parent_wq {
my $ret = bless { %$self }, ref($self);
$self->{env} = $env;
delete @$ret{qw(-lei_store cfg pgr)};
- ($ret, delete @$ret{qw(0 1 2 sock)});
+ ($ret, delete @$ret{0..2}, delete($ret->{sock}) // ());
} else {
- ($self, @$self{qw(0 1 2 sock)});
+ ($self, @$self{0..2}, $self->{sock} // ());
}
}
@@ -647,7 +644,7 @@ sub start_pager {
my $buf = "exec 1\0".$pager;
while (my ($k, $v) = each %new_env) { $buf .= "\0$k=$v" };
my $fds = [ map { fileno($_) } @$rdr{0..2} ];
- $send_cmd->($sock, $fds, $buf .= "\n", 0);
+ $send_cmd->($sock, $fds, $buf, MSG_EOR);
} else {
$pgr->[0] = spawn([$pager], $env, $rdr);
}
@@ -660,50 +657,39 @@ sub start_pager {
sub stop_pager {
my ($self) = @_;
my $pgr = delete($self->{pgr}) or return;
- my $pid = $pgr->[0];
- close $self->{1};
- # {2} may not be redirected
- $self->{1} = $pgr->[1];
$self->{2} = $pgr->[2];
+ # do not restore original stdout, just close it so we error out
+ close(delete($self->{1})) if $self->{1};
+ my $pid = $pgr->[0];
dwaitpid($pid, undef, $self->{sock}) if $pid;
}
sub accept_dispatch { # Listener {post_accept} callback
my ($sock) = @_; # ignore other
- $sock->blocking(1);
$sock->autoflush(1);
my $self = bless { sock => $sock }, __PACKAGE__;
- vec(my $rin = '', fileno($sock), 1) = 1;
- # `say $sock' triggers "die" in lei(1)
- my $buf;
- if (select(my $rout = $rin, undef, undef, 1)) {
- my @fds = $recv_cmd->($sock, $buf, 4096 * 33); # >MAX_ARG_STRLEN
- if (scalar(@fds) == 3) {
- my $i = 0;
- for my $rdr (qw(<&= >&= >&=)) {
- my $fd = shift(@fds);
- if (open(my $fh, $rdr, $fd)) {
- $self->{$i++} = $fh;
- } else {
- say $sock "open($rdr$fd) (FD=$i): $!";
- return;
- }
+ vec(my $rvec, fileno($sock), 1) = 1;
+ select($rvec, undef, undef, 1) or
+ return send($sock, 'timed out waiting to recv FDs', MSG_EOR);
+ my @fds = $recv_cmd->($sock, my $buf, 4096 * 33); # >MAX_ARG_STRLEN
+ if (scalar(@fds) == 3) {
+ my $i = 0;
+ for my $rdr (qw(<&= >&= >&=)) {
+ my $fd = shift(@fds);
+ if (open(my $fh, $rdr, $fd)) {
+ $self->{$i++} = $fh;
+ next;
}
- } else {
- say $sock "recv_cmd failed: $!";
- return;
+ return send($sock, "open($rdr$fd) (FD=$i): $!", MSG_EOR);
}
} else {
- say $sock "timed out waiting to recv FDs";
- return;
+ return send($sock, "recv_cmd failed: $!", MSG_EOR);
}
$self->{2}->autoflush(1); # keep stdout buffered until x_it|DESTROY
# $ENV_STR = join('', map { "\0$_=$ENV{$_}" } keys %ENV);
# $buf = "$$\0$argc\0".join("\0", @ARGV).$ENV_STR."\0\0";
- if (substr($buf, -2, 2, '') ne "\0\0") { # s/\0\0\z//
- say $sock "request command truncated";
- return;
- }
+ substr($buf, -2, 2, '') eq "\0\0" or # s/\0\0\z//
+ return send($sock, 'request command truncated', MSG_EOR);
my ($argc, @argv) = split(/\0/, $buf, -1);
undef $buf;
my %env = map { split(/=/, $_, 2) } splice(@argv, $argc);
@@ -711,23 +697,50 @@ sub accept_dispatch { # Listener {post_accept} callback
local %ENV = %env;
$self->{env} = \%env;
eval { dispatch($self, @argv) };
- say $sock $@ if $@;
+ send($sock, $@, MSG_EOR) if $@;
} else {
- say $sock "chdir($env{PWD}): $!"; # implicit close
+ send($sock, "chdir($env{PWD}): $!", MSG_EOR); # implicit close
}
}
+sub dclose {
+ my ($self) = @_;
+ delete $self->{lxs}; # stops LeiXSearch queries
+ $self->close; # PublicInbox::DS::close
+}
+
# for long-running results
sub event_step {
my ($self) = @_;
local %ENV = %{$self->{env}};
- eval {}; # TODO
- if ($@) {
- say { $self->{sock} } $@;
- $self->close; # PublicInbox::DS::close
+ my $sock = $self->{sock};
+ eval {
+ while (my @fds = $recv_cmd->($sock, my $buf, 4096)) {
+ if (scalar(@fds) == 1 && !defined($fds[0])) {
+ return if $! == EAGAIN;
+ next if $! == EINTR;
+ last if $! == ECONNRESET;
+ die "recvmsg: $!";
+ }
+ for my $fd (@fds) {
+ open my $rfh, '+<&=', $fd;
+ }
+ die "unrecognized client signal: $buf";
+ }
+ dclose($self);
+ };
+ if (my $err = $@) {
+ eval { $self->fail($err) };
+ dclose($self);
}
}
+sub event_step_init {
+ my ($self) = @_;
+ $self->{sock}->blocking(0);
+ $self->SUPER::new($self->{sock}, EPOLLIN|EPOLLET);
+}
+
sub noop {}
our $oldset; sub oldset { $oldset }
@@ -742,7 +755,7 @@ sub lazy_start {
die "connect($path): $!";
}
umask(077) // die("umask(077): $!");
- socket(my $l, AF_UNIX, SOCK_STREAM, 0) or die "socket: $!";
+ socket(my $l, AF_UNIX, SOCK_SEQPACKET, 0) or die "socket: $!";
bind($l, pack_sockaddr_un($path)) or die "bind($path): $!";
listen($l, 1024) or die "listen: $!";
my @st = stat($path) or die "stat($path): $!";
@@ -793,7 +806,7 @@ sub lazy_start {
USR2 => \&noop,
};
my $sigfd = PublicInbox::Sigfd->new($sig, SFD_NONBLOCK);
- local %SIG = (%SIG, %$sig) if !$sigfd;
+ local @SIG{keys %$sig} = values(%$sig) unless $sigfd;
local $SIG{PIPE} = 'IGNORE';
if ($sigfd) { # TODO: use inotify/kqueue to detect unlinked sockets
push @TO_CLOSE_ATFORK_CHILD, $sigfd->{sock};
@@ -853,24 +866,19 @@ sub oneshot {
local $quit = $exit if $exit;
local %PATH2CFG;
umask(077) // die("umask(077): $!");
- local $SIG{PIPE} = sub { die(bless(\"$_[0]", 'PublicInbox::SIGPIPE')) };
- eval {
- my $self = bless {
- 0 => *STDIN{GLOB},
- 1 => *STDOUT{GLOB},
- 2 => *STDERR{GLOB},
- env => \%ENV
- }, __PACKAGE__;
- dispatch($self, @ARGV);
- };
- die $@ if $@ && ref($@) ne 'PublicInbox::SIGPIPE';
+ dispatch((bless {
+ 0 => *STDIN{GLOB},
+ 1 => *STDOUT{GLOB},
+ 2 => *STDERR{GLOB},
+ env => \%ENV
+ }, __PACKAGE__), @ARGV);
}
# ensures stdout hits the FS before sock disconnects so a client
# can immediately reread it
sub DESTROY {
my ($self) = @_;
- $self->{1}->autoflush(1);
+ $self->{1}->autoflush(1) if $self->{1};
stop_pager($self);
}
diff --git a/lib/PublicInbox/LeiOverview.pm b/lib/PublicInbox/LeiOverview.pm
index 8a1f4f82..194c5e28 100644
--- a/lib/PublicInbox/LeiOverview.pm
+++ b/lib/PublicInbox/LeiOverview.pm
@@ -108,8 +108,9 @@ sub _unbless_smsg {
sub ovv_atexit_child {
my ($self, $lei) = @_;
- my $bref = delete $lei->{ovv_buf} or return;
- print { $lei->{1} } $$bref;
+ if (my $bref = delete $lei->{ovv_buf}) {
+ print { $lei->{1} } $$bref;
+ }
}
# JSON module ->pretty output wastes too much vertical white space,
diff --git a/lib/PublicInbox/LeiQuery.pm b/lib/PublicInbox/LeiQuery.pm
index 7ca01454..1a3e1193 100644
--- a/lib/PublicInbox/LeiQuery.pm
+++ b/lib/PublicInbox/LeiQuery.pm
@@ -40,14 +40,13 @@ sub lei_q {
if ($opt->{external} // 1) {
$self->_externals_each(\&_vivify_external, \@srcs);
}
- my $j = $opt->{jobs} // scalar(@srcs) > 3 ? 3 : scalar(@srcs);
+ my $j = $opt->{jobs} // (scalar(@srcs) > 3 ? 3 : scalar(@srcs));
$j = 1 if !$opt->{thread};
$j++ if $opt->{'local'}; # for sto->search below
- if ($self->{sock}) {
- $self->atfork_prepare_wq($lxs);
- $lxs->wq_workers_start('lei_xsearch', $j, $self->oldset)
- // $lxs->wq_workers($j);
- }
+ $self->atfork_prepare_wq($lxs);
+ $lxs->wq_workers_start('lei_xsearch', $j, $self->oldset)
+ // $lxs->wq_workers($j);
+
unshift(@srcs, $sto->search) if $opt->{'local'};
# no forking workers after this
require PublicInbox::LeiOverview;
@@ -77,16 +76,7 @@ sub lei_q {
# my $wcb = PublicInbox::LeiToMail->write_cb($out, $self);
$self->{mset_opt} = \%mset_opt;
$self->{ovv}->ovv_begin($self);
- pipe(my ($eof_wait, $qry_done)) or die "pipe $!";
- require PublicInbox::EOFpipe;
- my $eof = PublicInbox::EOFpipe->new($eof_wait, \&query_done, $self);
- $lxs->do_query($self, $qry_done, \@srcs);
- $eof->event_step unless $self->{sock};
-}
-
-sub query_done { # PublicInbox::EOFpipe callback
- my ($self) = @_;
- $self->{ovv}->ovv_end($self);
+ $lxs->do_query($self, \@srcs);
}
1;
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index c030b2b2..d06b6f1d 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -92,7 +92,9 @@ sub _mset_more ($$) {
sub query_thread_mset { # for --thread
my ($self, $lei, $ibxish) = @_;
- local %SIG = (%SIG, $lei->atfork_child_wq($self));
+ my %sig = $lei->atfork_child_wq($self);
+ local @SIG{keys %sig} = values %sig;
+
my ($srch, $over) = ($ibxish->search, $ibxish->over);
unless ($srch && $over) {
my $desc = $ibxish->{inboxdir} // $ibxish->{topdir};
@@ -125,9 +127,10 @@ sub query_thread_mset { # for --thread
sub query_mset { # non-parallel for non-"--thread" users
my ($self, $lei, $srcs) = @_;
+ my %sig = $lei->atfork_child_wq($self);
+ local @SIG{keys %sig} = values %sig;
my $mo = { %{$lei->{mset_opt}} };
my $mset;
- local %SIG = (%SIG, $lei->atfork_child_wq($self));
$self->attach_external($_) for @$srcs;
my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei);
do {
@@ -143,9 +146,17 @@ sub query_mset { # non-parallel for non-"--thread" users
$lei->{ovv}->ovv_atexit_child($lei);
}
+sub query_done { # PublicInbox::EOFpipe callback
+ my ($lei) = @_;
+ $lei->{ovv}->ovv_end($lei);
+ $lei->dclose;
+}
+
sub do_query {
- my ($self, $lei_orig, $qry_done, $srcs) = @_;
+ my ($self, $lei_orig, $srcs) = @_;
my ($lei, @io) = $lei_orig->atfork_parent_wq($self);
+
+ pipe(my ($eof_wait, $qry_done)) or die "pipe $!";
$io[0] = $qry_done; # don't need stdin
$io[1]->autoflush(1);
$io[2]->autoflush(1);
@@ -160,9 +171,20 @@ sub do_query {
for my $rmt (@{$self->{remotes} // []}) {
$self->wq_do('query_thread_mbox', \@io, $lei, $rmt);
}
-
- # sent off to children, they will drop remaining references to it
- close $qry_done;
+ @io = ();
+ close $qry_done; # fully closed when children are done
+
+ # query_done will run when query_*mset close $qry_done
+ if ($lei_orig->{sock}) { # watch for client premature exit
+ require PublicInbox::EOFpipe;
+ PublicInbox::EOFpipe->new($eof_wait, \&query_done, $lei_orig);
+ $lei_orig->{lxs} = $self;
+ $lei_orig->event_step_init;
+ } else {
+ $self->wq_close;
+ read($eof_wait, my $buf, 1); # wait for close($lei->{0})
+ query_done($lei_orig); # may SIGPIPE
+ }
}
sub ipc_atfork_child {
diff --git a/script/lei b/script/lei
index 5c32ab88..9610a876 100755
--- a/script/lei
+++ b/script/lei
@@ -3,32 +3,47 @@
# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
use strict;
use v5.10.1;
-use Socket qw(AF_UNIX SOCK_STREAM pack_sockaddr_un);
+use Socket qw(AF_UNIX SOCK_SEQPACKET MSG_EOR pack_sockaddr_un);
+use Errno qw(EINTR ECONNRESET);
use PublicInbox::CmdIPC4;
my $narg = 4;
+my ($sock, $pwd);
my $recv_cmd = PublicInbox::CmdIPC4->can('recv_cmd4');
my $send_cmd = PublicInbox::CmdIPC4->can('send_cmd4') // do {
require PublicInbox::Spawn; # takes ~50ms even if built *sigh*
- $narg = 4;
$recv_cmd = PublicInbox::Spawn->can('recv_cmd4');
PublicInbox::Spawn->can('send_cmd4');
};
+sub sigchld {
+ my ($sig) = @_;
+ my $flags = $sig ? POSIX::WNOHANG() : 0;
+ while (waitpid(-1, $flags) > 0) {}
+}
+
sub exec_cmd {
my ($fds, $argc, @argv) = @_;
- my %env = map { split(/=/, $_, 2) } splice(@argv, $argc);
- my @m = (*STDIN{IO}, '<&=', *STDOUT{IO}, '>&=',
- *STDERR{IO}, '>&=');
+ my @m = (*STDIN{IO}, '<&=', *STDOUT{IO}, '>&=', *STDERR{IO}, '>&=');
+ my @rdr;
for my $fd (@$fds) {
my ($old_io, $mode) = splice(@m, 0, 2);
- open($old_io, $mode, $fd) or die "open $mode$fd: $!";
+ open(my $tmpfh, $mode, $fd) or die "open $mode$fd: $!";
+ push @rdr, $old_io, $mode, $tmpfh;
+ }
+ require POSIX; # WNOHANG
+ $SIG{CHLD} = \&sigchld;
+ my $pid = fork // die "fork: $!";
+ if ($pid == 0) {
+ my %env = map { split(/=/, $_, 2) } splice(@argv, $argc);
+ while (my ($old_io, $mode, $tmpfh) = splice(@rdr, 0, 3)) {
+ open $old_io, $mode, $tmpfh or die "open $mode: $!";
+ }
+ %ENV = (%ENV, %env);
+ exec(@argv);
+ die "exec: @argv: $!";
}
- %ENV = (%ENV, %env);
- exec(@argv);
- die "exec: @argv: $!";
}
-my ($sock, $pwd);
if ($send_cmd && eval {
my $path = do {
my $runtime_dir = ($ENV{XDG_RUNTIME_DIR} // '') . '/lei';
@@ -40,10 +55,10 @@ if ($send_cmd && eval {
require File::Path;
File::Path::mkpath($runtime_dir, 0, 0700);
}
- "$runtime_dir/$narg.sock";
+ "$runtime_dir/$narg.seq.sock";
};
my $addr = pack_sockaddr_un($path);
- socket($sock, AF_UNIX, SOCK_STREAM, 0) or die "socket: $!";
+ socket($sock, AF_UNIX, SOCK_SEQPACKET, 0) or die "socket: $!";
unless (connect($sock, $addr)) { # start the daemon if not started
local $ENV{PERL5LIB} = join(':', @INC);
open(my $daemon, '-|', $^X, qw[-MPublicInbox::LEI
@@ -73,22 +88,41 @@ Falling back to (slow) one-shot mode
}
1;
}) { # (Socket::MsgHdr|Inline::C), $sock, $pwd are all available:
- local $ENV{PWD} = $pwd;
+ $ENV{PWD} = $pwd;
my $buf = join("\0", scalar(@ARGV), @ARGV);
while (my ($k, $v) = each %ENV) { $buf .= "\0$k=$v" }
$buf .= "\0\0";
- select $sock;
- $| = 1; # unbuffer selected $sock
- $send_cmd->($sock, [ 0, 1, 2 ], $buf, 0);
- while (my (@fds) = $recv_cmd->($sock, $buf, 4096 * 33)) {
- if ($buf =~ /\Aexit=([0-9]+)\n\z/) {
- exit($1);
- } elsif ($buf =~ /\Aexec (.+)\n\z/) {
+ $send_cmd->($sock, [ 0, 1, 2 ], $buf, MSG_EOR);
+ $SIG{TERM} = $SIG{INT} = $SIG{QUIT} = sub {
+ my ($sig) = @_; # 'TERM', not an integer :<
+ $SIG{$sig} = 'DEFAULT';
+ kill($sig, $$); # exit($signo + 128)
+ };
+ my $x_it_code = 0;
+ while (1) {
+ my (@fds) = $recv_cmd->($sock, $buf, 4096 * 33);
+ if (scalar(@fds) == 1 && !defined($fds[0])) {
+ last if $! == ECONNRESET;
+ next if $! == EINTR;
+ die "recvmsg: $!";
+ }
+ last if $buf eq '';
+ if ($buf =~ /\Ax_it ([0-9]+)\z/) {
+ $x_it_code = $1 + 0;
+ last;
+ } elsif ($buf =~ /\Aexec (.+)\z/) {
exec_cmd(\@fds, split(/\0/, $1));
} else {
+ sigchld();
die $buf;
}
}
+ sigchld();
+ if (my $sig = ($x_it_code & 127)) {
+ kill $sig, $$;
+ sleep;
+ }
+ exit($x_it_code >> 8);
} else { # for systems lacking Socket::MsgHdr or Inline::C
warn $@ if $@;
require PublicInbox::LEI;
diff --git a/t/lei.t b/t/lei.t
index 6819f182..3ebaade6 100644
--- a/t/lei.t
+++ b/t/lei.t
@@ -215,7 +215,7 @@ SKIP: { # real socket
skip 'Socket::MsgHdr or Inline::C missing or unconfigured', $nr;
local $ENV{XDG_RUNTIME_DIR} = "$home/xdg_run";
- my $sock = "$ENV{XDG_RUNTIME_DIR}/lei/$nfd.sock";
+ my $sock = "$ENV{XDG_RUNTIME_DIR}/lei/$nfd.seq.sock";
ok($lei->('daemon-pid'), 'daemon-pid');
is($err, '', 'no error from daemon-pid');
diff --git a/xt/lei-sigpipe.t b/xt/lei-sigpipe.t
new file mode 100644
index 00000000..4d35bbb3
--- /dev/null
+++ b/xt/lei-sigpipe.t
@@ -0,0 +1,32 @@
+#!perl -w
+# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+use strict;
+use v5.10.1;
+use Test::More;
+use PublicInbox::TestCommon;
+use POSIX qw(WTERMSIG WIFSIGNALED SIGPIPE);
+require_mods(qw(json DBD::SQLite Search::Xapian));
+# XXX this needs an already configured lei instance with many messages
+
+my $do_test = sub {
+ my $env = shift // {};
+ pipe(my ($r, $w)) or BAIL_OUT $!;
+ open my $err, '+>', undef or BAIL_OUT $!;
+ my $opt = { run_mode => 0, 1 => $w, 2 => $err };
+ my $tp = start_script([qw(lei q -t), 'bytes:1..'], $env, $opt);
+ close $w;
+ sysread($r, my $buf, 1);
+ close $r; # trigger SIGPIPE
+ $tp->join;
+ ok(WIFSIGNALED($?), 'signaled');
+ is(WTERMSIG($?), SIGPIPE, 'got SIGPIPE');
+ seek($err, 0, 0);
+ my @err = grep(!m{mkdir /dev/null\b}, <$err>);
+ is_deeply(\@err, [], 'no errors');
+};
+
+$do_test->();
+$do_test->({XDG_RUNTIME_DIR => '/dev/null'});
+
+done_testing;
^ permalink raw reply related [relevance 3%]
* [PATCH 00/14] lei: another pile of changes
@ 2021-01-14 7:06 7% Eric Wong
2021-01-14 7:06 3% ` [PATCH 02/14] lei: test SIGPIPE, stop xsearch workers on client abort Eric Wong
0 siblings, 1 reply; 2+ results
From: Eric Wong @ 2021-01-14 7:06 UTC (permalink / raw)
To: meta
PATCH 2/14 took forever to figure out; it turns out I was hunting
an old bug in Perl :x (which also led to PATCH 3/14)
We could probably go farther on 5/14 and eliminate the
need for @TO_CLOSE_ATFORK_CHILD completely, but my brain
was fried from 2/14 :x.
The "ts:" => "rt:" change is technically user-visible,
but "ts:" was never publicly documented so I doubt it
affects anybody. "rt:" (received time) may be documented
in the future.
Eric Wong (14):
cmd_ipc: support + test EINTR + EAGAIN, no FDs
lei: test SIGPIPE, stop xsearch workers on client abort
daemon+watch: fix localization of %SIG for non-signalfd users
lei: do not unlink socket path at exit
lei: reduce live FD references in wq child
lei: rely on localized $current_lei for warnings
lei_dedupe+shared_kv: ensure round-tripping serialization
lei q: reinstate smsg dedupe
search: rename "ts:" prefix to "rt:"
lei_overview: rename "references" to "refs"
lei: q: lock stdout on overview output
leixsearch: remove some commented out code
lei: remove temporary var on open
lei: pass FD to CWD via cmsg, use fchdir on server
MANIFEST | 2 +
lib/PublicInbox/CmdIPC4.pm | 6 +-
lib/PublicInbox/Daemon.pm | 4 +-
lib/PublicInbox/IMAPsearchqp.pm | 6 +-
lib/PublicInbox/IPC.pm | 45 +++-----
lib/PublicInbox/LEI.pm | 182 +++++++++++++++++---------------
lib/PublicInbox/LeiDedupe.pm | 29 ++---
lib/PublicInbox/LeiOverview.pm | 43 +++++++-
lib/PublicInbox/LeiQuery.pm | 27 ++---
lib/PublicInbox/LeiXSearch.pm | 60 +++++++----
lib/PublicInbox/Lock.pm | 2 +-
lib/PublicInbox/Search.pm | 2 +-
lib/PublicInbox/SharedKV.pm | 12 ++-
lib/PublicInbox/Spawn.pm | 13 ++-
script/lei | 88 +++++++++------
script/public-inbox-watch | 2 +-
t/cmd_ipc.t | 32 ++++++
t/imap_searchqp.t | 6 +-
t/lei.t | 33 +-----
t/lei_dedupe.t | 13 +++
t/lei_overview.t | 33 ++++++
xt/lei-sigpipe.t | 32 ++++++
22 files changed, 417 insertions(+), 255 deletions(-)
create mode 100644 t/lei_overview.t
create mode 100644 xt/lei-sigpipe.t
^ permalink raw reply [relevance 7%]
Results 1-2 of 2 | reverse | options above
-- pct% links below jump to the message on this page, permalinks otherwise --
2021-01-14 7:06 7% [PATCH 00/14] lei: another pile of changes Eric Wong
2021-01-14 7:06 3% ` [PATCH 02/14] lei: test SIGPIPE, stop xsearch workers on client abort Eric Wong
Code repositories for project(s) associated with this public inbox
https://80x24.org/public-inbox.git
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).