user/dev discussion of public-inbox itself
 help / color / mirror / code / Atom feed
Search results ordered by [date|relevance]  view[summary|nested|Atom feed]
thread overview below | download mbox.gz: |
* [PATCH 07/22] ipc: work queue support via SOCK_SEQPACKET
  2021-01-10 12:14  7% [PATCH 00/22] lei query overview views Eric Wong
@ 2021-01-10 12:15  5% ` Eric Wong
  0 siblings, 0 replies; 2+ results
From: Eric Wong @ 2021-01-10 12:15 UTC (permalink / raw)
  To: meta

This will allow any number of younger sibling processes to
communicate with older siblings directly without relying on a
mediator process.  This is intended to be useful for
distributing search work across multiple workers without caring
which worker hits it (we only care about shard members).

And any request sent with this will be able to hit any worker
without locking on our part.

Unix stream sockets with a listener were also considered;
binding to a file on the FS may confuse users given there's
already a socket path for lei(1).  Linux-only Abstract or
autobind sockets are rejected due to lack of portability.

SOCK_SEQPACKET via socketpair(2) was chosen since it's POSIX
2008 and available on FreeBSD 9+ in addition to Linux, and
doesn't require filesystem access.
---
 lib/PublicInbox/IPC.pm | 106 +++++++++++++++++++++++++++++++++++++++--
 t/ipc.t                |  66 +++++++++++++++++++++++++
 2 files changed, 168 insertions(+), 4 deletions(-)

diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 5082f110..27ea90de 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -1,14 +1,16 @@
 # Copyright (C) 2020-2021 all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
 
-# base class for remote IPC calls, requires Storable
-# TODO: this ought to be usable in SearchIdxShard
+# base class for remote IPC calls and workqueues, requires Storable or Sereal
 package PublicInbox::IPC;
 use strict;
 use v5.10.1;
 use Carp qw(confess croak);
-use PublicInbox::Sigfd;
+use PublicInbox::DS qw(dwaitpid);
+use PublicInbox::Spawn;
 use POSIX ();
+use Socket qw(AF_UNIX MSG_EOR);
+my $SEQPACKET = eval { Socket::SOCK_SEQPACKET() }; # portable enough?
 use constant PIPE_BUF => $^O eq 'linux' ? 4096 : POSIX::_POSIX_PIPE_BUF();
 my ($enc, $dec);
 # ->imports at BEGIN turns sereal_*_with_object into custom ops on 5.14+
@@ -34,6 +36,17 @@ if ($enc && $dec) { # should be custom ops
 	} // warn("Storable (part of Perl) missing: $@\n");
 }
 
+my $recv_cmd = PublicInbox::Spawn->can('recv_cmd4');
+my $send_cmd = PublicInbox::Spawn->can('send_cmd4') // do {
+	require PublicInbox::CmdIPC4;
+	$recv_cmd //= PublicInbox::CmdIPC4->can('recv_cmd4');
+	PublicInbox::CmdIPC4->can('send_cmd4');
+} // do {
+	require PublicInbox::CmdIPC1;
+	$recv_cmd = PublicInbox::CmdIPC1->can('recv_cmd1');
+	PublicInbox::CmdIPC1->can('send_cmd1');
+};
+
 sub _get_rec ($) {
 	my ($r) = @_;
 	defined(my $len = <$r>) or return;
@@ -144,7 +157,7 @@ sub ipc_worker_stop {
 
 	# allow any sibling to send ipc_worker_exit, but siblings can't wait
 	return if $$ != $ppid;
-	PublicInbox::DS::dwaitpid($pid, \&ipc_worker_reap, $self);
+	dwaitpid($pid, \&ipc_worker_reap, $self);
 }
 
 # use this if we have multiple readers reading curl or "pigz -dc"
@@ -224,4 +237,89 @@ sub ipc_sibling_atfork_child {
 	$pid == $$ and die "BUG: $$ ipc_atfork_child called on itself";
 }
 
+sub wq_worker_loop ($$) {
+	my ($self, $s2) = @_;
+	my $buf;
+	my $len = $self->{wq_req_len} // (4096 * 33);
+	my ($rec, $sub, @args);
+	while (1) {
+		my (@fds) = $recv_cmd->($s2, $buf, $len) or return; # EOF
+		my $i = 0;
+		my @m = @{$self->{wq_open_modes} // [qw( +<&= >&= >&= )]};
+		for my $fd (@fds) {
+			my $mode = shift(@m);
+			if (open(my $fh, $mode, $fd)) {
+				$self->{$i++} = $fh;
+			} else {
+				die "$$ open($mode$fd) (FD:$i): $!";
+			}
+		}
+		# Sereal dies, Storable returns undef
+		$rec = thaw($buf) //
+			die "thaw error on buffer of size:".length($buf);
+		($sub, @args) = @$rec;
+		eval { $self->$sub(@args) };
+		warn "$$ wq_worker: $@" if $@;
+		delete @$self{0, 1, 2};
+	}
+}
+
+sub wq_do { # always async
+	my ($self, $sub, $in, $out, $err, @args) = @_;
+	if (my $s1 = $self->{-wq_seq}) { # run in worker
+		$_ = fileno($_) for ($in, $out, $err);
+		$send_cmd->($s1, $in, $out, $err,
+				freeze([$sub, @args]), MSG_EOR);
+	} else {
+		@$self{0, 1, 2} = ($in, $out, $err);
+		eval { $self->$sub(@args) };
+		warn "wq_do: $@" if $@;
+		delete @$self{0, 1, 2};
+	}
+}
+
+# starts workqueue workers if Sereal or Storable is installed
+sub wq_workers_start {
+	my ($self, $ident, $nr_workers, $oldset) = @_;
+	($enc && $send_cmd && $recv_cmd && defined($SEQPACKET)) or return;
+	return if $self->{-wq_seq}; # idempotent
+	my ($s1, $s2);
+	socketpair($s1, $s2, AF_UNIX, $SEQPACKET, 0) or die "socketpair: $!";
+	my $sigset = $oldset // PublicInbox::DS::block_signals();
+	$self->ipc_atfork_parent;
+	$nr_workers //= 4;
+	$self->{-wq_workers} = {};
+	for my $i (0..($nr_workers - 1)) {
+		defined(my $pid = fork) or die "fork: $!";
+		if ($pid == 0) {
+			eval { PublicInbox::DS->Reset };
+			$s1 = undef;
+			$SIG{$_} = 'IGNORE' for (qw(TERM INT QUIT));
+			local $0 = $ident."[$i]";
+			PublicInbox::DS::sig_setmask($oldset);
+			my $on_destroy = $self->ipc_atfork_child;
+			eval { wq_worker_loop($self, $s2) };
+			die "worker $ident PID:$$ died: $@\n" if $@;
+			exit;
+		} else {
+			$self->{-wq_workers}->{$pid} = $i;
+		}
+	}
+	PublicInbox::DS::sig_setmask($sigset) unless $oldset;
+	$s2 = undef;
+	$self->{-wq_seq} = $s1;
+	$self->{-wq_ppid} = $$;
+}
+
+sub wq_close {
+	my ($self) = @_;
+	delete $self->{-wq_seq} or return;
+	my $ppid = delete $self->{-wq_ppid} // die 'BUG: no wq_ppid';
+	my $workers = delete $self->{-wq_workers} // die 'BUG: no wq_workers';
+	return if $ppid != $$; # can't reap siblings or parents
+	for my $pid (keys %$workers) {
+		dwaitpid($pid, \&ipc_worker_reap, $self);
+	}
+}
+
 1;
diff --git a/t/ipc.t b/t/ipc.t
index 400fb768..f09f76ef 100644
--- a/t/ipc.t
+++ b/t/ipc.t
@@ -5,6 +5,7 @@ use strict;
 use v5.10.1;
 use Test::More;
 use PublicInbox::TestCommon;
+use Fcntl qw(SEEK_SET);
 require_ok 'PublicInbox::IPC';
 state $once = eval <<'';
 package PublicInbox::IPC;
@@ -15,6 +16,13 @@ sub test_scalarref { \'scalarref' }
 sub test_undef { undef }
 sub test_die { shift; die @_; 'unreachable' }
 sub test_pid { $$ }
+sub test_write_each_fd {
+	my ($self, @args) = @_;
+	for my $fd (0..2) {
+		print { $self->{$fd} } "i=$fd $$ ", @args, "\n";
+		$self->{$fd}->flush;
+	}
+}
 1;
 
 my $ipc = bless {}, 'PublicInbox::IPC';
@@ -102,4 +110,62 @@ SKIP: {
 	ok(!kill(0, $pid) && $!{ESRCH}, 'worker stopped');
 }
 $ipc->ipc_worker_stop; # idempotent
+
+# work queues
+$ipc->{wq_open_modes} = [qw( >&= >&= >&= )];
+pipe(my ($ra, $wa)) or BAIL_OUT $!;
+pipe(my ($rb, $wb)) or BAIL_OUT $!;
+pipe(my ($rc, $wc)) or BAIL_OUT $!;
+open my $warn, '+>', undef or BAIL_OUT;
+$warn->autoflush(0);
+local $SIG{__WARN__} = sub { print $warn "PID:$$ ", @_ };
+my @ppids;
+for my $t ('local', 'worker', 'worker again') {
+	$ipc->wq_do('test_write_each_fd', $wa, $wb, $wc, 'hello world');
+	my $i = 0;
+	for my $fh ($ra, $rb, $rc) {
+		my $buf = readline($fh);
+		is(chop($buf), "\n", "trailing CR ($t)");
+		like($buf, qr/\Ai=$i \d+ hello world\z/, "got expected ($t)");
+		$i++;
+	}
+	$ipc->wq_do('test_die', $wa, $wb, $wc);
+	my $ppid = $ipc->wq_workers_start('wq', 1);
+	push(@ppids, $ppid);
+}
+
+# wq_do works across fork (siblings can feed)
+SKIP: {
+	skip 'Socket::MsgHdr, IO::FDPass, Inline::C missing', 7 if !$ppids[0];
+	is_deeply(\@ppids, [$$, undef, undef],
+		'parent pid returned in wq_workers_start');
+	my $pid = fork // BAIL_OUT $!;
+	if ($pid == 0) {
+		use POSIX qw(_exit);
+		$ipc->wq_do('test_write_each_fd', $wa, $wb, $wc, $$);
+		_exit(0);
+	} else {
+		my $i = 0;
+		my ($wpid, @rest) = keys %{$ipc->{-wq_workers}};
+		is(scalar(@rest), 0, 'only one worker');
+		for my $fh ($ra, $rb, $rc) {
+			my $buf = readline($fh);
+			is(chop($buf), "\n", "trailing CR #$i");
+			like($buf, qr/^i=$i $wpid $pid\z/,
+				'got expected from sibling');
+			$i++;
+		}
+		is(waitpid($pid, 0), $pid, 'waitpid complete');
+		is($?, 0, 'child wq producer exited');
+	}
+}
+
+$ipc->wq_close;
+seek($warn, 0, SEEK_SET) or BAIL_OUT;
+my @warn = <$warn>;
+is(scalar(@warn), 3, 'warned 3 times');
+like($warn[0], qr/ wq_do: /, '1st warned from wq_do');
+like($warn[1], qr/ wq_worker: /, '2nd warned from wq_worker');
+is($warn[2], $warn[1], 'worker did not die');
+
 done_testing;

^ permalink raw reply related	[relevance 5%]

* [PATCH 00/22] lei query overview views
@ 2021-01-10 12:14  7% Eric Wong
  2021-01-10 12:15  5% ` [PATCH 07/22] ipc: work queue support via SOCK_SEQPACKET Eric Wong
  0 siblings, 1 reply; 2+ results
