user/dev discussion of public-inbox itself
 help / color / mirror / code / Atom feed
Search results ordered by [date|relevance]  view[summary|nested|Atom feed]
thread overview below | download mbox.gz: |
* [PATCH 15/22] ipc: start supporting sending/receiving more than 3 FDs
  2021-01-10 12:14  7% [PATCH 00/22] lei query overview views Eric Wong
@ 2021-01-10 12:15  6% ` Eric Wong
  0 siblings, 0 replies; 2+ results
From: Eric Wong @ 2021-01-10 12:15 UTC (permalink / raw)
  To: meta

Actually, sending 4 FDs will be useful for lei internal xsearch
work once we start accepting input from stdin.  It won't be used
with the lightweight lei(1) client, however.

For WWW (eventually), a single FD may be enough.
---
 lib/PublicInbox/CmdIPC1.pm    | 16 +++++++-----
 lib/PublicInbox/CmdIPC4.pm    | 12 +++++----
 lib/PublicInbox/IPC.pm        | 13 +++++-----
 lib/PublicInbox/LeiXSearch.pm |  6 ++---
 lib/PublicInbox/Spawn.pm      | 48 ++++++++++++++++++++---------------
 script/lei                    |  2 +-
 t/cmd_ipc.t                   |  5 ++--
 t/ipc.t                       |  6 ++---
 8 files changed, 60 insertions(+), 48 deletions(-)

diff --git a/lib/PublicInbox/CmdIPC1.pm b/lib/PublicInbox/CmdIPC1.pm
index 0eed8bed..de6e54ef 100644
--- a/lib/PublicInbox/CmdIPC1.pm
+++ b/lib/PublicInbox/CmdIPC1.pm
@@ -10,17 +10,19 @@ BEGIN { eval {
 require IO::FDPass; # XS, available in all major distros
 no warnings 'once';
 
-*send_cmd1 = sub ($$$$$$) { # (sock, in, out, err, buf, flags) = @_;
-	for (1..3) {
-		IO::FDPass::send(fileno($_[0]), $_[$_]) or
+*send_cmd1 = sub ($$$$) { # (sock, fds, buf, flags) = @_;
+	my ($sock, $fds, undef, $flags) = @_;
+	for my $fd (@$fds) {
+		IO::FDPass::send(fileno($sock), $fd) or
 					die "IO::FDPass::send: $!";
 	}
-	send($_[0], $_[4], $_[5]) or die "send $!";
+	send($sock, $_[2], $flags) or die "send $!";
 };
 
-*recv_cmd1 = sub ($$$) {
-	my ($s, undef, $len) = @_;
-	my @fds = map { IO::FDPass::recv(fileno($s)) } (0..2);
+*recv_cmd1 = sub ($$$;$) {
+	my ($s, undef, $len, $nfds) = @_;
+	$nfds //= 3;
+	my @fds = map { IO::FDPass::recv(fileno($s)) } (1..$nfds);
 	recv($s, $_[1], $len, 0) // die "recv: $!";
 	length($_[1]) == 0 ? () : @fds;
 };
diff --git a/lib/PublicInbox/CmdIPC4.pm b/lib/PublicInbox/CmdIPC4.pm
index 90fca62d..c4fcb0d6 100644
--- a/lib/PublicInbox/CmdIPC4.pm
+++ b/lib/PublicInbox/CmdIPC4.pm
@@ -13,10 +13,12 @@ require Socket::MsgHdr; # XS
 no warnings 'once';
 
 # 3 FDs per-sendmsg(2) + buffer
-*send_cmd4 = sub ($$$$$$) { # (sock, in, out, err, buf, flags) = @_;
-	my $mh = Socket::MsgHdr->new(buf => $_[4]);
-	$mh->cmsghdr(SOL_SOCKET, SCM_RIGHTS, pack('iii', @_[1,2,3]));
-	Socket::MsgHdr::sendmsg($_[0], $mh, $_[5]) or die "sendmsg: $!";
+*send_cmd4 = sub ($$$$) { # (sock, fds, buf, flags) = @_;
+	my ($sock, $fds, undef, $flags) = @_;
+	my $mh = Socket::MsgHdr->new(buf => $_[2]);
+	$mh->cmsghdr(SOL_SOCKET, SCM_RIGHTS,
+			pack('i' x scalar(@$fds), @$fds));
+	Socket::MsgHdr::sendmsg($sock, $mh, $flags) or die "sendmsg: $!";
 };
 
 *recv_cmd4 = sub ($$$) {
@@ -26,7 +28,7 @@ no warnings 'once';
 	$_[1] = $mh->buf;
 	return () if $r == 0;
 	my (undef, undef, $data) = $mh->cmsghdr;
-	unpack('iii', $data);
+	unpack('i' x (length($data) / 4), $data);
 };
 
 } } # /eval /BEGIN
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index be5b2f45..b0a0bfb5 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -263,16 +263,15 @@ sub wq_worker_loop ($) {
 }
 
 sub wq_do { # always async
-	my ($self, $sub, $in, $out, $err, @args) = @_;
+	my ($self, $sub, $ios, @args) = @_;
 	if (my $s1 = $self->{-wq_s1}) { # run in worker
-		$_ = fileno($_) for ($in, $out, $err);
-		$send_cmd->($s1, $in, $out, $err,
-				freeze([$sub, @args]), MSG_EOR);
+		my $fds = [ map { fileno($_) } @$ios ];
+		$send_cmd->($s1, $fds, freeze([$sub, @args]), MSG_EOR);
 	} else {
-		@$self{0, 1, 2} = ($in, $out, $err);
+		@$self{0..$#$ios} = @$ios;
 		eval { $self->$sub(@args) };
 		warn "wq_do: $@" if $@;
-		delete @$self{0, 1, 2};
+		delete @$self{0..$#$ios};
 	}
 }
 
@@ -334,7 +333,7 @@ sub wq_worker_decr { # SIGTTOU handler, kills first idle worker
 	my ($self) = @_;
 	my $workers = $self->{-wq_workers} or return;
 	my $s2 = $self->{-wq_s2} // die 'BUG: no wq_s2';
-	$self->wq_do('wq_exit', $s2, $s2, $s2);
+	$self->wq_do('wq_exit', [ $s2, $s2, $s2 ]);
 	$self->{-wq_exit_pending}++;
 	# caller must call wq_worker_decr_wait in main loop
 }
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index a3010efe..c0df21a8 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -150,14 +150,14 @@ sub do_query {
 	$io[2]->autoflush(1);
 	if ($lei->{opt}->{thread}) {
 		for my $ibxish (@$srcs) {
-			$self->wq_do('query_thread_mset', @io, $lei, $ibxish);
+			$self->wq_do('query_thread_mset', \@io, $lei, $ibxish);
 		}
 	} else {
-		$self->wq_do('query_mset', @io, $lei, $srcs);
+		$self->wq_do('query_mset', \@io, $lei, $srcs);
 	}
 	# TODO
 	for my $rmt (@{$self->{remotes} // []}) {
-		$self->wq_do('query_thread_mbox', @io, $lei, $rmt);
+		$self->wq_do('query_thread_mbox', \@io, $lei, $rmt);
 	}
 }
 
diff --git a/lib/PublicInbox/Spawn.pm b/lib/PublicInbox/Spawn.pm
index 7d0d9597..b35bf54c 100644
--- a/lib/PublicInbox/Spawn.pm
+++ b/lib/PublicInbox/Spawn.pm
@@ -209,20 +209,22 @@ my $fdpass = <<'FDPASS';
 #include <sys/socket.h>
 
 #if defined(CMSG_SPACE) && defined(CMSG_LEN)
-struct my_3fds { int fds[3]; };
+#define SEND_FD_CAPA 3
+#define SEND_FD_SPACE (SEND_FD_CAPA * sizeof(int))
 union my_cmsg {
 	struct cmsghdr hdr;
-	char pad[sizeof(struct cmsghdr)+ 8 + sizeof(struct my_3fds) + 8];
+	char pad[sizeof(struct cmsghdr) + 16 + SEND_FD_SPACE];
 };
 
-int send_cmd4(PerlIO *s, int in, int out, int err, SV *data, int flags)
+int send_cmd4(PerlIO *s, SV *svfds, SV *data, int flags)
 {
 	struct msghdr msg = { 0 };
-	struct iovec iov;
 	union my_cmsg cmsg = { 0 };
-	int *fdp;
-	size_t i;
 	STRLEN dlen = 0;
+	struct iovec iov;
+	AV *fds = (AV *)SvRV(svfds);
+	I32 i, nfds = av_len(fds) + 1;
+	int *fdp;
 
 	if (SvOK(data)) {
 		iov.iov_base = SvPV(data, dlen);
@@ -234,16 +236,22 @@ int send_cmd4(PerlIO *s, int in, int out, int err, SV *data, int flags)
 	}
 	msg.msg_iov = &iov;
 	msg.msg_iovlen = 1;
-	msg.msg_control = &cmsg.hdr;
-	msg.msg_controllen = CMSG_SPACE(sizeof(struct my_3fds));
-
-	cmsg.hdr.cmsg_level = SOL_SOCKET;
-	cmsg.hdr.cmsg_type = SCM_RIGHTS;
-	cmsg.hdr.cmsg_len = CMSG_LEN(sizeof(struct my_3fds));
-	fdp = (int *)CMSG_DATA(&cmsg.hdr);
-	*fdp++ = in;
-	*fdp++ = out;
-	*fdp++ = err;
+	if (nfds) {
+		if (nfds > SEND_FD_CAPA) {
+			fprintf(stderr, "FIXME: bump SEND_FD_CAPA=%d\n", nfds);
+			nfds = SEND_FD_CAPA;
+		}
+		msg.msg_control = &cmsg.hdr;
+		msg.msg_controllen = CMSG_SPACE(nfds * sizeof(int));
+		cmsg.hdr.cmsg_level = SOL_SOCKET;
+		cmsg.hdr.cmsg_type = SCM_RIGHTS;
+		cmsg.hdr.cmsg_len = CMSG_LEN(nfds * sizeof(int));
+		fdp = (int *)CMSG_DATA(&cmsg.hdr);
+		for (i = 0; i < nfds; i++) {
+			SV **fd = av_fetch(fds, i, 0);
+			*fdp++ = SvIV(*fd);
+		}
+	}
 	return sendmsg(PerlIO_fileno(s), &msg, flags) >= 0;
 }
 
@@ -263,17 +271,17 @@ void recv_cmd4(PerlIO *s, SV *buf, STRLEN n)
 	msg.msg_iov = &iov;
 	msg.msg_iovlen = 1;
 	msg.msg_control = &cmsg.hdr;
-	msg.msg_controllen = CMSG_SPACE(sizeof(struct my_3fds));
+	msg.msg_controllen = CMSG_SPACE(SEND_FD_SPACE);
 
 	i = recvmsg(PerlIO_fileno(s), &msg, 0);
 	if (i < 0)
 		croak("recvmsg: %s", strerror(errno));
 	SvCUR_set(buf, i);
 	if (i > 0 && cmsg.hdr.cmsg_level == SOL_SOCKET &&
-			cmsg.hdr.cmsg_type == SCM_RIGHTS &&
-			cmsg.hdr.cmsg_len == CMSG_LEN(sizeof(struct my_3fds))) {
+			cmsg.hdr.cmsg_type == SCM_RIGHTS) {
+		size_t len = cmsg.hdr.cmsg_len;
 		int *fdp = (int *)CMSG_DATA(&cmsg.hdr);
-		for (i = 0; i < 3; i++)
+		for (i = 0; CMSG_LEN((i + 1) * sizeof(int)) <= len; i++)
 			Inline_Stack_Push(sv_2mortal(newSViv(*fdp++)));
 	}
 	Inline_Stack_Done;
diff --git a/script/lei b/script/lei
index d954b9eb..5e30f4d7 100755
--- a/script/lei
+++ b/script/lei
@@ -67,7 +67,7 @@ Falling back to (slow) one-shot mode
 	$buf .= "\0\0";
 	select $sock;
 	$| = 1; # unbuffer selected $sock
-	$send_cmd->($sock, 0, 1, 2, $buf, 0);
+	$send_cmd->($sock, [ 0, 1, 2 ], $buf, 0);
 	while ($buf = <$sock>) {
 		$buf =~ /\Aexit=([0-9]+)\n\z/ and exit($1 + 0);
 		die $buf;
diff --git a/t/cmd_ipc.t b/t/cmd_ipc.t
index b9f4d128..22f73c19 100644
--- a/t/cmd_ipc.t
+++ b/t/cmd_ipc.t
@@ -17,7 +17,8 @@ my $do_test = sub { SKIP: {
 	my ($s1, $s2);
 	my $src = 'some payload' x 40;
 	socketpair($s1, $s2, AF_UNIX, $type, 0) or BAIL_OUT $!;
-	$send->($s1, fileno($r), fileno($w), fileno($s1), $src, $flag);
+	my $sfds = [ fileno($r), fileno($w), fileno($s1) ];
+	$send->($s1, $sfds, $src, $flag);
 	my (@fds) = $recv->($s2, my $buf, length($src) + 1);
 	is($buf, $src, 'got buffer payload '.$desc);
 	my ($r1, $w1, $s1a);
@@ -39,7 +40,7 @@ my $do_test = sub { SKIP: {
 	if (defined($SOCK_SEQPACKET) && $type == $SOCK_SEQPACKET) {
 		$r1 = $w1 = $s1a = undef;
 		$src = (',' x 1023) . '-' .('.' x 1024);
-		$send->($s1, fileno($r), fileno($w), fileno($s1), $src, $flag);
+		$send->($s1, $sfds, $src, $flag);
 		(@fds) = $recv->($s2, $buf, 1024);
 		is($buf, (',' x 1023) . '-', 'silently truncated buf');
 		$opens->();
diff --git a/t/ipc.t b/t/ipc.t
index 903294c5..d2b6ad4f 100644
--- a/t/ipc.t
+++ b/t/ipc.t
@@ -121,7 +121,7 @@ $warn->autoflush(0);
 local $SIG{__WARN__} = sub { print $warn "PID:$$ ", @_ };
 my @ppids;
 for my $t ('local', 'worker', 'worker again') {
-	$ipc->wq_do('test_write_each_fd', $wa, $wb, $wc, 'hello world');
+	$ipc->wq_do('test_write_each_fd', [ $wa, $wb, $wc ], 'hello world');
 	my $i = 0;
 	for my $fh ($ra, $rb, $rc) {
 		my $buf = readline($fh);
@@ -129,7 +129,7 @@ for my $t ('local', 'worker', 'worker again') {
 		like($buf, qr/\Ai=$i \d+ hello world\z/, "got expected ($t)");
 		$i++;
 	}
-	$ipc->wq_do('test_die', $wa, $wb, $wc);
+	$ipc->wq_do('test_die', [ $wa, $wb, $wc ]);
 	my $ppid = $ipc->wq_workers_start('wq', 1);
 	push(@ppids, $ppid);
 }
@@ -142,7 +142,7 @@ SKIP: {
 	my $pid = fork // BAIL_OUT $!;
 	if ($pid == 0) {
 		use POSIX qw(_exit);
-		$ipc->wq_do('test_write_each_fd', $wa, $wb, $wc, $$);
+		$ipc->wq_do('test_write_each_fd', [ $wa, $wb, $wc ], $$);
 		_exit(0);
 	} else {
 		my $i = 0;

^ permalink raw reply related	[relevance 6%]

* [PATCH 00/22] lei query overview views
@ 2021-01-10 12:14  7% Eric Wong
  2021-01-10 12:15  6% ` [PATCH 15/22] ipc: start supporting sending/receiving more than 3 FDs Eric Wong
  0 siblings, 1 reply; 2+ results
