about summary refs log tree commit homepage
path: root/lib
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2021-05-25 22:20:01 +0000
committerEric Wong <e@80x24.org>2021-05-25 23:05:02 +0000
commit708b182a57373172f5523f3dc297659d58e03b58 (patch)
tree90d2a254577ce297f33f7988ccc6eceed252435c /lib
parent3060d78b4183f3e985fb7ff8864949de990f2610 (diff)
downloadpublic-inbox-708b182a57373172f5523f3dc297659d58e03b58.tar.gz
ipc: wq: handle >MAX_ARG_STRLEN && <EMSGSIZE case
WQWorkers are limited roughly to MAX_ARG_STRLEN (the kernel
limit of argv + environ) to avoid excessive memory growth.
Occasionally, we need to send larger messages via workqueues
that are too small to hit EMSGSIZE on the sender.

This fixes "lei q" when using HTTP(S) externals, since that
code path sends large Eml objects from lei_xsearch workers
directly to lei2mail WQ workers.
Diffstat (limited to 'lib')
-rw-r--r--lib/PublicInbox/IPC.pm45
-rw-r--r--lib/PublicInbox/WQWorker.pm2
2 files changed, 29 insertions, 18 deletions
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 24237773..497a6035 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -17,6 +17,7 @@ use PublicInbox::Spawn;
 use PublicInbox::OnDestroy;
 use PublicInbox::WQWorker;
 use Socket qw(AF_UNIX MSG_EOR SOCK_STREAM);
+my $MY_MAX_ARG_STRLEN = 4096 * 33; # extra 4K for serialization
 my $SEQPACKET = eval { Socket::SOCK_SEQPACKET() }; # portable enough?
 our @EXPORT_OK = qw(ipc_freeze ipc_thaw);
 my $WQ_MAX_WORKERS = 4096;
@@ -213,7 +214,7 @@ sub ipc_sibling_atfork_child {
 
 sub recv_and_run {
         my ($self, $s2, $len, $full_stream) = @_;
-        my @fds = $recv_cmd->($s2, my $buf, $len);
+        my @fds = $recv_cmd->($s2, my $buf, $len // $MY_MAX_ARG_STRLEN);
         return if scalar(@fds) && !defined($fds[0]);
         my $n = length($buf) or return 0;
         my $nfd = 0;
@@ -268,27 +269,37 @@ sub wq_broadcast {
         }
 }
 
+sub stream_in_full ($$$) {
+        my ($s1, $fds, $buf) = @_;
+        socketpair(my $r, my $w, AF_UNIX, SOCK_STREAM, 0) or
+                croak "socketpair: $!";
+        my $n = $send_cmd->($s1, [ fileno($r) ],
+                        ipc_freeze(['do_sock_stream', length($buf)]),
+                        MSG_EOR) // croak "sendmsg: $!";
+        undef $r;
+        $n = $send_cmd->($w, $fds, $buf, 0) // croak "sendmsg: $!";
+        while ($n < length($buf)) {
+                my $x = syswrite($w, $buf, length($buf) - $n, $n) //
+                                croak "syswrite: $!";
+                croak "syswrite wrote 0 bytes" if $x == 0;
+                $n += $x;
+        }
+}
+
 sub wq_io_do { # always async
         my ($self, $sub, $ios, @args) = @_;
         if (my $s1 = $self->{-wq_s1}) { # run in worker
                 my $fds = [ map { fileno($_) } @$ios ];
                 my $buf = ipc_freeze([$sub, @args]);
-                my $n = $send_cmd->($s1, $fds, $buf, MSG_EOR);
-                return if defined($n); # likely
-                croak "sendmsg: $! (check RLIMIT_NOFILE)" if $!{ETOOMANYREFS};
-                croak "sendmsg: $!" if !$!{EMSGSIZE};
-                socketpair(my $r, my $w, AF_UNIX, SOCK_STREAM, 0) or
-                        croak "socketpair: $!";
-                $n = $send_cmd->($s1, [ fileno($r) ],
-                                ipc_freeze(['do_sock_stream', length($buf)]),
-                                MSG_EOR) // croak "sendmsg: $!";
-                undef $r;
-                $n = $send_cmd->($w, $fds, $buf, 0) // croak "sendmsg: $!";
-                while ($n < length($buf)) {
-                        my $x = syswrite($w, $buf, length($buf) - $n, $n) //
-                                        croak "syswrite: $!";
-                        croak "syswrite wrote 0 bytes" if $x == 0;
-                        $n += $x;
+                if (length($buf) > $MY_MAX_ARG_STRLEN) {
+                        stream_in_full($s1, $fds, $buf);
+                } else {
+                        my $n = $send_cmd->($s1, $fds, $buf, MSG_EOR);
+                        return if defined($n); # likely
+                        $!{ETOOMANYREFS} and
+                                croak "sendmsg: $! (check RLIMIT_NOFILE)";
+                        $!{EMSGSIZE} ? stream_in_full($s1, $fds, $buf) :
+                        croak("sendmsg: $!");
                 }
         } else {
                 @$self{0..$#$ios} = @$ios;
diff --git a/lib/PublicInbox/WQWorker.pm b/lib/PublicInbox/WQWorker.pm
index 3636321e..f7aa61c5 100644
--- a/lib/PublicInbox/WQWorker.pm
+++ b/lib/PublicInbox/WQWorker.pm
@@ -23,7 +23,7 @@ sub event_step {
         my ($self) = @_;
         my $n;
         do {
-                $n = $self->{wq}->recv_and_run($self->{sock}, 4096 * 33);
+                $n = $self->{wq}->recv_and_run($self->{sock});
         } while ($n);
         return if !defined($n) && $! == EAGAIN; # likely
         warn "wq worker error: $!\n" if !defined($n) && $! != ECONNRESET;