From: Eric Wong @ 2021-01-10 12:14 UTC (permalink / raw)
  To: meta

Usage summary:

	lei add-external /path/to/v1-or-v2-inbox
	lei add-external /path/to/another-inbox-or-ext-index
			# URLs aren't supported, yet :<

	lei q SEARCH TERMS GO HERE... # pager should open with JSON output

For faster startup time than what Inline::C can give:

	apt-get install libsocket-msghdr-perl # Socket::MsgHdr

Having neither Inline::C nor Socket::MsgHdr means parallel
queries won't work.

I went back-and-forth on a bunch of things but ultimately gave
up trying to support IO::FDPass since it got too fragile and
difficult to test with the work-queue distribution.

The pager runs from the client process (if using Socket::MsgHdr
or Inline::C), now.  It took a fair amount of work from my slow
brain to get pager shutdown to be instantaneous, though queries
which haven't output anything aren't easily interruptible...

The wq_* IPC stuff will be reused in the normal read-only
WWW/IMAP search at some point, too.

Eric Wong (22):
  lei query + pagination sorta working
  lei q: deduplicate smsg
  ds: block signals when reaping
  ipc: add support for asynchronous callbacks
  cmd_ipc: send FDs with buffer payload
  ipc: avoid excessive evals
  ipc: work queue support via SOCK_SEQPACKET
  ipc: eliminate ipc_worker_stop method
  ipc: wq: support dynamic worker count change
  ipc: drop -ipc_parent_pid field
  ipc: DESTROY and wq_workers methods
  lei: rename $w to $wpager for warning message
  lei: fix oneshot TTY detection by passing STD*{GLOB}
  lei: query: ensure pager exit is instantaneous
  ipc: start supporting sending/receiving more than 3 FDs
  ipc: fix IO::FDPass use with a worker limit of 1
  ipc: drop unused fields, default sighandlers for wq
  lei: get rid of client {pid} field
  lei: fork + FD cleanup
  lei: run pager in client script
  lei_xsearch: transfer 4 FDs internally, drop IO::FDPass
  lei: query: restore JSON output overview

 MANIFEST                        |   4 +
 lib/PublicInbox/CmdIPC4.pm      |  36 ++++
 lib/PublicInbox/DS.pm           |  16 +-
 lib/PublicInbox/Daemon.pm       |  10 +-
 lib/PublicInbox/ExtSearchIdx.pm |   4 +-
 lib/PublicInbox/IPC.pm          | 280 ++++++++++++++++++++++++++++----
 lib/PublicInbox/LEI.pm          | 180 +++++++++++++-------
 lib/PublicInbox/LeiDedupe.pm    |  29 +++-
 lib/PublicInbox/LeiExternal.pm  |  33 ++--
 lib/PublicInbox/LeiOverview.pm  | 188 +++++++++++++++++++++
 lib/PublicInbox/LeiQuery.pm     |  92 +++++++++++
 lib/PublicInbox/LeiStore.pm     |   2 +-
 lib/PublicInbox/LeiToMail.pm    |   2 +
 lib/PublicInbox/LeiXSearch.pm   | 118 +++++++++++++-
 lib/PublicInbox/Search.pm       |  10 +-
 lib/PublicInbox/SearchView.pm   |  10 +-
 lib/PublicInbox/Sigfd.pm        |  12 +-
 lib/PublicInbox/Spawn.pm        |  85 ++++++----
 lib/PublicInbox/Watch.pm        |   8 +-
 script/lei                      |  76 +++++----
 script/public-inbox-watch       |   4 +-
 t/cmd_ipc.t                     |  82 ++++++++++
 t/ipc.t                         | 115 ++++++++++++-
 t/lei.t                         |  31 +++-
 t/lei_dedupe.t                  |  14 ++
 t/lei_xsearch.t                 |   5 +
 t/spawn.t                       |  33 +---
 27 files changed, 1233 insertions(+), 246 deletions(-)
 create mode 100644 lib/PublicInbox/CmdIPC4.pm
 create mode 100644 lib/PublicInbox/LeiOverview.pm
 create mode 100644 lib/PublicInbox/LeiQuery.pm
 create mode 100644 t/cmd_ipc.t

^ permalink raw reply	[relevance 7%]

Results 1-2 of 2 | reverse | options above
-- pct% links below jump to the message on this page, permalinks otherwise --
2021-01-10 12:14  7% [PATCH 00/22] lei query overview views Eric Wong
2021-01-10 12:15  5% ` [PATCH 07/22] ipc: work queue support via SOCK_SEQPACKET Eric Wong

Code repositories for project(s) associated with this public inbox

	https://80x24.org/public-inbox.git

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).