user/dev discussion of public-inbox itself
From: Eric Wong <e@80x24.org>
To: meta@public-inbox.org
Subject: [PATCH 03/14] qspawn|getlinebody: support streaming filters
Date: Sun, 27 Jan 2019 04:03:30 +0000
Message-ID: <20190127040341.26107-4-e@80x24.org>
In-Reply-To: <20190127040341.26107-1-e@80x24.org>

This is intended for wrapping "git show" and "git diff"
processes in the future, and to prevent them from monopolizing
callers.

This will allow us to better handle backpressure from gigantic
commits.
---
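A minimal sketch of the filter calling convention this patch relies on,
for illustration only: the filter is called with each buffer, then once
more with undef at EOF so it can flush any trailer.  The names
make_filter and filtered_getline are hypothetical stand-ins (they are
not part of public-inbox), and an in-memory filehandle replaces the
process pipe.

```perl
use strict;
use warnings;

# Hypothetical filter: upcases each line and emits a trailer at EOF.
# It is called with a defined buffer per line, then once with undef.
sub make_filter {
	my $n = 0;
	sub {
		my ($buf) = @_;
		return "-- $n line(s)\n" unless defined $buf; # EOF: flush
		$n++;
		uc($buf);
	};
}

# Stand-in mirroring the patched GetlineBody::getline logic, minus the
# initial {buf} handling.  {filter} is 0 (none), a coderef, or -1 (EOF
# was already seen on the previous call).
sub filtered_getline {
	my ($self) = @_;
	my $filter = $self->{filter};
	return if $filter == -1; # last call was EOF

	my $buf = readline($self->{rpipe});
	$self->{filter} = -1 unless defined $buf; # set EOF for next call
	$filter ? $filter->($buf) : $buf;
}

my $data = "a\nb\n";
open(my $rpipe, '<', \$data) or die "open: $!";
my $body = { rpipe => $rpipe, filter => make_filter() };

my @out;
while (defined(my $line = filtered_getline($body))) {
	push @out, $line;
}
print @out;
```

The extra call with undef is why getline needs the -1 marker: the
filter's final return value is still a defined string, so the caller
only sees undef on the call after that.  filter_fh below follows the
same convention on the "push" side by writing $filter->(undef) before
closing.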
 lib/PublicInbox/GetlineBody.pm | 16 +++++++++++++---
 lib/PublicInbox/Qspawn.pm      | 21 +++++++++++++++++++--
 2 files changed, 32 insertions(+), 5 deletions(-)

diff --git a/lib/PublicInbox/GetlineBody.pm b/lib/PublicInbox/GetlineBody.pm
index ea07f3d..0a922fd 100644
--- a/lib/PublicInbox/GetlineBody.pm
+++ b/lib/PublicInbox/GetlineBody.pm
@@ -13,8 +13,13 @@ use strict;
 use warnings;
 
 sub new {
-	my ($class, $rpipe, $end, $buf) = @_;
-	bless { rpipe => $rpipe, end => $end, buf => $buf }, $class;
+	my ($class, $rpipe, $end, $buf, $filter) = @_;
+	bless {
+		rpipe => $rpipe,
+		end => $end,
+		buf => $buf,
+		filter => $filter || 0,
+	}, $class;
 }
 
 # close should always be called after getline returns undef,
@@ -24,8 +29,13 @@ sub DESTROY { $_[0]->close }
 
 sub getline {
 	my ($self) = @_;
+	my $filter = $self->{filter};
+	return if $filter == -1; # last call was EOF
+
 	my $buf = delete $self->{buf}; # initial buffer
-	defined $buf ? $buf : $self->{rpipe}->getline;
+	$buf = $self->{rpipe}->getline unless defined $buf;
+	$self->{filter} = -1 unless defined $buf; # set EOF for next call
+	$filter ? $filter->($buf) : $buf;
 }
 
 sub close {
diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm
index b80dac1..3247cd0 100644
--- a/lib/PublicInbox/Qspawn.pm
+++ b/lib/PublicInbox/Qspawn.pm
@@ -9,6 +9,7 @@ package PublicInbox::Qspawn;
 use strict;
 use warnings;
 use PublicInbox::Spawn qw(popen_rd);
+require Plack::Util;
 my $def_limiter;
 
 sub new ($$$;) {
@@ -60,11 +61,25 @@ sub start {
 	}
 }
 
+# create a filter for "push"-based streaming PSGI writes used by HTTPD::Async
+sub filter_fh ($$) {
+	my ($fh, $filter) = @_;
+	Plack::Util::inline_object(
+		close => sub {
+			$fh->write($filter->(undef));
+			$fh->close;
+		},
+		write => sub {
+			$fh->write($filter->($_[0]));
+		});
+}
+
 sub psgi_return {
 	my ($self, $env, $limiter, $parse_hdr) = @_;
 	my ($fh, $rpipe);
 	my $end = sub {
-		if (my $err = $self->finish) {
+		my $err = $self->finish;
+		if ($err && !$env->{'qspawn.quiet'}) {
 			$err = join(' ', @{$self->{args}->[0]}).": $err\n";
 			$env->{'psgi.errors'}->print($err);
 		}
@@ -84,6 +99,7 @@ sub psgi_return {
 	my $cb = sub {
 		my $r = $rd_hdr->() or return;
 		$rd_hdr = undef;
+		my $filter = delete $env->{'qspawn.filter'};
 		if (scalar(@$r) == 3) { # error
 			if ($async) {
 				$async->close; # calls rpipe->close
@@ -94,11 +110,12 @@ sub psgi_return {
 			$res->($r);
 		} elsif ($async) {
 			$fh = $res->($r); # scalar @$r == 2
+			$fh = filter_fh($fh, $filter) if $filter;
 			$async->async_pass($env->{'psgix.io'}, $fh, \$buf);
 		} else { # for synchronous PSGI servers
 			require PublicInbox::GetlineBody;
 			$r->[2] = PublicInbox::GetlineBody->new($rpipe, $end,
-								$buf);
+								$buf, $filter);
 			$res->($r);
 		}
 	};
-- 
EW


Thread overview: 15+ messages
2019-01-27  4:03 [PATCH 00/14] convert solver to use pi-httpd.async Eric Wong
2019-01-27  4:03 ` [PATCH 01/14] httpd/async: remove needless sysread wrapper Eric Wong
2019-01-27  4:03 ` [PATCH 02/14] qspawn: implement psgi_return and use it for githttpbackend Eric Wong
2019-01-27  4:03 ` [PATCH 03/14] qspawn|getlinebody: support streaming filters Eric Wong [this message]
2019-01-27  4:03 ` [PATCH 04/14] qspawn|httpd/async: improve and fix out-of-date comments Eric Wong
2019-01-27  4:03 ` [PATCH 05/14] httpd/async: stop running command if client disconnects Eric Wong
2019-01-27  4:03 ` [PATCH 06/14] qspawn: implement psgi_qx Eric Wong
2019-01-27  4:03 ` [PATCH 07/14] t/qspawn.t: psgi_qx stderr test Eric Wong
2019-01-27  4:03 ` [PATCH 08/14] view: swap CRLF for LF in HTML output Eric Wong
2019-01-27  4:03 ` [PATCH 09/14] solver: rewrite to use Qspawn->psgi_qx and pi-httpd.async Eric Wong
2019-01-27  4:03 ` [PATCH 10/14] solver: hold patches in temporary directory Eric Wong
2019-01-27  4:03 ` [PATCH 11/14] solver: reduce "git apply" invocations Eric Wong
2019-01-27  4:03 ` [PATCH 12/14] qspawn: decode $? for user-friendliness Eric Wong
2019-01-27  4:03 ` [PATCH 13/14] viewvcs: do not show final error message twice Eric Wong
2019-01-27  4:03 ` [PATCH 14/14] solver: crank up max patches to 9999 Eric Wong

Code repositories for project(s) associated with this public inbox

	https://80x24.org/public-inbox.git