From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 146D61FA12 for ; Thu, 14 Jan 2021 07:06:28 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 02/14] lei: test SIGPIPE, stop xsearch workers on client abort Date: Wed, 13 Jan 2021 19:06:15 -1200 Message-Id: <20210114070627.18195-3-e@80x24.org> In-Reply-To: <20210114070627.18195-1-e@80x24.org> References: <20210114070627.18195-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: The new test ensures consistency between oneshot and client/daemon users. Cancelling an in-progress result now also stops xsearch workers to avoid wasted CPU and I/O. Note the lei->atfork_child_wq usage changes; they work around a bug in Perl 5: http://nntp.perl.org/group/perl.perl5.porters/258784 This switches the internal protocol to use SOCK_SEQPACKET AF_UNIX sockets, which preserve message boundaries, to prevent merging of the messages the daemon sends to the client to run the pager and to kill/exit the client script. 
--- MANIFEST | 1 + lib/PublicInbox/IPC.pm | 45 ++++------ lib/PublicInbox/LEI.pm | 158 +++++++++++++++++---------------- lib/PublicInbox/LeiOverview.pm | 5 +- lib/PublicInbox/LeiQuery.pm | 22 ++--- lib/PublicInbox/LeiXSearch.pm | 34 +++++-- script/lei | 74 ++++++++++----- t/lei.t | 2 +- xt/lei-sigpipe.t | 32 +++++++ 9 files changed, 225 insertions(+), 148 deletions(-) create mode 100644 xt/lei-sigpipe.t diff --git a/MANIFEST b/MANIFEST index 810aec42..2ca240fc 100644 --- a/MANIFEST +++ b/MANIFEST @@ -429,6 +429,7 @@ xt/git_async_cmp.t xt/httpd-async-stream.t xt/imapd-mbsync-oimap.t xt/imapd-validate.t +xt/lei-sigpipe.t xt/mem-imapd-tls.t xt/mem-msgview.t xt/msgtime_cmp.t diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm index c54fcc64..fbc91f6f 100644 --- a/lib/PublicInbox/IPC.pm +++ b/lib/PublicInbox/IPC.pm @@ -130,7 +130,8 @@ sub ipc_worker_spawn { sub ipc_worker_reap { # dwaitpid callback my ($self, $pid) = @_; - warn "PID:$pid died with \$?=$?\n" if $?; + # SIGTERM (15) is our default exit signal + warn "PID:$pid died with \$?=$?\n" if $? && ($? & 127) != 15; } # for base class, override in sub classes @@ -236,50 +237,31 @@ sub ipc_sibling_atfork_child { $pid == $$ and die "BUG: $$ ipc_atfork_child called on itself"; } -sub _close_recvd ($) { - my ($self) = @_; - my $x = $self->{-wq_recv_modes}; - my $end = $x ? 
$#$x : 2; - close($_) for (grep { defined } (delete @$self{0..$end})); -} - sub wq_worker_loop ($) { my ($self) = @_; - my $buf; my $len = $self->{wq_req_len} // (4096 * 33); - my ($sub, $args); my $s2 = $self->{-wq_s2} // die 'BUG: no -wq_s2'; - local $SIG{PIPE} = sub { - my $cur_sub = $sub; - _close_recvd($self); - die(bless(\$cur_sub, 'PublicInbox::SIGPIPE')) if $cur_sub; - }; while (1) { - my (@fds) = $recv_cmd->($s2, $buf, $len) or return; # EOF - my $i = 0; + my @fds = $recv_cmd->($s2, my $buf, $len) or return; # EOF my @m = @{$self->{-wq_recv_modes} // [qw( +<&= >&= >&= )]}; + my $nfd = 0; for my $fd (@fds) { my $mode = shift(@m); if (open(my $cmdfh, $mode, $fd)) { - $self->{$i++} = $cmdfh; + $self->{$nfd++} = $cmdfh; $cmdfh->autoflush(1); } else { - die "$$ open($mode$fd) (FD:$i): $!"; + die "$$ open($mode$fd) (FD:$nfd): $!"; } } # Sereal dies on truncated data, Storable returns undef - $args = thaw($buf) // + my $args = thaw($buf) // die "thaw error on buffer of size:".length($buf); - eval { - $sub = shift @$args; - eval { $self->$sub(@$args) }; - undef $sub; # quiet SIG{PIPE} handler - die $@ if $@; - }; + my $sub = shift @$args; + eval { $self->$sub(@$args) }; warn "$$ wq_worker: $@" if $@ && ref($@) ne 'PublicInbox::SIGPIPE'; - # need to close explicitly to avoid warnings after SIGPIPE - _close_recvd($self); + delete @$self{0..($nfd-1)}; } } @@ -400,9 +382,16 @@ sub wq_close { } } +sub wq_kill { + my ($self, $sig) = @_; + my $workers = $self->{-wq_workers} or return; + kill($sig // 'TERM', keys %$workers); +} + sub WQ_MAX_WORKERS { $WQ_MAX_WORKERS } sub DESTROY { + wq_kill($_[0]); wq_close($_[0]); ipc_worker_stop($_[0]); } diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index 7313738e..2889fa76 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -11,13 +11,13 @@ use v5.10.1; use parent qw(PublicInbox::DS PublicInbox::LeiExternal PublicInbox::LeiQuery); use Getopt::Long (); -use Socket qw(AF_UNIX SOCK_STREAM 
pack_sockaddr_un); -use Errno qw(EAGAIN ECONNREFUSED ENOENT); +use Socket qw(AF_UNIX SOCK_SEQPACKET MSG_EOR pack_sockaddr_un); +use Errno qw(EAGAIN EINTR ECONNREFUSED ENOENT ECONNRESET); use POSIX (); use IO::Handle (); use Sys::Syslog qw(syslog openlog); use PublicInbox::Config; -use PublicInbox::Syscall qw(SFD_NONBLOCK EPOLLIN EPOLLONESHOT); +use PublicInbox::Syscall qw(SFD_NONBLOCK EPOLLIN EPOLLET); use PublicInbox::Sigfd; use PublicInbox::DS qw(now dwaitpid); use PublicInbox::Spawn qw(spawn run_die popen_rd); @@ -238,16 +238,15 @@ my %CONFIG_KEYS = ( 'leistore.dir' => 'top-level storage location', ); -sub x_it ($$) { # pronounced "exit" +# pronounced "exit": x_it(1 << 8) => exit(1); x_it(13) => SIGPIPE +sub x_it ($$) { my ($self, $code) = @_; - $self->{1}->autoflush(1); # make sure client sees stdout before exit - my $sig = ($code & 127); - $code >>= 8 unless $sig; + # make sure client sees stdout before exit + $self->{1}->autoflush(1) if $self->{1}; if (my $sock = $self->{sock}) { - my $fds = [ map { fileno($_) } @$self{0..2} ]; - $send_cmd->($sock, $fds, "exit=$code\n", 0); - } else { # for oneshot - $quit->($code); + send($sock, "x_it $code", MSG_EOR); + } elsif (!($code & 127)) { # oneshot, ignore signals + $quit->($code >> 8); } } @@ -274,22 +273,20 @@ sub atfork_prepare_wq { grep { defined } @$self{qw(0 1 2 sock)} } -# usage: local %SIG = (%SIG, $lei->atfork_child_wq($wq)); +# usage: my %sig = $lei->atfork_child_wq($wq); +# local @SIG{keys %sig} = values %sig; sub atfork_child_wq { my ($self, $wq) = @_; - return () if $self->{0}; # did not fork - $self->{$_} = $wq->{$_} for (0..2); - $self->{sock} = $wq->{3} // die 'BUG: no {sock}'; # may be undef - my $oldpipe = $SIG{PIPE}; + @$self{qw(0 1 2 sock)} = delete(@$wq{0..3}); %PATH2CFG = (); @TO_CLOSE_ATFORK_CHILD = (); - ( - __WARN__ => sub { err($self, @_) }, - PIPE => sub { - $self->x_it(141); - $oldpipe->() if ref($oldpipe) eq 'CODE'; - } - ); + (__WARN__ => sub { err($self, @_) }, + PIPE => sub { + 
$self->x_it(13); # SIGPIPE = 13 + # we need to close explicitly to avoid Perl warning on SIGPIPE + close($_) for (delete @$self{1..2}); + die bless(\"$_[0]", 'PublicInbox::SIGPIPE'), + }); } # usage: ($lei, @io) = $lei->atfork_parent_wq($wq); @@ -300,9 +297,9 @@ sub atfork_parent_wq { my $ret = bless { %$self }, ref($self); $self->{env} = $env; delete @$ret{qw(-lei_store cfg pgr)}; - ($ret, delete @$ret{qw(0 1 2 sock)}); + ($ret, delete @$ret{0..2}, delete($ret->{sock}) // ()); } else { - ($self, @$self{qw(0 1 2 sock)}); + ($self, @$self{0..2}, $self->{sock} // ()); } } @@ -647,7 +644,7 @@ sub start_pager { my $buf = "exec 1\0".$pager; while (my ($k, $v) = each %new_env) { $buf .= "\0$k=$v" }; my $fds = [ map { fileno($_) } @$rdr{0..2} ]; - $send_cmd->($sock, $fds, $buf .= "\n", 0); + $send_cmd->($sock, $fds, $buf, MSG_EOR); } else { $pgr->[0] = spawn([$pager], $env, $rdr); } @@ -660,50 +657,39 @@ sub start_pager { sub stop_pager { my ($self) = @_; my $pgr = delete($self->{pgr}) or return; - my $pid = $pgr->[0]; - close $self->{1}; - # {2} may not be redirected - $self->{1} = $pgr->[1]; $self->{2} = $pgr->[2]; + # do not restore original stdout, just close it so we error out + close(delete($self->{1})) if $self->{1}; + my $pid = $pgr->[0]; dwaitpid($pid, undef, $self->{sock}) if $pid; } sub accept_dispatch { # Listener {post_accept} callback my ($sock) = @_; # ignore other - $sock->blocking(1); $sock->autoflush(1); my $self = bless { sock => $sock }, __PACKAGE__; - vec(my $rin = '', fileno($sock), 1) = 1; - # `say $sock' triggers "die" in lei(1) - my $buf; - if (select(my $rout = $rin, undef, undef, 1)) { - my @fds = $recv_cmd->($sock, $buf, 4096 * 33); # >MAX_ARG_STRLEN - if (scalar(@fds) == 3) { - my $i = 0; - for my $rdr (qw(<&= >&= >&=)) { - my $fd = shift(@fds); - if (open(my $fh, $rdr, $fd)) { - $self->{$i++} = $fh; - } else { - say $sock "open($rdr$fd) (FD=$i): $!"; - return; - } + vec(my $rvec, fileno($sock), 1) = 1; + select($rvec, undef, undef, 1) or + 
return send($sock, 'timed out waiting to recv FDs', MSG_EOR); + my @fds = $recv_cmd->($sock, my $buf, 4096 * 33); # >MAX_ARG_STRLEN + if (scalar(@fds) == 3) { + my $i = 0; + for my $rdr (qw(<&= >&= >&=)) { + my $fd = shift(@fds); + if (open(my $fh, $rdr, $fd)) { + $self->{$i++} = $fh; + next; } - } else { - say $sock "recv_cmd failed: $!"; - return; + return send($sock, "open($rdr$fd) (FD=$i): $!", MSG_EOR); } } else { - say $sock "timed out waiting to recv FDs"; - return; + return send($sock, "recv_cmd failed: $!", MSG_EOR); } $self->{2}->autoflush(1); # keep stdout buffered until x_it|DESTROY # $ENV_STR = join('', map { "\0$_=$ENV{$_}" } keys %ENV); # $buf = "$$\0$argc\0".join("\0", @ARGV).$ENV_STR."\0\0"; - if (substr($buf, -2, 2, '') ne "\0\0") { # s/\0\0\z// - say $sock "request command truncated"; - return; - } + substr($buf, -2, 2, '') eq "\0\0" or # s/\0\0\z// + return send($sock, 'request command truncated', MSG_EOR); my ($argc, @argv) = split(/\0/, $buf, -1); undef $buf; my %env = map { split(/=/, $_, 2) } splice(@argv, $argc); @@ -711,23 +697,50 @@ sub accept_dispatch { # Listener {post_accept} callback local %ENV = %env; $self->{env} = \%env; eval { dispatch($self, @argv) }; - say $sock $@ if $@; + send($sock, $@, MSG_EOR) if $@; } else { - say $sock "chdir($env{PWD}): $!"; # implicit close + send($sock, "chdir($env{PWD}): $!", MSG_EOR); # implicit close } } +sub dclose { + my ($self) = @_; + delete $self->{lxs}; # stops LeiXSearch queries + $self->close; # PublicInbox::DS::close +} + # for long-running results sub event_step { my ($self) = @_; local %ENV = %{$self->{env}}; - eval {}; # TODO - if ($@) { - say { $self->{sock} } $@; - $self->close; # PublicInbox::DS::close + my $sock = $self->{sock}; + eval { + while (my @fds = $recv_cmd->($sock, my $buf, 4096)) { + if (scalar(@fds) == 1 && !defined($fds[0])) { + return if $! == EAGAIN; + next if $! == EINTR; + last if $! 
== ECONNRESET; + die "recvmsg: $!"; + } + for my $fd (@fds) { + open my $rfh, '+<&=', $fd; + } + die "unrecognized client signal: $buf"; + } + dclose($self); + }; + if (my $err = $@) { + eval { $self->fail($err) }; + dclose($self); } } +sub event_step_init { + my ($self) = @_; + $self->{sock}->blocking(0); + $self->SUPER::new($self->{sock}, EPOLLIN|EPOLLET); +} + sub noop {} our $oldset; sub oldset { $oldset } @@ -742,7 +755,7 @@ sub lazy_start { die "connect($path): $!"; } umask(077) // die("umask(077): $!"); - socket(my $l, AF_UNIX, SOCK_STREAM, 0) or die "socket: $!"; + socket(my $l, AF_UNIX, SOCK_SEQPACKET, 0) or die "socket: $!"; bind($l, pack_sockaddr_un($path)) or die "bind($path): $!"; listen($l, 1024) or die "listen: $!"; my @st = stat($path) or die "stat($path): $!"; @@ -793,7 +806,7 @@ sub lazy_start { USR2 => \&noop, }; my $sigfd = PublicInbox::Sigfd->new($sig, SFD_NONBLOCK); - local %SIG = (%SIG, %$sig) if !$sigfd; + local @SIG{keys %$sig} = values(%$sig) unless $sigfd; local $SIG{PIPE} = 'IGNORE'; if ($sigfd) { # TODO: use inotify/kqueue to detect unlinked sockets push @TO_CLOSE_ATFORK_CHILD, $sigfd->{sock}; @@ -853,24 +866,19 @@ sub oneshot { local $quit = $exit if $exit; local %PATH2CFG; umask(077) // die("umask(077): $!"); - local $SIG{PIPE} = sub { die(bless(\"$_[0]", 'PublicInbox::SIGPIPE')) }; - eval { - my $self = bless { - 0 => *STDIN{GLOB}, - 1 => *STDOUT{GLOB}, - 2 => *STDERR{GLOB}, - env => \%ENV - }, __PACKAGE__; - dispatch($self, @ARGV); - }; - die $@ if $@ && ref($@) ne 'PublicInbox::SIGPIPE'; + dispatch((bless { + 0 => *STDIN{GLOB}, + 1 => *STDOUT{GLOB}, + 2 => *STDERR{GLOB}, + env => \%ENV + }, __PACKAGE__), @ARGV); } # ensures stdout hits the FS before sock disconnects so a client # can immediately reread it sub DESTROY { my ($self) = @_; - $self->{1}->autoflush(1); + $self->{1}->autoflush(1) if $self->{1}; stop_pager($self); } diff --git a/lib/PublicInbox/LeiOverview.pm b/lib/PublicInbox/LeiOverview.pm index 8a1f4f82..194c5e28 100644 
--- a/lib/PublicInbox/LeiOverview.pm +++ b/lib/PublicInbox/LeiOverview.pm @@ -108,8 +108,9 @@ sub _unbless_smsg { sub ovv_atexit_child { my ($self, $lei) = @_; - my $bref = delete $lei->{ovv_buf} or return; - print { $lei->{1} } $$bref; + if (my $bref = delete $lei->{ovv_buf}) { + print { $lei->{1} } $$bref; + } } # JSON module ->pretty output wastes too much vertical white space, diff --git a/lib/PublicInbox/LeiQuery.pm b/lib/PublicInbox/LeiQuery.pm index 7ca01454..1a3e1193 100644 --- a/lib/PublicInbox/LeiQuery.pm +++ b/lib/PublicInbox/LeiQuery.pm @@ -40,14 +40,13 @@ sub lei_q { if ($opt->{external} // 1) { $self->_externals_each(\&_vivify_external, \@srcs); } - my $j = $opt->{jobs} // scalar(@srcs) > 3 ? 3 : scalar(@srcs); + my $j = $opt->{jobs} // (scalar(@srcs) > 3 ? 3 : scalar(@srcs)); $j = 1 if !$opt->{thread}; $j++ if $opt->{'local'}; # for sto->search below - if ($self->{sock}) { - $self->atfork_prepare_wq($lxs); - $lxs->wq_workers_start('lei_xsearch', $j, $self->oldset) - // $lxs->wq_workers($j); - } + $self->atfork_prepare_wq($lxs); + $lxs->wq_workers_start('lei_xsearch', $j, $self->oldset) + // $lxs->wq_workers($j); + unshift(@srcs, $sto->search) if $opt->{'local'}; # no forking workers after this require PublicInbox::LeiOverview; @@ -77,16 +76,7 @@ sub lei_q { # my $wcb = PublicInbox::LeiToMail->write_cb($out, $self); $self->{mset_opt} = \%mset_opt; $self->{ovv}->ovv_begin($self); - pipe(my ($eof_wait, $qry_done)) or die "pipe $!"; - require PublicInbox::EOFpipe; - my $eof = PublicInbox::EOFpipe->new($eof_wait, \&query_done, $self); - $lxs->do_query($self, $qry_done, \@srcs); - $eof->event_step unless $self->{sock}; -} - -sub query_done { # PublicInbox::EOFpipe callback - my ($self) = @_; - $self->{ovv}->ovv_end($self); + $lxs->do_query($self, \@srcs); } 1; diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index c030b2b2..d06b6f1d 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -92,7 +92,9 @@ 
sub _mset_more ($$) { sub query_thread_mset { # for --thread my ($self, $lei, $ibxish) = @_; - local %SIG = (%SIG, $lei->atfork_child_wq($self)); + my %sig = $lei->atfork_child_wq($self); + local @SIG{keys %sig} = values %sig; + my ($srch, $over) = ($ibxish->search, $ibxish->over); unless ($srch && $over) { my $desc = $ibxish->{inboxdir} // $ibxish->{topdir}; @@ -125,9 +127,10 @@ sub query_thread_mset { # for --thread sub query_mset { # non-parallel for non-"--thread" users my ($self, $lei, $srcs) = @_; + my %sig = $lei->atfork_child_wq($self); + local @SIG{keys %sig} = values %sig; my $mo = { %{$lei->{mset_opt}} }; my $mset; - local %SIG = (%SIG, $lei->atfork_child_wq($self)); $self->attach_external($_) for @$srcs; my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei); do { @@ -143,9 +146,17 @@ sub query_mset { # non-parallel for non-"--thread" users $lei->{ovv}->ovv_atexit_child($lei); } +sub query_done { # PublicInbox::EOFpipe callback + my ($lei) = @_; + $lei->{ovv}->ovv_end($lei); + $lei->dclose; +} + sub do_query { - my ($self, $lei_orig, $qry_done, $srcs) = @_; + my ($self, $lei_orig, $srcs) = @_; my ($lei, @io) = $lei_orig->atfork_parent_wq($self); + + pipe(my ($eof_wait, $qry_done)) or die "pipe $!"; $io[0] = $qry_done; # don't need stdin $io[1]->autoflush(1); $io[2]->autoflush(1); @@ -160,9 +171,20 @@ sub do_query { for my $rmt (@{$self->{remotes} // []}) { $self->wq_do('query_thread_mbox', \@io, $lei, $rmt); } - - # sent off to children, they will drop remaining references to it - close $qry_done; + @io = (); + close $qry_done; # fully closed when children are done + + # query_done will run when query_*mset close $qry_done + if ($lei_orig->{sock}) { # watch for client premature exit + require PublicInbox::EOFpipe; + PublicInbox::EOFpipe->new($eof_wait, \&query_done, $lei_orig); + $lei_orig->{lxs} = $self; + $lei_orig->event_step_init; + } else { + $self->wq_close; + read($eof_wait, my $buf, 1); # wait for close($lei->{0}) + query_done($lei_orig); # may 
SIGPIPE + } } sub ipc_atfork_child { diff --git a/script/lei b/script/lei index 5c32ab88..9610a876 100755 --- a/script/lei +++ b/script/lei @@ -3,32 +3,47 @@ # License: AGPL-3.0+ use strict; use v5.10.1; -use Socket qw(AF_UNIX SOCK_STREAM pack_sockaddr_un); +use Socket qw(AF_UNIX SOCK_SEQPACKET MSG_EOR pack_sockaddr_un); +use Errno qw(EINTR ECONNRESET); use PublicInbox::CmdIPC4; my $narg = 4; +my ($sock, $pwd); my $recv_cmd = PublicInbox::CmdIPC4->can('recv_cmd4'); my $send_cmd = PublicInbox::CmdIPC4->can('send_cmd4') // do { require PublicInbox::Spawn; # takes ~50ms even if built *sigh* - $narg = 4; $recv_cmd = PublicInbox::Spawn->can('recv_cmd4'); PublicInbox::Spawn->can('send_cmd4'); }; +sub sigchld { + my ($sig) = @_; + my $flags = $sig ? POSIX::WNOHANG() : 0; + while (waitpid(-1, $flags) > 0) {} +} + sub exec_cmd { my ($fds, $argc, @argv) = @_; - my %env = map { split(/=/, $_, 2) } splice(@argv, $argc); - my @m = (*STDIN{IO}, '<&=', *STDOUT{IO}, '>&=', - *STDERR{IO}, '>&='); + my @m = (*STDIN{IO}, '<&=', *STDOUT{IO}, '>&=', *STDERR{IO}, '>&='); + my @rdr; for my $fd (@$fds) { my ($old_io, $mode) = splice(@m, 0, 2); - open($old_io, $mode, $fd) or die "open $mode$fd: $!"; + open(my $tmpfh, $mode, $fd) or die "open $mode$fd: $!"; + push @rdr, $old_io, $mode, $tmpfh; + } + require POSIX; # WNOHANG + $SIG{CHLD} = \&sigchld; + my $pid = fork // die "fork: $!"; + if ($pid == 0) { + my %env = map { split(/=/, $_, 2) } splice(@argv, $argc); + while (my ($old_io, $mode, $tmpfh) = splice(@rdr, 0, 3)) { + open $old_io, $mode, $tmpfh or die "open $mode: $!"; + } + %ENV = (%ENV, %env); + exec(@argv); + die "exec: @argv: $!"; } - %ENV = (%ENV, %env); - exec(@argv); - die "exec: @argv: $!"; } -my ($sock, $pwd); if ($send_cmd && eval { my $path = do { my $runtime_dir = ($ENV{XDG_RUNTIME_DIR} // '') . 
'/lei'; @@ -40,10 +55,10 @@ if ($send_cmd && eval { require File::Path; File::Path::mkpath($runtime_dir, 0, 0700); } - "$runtime_dir/$narg.sock"; + "$runtime_dir/$narg.seq.sock"; }; my $addr = pack_sockaddr_un($path); - socket($sock, AF_UNIX, SOCK_STREAM, 0) or die "socket: $!"; + socket($sock, AF_UNIX, SOCK_SEQPACKET, 0) or die "socket: $!"; unless (connect($sock, $addr)) { # start the daemon if not started local $ENV{PERL5LIB} = join(':', @INC); open(my $daemon, '-|', $^X, qw[-MPublicInbox::LEI @@ -73,22 +88,41 @@ Falling back to (slow) one-shot mode } 1; }) { # (Socket::MsgHdr|Inline::C), $sock, $pwd are all available: - local $ENV{PWD} = $pwd; + $ENV{PWD} = $pwd; my $buf = join("\0", scalar(@ARGV), @ARGV); while (my ($k, $v) = each %ENV) { $buf .= "\0$k=$v" } $buf .= "\0\0"; - select $sock; - $| = 1; # unbuffer selected $sock - $send_cmd->($sock, [ 0, 1, 2 ], $buf, 0); - while (my (@fds) = $recv_cmd->($sock, $buf, 4096 * 33)) { - if ($buf =~ /\Aexit=([0-9]+)\n\z/) { - exit($1); - } elsif ($buf =~ /\Aexec (.+)\n\z/) { + $send_cmd->($sock, [ 0, 1, 2 ], $buf, MSG_EOR); + $SIG{TERM} = $SIG{INT} = $SIG{QUIT} = sub { + my ($sig) = @_; # 'TERM', not an integer :< + $SIG{$sig} = 'DEFAULT'; + kill($sig, $$); # exit($signo + 128) + }; + my $x_it_code = 0; + while (1) { + my (@fds) = $recv_cmd->($sock, $buf, 4096 * 33); + if (scalar(@fds) == 1 && !defined($fds[0])) { + last if $! == ECONNRESET; + next if $! 
== EINTR; + die "recvmsg: $!"; + } + last if $buf eq ''; + if ($buf =~ /\Ax_it ([0-9]+)\z/) { + $x_it_code = $1 + 0; + last; + } elsif ($buf =~ /\Aexec (.+)\z/) { exec_cmd(\@fds, split(/\0/, $1)); } else { + sigchld(); die $buf; } } + sigchld(); + if (my $sig = ($x_it_code & 127)) { + kill $sig, $$; + sleep; + } + exit($x_it_code >> 8); } else { # for systems lacking Socket::MsgHdr or Inline::C warn $@ if $@; require PublicInbox::LEI; diff --git a/t/lei.t b/t/lei.t index 6819f182..3ebaade6 100644 --- a/t/lei.t +++ b/t/lei.t @@ -215,7 +215,7 @@ SKIP: { # real socket skip 'Socket::MsgHdr or Inline::C missing or unconfigured', $nr; local $ENV{XDG_RUNTIME_DIR} = "$home/xdg_run"; - my $sock = "$ENV{XDG_RUNTIME_DIR}/lei/$nfd.sock"; + my $sock = "$ENV{XDG_RUNTIME_DIR}/lei/$nfd.seq.sock"; ok($lei->('daemon-pid'), 'daemon-pid'); is($err, '', 'no error from daemon-pid'); diff --git a/xt/lei-sigpipe.t b/xt/lei-sigpipe.t new file mode 100644 index 00000000..4d35bbb3 --- /dev/null +++ b/xt/lei-sigpipe.t @@ -0,0 +1,32 @@ +#!perl -w +# Copyright (C) 2021 all contributors +# License: AGPL-3.0+ +use strict; +use v5.10.1; +use Test::More; +use PublicInbox::TestCommon; +use POSIX qw(WTERMSIG WIFSIGNALED SIGPIPE); +require_mods(qw(json DBD::SQLite Search::Xapian)); +# XXX this needs an already configured lei instance with many messages + +my $do_test = sub { + my $env = shift // {}; + pipe(my ($r, $w)) or BAIL_OUT $!; + open my $err, '+>', undef or BAIL_OUT $!; + my $opt = { run_mode => 0, 1 => $w, 2 => $err }; + my $tp = start_script([qw(lei q -t), 'bytes:1..'], $env, $opt); + close $w; + sysread($r, my $buf, 1); + close $r; # trigger SIGPIPE + $tp->join; + ok(WIFSIGNALED($?), 'signaled'); + is(WTERMSIG($?), SIGPIPE, 'got SIGPIPE'); + seek($err, 0, 0); + my @err = grep(!m{mkdir /dev/null\b}, <$err>); + is_deeply(\@err, [], 'no errors'); +}; + +$do_test->(); +$do_test->({XDG_RUNTIME_DIR => '/dev/null'}); + +done_testing;