# Copyright (C) all contributors <meta@public-inbox.org>
# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>

# Like most Perl modules in public-inbox, this is internal and
# NOT subject to any stability guarantees!  It is only documented
# for other hackers.
#
# This is used to limit the number of processes spawned by the
# PSGI server, so it acts like a semaphore and queues up extra
# commands to be run if currently at the limit.  Multiple "limiters"
# may be configured which give inboxes different channels to
# operate in.  This can be useful to ensure smaller inboxes can
# be cloned while cloning of large inboxes is maxed out.
#
# This does not depend on the PublicInbox::DS::event_loop or any
# other external scheduling mechanism, you just need to call
# start() and finish() appropriately.  However, public-inbox-httpd
# (which uses PublicInbox::DS) will be able to schedule this
# based on readability of stdout from the spawned process.
# See GitHTTPBackend.pm and SolverGit.pm for usage examples.
# It does not depend on any form of threading.
#
# This is useful for scheduling CGI execution of both long-lived
# git-http-backend(1) processes (for "git clone") as well as short-lived
# processes such as git-apply(1).

package PublicInbox::Qspawn;
use v5.12;
use PublicInbox::Spawn qw(popen_rd);
use PublicInbox::GzipFilter;
use Scalar::Util qw(blessed);
use PublicInbox::Limiter;
use PublicInbox::Aspawn qw(run_await);
use PublicInbox::Syscall qw(EPOLLIN);
use PublicInbox::InputPipe;
use Carp qw(carp confess);

# n.b.: we get EAGAIN with public-inbox-httpd, and EINTR on other PSGI servers
use Errno qw(EAGAIN EINTR);

# fallback Limiter for callers which do not supply one (created lazily)
my $def_limiter;

# Declares a command to spawn (but does not spawn it yet).
#   $cmd     - the command to spawn
#   $cmd_env - the environment for the child process (not the PSGI env)
#   $opt     - optional hashref of redirects and other spawn options
# Callers may also set $self->{qsp_err} to a scalar ref; finalize()
# appends any error text to it.
sub new {
	my ($class, $cmd, $cmd_env, $opt) = @_;
	# shallow copy: _do_spawn adds @PublicInbox::Spawn::RLIMITS keys
	# to it and the caller's hash must not be touched
	my $spawn_opt = $opt ? { %$opt } : {};
	return bless { args => [ $cmd, $cmd_env, $spawn_opt ] }, $class;
}

# Really spawns the command and counts it against $limiter.
# With a $start_cb, the child's stdout becomes $self->{rpipe} and
# $start_cb->($self) is invoked; without one (psgi_qx), run_await is
# used and completion is reported via wait_await.  Any exception is
# handed to finish().
sub _do_spawn {
	my ($self, $start_cb, $limiter) = @_;
	my ($cmd, $cmd_env, $opt) = @{$self->{args}};
	# read before the resource limits below are merged into $opt
	my $quiet = ($opt || {})->{quiet};
	$self->{limiter} = $limiter;
	for my $rlim (@PublicInbox::Spawn::RLIMITS) {
		my $val = $limiter->{$rlim};
		$opt->{$rlim} = $val if defined $val;
	}
	$self->{-quiet} = 1 if $quiet;
	++$limiter->{running};
	if (!$start_cb) {
		eval { run_await($cmd, $cmd_env, $opt, \&wait_await, $self) };
		warn "E: $@" if $@;
	} else {
		eval {
			# popen_rd may die on EMFILE, ENFILE
			$self->{rpipe} = popen_rd($cmd, $cmd_env, $opt,
						\&waitpid_err, $self);
			# $start_cb may die, too (EPOLL_CTL_ADD: ENOSPC/ENOMEM)
			$start_cb->($self);
		};
	}
	finish($self, $@) if $@;
}