From: Eric Wong @ 2021-01-10 12:14 UTC (permalink / raw)
  To: meta

Usage summary:

	lei add-external /path/to/v1-or-v2-inbox
	lei add-external /path/to/another-inbox-or-ext-index
			# URLs aren't supported, yet :<

	lei q SEARCH TERMS GO HERE... # pager should open with JSON output

For faster startup time than what Inline::C can give:

	apt-get install libsocket-msghdr-perl # Socket::MsgHdr

Having neither Inline::C nor Socket::MsgHdr means parallel
queries won't work.

I went back-and-forth on a bunch of things but ultimately gave
up trying to support IO::FDPass since it got too fragile and
difficult to test with the work-queue distribution.

The pager runs from the client process (if using Socket::MsgHdr
or Inline::C), now.  It took a fair amount of work from my slow
brain to get pager shutdown to be instantaneous, though queries
which haven't output anything aren't easily interruptible...

The wq_* IPC stuff will be reused in the normal read-only
WWW/IMAP search at some point, too.

Eric Wong (22):
  lei query + pagination sorta working
  lei q: deduplicate smsg
  ds: block signals when reaping
  ipc: add support for asynchronous callbacks
  cmd_ipc: send FDs with buffer payload
  ipc: avoid excessive evals
  ipc: work queue support via SOCK_SEQPACKET
  ipc: eliminate ipc_worker_stop method
  ipc: wq: support dynamic worker count change
  ipc: drop -ipc_parent_pid field
  ipc: DESTROY and wq_workers methods
  lei: rename $w to $wpager for warning message
  lei: fix oneshot TTY detection by passing STD*{GLOB}
  lei: query: ensure pager exit is instantaneous
  ipc: start supporting sending/receiving more than 3 FDs
  ipc: fix IO::FDPass use with a worker limit of 1
  ipc: drop unused fields, default sighandlers for wq
  lei: get rid of client {pid} field
  lei: fork + FD cleanup
  lei: run pager in client script
  lei_xsearch: transfer 4 FDs internally, drop IO::FDPass
  lei: query: restore JSON output overview

 MANIFEST                        |   4 +
 lib/PublicInbox/CmdIPC4.pm      |  36 ++++
 lib/PublicInbox/DS.pm           |  16 +-
 lib/PublicInbox/Daemon.pm       |  10 +-
 lib/PublicInbox/ExtSearchIdx.pm |   4 +-
 lib/PublicInbox/IPC.pm          | 280 ++++++++++++++++++++++++++++----
 lib/PublicInbox/LEI.pm          | 180 +++++++++++++-------
 lib/PublicInbox/LeiDedupe.pm    |  29 +++-
 lib/PublicInbox/LeiExternal.pm  |  33 ++--
 lib/PublicInbox/LeiOverview.pm  | 188 +++++++++++++++++++++
 lib/PublicInbox/LeiQuery.pm     |  92 +++++++++++
 lib/PublicInbox/LeiStore.pm     |   2 +-
 lib/PublicInbox/LeiToMail.pm    |   2 +
 lib/PublicInbox/LeiXSearch.pm   | 118 +++++++++++++-
 lib/PublicInbox/Search.pm       |  10 +-
 lib/PublicInbox/SearchView.pm   |  10 +-
 lib/PublicInbox/Sigfd.pm        |  12 +-
 lib/PublicInbox/Spawn.pm        |  85 ++++++----
 lib/PublicInbox/Watch.pm        |   8 +-
 script/lei                      |  76 +++++----
 script/public-inbox-watch       |   4 +-
 t/cmd_ipc.t                     |  82 ++++++++++
 t/ipc.t                         | 115 ++++++++++++-
 t/lei.t                         |  31 +++-
 t/lei_dedupe.t                  |  14 ++
 t/lei_xsearch.t                 |   5 +
 t/spawn.t                       |  33 +---
 27 files changed, 1233 insertions(+), 246 deletions(-)
 create mode 100644 lib/PublicInbox/CmdIPC4.pm
 create mode 100644 lib/PublicInbox/LeiOverview.pm
 create mode 100644 lib/PublicInbox/LeiQuery.pm
 create mode 100644 t/cmd_ipc.t

^ permalink raw reply	[relevance 7%]

Results 1-2 of 2 | reverse | options above
-- pct% links below jump to the message on this page, permalinks otherwise --
2021-01-10 12:14  7% [PATCH 00/22] lei query overview views Eric Wong
2021-01-10 12:15  6% ` [PATCH 15/22] ipc: start supporting sending/receiving more than 3 FDs Eric Wong

Code repositories for project(s) associated with this public inbox

	https://80x24.org/public-inbox.git

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).