about summary refs log tree commit homepage
path: root/lib/PublicInbox/Spawn.pm
diff options
context:
space:
mode:
Diffstat (limited to 'lib/PublicInbox/Spawn.pm')
-rw-r--r--lib/PublicInbox/Spawn.pm346
1 files changed, 182 insertions, 164 deletions
diff --git a/lib/PublicInbox/Spawn.pm b/lib/PublicInbox/Spawn.pm
index e940d3c9..e36659ce 100644
--- a/lib/PublicInbox/Spawn.pm
+++ b/lib/PublicInbox/Spawn.pm
@@ -1,4 +1,4 @@
-# Copyright (C) 2016-2021 all contributors <meta@public-inbox.org>
+# Copyright (C) all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
 #
 # This allows vfork to be used for spawning subprocesses if
@@ -6,25 +6,26 @@
 # is explicitly defined in the environment (and writable).
 # Under Linux, vfork can make a big difference in spawning performance
 # as process size increases (fork still needs to mark pages for CoW use).
-# Currently, we only use this for code intended for long running
-# daemons (inside the PSGI code (-httpd) and -nntpd).  The short-lived
-# scripts (-mda, -index, -learn, -init) either use IPC::run or standard
-# Perl routines.
+# None of this is intended to be thread-safe since Perl5 maintainers
+# officially discourage the use of threads.
 #
 # There'll probably be more OS-level C stuff here, down the line.
 # We don't want too many DSOs: https://udrepper.livejournal.com/8790.html
 
 package PublicInbox::Spawn;
-use strict;
+use v5.12;
 use parent qw(Exporter);
-use Symbol qw(gensym);
-use Fcntl qw(LOCK_EX SEEK_SET);
+use PublicInbox::Lock;
+use Fcntl qw(SEEK_SET);
 use IO::Handle ();
