about summary refs log tree commit homepage
path: root/lib/PublicInbox/IPC.pm
diff options
context:
space:
mode:
Diffstat (limited to 'lib/PublicInbox/IPC.pm')
-rw-r--r--lib/PublicInbox/IPC.pm45
1 files changed, 28 insertions, 17 deletions
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 24237773..497a6035 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -17,6 +17,7 @@ use PublicInbox::Spawn;
 use PublicInbox::OnDestroy;
 use PublicInbox::WQWorker;
 use Socket qw(AF_UNIX MSG_EOR SOCK_STREAM);
+my $MY_MAX_ARG_STRLEN = 4096 * 33; # extra 4K for serialization
 my $SEQPACKET = eval { Socket::SOCK_SEQPACKET() }; # portable enough?
 our @EXPORT_OK = qw(ipc_freeze ipc_thaw);
 my $WQ_MAX_WORKERS = 4096;
@@ -213,7 +214,7 @@ sub ipc_sibling_atfork_child {
 
 sub recv_and_run {
         my ($self, $s2, $len, $full_stream) = @_;
-        my @fds = $recv_cmd->($s2, my $buf, $len);
+        my @fds = $recv_cmd->($s2, my $buf, $len // $MY_MAX_ARG_STRLEN);
         return if scalar(@fds) && !defined($fds[0]);
         my $n = length($buf) or return 0;
         my $nfd = 0;
@@ -268,27 +269,37 @@ sub wq_broadcast {
         }
 }
 
+sub stream_in_full ($$$) {
+        my ($s1, $fds, $buf) = @_;
+        socketpair(my $r, my $w, AF_UNIX, SOCK_STREAM, 0) or
+                croak "socketpair: $!";
+        my $n = $send_cmd->($s1, [ fileno($r) ],
+                        ipc_freeze(['do_sock_stream', length($buf)]),
+                        MSG_EOR) // croak "sendmsg: $!";
+        undef $r;
+        $n = $send_cmd->($w, $fds, $buf, 0) // croak "sendmsg: $!";
+        while ($n < length($buf)) {
+                my $x = syswrite($w, $buf, length($buf) - $n, $n) //
+                                croak "syswrite: $!";
+                croak "syswrite wrote 0 bytes" if $x == 0;
+                $n += $x;
+        }
+}
+
 sub wq_io_do { # always async
         my ($self, $sub, $ios, @args) = @_;
         if (my $s1 = $self->{-wq_s1}) { # run in worker
                 my $fds = [ map { fileno($_) } @$ios ];
                 my $buf = ipc_freeze([$sub, @args]);
-                my $n = $send_cmd->($s1, $fds, $buf, MSG_EOR);
-                return if defined($n); # likely
-                croak "sendmsg: $! (check RLIMIT_NOFILE)" if $!{ETOOMANYREFS};
-                croak "sendmsg: $!" if !$!{EMSGSIZE};
-                socketpair(my $r, my $w, AF_UNIX, SOCK_STREAM, 0) or
-                        croak "socketpair: $!";
-                $n = $send_cmd->($s1, [ fileno($r) ],
-                                ipc_freeze(['do_sock_stream', length($buf)]),
-                                MSG_EOR) // croak "sendmsg: $!";
-                undef $r;
-                $n = $send_cmd->($w, $fds, $buf, 0) // croak "sendmsg: $!";
-                while ($n < length($buf)) {
-                        my $x = syswrite($w, $buf, length($buf) - $n, $n) //
-                                        croak "syswrite: $!";
-                        croak "syswrite wrote 0 bytes" if $x == 0;
-                        $n += $x;
+                if (length($buf) > $MY_MAX_ARG_STRLEN) {
+                        stream_in_full($s1, $fds, $buf);
+                } else {
+                        my $n = $send_cmd->($s1, $fds, $buf, MSG_EOR);
+                        return if defined($n); # likely
+                        $!{ETOOMANYREFS} and
+                                croak "sendmsg: $! (check RLIMIT_NOFILE)";
+                        $!{EMSGSIZE} ? stream_in_full($s1, $fds, $buf) :
+                        croak("sendmsg: $!");
                 }
         } else {
                 @$self{0..$#$ios} = @$ios;