From a71cb67a1237c450a9cbbd6738c5af3b73ba4c61 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Sat, 21 Mar 2020 02:03:44 +0000 Subject: qspawn: reinstate filter support, add gzip filter We'll be supporting gzipped output from sqlite3(1) dumps for altid files in future commits. In the future (and if we survive), we may replace Plack::Middleware::Deflater with our own GzipFilter to work better with asynchronous responses without relying on memory-intensive anonymous subs. --- lib/PublicInbox/GetlineBody.pm | 21 ++++++++-------- lib/PublicInbox/GzipFilter.pm | 54 ++++++++++++++++++++++++++++++++++++++++++ lib/PublicInbox/Qspawn.pm | 8 ++++++- 3 files changed, 71 insertions(+), 12 deletions(-) create mode 100644 lib/PublicInbox/GzipFilter.pm (limited to 'lib') diff --git a/lib/PublicInbox/GetlineBody.pm b/lib/PublicInbox/GetlineBody.pm index 92719a82..6becaaf5 100644 --- a/lib/PublicInbox/GetlineBody.pm +++ b/lib/PublicInbox/GetlineBody.pm @@ -13,13 +13,13 @@ use strict; use warnings; sub new { - my ($class, $rpipe, $end, $end_arg, $buf) = @_; + my ($class, $rpipe, $end, $end_arg, $buf, $filter) = @_; bless { rpipe => $rpipe, end => $end, end_arg => $end_arg, - buf => $buf, - filter => 0, + initial_buf => $buf, + filter => $filter, }, $class; } @@ -30,19 +30,18 @@ sub DESTROY { $_[0]->close } sub getline { my ($self) = @_; - my $filter = $self->{filter}; - return if $filter == -1; # last call was EOF - - my $buf = delete $self->{buf}; # initial buffer - $buf = $self->{rpipe}->getline unless defined $buf; - $self->{filter} = -1 unless defined $buf; # set EOF for next call + my $rpipe = $self->{rpipe} or return; # EOF was set on previous call + my $buf = delete($self->{initial_buf}) // $rpipe->getline; + delete($self->{rpipe}) unless defined $buf; # set EOF for next call + if (my $filter = $self->{filter}) { + $buf = $filter->translate($buf); + } $buf; } sub close { my ($self) = @_; - my ($rpipe, $end, $end_arg) = delete @$self{qw(rpipe end end_arg)}; - close $rpipe if $rpipe; + 
my ($end, $end_arg) = delete @$self{qw(end end_arg)}; $end->($end_arg) if $end; } diff --git a/lib/PublicInbox/GzipFilter.pm b/lib/PublicInbox/GzipFilter.pm new file mode 100644 index 00000000..d883130f --- /dev/null +++ b/lib/PublicInbox/GzipFilter.pm @@ -0,0 +1,54 @@ +# Copyright (C) 2020 all contributors +# License: AGPL-3.0+ + +# Qspawn filter +package PublicInbox::GzipFilter; +use strict; +use bytes (); # length +use Compress::Raw::Zlib qw(Z_FINISH Z_OK); +my %OPT = (-WindowBits => 15 + 16, -AppendOutput => 1); + +sub new { + my ($gz, $err) = Compress::Raw::Zlib::Deflate->new(%OPT); + $err == Z_OK or die "Deflate->new failed: $err"; + bless { gz => $gz }, shift; +} + +# for Qspawn if using $env->{'pi-httpd.async'} +sub attach { + my ($self, $fh) = @_; + $self->{fh} = $fh; + $self +} + +# for GetlineBody (via Qspawn) when NOT using $env->{'pi-httpd.async'} +sub translate ($$) { + my $self = $_[0]; + my $zbuf = delete($self->{zbuf}); + if (defined $_[1]) { # my $buf = $_[1]; + my $err = $self->{gz}->deflate($_[1], $zbuf); + die "gzip->deflate: $err" if $err != Z_OK; + return $zbuf if length($zbuf) >= 8192; + + $self->{zbuf} = $zbuf; + ''; + } else { # undef == EOF + my $err = $self->{gz}->flush($zbuf, Z_FINISH); + die "gzip->flush: $err" if $err != Z_OK; + $zbuf; + } +} + +sub write { + # my $ret = bytes::length($_[1]); # XXX does anybody care? 
+ $_[0]->{fh}->write(translate($_[0], $_[1])); +} + +sub close { + my ($self) = @_; + my $fh = delete $self->{fh}; + $fh->write(translate($self, undef)); + $fh->close; +} + +1; diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm index 63ec3648..52aea3eb 100644 --- a/lib/PublicInbox/Qspawn.pm +++ b/lib/PublicInbox/Qspawn.pm @@ -243,6 +243,7 @@ sub psgi_return_init_cb { my ($self) = @_; my $r = rd_hdr($self) or return; my $env = $self->{psgi_env}; + my $filter = delete $env->{'qspawn.filter'}; my $wcb = delete $env->{'qspawn.wcb'}; my $async = delete $self->{async}; if (scalar(@$r) == 3) { # error @@ -257,6 +258,7 @@ sub psgi_return_init_cb { } elsif ($async) { # done reading headers, handoff to read body my $fh = $wcb->($r); # scalar @$r == 2 + $fh = $filter->attach($fh) if $filter; $self->{fh} = $fh; $async->async_pass($env->{'psgix.io'}, $fh, delete($self->{hdr_buf})); @@ -264,7 +266,7 @@ sub psgi_return_init_cb { require PublicInbox::GetlineBody; $r->[2] = PublicInbox::GetlineBody->new($self->{rpipe}, \&event_step, $self, - ${$self->{hdr_buf}}); + ${$self->{hdr_buf}}, $filter); $wcb->($r); } @@ -294,6 +296,10 @@ sub psgi_return_start { # may run later, much later... # psgi_return will return an anonymous # sub for the PSGI server to call # +# $env->{'qspawn.filter'} - filter object, responds to ->attach for +# pi-httpd.async and ->translate for generic +# PSGI servers +# # $limiter - the Limiter object to use (uses the def_limiter if not given) # # $parse_hdr - Initial read function; often for parsing CGI header output. -- cgit v1.2.3-24-ge0c7