about summary refs log tree commit homepage
path: root/lib
diff options
context:
space:
mode:
author Eric Wong <e@80x24.org> 2021-01-10 12:15:02 +0000
committer Eric Wong <e@80x24.org> 2021-01-12 03:51:42 +0000
commit c17c44d9e0ef28f0f0521656f335f836ad8b7754 (patch)
tree 32ee54ba703a76adbdb91beeba761f18a3d0884b /lib
parent a7e6a8cd68fb6d700337d8dbc7ee2c65ff3d2fc1 (diff)
download public-inbox-c17c44d9e0ef28f0f0521656f335f836ad8b7754.tar.gz
For another step in syscall reduction, we'll support
transferring 3 FDs and a buffer with a single sendmsg/recvmsg
syscall using Socket::MsgHdr if available.

Beyond script/lei itself, this will be used for internal IPC
between search backends (perhaps with SOCK_SEQPACKET).  There's
a chance this could make it to the public-facing daemons, too.

This adds an optional dependency on the Socket::MsgHdr package,
available as libsocket-msghdr-perl on Debian-based distros
(but not CentOS 7.x and FreeBSD 11.x, at least).

Our Inline::C version in PublicInbox::Spawn remains the last
choice for script/lei due to the high startup time, and
IO::FDPass remains supported for non-Debian distros.

Since the socket name prefix changes from 3 to 4, we'll also
take this opportunity to make the argv+env buffer transfer less
error-prone by relying on argc instead of designated delimiters.
Diffstat (limited to 'lib')
-rw-r--r-- lib/PublicInbox/CmdIPC1.pm 30
-rw-r--r-- lib/PublicInbox/CmdIPC4.pm 34
-rw-r--r-- lib/PublicInbox/LEI.pm 45
-rw-r--r-- lib/PublicInbox/Spawn.pm 45
4 files changed, 116 insertions, 38 deletions
diff --git a/lib/PublicInbox/CmdIPC1.pm b/lib/PublicInbox/CmdIPC1.pm
new file mode 100644
index 00000000..0eed8bed
--- /dev/null
+++ b/lib/PublicInbox/CmdIPC1.pm
@@ -0,0 +1,30 @@
+# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# callers should use PublicInbox::CmdIPC1->can('send_cmd1') (or recv_cmd1)
+# 2nd choice for lei(1) front-end and 3rd choice for lei internals
+package PublicInbox::CmdIPC1;
+use strict;
+use v5.10.1;
+BEGIN { eval {
+require IO::FDPass; # XS, available in all major distros
+no warnings 'once';
+
+*send_cmd1 = sub ($$$$$$) { # (sock, in, out, err, buf, flags) = @_;
+        for (1..3) {
+                IO::FDPass::send(fileno($_[0]), $_[$_]) or
+                                        die "IO::FDPass::send: $!";
+        }
+        send($_[0], $_[4], $_[5]) or die "send $!";
+};
+
+*recv_cmd1 = sub ($$$) {
+        my ($s, undef, $len) = @_;
+        my @fds = map { IO::FDPass::recv(fileno($s)) } (0..2);
+        recv($s, $_[1], $len, 0) // die "recv: $!";
+        length($_[1]) == 0 ? () : @fds;
+};
+
+} } # /eval /BEGIN
+
+1;
diff --git a/lib/PublicInbox/CmdIPC4.pm b/lib/PublicInbox/CmdIPC4.pm
new file mode 100644
index 00000000..90fca62d
--- /dev/null
+++ b/lib/PublicInbox/CmdIPC4.pm
@@ -0,0 +1,34 @@
+# Copyright (C) 2020-2021 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# callers should use PublicInbox::CmdIPC4->can('send_cmd4') (or recv_cmd4)
+# first choice for script/lei front-end and 2nd choice for lei backend
+# libsocket-msghdr-perl is in Debian but not many other distros as of 2021.
+package PublicInbox::CmdIPC4;
+use strict;
+use v5.10.1;
+use Socket qw(SOL_SOCKET SCM_RIGHTS);
+BEGIN { eval {
+require Socket::MsgHdr; # XS
+no warnings 'once';
+
+# 3 FDs per-sendmsg(2) + buffer
+*send_cmd4 = sub ($$$$$$) { # (sock, in, out, err, buf, flags) = @_;
+        my $mh = Socket::MsgHdr->new(buf => $_[4]);
+        $mh->cmsghdr(SOL_SOCKET, SCM_RIGHTS, pack('iii', @_[1,2,3]));
+        Socket::MsgHdr::sendmsg($_[0], $mh, $_[5]) or die "sendmsg: $!";
+};
+
+*recv_cmd4 = sub ($$$) {
+        my ($s, undef, $len) = @_; # $_[1] = destination buffer
+        my $mh = Socket::MsgHdr->new(buflen => $len, controllen => 256);
+        my $r = Socket::MsgHdr::recvmsg($s, $mh, 0) // die "recvmsg: $!";
+        $_[1] = $mh->buf;
+        return () if $r == 0;
+        my (undef, undef, $data) = $mh->cmsghdr;
+        unpack('iii', $data);
+};
+
+} } # /eval /BEGIN
+
+1;
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 12e227d2..1f4ed0f6 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -26,7 +26,7 @@ use Text::Wrap qw(wrap);
 use File::Path qw(mkpath);
 use File::Spec;
 our $quit = \&CORE::exit;
