about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2023-10-06 01:02:58 +0000
committerEric Wong <e@80x24.org>2023-10-06 09:38:08 +0000
commit00fe4ec336dcd8fcf3c45498d7f1ae5c228c6b92 (patch)
tree0a7d2cfe0ccd7cfd1cbc8c15017261e97ae0cb2e
parent14dd9df0f718f8d0815851efe52f3633ec6137b8 (diff)
downloadpublic-inbox-00fe4ec336dcd8fcf3c45498d7f1ae5c228c6b92.tar.gz
This ensures script/lei $send_cmd usage is EINTR-safe (since
I prefer to avoid loading PublicInbox::IPC for startup time).
Overall, it saves us some code, too.
-rw-r--r--lib/PublicInbox/CmdIPC4.pm24
-rw-r--r--lib/PublicInbox/IPC.pm26
-rw-r--r--lib/PublicInbox/LEI.pm6
-rw-r--r--lib/PublicInbox/LeiSelfSocket.pm3
-rw-r--r--lib/PublicInbox/Spawn.pm41
-rw-r--r--lib/PublicInbox/Syscall.pm21
-rw-r--r--lib/PublicInbox/XapClient.pm2
-rw-r--r--lib/PublicInbox/XapHelper.pm2
-rwxr-xr-xscript/lei5
-rw-r--r--t/cmd_ipc.t12
-rw-r--r--t/xap_helper.t6
11 files changed, 73 insertions, 75 deletions
diff --git a/lib/PublicInbox/CmdIPC4.pm b/lib/PublicInbox/CmdIPC4.pm
index 4bc4c729..2f102ec6 100644
--- a/lib/PublicInbox/CmdIPC4.pm
+++ b/lib/PublicInbox/CmdIPC4.pm
@@ -7,6 +7,16 @@
 package PublicInbox::CmdIPC4;
 use v5.12;
 use Socket qw(SOL_SOCKET SCM_RIGHTS);
+
+sub sendmsg_retry ($) {
+        return 1 if $!{EINTR};
+        return unless ($!{ENOMEM} || $!{ENOBUFS} || $!{ETOOMANYREFS});
+        return if ++$_[0] >= 50;
+        warn "# sleeping on sendmsg: $! (#$_[0])\n";
+        select(undef, undef, undef, 0.1);
+        1;
+}
+
 BEGIN { eval {
 require Socket::MsgHdr; # XS
 no warnings 'once';
@@ -20,21 +30,21 @@ no warnings 'once';
         my $try = 0;
         do {
                 $s = Socket::MsgHdr::sendmsg($sock, $mh, $flags);
-        } while (!defined($s) &&
-                        ($!{ENOBUFS} || $!{ENOMEM} || $!{ETOOMANYREFS}) &&
-                        (++$try < 50) &&
-                        warn "# sleeping on sendmsg: $! (#$try)\n" &&
-                        select(undef, undef, undef, 0.1) == 0);
+        } while (!defined($s) && sendmsg_retry($try));
         $s;
 };
 
 *recv_cmd4 = sub ($$$) {
         my ($s, undef, $len) = @_; # $_[1] = destination buffer
         my $mh = Socket::MsgHdr->new(buflen => $len, controllen => 256);
-        my $r = Socket::MsgHdr::recvmsg($s, $mh, 0) // do {
+        my $r;
+        do {
+                $r = Socket::MsgHdr::recvmsg($s, $mh, 0);
+        } while (!defined($r) && $!{EINTR});
+        if (!defined($r)) {
                 $_[1] = '';
                 return (undef);
-        };
+        }
         $_[1] = $mh->buf;
         return () if $r == 0;
         my (undef, undef, $data) = $mh->cmsghdr;
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 9b4b1508..839281b2 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -204,27 +204,9 @@ sub ipc_sibling_atfork_child {
         $pid == $$ and die "BUG: $$ ipc_atfork_child called on itself";
 }
 
