* [PATCH 4/9] ipc: require fork+SOCK_SEQPACKET for wq_* functions
2023-10-07 21:24 7% [PATCH 0/9] more process-related cleanups Eric Wong
@ 2023-10-07 21:24 6% ` Eric Wong
0 siblings, 0 replies; 2+ results
From: Eric Wong @ 2023-10-07 21:24 UTC (permalink / raw)
To: meta
None of the lei internals works properly without forking and
sockets. The fallback code increases the potential to accidentally
call subs in the wrong process during the teardown phase.
We'll still support ipc_do w/o forking for now since
forking doesn't benefit small indexing runs from -mda and
such.
---
lib/PublicInbox/IPC.pm | 43 ++++++++++++++++--------------------------
t/ipc.t | 19 ++++++++-----------
2 files changed, 24 insertions(+), 38 deletions(-)
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 068c5623..ba8b5739 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -256,16 +256,12 @@ sub do_sock_stream { # via wq_io_do, for big requests
sub wq_broadcast {
my ($self, $sub, @args) = @_;
- if (my $wkr = $self->{-wq_workers}) {
- my $buf = ipc_freeze([$sub, @args]);
- for my $bcast1 (values %$wkr) {
- my $sock = $bcast1 // $self->{-wq_s1} // next;
- send($sock, $buf, 0) // croak "send: $!";
- # XXX shouldn't have to deal with EMSGSIZE here...
- }
- } else {
- eval { $self->$sub(@args) };
- warn "wq_broadcast: $@" if $@;
+ my $wkr = $self->{-wq_workers} or Carp::confess('no -wq_workers');
+ my $buf = ipc_freeze([$sub, @args]);
+ for my $bcast1 (values %$wkr) {
+ my $sock = $bcast1 // $self->{-wq_s1} // next;
+ send($sock, $buf, 0) // croak "send: $!";
+ # XXX shouldn't have to deal with EMSGSIZE here...
}
}
@@ -291,24 +287,17 @@ sub stream_in_full ($$$) {
sub wq_io_do { # always async
my ($self, $sub, $ios, @args) = @_;
- if (my $s1 = $self->{-wq_s1}) { # run in worker
- my $fds = [ map { fileno($_) } @$ios ];
- my $buf = ipc_freeze([$sub, @args]);
- if (length($buf) > $MY_MAX_ARG_STRLEN) {
- stream_in_full($s1, $fds, $buf);
- } else {
- my $n = $send_cmd->($s1, $fds, $buf, 0);
- return if defined($n); # likely
- $!{ETOOMANYREFS} and
- croak "sendmsg: $! (check RLIMIT_NOFILE)";
- $!{EMSGSIZE} ? stream_in_full($s1, $fds, $buf) :
- croak("sendmsg: $!");
- }
+ my $s1 = $self->{-wq_s1} or Carp::confess('no -wq_s1');
+ my $fds = [ map { fileno($_) } @$ios ];
+ my $buf = ipc_freeze([$sub, @args]);
+ if (length($buf) > $MY_MAX_ARG_STRLEN) {
+ stream_in_full($s1, $fds, $buf);
} else {
- @$self{0..$#$ios} = @$ios;
- eval { $self->$sub(@args) };
- warn "wq_io_do: $@" if $@;
- delete @$self{0..$#$ios}; # don't close
+ my $n = $send_cmd->($s1, $fds, $buf, 0);
+ return if defined($n); # likely
+ $!{ETOOMANYREFS} and croak "sendmsg: $! (check RLIMIT_NOFILE)";
+ $!{EMSGSIZE} ? stream_in_full($s1, $fds, $buf) :
+ croak("sendmsg: $!");
}
}
diff --git a/t/ipc.t b/t/ipc.t
index 7bdf2218..519ef089 100644
--- a/t/ipc.t
+++ b/t/ipc.t
@@ -1,9 +1,7 @@
#!perl -w
# Copyright (C) all contributors <meta@public-inbox.org>
# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
-use strict;
-use v5.10.1;
-use Test::More;
+use v5.12;
use PublicInbox::TestCommon;
use Fcntl qw(SEEK_SET);
use PublicInbox::SHA qw(sha1_hex);
@@ -108,7 +106,9 @@ open my $agpl, '<', 'COPYING' or BAIL_OUT "AGPL-3 missing: $!";
my $big = do { local $/; <$agpl> } // BAIL_OUT "read: $!";
close $agpl or BAIL_OUT "close: $!";
-for my $t ('local', 'worker', 'worker again') {
+for my $t ('worker', 'worker again') {
+ my $ppid = $ipc->wq_workers_start('wq', 1);
+ push(@ppids, $ppid);
$ipc->wq_io_do('test_write_each_fd', [ $wa, $wb, $wc ], 'hello world');
my $i = 0;
for my $fh ($ra, $rb, $rc) {
@@ -132,14 +132,12 @@ for my $t ('local', 'worker', 'worker again') {
$exp = sha1_hex($bigger)."\n";
is(readline($rb), $exp, "SHA WQWorker limit ($t)");
}
- my $ppid = $ipc->wq_workers_start('wq', 1);
- push(@ppids, $ppid);
}
# wq_io_do works across fork (siblings can feed)
SKIP: {
skip 'Socket::MsgHdr or Inline::C missing', 3 if !$ppids[0];
- is_deeply(\@ppids, [$$, undef, undef],
+ is_xdeeply(\@ppids, [$$, undef],
'parent pid returned in wq_workers_start');
my $pid = fork // BAIL_OUT $!;
if ($pid == 0) {
@@ -173,10 +171,9 @@ SKIP: {
skip 'Socket::MsgHdr or Inline::C missing', 11 if !$ppids[0];
seek($warn, 0, SEEK_SET) or BAIL_OUT;
my @warn = <$warn>;
- is(scalar(@warn), 3, 'warned 3 times');
- like($warn[0], qr/ wq_io_do: /, '1st warned from wq_do');
- like($warn[1], qr/ wq_worker: /, '2nd warned from wq_worker');
- is($warn[2], $warn[1], 'worker did not die');
+ is(scalar(@warn), 2, 'warned 3 times');
+ like($warn[0], qr/ wq_worker: /, '2nd warned from wq_worker');
+ is($warn[0], $warn[1], 'worker did not die');
$SIG{__WARN__} = 'DEFAULT';
is($ipc->wq_workers_start('wq', 2), $$, 'workers started again');
^ permalink raw reply related [relevance 6%]
* [PATCH 0/9] more process-related cleanups
@ 2023-10-07 21:24 7% Eric Wong
2023-10-07 21:24 6% ` [PATCH 4/9] ipc: require fork+SOCK_SEQPACKET for wq_* functions Eric Wong
0 siblings, 1 reply; 2+ results
From: Eric Wong @ 2023-10-07 21:24 UTC (permalink / raw)
To: meta
2/9 fixes an annoying syslog error I spotted running tests;
3/9 is long overdue, and there are a few more overdue things
coming up...
Eric Wong (9):
xt/httpd-async-stream: avoid waitpid call
lei: do not issue sto->done if socket is inactive
lei: always use async `done' requests to store
ipc: require fork+SOCK_SEQPACKET for wq_* functions
ipc: use autodie for most syscalls
import: use autodie, rely on PerlIO for retries
rename ProcessPipe to ProcessIO
process_io: pass args to awaitpid as list
cindex: start using autodie
MANIFEST | 3 +-
lib/PublicInbox/CodeSearchIdx.pm | 70 ++++++++--------
lib/PublicInbox/Gcf2Client.pm | 4 +-
lib/PublicInbox/Git.pm | 4 +-
lib/PublicInbox/HTTPD/Async.pm | 2 +-
lib/PublicInbox/IPC.pm | 82 ++++++++-----------
lib/PublicInbox/Import.pm | 45 ++++------
lib/PublicInbox/LEI.pm | 11 ++-
lib/PublicInbox/LeiInput.pm | 2 +-
lib/PublicInbox/LeiRediff.pm | 2 +-
lib/PublicInbox/LeiRemote.pm | 2 +-
lib/PublicInbox/LeiStore.pm | 17 ++--
lib/PublicInbox/LeiToMail.pm | 6 +-
lib/PublicInbox/LeiXSearch.pm | 6 +-
.../{ProcessPipe.pm => ProcessIO.pm} | 12 ++-
lib/PublicInbox/Qspawn.pm | 8 +-
lib/PublicInbox/Spamcheck/Spamc.pm | 2 +-
lib/PublicInbox/Spawn.pm | 12 +--
t/ipc.t | 19 ++---
t/lei-store-fail.t | 51 ++++++++++++
t/spawn.t | 12 +--
xt/httpd-async-stream.t | 6 +-
22 files changed, 196 insertions(+), 182 deletions(-)
rename lib/PublicInbox/{ProcessPipe.pm => ProcessIO.pm} (83%)
create mode 100644 t/lei-store-fail.t
^ permalink raw reply [relevance 7%]
Results 1-2 of 2 | reverse | options above
-- pct% links below jump to the message on this page, permalinks otherwise --
2023-10-07 21:24 7% [PATCH 0/9] more process-related cleanups Eric Wong
2023-10-07 21:24 6% ` [PATCH 4/9] ipc: require fork+SOCK_SEQPACKET for wq_* functions Eric Wong
Code repositories for project(s) associated with this public inbox
https://80x24.org/public-inbox.git
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).