about summary refs log tree commit homepage
path: root/lib/PublicInbox/Qspawn.pm
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2019-12-25 07:50:39 +0000
committerEric Wong <e@80x24.org>2019-12-26 10:48:19 +0000
commitdd57a7d007bf756d856fe3d2b414657ebf009941 (patch)
tree9b68c7531d1c50b7750c26eccd48c0c79b88cb4a /lib/PublicInbox/Qspawn.pm
parent5c887bfc4ccc6fd79e29d89ee8edfccd5cd9685b (diff)
downloadpublic-inbox-dd57a7d007bf756d856fe3d2b414657ebf009941.tar.gz
This will tie into the DS event loop if that's used, but
event_step an be called directly without relying on the
event loop from Apache or other HTTP servers (or PSGI tests).
Diffstat (limited to 'lib/PublicInbox/Qspawn.pm')
-rw-r--r--lib/PublicInbox/Qspawn.pm90
1 files changed, 48 insertions, 42 deletions
diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm
index ba980e73..6cb28b9a 100644
--- a/lib/PublicInbox/Qspawn.pm
+++ b/lib/PublicInbox/Qspawn.pm
@@ -94,7 +94,8 @@ sub waitpid_err ($$) {
                 $err = "W: waitpid($xpid, 0) => $pid: $!";
         } # else should not be called with pid == 0
 
-        my $env = delete $self->{psgi_env};
+        my ($env, $qx_cb, $qx_arg, $qx_buf) =
+                delete @$self{qw(psgi_env qx_cb qx_arg qx_buf)};
 
         # done, spawn whatever's in the queue
         my $limiter = $self->{limiter};
@@ -112,15 +113,12 @@ sub waitpid_err ($$) {
                         log_err($env, join(' ', @{$self->{args}}) . ": $err");
                 }
         }
-        if (my $fin_cb = delete $self->{fin_cb}) {
-                eval { $fin_cb->() }
-        }
+        eval { $qx_cb->($qx_buf, $qx_arg) } if $qx_cb;
 }
 
