about summary refs log tree commit homepage
path: root/lib
diff options
context:
space:
mode:
author: Eric Wong <e@yhbt.net> 2020-03-21 02:03:44 +0000
committer: Eric Wong <e@yhbt.net> 2020-03-25 01:48:34 +0000
commit: a71cb67a1237c450a9cbbd6738c5af3b73ba4c61 (patch)
tree: 514fd2f9f289622615069850828475c09fc8ecc4 /lib
parent: decbb9936a25dfedf6ecd916d8e0403f06217ec9 (diff)
download: public-inbox-a71cb67a1237c450a9cbbd6738c5af3b73ba4c61.tar.gz
We'll be supporting gzipped dumps from sqlite3(1)
for altid files in future commits.

In the future (and if we survive), we may replace
Plack::Middleware::Deflater with our own GzipFilter to work
better with asynchronous responses without relying on
memory-intensive anonymous subs.
Diffstat (limited to 'lib')
-rw-r--r-- lib/PublicInbox/GetlineBody.pm 21
-rw-r--r-- lib/PublicInbox/GzipFilter.pm 54
-rw-r--r-- lib/PublicInbox/Qspawn.pm 8
3 files changed, 71 insertions, 12 deletions
diff --git a/lib/PublicInbox/GetlineBody.pm b/lib/PublicInbox/GetlineBody.pm
index 92719a82..6becaaf5 100644
--- a/lib/PublicInbox/GetlineBody.pm
+++ b/lib/PublicInbox/GetlineBody.pm
@@ -13,13 +13,13 @@ use strict;
 use warnings;
 
 sub new {
-        my ($class, $rpipe, $end, $end_arg, $buf) = @_;
+        my ($class, $rpipe, $end, $end_arg, $buf, $filter) = @_;
         bless {
                 rpipe => $rpipe,
                 end => $end,
                 end_arg => $end_arg,
-                buf => $buf,
-                filter => 0,
+                initial_buf => $buf,
+                filter => $filter,
         }, $class;
 }
 
@@ -30,19 +30,18 @@ sub DESTROY { $_[0]->close }
 
 sub getline {
         my ($self) = @_;
-        my $filter = $self->{filter};
-        return if $filter == -1; # last call was EOF
-
-        my $buf = delete $self->{buf}; # initial buffer
-        $buf = $self->{rpipe}->getline unless defined $buf;
-        $self->{filter} = -1 unless defined $buf; # set EOF for next call
+        my $rpipe = $self->{rpipe} or return; # EOF was set on previous call
+        my $buf = delete($self->{initial_buf}) // $rpipe->getline;
+        delete($self->{rpipe}) unless defined $buf; # set EOF for next call
+        if (my $filter = $self->{filter}) {
+                $buf = $filter->translate($buf);
+        }
         $buf;
 }
 
 sub close {
         my ($self) = @_;
-        my ($rpipe, $end, $end_arg) = delete @$self{qw(rpipe end end_arg)};
-        close $rpipe if $rpipe;
+        my ($end, $end_arg) = delete @$self{qw(end end_arg)};
         $end->($end_arg) if $end;
 }
 
diff --git a/lib/PublicInbox/GzipFilter.pm b/lib/PublicInbox/GzipFilter.pm
new file mode 100644
index 00000000..d883130f
--- /dev/null
+++ b/lib/PublicInbox/GzipFilter.pm
@@ -0,0 +1,54 @@
+# Copyright (C) 2020 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# Qspawn filter
+package PublicInbox::GzipFilter;
+use strict;
+use bytes (); # length
+use Compress::Raw::Zlib qw(Z_FINISH Z_OK);
+my %OPT = (-WindowBits => 15 + 16, -AppendOutput => 1);
+
+sub new {
+        my ($gz, $err) = Compress::Raw::Zlib::Deflate->new(%OPT);
+        $err == Z_OK or die "Deflate->new failed: $err";
+        bless { gz => $gz }, shift;
+}
+
+# for Qspawn if using $env->{'pi-httpd.async'}
+sub attach {
+        my ($self, $fh) = @_;
+        $self->{fh} = $fh;
+        $self
+}
+
+# for GetlineBody (via Qspawn) when NOT using $env->{'pi-httpd.async'}
+sub translate ($$) {
+        my $self = $_[0];
+        my $zbuf = delete($self->{zbuf});
+        if (defined $_[1]) { # my $buf = $_[1];
+                my $err = $self->{gz}->deflate($_[1], $zbuf);
+                die "gzip->deflate: $err" if $err != Z_OK;
+                return $zbuf if length($zbuf) >= 8192;
+
+                $self->{zbuf} = $zbuf;
+                '';
+        } else { # undef == EOF
+                my $err = $self->{gz}->flush($zbuf, Z_FINISH);
+                die "gzip->flush: $err" if $err != Z_OK;
+                $zbuf;
+        }
+}
+
+sub write {
+        # my $ret = bytes::length($_[1]); # XXX does anybody care?
+        $_[0]->{fh}->write(translate($_[0], $_[1]));
+}
+
+sub close {
+        my ($self) = @_;
+        my $fh = delete $self->{fh};
+        $fh->write(translate($self, undef));
+        $fh->close;
+}
+
+1;
diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm
index 63ec3648..52aea3eb 100644
--- a/lib/PublicInbox/Qspawn.pm
+++ b/lib/PublicInbox/Qspawn.pm
@@ -243,6 +243,7 @@ sub psgi_return_init_cb {
         my ($self) = @_;
         my $r = rd_hdr($self) or return;
         my $env = $self->{psgi_env};
+        my $filter = delete $env->{'qspawn.filter'};
         my $wcb = delete $env->{'qspawn.wcb'};
         my $async = delete $self->{async};
         if (scalar(@$r) == 3) { # error
@@ -257,6 +258,7 @@ sub psgi_return_init_cb {
         } elsif ($async) {
                 # done reading headers, handoff to read body
                 my $fh = $wcb->($r); # scalar @$r == 2
+                $fh = $filter->attach($fh) if $filter;
                 $self->{fh} = $fh;
                 $async->async_pass($env->{'psgix.io'}, $fh,
                                         delete($self->{hdr_buf}));
@@ -264,7 +266,7 @@ sub psgi_return_init_cb {
                 require PublicInbox::GetlineBody;
                 $r->[2] = PublicInbox::GetlineBody->new($self->{rpipe},
                                         \&event_step, $self,
-                                        ${$self->{hdr_buf}});
+                                        ${$self->{hdr_buf}}, $filter);
                 $wcb->($r);
         }
 
@@ -294,6 +296,10 @@ sub psgi_return_start { # may run later, much later...
 #                          psgi_return will return an anonymous
 #                          sub for the PSGI server to call
 #
+#   $env->{'qspawn.filter'} - filter object, responds to ->attach for
+#                             pi-httpd.async and ->translate for generic
+#                             PSGI servers
+#
 # $limiter - the Limiter object to use (uses the def_limiter if not given)
 #
 # $parse_hdr - Initial read function; often for parsing CGI header output.