about summary refs log tree commit homepage
path: root/lib/PublicInbox/Qspawn.pm
diff options
context:
space:
mode:
Diffstat (limited to 'lib/PublicInbox/Qspawn.pm')
-rw-r--r--lib/PublicInbox/Qspawn.pm296
1 files changed, 149 insertions, 147 deletions
diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm
index 651fa390..1a2b70e7 100644
--- a/lib/PublicInbox/Qspawn.pm
+++ b/lib/PublicInbox/Qspawn.pm
@@ -27,7 +27,6 @@ package PublicInbox::Qspawn;
 use strict;
 use warnings;
 use PublicInbox::Spawn qw(popen_rd);
-require Plack::Util;
 
 # n.b.: we get EAGAIN with public-inbox-httpd, and EINTR on other PSGI servers
 use Errno qw(EAGAIN EINTR);
@@ -36,17 +35,17 @@ my $def_limiter;
 
 # declares a command to spawn (but does not spawn it).
 # $cmd is the command to spawn
-# $env is the environ for the child process
+# $cmd_env is the environ for the child process (not PSGI env)
 # $opt can include redirects and perhaps other process spawning options
 sub new ($$$;) {
-        my ($class, $cmd, $env, $opt) = @_;
-        bless { args => [ $cmd, $env, $opt ] }, $class;
+        my ($class, $cmd, $cmd_env, $opt) = @_;
+        bless { args => [ $cmd, $cmd_env, $opt ] }, $class;
 }
 
 sub _do_spawn {
         my ($self, $start_cb, $limiter) = @_;
         my $err;
-        my ($cmd, $env, $opts) = @{$self->{args}};
+        my ($cmd, $cmd_env, $opts) = @{$self->{args}};
         my %opts = %{$opts || {}};
         $self->{limiter} = $limiter;
         foreach my $k (PublicInbox::Spawn::RLIMITS()) {
@@ -55,7 +54,7 @@ sub _do_spawn {
                 }
         }
 
-        ($self->{rpipe}, $self->{pid}) = popen_rd($cmd, $env, \%opts);
+        ($self->{rpipe}, $self->{pid}) = popen_rd($cmd, $cmd_env, \%opts);
 
         # drop any IO handles opt was holding open via $opt->{hold}
         # No need to hold onto the descriptor once the child process has it.
@@ -66,7 +65,7 @@ sub _do_spawn {
         } else {
                 $self->{err} = $!;
         }
-        $start_cb->($self->{rpipe});
+        $start_cb->($self);
 }
 
 sub child_err ($) {
@@ -94,7 +93,8 @@ sub waitpid_err ($$) {
                 $err = "W: waitpid($xpid, 0) => $pid: $!";
         } # else should not be called with pid == 0
 
-        my $env = delete $self->{env};
+        my ($env, $qx_cb, $qx_arg, $qx_buf) =
+                delete @$self{qw(psgi_env qx_cb qx_arg qx_buf)};
 
         # done, spawn whatever's in the queue
         my $limiter = $self->{limiter};
@@ -112,16 +112,12 @@ sub waitpid_err ($$) {
                         log_err($env, join(' ', @{$self->{args}}) . ": $err");
                 }
         }
-        if (my $fin_cb = delete $self->{fin_cb}) {
-                eval { $fin_cb->() }
-        }
+        eval { $qx_cb->($qx_buf, $qx_arg) } if $qx_cb;
 }
 
