From a71cb67a1237c450a9cbbd6738c5af3b73ba4c61 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Sat, 21 Mar 2020 02:03:44 +0000 Subject: qspawn: reinstate filter support, add gzip filter We'll be supporting gzipped dumps from sqlite3(1) for altid files in future commits. In the future (and if we survive), we may replace Plack::Middleware::Deflater with our own GzipFilter to work better with asynchronous responses without relying on memory-intensive anonymous subs. --- MANIFEST | 2 ++ lib/PublicInbox/GetlineBody.pm | 21 ++++++++-------- lib/PublicInbox/GzipFilter.pm | 54 ++++++++++++++++++++++++++++++++++++++++++ lib/PublicInbox/Qspawn.pm | 8 ++++++- t/gzip_filter.t | 37 +++++++++++++++++++++++++++++ t/httpd-corner.psgi | 9 +++++++ t/httpd-corner.t | 25 +++++++++++++++++++ 7 files changed, 144 insertions(+), 12 deletions(-) create mode 100644 lib/PublicInbox/GzipFilter.pm create mode 100644 t/gzip_filter.t diff --git a/MANIFEST b/MANIFEST index f077d722..d53af77c 100644 --- a/MANIFEST +++ b/MANIFEST @@ -111,6 +111,7 @@ lib/PublicInbox/Filter/Vger.pm lib/PublicInbox/GetlineBody.pm lib/PublicInbox/Git.pm lib/PublicInbox/GitHTTPBackend.pm +lib/PublicInbox/GzipFilter.pm lib/PublicInbox/HTTP.pm lib/PublicInbox/HTTPD.pm lib/PublicInbox/HTTPD/Async.pm @@ -233,6 +234,7 @@ t/filter_vger.t t/git-http-backend.psgi t/git.fast-import-data t/git.t +t/gzip_filter.t t/hl_mod.t t/html_index.t t/httpd-corner.psgi diff --git a/lib/PublicInbox/GetlineBody.pm b/lib/PublicInbox/GetlineBody.pm index 92719a82..6becaaf5 100644 --- a/lib/PublicInbox/GetlineBody.pm +++ b/lib/PublicInbox/GetlineBody.pm @@ -13,13 +13,13 @@ use strict; use warnings; sub new { - my ($class, $rpipe, $end, $end_arg, $buf) = @_; + my ($class, $rpipe, $end, $end_arg, $buf, $filter) = @_; bless { rpipe => $rpipe, end => $end, end_arg => $end_arg, - buf => $buf, - filter => 0, + initial_buf => $buf, + filter => $filter, }, $class; } @@ -30,19 +30,18 @@ sub DESTROY { $_[0]->close } sub getline { my ($self) = @_; - my 
$filter = $self->{filter}; - return if $filter == -1; # last call was EOF - - my $buf = delete $self->{buf}; # initial buffer - $buf = $self->{rpipe}->getline unless defined $buf; - $self->{filter} = -1 unless defined $buf; # set EOF for next call + my $rpipe = $self->{rpipe} or return; # EOF was set on previous call + my $buf = delete($self->{initial_buf}) // $rpipe->getline; + delete($self->{rpipe}) unless defined $buf; # set EOF for next call + if (my $filter = $self->{filter}) { + $buf = $filter->translate($buf); + } $buf; } sub close { my ($self) = @_; - my ($rpipe, $end, $end_arg) = delete @$self{qw(rpipe end end_arg)}; - close $rpipe if $rpipe; + my ($end, $end_arg) = delete @$self{qw(end end_arg)}; $end->($end_arg) if $end; } diff --git a/lib/PublicInbox/GzipFilter.pm b/lib/PublicInbox/GzipFilter.pm new file mode 100644 index 00000000..d883130f --- /dev/null +++ b/lib/PublicInbox/GzipFilter.pm @@ -0,0 +1,54 @@ +# Copyright (C) 2020 all contributors +# License: AGPL-3.0+ + +# Qspawn filter +package PublicInbox::GzipFilter; +use strict; +use bytes (); # length +use Compress::Raw::Zlib qw(Z_FINISH Z_OK); +my %OPT = (-WindowBits => 15 + 16, -AppendOutput => 1); + +sub new { + my ($gz, $err) = Compress::Raw::Zlib::Deflate->new(%OPT); + $err == Z_OK or die "Deflate->new failed: $err"; + bless { gz => $gz }, shift; +} + +# for Qspawn if using $env->{'pi-httpd.async'} +sub attach { + my ($self, $fh) = @_; + $self->{fh} = $fh; + $self +} + +# for GetlineBody (via Qspawn) when NOT using $env->{'pi-httpd.async'} +sub translate ($$) { + my $self = $_[0]; + my $zbuf = delete($self->{zbuf}); + if (defined $_[1]) { # my $buf = $_[1]; + my $err = $self->{gz}->deflate($_[1], $zbuf); + die "gzip->deflate: $err" if $err != Z_OK; + return $zbuf if length($zbuf) >= 8192; + + $self->{zbuf} = $zbuf; + ''; + } else { # undef == EOF + my $err = $self->{gz}->flush($zbuf, Z_FINISH); + die "gzip->flush: $err" if $err != Z_OK; + $zbuf; + } +} + +sub write { + # my $ret = 
bytes::length($_[1]); # XXX does anybody care? + $_[0]->{fh}->write(translate($_[0], $_[1])); +} + +sub close { + my ($self) = @_; + my $fh = delete $self->{fh}; + $fh->write(translate($self, undef)); + $fh->close; +} + +1; diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm index 63ec3648..52aea3eb 100644 --- a/lib/PublicInbox/Qspawn.pm +++ b/lib/PublicInbox/Qspawn.pm @@ -243,6 +243,7 @@ sub psgi_return_init_cb { my ($self) = @_; my $r = rd_hdr($self) or return; my $env = $self->{psgi_env}; + my $filter = delete $env->{'qspawn.filter'}; my $wcb = delete $env->{'qspawn.wcb'}; my $async = delete $self->{async}; if (scalar(@$r) == 3) { # error @@ -257,6 +258,7 @@ sub psgi_return_init_cb { } elsif ($async) { # done reading headers, handoff to read body my $fh = $wcb->($r); # scalar @$r == 2 + $fh = $filter->attach($fh) if $filter; $self->{fh} = $fh; $async->async_pass($env->{'psgix.io'}, $fh, delete($self->{hdr_buf})); @@ -264,7 +266,7 @@ sub psgi_return_init_cb { require PublicInbox::GetlineBody; $r->[2] = PublicInbox::GetlineBody->new($self->{rpipe}, \&event_step, $self, - ${$self->{hdr_buf}}); + ${$self->{hdr_buf}}, $filter); $wcb->($r); } @@ -294,6 +296,10 @@ sub psgi_return_start { # may run later, much later... # psgi_return will return an anonymous # sub for the PSGI server to call # +# $env->{'qspawn.filter'} - filter object, responds to ->attach for +# pi-httpd.async and ->translate for generic +# PSGI servers +# # $limiter - the Limiter object to use (uses the def_limiter if not given) # # $parse_hdr - Initial read function; often for parsing CGI header output. 
diff --git a/t/gzip_filter.t b/t/gzip_filter.t new file mode 100644 index 00000000..400214e6 --- /dev/null +++ b/t/gzip_filter.t @@ -0,0 +1,37 @@ +# Copyright (C) 2020 all contributors +# License: AGPL-3.0+ +use strict; +use Test::More; +use IO::Handle (); # autoflush +use Fcntl qw(SEEK_SET); +use PublicInbox::TestCommon; +require_mods(qw(Compress::Zlib IO::Uncompress::Gunzip)); +require_ok 'PublicInbox::GzipFilter'; + +{ + open my $fh, '+>', undef or die "open: $!"; + open my $dup, '>&', $fh or die "dup $!"; + $dup->autoflush(1); + my $filter = PublicInbox::GzipFilter->new->attach($dup); + ok($filter->write("hello"), 'wrote something'); + ok($filter->write("world"), 'wrote more'); + $filter->close; + seek($fh, 0, SEEK_SET) or die; + IO::Uncompress::Gunzip::gunzip($fh => \(my $buf)); + is($buf, 'helloworld', 'buffer matches'); +} + +{ + pipe(my ($r, $w)) or die "pipe: $!"; + $w->autoflush(1); + close $r or die; + my $filter = PublicInbox::GzipFilter->new->attach($w); + my $sigpipe; + local $SIG{PIPE} = sub { $sigpipe = 1 }; + open my $fh, '<', 'COPYING' or die "open(COPYING): $!"; + my $buf = do { local $/; <$fh> }; + while ($filter->write($buf .= rand)) {} + ok($sigpipe, 'got SIGPIPE'); + close $w; +} +done_testing; diff --git a/t/httpd-corner.psgi b/t/httpd-corner.psgi index 35d1216e..f2427234 100644 --- a/t/httpd-corner.psgi +++ b/t/httpd-corner.psgi @@ -85,6 +85,15 @@ my $app = sub { close $null; [ 200, [ qw(Content-Type application/octet-stream) ]]; }); + } elsif ($path eq '/psgi-return-gzip') { + require PublicInbox::Qspawn; + require PublicInbox::GzipFilter; + my $cmd = [qw(echo hello world)]; + my $qsp = PublicInbox::Qspawn->new($cmd); + $env->{'qspawn.filter'} = PublicInbox::GzipFilter->new; + return $qsp->psgi_return($env, undef, sub { + [ 200, [ qw(Content-Type application/octet-stream)]] + }); } elsif ($path eq '/pid') { $code = 200; push @$body, "$$\n"; diff --git a/t/httpd-corner.t b/t/httpd-corner.t index c99e5ec7..e50aa436 100644 --- 
a/t/httpd-corner.t +++ b/t/httpd-corner.t @@ -22,6 +22,7 @@ my $err = "$tmpdir/stderr.log"; my $out = "$tmpdir/stdout.log"; my $psgi = "./t/httpd-corner.psgi"; my $sock = tcp_server() or die; +my @zmods = qw(PublicInbox::GzipFilter IO::Uncompress::Gunzip); # make sure stdin is not a pipe for lsof test to check for leaking pipes open(STDIN, '<', '/dev/null') or die 'no /dev/null: $!'; @@ -324,6 +325,14 @@ SKIP: { close $fh or die "curl errored out \$?=$?"; is($n, 30 * 1024 * 1024, 'got expected output from curl'); is($non_zero, 0, 'read all zeros'); + + require_mods(@zmods, 1); + open $fh, '-|', qw(curl -sS), "$base/psgi-return-gzip" or die; + binmode $fh; + my $buf = do { local $/; <$fh> }; + close $fh or die "curl errored out \$?=$?"; + IO::Uncompress::Gunzip::gunzip(\$buf => \(my $out)); + is($out, "hello world\n"); } { @@ -596,6 +605,22 @@ SKIP: { is_deeply([], [keys %child], 'no extra pipes with -W0'); }; +# ensure compatibility with other PSGI servers +SKIP: { + require_mods(@zmods, qw(Plack::Test HTTP::Request::Common), 3); + use_ok 'HTTP::Request::Common'; + use_ok 'Plack::Test'; + my $app = require $psgi; + test_psgi($app, sub { + my ($cb) = @_; + my $req = GET('http://example.com/psgi-return-gzip'); + my $res = $cb->($req); + my $buf = $res->content; + IO::Uncompress::Gunzip::gunzip(\$buf => \(my $out)); + is($out, "hello world\n"); + }); +} + done_testing(); sub capture { -- cgit v1.2.3-24-ge0c7