From b490ce09b370d9398d5332ca1dc6260a7ec0aa6c Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Wed, 11 Jan 2017 04:12:26 +0000 Subject: qspawn|getlinebody: support streaming filters This is intended for wrapping "git show" and "git diff" processes in the future and to prevent it from monopolizing callers. This will us to better handle backpressure from gigantic commits. --- lib/PublicInbox/Qspawn.pm | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) (limited to 'lib/PublicInbox/Qspawn.pm') diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm index b80dac1f..3247cd07 100644 --- a/lib/PublicInbox/Qspawn.pm +++ b/lib/PublicInbox/Qspawn.pm @@ -9,6 +9,7 @@ package PublicInbox::Qspawn; use strict; use warnings; use PublicInbox::Spawn qw(popen_rd); +require Plack::Util; my $def_limiter; sub new ($$$;) { @@ -60,11 +61,25 @@ sub start { } } +# create a filter for "push"-based streaming PSGI writes used by HTTPD::Async +sub filter_fh ($$) { + my ($fh, $filter) = @_; + Plack::Util::inline_object( + close => sub { + $fh->write($filter->(undef)); + $fh->close; + }, + write => sub { + $fh->write($filter->($_[0])); + }); +} + sub psgi_return { my ($self, $env, $limiter, $parse_hdr) = @_; my ($fh, $rpipe); my $end = sub { - if (my $err = $self->finish) { + my $err = $self->finish; + if ($err && !$env->{'qspawn.quiet'}) { $err = join(' ', @{$self->{args}->[0]}).": $err\n"; $env->{'psgi.errors'}->print($err); } @@ -84,6 +99,7 @@ sub psgi_return { my $cb = sub { my $r = $rd_hdr->() or return; $rd_hdr = undef; + my $filter = delete $env->{'qspawn.filter'}; if (scalar(@$r) == 3) { # error if ($async) { $async->close; # calls rpipe->close @@ -94,11 +110,12 @@ sub psgi_return { $res->($r); } elsif ($async) { $fh = $res->($r); # scalar @$r == 2 + $fh = filter_fh($fh, $filter) if $filter; $async->async_pass($env->{'psgix.io'}, $fh, \$buf); } else { # for synchronous PSGI servers require PublicInbox::GetlineBody; $r->[2] = PublicInbox::GetlineBody->new($rpipe, $end, - $buf); + $buf, $filter); $res->($r); } }; -- cgit v1.2.3-24-ge0c7