diff options
author | Eric Wong <e@80x24.org> | 2019-01-30 07:40:35 +0000 |
---|---|---|
committer | Eric Wong <e@80x24.org> | 2019-01-30 07:40:35 +0000 |
commit | cf439203c31836e4c48f632f902aa58db6924972 (patch) | |
tree | a46bf739c5318db857783a1b2773ed51ad2f0fde /lib/PublicInbox/Qspawn.pm | |
parent | 41bb8a962d4d2d8f6e7ec9988908aaf7d55f09c7 (diff) | |
parent | 11f984fa36647b0ab107596bcec2dd9b963f1379 (diff) | |
download | public-inbox-cf439203c31836e4c48f632f902aa58db6924972.tar.gz |
* origin/viewvcs: (66 commits) solvergit: deal with alternative diff prefixes solvergit: extract mode from diff headers properly solvergit: avoid "Wide character" warnings solvergit: do not show full path names to "git apply" css/216dark: add comments and tweak highlight colors viewvcs: avoid segfault with highlight.pm at shutdown solvergit: do not solve blobs twice t/check-www-inbox: disable history t/check-www-inbox: don't follow mboxes t/check-www-inbox: replace IPC::Run with PublicInbox::Spawn hval: add src_escape for highlight post-processing viewvcs: wire up syntax-highlighting for blobs hlmod: disable enclosing <pre> tag t/hl_mod: extra check to ensure we escape HTML wwwhighlight: read_in_full returns undef on errors solver: crank up max patches to 9999 viewvcs: do not show final error message twice qspawn: decode $? for user-friendliness solver: reduce "git apply" invocations solver: hold patches in temporary directory ...
Diffstat (limited to 'lib/PublicInbox/Qspawn.pm')
-rw-r--r-- | lib/PublicInbox/Qspawn.pm | 143 |
1 files changed, 142 insertions, 1 deletions
diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm index 3500f8a4..913fac81 100644 --- a/lib/PublicInbox/Qspawn.pm +++ b/lib/PublicInbox/Qspawn.pm @@ -9,6 +9,8 @@ package PublicInbox::Qspawn; use strict; use warnings; use PublicInbox::Spawn qw(popen_rd); +require Plack::Util; +my $def_limiter; sub new ($$$;) { my ($class, $cmd, $env, $opt) = @_; @@ -28,13 +30,22 @@ sub _do_spawn { $cb->($self->{rpipe}); } +sub child_err ($) { + my ($child_error) = @_; # typically $? + my $exitstatus = ($child_error >> 8) or return; + my $sig = $child_error & 127; + my $msg = "exit status=$exitstatus"; + $msg .= " signal=$sig" if $sig; + $msg; +} + sub finish ($) { my ($self) = @_; my $limiter = $self->{limiter}; my $running; if (delete $self->{rpipe}) { my $pid = delete $self->{pid}; - $self->{err} = $pid == waitpid($pid, 0) ? $? : + $self->{err} = $pid == waitpid($pid, 0) ? child_err($?) : "PID:$pid still running?"; $running = --$limiter->{running}; } @@ -59,6 +70,119 @@ sub start { } } +sub _psgi_finish ($$) { + my ($self, $env) = @_; + my $err = $self->finish; + if ($err && !$env->{'qspawn.quiet'}) { + $err = join(' ', @{$self->{args}->[0]}).": $err\n"; + $env->{'psgi.errors'}->print($err); + } +} + +sub psgi_qx { + my ($self, $env, $limiter, $qx_cb) = @_; + my $qx = PublicInbox::Qspawn::Qx->new; + my $end = sub { + _psgi_finish($self, $env); + eval { $qx_cb->($qx) }; + $qx = undef; + }; + my $rpipe; + my $async = $env->{'pi-httpd.async'}; + my $cb = sub { + my $r = sysread($rpipe, my $buf, 8192); + if ($async) { + $async->async_pass($env->{'psgix.io'}, $qx, \$buf); + } elsif (defined $r) { + $r ? $qx->write($buf) : $end->(); + } else { + return if $!{EAGAIN} || $!{EINTR}; # loop again + $end->(); + } + }; + $limiter ||= $def_limiter ||= PublicInbox::Qspawn::Limiter->new(32); + $self->start($limiter, sub { # may run later, much later... + ($rpipe) = @_; + if ($async) { + # PublicInbox::HTTPD::Async->new($rpipe, $cb, $end) + $async = $async->($rpipe, $cb, $end); + } else { # generic PSGI + $cb->() while $qx; + } + }); +} + +# create a filter for "push"-based streaming PSGI writes used by HTTPD::Async +sub filter_fh ($$) { + my ($fh, $filter) = @_; + Plack::Util::inline_object( + close => sub { + $fh->write($filter->(undef)); + $fh->close; + }, + write => sub { + $fh->write($filter->($_[0])); + }); +} + +sub psgi_return { + my ($self, $env, $limiter, $parse_hdr) = @_; + my ($fh, $rpipe); + my $end = sub { + _psgi_finish($self, $env); + $fh->close if $fh; # async-only + }; + + my $buf = ''; + my $rd_hdr = sub { + my $r = sysread($rpipe, $buf, 1024, length($buf)); + return if !defined($r) && ($!{EINTR} || $!{EAGAIN}); + $parse_hdr->($r, \$buf); + }; + my $res = delete $env->{'qspawn.response'}; + my $async = $env->{'pi-httpd.async'}; + my $cb = sub { + my $r = $rd_hdr->() or return; + $rd_hdr = undef; + my $filter = delete $env->{'qspawn.filter'}; + if (scalar(@$r) == 3) { # error + if ($async) { + $async->close; # calls rpipe->close and $end + } else { + $rpipe->close; + $end->(); + } + $res->($r); + } elsif ($async) { + $fh = $res->($r); # scalar @$r == 2 + $fh = filter_fh($fh, $filter) if $filter; + $async->async_pass($env->{'psgix.io'}, $fh, \$buf); + } else { # for synchronous PSGI servers + require PublicInbox::GetlineBody; + $r->[2] = PublicInbox::GetlineBody->new($rpipe, $end, + $buf, $filter); + $res->($r); + } + }; + $limiter ||= $def_limiter ||= PublicInbox::Qspawn::Limiter->new(32); + my $start_cb = sub { # may run later, much later... + ($rpipe) = @_; + if ($async) { + # PublicInbox::HTTPD::Async->new($rpipe, $cb, $end) + $async = $async->($rpipe, $cb, $end); + } else { # generic PSGI + $cb->() while $rd_hdr; + } + }; + + return $self->start($limiter, $start_cb) if $res; + + sub { + ($res) = @_; + $self->start($limiter, $start_cb); + }; +} + package PublicInbox::Qspawn::Limiter; use strict; use warnings; @@ -73,4 +197,21 @@ sub new { }, $class; } +# captures everything into a buffer and executes a callback when done +package PublicInbox::Qspawn::Qx; +use strict; +use warnings; + +sub new { + my ($class) = @_; + my $buf = ''; + bless \$buf, $class; +} + +# called by PublicInbox::HTTPD::Async ($fh->write) +sub write { + ${$_[0]} .= $_[1]; + undef; +} + 1; |