-sub send_cmd ($$$$) {
-        my ($s, $fds, $buf, $fl) = @_;
-        while (1) {
-                my $n = $send_cmd->($s, $fds, $buf, $fl);
-                next if !defined($n) && $!{EINTR};
-                return $n;
-        }
-}
-
-sub recv_cmd ($$$) {
-        my ($s, undef, $len) = @_; # $_[1] is $buf
-        while (1) {
-                my @fds = $recv_cmd->($s, $_[1], $len);
-                next if scalar(@fds) == 1 && !defined($fds[0]) && $!{EINTR};
-                return @fds;
-        }
-}
-
 sub recv_and_run {
         my ($self, $s2, $len, $full_stream) = @_;
-        my @fds = recv_cmd($s2, my $buf, $len // $MY_MAX_ARG_STRLEN);
+        my @fds = $recv_cmd->($s2, my $buf, $len // $MY_MAX_ARG_STRLEN);
         return if scalar(@fds) && !defined($fds[0]);
         my $n = length($buf) or return 0;
         my $nfd = 0;
@@ -291,11 +273,11 @@ sub stream_in_full ($$$) {
         my ($s1, $fds, $buf) = @_;
         socketpair(my $r, my $w, AF_UNIX, SOCK_STREAM, 0) or
                 croak "socketpair: $!";
-        my $n = send_cmd($s1, [ fileno($r) ],
+        my $n = $send_cmd->($s1, [ fileno($r) ],
                         ipc_freeze(['do_sock_stream', length($buf)]),
                         0) // croak "sendmsg: $!";
         undef $r;
-        $n = send_cmd($w, $fds, $buf, 0) // croak "sendmsg: $!";
+        $n = $send_cmd->($w, $fds, $buf, 0) // croak "sendmsg: $!";
         while ($n < length($buf)) {
                 my $x = syswrite($w, $buf, length($buf) - $n, $n);
                 if (!defined($n)) {
@@ -315,7 +297,7 @@ sub wq_io_do { # always async
                 if (length($buf) > $MY_MAX_ARG_STRLEN) {
                         stream_in_full($s1, $fds, $buf);
                 } else {
-                        my $n = send_cmd $s1, $fds, $buf, 0;
+                        my $n = $send_cmd->($s1, $fds, $buf, 0);
                         return if defined($n); # likely
                         $!{ETOOMANYREFS} and
                                 croak "sendmsg: $! (check RLIMIT_NOFILE)";
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index e300f0a4..f8bcd43d 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -1041,7 +1041,7 @@ sub start_mua {
 
 sub send_exec_cmd { # tell script/lei to execute a command
         my ($self, $io, $cmd, $env) = @_;
-        PublicInbox::IPC::send_cmd(
+        $PublicInbox::IPC::send_cmd->(
                         $self->{sock} // die('lei client gone'),
                         [ map { fileno($_) } @$io ],
                         exec_buf($cmd, $env), 0) //
@@ -1139,7 +1139,7 @@ sub accept_dispatch { # Listener {post_accept} callback
         select($rvec, undef, undef, 60) or
                 return send($sock, 'timed out waiting to recv FDs', 0);
         # (4096 * 33) >MAX_ARG_STRLEN
-        my @fds = PublicInbox::IPC::recv_cmd($sock, my $buf, 4096 * 33) or
+        my @fds = $PublicInbox::IPC::recv_cmd->($sock, my $buf, 4096 * 33) or
                 return; # EOF
         if (!defined($fds[0])) {
                 warn(my $msg = "recv_cmd failed: $!");
@@ -1178,7 +1178,7 @@ sub event_step {
         local %ENV = %{$self->{env}};
         local $current_lei = $self;
         eval {
-                my @fds = PublicInbox::IPC::recv_cmd(
+                my @fds = $PublicInbox::IPC::recv_cmd->(
                         $self->{sock} // return, my $buf, 4096);
                 if (scalar(@fds) == 1 && !defined($fds[0])) {
                         return if $! == EAGAIN;
diff --git a/lib/PublicInbox/LeiSelfSocket.pm b/lib/PublicInbox/LeiSelfSocket.pm
index b8745252..0e15bc7c 100644
--- a/lib/PublicInbox/LeiSelfSocket.pm
+++ b/lib/PublicInbox/LeiSelfSocket.pm
@@ -21,7 +21,8 @@ sub new {
 
 sub event_step {
         my ($self) = @_;
-        my @fds = PublicInbox::IPC::recv_cmd($self->{sock}, my $buf, 4096 * 33);
+        my ($buf, @fds);
+        @fds = $PublicInbox::IPC::recv_cmd->($self->{sock}, $buf, 4096 * 33);
         if (scalar(@fds) == 1 && !defined($fds[0])) {
                 return if $!{EAGAIN};
                 die "recvmsg: $!" unless $!{ECONNRESET};
diff --git a/lib/PublicInbox/Spawn.pm b/lib/PublicInbox/Spawn.pm
index bb2abe28..4c7e0f80 100644
--- a/lib/PublicInbox/Spawn.pm
+++ b/lib/PublicInbox/Spawn.pm
@@ -173,19 +173,20 @@ int pi_fork_exec(SV *redirref, SV *file, SV *cmdref, SV *envref, SV *rlimref,
         return (int)pid;
 }
 
-static int sleep_wait(unsigned *tries, int err)
+static int sendmsg_retry(unsigned *tries)
 {
         const struct timespec req = { 0, 100000000 }; /* 100ms */
+        int err = errno;
         switch (err) {
+        case EINTR: PERL_ASYNC_CHECK(); return 1;
         case ENOBUFS: case ENOMEM: case ETOOMANYREFS:
-                if (++*tries < 50) {
-                        fprintf(stderr, "# sleeping on sendmsg: %s (#%u)\n",
-                                strerror(err), *tries);
-                        nanosleep(&req, NULL);
-                        return 1;
-                }
-        default:
-                return 0;
+                if (++*tries >= 50) return 0;
+                fprintf(stderr, "# sleeping on sendmsg: %s (#%u)\n",
+                        strerror(err), *tries);
+                nanosleep(&req, NULL);
+                PERL_ASYNC_CHECK();
+                return 1;
+        default: return 0;
         }
 }
 
@@ -237,7 +238,7 @@ SV *send_cmd4(PerlIO *s, SV *svfds, SV *data, int flags)
         }
         do {
                 sent = sendmsg(PerlIO_fileno(s), &msg, flags);
-        } while (sent < 0 && sleep_wait(&tries, errno));
+        } while (sent < 0 && sendmsg_retry(&tries));
         return sent >= 0 ? newSViv(sent) : &PL_sv_undef;
 }
 
@@ -259,20 +260,24 @@ void recv_cmd4(PerlIO *s, SV *buf, STRLEN n)
         msg.msg_control = &cmsg.hdr;
         msg.msg_controllen = CMSG_SPACE(SEND_FD_SPACE);
 
-        i = recvmsg(PerlIO_fileno(s), &msg, 0);
+        for (;;) {
+                i = recvmsg(PerlIO_fileno(s), &msg, 0);
+                if (i >= 0 || errno != EINTR) break;
+                PERL_ASYNC_CHECK();
+        }
         if (i >= 0) {
                 SvCUR_set(buf, i);
+                if (cmsg.hdr.cmsg_level == SOL_SOCKET &&
+                                cmsg.hdr.cmsg_type == SCM_RIGHTS) {
+                        size_t len = cmsg.hdr.cmsg_len;
+                        int *fdp = (int *)CMSG_DATA(&cmsg.hdr);
+                        for (i = 0; CMSG_LEN((i + 1) * sizeof(int)) <= len; i++)
+                                Inline_Stack_Push(sv_2mortal(newSViv(*fdp++)));
+                }
         } else {
                 Inline_Stack_Push(&PL_sv_undef);
                 SvCUR_set(buf, 0);
         }
-        if (i > 0 && cmsg.hdr.cmsg_level == SOL_SOCKET &&
-                        cmsg.hdr.cmsg_type == SCM_RIGHTS) {
-                size_t len = cmsg.hdr.cmsg_len;
-                int *fdp = (int *)CMSG_DATA(&cmsg.hdr);
-                for (i = 0; CMSG_LEN((i + 1) * sizeof(int)) <= len; i++)
-                        Inline_Stack_Push(sv_2mortal(newSViv(*fdp++)));
-        }
         Inline_Stack_Done;
 }
 #endif /* defined(CMSG_SPACE) && defined(CMSG_LEN) */
diff --git a/lib/PublicInbox/Syscall.pm b/lib/PublicInbox/Syscall.pm
index 4cf45d0f..e83beb6a 100644
--- a/lib/PublicInbox/Syscall.pm
+++ b/lib/PublicInbox/Syscall.pm
@@ -394,6 +394,8 @@ use constant msg_controllen => CMSG_SPACE(10 * SIZEOF_int) + 16; # 10 FDs
 
 if (defined($SYS_sendmsg) && defined($SYS_recvmsg)) {
 no warnings 'once';
+require PublicInbox::CmdIPC4;
+
 *send_cmd4 = sub ($$$$) {
         my ($sock, $fds, undef, $flags) = @_;
         my $iov = pack('P'.TMPL_size_t,
@@ -418,16 +420,12 @@ no warnings 'once';
                         $cmsghdr, # msg_control
                         $msg_controllen,
                         0); # msg_flags
-        my $sent;
+        my $s;
         my $try = 0;
         do {
-                $sent = syscall($SYS_sendmsg, fileno($sock), $mh, $flags);
-        } while ($sent < 0 &&
-                        ($!{ENOBUFS} || $!{ENOMEM} || $!{ETOOMANYREFS}) &&
-                        (++$try < 50) &&
-                        warn "# sleeping on sendmsg: $! (#$try)\n" &&
-                        select(undef, undef, undef, 0.1) == 0);
-        $sent >= 0 ? $sent : undef;
+                $s = syscall($SYS_sendmsg, fileno($sock), $mh, $flags);
+        } while ($s < 0 && PublicInbox::CmdIPC4::sendmsg_retry($try));
+        $s >= 0 ? $s : undef;
 };
 
 *recv_cmd4 = sub ($$$) {
@@ -446,8 +444,11 @@ no warnings 'once';
                         $cmsghdr, # msg_control
                         msg_controllen,
                         0); # msg_flags
-        my $r = syscall($SYS_recvmsg, fileno($sock), $mh, 0);
-        if ($r < 0) { # $! is set
+        my $r;
+        do {
+                $r = syscall($SYS_recvmsg, fileno($sock), $mh, 0);
+        } while ($r < 0 && $!{EINTR});
+        if ($r < 0) {
                 $_[1] = '';
                 return (undef);
         }
diff --git a/lib/PublicInbox/XapClient.pm b/lib/PublicInbox/XapClient.pm
index f6c09c3b..9e2d71a0 100644
--- a/lib/PublicInbox/XapClient.pm
+++ b/lib/PublicInbox/XapClient.pm
@@ -21,7 +21,7 @@ sub mkreq {
         }
         my @fds = map fileno($_), @$ios;
         my $buf = join("\0", @arg, '');
-        $n = PublicInbox::IPC::send_cmd($self->{io}, \@fds, $buf, 0) //
+        $n = $PublicInbox::IPC::send_cmd->($self->{io}, \@fds, $buf, 0) //
                 die "send_cmd: $!";
         $n == length($buf) or die "send_cmd: $n != ".length($buf);
         $r;
diff --git a/lib/PublicInbox/XapHelper.pm b/lib/PublicInbox/XapHelper.pm
index c98708e3..ae907766 100644
--- a/lib/PublicInbox/XapHelper.pm
+++ b/lib/PublicInbox/XapHelper.pm
@@ -177,7 +177,7 @@ sub recv_loop {
         my $in = \*STDIN;
         while (!defined($parent_pid) || getppid == $parent_pid) {
                 PublicInbox::DS::sig_setmask($workerset);
-                my @fds = PublicInbox::IPC::recv_cmd($in, $rbuf, 4096*33);
+                my @fds = $PublicInbox::IPC::recv_cmd->($in, $rbuf, 4096*33);
                 scalar(@fds) or exit(66); # EX_NOINPUT
                 die "recvmsg: $!" if !defined($fds[0]);
                 PublicInbox::DS::block_signals();
diff --git a/script/lei b/script/lei
index 1d90be0a..087afc33 100755
--- a/script/lei
+++ b/script/lei
@@ -116,10 +116,7 @@ $SIG{CONT} = sub { send($sock, 'CONT', 0) };
 my $x_it_code = 0;
 while (1) {
         my (@fds) = $recv_cmd->($sock, my $buf, 4096 * 33);
-        if (scalar(@fds) == 1 && !defined($fds[0])) {
-                next if $!{EINTR};
-                die "recvmsg: $!";
-        }
+        die "recvmsg: $!" if scalar(@fds) == 1 && !defined($fds[0]);
         last if $buf eq '';
         if ($buf =~ /\Aexec (.+)\z/) {
                 $exec_cmd->(\@fds, split(/\0/, $1));
diff --git a/t/cmd_ipc.t b/t/cmd_ipc.t
index e5d22aab..ccf4ca31 100644
--- a/t/cmd_ipc.t
+++ b/t/cmd_ipc.t
@@ -59,18 +59,20 @@ my $do_test = sub { SKIP: {
                         if ($pid == 0) {
                                 # need to loop since Perl signals are racy
                                 # (the interpreter doesn't self-pipe)
-                                CORE::kill('ALRM', $tgt) while (tick(0.05));
+                                my $n = 3;
+                                while (tick(0.01 * $n) && --$n) {
+                                        kill('ALRM', $tgt)
+                                }
+                                close $s1;
                                 POSIX::_exit(1);
                         }
+                        close $s1;
                         @fds = $recv->($s2, $buf, length($src) + 1);
-                        ok($!{EINTR}, "EINTR set by ($desc)");
-                        kill('KILL', $pid);
                         waitpid($pid, 0);
-                        is_deeply(\@fds, [ undef ], "EINTR $desc");
+                        is_deeply(\@fds, [], "EINTR->EOF $desc");
                         ok($alrm, 'SIGALRM hit');
                 }
 
-                close $s1;
                 @fds = $recv->($s2, $buf, length($src) + 1);
                 is_deeply(\@fds, [], "no FDs on EOF $desc");
                 is($buf, '', "buffer cleared on EOF ($desc)");
diff --git a/t/xap_helper.t b/t/xap_helper.t
index 2303301d..7890392d 100644
--- a/t/xap_helper.t
+++ b/t/xap_helper.t
@@ -7,7 +7,7 @@ require_mods(qw(DBD::SQLite Xapian +SCM_RIGHTS)); # TODO: FIFO support?
 use PublicInbox::Spawn qw(spawn);
 use Socket qw(AF_UNIX SOCK_SEQPACKET SOCK_STREAM);
 require PublicInbox::AutoReap;
-require PublicInbox::IPC;
+use PublicInbox::IPC;
 require PublicInbox::XapClient;
 use autodie;
 my ($tmp, $for_destroy) = tmpdir();
@@ -52,8 +52,8 @@ my $doreq = sub {
         my $buf = join("\0", @arg, '');
         my @fds = fileno($y);
         push @fds, fileno($err) if $err;
-        my $n = PublicInbox::IPC::send_cmd($s, \@fds, $buf, 0);
-        $n // xbail "send: $!";
+        my $n = $PublicInbox::IPC::send_cmd->($s, \@fds, $buf, 0) //
+                xbail "send: $!";
         my $exp = length($buf);
         $exp == $n or xbail "req @arg sent short ($n != $exp)";
         $x;