about summary refs log tree commit homepage
diff options
context:
space:
mode:
-rw-r--r--lib/PublicInbox/GetlineBody.pm8
-rw-r--r--lib/PublicInbox/HTTPD/Async.pm16
-rw-r--r--lib/PublicInbox/Qspawn.pm90
3 files changed, 61 insertions, 53 deletions
diff --git a/lib/PublicInbox/GetlineBody.pm b/lib/PublicInbox/GetlineBody.pm
index f8cdd1b7..750a8c53 100644
--- a/lib/PublicInbox/GetlineBody.pm
+++ b/lib/PublicInbox/GetlineBody.pm
@@ -13,10 +13,11 @@ use strict;
 use warnings;
 
 sub new {
-        my ($class, $rpipe, $end, $buf, $filter) = @_;
+        my ($class, $rpipe, $end, $end_arg, $buf, $filter) = @_;
         bless {
                 rpipe => $rpipe,
                 end => $end,
+                end_arg => $end_arg,
                 buf => $buf,
                 filter => $filter || 0,
         }, $class;
@@ -40,10 +41,9 @@ sub getline {
 
 sub close {
         my ($self) = @_;
-        my $rpipe = delete $self->{rpipe};
+        my ($rpipe, $end, $end_arg) = delete @$self{qw(rpipe end end_arg)};
         close $rpipe if $rpipe;
-        my $end = delete $self->{end};
-        $end->() if $end;
+        $end->($end_arg) if $end;
 }
 
 1;
diff --git a/lib/PublicInbox/HTTPD/Async.pm b/lib/PublicInbox/HTTPD/Async.pm
index d182c118..8956f719 100644
--- a/lib/PublicInbox/HTTPD/Async.pm
+++ b/lib/PublicInbox/HTTPD/Async.pm
@@ -10,7 +10,7 @@ package PublicInbox::HTTPD::Async;
 use strict;
 use warnings;
 use base qw(PublicInbox::DS);
-use fields qw(cb arg end end_arg);
+use fields qw(cb arg end_obj);
 use Errno qw(EAGAIN);
 use PublicInbox::Syscall qw(EPOLLIN EPOLLET);
 
@@ -18,13 +18,13 @@ use PublicInbox::Syscall qw(EPOLLIN EPOLLET);
 # $io is a read-only pipe ($rpipe) for now, but may be a
 # bidirectional socket in the future.
 sub new {
-        my ($class, $io, $cb, $arg, $end, $end_arg) = @_;
+        my ($class, $io, $cb, $arg, $end_obj) = @_;
 
         # no $io? call $cb at the top of the next event loop to
         # avoid recursion:
         unless (defined($io)) {
                 PublicInbox::DS::requeue($cb ? $cb : $arg);
-                die '$end unsupported w/o $io' if $end;
+                die '$end_obj unsupported w/o $io' if $end_obj;
                 return;
         }
 
@@ -33,8 +33,7 @@ sub new {
         $self->SUPER::new($io, EPOLLIN | EPOLLET);
         $self->{cb} = $cb; # initial read callback, later replaced by main_cb
         $self->{arg} = $arg; # arg for $cb
-        $self->{end} = $end; # like END {}, but only for this object
-        $self->{end_arg} = $end_arg; # arg for $end
+        $self->{end_obj} = $end_obj; # like END{}, can ->event_step
         $self;
 }
 
@@ -98,8 +97,11 @@ sub close {
         $self->SUPER::close; # DS::close
 
         # we defer this to the next timer loop since close is deferred
-        if (my $end = delete $self->{end}) {
-                PublicInbox::DS::requeue($end);
+        if (my $end_obj = delete $self->{end_obj}) {
+                # this calls $end_obj->event_step
+                # (likely PublicInbox::Qspawn::event_step,
+                #  NOT PublicInbox::HTTPD::Async::event_step)
+                PublicInbox::DS::requeue($end_obj);
         }
 }
 
diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm
index ba980e73..6cb28b9a 100644
--- a/lib/PublicInbox/Qspawn.pm
+++ b/lib/PublicInbox/Qspawn.pm
@@ -94,7 +94,8 @@ sub waitpid_err ($$) {
                 $err = "W: waitpid($xpid, 0) => $pid: $!";
         } # else should not be called with pid == 0
 
-        my $env = delete $self->{psgi_env};
+        my ($env, $qx_cb, $qx_arg, $qx_buf) =
+                delete @$self{qw(psgi_env qx_cb qx_arg qx_buf)};
 
         # done, spawn whatever's in the queue
         my $limiter = $self->{limiter};
@@ -112,15 +113,12 @@ sub waitpid_err ($$) {
                         log_err($env, join(' ', @{$self->{args}}) . ": $err");
                 }
         }
-        if (my $fin_cb = delete $self->{fin_cb}) {
-                eval { $fin_cb->() }
-        }
+        eval { $qx_cb->($qx_buf, $qx_arg) } if $qx_cb;
 }
 
-sub do_waitpid ($;$$) {
-        my ($self, $fin_cb) = @_;
+sub do_waitpid ($) {
+        my ($self) = @_;
         my $pid = $self->{pid};
-        $self->{fin_cb} = $fin_cb;
         # PublicInbox::DS may not be loaded
         eval { PublicInbox::DS::dwaitpid($pid, \&waitpid_err, $self) };
         # done if we're running in PublicInbox::DS::EventLoop
@@ -131,12 +129,14 @@ sub do_waitpid ($;$$) {
         }
 }
 
-sub finish ($;$) {
-        my ($self, $fin_cb) = @_;
+sub finish ($) {
+        my ($self) = @_;
         if (delete $self->{rpipe}) {
-                do_waitpid($self, $fin_cb);
-        } elsif ($fin_cb) {
-                eval { $fin_cb->() };
+                do_waitpid($self);
+        } else {
+                my ($env, $qx_cb, $qx_arg, $qx_buf) =
+                        delete @$self{qw(psgi_env qx_cb qx_arg qx_buf)};
+                eval { $qx_cb->($qx_buf, $qx_arg) } if $qx_cb;
         }
 }
 
@@ -154,16 +154,14 @@ sub start {
 # $env is the PSGI env.  As with ``/qx; only use this when output is small
 # and safe to slurp.
 sub psgi_qx {
-        my ($self, $env, $limiter, $qx_cb, $cb_arg) = @_;
+        my ($self, $env, $limiter, $qx_cb, $qx_arg) = @_;
         $self->{psgi_env} = $env;
-        my $scalar = '';
-        open(my $qx, '+>', \$scalar) or die; # PerlIO::scalar
-        my $end = sub {
-                my $err = $_[0]; # $!
-                log_err($env, "psgi_qx: $err") if defined($err);
-                finish($self, sub { $qx_cb->(\$scalar, $cb_arg) });
-                $qx = undef;
-        };
+        my $qx_buf = '';
+        open(my $qx_fh, '+>', \$qx_buf) or die; # PerlIO::scalar
+        $self->{qx_cb} = $qx_cb;
+        $self->{qx_arg} = $qx_arg;
+        $self->{qx_fh} = $qx_fh;
+        $self->{qx_buf} = \$qx_buf;
         my $rpipe; # comes from popen_rd
         my $async = $env->{'pi-httpd.async'};
         my $cb = sub {
@@ -171,24 +169,24 @@ sub psgi_qx {
 reread:
                 $r = sysread($rpipe, $buf, 65536);
                 if ($async) {
-                        $async->async_pass($env->{'psgix.io'}, $qx, \$buf);
+                        $async->async_pass($env->{'psgix.io'}, $qx_fh, \$buf);
                 } elsif (defined $r) {
-                        $r ? $qx->write($buf) : $end->();
+                        $r ? $qx_fh->write($buf) : event_step($self, undef);
                 } else {
                         return if $! == EAGAIN; # try again when notified
                         goto reread if $! == EINTR;
-                        $end->($!);
+                        event_step($self, $!);
                 }
         };
         $limiter ||= $def_limiter ||= PublicInbox::Qspawn::Limiter->new(32);
         $self->start($limiter, sub { # start_cb, may run later, much later...
                 ($rpipe) = @_; # popen_rd result
                 if ($async) {
-                # PublicInbox::HTTPD::Async->new($rpipe, $cb, $end)
-                        $async = $async->($rpipe, $cb, undef, $end);
+                # PublicInbox::HTTPD::Async->new($rpipe, $cb, $end_obj)
+                        $async = $async->($rpipe, $cb, undef, $self);
                         # $cb will call ->async_pass or ->close
                 } else { # generic PSGI
-                        $cb->() while $qx;
+                        $cb->() while $self->{qx_fh};
                 }
         });
 }
@@ -206,6 +204,17 @@ sub filter_fh ($$) {
                 });
 }
 
+# this is called on pipe EOF to reap the process, may be called
+# via PublicInbox::DS event loop OR via GetlineBody for generic
+# PSGI servers.
+sub event_step {
+        my ($self, $err) = @_; # $err: $!
+        log_err($self->{psgi_env}, "psgi_{return,qx} $err") if defined($err);
+        finish($self);
+        my ($fh, $qx_fh) = delete(@$self{qw(fh qx_fh)});
+        $fh->close if $fh; # async-only (psgi_return)
+}
+
 # Used for streaming the stdout of one process as a PSGI response.
 #
 # $env is the PSGI env.
@@ -231,14 +240,7 @@ sub filter_fh ($$) {
 sub psgi_return {
         my ($self, $env, $limiter, $parse_hdr) = @_;
         $self->{psgi_env} = $env;
-        my ($fh, $rpipe);
-        my $end = sub {
-                my $err = $_[0]; # $!
-                log_err($env, "psgi_return: $err") if defined($err);
-                finish($self);
-                $fh->close if $fh; # async-only
-        };
-
+        my $rpipe;
         my $buf = '';
         my $rd_hdr = sub {
                 # typically used for reading CGI headers
@@ -271,21 +273,24 @@ sub psgi_return {
                 my $filter = delete $env->{'qspawn.filter'};
                 if (scalar(@$r) == 3) { # error
                         if ($async) {
-                                $async->close; # calls rpipe->close and $end
+                                # calls rpipe->close && ->event_step
+                                $async->close;
                         } else {
                                 $rpipe->close;
-                                $end->();
+                                event_step($self);
                         }
                         $wcb->($r);
                 } elsif ($async) {
                         # done reading headers, handoff to read body
-                        $fh = $wcb->($r); # scalar @$r == 2
+                        my $fh = $wcb->($r); # scalar @$r == 2
                         $fh = filter_fh($fh, $filter) if $filter;
+                        $self->{fh} = $fh;
                         $async->async_pass($env->{'psgix.io'}, $fh, \$buf);
                 } else { # for synchronous PSGI servers
                         require PublicInbox::GetlineBody;
-                        $r->[2] = PublicInbox::GetlineBody->new($rpipe, $end,
-                                                                $buf, $filter);
+                        $r->[2] = PublicInbox::GetlineBody->new($rpipe,
+                                                \&event_step, $self,
+                                                $buf, $filter);
                         $wcb->($r);
                 }
 
@@ -297,8 +302,9 @@ sub psgi_return {
         my $start_cb = sub { # may run later, much later...
                 ($rpipe) = @_;
                 if ($async) {
-                        # PublicInbox::HTTPD::Async->new($rpipe, $cb, $end)
-                        $async = $async->($rpipe, $cb, undef, $end);
+                        # PublicInbox::HTTPD::Async->new($rpipe, $cb, $cb_arg,
+                        #                                 $end_obj)
+                        $async = $async->($rpipe, $cb, undef, $self);
                         # $cb will call ->async_pass or ->close
                 } else { # generic PSGI
                         $cb->() while $rd_hdr;