-sub do_waitpid ($;$$) {
-        my ($self, $fin_cb) = @_;
+sub do_waitpid ($) {
+        my ($self) = @_;
         my $pid = $self->{pid};
-        $self->{fin_cb} = $fin_cb;
         # PublicInbox::DS may not be loaded
         eval { PublicInbox::DS::dwaitpid($pid, \&waitpid_err, $self) };
         # done if we're running in PublicInbox::DS::EventLoop
@@ -131,12 +129,14 @@ sub do_waitpid ($;$$) {
         }
 }
 
-sub finish ($;$) {
-        my ($self, $fin_cb) = @_;
+sub finish ($) {
+        my ($self) = @_;
         if (delete $self->{rpipe}) {
-                do_waitpid($self, $fin_cb);
-        } elsif ($fin_cb) {
-                eval { $fin_cb->() };
+                do_waitpid($self);
+        } else {
+                my ($env, $qx_cb, $qx_arg, $qx_buf) =
+                        delete @$self{qw(psgi_env qx_cb qx_arg qx_buf)};
+                eval { $qx_cb->($qx_buf, $qx_arg) } if $qx_cb;
         }
 }
 
@@ -154,16 +154,14 @@ sub start {
 # $env is the PSGI env.  As with ``/qx; only use this when output is small
 # and safe to slurp.
 sub psgi_qx {
-        my ($self, $env, $limiter, $qx_cb, $cb_arg) = @_;
+        my ($self, $env, $limiter, $qx_cb, $qx_arg) = @_;
         $self->{psgi_env} = $env;
-        my $scalar = '';
-        open(my $qx, '+>', \$scalar) or die; # PerlIO::scalar
-        my $end = sub {
-                my $err = $_[0]; # $!
-                log_err($env, "psgi_qx: $err") if defined($err);
-                finish($self, sub { $qx_cb->(\$scalar, $cb_arg) });
-                $qx = undef;
-        };
+        my $qx_buf = '';
+        open(my $qx_fh, '+>', \$qx_buf) or die; # PerlIO::scalar
+        $self->{qx_cb} = $qx_cb;
+        $self->{qx_arg} = $qx_arg;
+        $self->{qx_fh} = $qx_fh;
+        $self->{qx_buf} = \$qx_buf;
         my $rpipe; # comes from popen_rd
         my $async = $env->{'pi-httpd.async'};
         my $cb = sub {
@@ -171,24 +169,24 @@ sub psgi_qx {
 reread:
                 $r = sysread($rpipe, $buf, 65536);
                 if ($async) {
-                        $async->async_pass($env->{'psgix.io'}, $qx, \$buf);
+                        $async->async_pass($env->{'psgix.io'}, $qx_fh, \$buf);
                 } elsif (defined $r) {
-                        $r ? $qx->write($buf) : $end->();
+                        $r ? $qx_fh->write($buf) : event_step($self, undef);
                 } else {
                         return if $! == EAGAIN; # try again when notified
                         goto reread if $! == EINTR;
-                        $end->($!);
+                        event_step($self, $!);
                 }
         };
         $limiter ||= $def_limiter ||= PublicInbox::Qspawn::Limiter->new(32);
         $self->start($limiter, sub { # start_cb, may run later, much later...
                 ($rpipe) = @_; # popen_rd result
                 if ($async) {
-                # PublicInbox::HTTPD::Async->new($rpipe, $cb, $end)
-                        $async = $async->($rpipe, $cb, undef, $end);
+                # PublicInbox::HTTPD::Async->new($rpipe, $cb, $end_obj)
+                        $async = $async->($rpipe, $cb, undef, $self);
                         # $cb will call ->async_pass or ->close
                 } else { # generic PSGI
-                        $cb->() while $qx;
+                        $cb->() while $self->{qx_fh};
                 }
         });
 }
@@ -206,6 +204,17 @@ sub filter_fh ($$) {
                 });
 }
 
+# this is called on pipe EOF to reap the process, may be called
+# via PublicInbox::DS event loop OR via GetlineBody for generic
+# PSGI servers.
+sub event_step {
+        my ($self, $err) = @_; # $err: $!
+        log_err($self->{psgi_env}, "psgi_{return,qx} $err") if defined($err);
+        finish($self);
+        my ($fh, $qx_fh) = delete(@$self{qw(fh qx_fh)});
+        $fh->close if $fh; # async-only (psgi_return)
+}
+
 # Used for streaming the stdout of one process as a PSGI response.
 #
 # $env is the PSGI env.
@@ -231,14 +240,7 @@ sub filter_fh ($$) {
 sub psgi_return {
         my ($self, $env, $limiter, $parse_hdr) = @_;
         $self->{psgi_env} = $env;
-        my ($fh, $rpipe);
-        my $end = sub {
-                my $err = $_[0]; # $!
-                log_err($env, "psgi_return: $err") if defined($err);
-                finish($self);
-                $fh->close if $fh; # async-only
-        };
-
+        my $rpipe;
         my $buf = '';
         my $rd_hdr = sub {
                 # typically used for reading CGI headers
@@ -271,21 +273,24 @@ sub psgi_return {
                 my $filter = delete $env->{'qspawn.filter'};
                 if (scalar(@$r) == 3) { # error
                         if ($async) {
-                                $async->close; # calls rpipe->close and $end
+                                # calls rpipe->close && ->event_step
+                                $async->close;
                         } else {
                                 $rpipe->close;
-                                $end->();
+                                event_step($self);
                         }
                         $wcb->($r);
                 } elsif ($async) {
                         # done reading headers, handoff to read body
-                        $fh = $wcb->($r); # scalar @$r == 2
+                        my $fh = $wcb->($r); # scalar @$r == 2
                         $fh = filter_fh($fh, $filter) if $filter;
+                        $self->{fh} = $fh;
                         $async->async_pass($env->{'psgix.io'}, $fh, \$buf);
                 } else { # for synchronous PSGI servers
                         require PublicInbox::GetlineBody;
-                        $r->[2] = PublicInbox::GetlineBody->new($rpipe, $end,
-                                                                $buf, $filter);
+                        $r->[2] = PublicInbox::GetlineBody->new($rpipe,
+                                                \&event_step, $self,
+                                                $buf, $filter);
                         $wcb->($r);
                 }
 
@@ -297,8 +302,9 @@ sub psgi_return {
         my $start_cb = sub { # may run later, much later...
                 ($rpipe) = @_;
                 if ($async) {
-                        # PublicInbox::HTTPD::Async->new($rpipe, $cb, $end)
-                        $async = $async->($rpipe, $cb, undef, $end);
+                        # PublicInbox::HTTPD::Async->new($rpipe, $cb, $cb_arg,
+                        #                                 $end_obj)
+                        $async = $async->($rpipe, $cb, undef, $self);
                         # $cb will call ->async_pass or ->close
                 } else { # generic PSGI
                         $cb->() while $rd_hdr;