diff options
Diffstat (limited to 'lib/PublicInbox/Spawn.pm')
-rw-r--r-- | lib/PublicInbox/Spawn.pm | 275 |
1 files changed, 171 insertions, 104 deletions
diff --git a/lib/PublicInbox/Spawn.pm b/lib/PublicInbox/Spawn.pm index 3f69108a..e9e81e88 100644 --- a/lib/PublicInbox/Spawn.pm +++ b/lib/PublicInbox/Spawn.pm @@ -6,10 +6,8 @@ # is explicitly defined in the environment (and writable). # Under Linux, vfork can make a big difference in spawning performance # as process size increases (fork still needs to mark pages for CoW use). -# Currently, we only use this for code intended for long running -# daemons (inside the PSGI code (-httpd) and -nntpd). The short-lived -# scripts (-mda, -index, -learn, -init) either use IPC::run or standard -# Perl routines. +# None of this is intended to be thread-safe since Perl5 maintainers +# officially discourage the use of threads. # # There'll probably be more OS-level C stuff here, down the line. # We don't want too many DSOs: https://udrepper.livejournal.com/8790.html @@ -17,14 +15,17 @@ package PublicInbox::Spawn; use v5.12; use parent qw(Exporter); -use Symbol qw(gensym); -use Fcntl qw(LOCK_EX SEEK_SET); +use PublicInbox::Lock; +use Fcntl qw(SEEK_SET); use IO::Handle (); -use PublicInbox::ProcessPipe; -our @EXPORT_OK = qw(which spawn popen_rd run_die); -our @RLIMITS = qw(RLIMIT_CPU RLIMIT_CORE RLIMIT_DATA); +use Carp qw(croak); +use PublicInbox::IO; +our @EXPORT_OK = qw(which spawn popen_rd popen_wr run_die run_wait run_qx); +our (@RLIMITS, %RLIMITS); +use autodie qw(close open pipe seek sysseek truncate); BEGIN { + @RLIMITS = qw(RLIMIT_CPU RLIMIT_CORE RLIMIT_DATA); my $all_libc = <<'ALL_LIBC'; # all *nix systems we support #include <sys/resource.h> #include <sys/socket.h> @@ -37,12 +38,6 @@ BEGIN { #include <time.h> #include <stdio.h> #include <string.h> - -/* some platforms need alloca.h, but some don't */ -#if defined(__GNUC__) && !defined(alloca) -# define alloca(sz) __builtin_alloca(sz) -#endif - #include <signal.h> #include <assert.h> @@ -54,11 +49,17 @@ BEGIN { * This is unlike "sv_len", which returns what you would expect. */ #define AV2C_COPY(dst, src) do { \ + static size_t dst##__capa; \ I32 i; \ I32 top_index = av_len(src); \ I32 real_len = top_index + 1; \ I32 capa = real_len + 1; \ - dst = alloca(capa * sizeof(char *)); \ + if (capa > dst##__capa) { \ + dst##__capa = 0; /* in case Newx croaks */ \ + Safefree(dst); \ + Newx(dst, capa, char *); \ + dst##__capa = capa; \ + } \ for (i = 0; i < real_len; i++) { \ SV **sv = av_fetch(src, i, 0); \ dst[i] = SvPV_nolen(*sv); \ @@ -87,23 +88,28 @@ int pi_fork_exec(SV *redirref, SV *file, SV *cmdref, SV *envref, SV *rlimref, AV *env = (AV *)SvRV(envref); AV *rlim = (AV *)SvRV(rlimref); const char *filename = SvPV_nolen(file); - pid_t pid; - char **argv, **envp; + pid_t pid = -1; + static char **argv, **envp; sigset_t set, old; int ret, perrnum; volatile int cerrnum = 0; /* shared due to vfork */ - int chld_is_member; + int chld_is_member; /* needed due to shared memory w/ vfork */ I32 max_fd = av_len(redir); AV2C_COPY(argv, cmd); AV2C_COPY(envp, env); - if (sigfillset(&set)) return -1; - if (sigprocmask(SIG_SETMASK, &set, &old)) return -1; + if (sigfillset(&set)) goto out; + if (sigdelset(&set, SIGABRT)) goto out; + if (sigdelset(&set, SIGBUS)) goto out; + if (sigdelset(&set, SIGFPE)) goto out; + if (sigdelset(&set, SIGILL)) goto out; + if (sigdelset(&set, SIGSEGV)) goto out; + /* no XCPU/XFSZ here */ + if (sigprocmask(SIG_SETMASK, &set, &old)) goto out; chld_is_member = sigismember(&old, SIGCHLD); - if (chld_is_member < 0) return -1; - if (chld_is_member > 0) - sigdelset(&old, SIGCHLD); + if (chld_is_member < 0) goto out; + if (chld_is_member > 0 && sigdelset(&old, SIGCHLD)) goto out; pid = vfork(); if (pid == 0) { @@ -122,8 +128,10 @@ int pi_fork_exec(SV *redirref, SV *file, SV *cmdref, SV *envref, SV *rlimref, exit_err("setpgid", &cerrnum); for (sig = 1; sig < NSIG; sig++) signal(sig, SIG_DFL); /* ignore errors on signals */ - if (*cd && chdir(cd) < 0) - exit_err("chdir", &cerrnum); + if (*cd && chdir(cd) < 0) { + write(2, "cd ", 3); + exit_err(cd, &cerrnum); + } max_rlim = av_len(rlim); for (i = 0; i < max_rlim; i += 3) { @@ -162,22 +170,26 @@ int pi_fork_exec(SV *redirref, SV *file, SV *cmdref, SV *envref, SV *rlimref, } else if (perrnum) { errno = perrnum; } +out: + if (pid < 0) + croak("E: fork_exec %s: %s\n", filename, strerror(errno)); return (int)pid; } -static int sleep_wait(unsigned *tries, int err) +static int sendmsg_retry(int *tries) { const struct timespec req = { 0, 100000000 }; /* 100ms */ + int err = errno; switch (err) { + case EINTR: PERL_ASYNC_CHECK(); return 1; case ENOBUFS: case ENOMEM: case ETOOMANYREFS: - if (++*tries < 50) { - fprintf(stderr, "sleeping on sendmsg: %s (#%u)\n", - strerror(err), *tries); - nanosleep(&req, NULL); - return 1; - } - default: - return 0; + if (--*tries < 0) return 0; + fprintf(stderr, "# sleeping on sendmsg: %s (%d tries left)\n", + strerror(err), *tries); + nanosleep(&req, NULL); + PERL_ASYNC_CHECK(); + return 1; + default: return 0; } } @@ -189,7 +201,7 @@ union my_cmsg { char pad[sizeof(struct cmsghdr) + 16 + SEND_FD_SPACE]; }; -SV *send_cmd4(PerlIO *s, SV *svfds, SV *data, int flags) +SV *send_cmd4_(PerlIO *s, SV *svfds, SV *data, int flags, int tries) { struct msghdr msg = { 0 }; union my_cmsg cmsg = { 0 }; @@ -199,7 +211,6 @@ SV *send_cmd4(PerlIO *s, SV *svfds, SV *data, int flags) AV *fds = (AV *)SvRV(svfds); I32 i, nfds = av_len(fds) + 1; int *fdp; - unsigned tries = 0; if (SvOK(data)) { iov.iov_base = SvPV(data, dlen); @@ -229,7 +240,7 @@ SV *send_cmd4(PerlIO *s, SV *svfds, SV *data, int flags) } do { sent = sendmsg(PerlIO_fileno(s), &msg, flags); - } while (sent < 0 && sleep_wait(&tries, errno)); + } while (sent < 0 && sendmsg_retry(&tries)); return sent >= 0 ? newSViv(sent) : &PL_sv_undef; } @@ -251,58 +262,79 @@ void recv_cmd4(PerlIO *s, SV *buf, STRLEN n) msg.msg_control = &cmsg.hdr; msg.msg_controllen = CMSG_SPACE(SEND_FD_SPACE); - i = recvmsg(PerlIO_fileno(s), &msg, 0); - if (i < 0) - Inline_Stack_Push(&PL_sv_undef); - else + for (;;) { + i = recvmsg(PerlIO_fileno(s), &msg, 0); + if (i >= 0 || errno != EINTR) break; + PERL_ASYNC_CHECK(); + } + if (i >= 0) { SvCUR_set(buf, i); - if (i > 0 && cmsg.hdr.cmsg_level == SOL_SOCKET && - cmsg.hdr.cmsg_type == SCM_RIGHTS) { - size_t len = cmsg.hdr.cmsg_len; - int *fdp = (int *)CMSG_DATA(&cmsg.hdr); - for (i = 0; CMSG_LEN((i + 1) * sizeof(int)) <= len; i++) - Inline_Stack_Push(sv_2mortal(newSViv(*fdp++))); + if (cmsg.hdr.cmsg_level == SOL_SOCKET && + cmsg.hdr.cmsg_type == SCM_RIGHTS) { + size_t len = cmsg.hdr.cmsg_len; + int *fdp = (int *)CMSG_DATA(&cmsg.hdr); + for (i = 0; CMSG_LEN((i + 1) * sizeof(int)) <= len; i++) + Inline_Stack_Push(sv_2mortal(newSViv(*fdp++))); + } + } else { + Inline_Stack_Push(&PL_sv_undef); + SvCUR_set(buf, 0); } Inline_Stack_Done; } #endif /* defined(CMSG_SPACE) && defined(CMSG_LEN) */ -ALL_LIBC - my $inline_dir = $ENV{PERL_INLINE_DIRECTORY} //= ( +void rlimit_map() +{ + Inline_Stack_Vars; + Inline_Stack_Reset; +ALL_LIBC + my $inline_dir = $ENV{PERL_INLINE_DIRECTORY} // ( $ENV{XDG_CACHE_HOME} // ( ($ENV{HOME} // '/nonexistent').'/.cache' ) ).'/public-inbox/inline-c'; - warn "$inline_dir exists, not writable\n" if -e $inline_dir && !-w _; - $all_libc = undef unless -d _ && -w _; + undef $all_libc unless -d $inline_dir; if (defined $all_libc) { - my $f = "$inline_dir/.public-inbox.lock"; - open my $oldout, '>&', \*STDOUT or die "dup(1): $!"; - open my $olderr, '>&', \*STDERR or die "dup(2): $!"; - open my $fh, '+>', $f or die "open($f): $!"; - open STDOUT, '>&', $fh or die "1>$f: $!"; - open STDERR, '>&', $fh or die "2>$f: $!"; + for (@RLIMITS, 'RLIM_INFINITY') { + $all_libc .= <<EOM; + Inline_Stack_Push(sv_2mortal(newSVpvs("$_"))); + Inline_Stack_Push(sv_2mortal(newSViv($_))); +EOM + } + $all_libc .= <<EOM; + Inline_Stack_Done; +} // rlimit_map +EOM + local $ENV{PERL_INLINE_DIRECTORY} = $inline_dir; + # CentOS 7.x ships Inline 0.53, 0.64+ has built-in locking + my $lk = PublicInbox::Lock->new($inline_dir. + '/.public-inbox.lock'); + my $fh = $lk->lock_acquire; + open my $oldout, '>&', \*STDOUT; + open my $olderr, '>&', \*STDERR; + open STDOUT, '>&', $fh; + open STDERR, '>&', $fh; STDERR->autoflush(1); STDOUT->autoflush(1); - - # CentOS 7.x ships Inline 0.53, 0.64+ has built-in locking - flock($fh, LOCK_EX) or die "LOCK_EX($f): $!"; - eval <<'EOM'; -use Inline C => $all_libc, BUILD_NOISY => 1; -EOM + eval 'use Inline C => $all_libc, BUILD_NOISY => 1'; my $err = $@; - my $ndc_err = ''; - $err = $@; - open(STDERR, '>&', $olderr) or warn "restore stderr: $!"; - open(STDOUT, '>&', $oldout) or warn "restore stdout: $!"; + open(STDERR, '>&', $olderr); + open(STDOUT, '>&', $oldout); if ($err) { seek($fh, 0, SEEK_SET); my @msg = <$fh>; - warn "Inline::C build failed:\n", - $ndc_err, $err, "\n", @msg; + truncate($fh, 0); + warn "Inline::C build failed:\n", $err, "\n", @msg; $all_libc = undef; } } - unless ($all_libc) { + if (defined $all_libc) { # set for Gcf2 + $ENV{PERL_INLINE_DIRECTORY} = $inline_dir; + %RLIMITS = rlimit_map(); + *send_cmd4 = sub ($$$$;$) { + send_cmd4_($_[0], $_[1], $_[2], $_[3], 50); + } + } else { require PublicInbox::SpawnPP; *pi_fork_exec = \&PublicInbox::SpawnPP::pi_fork_exec } @@ -319,59 +351,94 @@ sub which ($) { } sub spawn ($;$$) { - my ($cmd, $env, $opts) = @_; + my ($cmd, $env, $opt) = @_; my $f = which($cmd->[0]) // die "$cmd->[0]: command not found\n"; - my @env; - $opts ||= {}; + my (@env, @rdr); my %env = (%ENV, $env ? %$env : ()); while (my ($k, $v) = each %env) { push @env, "$k=$v" if defined($v); } - my $redir = []; for my $child_fd (0..2) { - my $parent_fd = $opts->{$child_fd}; - if (defined($parent_fd) && $parent_fd !~ /\A[0-9]+\z/) { - my $fd = fileno($parent_fd) // - die "$parent_fd not an IO GLOB? $!"; - $parent_fd = $fd; + my $pfd = $opt->{$child_fd}; + if ('SCALAR' eq ref($pfd)) { + open my $fh, '+>', undef; + $opt->{"fh.$child_fd"} = $fh; # for read_out_err + if ($child_fd == 0) { + print $fh $$pfd; + $fh->flush or die "flush: $!"; + sysseek($fh, 0, SEEK_SET); + } + $pfd = fileno($fh); + } elsif (defined($pfd) && $pfd !~ /\A[0-9]+\z/) { + my $fd = fileno($pfd) // + croak "BUG: $pfd not an IO GLOB? $!"; + $pfd = $fd; } - $redir->[$child_fd] = $parent_fd // $child_fd; + $rdr[$child_fd] = $pfd // $child_fd; } my $rlim = []; - foreach my $l (@RLIMITS) { - my $v = $opts->{$l} // next; - my $r = eval "require BSD::Resource; BSD::Resource::$l();"; - unless (defined $r) { - warn "$l undefined by BSD::Resource: $@\n"; - next; - } + my $v = $opt->{$l} // next; + my $r = $RLIMITS{$l} // + eval "require BSD::Resource; BSD::Resource::$l();" // + do { + warn "$l undefined by BSD::Resource: $@\n"; + next; + }; push @$rlim, $r, @$v; } - my $cd = $opts->{'-C'} // ''; # undef => NULL mapping doesn't work? - my $pgid = $opts->{pgid} // -1; - my $pid = pi_fork_exec($redir, $f, $cmd, \@env, $rlim, $cd, $pgid); - die "fork_exec @$cmd failed: $!\n" unless $pid > 0; - $pid; + my $cd = $opt->{'-C'} // ''; # undef => NULL mapping doesn't work? + my $pgid = $opt->{pgid} // -1; + pi_fork_exec(\@rdr, $f, $cmd, \@env, $rlim, $cd, $pgid); } sub popen_rd { + my ($cmd, $env, $opt, @cb_arg) = @_; + pipe(my $r, local $opt->{1}); + PublicInbox::IO::attach_pid($r, spawn($cmd, $env, $opt), @cb_arg); +} + +sub popen_wr { + my ($cmd, $env, $opt, @cb_arg) = @_; + pipe(local $opt->{0}, my $w); + $w->autoflush(1); + PublicInbox::IO::attach_pid($w, spawn($cmd, $env, $opt), @cb_arg); +} + +sub read_out_err ($) { + my ($opt) = @_; + for my $fd (1, 2) { # read stdout/stderr + my $fh = delete($opt->{"fh.$fd"}) // next; + seek($fh, 0, SEEK_SET); + PublicInbox::IO::read_all $fh, undef, $opt->{$fd}; + } +} + +sub run_wait ($;$$) { my ($cmd, $env, $opt) = @_; - pipe(my ($r, $w)) or die "pipe: $!\n"; - $opt ||= {}; - $opt->{1} = fileno($w); - my $pid = spawn($cmd, $env, $opt); - return ($r, $pid) if wantarray; - my $ret = gensym; - tie *$ret, 'PublicInbox::ProcessPipe', $pid, $r, @$opt{qw(cb arg)}; - $ret; + waitpid(spawn($cmd, $env, $opt), 0); + read_out_err($opt); + $? } sub run_die ($;$$) { my ($cmd, $env, $rdr) = @_; - my $pid = spawn($cmd, $env, $rdr); - waitpid($pid, 0) == $pid or die "@$cmd did not finish"; - $? == 0 or die "@$cmd failed: \$?=$?\n"; + run_wait($cmd, $env, $rdr) and croak "E: @$cmd failed: \$?=$?"; +} + +sub run_qx { + my ($cmd, $env, $opt) = @_; + my $fh = popen_rd($cmd, $env, $opt); + my @ret; + if (wantarray) { + @ret = <$fh>; + } else { + local $/; + $ret[0] = <$fh>; + } + $fh->close; # caller should check $? + read_out_err($opt); + wantarray ? @ret : $ret[0]; } 1; |