about summary refs log tree commit homepage
path: root/lib/PublicInbox/Qspawn.pm
diff options
context:
space:
mode:
Diffstat (limited to 'lib/PublicInbox/Qspawn.pm')
-rw-r--r--lib/PublicInbox/Qspawn.pm398
1 files changed, 172 insertions, 226 deletions
diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm
index 53d0ad55..0bf857c6 100644
--- a/lib/PublicInbox/Qspawn.pm
+++ b/lib/PublicInbox/Qspawn.pm
@@ -1,4 +1,4 @@
-# Copyright (C) 2016-2021 all contributors <meta@public-inbox.org>
+# Copyright (C) all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
 
 # Like most Perl modules in public-inbox, this is internal and
@@ -25,9 +25,15 @@
 # processes such as git-apply(1).
 
 package PublicInbox::Qspawn;
-use strict;
+use v5.12;
 use PublicInbox::Spawn qw(popen_rd);
 use PublicInbox::GzipFilter;
+use Scalar::Util qw(blessed);
+use PublicInbox::Limiter;
+use PublicInbox::Aspawn qw(run_await);
+use PublicInbox::Syscall qw(EPOLLIN);
+use PublicInbox::InputPipe;
+use Carp qw(carp confess);
 
 # n.b.: we get EAGAIN with public-inbox-httpd, and EINTR on other PSGI servers
 use Errno qw(EAGAIN EINTR);
@@ -38,52 +44,45 @@ my $def_limiter;
 # $cmd is the command to spawn
 # $cmd_env is the environ for the child process (not PSGI env)
 # $opt can include redirects and perhaps other process spawning options
