user/dev discussion of public-inbox itself
 help / color / mirror / code / Atom feed
Search results ordered by [date|relevance]  view[summary|nested|Atom feed]
thread overview below | download mbox.gz: |
* [PATCH 4/9] ipc: require fork+SOCK_SEQPACKET for wq_* functions
  2023-10-07 21:24  7% [PATCH 0/9] more process-related cleanups Eric Wong
@ 2023-10-07 21:24  6% ` Eric Wong
  0 siblings, 0 replies; 2+ results
From: Eric Wong @ 2023-10-07 21:24 UTC (permalink / raw)
  To: meta

None of the lei internals works properly without forking and
sockets.  The fallback code increases the potential to accidentally
call subs in the wrong process during the teardown phase.

We'll still support ipc_do w/o forking for now since it
forking doesn't benefit small indexing runs from -mda and
such.
---
 lib/PublicInbox/IPC.pm | 43 ++++++++++++++++--------------------------
 t/ipc.t                | 19 ++++++++-----------
 2 files changed, 24 insertions(+), 38 deletions(-)

diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 068c5623..ba8b5739 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -256,16 +256,12 @@ sub do_sock_stream { # via wq_io_do, for big requests
 
 sub wq_broadcast {
 	my ($self, $sub, @args) = @_;
-	if (my $wkr = $self->{-wq_workers}) {
-		my $buf = ipc_freeze([$sub, @args]);
-		for my $bcast1 (values %$wkr) {
-			my $sock = $bcast1 // $self->{-wq_s1} // next;
-			send($sock, $buf, 0) // croak "send: $!";
-			# XXX shouldn't have to deal with EMSGSIZE here...
-		}
-	} else {
-		eval { $self->$sub(@args) };
-		warn "wq_broadcast: $@" if $@;
+	my $wkr = $self->{-wq_workers} or Carp::confess('no -wq_workers');
+	my $buf = ipc_freeze([$sub, @args]);
+	for my $bcast1 (values %$wkr) {
+		my $sock = $bcast1 // $self->{-wq_s1} // next;
+		send($sock, $buf, 0) // croak "send: $!";
+		# XXX shouldn't have to deal with EMSGSIZE here...
 	}
 }
 
@@ -291,24 +287,17 @@ sub stream_in_full ($$$) {
 
 sub wq_io_do { # always async
 	my ($self, $sub, $ios, @args) = @_;
-	if (my $s1 = $self->{-wq_s1}) { # run in worker
-		my $fds = [ map { fileno($_) } @$ios ];
-		my $buf = ipc_freeze([$sub, @args]);
-		if (length($buf) > $MY_MAX_ARG_STRLEN) {
-			stream_in_full($s1, $fds, $buf);
-		} else {
-			my $n = $send_cmd->($s1, $fds, $buf, 0);
-			return if defined($n); # likely
-			$!{ETOOMANYREFS} and
-				croak "sendmsg: $! (check RLIMIT_NOFILE)";
-			$!{EMSGSIZE} ? stream_in_full($s1, $fds, $buf) :
-				croak("sendmsg: $!");
-		}
+	my $s1 = $self->{-wq_s1} or Carp::confess('no -wq_s1');
+	my $fds = [ map { fileno($_) } @$ios ];
+	my $buf = ipc_freeze([$sub, @args]);
+	if (length($buf) > $MY_MAX_ARG_STRLEN) {
+		stream_in_full($s1, $fds, $buf);
 	} else {
-		@$self{0..$#$ios} = @$ios;
-		eval { $self->$sub(@args) };
-		warn "wq_io_do: $@" if $@;
-		delete @$self{0..$#$ios}; # don't close
+		my $n = $send_cmd->($s1, $fds, $buf, 0);
+		return if defined($n); # likely
+		$!{ETOOMANYREFS} and croak "sendmsg: $! (check RLIMIT_NOFILE)";
+		$!{EMSGSIZE} ? stream_in_full($s1, $fds, $buf) :
+			croak("sendmsg: $!");
 	}
 }
 
diff --git a/t/ipc.t b/t/ipc.t
index 7bdf2218..519ef089 100644
--- a/t/ipc.t
+++ b/t/ipc.t
@@ -1,9 +1,7 @@
 #!perl -w
 # Copyright (C) all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
-use strict;
-use v5.10.1;
-use Test::More;
+use v5.12;
 use PublicInbox::TestCommon;
 use Fcntl qw(SEEK_SET);
 use PublicInbox::SHA qw(sha1_hex);
@@ -108,7 +106,9 @@ open my $agpl, '<', 'COPYING' or BAIL_OUT "AGPL-3 missing: $!";
 my $big = do { local $/; <$agpl> } // BAIL_OUT "read: $!";
 close $agpl or BAIL_OUT "close: $!";
 
-for my $t ('local', 'worker', 'worker again') {
+for my $t ('worker', 'worker again') {
+	my $ppid = $ipc->wq_workers_start('wq', 1);
+	push(@ppids, $ppid);
 	$ipc->wq_io_do('test_write_each_fd', [ $wa, $wb, $wc ], 'hello world');
 	my $i = 0;
 	for my $fh ($ra, $rb, $rc) {
@@ -132,14 +132,12 @@ for my $t ('local', 'worker', 'worker again') {
 		$exp = sha1_hex($bigger)."\n";
 		is(readline($rb), $exp, "SHA WQWorker limit ($t)");
 	}
-	my $ppid = $ipc->wq_workers_start('wq', 1);
-	push(@ppids, $ppid);
 }
 
 # wq_io_do works across fork (siblings can feed)
 SKIP: {
 	skip 'Socket::MsgHdr or Inline::C missing', 3 if !$ppids[0];
-	is_deeply(\@ppids, [$$, undef, undef],
+	is_xdeeply(\@ppids, [$$, undef],
 		'parent pid returned in wq_workers_start');
 	my $pid = fork // BAIL_OUT $!;
 	if ($pid == 0) {
@@ -173,10 +171,9 @@ SKIP: {
 	skip 'Socket::MsgHdr or Inline::C missing', 11 if !$ppids[0];
 	seek($warn, 0, SEEK_SET) or BAIL_OUT;
 	my @warn = <$warn>;
-	is(scalar(@warn), 3, 'warned 3 times');
-	like($warn[0], qr/ wq_io_do: /, '1st warned from wq_do');
-	like($warn[1], qr/ wq_worker: /, '2nd warned from wq_worker');
-	is($warn[2], $warn[1], 'worker did not die');
+	is(scalar(@warn), 2, 'warned 3 times');
+	like($warn[0], qr/ wq_worker: /, '2nd warned from wq_worker');
+	is($warn[0], $warn[1], 'worker did not die');
 
 	$SIG{__WARN__} = 'DEFAULT';
 	is($ipc->wq_workers_start('wq', 2), $$, 'workers started again');

^ permalink raw reply related	[relevance 6%]

* [PATCH 0/9] more process-related cleanups
@ 2023-10-07 21:24  7% Eric Wong
  2023-10-07 21:24  6% ` [PATCH 4/9] ipc: require fork+SOCK_SEQPACKET for wq_* functions Eric Wong
  0 siblings, 1 reply; 2+ results