# Returns the error response built by PublicInbox::WwwStatic::r for the
# given status code (500 when none is given).  WwwStatic is only loaded
# here because Qspawn itself is useful w/o PSGI.
sub psgi_status_err {
	my ($code) = @_;
	require PublicInbox::WwwStatic;
	return PublicInbox::WwwStatic::r($code // 500);
}

# Runs once the command is done.  It is idempotent because {limiter}
# is deleted on the first call.  It releases the limiter slot (spawning
# the next queued command, if any) and reports {_err}.  It then either
# passes the output to the psgi_qx callback or, if no response has been
# written yet, sends an error response via the PSGI write callback.
sub finalize ($) {
	my ($self) = @_;

	# the process is done, so free its slot and spawn whatever's queued
	my $limiter = delete $self->{limiter} or return;
	my $nr_running = --$limiter->{running};
	if ($nr_running < $limiter->{max} &&
			(my $queued = shift(@{$limiter->{run_queue}}))) {
		_do_spawn(@$queued, $limiter); # [ $qspawn, $start_cb ]
	}

	# {_err} is set by finish() or waitpid_err()
	if (my $err = $self->{_err}) {
		utf8::decode($err);
		if (my $dst = $self->{qsp_err}) {
			# NOTE(review): an empty buffer gets a "; " prefix while a
			# non-empty one only gets " ".  This looks inverted, but
			# callers may depend on the existing format.
			$$dst .= ($$dst ? ' ' : '; ') . $err;
		}
		warn "E: @{$self->{args}->[0]}: $err\n" unless $self->{-quiet};
	}

	my ($env, $qx_cb_arg) = delete @$self{qw(psgi_env qx_cb_arg)};
	if ($qx_cb_arg) { # set by psgi_qx
		my $cb = shift @$qx_cb_arg;
		# $opt->{1} presumably holds the stdout captured by run_await
		eval { $cb->($self->{args}->[2]->{1}, @$qx_cb_arg) };
		return if !$@;
		warn "E: $@"; # hope qspawn.wcb can handle it
	}
	return if $self->{passed}; # another command chained it
	# nobody has written a response, yet: send an error (or the fallback)
	if (my $wcb = delete $env->{'qspawn.wcb'}) {
		$wcb->(psgi_status_err($env->{'qspawn.fallback'}));
	}
}

# Exit callback passed to popen_rd (and called by wait_await).  $? holds
# the wait status.  It records {_err} (always defined, possibly empty)
# and finalizes right away if {rpipe} is already gone.
sub waitpid_err {
	my ($pid, $self) = @_;
	my $wstatus = $?;
	my $err = '';
	if ($wstatus) { # XXX this may be redundant
		$err = 'exit status=' . ($wstatus >> 8);
		my $sig = $wstatus & 127;
		$err .= " signal=$sig" if $sig;
	}
	$self->{_err} = $err; # defined, even if '', for the check in ->finish
	finalize($self) if !$self->{rpipe};
}

# run_await callback: only the pid (first) and $self (fifth) matter
sub wait_await {
	my ($pid, $self) = @_[0, 4];
	waitpid_err($pid, $self);
}

# Streams one chunk of the child's stdout to the HTTP client.
# $_[-1] is the sysread buffer: undef on read error, '' on EOF.
sub yield_chunk {
	my ($self, $ipipe) = @_;
	my $rbuf = \$_[-1]; # a ref avoids copying the chunk
	if (defined($$rbuf) && $$rbuf ne '') {
		# continue while the HTTP client is reading our writes
		return if defined($self->{qfh}->write($$rbuf));
		# write failed: the HTTP client disconnected
	} elsif (defined($$rbuf)) { # normal EOF
		$self->finish;
		$self->{qfh}->close;
	} else { # read error
		warn "error reading body: $!";
	}
	# stop reading from the child in every non-continuing case
	delete $self->{rpipe};
	$ipipe->close;
}

# Signals that the caller is done with the command.  $err is only
# passed for $@ and is kept unless {_err} was already defined.
sub finish ($;$) {
	my ($self, $err) = @_;
	$self->{_err} //= $err;
	# Deleting {rpipe} triggers PublicInbox::IO::DESTROY -> waitpid_err,
	# though it may not fire right away if inside the event loop.
	my $pipe_was_open = delete($self->{rpipe}) ? 1 : 0;
	# It is safe to finalize if the pipe was already gone or if {_err}
	# is already defined (by waitpid_err or by $err above).
	finalize($self) if !$pipe_was_open || defined($self->{_err});
}

# Spawns now if $limiter has a free slot.  Otherwise it queues the
# command, and finalize() of a running command will spawn it later.
sub start ($$$) {
	my ($self, $limiter, $start_cb) = @_;
	return _do_spawn($self, $start_cb, $limiter)
		if $limiter->{running} < $limiter->{max};
	push @{$limiter->{run_queue}}, [ $self, $start_cb ];
}

# Similar to `backtick` or "qx" ("perldoc -f qx"): once the command is
# done, the first element of @qx_cb_arg is called with the command's
# stdout followed by the remaining @qx_cb_arg, but the given $limiter
# is respected.  $env is the PSGI env.  As with ``/qx, only use this
# when output is small and safe to slurp.
sub psgi_qx {
	my ($self, $env, $limiter, @qx_cb_arg) = @_;
	$self->{psgi_env} = $env;
	# finalize() shifts the callback off and calls it with the output
	$self->{qx_cb_arg} = \@qx_cb_arg;
	$limiter ||= _default_limiter();
	start($self, $limiter, undef); # no $start_cb: _do_spawn uses run_await
}

# Returns the Limiter shared by callers which do not pass their own.
# It is created lazily via PublicInbox::Limiter->new(32), presumably
# allowing 32 concurrent commands.
sub _default_limiter { $def_limiter ||= PublicInbox::Limiter->new(32) }

# Acts on the value returned by the parse-header callback:
#   CODE ref          - chains another command, which is given the write cb
#   3-element arrayref - a complete PSGI response, written immediately
#   2-element arrayref - status + headers; the body is streamed from $ipipe
# $ipipe is the InputPipe.  When it is false (generic PSGI path),
# ($wcb, $filter) is returned so the caller can stream the body itself.
sub yield_pass {
	my ($self, $ipipe, $res) = @_; # $ipipe = InputPipe
	my $env = $self->{psgi_env};
	my $wcb = delete $env->{'qspawn.wcb'} // confess('BUG: no qspawn.wcb');
	if (ref($res) eq 'CODE') { # chain another command
		delete $self->{rpipe};
		$ipipe->close if $ipipe;
		$res->($wcb);
		$self->{passed} = 1; # tells finalize() not to send an error
		return; # all done
	}
	confess("BUG: $res unhandled") if ref($res) ne 'ARRAY';

	# a filter object in the body slot takes precedence over qspawn.filter
	my $filter = blessed($res->[2]) && $res->[2]->can('attach') ?
			pop(@$res) : delete($env->{'qspawn.filter'});
	$filter //= PublicInbox::GzipFilter::qsp_maybe($res->[1], $env);

	if (scalar(@$res) == 3) { # done early (likely error or static file)
		delete $self->{rpipe};
		$ipipe->close if $ipipe;
		$wcb->($res); # all done
		return;
	}
	scalar(@$res) == 2 or confess("BUG: scalar(res) != 2: @$res");
	return ($wcb, $filter) if !$ipipe; # generic PSGI

	# streaming response
	my $qfh = $wcb->($res); # get PublicInbox::HTTP::(Chunked|Identity)
	$qfh = $filter->attach($qfh) if $filter;
	# flush whatever body bytes were read along with the headers
	my ($bref) = @{delete $self->{yield_parse_hdr}};
	$qfh->write($$bref) if $$bref ne '';
	$self->{qfh} = $qfh; # keep $ipipe open
}

# Appends a freshly-read chunk ($_[-1]: undef on read error, '' on EOF)
# to the header buffer and calls the parse-header callback passed to
# psgi_yield.  It returns the callback's result, or an error response
# if the read or the callback failed, or if EOF arrived before the
# headers were complete.  It returns false if more data is needed.
sub parse_hdr_done ($$) {
	my ($self) = @_;
	my ($ret, $err);
	if (defined $_[-1]) {
		my ($bref, $ph_cb, @ph_arg) = @{$self->{yield_parse_hdr}};
		$$bref .= $_[-1];
		$ret = eval { $ph_cb->(length($_[-1]), $bref, @ph_arg) };
		if (($err = $@)) {
			$ret = psgi_status_err();
		} elsif (!$ret && $_[-1] eq '') {
			$err = 'EOF';
			$ret = psgi_status_err();
		}
	} else {
		$err = "$!";
		$ret = psgi_status_err();
	}
	carp <<EOM if $err;
E: $err @{$self->{args}->[0]} ($self->{psgi_env}->{REQUEST_URI})
EOM
	$ret; # undef if headers incomplete
}

# InputPipe callback.  $_[-1] is the read buffer.  It streams the body
# once headers are done; otherwise it keeps parsing headers.
sub ipipe_cb {
	my ($ipipe, $self) = @_;
	if ($self->{qfh}) { # already streaming
		yield_chunk($self, $ipipe, $_[-1]);
	} elsif (my $res = parse_hdr_done($self, $_[-1])) {
		yield_pass($self, $ipipe, $res);
	} # else: headers incomplete, keep reading
}

# $start_cb used by psgi_yield.  It may run later, much later (once the
# limiter allows).  Under pi-httpd.async it reads {rpipe} non-blocking
# via InputPipe; otherwise it uses pull-based PublicInbox::GetlineResponse.
sub _yield_start {
	my ($self) = @_;
	if ($self->{psgi_env}->{'pi-httpd.async'}) {
		my $rpipe = $self->{rpipe};
		$rpipe->blocking(0);
		PublicInbox::InputPipe::consume($rpipe, \&ipipe_cb, $self);
	} else {
		require PublicInbox::GetlineResponse;
		PublicInbox::GetlineResponse::response($self);
	}
}

# Used for streaming the stdout of one process as a PSGI response.
#
# $env is the PSGI env.
# optional keys in $env:
#   $env->{'qspawn.wcb'} - the write callback from the PSGI server.
#                          Use this if you've already captured it
#                          elsewhere.  If not given, psgi_yield returns
#                          an anonymous sub for the PSGI server to call.
#
#   $env->{'qspawn.filter'} - filter object; responds to ->attach for
#                             pi-httpd.async and ->translate for generic
#                             PSGI servers
#
# $limiter - the Limiter object to use (a shared default if not given)
#
# @parse_hdr_arg - initial read callback + args, often for parsing CGI
#   header output.  The callback is given the number of bytes just read
#   (0 at EOF), a ref to the accumulated buffer, then the remaining args.
#   It returns false while headers are incomplete, a CODE ref to chain
#   another command, or an arrayref PSGI response.  2-element arrays mean
#   the body will be streamed, later, via writes (push-based) to
#   psgix.io.  3-element arrays mean the body is available immediately
#   (or streamed via ->getline (pull-based)).
sub psgi_yield {
	my ($self, $env, $limiter, @parse_hdr_arg) = @_;
	$self->{psgi_env} = $env;
	$self->{yield_parse_hdr} = [ \(my $buf = ''), @parse_hdr_arg ];
	$limiter ||= _default_limiter();

	# the caller already captured the PSGI write callback from
	# the PSGI server, so we can call ->start, here:
	$env->{'qspawn.wcb'} ? start($self, $limiter, \&_yield_start) : sub {
		# the caller will return this sub to the PSGI server, so
		# it can set the response callback (that is, for
		# PublicInbox::HTTP, the chunked_wcb or identity_wcb callback),
		# but other HTTP servers are supported:
		$env->{'qspawn.wcb'} = $_[0];
		start($self, $limiter, \&_yield_start);
	}
}

no warnings 'once';
*DESTROY = \&finalize; # ->finalize is idempotent

1;