-my $recv_3fds;
+my $recv_cmd;
 my $GLP = Getopt::Long::Parser->new;
 $GLP->configure(qw(gnu_getopt no_ignore_case auto_abbrev));
 my $GLP_PASS = Getopt::Long::Parser->new;
@@ -619,8 +619,9 @@ sub accept_dispatch { # Listener {post_accept} callback
         my $self = bless { sock => $sock }, __PACKAGE__;
         vec(my $rin = '', fileno($sock), 1) = 1;
         # `say $sock' triggers "die" in lei(1)
+        my $buf;
         if (select(my $rout = $rin, undef, undef, 1)) {
-                my @fds = $recv_3fds->(fileno($sock));
+                my @fds = $recv_cmd->($sock, $buf, 4096 * 33); # >MAX_ARG_STRLEN
                 if (scalar(@fds) == 3) {
                         my $i = 0;
                         for my $rdr (qw(<&= >&= >&=)) {
@@ -633,7 +634,7 @@ sub accept_dispatch { # Listener {post_accept} callback
                                 }
                         }
                 } else {
-                        say $sock "recv_3fds failed: $!";
+                        say $sock "recv_cmd failed: $!";
                         return;
                 }
         } else {
@@ -641,20 +642,20 @@ sub accept_dispatch { # Listener {post_accept} callback
                 return;
         }
         $self->{2}->autoflush(1); # keep stdout buffered until x_it|DESTROY
-        # $ARGV_STR = join("]\0[", @ARGV);
-        # $ENV_STR = join('', map { "$_=$ENV{$_}\0" } keys %ENV);
-        # $line = "$$\0\0>$ARGV_STR\0\0>$ENV_STR\0\0";
-        my ($client_pid, $argv, $env) = do {
-                local $/ = "\0\0\0"; # yes, 3 NULs at EOL, not 2
-                chomp(my $line = <$sock>);
-                split(/\0\0>/, $line, 3);
-        };
-        my %env = map { split(/=/, $_, 2) } split(/\0/, $env);
+        # $ENV_STR = join('', map { "\0$_=$ENV{$_}" } keys %ENV);
+        # $buf = "$$\0$argc\0".join("\0", @ARGV).$ENV_STR."\0\0";
+        if (substr($buf, -2, 2, '') ne "\0\0") { # s/\0\0\z//
+                say $sock "request command truncated";
+                return;
+        }
+        my ($client_pid, $argc, @argv) = split(/\0/, $buf, -1);
+        undef $buf;
+        my %env = map { split(/=/, $_, 2) } splice(@argv, $argc);
         if (chdir($env{PWD})) {
                 local %ENV = %env;
                 $self->{env} = \%env;
-                $self->{pid} = $client_pid;
-                eval { dispatch($self, split(/\]\0\[/, $argv)) };
+                $self->{pid} = $client_pid + 0;
+                eval { dispatch($self, @argv) };
                 say $sock $@ if $@;
         } else {
                 say $sock "chdir($env{PWD}): $!"; # implicit close
@@ -692,13 +693,17 @@ sub lazy_start {
         pipe(my ($eof_r, $eof_w)) or die "pipe: $!";
         my $oldset = PublicInbox::DS::block_signals();
         if ($nfd == 1) {
-                require IO::FDPass;
-                $recv_3fds = sub { map { IO::FDPass::recv($_[0]) } (0..2) };
-        } elsif ($nfd == 3) {
-                $recv_3fds = PublicInbox::Spawn->can('recv_3fds');
+                require PublicInbox::CmdIPC1;
+                $recv_cmd = PublicInbox::CmdIPC1->can('recv_cmd1');
+        } elsif ($nfd == 4) {
+                $recv_cmd = PublicInbox::Spawn->can('recv_cmd4') // do {
+                        require PublicInbox::CmdIPC4;
+                        PublicInbox::CmdIPC4->can('recv_cmd4');
+                };
         }
-        $recv_3fds or die
-                "IO::FDPass missing or Inline::C not installed/configured\n";
+        $recv_cmd or die <<"";
+(Socket::MsgHdr || IO::FDPass || Inline::C) missing/unconfigured (nfd=$nfd);
+
         require PublicInbox::Listener;
         require PublicInbox::EOFpipe;
         (-p STDOUT) or die "E: stdout must be a pipe\n";
diff --git a/lib/PublicInbox/Spawn.pm b/lib/PublicInbox/Spawn.pm
index cd94ba96..7d0d9597 100644
--- a/lib/PublicInbox/Spawn.pm
+++ b/lib/PublicInbox/Spawn.pm
@@ -201,6 +201,8 @@ void nodatacow_dir(const char *dir)
 }
 SET_NODATACOW
 
+# last choice for script/lei, 1st choice for lei internals
+# compatible with PublicInbox::CmdIPC4
 my $fdpass = <<'FDPASS';
 #include <sys/types.h>
 #include <sys/uio.h>
@@ -213,16 +215,23 @@ union my_cmsg {
         char pad[sizeof(struct cmsghdr)+ 8 + sizeof(struct my_3fds) + 8];
 };
 
-int send_3fds(int sockfd, int infd, int outfd, int errfd)
+int send_cmd4(PerlIO *s, int in, int out, int err, SV *data, int flags)
 {
         struct msghdr msg = { 0 };
         struct iovec iov;
         union my_cmsg cmsg = { 0 };
         int *fdp;
         size_t i;
+        STRLEN dlen = 0;
 
-        iov.iov_base = &msg.msg_namelen; /* whatever */
-        iov.iov_len = 1;
+        if (SvOK(data)) {
+                iov.iov_base = SvPV(data, dlen);
+                iov.iov_len = dlen;
+        }
+        if (!dlen) { /* must be non-zero */
+                iov.iov_base = &msg.msg_namelen; /* whatever */
+                iov.iov_len = 1;
+        }
         msg.msg_iov = &iov;
         msg.msg_iovlen = 1;
         msg.msg_control = &cmsg.hdr;
@@ -232,38 +241,38 @@ int send_3fds(int sockfd, int infd, int outfd, int errfd)
         cmsg.hdr.cmsg_type = SCM_RIGHTS;
         cmsg.hdr.cmsg_len = CMSG_LEN(sizeof(struct my_3fds));
         fdp = (int *)CMSG_DATA(&cmsg.hdr);
-        *fdp++ = infd;
-        *fdp++ = outfd;
-        *fdp++ = errfd;
-        return sendmsg(sockfd, &msg, 0) >= 0;
+        *fdp++ = in;
+        *fdp++ = out;
+        *fdp++ = err;
+        return sendmsg(PerlIO_fileno(s), &msg, flags) >= 0;
 }
 
-void recv_3fds(int sockfd)
+void recv_cmd4(PerlIO *s, SV *buf, STRLEN n)
 {
         union my_cmsg cmsg = { 0 };
         struct msghdr msg = { 0 };
         struct iovec iov;
         size_t i;
         Inline_Stack_Vars;
+        Inline_Stack_Reset;
 
-        iov.iov_base = &msg.msg_namelen; /* whatever */
-        iov.iov_len = 1;
+        if (!SvOK(buf))
+                sv_setpvn(buf, "", 0);
+        iov.iov_base = SvGROW(buf, n + 1);
+        iov.iov_len = n;
         msg.msg_iov = &iov;
         msg.msg_iovlen = 1;
         msg.msg_control = &cmsg.hdr;
         msg.msg_controllen = CMSG_SPACE(sizeof(struct my_3fds));
 
-        if (recvmsg(sockfd, &msg, 0) <= 0)
-                return;
-
-        errno = EDOM;
-        Inline_Stack_Reset;
-        if (cmsg.hdr.cmsg_level == SOL_SOCKET &&
+        i = recvmsg(PerlIO_fileno(s), &msg, 0);
+        if (i < 0)
+                croak("recvmsg: %s", strerror(errno));
+        SvCUR_set(buf, i);
+        if (i > 0 && cmsg.hdr.cmsg_level == SOL_SOCKET &&
                         cmsg.hdr.cmsg_type == SCM_RIGHTS &&
                         cmsg.hdr.cmsg_len == CMSG_LEN(sizeof(struct my_3fds))) {
                 int *fdp = (int *)CMSG_DATA(&cmsg.hdr);
-                size_t i;
-
                 for (i = 0; i < 3; i++)
                         Inline_Stack_Push(sv_2mortal(newSViv(*fdp++)));
         }