about summary refs log tree commit homepage
path: root/lib
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2017-01-11 04:12:26 +0000
committerEric Wong <e@80x24.org>2019-01-22 03:38:39 +0000
commitb490ce09b370d9398d5332ca1dc6260a7ec0aa6c (patch)
tree4cd4620d559aecaec8ff425e9d7ac3fe5da55a23 /lib
parentda9beb99af585718c36725f3457b1b72347bcebf (diff)
downloadpublic-inbox-b490ce09b370d9398d5332ca1dc6260a7ec0aa6c.tar.gz
This is intended for wrapping "git show" and "git diff"
processes in the future and to prevent it from monopolizing
callers.

This will us to better handle backpressure from gigantic
commits.
Diffstat (limited to 'lib')
-rw-r--r--lib/PublicInbox/GetlineBody.pm16
-rw-r--r--lib/PublicInbox/Qspawn.pm21
2 files changed, 32 insertions, 5 deletions
diff --git a/lib/PublicInbox/GetlineBody.pm b/lib/PublicInbox/GetlineBody.pm
index ea07f3d6..0a922fd2 100644
--- a/lib/PublicInbox/GetlineBody.pm
+++ b/lib/PublicInbox/GetlineBody.pm
@@ -13,8 +13,13 @@ use strict;
 use warnings;
 
 sub new {
-        my ($class, $rpipe, $end, $buf) = @_;
-        bless { rpipe => $rpipe, end => $end, buf => $buf }, $class;
+        my ($class, $rpipe, $end, $buf, $filter) = @_;
+        bless {
+                rpipe => $rpipe,
+                end => $end,
+                buf => $buf,
+                filter => $filter || 0,
+        }, $class;
 }
 
 # close should always be called after getline returns undef,
@@ -24,8 +29,13 @@ sub DESTROY { $_[0]->close }
 
 sub getline {
         my ($self) = @_;
+        my $filter = $self->{filter};
+        return if $filter == -1; # last call was EOF
+
         my $buf = delete $self->{buf}; # initial buffer
-        defined $buf ? $buf : $self->{rpipe}->getline;
+        $buf = $self->{rpipe}->getline unless defined $buf;
+        $self->{filter} = -1 unless defined $buf; # set EOF for next call
+        $filter ? $filter->($buf) : $buf;
 }
 
 sub close {
diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm
index b80dac1f..3247cd07 100644
--- a/lib/PublicInbox/Qspawn.pm
+++ b/lib/PublicInbox/Qspawn.pm
@@ -9,6 +9,7 @@ package PublicInbox::Qspawn;
 use strict;
 use warnings;
 use PublicInbox::Spawn qw(popen_rd);
+require Plack::Util;
 my $def_limiter;
 
 sub new ($$$;) {
@@ -60,11 +61,25 @@ sub start {
         }
 }
 
+# create a filter for "push"-based streaming PSGI writes used by HTTPD::Async
+sub filter_fh ($$) {
+        my ($fh, $filter) = @_;
+        Plack::Util::inline_object(
+                close => sub {
+                        $fh->write($filter->(undef));
+                        $fh->close;
+                },
+                write => sub {
+                        $fh->write($filter->($_[0]));
+                });
+}
+
 sub psgi_return {
         my ($self, $env, $limiter, $parse_hdr) = @_;
         my ($fh, $rpipe);
         my $end = sub {
-                if (my $err = $self->finish) {
+                my $err = $self->finish;
+                if ($err && !$env->{'qspawn.quiet'}) {
                         $err = join(' ', @{$self->{args}->[0]}).": $err\n";
                         $env->{'psgi.errors'}->print($err);
                 }
@@ -84,6 +99,7 @@ sub psgi_return {
         my $cb = sub {
                 my $r = $rd_hdr->() or return;
                 $rd_hdr = undef;
+                my $filter = delete $env->{'qspawn.filter'};
                 if (scalar(@$r) == 3) { # error
                         if ($async) {
                                 $async->close; # calls rpipe->close
@@ -94,11 +110,12 @@ sub psgi_return {
                         $res->($r);
                 } elsif ($async) {
                         $fh = $res->($r); # scalar @$r == 2
+                        $fh = filter_fh($fh, $filter) if $filter;
                         $async->async_pass($env->{'psgix.io'}, $fh, \$buf);
                 } else { # for synchronous PSGI servers
                         require PublicInbox::GetlineBody;
                         $r->[2] = PublicInbox::GetlineBody->new($rpipe, $end,
-                                                                $buf);
+                                                                $buf, $filter);
                         $res->($r);
                 }
         };