From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.6 (2021-04-09) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.2 required=3.0 tests=ALL_TRUSTED,AWL,BAYES_00, DKIM_SIGNED,DKIM_VALID,DKIM_VALID_AU,DKIM_VALID_EF shortcircuit=no autolearn=ham autolearn_force=no version=3.4.6 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id DEF781F454 for ; Fri, 6 Oct 2023 01:37:13 +0000 (UTC) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d=80x24.org; s=selector1; t=1696556234; bh=0TngfKfTnTwiYFiahQdLleKs/5a6ieHTj+tNy10I4W0=; h=From:To:Subject:Date:From; b=PbILKuSAjWwbyuLQ3peurVMEK5O2aJLGrpIJ6FQ5EBkLjjVcfnWIGd1OpmLr1NgTF 9diw0lzJ+lDd/8kas7Q0hXr61o9Y/aJarEpGZTi8yCVqcN4PGwYesUyzeI2yfWmIE7 3lNUKgukJjCi3TVAA2sBYoDFEpNl93RMEnRoGlOI= From: Eric Wong To: meta@public-inbox.org Subject: [PATCH] ipc: lower-level send_cmd/recv_cmd handle EINTR directly Date: Fri, 6 Oct 2023 01:37:13 +0000 Message-Id: <20231006013713.3762219-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: This ensures script/lei $send_cmd usage is EINTR-safe (since I prefer to avoid loading PublicInbox::IPC for startup time). Overall, it saves us some code, too. --- lib/PublicInbox/CmdIPC4.pm | 24 +++++++++++++------ lib/PublicInbox/IPC.pm | 26 ++++---------------- lib/PublicInbox/LEI.pm | 6 ++--- lib/PublicInbox/LeiSelfSocket.pm | 3 ++- lib/PublicInbox/Spawn.pm | 41 ++++++++++++++++++-------------- lib/PublicInbox/Syscall.pm | 21 ++++++++-------- lib/PublicInbox/XapClient.pm | 2 +- lib/PublicInbox/XapHelper.pm | 2 +- script/lei | 5 +--- t/cmd_ipc.t | 12 ++++++---- t/xap_helper.t | 4 ++-- 11 files changed, 72 insertions(+), 74 deletions(-) diff --git a/lib/PublicInbox/CmdIPC4.pm b/lib/PublicInbox/CmdIPC4.pm index 4bc4c729..2f102ec6 100644 --- a/lib/PublicInbox/CmdIPC4.pm +++ b/lib/PublicInbox/CmdIPC4.pm @@ -7,6 +7,16 @@ package PublicInbox::CmdIPC4; use v5.12; use Socket qw(SOL_SOCKET SCM_RIGHTS); + +sub sendmsg_retry ($) { + return 1 if $!{EINTR}; + return unless ($!{ENOMEM} || $!{ENOBUFS} || $!{ETOOMANYREFS}); + return if ++$_[0] >= 50; + warn "# sleeping on sendmsg: $! (#$_[0])\n"; + select(undef, undef, undef, 0.1); + 1; +} + BEGIN { eval { require Socket::MsgHdr; # XS no warnings 'once'; @@ -20,21 +30,21 @@ no warnings 'once'; my $try = 0; do { $s = Socket::MsgHdr::sendmsg($sock, $mh, $flags); - } while (!defined($s) && - ($!{ENOBUFS} || $!{ENOMEM} || $!{ETOOMANYREFS}) && - (++$try < 50) && - warn "# sleeping on sendmsg: $! (#$try)\n" && - select(undef, undef, undef, 0.1) == 0); + } while (!defined($s) && sendmsg_retry($try)); $s; }; *recv_cmd4 = sub ($$$) { my ($s, undef, $len) = @_; # $_[1] = destination buffer my $mh = Socket::MsgHdr->new(buflen => $len, controllen => 256); - my $r = Socket::MsgHdr::recvmsg($s, $mh, 0) // do { + my $r; + do { + $r = Socket::MsgHdr::recvmsg($s, $mh, 0); + } while (!defined($r) && $!{EINTR}); + if (!defined($r)) { $_[1] = ''; return (undef); - }; + } $_[1] = $mh->buf; return () if $r == 0; my (undef, undef, $data) = $mh->cmsghdr; diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm index 9b4b1508..839281b2 100644 --- a/lib/PublicInbox/IPC.pm +++ b/lib/PublicInbox/IPC.pm @@ -204,27 +204,9 @@ sub ipc_sibling_atfork_child { $pid == $$ and die "BUG: $$ ipc_atfork_child called on itself"; } -sub send_cmd ($$$$) { - my ($s, $fds, $buf, $fl) = @_; - while (1) { - my $n = $send_cmd->($s, $fds, $buf, $fl); - next if !defined($n) && $!{EINTR}; - return $n; - } -} - -sub recv_cmd ($$$) { - my ($s, undef, $len) = @_; # $_[1] is $buf - while (1) { - my @fds = $recv_cmd->($s, $_[1], $len); - next if scalar(@fds) == 1 && !defined($fds[0]) && $!{EINTR}; - return @fds; - } -} - sub recv_and_run { my ($self, $s2, $len, $full_stream) = @_; - my @fds = recv_cmd($s2, my $buf, $len // $MY_MAX_ARG_STRLEN); + my @fds = $recv_cmd->($s2, my $buf, $len // $MY_MAX_ARG_STRLEN); return if scalar(@fds) && !defined($fds[0]); my $n = length($buf) or return 0; my $nfd = 0; @@ -291,11 +273,11 @@ sub stream_in_full ($$$) { my ($s1, $fds, $buf) = @_; socketpair(my $r, my $w, AF_UNIX, SOCK_STREAM, 0) or croak "socketpair: $!"; - my $n = send_cmd($s1, [ fileno($r) ], + my $n = $send_cmd->($s1, [ fileno($r) ], ipc_freeze(['do_sock_stream', length($buf)]), 0) // croak "sendmsg: $!"; undef $r; - $n = send_cmd($w, $fds, $buf, 0) // croak "sendmsg: $!"; + $n = $send_cmd->($w, $fds, $buf, 0) // croak "sendmsg: $!"; while ($n < length($buf)) { my $x = syswrite($w, $buf, length($buf) - $n, $n); if (!defined($n)) { @@ -315,7 +297,7 @@ sub wq_io_do { # always async if (length($buf) > $MY_MAX_ARG_STRLEN) { stream_in_full($s1, $fds, $buf); } else { - my $n = send_cmd $s1, $fds, $buf, 0; + my $n = $send_cmd->($s1, $fds, $buf, 0); return if defined($n); # likely $!{ETOOMANYREFS} and croak "sendmsg: $! (check RLIMIT_NOFILE)"; diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index e300f0a4..f8bcd43d 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -1041,7 +1041,7 @@ sub start_mua { sub send_exec_cmd { # tell script/lei to execute a command my ($self, $io, $cmd, $env) = @_; - PublicInbox::IPC::send_cmd( + $PublicInbox::IPC::send_cmd->( $self->{sock} // die('lei client gone'), [ map { fileno($_) } @$io ], exec_buf($cmd, $env), 0) // @@ -1139,7 +1139,7 @@ sub accept_dispatch { # Listener {post_accept} callback select($rvec, undef, undef, 60) or return send($sock, 'timed out waiting to recv FDs', 0); # (4096 * 33) >MAX_ARG_STRLEN - my @fds = PublicInbox::IPC::recv_cmd($sock, my $buf, 4096 * 33) or + my @fds = $PublicInbox::IPC::recv_cmd->($sock, my $buf, 4096 * 33) or return; # EOF if (!defined($fds[0])) { warn(my $msg = "recv_cmd failed: $!"); @@ -1178,7 +1178,7 @@ sub event_step { local %ENV = %{$self->{env}}; local $current_lei = $self; eval { - my @fds = PublicInbox::IPC::recv_cmd( + my @fds = $PublicInbox::IPC::recv_cmd->( $self->{sock} // return, my $buf, 4096); if (scalar(@fds) == 1 && !defined($fds[0])) { return if $! == EAGAIN; diff --git a/lib/PublicInbox/LeiSelfSocket.pm b/lib/PublicInbox/LeiSelfSocket.pm index b8745252..0e15bc7c 100644 --- a/lib/PublicInbox/LeiSelfSocket.pm +++ b/lib/PublicInbox/LeiSelfSocket.pm @@ -21,7 +21,8 @@ sub new { sub event_step { my ($self) = @_; - my @fds = PublicInbox::IPC::recv_cmd($self->{sock}, my $buf, 4096 * 33); + my ($buf, @fds); + @fds = $PublicInbox::IPC::recv_cmd->($self->{sock}, $buf, 4096 * 33); if (scalar(@fds) == 1 && !defined($fds[0])) { return if $!{EAGAIN}; die "recvmsg: $!" unless $!{ECONNRESET}; diff --git a/lib/PublicInbox/Spawn.pm b/lib/PublicInbox/Spawn.pm index bb2abe28..4c7e0f80 100644 --- a/lib/PublicInbox/Spawn.pm +++ b/lib/PublicInbox/Spawn.pm @@ -173,19 +173,20 @@ int pi_fork_exec(SV *redirref, SV *file, SV *cmdref, SV *envref, SV *rlimref, return (int)pid; } -static int sleep_wait(unsigned *tries, int err) +static int sendmsg_retry(unsigned *tries) { const struct timespec req = { 0, 100000000 }; /* 100ms */ + int err = errno; switch (err) { + case EINTR: PERL_ASYNC_CHECK(); return 1; case ENOBUFS: case ENOMEM: case ETOOMANYREFS: - if (++*tries < 50) { - fprintf(stderr, "# sleeping on sendmsg: %s (#%u)\n", - strerror(err), *tries); - nanosleep(&req, NULL); - return 1; - } - default: - return 0; + if (++*tries >= 50) return 0; + fprintf(stderr, "# sleeping on sendmsg: %s (#%u)\n", + strerror(err), *tries); + nanosleep(&req, NULL); + PERL_ASYNC_CHECK(); + return 1; + default: return 0; } } @@ -237,7 +238,7 @@ SV *send_cmd4(PerlIO *s, SV *svfds, SV *data, int flags) } do { sent = sendmsg(PerlIO_fileno(s), &msg, flags); - } while (sent < 0 && sleep_wait(&tries, errno)); + } while (sent < 0 && sendmsg_retry(&tries)); return sent >= 0 ? newSViv(sent) : &PL_sv_undef; } @@ -259,20 +260,24 @@ void recv_cmd4(PerlIO *s, SV *buf, STRLEN n) msg.msg_control = &cmsg.hdr; msg.msg_controllen = CMSG_SPACE(SEND_FD_SPACE); - i = recvmsg(PerlIO_fileno(s), &msg, 0); + for (;;) { + i = recvmsg(PerlIO_fileno(s), &msg, 0); + if (i >= 0 || errno != EINTR) break; + PERL_ASYNC_CHECK(); + } if (i >= 0) { SvCUR_set(buf, i); + if (cmsg.hdr.cmsg_level == SOL_SOCKET && + cmsg.hdr.cmsg_type == SCM_RIGHTS) { + size_t len = cmsg.hdr.cmsg_len; + int *fdp = (int *)CMSG_DATA(&cmsg.hdr); + for (i = 0; CMSG_LEN((i + 1) * sizeof(int)) <= len; i++) + Inline_Stack_Push(sv_2mortal(newSViv(*fdp++))); + } } else { Inline_Stack_Push(&PL_sv_undef); SvCUR_set(buf, 0); } - if (i > 0 && cmsg.hdr.cmsg_level == SOL_SOCKET && - cmsg.hdr.cmsg_type == SCM_RIGHTS) { - size_t len = cmsg.hdr.cmsg_len; - int *fdp = (int *)CMSG_DATA(&cmsg.hdr); - for (i = 0; CMSG_LEN((i + 1) * sizeof(int)) <= len; i++) - Inline_Stack_Push(sv_2mortal(newSViv(*fdp++))); - } Inline_Stack_Done; } #endif /* defined(CMSG_SPACE) && defined(CMSG_LEN) */ diff --git a/lib/PublicInbox/Syscall.pm b/lib/PublicInbox/Syscall.pm index 4cf45d0f..e83beb6a 100644 --- a/lib/PublicInbox/Syscall.pm +++ b/lib/PublicInbox/Syscall.pm @@ -394,6 +394,8 @@ use constant msg_controllen => CMSG_SPACE(10 * SIZEOF_int) + 16; # 10 FDs if (defined($SYS_sendmsg) && defined($SYS_recvmsg)) { no warnings 'once'; +require PublicInbox::CmdIPC4; + *send_cmd4 = sub ($$$$) { my ($sock, $fds, undef, $flags) = @_; my $iov = pack('P'.TMPL_size_t, @@ -418,16 +420,12 @@ no warnings 'once'; $cmsghdr, # msg_control $msg_controllen, 0); # msg_flags - my $sent; + my $s; my $try = 0; do { - $sent = syscall($SYS_sendmsg, fileno($sock), $mh, $flags); - } while ($sent < 0 && - ($!{ENOBUFS} || $!{ENOMEM} || $!{ETOOMANYREFS}) && - (++$try < 50) && - warn "# sleeping on sendmsg: $! (#$try)\n" && - select(undef, undef, undef, 0.1) == 0); - $sent >= 0 ? $sent : undef; + $s = syscall($SYS_sendmsg, fileno($sock), $mh, $flags); + } while ($s < 0 && PublicInbox::CmdIPC4::sendmsg_retry($try)); + $s >= 0 ? $s : undef; }; *recv_cmd4 = sub ($$$) { @@ -446,8 +444,11 @@ no warnings 'once'; $cmsghdr, # msg_control msg_controllen, 0); # msg_flags - my $r = syscall($SYS_recvmsg, fileno($sock), $mh, 0); - if ($r < 0) { # $! is set + my $r; + do { + $r = syscall($SYS_recvmsg, fileno($sock), $mh, 0); + } while ($r < 0 && $!{EINTR}); + if ($r < 0) { $_[1] = ''; return (undef); } diff --git a/lib/PublicInbox/XapClient.pm b/lib/PublicInbox/XapClient.pm index f6c09c3b..9e2d71a0 100644 --- a/lib/PublicInbox/XapClient.pm +++ b/lib/PublicInbox/XapClient.pm @@ -21,7 +21,7 @@ sub mkreq { } my @fds = map fileno($_), @$ios; my $buf = join("\0", @arg, ''); - $n = PublicInbox::IPC::send_cmd($self->{io}, \@fds, $buf, 0) // + $n = $PublicInbox::IPC::send_cmd->($self->{io}, \@fds, $buf, 0) // die "send_cmd: $!"; $n == length($buf) or die "send_cmd: $n != ".length($buf); $r; diff --git a/lib/PublicInbox/XapHelper.pm b/lib/PublicInbox/XapHelper.pm index c98708e3..ae907766 100644 --- a/lib/PublicInbox/XapHelper.pm +++ b/lib/PublicInbox/XapHelper.pm @@ -177,7 +177,7 @@ sub recv_loop { my $in = \*STDIN; while (!defined($parent_pid) || getppid == $parent_pid) { PublicInbox::DS::sig_setmask($workerset); - my @fds = PublicInbox::IPC::recv_cmd($in, $rbuf, 4096*33); + my @fds = $PublicInbox::IPC::recv_cmd->($in, $rbuf, 4096*33); scalar(@fds) or exit(66); # EX_NOINPUT die "recvmsg: $!" if !defined($fds[0]); PublicInbox::DS::block_signals(); diff --git a/script/lei b/script/lei index 1d90be0a..087afc33 100755 --- a/script/lei +++ b/script/lei @@ -116,10 +116,7 @@ $SIG{CONT} = sub { send($sock, 'CONT', 0) }; my $x_it_code = 0; while (1) { my (@fds) = $recv_cmd->($sock, my $buf, 4096 * 33); - if (scalar(@fds) == 1 && !defined($fds[0])) { - next if $!{EINTR}; - die "recvmsg: $!"; - } + die "recvmsg: $!" if scalar(@fds) == 1 && !defined($fds[0]); last if $buf eq ''; if ($buf =~ /\Aexec (.+)\z/) { $exec_cmd->(\@fds, split(/\0/, $1)); diff --git a/t/cmd_ipc.t b/t/cmd_ipc.t index e5d22aab..ccf4ca31 100644 --- a/t/cmd_ipc.t +++ b/t/cmd_ipc.t @@ -59,18 +59,20 @@ my $do_test = sub { SKIP: { if ($pid == 0) { # need to loop since Perl signals are racy # (the interpreter doesn't self-pipe) - CORE::kill('ALRM', $tgt) while (tick(0.05)); + my $n = 3; + while (tick(0.01 * $n) && --$n) { + kill('ALRM', $tgt) + } + close $s1; POSIX::_exit(1); } + close $s1; @fds = $recv->($s2, $buf, length($src) + 1); - ok($!{EINTR}, "EINTR set by ($desc)"); - kill('KILL', $pid); waitpid($pid, 0); - is_deeply(\@fds, [ undef ], "EINTR $desc"); + is_deeply(\@fds, [], "EINTR->EOF $desc"); ok($alrm, 'SIGALRM hit'); } - close $s1; @fds = $recv->($s2, $buf, length($src) + 1); is_deeply(\@fds, [], "no FDs on EOF $desc"); is($buf, '', "buffer cleared on EOF ($desc)"); diff --git a/t/xap_helper.t b/t/xap_helper.t index 2303301d..27742cad 100644 --- a/t/xap_helper.t +++ b/t/xap_helper.t @@ -52,8 +52,8 @@ my $doreq = sub { my $buf = join("\0", @arg, ''); my @fds = fileno($y); push @fds, fileno($err) if $err; - my $n = PublicInbox::IPC::send_cmd($s, \@fds, $buf, 0); - $n // xbail "send: $!"; + my $n = $PublicInbox::IPC::send_cmd->($s, \@fds, $buf, 0) // + xbail "send: $!"; my $exp = length($buf); $exp == $n or xbail "req @arg sent short ($n != $exp)"; $x;