about summary refs log tree commit homepage
path: root/lib/PublicInbox/Qspawn.pm
diff options
context:
space:
mode:
Diffstat (limited to 'lib/PublicInbox/Qspawn.pm')
-rw-r--r-- lib/PublicInbox/Qspawn.pm 119
1 files changed, 118 insertions, 1 deletions
diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm
index 9a7e8734..203d8f41 100644
--- a/lib/PublicInbox/Qspawn.pm
+++ b/lib/PublicInbox/Qspawn.pm
@@ -31,6 +31,9 @@ use PublicInbox::GzipFilter;
 use Scalar::Util qw(blessed);
 use PublicInbox::Limiter;
 use PublicInbox::Aspawn qw(run_await);
+use PublicInbox::Syscall qw(EPOLLIN);
+use PublicInbox::InputPipe;
+use Carp qw(carp confess);
 
 # n.b.: we get EAGAIN with public-inbox-httpd, and EINTR on other PSGI servers
 use Errno qw(EAGAIN EINTR);
@@ -61,7 +64,7 @@ sub _do_spawn {
         if ($start_cb) {
                 eval { # popen_rd may die on EMFILE, ENFILE
                         $self->{rpipe} = popen_rd($cmd, $cmd_env, \%o,
-                                                \&waitpid_err, $self);
+                                                \&waitpid_err, $self, \%o);
                         $start_cb->($self); # EPOLL_CTL_ADD may ENOSPC/ENOMEM
                 };
         } else {
@@ -126,6 +129,20 @@ sub wait_await { # run_await cb
         waitpid_err($pid, $self, $opt);
 }
 
+sub yield_chunk { # forward one body chunk to {qfh}; $_[-1] is sysread buffer (or undef)
+        my ($self, $ipipe) = @_; # $ipipe is closed below unless we keep streaming
+        if (!defined($_[-1])) { # undef buffer: read failed, $! goes into the warning
+                warn "error reading body: $!";
+        } elsif ($_[-1] eq '') { # normal EOF
+                $self->finish;
+                $self->{qfh}->close;
+        } elsif (defined($self->{qfh}->write($_[-1]))) {
+                return; # continue while HTTP client is reading our writes
+        } # else { # HTTP client disconnected
+        delete $self->{rpipe}; # shared cleanup: reached on read error, EOF, or failed write
+        $ipipe->close;
+}
+
 sub finish ($;$) {
         my ($self, $err) = @_;
         $self->{_err} //= $err; # only for $@
@@ -201,6 +218,39 @@ EOM
         $ret;
 }
 
+sub yield_pass { # hand the parsed response ($res) to the 'qspawn.wcb' write callback
+        my ($self, $ipipe, $res) = @_; # $ipipe = InputPipe
+        my $env = $self->{psgi_env};
+        my $wcb = delete $env->{'qspawn.wcb'} // confess('BUG: no qspawn.wcb'); # removed from $env; missing is a bug
+        if (ref($res) eq 'CODE') { # chain another command
+                delete $self->{rpipe};
+                $ipipe->close if $ipipe;
+                $res->($wcb);
+                $self->{passed} = 1;
+                return; # all done
+        }
+        confess("BUG: $res unhandled") if ref($res) ne 'ARRAY';
+
+        my $filter = blessed($res->[2]) && $res->[2]->can('attach') ? # object with ->attach in slot 2 is the filter
+                        pop(@$res) : delete($env->{'qspawn.filter'}); # else take (and remove) one from $env
+        $filter //= PublicInbox::GzipFilter::qsp_maybe($res->[1], $env); # last resort; may still be undef
+
+        if (scalar(@$res) == 3) { # done early (likely error or static file)
+                delete $self->{rpipe};
+                $ipipe->close if $ipipe;
+                $wcb->($res); # all done
+                return;
+        }
+        scalar(@$res) == 2 or confess("BUG: scalar(res) != 2: @$res");
+        return ($wcb, $filter) if !$ipipe; # generic PSGI
+        # streaming response
+        my $qfh = $wcb->($res); # get PublicInbox::HTTP::(Chunked|Identity)
+        $qfh = $filter->attach($qfh) if $filter; # presumably wraps $qfh so writes pass through the filter -- verify
+        my ($bref) = @{delete $self->{yield_parse_hdr}}; # first element is the header-parse buffer ref
+        $qfh->write($$bref) if $$bref ne ''; # presumably bytes left over after the headers -- verify
+        $self->{qfh} = $qfh; # keep $ipipe open
+}
+
 sub psgi_return_init_cb { # this may be PublicInbox::HTTPD::Async {cb}
         my ($self) = @_;
         my $r = rd_hdr($self) or return; # incomplete
@@ -257,6 +307,55 @@ sub psgi_return_start { # may run later, much later...
         }
 }
 
+sub r500 () { [ 500, [], [ "Internal error\n" ] ] } # new 3-element response arrayref per call: status 500, no headers, one body line
+
+sub parse_hdr_done ($$) { # feed rbuf ($_[-1]) to the parse_hdr callback; returns its result or a 500
+        my ($self) = @_; # rbuf is read via $_[-1] and never copied into a lexical
+        my $ret;
+        if (defined $_[-1]) {
+                my ($bref, $ph_cb, @ph_arg) = @{$self->{yield_parse_hdr}};
+                $$bref .= $_[-1]; # accumulate everything read so far
+                $ret = eval { $ph_cb->(length($_[-1]), $bref, @ph_arg) }; # args: new-chunk length, buffer ref, extras
+                if ($@) { # callback died
+                        carp "parse_hdr (@{$self->{cmd}}): $@\n";
+                        $ret = r500();
+                } elsif (!$ret && $_[-1] eq '') { # EOF reached before the callback returned a true value
+                        carp <<EOM;
+EOF parsing headers from @{$self->{cmd}} ($self->{psgi_env}->{REQUEST_URI})
+EOM
+                        $ret = r500();
+                }
+        } else { # undef rbuf: read failed, $! is included in the warning
+                carp <<EOM;
+E: parsing headers: $! from @{$self->{cmd}} ($self->{psgi_env}->{REQUEST_URI})
+EOM
+                $ret = r500();
+        }
+        $ret; # undef if headers incomplete
+}
+
+sub ipipe_cb { # InputPipe callback: parse headers first, then forward body chunks
+        my ($ipipe, $self) = @_; # $_[-1] rbuf
+        if ($self->{qfh}) { # already streaming
+                yield_chunk($self, $ipipe, $_[-1]);
+        } elsif (my $res = parse_hdr_done($self, $_[-1])) { # {qfh} unset: still in the header phase
+                yield_pass($self, $ipipe, $res); # may set {qfh}, which switches later calls to yield_chunk
+        } # else: headers incomplete, keep reading
+}
+
+sub _yield_start { # may run later, much later...
+        my ($self) = @_;
+        if ($self->{psgi_env}->{'pi-httpd.async'}) { # presumably set by public-inbox-httpd -- verify
+                require PublicInbox::ProcessIONBF; # loaded lazily, only on this branch
+                my $rpipe = $self->{rpipe};
+                PublicInbox::ProcessIONBF->replace($rpipe);
+                PublicInbox::InputPipe::consume($rpipe, \&ipipe_cb, $self); # ipipe_cb unpacks $self as its 2nd arg
+        } else { # key absent or false: hand the whole response to GetlineResponse
+                require PublicInbox::GetlineResponse;
+                PublicInbox::GetlineResponse::response($self);
+        }
+}
+
 # Used for streaming the stdout of one process as a PSGI response.
 #
 # $env is the PSGI env.
@@ -302,4 +401,22 @@ sub psgi_return {
         }
 }
 
+sub psgi_yield { # start now if 'qspawn.wcb' is set, else return a sub that stores it and starts
+        my ($self, $env, $limiter, @parse_hdr_arg)= @_;
+        $self->{psgi_env} = $env;
+        $self->{yield_parse_hdr} = [ \(my $buf = ''), @parse_hdr_arg ]; # [ empty buffer ref, callback, extra args ]
+        $limiter ||= $def_limiter ||= PublicInbox::Limiter->new(32); # falls back to $def_limiter, built on first need
+
+        # the caller already captured the PSGI write callback from
+        # the PSGI server, so we can call ->start, here:
+        $env->{'qspawn.wcb'} ? start($self, $limiter, \&_yield_start) : sub {
+                # the caller will return this sub to the PSGI server, so
+                # it can set the response callback (that is, for
+                # PublicInbox::HTTP, the chunked_wcb or identity_wcb callback),
+                # but other HTTP servers are supported:
+                $env->{'qspawn.wcb'} = $_[0];
+                start($self, $limiter, \&_yield_start);
+        }
+}
+
 1;