-use PublicInbox::ProcessPipe;
-our @EXPORT_OK = qw(which spawn popen_rd run_die nodatacow_dir);
-our @RLIMITS = qw(RLIMIT_CPU RLIMIT_CORE RLIMIT_DATA);
+use Carp qw(croak);
+use PublicInbox::IO;
+our @EXPORT_OK = qw(which spawn popen_rd popen_wr run_die run_wait run_qx);
+our (@RLIMITS, %RLIMITS);
+use autodie qw(close open pipe seek sysseek truncate);
 
 BEGIN {
+        @RLIMITS = qw(RLIMIT_CPU RLIMIT_CORE RLIMIT_DATA);
         my $all_libc = <<'ALL_LIBC'; # all *nix systems we support
 #include <sys/resource.h>
 #include <sys/socket.h>
@@ -34,12 +35,9 @@ BEGIN {
 #include <unistd.h>
 #include <stdlib.h>
 #include <errno.h>
-
-/* some platforms need alloca.h, but some don't */
-#if defined(__GNUC__) && !defined(alloca)
-#  define alloca(sz) __builtin_alloca(sz)
-#endif
-
+#include <time.h>
+#include <stdio.h>
+#include <string.h>
 #include <signal.h>
 #include <assert.h>
 
@@ -51,11 +49,17 @@ BEGIN {
  *   This is unlike "sv_len", which returns what you would expect.
  */
 #define AV2C_COPY(dst, src) do { \
+        static size_t dst##__capa; \
         I32 i; \
         I32 top_index = av_len(src); \
         I32 real_len = top_index + 1; \
         I32 capa = real_len + 1; \
-        dst = alloca(capa * sizeof(char *)); \
+        if (capa > dst##__capa) { \
+                dst##__capa = 0; /* in case Newx croaks */ \
+                Safefree(dst); \
+                Newx(dst, capa, char *); \
+                dst##__capa = capa; \
+        } \
         for (i = 0; i < real_len; i++) { \
                 SV **sv = av_fetch(src, i, 0); \
                 dst[i] = SvPV_nolen(*sv); \
@@ -84,23 +88,28 @@ int pi_fork_exec(SV *redirref, SV *file, SV *cmdref, SV *envref, SV *rlimref,
         AV *env = (AV *)SvRV(envref);
         AV *rlim = (AV *)SvRV(rlimref);
         const char *filename = SvPV_nolen(file);
-        pid_t pid;
-        char **argv, **envp;
+        pid_t pid = -1;
+        static char **argv, **envp;
         sigset_t set, old;
         int ret, perrnum;
         volatile int cerrnum = 0; /* shared due to vfork */
-        int chld_is_member;
+        int chld_is_member; /* needed due to shared memory w/ vfork */
         I32 max_fd = av_len(redir);
 
         AV2C_COPY(argv, cmd);
         AV2C_COPY(envp, env);
 
-        if (sigfillset(&set)) return -1;
-        if (sigprocmask(SIG_SETMASK, &set, &old)) return -1;
+        if (sigfillset(&set)) goto out;
+        if (sigdelset(&set, SIGABRT)) goto out;
+        if (sigdelset(&set, SIGBUS)) goto out;
+        if (sigdelset(&set, SIGFPE)) goto out;
+        if (sigdelset(&set, SIGILL)) goto out;
+        if (sigdelset(&set, SIGSEGV)) goto out;
+        /* no XCPU/XFSZ here */
+        if (sigprocmask(SIG_SETMASK, &set, &old)) goto out;
         chld_is_member = sigismember(&old, SIGCHLD);
-        if (chld_is_member < 0) return -1;
-        if (chld_is_member > 0)
-                sigdelset(&old, SIGCHLD);
+        if (chld_is_member < 0) goto out;
+        if (chld_is_member > 0 && sigdelset(&old, SIGCHLD)) goto out;
 
         pid = vfork();
         if (pid == 0) {
@@ -119,8 +128,10 @@ int pi_fork_exec(SV *redirref, SV *file, SV *cmdref, SV *envref, SV *rlimref,
                         exit_err("setpgid", &cerrnum);
                 for (sig = 1; sig < NSIG; sig++)
                         signal(sig, SIG_DFL); /* ignore errors on signals */
-                if (*cd && chdir(cd) < 0)
-                        exit_err("chdir", &cerrnum);
+                if (*cd && chdir(cd) < 0) {
+                        write(2, "cd ", 3);
+                        exit_err(cd, &cerrnum);
+                }
 
                 max_rlim = av_len(rlim);
                 for (i = 0; i < max_rlim; i += 3) {
@@ -159,9 +170,29 @@ int pi_fork_exec(SV *redirref, SV *file, SV *cmdref, SV *envref, SV *rlimref,
         } else if (perrnum) {
                 errno = perrnum;
         }
+out:
+        if (pid < 0)
+                croak("E: fork_exec %s: %s\n", filename, strerror(errno));
         return (int)pid;
 }
 
+static int sendmsg_retry(unsigned *tries)
+{
+        const struct timespec req = { 0, 100000000 }; /* 100ms */
+        int err = errno;
+        switch (err) {
+        case EINTR: PERL_ASYNC_CHECK(); return 1;
+        case ENOBUFS: case ENOMEM: case ETOOMANYREFS:
+                if (++*tries >= 50) return 0;
+                fprintf(stderr, "# sleeping on sendmsg: %s (#%u)\n",
+                        strerror(err), *tries);
+                nanosleep(&req, NULL);
+                PERL_ASYNC_CHECK();
+                return 1;
+        default: return 0;
+        }
+}
+
 #if defined(CMSG_SPACE) && defined(CMSG_LEN)
 #define SEND_FD_CAPA 10
 #define SEND_FD_SPACE (SEND_FD_CAPA * sizeof(int))
@@ -180,6 +211,7 @@ SV *send_cmd4(PerlIO *s, SV *svfds, SV *data, int flags)
         AV *fds = (AV *)SvRV(svfds);
         I32 i, nfds = av_len(fds) + 1;
         int *fdp;
+        unsigned tries = 0;
 
         if (SvOK(data)) {
                 iov.iov_base = SvPV(data, dlen);
@@ -207,7 +239,9 @@ SV *send_cmd4(PerlIO *s, SV *svfds, SV *data, int flags)
                         *fdp++ = SvIV(*fd);
                 }
         }
-        sent = sendmsg(PerlIO_fileno(s), &msg, flags);
+        do {
+                sent = sendmsg(PerlIO_fileno(s), &msg, flags);
+        } while (sent < 0 && sendmsg_retry(&tries));
         return sent >= 0 ? newSViv(sent) : &PL_sv_undef;
 }
 
@@ -229,130 +263,79 @@ void recv_cmd4(PerlIO *s, SV *buf, STRLEN n)
         msg.msg_control = &cmsg.hdr;
         msg.msg_controllen = CMSG_SPACE(SEND_FD_SPACE);
 
-        i = recvmsg(PerlIO_fileno(s), &msg, 0);
-        if (i < 0)
-                Inline_Stack_Push(&PL_sv_undef);
-        else
+        for (;;) {
+                i = recvmsg(PerlIO_fileno(s), &msg, 0);
+                if (i >= 0 || errno != EINTR) break;
+                PERL_ASYNC_CHECK();
+        }
+        if (i >= 0) {
                 SvCUR_set(buf, i);
-        if (i > 0 && cmsg.hdr.cmsg_level == SOL_SOCKET &&
-                        cmsg.hdr.cmsg_type == SCM_RIGHTS) {
-                size_t len = cmsg.hdr.cmsg_len;
-                int *fdp = (int *)CMSG_DATA(&cmsg.hdr);
-                for (i = 0; CMSG_LEN((i + 1) * sizeof(int)) <= len; i++)
-                        Inline_Stack_Push(sv_2mortal(newSViv(*fdp++)));
+                if (cmsg.hdr.cmsg_level == SOL_SOCKET &&
+                                cmsg.hdr.cmsg_type == SCM_RIGHTS) {
+                        size_t len = cmsg.hdr.cmsg_len;
+                        int *fdp = (int *)CMSG_DATA(&cmsg.hdr);
+                        for (i = 0; CMSG_LEN((i + 1) * sizeof(int)) <= len; i++)
+                                Inline_Stack_Push(sv_2mortal(newSViv(*fdp++)));
+                }
+        } else {
+                Inline_Stack_Push(&PL_sv_undef);
+                SvCUR_set(buf, 0);
         }
         Inline_Stack_Done;
 }
 #endif /* defined(CMSG_SPACE) && defined(CMSG_LEN) */
-ALL_LIBC
-
-# btrfs on Linux is copy-on-write (COW) by default.  As of Linux 5.7,
-# this still leads to fragmentation for SQLite and Xapian files where
-# random I/O happens, so we disable COW just for SQLite files and Xapian
-# directories.  Disabling COW disables checksumming, so we only do this
-# for regeneratable files, and not canonical git storage (git doesn't
-# checksum refs, only data under $GIT_DIR/objects).
-        my $set_nodatacow = $^O eq 'linux' ? <<'SET_NODATACOW' : '';
-#include <sys/ioctl.h>
-#include <sys/vfs.h>
-#include <linux/magic.h>
-#include <linux/fs.h>
-#include <dirent.h>
-#include <errno.h>
-#include <stdio.h>
-#include <string.h>
-
-void nodatacow_fd(int fd)
-{
-        struct statfs buf;
-        int val = 0;
 
-        if (fstatfs(fd, &buf) < 0) {
-                fprintf(stderr, "fstatfs: %s\\n", strerror(errno));
-                return;
-        }
-
-        /* only btrfs is known to have this problem, so skip for non-btrfs */
-        if (buf.f_type != BTRFS_SUPER_MAGIC)
-                return;
-
-        if (ioctl(fd, FS_IOC_GETFLAGS, &val) < 0) {
-                fprintf(stderr, "FS_IOC_GET_FLAGS: %s\\n", strerror(errno));
-                return;
-        }
-        val |= FS_NOCOW_FL;
-        if (ioctl(fd, FS_IOC_SETFLAGS, &val) < 0)
-                fprintf(stderr, "FS_IOC_SET_FLAGS: %s\\n", strerror(errno));
-}
-
-void nodatacow_dir(const char *dir)
+void rlimit_map()
 {
-        DIR *dh = opendir(dir);
-        int fd;
-
-        if (!dh) croak("opendir(%s): %s", dir, strerror(errno));
-        fd = dirfd(dh);
-        if (fd >= 0)
-                nodatacow_fd(fd);
-        /* ENOTSUP probably won't happen under Linux... */
-        closedir(dh);
-}
-SET_NODATACOW
-
-        my $inline_dir = $ENV{PERL_INLINE_DIRECTORY} //= (
+        Inline_Stack_Vars;
+        Inline_Stack_Reset;
+ALL_LIBC
+        my $inline_dir = $ENV{PERL_INLINE_DIRECTORY} // (
                         $ENV{XDG_CACHE_HOME} //
                         ( ($ENV{HOME} // '/nonexistent').'/.cache' )
                 ).'/public-inbox/inline-c';
-        warn "$inline_dir exists, not writable\n" if -e $inline_dir && !-w _;
-        $set_nodatacow = $all_libc = undef unless -d _ && -w _;
+        undef $all_libc unless -d $inline_dir;
         if (defined $all_libc) {
-                my $f = "$inline_dir/.public-inbox.lock";
-                open my $oldout, '>&', \*STDOUT or die "dup(1): $!";
-                open my $olderr, '>&', \*STDERR or die "dup(2): $!";
-                open my $fh, '+>', $f or die "open($f): $!";
-                open STDOUT, '>&', $fh or die "1>$f: $!";
-                open STDERR, '>&', $fh or die "2>$f: $!";
+                for (@RLIMITS, 'RLIM_INFINITY') {
+                        $all_libc .= <<EOM;
+        Inline_Stack_Push(sv_2mortal(newSVpvs("$_")));
+        Inline_Stack_Push(sv_2mortal(newSViv($_)));
+EOM
+                }
+                $all_libc .= <<EOM;
+        Inline_Stack_Done;
+} // rlimit_map
+EOM
+                local $ENV{PERL_INLINE_DIRECTORY} = $inline_dir;
+                # CentOS 7.x ships Inline 0.53, 0.64+ has built-in locking
+                my $lk = PublicInbox::Lock->new($inline_dir.
+                                                '/.public-inbox.lock');
+                my $fh = $lk->lock_acquire;
+                open my $oldout, '>&', \*STDOUT;
+                open my $olderr, '>&', \*STDERR;
+                open STDOUT, '>&', $fh;
+                open STDERR, '>&', $fh;
                 STDERR->autoflush(1);
                 STDOUT->autoflush(1);
-
-                # CentOS 7.x ships Inline 0.53, 0.64+ has built-in locking
-                flock($fh, LOCK_EX) or die "LOCK_EX($f): $!";
-                eval <<'EOM';
-use Inline C => $all_libc.$set_nodatacow, BUILD_NOISY => 1;
-EOM
+                eval 'use Inline C => $all_libc, BUILD_NOISY => 1';
                 my $err = $@;
-                my $ndc_err = '';
-                if ($err && $set_nodatacow) { # missing Linux kernel headers
-                        $ndc_err = "with set_nodatacow: <\n$err\n>\n";
-                        undef $set_nodatacow;
-                        eval <<'EOM';
-use Inline C => $all_libc, BUILD_NOISY => 1;
-EOM
-                };
-                $err = $@;
-                open(STDERR, '>&', $olderr) or warn "restore stderr: $!";
-                open(STDOUT, '>&', $oldout) or warn "restore stdout: $!";
+                open(STDERR, '>&', $olderr);
+                open(STDOUT, '>&', $oldout);
                 if ($err) {
                         seek($fh, 0, SEEK_SET);
                         my @msg = <$fh>;
-                        warn "Inline::C build failed:\n",
-                                $ndc_err, $err, "\n", @msg;
-                        $set_nodatacow = $all_libc = undef;
-                } elsif ($ndc_err) {
-                        warn "Inline::C build succeeded w/o set_nodatacow\n",
-                                "error $ndc_err";
+                        truncate($fh, 0);
+                        warn "Inline::C build failed:\n", $err, "\n", @msg;
+                        $all_libc = undef;
                 }
         }
-        unless ($all_libc) {
+        if (defined $all_libc) { # set for Gcf2
+                $ENV{PERL_INLINE_DIRECTORY} = $inline_dir;
+                %RLIMITS = rlimit_map();
+        } else {
                 require PublicInbox::SpawnPP;
                 *pi_fork_exec = \&PublicInbox::SpawnPP::pi_fork_exec
         }
-        unless ($set_nodatacow) {
-                require PublicInbox::NDC_PP;
-                no warnings 'once';
-                *nodatacow_fd = \&PublicInbox::NDC_PP::nodatacow_fd;
-                *nodatacow_dir = \&PublicInbox::NDC_PP::nodatacow_dir;
-        }
 } # /BEGIN
 
 sub which ($) {
@@ -366,59 +349,94 @@ sub which ($) {
 }
 
 sub spawn ($;$$) {
-        my ($cmd, $env, $opts) = @_;
+        my ($cmd, $env, $opt) = @_;
         my $f = which($cmd->[0]) // die "$cmd->[0]: command not found\n";
-        my @env;
-        $opts ||= {};
+        my (@env, @rdr);
         my %env = (%ENV, $env ? %$env : ());
         while (my ($k, $v) = each %env) {
                 push @env, "$k=$v" if defined($v);
         }
-        my $redir = [];
         for my $child_fd (0..2) {
-                my $parent_fd = $opts->{$child_fd};
-                if (defined($parent_fd) && $parent_fd !~ /\A[0-9]+\z/) {
-                        my $fd = fileno($parent_fd) //
-                                        die "$parent_fd not an IO GLOB? $!";
-                        $parent_fd = $fd;
+                my $pfd = $opt->{$child_fd};
+                if ('SCALAR' eq ref($pfd)) {
+                        open my $fh, '+>', undef;
+                        $opt->{"fh.$child_fd"} = $fh; # for read_out_err
+                        if ($child_fd == 0) {
+                                print $fh $$pfd;
+                                $fh->flush or die "flush: $!";
+                                sysseek($fh, 0, SEEK_SET);
+                        }
+                        $pfd = fileno($fh);
+                } elsif (defined($pfd) && $pfd !~ /\A[0-9]+\z/) {
+                        my $fd = fileno($pfd) //
+                                        croak "BUG: $pfd not an IO GLOB? $!";
+                        $pfd = $fd;
                 }
-                $redir->[$child_fd] = $parent_fd // $child_fd;
+                $rdr[$child_fd] = $pfd // $child_fd;
         }
         my $rlim = [];
-
         foreach my $l (@RLIMITS) {
-                my $v = $opts->{$l} // next;
-                my $r = eval "require BSD::Resource; BSD::Resource::$l();";
-                unless (defined $r) {
-                        warn "$l undefined by BSD::Resource: $@\n";
-                        next;
-                }
+                my $v = $opt->{$l} // next;
+                my $r = $RLIMITS{$l} //
+                        eval "require BSD::Resource; BSD::Resource::$l();" //
+                        do {
+                                warn "$l undefined by BSD::Resource: $@\n";
+                                next;
+                        };
                 push @$rlim, $r, @$v;
         }
-        my $cd = $opts->{'-C'} // ''; # undef => NULL mapping doesn't work?
-        my $pgid = $opts->{pgid} // -1;
-        my $pid = pi_fork_exec($redir, $f, $cmd, \@env, $rlim, $cd, $pgid);
-        die "fork_exec @$cmd failed: $!\n" unless $pid > 0;
-        $pid;
+        my $cd = $opt->{'-C'} // ''; # undef => NULL mapping doesn't work?
+        my $pgid = $opt->{pgid} // -1;
+        pi_fork_exec(\@rdr, $f, $cmd, \@env, $rlim, $cd, $pgid);
 }
 
 sub popen_rd {
+        my ($cmd, $env, $opt, @cb_arg) = @_;
+        pipe(my $r, local $opt->{1});
+        PublicInbox::IO::attach_pid($r, spawn($cmd, $env, $opt), @cb_arg);
+}
+
+sub popen_wr {
+        my ($cmd, $env, $opt, @cb_arg) = @_;
+        pipe(local $opt->{0}, my $w);
+        $w->autoflush(1);
+        PublicInbox::IO::attach_pid($w, spawn($cmd, $env, $opt), @cb_arg);
+}
+
+sub read_out_err ($) {
+        my ($opt) = @_;
+        for my $fd (1, 2) { # read stdout/stderr
+                my $fh = delete($opt->{"fh.$fd"}) // next;
+                seek($fh, 0, SEEK_SET);
+                PublicInbox::IO::read_all $fh, undef, $opt->{$fd};
+        }
+}
+
+sub run_wait ($;$$) {
         my ($cmd, $env, $opt) = @_;
-        pipe(my ($r, $w)) or die "pipe: $!\n";
-        $opt ||= {};
-        $opt->{1} = fileno($w);
-        my $pid = spawn($cmd, $env, $opt);
-        return ($r, $pid) if wantarray;
-        my $ret = gensym;
-        tie *$ret, 'PublicInbox::ProcessPipe', $pid, $r, @$opt{qw(cb arg)};
-        $ret;
+        waitpid(spawn($cmd, $env, $opt), 0);
+        read_out_err($opt);
+        $?
 }
 
 sub run_die ($;$$) {
         my ($cmd, $env, $rdr) = @_;
-        my $pid = spawn($cmd, $env, $rdr);
-        waitpid($pid, 0) == $pid or die "@$cmd did not finish";
-        $? == 0 or die "@$cmd failed: \$?=$?\n";
+        run_wait($cmd, $env, $rdr) and croak "E: @$cmd failed: \$?=$?";
+}
+
+sub run_qx {
+        my ($cmd, $env, $opt) = @_;
+        my $fh = popen_rd($cmd, $env, $opt);
+        my @ret;
+        if (wantarray) {
+                @ret = <$fh>;
+        } else {
+                local $/;
+                $ret[0] = <$fh>;
+        }
+        $fh->close; # caller should check $?
+        read_out_err($opt);
+        wantarray ? @ret : $ret[0];
 }
 
 1;