-sub do_waitpid ($;$$) {
-        my ($self, $env, $fin_cb) = @_;
+sub do_waitpid ($) {
+        my ($self) = @_;
         my $pid = $self->{pid};
-        $self->{env} = $env;
-        $self->{fin_cb} = $fin_cb;
         # PublicInbox::DS may not be loaded
         eval { PublicInbox::DS::dwaitpid($pid, \&waitpid_err, $self) };
         # done if we're running in PublicInbox::DS::EventLoop
@@ -132,16 +128,18 @@ sub do_waitpid ($;$$) {
         }
 }
 
-sub finish ($;$$) {
-        my ($self, $env, $fin_cb) = @_;
+sub finish ($) {
+        my ($self) = @_;
         if (delete $self->{rpipe}) {
-                do_waitpid($self, $env, $fin_cb);
-        } elsif ($fin_cb) {
-                eval { $fin_cb->() };
+                do_waitpid($self);
+        } else {
+                my ($env, $qx_cb, $qx_arg, $qx_buf) =
+                        delete @$self{qw(psgi_env qx_cb qx_arg qx_buf)};
+                eval { $qx_cb->($qx_buf, $qx_arg) } if $qx_cb;
         }
 }
 
-sub start {
+sub start ($$$) {
         my ($self, $limiter, $start_cb) = @_;
         if ($limiter->{running} < $limiter->{max}) {
                 _do_spawn($self, $start_cb, $limiter);
@@ -150,60 +148,135 @@ sub start {
         }
 }
 
+sub psgi_qx_init_cb {
+        my ($self) = @_;
+        my $async = delete $self->{async};
+        my ($r, $buf);
+        my $qx_fh = $self->{qx_fh};
+reread:
+        $r = sysread($self->{rpipe}, $buf, 65536);
+        if ($async) {
+                $async->async_pass($self->{psgi_env}->{'psgix.io'},
+                                        $qx_fh, \$buf);
+        } elsif (defined $r) {
+                $r ? $qx_fh->write($buf) : event_step($self, undef);
+        } else {
+                return if $! == EAGAIN; # try again when notified
+                goto reread if $! == EINTR;
+                event_step($self, $!);
+        }
+}
+
+sub psgi_qx_start {
+        my ($self) = @_;
+        if (my $async = $self->{psgi_env}->{'pi-httpd.async'}) {
+                # PublicInbox::HTTPD::Async->new(rpipe, $cb, cb_arg, $end_obj)
+                $self->{async} = $async->($self->{rpipe},
+                                        \&psgi_qx_init_cb, $self, $self);
+                # init_cb will call ->async_pass or ->close
+        } else { # generic PSGI
+                psgi_qx_init_cb($self) while $self->{qx_fh};
+        }
+}
+
 # Similar to `backtick` or "qx" ("perldoc -f qx"), it calls $qx_cb with
 # the stdout of the given command when done; but respects the given limiter
 # $env is the PSGI env.  As with ``/qx; only use this when output is small
 # and safe to slurp.
 sub psgi_qx {
-        my ($self, $env, $limiter, $qx_cb) = @_;
-        my $scalar = '';
-        open(my $qx, '+>', \$scalar) or die; # PerlIO::scalar
-        my $end = sub {
-                my $err = $_[0]; # $!
-                log_err($env, "psgi_qx: $err") if defined($err);
-                finish($self, $env, sub { $qx_cb->(\$scalar) });
-                $qx = undef;
-        };
-        my $rpipe; # comes from popen_rd
-        my $async = $env->{'pi-httpd.async'};
-        my $cb = sub {
-                my ($r, $buf);
-reread:
-                $r = sysread($rpipe, $buf, 65536);
-                if ($async) {
-                        $async->async_pass($env->{'psgix.io'}, $qx, \$buf);
-                } elsif (defined $r) {
-                        $r ? $qx->write($buf) : $end->();
+        my ($self, $env, $limiter, $qx_cb, $qx_arg) = @_;
+        $self->{psgi_env} = $env;
+        my $qx_buf = '';
+        open(my $qx_fh, '+>', \$qx_buf) or die; # PerlIO::scalar
+        $self->{qx_cb} = $qx_cb;
+        $self->{qx_arg} = $qx_arg;
+        $self->{qx_fh} = $qx_fh;
+        $self->{qx_buf} = \$qx_buf;
+        $limiter ||= $def_limiter ||= PublicInbox::Qspawn::Limiter->new(32);
+        start($self, $limiter, \&psgi_qx_start);
+}
+
+# this is called on pipe EOF to reap the process, may be called
+# via PublicInbox::DS event loop OR via GetlineBody for generic
+# PSGI servers.
+sub event_step {
+        my ($self, $err) = @_; # $err: $!
+        log_err($self->{psgi_env}, "psgi_{return,qx} $err") if defined($err);
+        finish($self);
+        my ($fh, $qx_fh) = delete(@$self{qw(fh qx_fh)});
+        $fh->close if $fh; # async-only (psgi_return)
+}
+
+sub rd_hdr ($) {
+        my ($self) = @_;
+        # typically used for reading CGI headers
+        # we must loop until EAGAIN for EPOLLET in HTTPD/Async.pm
+        # We also need to check EINTR for generic PSGI servers.
+        my $ret;
+        my $total_rd = 0;
+        my $hdr_buf = $self->{hdr_buf};
+        my ($ph_cb, $ph_arg) = @{$self->{parse_hdr}};
+        do {
+                my $r = sysread($self->{rpipe}, $$hdr_buf, 4096,
+                                length($$hdr_buf));
+                if (defined($r)) {
+                        $total_rd += $r;
+                        $ret = $ph_cb->($total_rd, $hdr_buf, $ph_arg);
                 } else {
-                        return if $! == EAGAIN; # try again when notified
-                        goto reread if $! == EINTR;
-                        $end->($!);
+                        # caller should notify us when it's ready:
+                        return if $! == EAGAIN;
+                        next if $! == EINTR; # immediate retry
+                        log_err($self->{psgi_env}, "error reading header: $!");
+                        $ret = [ 500, [], [ "Internal error\n" ] ];
                 }
-        };
-        $limiter ||= $def_limiter ||= PublicInbox::Qspawn::Limiter->new(32);
-        $self->start($limiter, sub { # start_cb, may run later, much later...
-                ($rpipe) = @_; # popen_rd result
+        } until (defined $ret);
+        delete $self->{parse_hdr}; # done parsing headers
+        $ret;
+}
+
+sub psgi_return_init_cb {
+        my ($self) = @_;
+        my $r = rd_hdr($self) or return;
+        my $env = $self->{psgi_env};
+        my $wcb = delete $env->{'qspawn.wcb'};
+        my $async = delete $self->{async};
+        if (scalar(@$r) == 3) { # error
                 if ($async) {
-                # PublicInbox::HTTPD::Async->new($rpipe, $cb, $end)
-                        $async = $async->($rpipe, $cb, $end);
-                        # $cb will call ->async_pass or ->close
-                } else { # generic PSGI
-                        $cb->() while $qx;
+                        # calls rpipe->close && ->event_step
+                        $async->close;
+                } else {
+                        $self->{rpipe}->close;
+                        event_step($self);
                 }
-        });
+                $wcb->($r);
+        } elsif ($async) {
+                # done reading headers, handoff to read body
+                my $fh = $wcb->($r); # scalar @$r == 2
+                $self->{fh} = $fh;
+                $async->async_pass($env->{'psgix.io'}, $fh,
+                                        delete($self->{hdr_buf}));
+        } else { # for synchronous PSGI servers
+                require PublicInbox::GetlineBody;
+                $r->[2] = PublicInbox::GetlineBody->new($self->{rpipe},
+                                        \&event_step, $self,
+                                        ${$self->{hdr_buf}});
+                $wcb->($r);
+        }
+
+        # Workaround a leak under Perl 5.16.3 when combined with
+        # Plack::Middleware::Deflater:
+        $wcb = undef;
 }
 
-# create a filter for "push"-based streaming PSGI writes used by HTTPD::Async
-sub filter_fh ($$) {
-        my ($fh, $filter) = @_;
-        Plack::Util::inline_object(
-                close => sub {
-                        $fh->write($filter->(undef));
-                        $fh->close;
-                },
-                write => sub {
-                        $fh->write($filter->($_[0]));
-                });
+sub psgi_return_start { # may run later, much later...
+        my ($self) = @_;
+        if (my $async = $self->{psgi_env}->{'pi-httpd.async'}) {
+                # PublicInbox::HTTPD::Async->new(rpipe, $cb, $cb_arg, $end_obj)
+                $self->{async} = $async->($self->{rpipe},
+                                        \&psgi_return_init_cb, $self, $self);
+        } else { # generic PSGI
+                psgi_return_init_cb($self) while $self->{parse_hdr};
+        }
 }
 
 # Used for streaming the stdout of one process as a PSGI response.
@@ -216,9 +289,6 @@ sub filter_fh ($$) {
 #                          psgi_return will return an anonymous
 #                          sub for the PSGI server to call
 #
-#   $env->{'qspawn.filter'} - filter callback, receives a string as input,
-#                             undef on EOF
-#
 # $limiter - the Limiter object to use (uses the def_limiter if not given)
 #
 # $parse_hdr - Initial read function; often for parsing CGI header output.
@@ -229,93 +299,25 @@ sub filter_fh ($$) {
 #              psgix.io.  3-element arrays means the body is available
 #              immediately (or streamed via ->getline (pull-based)).
 sub psgi_return {
-        my ($self, $env, $limiter, $parse_hdr) = @_;
-        my ($fh, $rpipe);
-        my $end = sub {
-                my $err = $_[0]; # $!
-                log_err($env, "psgi_return: $err") if defined($err);
-                finish($self, $env);
-                $fh->close if $fh; # async-only
-        };
-
-        my $buf = '';
-        my $rd_hdr = sub {
-                # typically used for reading CGI headers
-                # we must loop until EAGAIN for EPOLLET in HTTPD/Async.pm
-                # We also need to check EINTR for generic PSGI servers.
-                my $ret;
-                my $total_rd = 0;
-                do {
-                        my $r = sysread($rpipe, $buf, 4096, length($buf));
-                        if (defined($r)) {
-                                $total_rd += $r;
-                                $ret = $parse_hdr->($r ? $total_rd : 0, \$buf);
-                        } else {
-                                # caller should notify us when it's ready:
-                                return if $! == EAGAIN;
-                                next if $! == EINTR; # immediate retry
-                                log_err($env, "error reading header: $!");
-                                $ret = [ 500, [], [ "Internal error\n" ] ];
-                        }
-                } until (defined $ret);
-                $ret;
-        };
-
-        my $wcb = delete $env->{'qspawn.wcb'}; # or PSGI server supplies it
-        my $async = $env->{'pi-httpd.async'};
-
-        my $cb = sub {
-                my $r = $rd_hdr->() or return;
-                $rd_hdr = undef; # done reading headers
-                my $filter = delete $env->{'qspawn.filter'};
-                if (scalar(@$r) == 3) { # error
-                        if ($async) {
-                                $async->close; # calls rpipe->close and $end
-                        } else {
-                                $rpipe->close;
-                                $end->();
-                        }
-                        $wcb->($r);
-                } elsif ($async) {
-                        # done reading headers, handoff to read body
-                        $fh = $wcb->($r); # scalar @$r == 2
-                        $fh = filter_fh($fh, $filter) if $filter;
-                        $async->async_pass($env->{'psgix.io'}, $fh, \$buf);
-                } else { # for synchronous PSGI servers
-                        require PublicInbox::GetlineBody;
-                        $r->[2] = PublicInbox::GetlineBody->new($rpipe, $end,
-                                                                $buf, $filter);
-                        $wcb->($r);
-                }
-
-                # Workaround a leak under Perl 5.16.3 when combined with
-                # Plack::Middleware::Deflater:
-                $wcb = undef;
-        };
+        my ($self, $env, $limiter, $parse_hdr, $hdr_arg) = @_;
+        $self->{psgi_env} = $env;
+        $self->{hdr_buf} = \(my $hdr_buf = '');
+        $self->{parse_hdr} = [ $parse_hdr, $hdr_arg ];
         $limiter ||= $def_limiter ||= PublicInbox::Qspawn::Limiter->new(32);
-        my $start_cb = sub { # may run later, much later...
-                ($rpipe) = @_;
-                if ($async) {
-                        # PublicInbox::HTTPD::Async->new($rpipe, $cb, $end)
-                        $async = $async->($rpipe, $cb, $end);
-                        # $cb will call ->async_pass or ->close
-                } else { # generic PSGI
-                        $cb->() while $rd_hdr;
-                }
-        };
 
         # the caller already captured the PSGI write callback from
         # the PSGI server, so we can call ->start, here:
-        return $self->start($limiter, $start_cb) if $wcb;
+        $env->{'qspawn.wcb'} and
+                return start($self, $limiter, \&psgi_return_start);
 
         # the caller will return this sub to the PSGI server, so
-        # it can set the response callback (that is, for PublicInbox::HTTP,
-        # the chunked_wcb or identity_wcb callback), but other HTTP servers
-        # are supported:
+        # it can set the response callback (that is, for
+        # PublicInbox::HTTP, the chunked_wcb or identity_wcb callback),
+        # but other HTTP servers are supported:
         sub {
-                ($wcb) = @_;
-                $self->start($limiter, $start_cb);
-        };
+                $env->{'qspawn.wcb'} = $_[0];
+                start($self, $limiter, \&psgi_return_start);
+        }
 }
 
 package PublicInbox::Qspawn::Limiter;