From: Eric Wong @ 2023-10-07 21:24 UTC (permalink / raw)
  To: meta

2/9 fixes an annoying syslog error I spotted running tests;
3/9 is long overdue, and there's a few more overdue things
coming up...

Eric Wong (9):
  xt/httpd-async-stream: avoid waitpid call
  lei: do not issue sto->done if socket is inactive
  lei: always use async `done' requests to store
  ipc: require fork+SOCK_SEQPACKET for wq_* functions
  ipc: use autodie for most syscalls
  import: use autodie, rely on PerlIO for retries
  rename ProcessPipe to ProcessIO
  process_io: pass args to awaitpid as list
  cindex: start using autodie

 MANIFEST                                      |  3 +-
 lib/PublicInbox/CodeSearchIdx.pm              | 70 ++++++++--------
 lib/PublicInbox/Gcf2Client.pm                 |  4 +-
 lib/PublicInbox/Git.pm                        |  4 +-
 lib/PublicInbox/HTTPD/Async.pm                |  2 +-
 lib/PublicInbox/IPC.pm                        | 82 ++++++++-----------
 lib/PublicInbox/Import.pm                     | 45 ++++------
 lib/PublicInbox/LEI.pm                        | 11 ++-
 lib/PublicInbox/LeiInput.pm                   |  2 +-
 lib/PublicInbox/LeiRediff.pm                  |  2 +-
 lib/PublicInbox/LeiRemote.pm                  |  2 +-
 lib/PublicInbox/LeiStore.pm                   | 17 ++--
 lib/PublicInbox/LeiToMail.pm                  |  6 +-
 lib/PublicInbox/LeiXSearch.pm                 |  6 +-
 .../{ProcessPipe.pm => ProcessIO.pm}          | 12 ++-
 lib/PublicInbox/Qspawn.pm                     |  8 +-
 lib/PublicInbox/Spamcheck/Spamc.pm            |  2 +-
 lib/PublicInbox/Spawn.pm                      | 12 +--
 t/ipc.t                                       | 19 ++---
 t/lei-store-fail.t                            | 51 ++++++++++++
 t/spawn.t                                     | 12 +--
 xt/httpd-async-stream.t                       |  6 +-
 22 files changed, 196 insertions(+), 182 deletions(-)
 rename lib/PublicInbox/{ProcessPipe.pm => ProcessIO.pm} (83%)
 create mode 100644 t/lei-store-fail.t

^ permalink raw reply	[relevance 7%]

Results 1-2 of 2 | reverse | options above
-- pct% links below jump to the message on this page, permalinks otherwise --
2023-10-07 21:24  7% [PATCH 0/9] more process-related cleanups Eric Wong
2023-10-07 21:24  6% ` [PATCH 4/9] ipc: require fork+SOCK_SEQPACKET for wq_* functions Eric Wong

Code repositories for project(s) associated with this public inbox

	https://80x24.org/public-inbox.git

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).