From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 920061F9E5 for ; Tue, 25 May 2021 22:20:01 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 2/2] ipc: wq: handle >MAX_ARG_STRLEN && In-Reply-To: <20210525222001.27517-1-e@80x24.org> References: <20210525222001.27517-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: WQWorkers are limited roughly to MAX_ARG_STRLEN (the kernel limit of argv + environ) to avoid excessive memory growth. Occasionally, we need to send larger messages via workqueues that are too small to hit EMSGSIZE on the sender. This fixes "lei q" when using HTTP(S) externals, since that code path sends large Eml objects from lei_xsearch workers directly to lei2mail WQ workers. --- lib/PublicInbox/IPC.pm | 45 +++++++++++++++++++++++-------------- lib/PublicInbox/WQWorker.pm | 2 +- t/ipc.t | 11 ++++++--- 3 files changed, 37 insertions(+), 21 deletions(-) diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm index 24237773..497a6035 100644 --- a/lib/PublicInbox/IPC.pm +++ b/lib/PublicInbox/IPC.pm @@ -17,6 +17,7 @@ use PublicInbox::Spawn; use PublicInbox::OnDestroy; use PublicInbox::WQWorker; use Socket qw(AF_UNIX MSG_EOR SOCK_STREAM); +my $MY_MAX_ARG_STRLEN = 4096 * 33; # extra 4K for serialization my $SEQPACKET = eval { Socket::SOCK_SEQPACKET() }; # portable enough? our @EXPORT_OK = qw(ipc_freeze ipc_thaw); my $WQ_MAX_WORKERS = 4096; @@ -213,7 +214,7 @@ sub ipc_sibling_atfork_child { sub recv_and_run { my ($self, $s2, $len, $full_stream) = @_; - my @fds = $recv_cmd->($s2, my $buf, $len); + my @fds = $recv_cmd->($s2, my $buf, $len // $MY_MAX_ARG_STRLEN); return if scalar(@fds) && !defined($fds[0]); my $n = length($buf) or return 0; my $nfd = 0; @@ -268,27 +269,37 @@ sub wq_broadcast { } } +sub stream_in_full ($$$) { + my ($s1, $fds, $buf) = @_; + socketpair(my $r, my $w, AF_UNIX, SOCK_STREAM, 0) or + croak "socketpair: $!"; + my $n = $send_cmd->($s1, [ fileno($r) ], + ipc_freeze(['do_sock_stream', length($buf)]), + MSG_EOR) // croak "sendmsg: $!"; + undef $r; + $n = $send_cmd->($w, $fds, $buf, 0) // croak "sendmsg: $!"; + while ($n < length($buf)) { + my $x = syswrite($w, $buf, length($buf) - $n, $n) // + croak "syswrite: $!"; + croak "syswrite wrote 0 bytes" if $x == 0; + $n += $x; + } +} + sub wq_io_do { # always async my ($self, $sub, $ios, @args) = @_; if (my $s1 = $self->{-wq_s1}) { # run in worker my $fds = [ map { fileno($_) } @$ios ]; my $buf = ipc_freeze([$sub, @args]); - my $n = $send_cmd->($s1, $fds, $buf, MSG_EOR); - return if defined($n); # likely - croak "sendmsg: $! (check RLIMIT_NOFILE)" if $!{ETOOMANYREFS}; - croak "sendmsg: $!" if !$!{EMSGSIZE}; - socketpair(my $r, my $w, AF_UNIX, SOCK_STREAM, 0) or - croak "socketpair: $!"; - $n = $send_cmd->($s1, [ fileno($r) ], - ipc_freeze(['do_sock_stream', length($buf)]), - MSG_EOR) // croak "sendmsg: $!"; - undef $r; - $n = $send_cmd->($w, $fds, $buf, 0) // croak "sendmsg: $!"; - while ($n < length($buf)) { - my $x = syswrite($w, $buf, length($buf) - $n, $n) // - croak "syswrite: $!"; - croak "syswrite wrote 0 bytes" if $x == 0; - $n += $x; + if (length($buf) > $MY_MAX_ARG_STRLEN) { + stream_in_full($s1, $fds, $buf); + } else { + my $n = $send_cmd->($s1, $fds, $buf, MSG_EOR); + return if defined($n); # likely + $!{ETOOMANYREFS} and + croak "sendmsg: $! (check RLIMIT_NOFILE)"; + $!{EMSGSIZE} ? stream_in_full($s1, $fds, $buf) : + croak("sendmsg: $!"); } } else { @$self{0..$#$ios} = @$ios; diff --git a/lib/PublicInbox/WQWorker.pm b/lib/PublicInbox/WQWorker.pm index 3636321e..f7aa61c5 100644 --- a/lib/PublicInbox/WQWorker.pm +++ b/lib/PublicInbox/WQWorker.pm @@ -23,7 +23,7 @@ sub event_step { my ($self) = @_; my $n; do { - $n = $self->{wq}->recv_and_run($self->{sock}, 4096 * 33); + $n = $self->{wq}->recv_and_run($self->{sock}); } while ($n); return if !defined($n) && $! == EAGAIN; # likely warn "wq worker error: $!\n" if !defined($n) && $! != ECONNRESET; diff --git a/t/ipc.t b/t/ipc.t index ca88eb59..7983fdc0 100644 --- a/t/ipc.t +++ b/t/ipc.t @@ -122,11 +122,16 @@ for my $t ('local', 'worker', 'worker again') { $ipc->wq_io_do('test_sha', [ $wa, $wb ], 'hello world'); is(readline($rb), sha1_hex('hello world')."\n", "SHA small ($t)"); { - my $bigger = $big x 10; + my $bigger = $big x 10; # to hit EMSGSIZE $ipc->wq_io_do('test_sha', [ $wa, $wb ], $bigger); my $exp = sha1_hex($bigger)."\n"; - undef $bigger; - is(readline($rb), $exp, "SHA big ($t)"); + is(readline($rb), $exp, "SHA big for EMSGSIZE ($t)"); + + # to hit the WQWorker recv_and_run length + substr($bigger, my $MY_MAX_ARG_STRLEN = 4096 * 33, -1) = ''; + $ipc->wq_io_do('test_sha', [ $wa, $wb ], $bigger); + $exp = sha1_hex($bigger)."\n"; + is(readline($rb), $exp, "SHA WQWorker limit ($t)"); } my $ppid = $ipc->wq_workers_start('wq', 1); push(@ppids, $ppid);