about summary refs log tree commit homepage
diff options
context:
space:
mode:
-rw-r--r--lib/PublicInbox/IPC.pm45
-rw-r--r--lib/PublicInbox/WQWorker.pm2
-rw-r--r--t/ipc.t11
3 files changed, 37 insertions, 21 deletions
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 24237773..497a6035 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -17,6 +17,7 @@ use PublicInbox::Spawn;
 use PublicInbox::OnDestroy;
 use PublicInbox::WQWorker;
 use Socket qw(AF_UNIX MSG_EOR SOCK_STREAM);
+my $MY_MAX_ARG_STRLEN = 4096 * 33; # extra 4K for serialization
 my $SEQPACKET = eval { Socket::SOCK_SEQPACKET() }; # portable enough?
 our @EXPORT_OK = qw(ipc_freeze ipc_thaw);
 my $WQ_MAX_WORKERS = 4096;
@@ -213,7 +214,7 @@ sub ipc_sibling_atfork_child {
 
 sub recv_and_run {
         my ($self, $s2, $len, $full_stream) = @_;
-        my @fds = $recv_cmd->($s2, my $buf, $len);
+        my @fds = $recv_cmd->($s2, my $buf, $len // $MY_MAX_ARG_STRLEN);
         return if scalar(@fds) && !defined($fds[0]);
         my $n = length($buf) or return 0;
         my $nfd = 0;
@@ -268,27 +269,37 @@ sub wq_broadcast {
         }
 }
 
+sub stream_in_full ($$$) {
+        my ($s1, $fds, $buf) = @_;
+        socketpair(my $r, my $w, AF_UNIX, SOCK_STREAM, 0) or
+                croak "socketpair: $!";
+        my $n = $send_cmd->($s1, [ fileno($r) ],
+                        ipc_freeze(['do_sock_stream', length($buf)]),
+                        MSG_EOR) // croak "sendmsg: $!";
+        undef $r;
+        $n = $send_cmd->($w, $fds, $buf, 0) // croak "sendmsg: $!";
+        while ($n < length($buf)) {
+                my $x = syswrite($w, $buf, length($buf) - $n, $n) //
+                                croak "syswrite: $!";
+                croak "syswrite wrote 0 bytes" if $x == 0;
+                $n += $x;
+        }
+}
+
 sub wq_io_do { # always async
         my ($self, $sub, $ios, @args) = @_;
         if (my $s1 = $self->{-wq_s1}) { # run in worker
                 my $fds = [ map { fileno($_) } @$ios ];
                 my $buf = ipc_freeze([$sub, @args]);
-                my $n = $send_cmd->($s1, $fds, $buf, MSG_EOR);
-                return if defined($n); # likely
-                croak "sendmsg: $! (check RLIMIT_NOFILE)" if $!{ETOOMANYREFS};
-                croak "sendmsg: $!" if !$!{EMSGSIZE};
-                socketpair(my $r, my $w, AF_UNIX, SOCK_STREAM, 0) or
-                        croak "socketpair: $!";
-                $n = $send_cmd->($s1, [ fileno($r) ],
-                                ipc_freeze(['do_sock_stream', length($buf)]),
-                                MSG_EOR) // croak "sendmsg: $!";
-                undef $r;
-                $n = $send_cmd->($w, $fds, $buf, 0) // croak "sendmsg: $!";
-                while ($n < length($buf)) {
-                        my $x = syswrite($w, $buf, length($buf) - $n, $n) //
-                                        croak "syswrite: $!";
-                        croak "syswrite wrote 0 bytes" if $x == 0;
-                        $n += $x;
+                if (length($buf) > $MY_MAX_ARG_STRLEN) {
+                        stream_in_full($s1, $fds, $buf);
+                } else {
+                        my $n = $send_cmd->($s1, $fds, $buf, MSG_EOR);
+                        return if defined($n); # likely
+                        $!{ETOOMANYREFS} and
+                                croak "sendmsg: $! (check RLIMIT_NOFILE)";
+                        $!{EMSGSIZE} ? stream_in_full($s1, $fds, $buf) :
+                        croak("sendmsg: $!");
                 }
         } else {
                 @$self{0..$#$ios} = @$ios;
diff --git a/lib/PublicInbox/WQWorker.pm b/lib/PublicInbox/WQWorker.pm
index 3636321e..f7aa61c5 100644
--- a/lib/PublicInbox/WQWorker.pm
+++ b/lib/PublicInbox/WQWorker.pm
@@ -23,7 +23,7 @@ sub event_step {
         my ($self) = @_;
         my $n;
         do {
-                $n = $self->{wq}->recv_and_run($self->{sock}, 4096 * 33);
+                $n = $self->{wq}->recv_and_run($self->{sock});
         } while ($n);
         return if !defined($n) && $! == EAGAIN; # likely
         warn "wq worker error: $!\n" if !defined($n) && $! != ECONNRESET;
diff --git a/t/ipc.t b/t/ipc.t
index ca88eb59..7983fdc0 100644
--- a/t/ipc.t
+++ b/t/ipc.t
@@ -122,11 +122,16 @@ for my $t ('local', 'worker', 'worker again') {
         $ipc->wq_io_do('test_sha', [ $wa, $wb ], 'hello world');
         is(readline($rb), sha1_hex('hello world')."\n", "SHA small ($t)");
         {
-                my $bigger = $big x 10;
+                my $bigger = $big x 10; # to hit EMSGSIZE
                 $ipc->wq_io_do('test_sha', [ $wa, $wb ], $bigger);
                 my $exp = sha1_hex($bigger)."\n";
-                undef $bigger;
-                is(readline($rb), $exp, "SHA big ($t)");
+                is(readline($rb), $exp, "SHA big for EMSGSIZE ($t)");
+
+                # to hit the WQWorker recv_and_run length
+                substr($bigger, my $MY_MAX_ARG_STRLEN = 4096 * 33, -1) = '';
+                $ipc->wq_io_do('test_sha', [ $wa, $wb ], $bigger);
+                $exp = sha1_hex($bigger)."\n";
+                is(readline($rb), $exp, "SHA WQWorker limit ($t)");
         }
         my $ppid = $ipc->wq_workers_start('wq', 1);
         push(@ppids, $ppid);