From 708b182a57373172f5523f3dc297659d58e03b58 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Tue, 25 May 2021 22:20:01 +0000 Subject: ipc: wq: handle >MAX_ARG_STRLEN && ($s2, my $buf, $len); + my @fds = $recv_cmd->($s2, my $buf, $len // $MY_MAX_ARG_STRLEN); return if scalar(@fds) && !defined($fds[0]); my $n = length($buf) or return 0; my $nfd = 0; @@ -268,27 +269,37 @@ sub wq_broadcast { } } +sub stream_in_full ($$$) { + my ($s1, $fds, $buf) = @_; + socketpair(my $r, my $w, AF_UNIX, SOCK_STREAM, 0) or + croak "socketpair: $!"; + my $n = $send_cmd->($s1, [ fileno($r) ], + ipc_freeze(['do_sock_stream', length($buf)]), + MSG_EOR) // croak "sendmsg: $!"; + undef $r; + $n = $send_cmd->($w, $fds, $buf, 0) // croak "sendmsg: $!"; + while ($n < length($buf)) { + my $x = syswrite($w, $buf, length($buf) - $n, $n) // + croak "syswrite: $!"; + croak "syswrite wrote 0 bytes" if $x == 0; + $n += $x; + } +} + sub wq_io_do { # always async my ($self, $sub, $ios, @args) = @_; if (my $s1 = $self->{-wq_s1}) { # run in worker my $fds = [ map { fileno($_) } @$ios ]; my $buf = ipc_freeze([$sub, @args]); - my $n = $send_cmd->($s1, $fds, $buf, MSG_EOR); - return if defined($n); # likely - croak "sendmsg: $! (check RLIMIT_NOFILE)" if $!{ETOOMANYREFS}; - croak "sendmsg: $!" if !$!{EMSGSIZE}; - socketpair(my $r, my $w, AF_UNIX, SOCK_STREAM, 0) or - croak "socketpair: $!"; - $n = $send_cmd->($s1, [ fileno($r) ], - ipc_freeze(['do_sock_stream', length($buf)]), - MSG_EOR) // croak "sendmsg: $!"; - undef $r; - $n = $send_cmd->($w, $fds, $buf, 0) // croak "sendmsg: $!"; - while ($n < length($buf)) { - my $x = syswrite($w, $buf, length($buf) - $n, $n) // - croak "syswrite: $!"; - croak "syswrite wrote 0 bytes" if $x == 0; - $n += $x; + if (length($buf) > $MY_MAX_ARG_STRLEN) { + stream_in_full($s1, $fds, $buf); + } else { + my $n = $send_cmd->($s1, $fds, $buf, MSG_EOR); + return if defined($n); # likely + $!{ETOOMANYREFS} and + croak "sendmsg: $! (check RLIMIT_NOFILE)"; + $!{EMSGSIZE} ? stream_in_full($s1, $fds, $buf) : + croak("sendmsg: $!"); } } else { @$self{0..$#$ios} = @$ios; diff --git a/lib/PublicInbox/WQWorker.pm b/lib/PublicInbox/WQWorker.pm index 3636321e..f7aa61c5 100644 --- a/lib/PublicInbox/WQWorker.pm +++ b/lib/PublicInbox/WQWorker.pm @@ -23,7 +23,7 @@ sub event_step { my ($self) = @_; my $n; do { - $n = $self->{wq}->recv_and_run($self->{sock}, 4096 * 33); + $n = $self->{wq}->recv_and_run($self->{sock}); } while ($n); return if !defined($n) && $! == EAGAIN; # likely warn "wq worker error: $!\n" if !defined($n) && $! != ECONNRESET; -- cgit v1.2.3-24-ge0c7