From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 6D0151FA12 for ; Tue, 19 Jan 2021 09:34:35 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 2/9] lei q: fix SIGPIPE handling from lei2mail workers Date: Tue, 19 Jan 2021 09:34:28 +0000 Message-Id: <20210119093435.17955-3-e@80x24.org> In-Reply-To: <20210119093435.17955-1-e@80x24.org> References: <20210119093435.17955-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: We need to properly propagate SIGPIPE to the top-level lei-daemon process and avoid relying on auto-close, since auto-close triggers Perl warnings when explicit close() does not. --- lib/PublicInbox/LEI.pm | 15 +++++++++------ lib/PublicInbox/LeiToMail.pm | 7 ++++++- lib/PublicInbox/LeiXSearch.pm | 23 +++++++++++++++++++---- xt/lei-sigpipe.t | 29 ++++++++++++++++------------- 4 files changed, 50 insertions(+), 24 deletions(-) diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index 4b1dc673..802d2cd9 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -301,10 +301,13 @@ sub atfork_child_wq { PIPE => sub { $self->x_it(13); # SIGPIPE = 13 # we need to close explicitly to avoid Perl warning on SIGPIPE - close(delete $self->{1}); - # regular files and /dev/null (-c) won't trigger SIGPIPE - close(delete $self->{2}) unless (-f $self->{2} || -c _); - syswrite($self->{0}, '!') unless $self->{sock}; # for eof_wait + for my $i (1, 2) { + next unless $self->{$i} && (-p $self->{$i} || -S _); + close(delete $self->{$i}); + } + # trigger the LeiXSearch $done OpPipe: + syswrite($self->{0}, '!') if $self->{0} && -p $self->{0}; + $SIG{PIPE} = 'DEFAULT'; die bless(\"$_[0]", 'PublicInbox::SIGPIPE'), }); } @@ -322,7 +325,7 @@ sub atfork_parent_wq { my @io = delete @$ret{0..2}; $io[3] = delete($ret->{sock}) // *STDERR{GLOB}; my $l2m = $ret->{l2m}; - if ($l2m && $l2m != $wq) { + if ($l2m && $l2m != $wq) { # $wq == lxs $io[4] = $l2m->{-wq_s1} if $l2m->{-wq_s1}; if (my @pids = $l2m->wq_close) { $wq->{l2m_pids} = \@pids; @@ -672,7 +675,7 @@ sub start_mua { @cmd = map { $_ eq '%f' ? ($replaced = $mfolder) : $_ } @cmd; push @cmd, $mfolder unless defined($replaced); $sock //= $self->{sock}; - if ($PublicInbox::DS::in_loop) { # lei(1) client process runs it + if ($sock) { # lei(1) client process runs it send($sock, exec_buf(\@cmd, {}), MSG_EOR); } else { # oneshot $self->{"mua.pid.$self.$$"} = spawn(\@cmd); diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm index a1dce550..8e58ad11 100644 --- a/lib/PublicInbox/LeiToMail.pm +++ b/lib/PublicInbox/LeiToMail.pm @@ -247,11 +247,16 @@ sub _mbox_write_cb ($$) { $dedupe->prepare_dedupe; sub { # for git_to_mail my ($buf, $oid, $kw) = @_; + return unless $out; my $eml = PublicInbox::Eml->new($buf); if (!$dedupe->is_dup($eml, $oid)) { $buf = $eml2mbox->($eml, $kw); my $lk = $ovv->lock_for_scope; - $write->($out, $buf); + eval { $write->($out, $buf) }; + if ($@) { + die $@ if ref($@) ne 'PublicInbox::SIGPIPE'; + undef $out + } } } } diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index 73fd17f4..45a073a0 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -219,7 +219,7 @@ sub start_query { # always runs in main (lei-daemon) process @$io = (); } -sub query_prepare { # for wq_do, +sub query_prepare { # called by wq_do my ($self, $lei) = @_; my %sig = $lei->atfork_child_wq($self); local @SIG{keys %sig} = values %sig; @@ -227,6 +227,18 @@ sub query_prepare { # for wq_do, $lei->fail($@) if $@; } +sub sigpipe_handler { + my ($self, $lei_orig, $pids) = @_; + if ($pids) { # one-shot (no event loop) + kill 'TERM', @$pids; + kill 'PIPE', $$; + } else { + $self->wq_kill; + $self->wq_close; + } + close(delete $lei_orig->{1}) if $lei_orig->{1}; +} + sub do_query { my ($self, $lei_orig, $srcs) = @_; my ($lei, @io) = $lei_orig->atfork_parent_wq($self); @@ -234,7 +246,10 @@ sub do_query { pipe(my $done, $io[0]) or die "pipe $!"; $lei_orig->event_step_init; # wait for shutdowns - my $done_op = { '' => [ \&query_done, $self, $lei_orig ] }; + my $done_op = { + '' => [ \&query_done, $self, $lei_orig ], + '!' => [ \&sigpipe_handler, $self, $lei_orig ] + }; my $in_loop = exists $lei_orig->{sock}; $done = PublicInbox::OpPipe->new($done, $done_op, $in_loop); my $l2m = $lei->{l2m}; @@ -244,7 +259,7 @@ sub do_query { my @l2m_io = (undef, @io[1..$#io]); pipe(my $startq, $l2m_io[0]) or die "pipe: $!"; $self->wq_do('query_prepare', \@l2m_io, $lei); - $io[4] //= *STDERR{GLOB}; + $io[4] = *STDERR{GLOB}; # don't send l2m->{-wq_s1} die "BUG: unexpected \$io[5]: $io[5]" if $io[5]; fcntl($startq, 1031, 4096) if $^O eq 'linux'; # F_SETPIPE_SZ $io[5] = $startq; @@ -253,7 +268,7 @@ sub do_query { unless ($in_loop) { my @pids = $self->wq_close; # for the $lei->atfork_child_wq PIPE handler: - $done_op->{'!'} = [ \&CORE::kill, 'TERM', @pids ]; + $done_op->{'!'}->[3] = \@pids; $done->event_step; my $ipc_worker_reap = $self->can('ipc_worker_reap'); if (my $l2m_pids = delete $self->{l2m_pids}) { diff --git a/xt/lei-sigpipe.t b/xt/lei-sigpipe.t index 4d35bbb3..448bd7db 100644 --- a/xt/lei-sigpipe.t +++ b/xt/lei-sigpipe.t @@ -11,19 +11,22 @@ require_mods(qw(json DBD::SQLite Search::Xapian)); my $do_test = sub { my $env = shift // {}; - pipe(my ($r, $w)) or BAIL_OUT $!; - open my $err, '+>', undef or BAIL_OUT $!; - my $opt = { run_mode => 0, 1 => $w, 2 => $err }; - my $tp = start_script([qw(lei q -t), 'bytes:1..'], $env, $opt); - close $w; - sysread($r, my $buf, 1); - close $r; # trigger SIGPIPE - $tp->join; - ok(WIFSIGNALED($?), 'signaled'); - is(WTERMSIG($?), SIGPIPE, 'got SIGPIPE'); - seek($err, 0, 0); - my @err = grep(!m{mkdir /dev/null\b}, <$err>); - is_deeply(\@err, [], 'no errors'); + for my $out ([], [qw(-f mboxcl2)]) { + pipe(my ($r, $w)) or BAIL_OUT $!; + open my $err, '+>', undef or BAIL_OUT $!; + my $opt = { run_mode => 0, 1 => $w, 2 => $err }; + my $cmd = [qw(lei q -t), @$out, 'bytes:1..']; + my $tp = start_script($cmd, $env, $opt); + close $w; + sysread($r, my $buf, 1); + close $r; # trigger SIGPIPE + $tp->join; + ok(WIFSIGNALED($?), "signaled @$out"); + is(WTERMSIG($?), SIGPIPE, "got SIGPIPE @$out"); + seek($err, 0, 0); + my @err = grep(!m{mkdir /dev/null\b}, <$err>); + is_deeply(\@err, [], "no errors @$out"); + } }; $do_test->();