* [PATCH 05/22] cmd_ipc: send FDs with buffer payload
2021-01-10 12:14 7% [PATCH 00/22] lei query overview views Eric Wong
@ 2021-01-10 12:15 3% ` Eric Wong
0 siblings, 0 replies; 2+ results
From: Eric Wong @ 2021-01-10 12:15 UTC (permalink / raw)
To: meta
For another step in in syscall reduction, we'll support
transferring 3 FDs and a buffer with a single sendmsg/recvmsg
syscall using Socket::MsgHdr if available.
Beyond script/lei itself, this will be used for internal IPC
between search backends (perhaps with SOCK_SEQPACKET). There's
a chance this could make it to the public-facing daemons, too.
This adds an optional dependency on the Socket::MsgHdr package,
available as libsocket-msghdr-perl on Debian-based distros
(but not CentOS 7.x and FreeBSD 11.x, at least).
Our Inline::C version in PublicInbox::Spawn remains the last
choice for script/lei due to the high startup time, and
IO::FDPass remains supported for non-Debian distros.
Since the socket name prefix changes from 3 to 4, we'll also
take this opportunity to make the argv+env buffer transfer less
error-prone by relying on argc instead of designated delimiters.
---
MANIFEST | 3 ++
lib/PublicInbox/CmdIPC1.pm | 30 +++++++++++++
lib/PublicInbox/CmdIPC4.pm | 34 ++++++++++++++
lib/PublicInbox/LEI.pm | 45 ++++++++++---------
lib/PublicInbox/Spawn.pm | 45 +++++++++++--------
script/lei | 53 +++++++++++-----------
t/cmd_ipc.t | 90 ++++++++++++++++++++++++++++++++++++++
t/lei.t | 22 +++++++---
t/spawn.t | 29 ------------
9 files changed, 250 insertions(+), 101 deletions(-)
create mode 100644 lib/PublicInbox/CmdIPC1.pm
create mode 100644 lib/PublicInbox/CmdIPC4.pm
create mode 100644 t/cmd_ipc.t
diff --git a/MANIFEST b/MANIFEST
index 609160dd..62c14cd2 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -109,6 +109,8 @@ lib/PublicInbox/Admin.pm
lib/PublicInbox/AdminEdit.pm
lib/PublicInbox/AltId.pm
lib/PublicInbox/Cgit.pm
+lib/PublicInbox/CmdIPC1.pm
+lib/PublicInbox/CmdIPC4.pm
lib/PublicInbox/CompressNoop.pm
lib/PublicInbox/Config.pm
lib/PublicInbox/ConfigIter.pm
@@ -275,6 +277,7 @@ t/altid.t
t/altid_v2.t
t/cgi.t
t/check-www-inbox.perl
+t/cmd_ipc.t
t/config.t
t/config_limiter.t
t/content_hash.t
diff --git a/lib/PublicInbox/CmdIPC1.pm b/lib/PublicInbox/CmdIPC1.pm
new file mode 100644
index 00000000..0eed8bed
--- /dev/null
+++ b/lib/PublicInbox/CmdIPC1.pm
@@ -0,0 +1,30 @@
+# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# callers should use PublicInbox::CmdIPC1->can('send_cmd1') (or recv_cmd1)
+# 2nd choice for lei(1) front-end and 3rd choice for lei internals
+package PublicInbox::CmdIPC1;
+use strict;
+use v5.10.1;
+BEGIN { eval {
+require IO::FDPass; # XS, available in all major distros
+no warnings 'once';
+
+*send_cmd1 = sub ($$$$$$) { # (sock, in, out, err, buf, flags) = @_;
+ for (1..3) {
+ IO::FDPass::send(fileno($_[0]), $_[$_]) or
+ die "IO::FDPass::send: $!";
+ }
+ send($_[0], $_[4], $_[5]) or die "send $!";
+};
+
+*recv_cmd1 = sub ($$$) {
+ my ($s, undef, $len) = @_;
+ my @fds = map { IO::FDPass::recv(fileno($s)) } (0..2);
+ recv($s, $_[1], $len, 0) // die "recv: $!";
+ length($_[1]) == 0 ? () : @fds;
+};
+
+} } # /eval /BEGIN
+
+1;
diff --git a/lib/PublicInbox/CmdIPC4.pm b/lib/PublicInbox/CmdIPC4.pm
new file mode 100644
index 00000000..90fca62d
--- /dev/null
+++ b/lib/PublicInbox/CmdIPC4.pm
@@ -0,0 +1,34 @@
+# Copyright (C) 2020-2021 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# callers should use PublicInbox::CmdIPC4->can('send_cmd4') (or recv_cmd4)
+# first choice for script/lei front-end and 2nd choice for lei backend
+# libsocket-msghdr-perl is in Debian but many other distros as of 2021.
+package PublicInbox::CmdIPC4;
+use strict;
+use v5.10.1;
+use Socket qw(SOL_SOCKET SCM_RIGHTS);
+BEGIN { eval {
+require Socket::MsgHdr; # XS
+no warnings 'once';
+
+# 3 FDs per-sendmsg(2) + buffer
+*send_cmd4 = sub ($$$$$$) { # (sock, in, out, err, buf, flags) = @_;
+ my $mh = Socket::MsgHdr->new(buf => $_[4]);
+ $mh->cmsghdr(SOL_SOCKET, SCM_RIGHTS, pack('iii', @_[1,2,3]));
+ Socket::MsgHdr::sendmsg($_[0], $mh, $_[5]) or die "sendmsg: $!";
+};
+
+*recv_cmd4 = sub ($$$) {
+ my ($s, undef, $len) = @_; # $_[1] = destination buffer
+ my $mh = Socket::MsgHdr->new(buflen => $len, controllen => 256);
+ my $r = Socket::MsgHdr::recvmsg($s, $mh, 0) // die "recvmsg: $!";
+ $_[1] = $mh->buf;
+ return () if $r == 0;
+ my (undef, undef, $data) = $mh->cmsghdr;
+ unpack('iii', $data);
+};
+
+} } # /eval /BEGIN
+
+1;
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 12e227d2..1f4ed0f6 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -26,7 +26,7 @@ use Text::Wrap qw(wrap);
use File::Path qw(mkpath);
use File::Spec;
our $quit = \&CORE::exit;
-my $recv_3fds;
+my $recv_cmd;
my $GLP = Getopt::Long::Parser->new;
$GLP->configure(qw(gnu_getopt no_ignore_case auto_abbrev));
my $GLP_PASS = Getopt::Long::Parser->new;
@@ -619,8 +619,9 @@ sub accept_dispatch { # Listener {post_accept} callback
my $self = bless { sock => $sock }, __PACKAGE__;
vec(my $rin = '', fileno($sock), 1) = 1;
# `say $sock' triggers "die" in lei(1)
+ my $buf;
if (select(my $rout = $rin, undef, undef, 1)) {
- my @fds = $recv_3fds->(fileno($sock));
+ my @fds = $recv_cmd->($sock, $buf, 4096 * 33); # >MAX_ARG_STRLEN
if (scalar(@fds) == 3) {
my $i = 0;
for my $rdr (qw(<&= >&= >&=)) {
@@ -633,7 +634,7 @@ sub accept_dispatch { # Listener {post_accept} callback
}
}
} else {
- say $sock "recv_3fds failed: $!";
+ say $sock "recv_cmd failed: $!";
return;
}
} else {
@@ -641,20 +642,20 @@ sub accept_dispatch { # Listener {post_accept} callback
return;
}
$self->{2}->autoflush(1); # keep stdout buffered until x_it|DESTROY
- # $ARGV_STR = join("]\0[", @ARGV);
- # $ENV_STR = join('', map { "$_=$ENV{$_}\0" } keys %ENV);
- # $line = "$$\0\0>$ARGV_STR\0\0>$ENV_STR\0\0";
- my ($client_pid, $argv, $env) = do {
- local $/ = "\0\0\0"; # yes, 3 NULs at EOL, not 2
- chomp(my $line = <$sock>);
- split(/\0\0>/, $line, 3);
- };
- my %env = map { split(/=/, $_, 2) } split(/\0/, $env);
+ # $ENV_STR = join('', map { "\0$_=$ENV{$_}" } keys %ENV);
+ # $buf = "$$\0$argc\0".join("\0", @ARGV).$ENV_STR."\0\0";
+ if (substr($buf, -2, 2, '') ne "\0\0") { # s/\0\0\z//
+ say $sock "request command truncated";
+ return;
+ }
+ my ($client_pid, $argc, @argv) = split(/\0/, $buf, -1);
+ undef $buf;
+ my %env = map { split(/=/, $_, 2) } splice(@argv, $argc);
if (chdir($env{PWD})) {
local %ENV = %env;
$self->{env} = \%env;
- $self->{pid} = $client_pid;
- eval { dispatch($self, split(/\]\0\[/, $argv)) };
+ $self->{pid} = $client_pid + 0;
+ eval { dispatch($self, @argv) };
say $sock $@ if $@;
} else {
say $sock "chdir($env{PWD}): $!"; # implicit close
@@ -692,13 +693,17 @@ sub lazy_start {
pipe(my ($eof_r, $eof_w)) or die "pipe: $!";
my $oldset = PublicInbox::DS::block_signals();
if ($nfd == 1) {
- require IO::FDPass;
- $recv_3fds = sub { map { IO::FDPass::recv($_[0]) } (0..2) };
- } elsif ($nfd == 3) {
- $recv_3fds = PublicInbox::Spawn->can('recv_3fds');
+ require PublicInbox::CmdIPC1;
+ $recv_cmd = PublicInbox::CmdIPC1->can('recv_cmd1');
+ } elsif ($nfd == 4) {
+ $recv_cmd = PublicInbox::Spawn->can('recv_cmd4') // do {
+ require PublicInbox::CmdIPC4;
+ PublicInbox::CmdIPC4->can('recv_cmd4');
+ };
}
- $recv_3fds or die
- "IO::FDPass missing or Inline::C not installed/configured\n";
+ $recv_cmd or die <<"";
+(Socket::MsgHdr || IO::FDPass || Inline::C) missing/unconfigured (nfd=$nfd);
+
require PublicInbox::Listener;
require PublicInbox::EOFpipe;
(-p STDOUT) or die "E: stdout must be a pipe\n";
diff --git a/lib/PublicInbox/Spawn.pm b/lib/PublicInbox/Spawn.pm
index cd94ba96..7d0d9597 100644
--- a/lib/PublicInbox/Spawn.pm
+++ b/lib/PublicInbox/Spawn.pm
@@ -201,6 +201,8 @@ void nodatacow_dir(const char *dir)
}
SET_NODATACOW
+# last choice for script/lei, 1st choice for lei internals
+# compatible with PublicInbox::CmdIPC4
my $fdpass = <<'FDPASS';
#include <sys/types.h>
#include <sys/uio.h>
@@ -213,16 +215,23 @@ union my_cmsg {
char pad[sizeof(struct cmsghdr)+ 8 + sizeof(struct my_3fds) + 8];
};
-int send_3fds(int sockfd, int infd, int outfd, int errfd)
+int send_cmd4(PerlIO *s, int in, int out, int err, SV *data, int flags)
{
struct msghdr msg = { 0 };
struct iovec iov;
union my_cmsg cmsg = { 0 };
int *fdp;
size_t i;
+ STRLEN dlen = 0;
- iov.iov_base = &msg.msg_namelen; /* whatever */
- iov.iov_len = 1;
+ if (SvOK(data)) {
+ iov.iov_base = SvPV(data, dlen);
+ iov.iov_len = dlen;
+ }
+ if (!dlen) { /* must be non-zero */
+ iov.iov_base = &msg.msg_namelen; /* whatever */
+ iov.iov_len = 1;
+ }
msg.msg_iov = &iov;
msg.msg_iovlen = 1;
msg.msg_control = &cmsg.hdr;
@@ -232,38 +241,38 @@ int send_3fds(int sockfd, int infd, int outfd, int errfd)
cmsg.hdr.cmsg_type = SCM_RIGHTS;
cmsg.hdr.cmsg_len = CMSG_LEN(sizeof(struct my_3fds));
fdp = (int *)CMSG_DATA(&cmsg.hdr);
- *fdp++ = infd;
- *fdp++ = outfd;
- *fdp++ = errfd;
- return sendmsg(sockfd, &msg, 0) >= 0;
+ *fdp++ = in;
+ *fdp++ = out;
+ *fdp++ = err;
+ return sendmsg(PerlIO_fileno(s), &msg, flags) >= 0;
}
-void recv_3fds(int sockfd)
+void recv_cmd4(PerlIO *s, SV *buf, STRLEN n)
{
union my_cmsg cmsg = { 0 };
struct msghdr msg = { 0 };
struct iovec iov;
size_t i;
Inline_Stack_Vars;
+ Inline_Stack_Reset;
- iov.iov_base = &msg.msg_namelen; /* whatever */
- iov.iov_len = 1;
+ if (!SvOK(buf))
+ sv_setpvn(buf, "", 0);
+ iov.iov_base = SvGROW(buf, n + 1);
+ iov.iov_len = n;
msg.msg_iov = &iov;
msg.msg_iovlen = 1;
msg.msg_control = &cmsg.hdr;
msg.msg_controllen = CMSG_SPACE(sizeof(struct my_3fds));
- if (recvmsg(sockfd, &msg, 0) <= 0)
- return;
-
- errno = EDOM;
- Inline_Stack_Reset;
- if (cmsg.hdr.cmsg_level == SOL_SOCKET &&
+ i = recvmsg(PerlIO_fileno(s), &msg, 0);
+ if (i < 0)
+ croak("recvmsg: %s", strerror(errno));
+ SvCUR_set(buf, i);
+ if (i > 0 && cmsg.hdr.cmsg_level == SOL_SOCKET &&
cmsg.hdr.cmsg_type == SCM_RIGHTS &&
cmsg.hdr.cmsg_len == CMSG_LEN(sizeof(struct my_3fds))) {
int *fdp = (int *)CMSG_DATA(&cmsg.hdr);
- size_t i;
-
for (i = 0; i < 3; i++)
Inline_Stack_Push(sv_2mortal(newSViv(*fdp++)));
}
diff --git a/script/lei b/script/lei
index 2ea98da4..d954b9eb 100755
--- a/script/lei
+++ b/script/lei
@@ -4,17 +4,20 @@
use strict;
use v5.10.1;
use Socket qw(AF_UNIX SOCK_STREAM pack_sockaddr_un);
-my ($send_3fds, $nfd);
-if (my ($sock, $pwd) = eval {
- $send_3fds = eval {
- require IO::FDPass;
- $nfd = 1; # 1 FD per-sendmsg
- sub { IO::FDPass::send($_[0], $_[$_]) for (1..3) }
- } // do {
- require PublicInbox::Spawn; # takes ~50ms even if built *sigh*
- $nfd = 3; # 3 FDs per-sendmsg(2)
- PublicInbox::Spawn->can('send_3fds');
- } // die "IO::FDPass missing or Inline::C not installed/configured\n";
+use PublicInbox::CmdIPC4;
+my $narg = 4;
+my $send_cmd = PublicInbox::CmdIPC4->can('send_cmd4') // do {
+ require PublicInbox::CmdIPC1; # 2nd choice
+ $narg = 1;
+ PublicInbox::CmdIPC1->can('send_cmd1');
+} // do {
+ require PublicInbox::Spawn; # takes ~50ms even if built *sigh*
+ $narg = 4;
+ PublicInbox::Spawn->can('send_cmd4');
+};
+
+my ($sock, $pwd);
+if ($send_cmd && eval {
my $path = do {
my $runtime_dir = ($ENV{XDG_RUNTIME_DIR} // '') . '/lei';
if ($runtime_dir eq '/lei') {
@@ -25,29 +28,27 @@ if (my ($sock, $pwd) = eval {
require File::Path;
File::Path::mkpath($runtime_dir, 0, 0700);
}
- "$runtime_dir/$nfd.sock";
+ "$runtime_dir/$narg.sock";
};
my $addr = pack_sockaddr_un($path);
- socket(my $sock, AF_UNIX, SOCK_STREAM, 0) or die "socket: $!";
+ socket($sock, AF_UNIX, SOCK_STREAM, 0) or die "socket: $!";
unless (connect($sock, $addr)) { # start the daemon if not started
local $ENV{PERL5LIB} = join(':', @INC);
open(my $daemon, '-|', $^X, qw[-MPublicInbox::LEI
-E PublicInbox::LEI::lazy_start(@ARGV)],
- $path, $! + 0, $nfd) or die "popen: $!";
+ $path, $! + 0, $narg) or die "popen: $!";
while (<$daemon>) { warn $_ } # EOF when STDERR is redirected
close($daemon) or warn <<"";
lei-daemon could not start, exited with \$?=$?
# try connecting again anyways, unlink+bind may be racy
- unless (connect($sock, $addr)) {
- die <<"";
+ connect($sock, $addr) or die <<"";
connect($path): $! (after attempted daemon start)
Falling back to (slow) one-shot mode
- }
}
require Cwd;
- my $pwd = $ENV{PWD} // '';
+ $pwd = $ENV{PWD} // '';
my $cwd = Cwd::fastcwd() // die "fastcwd(PWD=$pwd): $!";
if ($pwd ne $cwd) { # prefer ENV{PWD} if it's a symlink to real cwd
my @st_cwd = stat($cwd) or die "stat(cwd=$cwd): $!";
@@ -58,23 +59,21 @@ Falling back to (slow) one-shot mode
} else {
$pwd = $cwd;
}
- ($sock, $pwd);
-}) { # IO::FDPass, $sock, $pwd are all available:
+ 1;
+}) { # (Socket::MsgHdr|IO::FDPass|Inline::C), $sock, $pwd are all available:
local $ENV{PWD} = $pwd;
- my $buf = "$$\0\0>" . join("]\0[", @ARGV) . "\0\0>";
- while (my ($k, $v) = each %ENV) { $buf .= "$k=$v\0" }
+ my $buf = join("\0", $$, scalar(@ARGV), @ARGV);
+ while (my ($k, $v) = each %ENV) { $buf .= "\0$k=$v" }
$buf .= "\0\0";
select $sock;
$| = 1; # unbuffer selected $sock
- $send_3fds->(fileno($sock), 0, 1, 2);
- print $sock $buf or die "print(sock, buf): $!";
+ $send_cmd->($sock, 0, 1, 2, $buf, 0);
while ($buf = <$sock>) {
$buf =~ /\Aexit=([0-9]+)\n\z/ and exit($1 + 0);
die $buf;
}
-} else { # for systems lacking IO::FDPass
- # don't warn about IO::FDPass since it's not commonly installed
- warn $@ if $@ && index($@, 'IO::FDPass') < 0;
+} else { # for systems lacking Socket::MsgHdr, IO::FDPass or Inline::C
+ warn $@ if $@;
require PublicInbox::LEI;
PublicInbox::LEI::oneshot(__PACKAGE__);
}
diff --git a/t/cmd_ipc.t b/t/cmd_ipc.t
new file mode 100644
index 00000000..b9f4d128
--- /dev/null
+++ b/t/cmd_ipc.t
@@ -0,0 +1,90 @@
+#!perl -w
+# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+use strict;
+use v5.10.1;
+use Test::More;
+use PublicInbox::TestCommon;
+use Socket qw(AF_UNIX SOCK_STREAM MSG_EOR);
+pipe(my ($r, $w)) or BAIL_OUT;
+my ($send, $recv);
+require_ok 'PublicInbox::Spawn';
+my $SOCK_SEQPACKET = eval { Socket::SOCK_SEQPACKET() } // undef;
+
+my $do_test = sub { SKIP: {
+ my ($type, $flag, $desc) = @_;
+ defined $type or skip 'SOCK_SEQPACKET missing', 7;
+ my ($s1, $s2);
+ my $src = 'some payload' x 40;
+ socketpair($s1, $s2, AF_UNIX, $type, 0) or BAIL_OUT $!;
+ $send->($s1, fileno($r), fileno($w), fileno($s1), $src, $flag);
+ my (@fds) = $recv->($s2, my $buf, length($src) + 1);
+ is($buf, $src, 'got buffer payload '.$desc);
+ my ($r1, $w1, $s1a);
+ my $opens = sub {
+ ok(open($r1, '<&=', $fds[0]), 'opened received $r');
+ ok(open($w1, '>&=', $fds[1]), 'opened received $w');
+ ok(open($s1a, '+>&=', $fds[2]), 'opened received $s1');
+ };
+ $opens->();
+ my @exp = stat $r;
+ my @cur = stat $r1;
+ is("$exp[0]\0$exp[1]", "$cur[0]\0$cur[1]", '$r dev/ino matches');
+ @exp = stat $w;
+ @cur = stat $w1;
+ is("$exp[0]\0$exp[1]", "$cur[0]\0$cur[1]", '$w dev/ino matches');
+ @exp = stat $s1;
+ @cur = stat $s1a;
+ is("$exp[0]\0$exp[1]", "$cur[0]\0$cur[1]", '$s1 dev/ino matches');
+ if (defined($SOCK_SEQPACKET) && $type == $SOCK_SEQPACKET) {
+ $r1 = $w1 = $s1a = undef;
+ $src = (',' x 1023) . '-' .('.' x 1024);
+ $send->($s1, fileno($r), fileno($w), fileno($s1), $src, $flag);
+ (@fds) = $recv->($s2, $buf, 1024);
+ is($buf, (',' x 1023) . '-', 'silently truncated buf');
+ $opens->();
+ $r1 = $w1 = $s1a = undef;
+ close $s1;
+ @fds = $recv->($s2, $buf, length($src) + 1);
+ is_deeply(\@fds, [], "no FDs on EOF $desc");
+ is($buf, '', "buffer cleared on EOF ($desc)");
+
+ }
+} };
+
+my $send_ic = PublicInbox::Spawn->can('send_cmd4');
+my $recv_ic = PublicInbox::Spawn->can('recv_cmd4');
+SKIP: {
+ ($send_ic && $recv_ic) or skip 'Inline::C not installed/enabled', 12;
+ $send = $send_ic;
+ $recv = $recv_ic;
+ $do_test->(SOCK_STREAM, 0, 'Inline::C stream');
+ $do_test->($SOCK_SEQPACKET, MSG_EOR, 'Inline::C seqpacket');
+}
+
+SKIP: {
+ require_mods('Socket::MsgHdr', 13);
+ require_ok 'PublicInbox::CmdIPC4';
+ $send = PublicInbox::CmdIPC4->can('send_cmd4');
+ $recv = PublicInbox::CmdIPC4->can('recv_cmd4');
+ $do_test->(SOCK_STREAM, 0, 'MsgHdr stream');
+ $do_test->($SOCK_SEQPACKET, MSG_EOR, 'MsgHdr seqpacket');
+ SKIP: {
+ ($send_ic && $recv_ic) or
+ skip 'Inline::C not installed/enabled', 12;
+ $recv = $recv_ic;
+ $do_test->(SOCK_STREAM, 0, 'Inline::C -> MsgHdr stream');
+ $do_test->($SOCK_SEQPACKET, 0, 'Inline::C -> MsgHdr seqpacket');
+ }
+}
+
+SKIP: {
+ require_mods('IO::FDPass', 13);
+ require_ok 'PublicInbox::CmdIPC1';
+ $send = PublicInbox::CmdIPC1->can('send_cmd1');
+ $recv = PublicInbox::CmdIPC1->can('recv_cmd1');
+ $do_test->(SOCK_STREAM, 0, 'IO::FDPass stream');
+ $do_test->($SOCK_SEQPACKET, MSG_EOR, 'IO::FDPass seqpacket');
+}
+
+done_testing;
diff --git a/t/lei.t b/t/lei.t
index 72c50308..992800a5 100644
--- a/t/lei.t
+++ b/t/lei.t
@@ -10,6 +10,8 @@ use File::Path qw(rmtree);
require_git 2.6;
require_mods(qw(json DBD::SQLite Search::Xapian));
my $opt = { 1 => \(my $out = ''), 2 => \(my $err = '') };
+my ($home, $for_destroy) = tmpdir();
+my $err_filter;
my $lei = sub {
my ($cmd, $env, $xopt) = @_;
$out = $err = '';
@@ -17,10 +19,12 @@ my $lei = sub {
($env, $xopt) = grep { (!defined) || ref } @_;
$cmd = [ grep { defined && !ref } @_ ];
}
- run_script(['lei', @$cmd], $env, $xopt // $opt);
+ my $res = run_script(['lei', @$cmd], $env, $xopt // $opt);
+ $err_filter and
+ $err = join('', grep(!/$err_filter/, split(/^/m, $err)));
+ $res;
};
-my ($home, $for_destroy) = tmpdir();
delete local $ENV{XDG_DATA_HOME};
delete local $ENV{XDG_CONFIG_HOME};
local $ENV{GIT_COMMITTER_EMAIL} = 'lei@example.com';
@@ -195,18 +199,22 @@ my $test_lei_common = sub {
if ($ENV{TEST_LEI_ONESHOT}) {
require_ok 'PublicInbox::LEI';
- # force sun_path[108] overflow, "IO::FDPass" avoids warning
- local $ENV{XDG_RUNTIME_DIR} = "$home/IO::FDPass".('.sun_path' x 108);
+ # force sun_path[108] overflow, ($lei->() filters out this path)
+ my $xrd = "$home/1shot-test".('.sun_path' x 108);
+ local $ENV{XDG_RUNTIME_DIR} = $xrd;
+ $err_filter = qr!\Q$xrd!;
$test_lei_common->();
}
SKIP: { # real socket
require_mods(qw(Cwd), my $nr = 105);
- my $nfd = eval { require IO::FDPass; 1 } // do {
+ my $nfd = eval { require Socket::MsgHdr; 4 } //
+ eval { require IO::FDPass; 1 } // do {
require PublicInbox::Spawn;
- PublicInbox::Spawn->can('send_3fds') ? 3 : undef;
+ PublicInbox::Spawn->can('send_cmd4') ? 4 : undef;
} //
- skip 'IO::FDPass missing or Inline::C not installed/configured', $nr;
+ skip 'Socket::MsgHdr, IO::FDPass or Inline::C missing or unconfigured',
+ $nr;
local $ENV{XDG_RUNTIME_DIR} = "$home/xdg_run";
my $sock = "$ENV{XDG_RUNTIME_DIR}/lei/$nfd.sock";
diff --git a/t/spawn.t b/t/spawn.t
index 558afc28..0eed79bb 100644
--- a/t/spawn.t
+++ b/t/spawn.t
@@ -5,35 +5,6 @@ use warnings;
use Test::More;
use PublicInbox::Spawn qw(which spawn popen_rd);
use PublicInbox::Sigfd;
-use Socket qw(AF_UNIX SOCK_STREAM);
-
-SKIP: {
- my $recv_3fds = PublicInbox::Spawn->can('recv_3fds');
- my $send_3fds = PublicInbox::Spawn->can('send_3fds');
- skip 'Inline::C not enabled', 3 unless $send_3fds && $recv_3fds;
- my ($s1, $s2);
- socketpair($s1, $s2, AF_UNIX, SOCK_STREAM, 0) or BAIL_OUT $!;
- pipe(my ($r, $w)) or BAIL_OUT $!;
- my @orig = ($r, $w, $s2);
- my @fd = map { fileno($_) } @orig;
- ok($send_3fds->(fileno($s1), $fd[0], $fd[1], $fd[2]),
- 'FDs sent');
- my (@fds) = $recv_3fds->(fileno($s2));
- is(scalar(@fds), 3, 'got 3 fds');
- use Data::Dumper; diag Dumper(\@fds);
- is(scalar(grep(/\A\d+\z/, @fds)), 3, 'all valid FDs');
- my $i = 0;
- my @cmp = map {
- open my $new, $_, shift(@fds) or BAIL_OUT "open $! $i => $_";
- ($new, shift(@orig), $i++);
- } (qw(<&= >&= +<&=));
- while (my ($new, $old, $fd) = splice(@cmp, 0, 3)) {
- my @new = stat($new);
- my @old = stat($old);
- is("$old[0]\0$old[1]", "$new[0]\0$new[1]",
- "device/inode matches on received FD:$fd");
- }
-}
{
my $true = which('true');
^ permalink raw reply related [relevance 3%]
* [PATCH 00/22] lei query overview views
@ 2021-01-10 12:14 7% Eric Wong
2021-01-10 12:15 3% ` [PATCH 05/22] cmd_ipc: send FDs with buffer payload Eric Wong
0 siblings, 1 reply; 2+ results
From: Eric Wong @ 2021-01-10 12:14 UTC (permalink / raw)
To: meta
Usage summary:
lei add-external /path/to/v1-or-v2-inbox
lei add-external /path/to/another-inbox-or-ext-index
# URLs aren't supported, yet :<
lei q SEARCH TERMS GO HERE... # pager should open with JSON output
For faster startup time than what Inline::C can give:
apt-get install libsocket-msghdr-perl # Socket::Msghdr
Having neither Inline::C nor Socket::Msghdr means parallel
queries won't work.
I went back-and-forth on a bunch of things but ultimately gave
up trying to support IO::FDPass since it got too fragile and
difficult to test with the work-queue distribution.
The pager runs from the client process (if using Socket::MsgHdr
or Inline::C), now. It took at fair amount of work from my slow
brain to get pager shutdown to be instantaneous, though queries
which haven't output anything aren't easily interruptible...
The wq_* IPC stuff will be reused in the normal read-only
WWW/IMAP search at some point, too.
Eric Wong (22):
lei query + pagination sorta working
lei q: deduplicate smsg
ds: block signals when reaping
ipc: add support for asynchronous callbacks
cmd_ipc: send FDs with buffer payload
ipc: avoid excessive evals
ipc: work queue support via SOCK_SEQPACKET
ipc: eliminate ipc_worker_stop method
ipc: wq: support dynamic worker count change
ipc: drop -ipc_parent_pid field
ipc: DESTROY and wq_workers methods
lei: rename $w to $wpager for warning message
lei: fix oneshot TTY detection by passing STD*{GLOB}
lei: query: ensure pager exit is instantaneous
ipc: start supporting sending/receiving more than 3 FDs
ipc: fix IO::FDPass use with a worker limit of 1
ipc: drop unused fields, default sighandlers for wq
lei: get rid of client {pid} field
lei: fork + FD cleanup
lei: run pager in client script
lei_xsearch: transfer 4 FDs internally, drop IO::FDPass
lei: query: restore JSON output overview
MANIFEST | 4 +
lib/PublicInbox/CmdIPC4.pm | 36 ++++
lib/PublicInbox/DS.pm | 16 +-
lib/PublicInbox/Daemon.pm | 10 +-
lib/PublicInbox/ExtSearchIdx.pm | 4 +-
lib/PublicInbox/IPC.pm | 280 ++++++++++++++++++++++++++++----
lib/PublicInbox/LEI.pm | 180 +++++++++++++-------
lib/PublicInbox/LeiDedupe.pm | 29 +++-
lib/PublicInbox/LeiExternal.pm | 33 ++--
lib/PublicInbox/LeiOverview.pm | 188 +++++++++++++++++++++
lib/PublicInbox/LeiQuery.pm | 92 +++++++++++
lib/PublicInbox/LeiStore.pm | 2 +-
lib/PublicInbox/LeiToMail.pm | 2 +
lib/PublicInbox/LeiXSearch.pm | 118 +++++++++++++-
lib/PublicInbox/Search.pm | 10 +-
lib/PublicInbox/SearchView.pm | 10 +-
lib/PublicInbox/Sigfd.pm | 12 +-
lib/PublicInbox/Spawn.pm | 85 ++++++----
lib/PublicInbox/Watch.pm | 8 +-
script/lei | 76 +++++----
script/public-inbox-watch | 4 +-
t/cmd_ipc.t | 82 ++++++++++
t/ipc.t | 115 ++++++++++++-
t/lei.t | 31 +++-
t/lei_dedupe.t | 14 ++
t/lei_xsearch.t | 5 +
t/spawn.t | 33 +---
27 files changed, 1233 insertions(+), 246 deletions(-)
create mode 100644 lib/PublicInbox/CmdIPC4.pm
create mode 100644 lib/PublicInbox/LeiOverview.pm
create mode 100644 lib/PublicInbox/LeiQuery.pm
create mode 100644 t/cmd_ipc.t
^ permalink raw reply [relevance 7%]
Results 1-2 of 2 | reverse | options above
-- pct% links below jump to the message on this page, permalinks otherwise --
2021-01-10 12:14 7% [PATCH 00/22] lei query overview views Eric Wong
2021-01-10 12:15 3% ` [PATCH 05/22] cmd_ipc: send FDs with buffer payload Eric Wong
Code repositories for project(s) associated with this public inbox
https://80x24.org/public-inbox.git
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).