diff options
Diffstat (limited to 'lib/PublicInbox/Qspawn.pm')
-rw-r--r-- | lib/PublicInbox/Qspawn.pm | 119 |
1 files changed, 118 insertions, 1 deletions
diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm index 9a7e8734..203d8f41 100644 --- a/lib/PublicInbox/Qspawn.pm +++ b/lib/PublicInbox/Qspawn.pm @@ -31,6 +31,9 @@ use PublicInbox::GzipFilter; use Scalar::Util qw(blessed); use PublicInbox::Limiter; use PublicInbox::Aspawn qw(run_await); +use PublicInbox::Syscall qw(EPOLLIN); +use PublicInbox::InputPipe; +use Carp qw(carp confess); # n.b.: we get EAGAIN with public-inbox-httpd, and EINTR on other PSGI servers use Errno qw(EAGAIN EINTR); @@ -61,7 +64,7 @@ sub _do_spawn { if ($start_cb) { eval { # popen_rd may die on EMFILE, ENFILE $self->{rpipe} = popen_rd($cmd, $cmd_env, \%o, - \&waitpid_err, $self); + \&waitpid_err, $self, \%o); $start_cb->($self); # EPOLL_CTL_ADD may ENOSPC/ENOMEM }; } else { @@ -126,6 +129,20 @@ sub wait_await { # run_await cb waitpid_err($pid, $self, $opt); } +sub yield_chunk { # $_[-1] is sysread buffer (or undef) + my ($self, $ipipe) = @_; + if (!defined($_[-1])) { + warn "error reading body: $!"; + } elsif ($_[-1] eq '') { # normal EOF + $self->finish; + $self->{qfh}->close; + } elsif (defined($self->{qfh}->write($_[-1]))) { + return; # continue while HTTP client is reading our writes + } # else { # HTTP client disconnected + delete $self->{rpipe}; + $ipipe->close; +} + sub finish ($;$) { my ($self, $err) = @_; $self->{_err} //= $err; # only for $@ @@ -201,6 +218,39 @@ EOM $ret; } +sub yield_pass { + my ($self, $ipipe, $res) = @_; # $ipipe = InputPipe + my $env = $self->{psgi_env}; + my $wcb = delete $env->{'qspawn.wcb'} // confess('BUG: no qspawn.wcb'); + if (ref($res) eq 'CODE') { # chain another command + delete $self->{rpipe}; + $ipipe->close if $ipipe; + $res->($wcb); + $self->{passed} = 1; + return; # all done + } + confess("BUG: $res unhandled") if ref($res) ne 'ARRAY'; + + my $filter = blessed($res->[2]) && $res->[2]->can('attach') ? + pop(@$res) : delete($env->{'qspawn.filter'}); + $filter //= PublicInbox::GzipFilter::qsp_maybe($res->[1], $env); + + if (scalar(@$res) == 3) { # done early (likely error or static file) + delete $self->{rpipe}; + $ipipe->close if $ipipe; + $wcb->($res); # all done + return; + } + scalar(@$res) == 2 or confess("BUG: scalar(res) != 2: @$res"); + return ($wcb, $filter) if !$ipipe; # generic PSGI + # streaming response + my $qfh = $wcb->($res); # get PublicInbox::HTTP::(Chunked|Identity) + $qfh = $filter->attach($qfh) if $filter; + my ($bref) = @{delete $self->{yield_parse_hdr}}; + $qfh->write($$bref) if $$bref ne ''; + $self->{qfh} = $qfh; # keep $ipipe open +} + sub psgi_return_init_cb { # this may be PublicInbox::HTTPD::Async {cb} my ($self) = @_; my $r = rd_hdr($self) or return; # incomplete @@ -257,6 +307,55 @@ sub psgi_return_start { # may run later, much later... } } +sub r500 () { [ 500, [], [ "Internal error\n" ] ] } + +sub parse_hdr_done ($$) { + my ($self) = @_; + my $ret; + if (defined $_[-1]) { + my ($bref, $ph_cb, @ph_arg) = @{$self->{yield_parse_hdr}}; + $$bref .= $_[-1]; + $ret = eval { $ph_cb->(length($_[-1]), $bref, @ph_arg) }; + if ($@) { + carp "parse_hdr (@{$self->{cmd}}): $@\n"; + $ret = r500(); + } elsif (!$ret && $_[-1] eq '') { + carp <<EOM; +EOF parsing headers from @{$self->{cmd}} ($self->{psgi_env}->{REQUEST_URI}) +EOM + $ret = r500(); + } + } else { + carp <<EOM; +E: parsing headers: $! from @{$self->{cmd}} ($self->{psgi_env}->{REQUEST_URI}) +EOM + $ret = r500(); + } + $ret; # undef if headers incomplete +} + +sub ipipe_cb { # InputPipe callback + my ($ipipe, $self) = @_; # $_[-1] rbuf + if ($self->{qfh}) { # already streaming + yield_chunk($self, $ipipe, $_[-1]); + } elsif (my $res = parse_hdr_done($self, $_[-1])) { + yield_pass($self, $ipipe, $res); + } # else: headers incomplete, keep reading +} + +sub _yield_start { # may run later, much later... + my ($self) = @_; + if ($self->{psgi_env}->{'pi-httpd.async'}) { + require PublicInbox::ProcessIONBF; + my $rpipe = $self->{rpipe}; + PublicInbox::ProcessIONBF->replace($rpipe); + PublicInbox::InputPipe::consume($rpipe, \&ipipe_cb, $self); + } else { + require PublicInbox::GetlineResponse; + PublicInbox::GetlineResponse::response($self); + } +} + # Used for streaming the stdout of one process as a PSGI response. # # $env is the PSGI env. @@ -302,4 +401,22 @@ sub psgi_return { } } +sub psgi_yield { + my ($self, $env, $limiter, @parse_hdr_arg)= @_; + $self->{psgi_env} = $env; + $self->{yield_parse_hdr} = [ \(my $buf = ''), @parse_hdr_arg ]; + $limiter ||= $def_limiter ||= PublicInbox::Limiter->new(32); + + # the caller already captured the PSGI write callback from + # the PSGI server, so we can call ->start, here: + $env->{'qspawn.wcb'} ? start($self, $limiter, \&_yield_start) : sub { + # the caller will return this sub to the PSGI server, so + # it can set the response callback (that is, for + # PublicInbox::HTTP, the chunked_wcb or identity_wcb callback), + # but other HTTP servers are supported: + $env->{'qspawn.wcb'} = $_[0]; + start($self, $limiter, \&_yield_start); + } +} + 1; |