# Copyright (C) all contributors
# License: AGPL-3.0+
# Like most Perl modules in public-inbox, this is internal and
# NOT subject to any stability guarantees! It is only documented
# for other hackers.
#
# This is used to limit the number of processes spawned by the
# PSGI server, so it acts like a semaphore and queues up extra
# commands to be run if currently at the limit. Multiple "limiters"
# may be configured which give inboxes different channels to
# operate in. This can be useful to ensure smaller inboxes can
# be cloned while cloning of large inboxes is maxed out.
#
# This does not depend on the PublicInbox::DS::event_loop or any
# other external scheduling mechanism, you just need to call
# start() and finish() appropriately. However, public-inbox-httpd
# (which uses PublicInbox::DS) will be able to schedule this
# based on readability of stdout from the spawned process.
# See GitHTTPBackend.pm and SolverGit.pm for usage examples.
# It does not depend on any form of threading.
#
# This is useful for scheduling CGI execution of both long-lived
# git-http-backend(1) process (for "git clone") as well as short-lived
# processes such as git-apply(1).
package PublicInbox::Qspawn;
use v5.12;
use PublicInbox::Spawn qw(popen_rd);
use PublicInbox::GzipFilter;
use Scalar::Util qw(blessed);
use PublicInbox::Limiter;
use PublicInbox::Aspawn qw(run_await);
use PublicInbox::Syscall qw(EPOLLIN);
use PublicInbox::InputPipe;
use Carp qw(carp confess);
# n.b.: we get EAGAIN with public-inbox-httpd, and EINTR on other PSGI servers
use Errno qw(EAGAIN EINTR);
my $def_limiter;
# declares a command to spawn (but does not spawn it).
# $cmd is the command to spawn
# $cmd_env is the environ for the child process (not PSGI env)
# $opt can include redirects and perhaps other process spawning options
# {qsp_err} is an optional error buffer callers may access themselves
sub new {
my ($class, $cmd, $cmd_env, $opt) = @_;
bless { args => [ $cmd, $cmd_env, $opt ? { %$opt } : {} ] }, $class;
}
sub _do_spawn {
my ($self, $start_cb, $limiter) = @_;
my ($cmd, $cmd_env, $opt) = @{$self->{args}};
my %o = %{$opt || {}};
$self->{limiter} = $limiter;
for my $k (@PublicInbox::Spawn::RLIMITS) {
$opt->{$k} = $limiter->{$k} // next;
}
$self->{-quiet} = 1 if $o{quiet};
$limiter->{running}++;
if ($start_cb) {
eval { # popen_rd may die on EMFILE, ENFILE
$self->{rpipe} = popen_rd($cmd, $cmd_env, $opt,
\&waitpid_err, $self);
$start_cb->($self); # EPOLL_CTL_ADD may ENOSPC/ENOMEM
};
} else {
eval { run_await($cmd, $cmd_env, $opt, \&wait_await, $self) };
warn "E: $@" if $@;
}
finish($self, $@) if $@;
}
sub psgi_status_err { # Qspawn itself is useful w/o PSGI
require PublicInbox::WwwStatic;
PublicInbox::WwwStatic::r($_[0] // 500);
}
sub finalize ($) {
my ($self) = @_;
# process is done, spawn whatever's in the queue
my $limiter = delete $self->{limiter} or return;
my $running = --$limiter->{running};
if ($running < $limiter->{max}) {
if (my $next = shift(@{$limiter->{run_queue}})) {
_do_spawn(@$next, $limiter);
}
}
if (my $err = $self->{_err}) { # set by finish or waitpid_err
utf8::decode($err);
if (my $dst = $self->{qsp_err}) {
$$dst .= $$dst ? " $err" : "; $err";
}
warn "E: @{$self->{args}->[0]}: $err\n" if !$self->{-quiet};
}
my ($env, $qx_cb_arg) = delete @$self{qw(psgi_env qx_cb_arg)};
if ($qx_cb_arg) {
my $cb = shift @$qx_cb_arg;
eval { $cb->($self->{args}->[2]->{1}, @$qx_cb_arg) };
return unless $@;
warn "E: $@"; # hope qspawn.wcb can handle it
}
return if $self->{passed}; # another command chained it
if (my $wcb = delete $env->{'qspawn.wcb'}) {
# have we started writing, yet?
$wcb->(psgi_status_err($env->{'qspawn.fallback'}));
}
}
sub waitpid_err { # callback for awaitpid
my (undef, $self) = @_; # $_[0]: pid
$self->{_err} = ''; # for defined check in ->finish
if ($?) { # XXX this may be redundant
my $status = $? >> 8;
my $sig = $? & 127;
$self->{_err} .= "exit status=$status";
$self->{_err} .= " signal=$sig" if $sig;
}
finalize($self) if !$self->{rpipe};
}
sub wait_await { # run_await cb
my ($pid, $cmd, $cmd_env, $opt, $self) = @_;
waitpid_err($pid, $self);
}
sub yield_chunk { # $_[-1] is sysread buffer (or undef)
my ($self, $ipipe) = @_;
if (!defined($_[-1])) {
warn "error reading body: $!";
} elsif ($_[-1] eq '') { # normal EOF
$self->finish;
$self->{qfh}->close;
} elsif (defined($self->{qfh}->write($_[-1]))) {
return; # continue while HTTP client is reading our writes
} # else { # HTTP client disconnected
delete $self->{rpipe};
$ipipe->close;
}
sub finish ($;$) {
my ($self, $err) = @_;
$self->{_err} //= $err; # only for $@
# we can safely finalize if pipe was closed before, or if
# {_err} is defined by waitpid_err. Deleting {rpipe} will
# trigger PublicInbox::IO::DESTROY -> waitpid_err,
# but it may not fire right away if inside the event loop.
my $closed_before = !delete($self->{rpipe});
finalize($self) if $closed_before || defined($self->{_err});
}
sub start ($$$) {
my ($self, $limiter, $start_cb) = @_;
if ($limiter->{running} < $limiter->{max}) {
_do_spawn($self, $start_cb, $limiter);
} else {
push @{$limiter->{run_queue}}, [ $self, $start_cb ];
}
}
# Similar to `backtick` or "qx" ("perldoc -f qx"), it calls @qx_cb_arg with
# the stdout of the given command when done; but respects the given limiter
# $env is the PSGI env. As with ``/qx; only use this when output is small
# and safe to slurp.
sub psgi_qx {
my ($self, $env, $limiter, @qx_cb_arg) = @_;
$self->{psgi_env} = $env;
$self->{qx_cb_arg} = \@qx_cb_arg;
$limiter ||= $def_limiter ||= PublicInbox::Limiter->new(32);
start($self, $limiter, undef);
}
sub yield_pass {
my ($self, $ipipe, $res) = @_; # $ipipe = InputPipe
my $env = $self->{psgi_env};
my $wcb = delete $env->{'qspawn.wcb'} // confess('BUG: no qspawn.wcb');
if (ref($res) eq 'CODE') { # chain another command
delete $self->{rpipe};
$ipipe->close if $ipipe;
$res->($wcb);
$self->{passed} = 1;
return; # all done
}
confess("BUG: $res unhandled") if ref($res) ne 'ARRAY';
my $filter = blessed($res->[2]) && $res->[2]->can('attach') ?
pop(@$res) : delete($env->{'qspawn.filter'});
$filter //= PublicInbox::GzipFilter::qsp_maybe($res->[1], $env);
if (scalar(@$res) == 3) { # done early (likely error or static file)
delete $self->{rpipe};
$ipipe->close if $ipipe;
$wcb->($res); # all done
return;
}
scalar(@$res) == 2 or confess("BUG: scalar(res) != 2: @$res");
return ($wcb, $filter) if !$ipipe; # generic PSGI
# streaming response
my $qfh = $wcb->($res); # get PublicInbox::HTTP::(Chunked|Identity)
$qfh = $filter->attach($qfh) if $filter;
my ($bref) = @{delete $self->{yield_parse_hdr}};
$qfh->write($$bref) if $$bref ne '';
$self->{qfh} = $qfh; # keep $ipipe open
}
sub parse_hdr_done ($$) {
my ($self) = @_;
my ($ret, $err);
if (defined $_[-1]) {
my ($bref, $ph_cb, @ph_arg) = @{$self->{yield_parse_hdr}};
$$bref .= $_[-1];
$ret = eval { $ph_cb->(length($_[-1]), $bref, @ph_arg) };
if (($err = $@)) {
$ret = psgi_status_err();
} elsif (!$ret && $_[-1] eq '') {
$err = 'EOF';
$ret = psgi_status_err();
}
} else {
$err = "$!";
$ret = psgi_status_err();
}
carp <{args}->[0]} ($self->{psgi_env}->{REQUEST_URI})
EOM
$ret; # undef if headers incomplete
}
sub ipipe_cb { # InputPipe callback
my ($ipipe, $self) = @_; # $_[-1] rbuf
if ($self->{qfh}) { # already streaming
yield_chunk($self, $ipipe, $_[-1]);
} elsif (my $res = parse_hdr_done($self, $_[-1])) {
yield_pass($self, $ipipe, $res);
} # else: headers incomplete, keep reading
}
sub _yield_start { # may run later, much later...
my ($self) = @_;
if ($self->{psgi_env}->{'pi-httpd.async'}) {
my $rpipe = $self->{rpipe};
$rpipe->blocking(0);
PublicInbox::InputPipe::consume($rpipe, \&ipipe_cb, $self);
} else {
require PublicInbox::GetlineResponse;
PublicInbox::GetlineResponse::response($self);
}
}
# Used for streaming the stdout of one process as a PSGI response.
#
# $env is the PSGI env.
# optional keys in $env:
# $env->{'qspawn.wcb'} - the write callback from the PSGI server
# optional, use this if you've already
# captured it elsewhere. If not given,
# psgi_yield will return an anonymous
# sub for the PSGI server to call
#
# $env->{'qspawn.filter'} - filter object, responds to ->attach for
# pi-httpd.async and ->translate for generic
# PSGI servers
#
# $limiter - the Limiter object to use (uses the def_limiter if not given)
#
# @parse_hdr_arg - Initial read cb+args; often for parsing CGI header output.
# It will be given the return value of sysread from the pipe
# and a string ref of the current buffer. Returns an arrayref
# for PSGI responses. 2-element arrays in PSGI mean the
# body will be streamed, later, via writes (push-based) to
# psgix.io. 3-element arrays means the body is available
# immediately (or streamed via ->getline (pull-based)).
sub psgi_yield {
my ($self, $env, $limiter, @parse_hdr_arg)= @_;
$self->{psgi_env} = $env;
$self->{yield_parse_hdr} = [ \(my $buf = ''), @parse_hdr_arg ];
$limiter ||= $def_limiter ||= PublicInbox::Limiter->new(32);
# the caller already captured the PSGI write callback from
# the PSGI server, so we can call ->start, here:
$env->{'qspawn.wcb'} ? start($self, $limiter, \&_yield_start) : sub {
# the caller will return this sub to the PSGI server, so
# it can set the response callback (that is, for
# PublicInbox::HTTP, the chunked_wcb or identity_wcb callback),
# but other HTTP servers are supported:
$env->{'qspawn.wcb'} = $_[0];
start($self, $limiter, \&_yield_start);
}
}
no warnings 'once';
*DESTROY = \&finalize; # ->finalize is idempotent
1;