-sub new ($$$;) {
+# {qsp_err} is an optional error buffer callers may access themselves
+sub new {
         my ($class, $cmd, $cmd_env, $opt) = @_;
-        bless { args => [ $cmd, $cmd_env, $opt ] }, $class;
+        bless { args => [ $cmd, $cmd_env, $opt ? { %$opt } : {} ] }, $class;
 }
 
 sub _do_spawn {
         my ($self, $start_cb, $limiter) = @_;
-        my $err;
-        my ($cmd, $cmd_env, $opt) = @{delete $self->{args}};
+        my ($cmd, $cmd_env, $opt) = @{$self->{args}};
         my %o = %{$opt || {}};
         $self->{limiter} = $limiter;
-        foreach my $k (@PublicInbox::Spawn::RLIMITS) {
-                if (defined(my $rlimit = $limiter->{$k})) {
-                        $o{$k} = $rlimit;
-                }
+        for my $k (@PublicInbox::Spawn::RLIMITS) {
+                $opt->{$k} = $limiter->{$k} // next;
+        }
+        $self->{-quiet} = 1 if $o{quiet};
+        $limiter->{running}++;
+        if ($start_cb) {
+                eval { # popen_rd may die on EMFILE, ENFILE
+                        $self->{rpipe} = popen_rd($cmd, $cmd_env, $opt,
+                                                        \&waitpid_err, $self);
+                        $start_cb->($self); # EPOLL_CTL_ADD may ENOSPC/ENOMEM
+                };
+        } else {
+                eval { run_await($cmd, $cmd_env, $opt, \&wait_await, $self) };
+                warn "E: $@" if $@;
         }
-        $self->{cmd} = $o{quiet} ? undef : $cmd;
-        eval {
-                # popen_rd may die on EMFILE, ENFILE
-                $self->{rpipe} = popen_rd($cmd, $cmd_env, \%o);
-
-                die "E: $!" unless defined($self->{rpipe});
-
-                $limiter->{running}++;
-                $start_cb->($self); # EPOLL_CTL_ADD may ENOSPC/ENOMEM
-        };
         finish($self, $@) if $@;
 }
 
-sub child_err ($) {
-        my ($child_error) = @_; # typically $?
-        my $exitstatus = ($child_error >> 8) or return;
-        my $sig = $child_error & 127;
-        my $msg = "exit status=$exitstatus";
-        $msg .= " signal=$sig" if $sig;
-        $msg;
+sub psgi_status_err { # Qspawn itself is useful w/o PSGI
+        require PublicInbox::WwwStatic;
+        PublicInbox::WwwStatic::r($_[0] // 500);
 }
 
-sub finalize ($$) {
-        my ($self, $err) = @_;
-
-        my ($env, $qx_cb, $qx_arg, $qx_buf) =
-                delete @$self{qw(psgi_env qx_cb qx_arg qx_buf)};
+sub finalize ($) {
+        my ($self) = @_;
 
-        # done, spawn whatever's in the queue
-        my $limiter = $self->{limiter};
+        # process is done, spawn whatever's in the queue
+        my $limiter = delete $self->{limiter} or return;
         my $running = --$limiter->{running};
 
         if ($running < $limiter->{max}) {
@@ -91,34 +90,69 @@ sub finalize ($$) {
                         _do_spawn(@$next, $limiter);
                 }
         }
-
-        if ($err) {
-                if (defined $self->{err}) {
-                        $self->{err} .= "; $err";
-                } else {
-                        $self->{err} = $err;
-                }
-                if ($env && $self->{cmd}) {
-                        warn join(' ', @{$self->{cmd}}) . ": $err";
+        if (my $err = $self->{_err}) { # set by finish or waitpid_err
+                utf8::decode($err);
+                if (my $dst = $self->{qsp_err}) {
+                        $$dst .= $$dst ? " $err" : "; $err";
                 }
+                warn "E: @{$self->{args}->[0]}: $err\n" if !$self->{-quiet};
+        }
+
+        my ($env, $qx_cb_arg) = delete @$self{qw(psgi_env qx_cb_arg)};
+        if ($qx_cb_arg) {
+                my $cb = shift @$qx_cb_arg;
+                eval { $cb->($self->{args}->[2]->{1}, @$qx_cb_arg) };
+                return unless $@;
+                warn "E: $@"; # hope qspawn.wcb can handle it
         }
-        if ($qx_cb) {
-                eval { $qx_cb->($qx_buf, $qx_arg) };
-        } elsif (my $wcb = delete $env->{'qspawn.wcb'}) {
+        return if $self->{passed}; # another command chained it
+        if (my $wcb = delete $env->{'qspawn.wcb'}) {
                 # have we started writing, yet?
-                require PublicInbox::WwwStatic;
-                $wcb->(PublicInbox::WwwStatic::r(500));
+                $wcb->(psgi_status_err($env->{'qspawn.fallback'}));
         }
 }
 
-# callback for dwaitpid or ProcessPipe
-sub waitpid_err { finalize($_[0], child_err($?)) }
+sub waitpid_err { # callback for awaitpid
+        my (undef, $self) = @_; # $_[0]: pid
+        $self->{_err} = ''; # for defined check in ->finish
+        if ($?) { # XXX this may be redundant
+                my $status = $? >> 8;
+                my $sig = $? & 127;
+                $self->{_err} .= "exit status=$status";
+                $self->{_err} .= " signal=$sig" if $sig;
+        }
+        finalize($self) if !$self->{rpipe};
+}
+
+sub wait_await { # run_await cb
+        my ($pid, $cmd, $cmd_env, $opt, $self) = @_;
+        waitpid_err($pid, $self);
+}
+
+sub yield_chunk { # $_[-1] is sysread buffer (or undef)
+        my ($self, $ipipe) = @_;
+        if (!defined($_[-1])) {
+                warn "error reading body: $!";
+        } elsif ($_[-1] eq '') { # normal EOF
+                $self->finish;
+                $self->{qfh}->close;
+        } elsif (defined($self->{qfh}->write($_[-1]))) {
+                return; # continue while HTTP client is reading our writes
+        } # else { # HTTP client disconnected
+        delete $self->{rpipe};
+        $ipipe->close;
+}
 
 sub finish ($;$) {
         my ($self, $err) = @_;
-        my $tied_pp = delete($self->{rpipe}) or return finalize($self, $err);
-        my PublicInbox::ProcessPipe $pp = tied *$tied_pp;
-        @$pp{qw(cb arg)} = (\&waitpid_err, $self); # for ->DESTROY
+        $self->{_err} //= $err; # only for $@
+
+        # we can safely finalize if pipe was closed before, or if
+        # {_err} is defined by waitpid_err.  Deleting {rpipe} will
+        # trigger PublicInbox::IO::DESTROY -> waitpid_err,
+        # but it may not fire right away if inside the event loop.
+        my $closed_before = !delete($self->{rpipe});
+        finalize($self) if $closed_before || defined($self->{_err});
 }
 
 sub start ($$$) {
@@ -130,137 +164,92 @@ sub start ($$$) {
         }
 }
 
-sub psgi_qx_init_cb {
-        my ($self) = @_;
-        my $async = delete $self->{async};
-        my ($r, $buf);
-        my $qx_fh = $self->{qx_fh};
-reread:
-        $r = sysread($self->{rpipe}, $buf, 65536);
-        if ($async) {
-                $async->async_pass($self->{psgi_env}->{'psgix.io'},
-                                        $qx_fh, \$buf);
-        } elsif (defined $r) {
-                $r ? (print $qx_fh $buf) : event_step($self, undef);
-        } else {
-                return if $! == EAGAIN; # try again when notified
-                goto reread if $! == EINTR;
-                event_step($self, $!);
-        }
-}
-
-sub psgi_qx_start {
-        my ($self) = @_;
-        if (my $async = $self->{psgi_env}->{'pi-httpd.async'}) {
-                # PublicInbox::HTTPD::Async->new(rpipe, $cb, cb_arg, $end_obj)
-                $self->{async} = $async->($self->{rpipe},
-                                        \&psgi_qx_init_cb, $self, $self);
-                # init_cb will call ->async_pass or ->close
-        } else { # generic PSGI
-                psgi_qx_init_cb($self) while $self->{qx_fh};
-        }
-}
-
-# Similar to `backtick` or "qx" ("perldoc -f qx"), it calls $qx_cb with
+# Similar to `backtick` or "qx" ("perldoc -f qx"), it calls @qx_cb_arg with
 # the stdout of the given command when done; but respects the given limiter
 # $env is the PSGI env.  As with ``/qx; only use this when output is small
 # and safe to slurp.
 sub psgi_qx {
-        my ($self, $env, $limiter, $qx_cb, $qx_arg) = @_;
+        my ($self, $env, $limiter, @qx_cb_arg) = @_;
         $self->{psgi_env} = $env;
-        my $qx_buf = '';
-        open(my $qx_fh, '+>', \$qx_buf) or die; # PerlIO::scalar
-        $self->{qx_cb} = $qx_cb;
-        $self->{qx_arg} = $qx_arg;
-        $self->{qx_fh} = $qx_fh;
-        $self->{qx_buf} = \$qx_buf;
-        $limiter ||= $def_limiter ||= PublicInbox::Qspawn::Limiter->new(32);
-        start($self, $limiter, \&psgi_qx_start);
+        $self->{qx_cb_arg} = \@qx_cb_arg;
+        $limiter ||= $def_limiter ||= PublicInbox::Limiter->new(32);
+        start($self, $limiter, undef);
 }
 
-# this is called on pipe EOF to reap the process, may be called
-# via PublicInbox::DS event loop OR via GetlineBody for generic
-# PSGI servers.
-sub event_step {
-        my ($self, $err) = @_; # $err: $!
-        warn "psgi_{return,qx} $err" if defined($err);
-        finish($self);
-        my ($fh, $qx_fh) = delete(@$self{qw(fh qx_fh)});
-        $fh->close if $fh; # async-only (psgi_return)
-}
+sub yield_pass {
+        my ($self, $ipipe, $res) = @_; # $ipipe = InputPipe
+        my $env = $self->{psgi_env};
+        my $wcb = delete $env->{'qspawn.wcb'} // confess('BUG: no qspawn.wcb');
+        if (ref($res) eq 'CODE') { # chain another command
+                delete $self->{rpipe};
+                $ipipe->close if $ipipe;
+                $res->($wcb);
+                $self->{passed} = 1;
+                return; # all done
+        }
+        confess("BUG: $res unhandled") if ref($res) ne 'ARRAY';
 
-sub rd_hdr ($) {
-        my ($self) = @_;
-        # typically used for reading CGI headers
-        # We also need to check EINTR for generic PSGI servers.
-        my $ret;
-        my $total_rd = 0;
-        my $hdr_buf = $self->{hdr_buf};
-        my ($ph_cb, $ph_arg) = @{$self->{parse_hdr}};
-        do {
-                my $r = sysread($self->{rpipe}, $$hdr_buf, 4096,
-                                length($$hdr_buf));
-                if (defined($r)) {
-                        $total_rd += $r;
-                        eval { $ret = $ph_cb->($total_rd, $hdr_buf, $ph_arg) };
-                        if ($@) {
-                                warn "parse_hdr: $@";
-                                $ret = [ 500, [], [ "Internal error\n" ] ];
-                        }
-                } else {
-                        # caller should notify us when it's ready:
-                        return if $! == EAGAIN;
-                        next if $! == EINTR; # immediate retry
-                        warn "error reading header: $!";
-                        $ret = [ 500, [], [ "Internal error\n" ] ];
-                }
-        } until (defined $ret);
-        delete $self->{parse_hdr}; # done parsing headers
-        $ret;
+        my $filter = blessed($res->[2]) && $res->[2]->can('attach') ?
+                        pop(@$res) : delete($env->{'qspawn.filter'});
+        $filter //= PublicInbox::GzipFilter::qsp_maybe($res->[1], $env);
+
+        if (scalar(@$res) == 3) { # done early (likely error or static file)
+                delete $self->{rpipe};
+                $ipipe->close if $ipipe;
+                $wcb->($res); # all done
+                return;
+        }
+        scalar(@$res) == 2 or confess("BUG: scalar(res) != 2: @$res");
+        return ($wcb, $filter) if !$ipipe; # generic PSGI
+        # streaming response
+        my $qfh = $wcb->($res); # get PublicInbox::HTTP::(Chunked|Identity)
+        $qfh = $filter->attach($qfh) if $filter;
+        my ($bref) = @{delete $self->{yield_parse_hdr}};
+        $qfh->write($$bref) if $$bref ne '';
+        $self->{qfh} = $qfh; # keep $ipipe open
 }
 
-sub psgi_return_init_cb {
+sub parse_hdr_done ($$) {
         my ($self) = @_;
-        my $r = rd_hdr($self) or return;
-        my $env = $self->{psgi_env};
-        my $filter = delete $env->{'qspawn.filter'} //
-                PublicInbox::GzipFilter::qsp_maybe($r->[1], $env);
-
-        my $wcb = delete $env->{'qspawn.wcb'};
-        my $async = delete $self->{async};
-        if (scalar(@$r) == 3) { # error
-                if ($async) {
-                        # calls rpipe->close && ->event_step
-                        $async->close;
-                } else {
-                        $self->{rpipe}->close;
-                        event_step($self);
+        my ($ret, $err);
+        if (defined $_[-1]) {
+                my ($bref, $ph_cb, @ph_arg) = @{$self->{yield_parse_hdr}};
+                $$bref .= $_[-1];
+                $ret = eval { $ph_cb->(length($_[-1]), $bref, @ph_arg) };
+                if (($err = $@)) {
+                        $ret = psgi_status_err();
+                } elsif (!$ret && $_[-1] eq '') {
+                        $err = 'EOF';
+                        $ret = psgi_status_err();
                 }
-                $wcb->($r);
-        } elsif ($async) {
-                # done reading headers, handoff to read body
-                my $fh = $wcb->($r); # scalar @$r == 2
-                $fh = $filter->attach($fh) if $filter;
-                $self->{fh} = $fh;
-                $async->async_pass($env->{'psgix.io'}, $fh,
-                                        delete($self->{hdr_buf}));
-        } else { # for synchronous PSGI servers
-                require PublicInbox::GetlineBody;
-                $r->[2] = PublicInbox::GetlineBody->new($self->{rpipe},
-                                        \&event_step, $self,
-                                        ${$self->{hdr_buf}}, $filter);
-                $wcb->($r);
+        } else {
+                $err = "$!";
+                $ret = psgi_status_err();
         }
+        carp <<EOM if $err;
+E: $err @{$self->{args}->[0]} ($self->{psgi_env}->{REQUEST_URI})
+EOM
+        $ret; # undef if headers incomplete
+}
+
+sub ipipe_cb { # InputPipe callback
+        my ($ipipe, $self) = @_; # $_[-1] rbuf
+        if ($self->{qfh}) { # already streaming
+                yield_chunk($self, $ipipe, $_[-1]);
+        } elsif (my $res = parse_hdr_done($self, $_[-1])) {
+                yield_pass($self, $ipipe, $res);
+        } # else: headers incomplete, keep reading
 }
 
-sub psgi_return_start { # may run later, much later...
+sub _yield_start { # may run later, much later...
         my ($self) = @_;
-        if (my $async = $self->{psgi_env}->{'pi-httpd.async'}) {
-                # PublicInbox::HTTPD::Async->new(rpipe, $cb, $cb_arg, $end_obj)
-                $self->{async} = $async->($self->{rpipe},
-                                        \&psgi_return_init_cb, $self, $self);
-        } else { # generic PSGI
-                psgi_return_init_cb($self) while $self->{parse_hdr};
+        if ($self->{psgi_env}->{'pi-httpd.async'}) {
+                my $rpipe = $self->{rpipe};
+                $rpipe->blocking(0);
+                PublicInbox::InputPipe::consume($rpipe, \&ipipe_cb, $self);
+        } else {
+                require PublicInbox::GetlineResponse;
+                PublicInbox::GetlineResponse::response($self);
         }
 }
 
@@ -271,7 +260,7 @@ sub psgi_return_start { # may run later, much later...
 #   $env->{'qspawn.wcb'} - the write callback from the PSGI server
 #                          optional, use this if you've already
 #                          captured it elsewhere.  If not given,
-#                          psgi_return will return an anonymous
+#                          psgi_yield will return an anonymous
 #                          sub for the PSGI server to call
 #
 #   $env->{'qspawn.filter'} - filter object, responds to ->attach for
@@ -280,76 +269,33 @@ sub psgi_return_start { # may run later, much later...
 #
 # $limiter - the Limiter object to use (uses the def_limiter if not given)
 #
-# $parse_hdr - Initial read function; often for parsing CGI header output.
+# @parse_hdr_arg - Initial read cb+args; often for parsing CGI header output.
 #              It will be given the return value of sysread from the pipe
 #              and a string ref of the current buffer.  Returns an arrayref
 #              for PSGI responses.  2-element arrays in PSGI mean the
 #              body will be streamed, later, via writes (push-based) to
 #              psgix.io.  3-element arrays means the body is available
 #              immediately (or streamed via ->getline (pull-based)).
-sub psgi_return {
-        my ($self, $env, $limiter, $parse_hdr, $hdr_arg) = @_;
+
+sub psgi_yield {
+        my ($self, $env, $limiter, @parse_hdr_arg)= @_;
         $self->{psgi_env} = $env;
-        $self->{hdr_buf} = \(my $hdr_buf = '');
-        $self->{parse_hdr} = [ $parse_hdr, $hdr_arg ];
-        $limiter ||= $def_limiter ||= PublicInbox::Qspawn::Limiter->new(32);
+        $self->{yield_parse_hdr} = [ \(my $buf = ''), @parse_hdr_arg ];
+        $limiter ||= $def_limiter ||= PublicInbox::Limiter->new(32);
 
         # the caller already captured the PSGI write callback from
         # the PSGI server, so we can call ->start, here:
-        $env->{'qspawn.wcb'} and
-                return start($self, $limiter, \&psgi_return_start);
-
-        # the caller will return this sub to the PSGI server, so
-        # it can set the response callback (that is, for
-        # PublicInbox::HTTP, the chunked_wcb or identity_wcb callback),
-        # but other HTTP servers are supported:
-        sub {
+        $env->{'qspawn.wcb'} ? start($self, $limiter, \&_yield_start) : sub {
+                # the caller will return this sub to the PSGI server, so
+                # it can set the response callback (that is, for
+                # PublicInbox::HTTP, the chunked_wcb or identity_wcb callback),
+                # but other HTTP servers are supported:
                 $env->{'qspawn.wcb'} = $_[0];
-                start($self, $limiter, \&psgi_return_start);
+                start($self, $limiter, \&_yield_start);
         }
 }
 
-package PublicInbox::Qspawn::Limiter;
-use strict;
-use warnings;
-
-sub new {
-        my ($class, $max) = @_;
-        bless {
-                # 32 is same as the git-daemon connection limit
-                max => $max || 32,
-                running => 0,
-                run_queue => [],
-                # RLIMIT_CPU => undef,
-                # RLIMIT_DATA => undef,
-                # RLIMIT_CORE => undef,
-        }, $class;
-}
-
-sub setup_rlimit {
-        my ($self, $name, $cfg) = @_;
-        foreach my $rlim (@PublicInbox::Spawn::RLIMITS) {
-                my $k = lc($rlim);
-                $k =~ tr/_//d;
-                $k = "publicinboxlimiter.$name.$k";
-                defined(my $v = $cfg->{$k}) or next;
-                my @rlimit = split(/\s*,\s*/, $v);
-                if (scalar(@rlimit) == 1) {
-                        push @rlimit, $rlimit[0];
-                } elsif (scalar(@rlimit) != 2) {
-                        warn "could not parse $k: $v\n";
-                }
-                eval { require BSD::Resource };
-                if ($@) {
-                        warn "BSD::Resource missing for $rlim";
-                        next;
-                }
-                foreach my $i (0..$#rlimit) {
-                        next if $rlimit[$i] ne 'INFINITY';
-                        $rlimit[$i] = BSD::Resource::RLIM_INFINITY();
-                }
-                $self->{$rlim} = \@rlimit;
-        }
-}
+no warnings 'once';
+*DESTROY = \&finalize; # ->finalize is idempotent
 
 1;