diff options
Diffstat (limited to 'lib/PublicInbox')
-rw-r--r-- | lib/PublicInbox/GetlineBody.pm | 16 | ||||
-rw-r--r-- | lib/PublicInbox/Qspawn.pm | 21 |
2 files changed, 32 insertions, 5 deletions
diff --git a/lib/PublicInbox/GetlineBody.pm b/lib/PublicInbox/GetlineBody.pm index ea07f3d6..0a922fd2 100644 --- a/lib/PublicInbox/GetlineBody.pm +++ b/lib/PublicInbox/GetlineBody.pm @@ -13,8 +13,13 @@ use strict; use warnings; sub new { - my ($class, $rpipe, $end, $buf) = @_; - bless { rpipe => $rpipe, end => $end, buf => $buf }, $class; + my ($class, $rpipe, $end, $buf, $filter) = @_; + bless { + rpipe => $rpipe, + end => $end, + buf => $buf, + filter => $filter || 0, + }, $class; } # close should always be called after getline returns undef, @@ -24,8 +29,13 @@ sub DESTROY { $_[0]->close } sub getline { my ($self) = @_; + my $filter = $self->{filter}; + return if $filter == -1; # last call was EOF + my $buf = delete $self->{buf}; # initial buffer - defined $buf ? $buf : $self->{rpipe}->getline; + $buf = $self->{rpipe}->getline unless defined $buf; + $self->{filter} = -1 unless defined $buf; # set EOF for next call + $filter ? $filter->($buf) : $buf; } sub close { diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm index b80dac1f..3247cd07 100644 --- a/lib/PublicInbox/Qspawn.pm +++ b/lib/PublicInbox/Qspawn.pm @@ -9,6 +9,7 @@ package PublicInbox::Qspawn; use strict; use warnings; use PublicInbox::Spawn qw(popen_rd); +require Plack::Util; my $def_limiter; sub new ($$$;) { @@ -60,11 +61,25 @@ sub start { } } +# create a filter for "push"-based streaming PSGI writes used by HTTPD::Async +sub filter_fh ($$) { + my ($fh, $filter) = @_; + Plack::Util::inline_object( + close => sub { + $fh->write($filter->(undef)); + $fh->close; + }, + write => sub { + $fh->write($filter->($_[0])); + }); +} + sub psgi_return { my ($self, $env, $limiter, $parse_hdr) = @_; my ($fh, $rpipe); my $end = sub { - if (my $err = $self->finish) { + my $err = $self->finish; + if ($err && !$env->{'qspawn.quiet'}) { $err = join(' ', @{$self->{args}->[0]}).": $err\n"; $env->{'psgi.errors'}->print($err); } @@ -84,6 +99,7 @@ sub psgi_return { my $cb = sub { my $r = $rd_hdr->() or return; $rd_hdr = undef; + my $filter = delete $env->{'qspawn.filter'}; if (scalar(@$r) == 3) { # error if ($async) { $async->close; # calls rpipe->close @@ -94,11 +110,12 @@ sub psgi_return { $res->($r); } elsif ($async) { $fh = $res->($r); # scalar @$r == 2 + $fh = filter_fh($fh, $filter) if $filter; $async->async_pass($env->{'psgix.io'}, $fh, \$buf); } else { # for synchronous PSGI servers require PublicInbox::GetlineBody; $r->[2] = PublicInbox::GetlineBody->new($rpipe, $end, - $buf); + $buf, $filter); $res->($r); } }; |