user/dev discussion of public-inbox itself
 help / color / mirror / code / Atom feed
Search results ordered by [date|relevance]  view[summary|nested|Atom feed]
thread overview below | download mbox.gz: |
* [PATCH 09/26] qspawn: introduce new psgi_yield API
  2023-10-25  0:29  7% [PATCH 00/26] process management simplifications Eric Wong
@ 2023-10-25  0:29  3% ` Eric Wong
  0 siblings, 0 replies; 2+ results
From: Eric Wong @ 2023-10-25  0:29 UTC (permalink / raw)
  To: meta

This is intended to replace psgi_return and HTTPD/Async
entirely, hopefully making our code less convoluted while
maintaining the ability to handle slow clients on
memory-constrained systems.

This was made possible by the philosophy shift in commit 21a539a2df0c
(httpd/async: switch to buffering-as-fast-as-possible, 2019-06-28).

We'll still support generic PSGI via the `pull' model with a
GetlineResponse class which is similar to the old GetlineBody.
---
 MANIFEST                           |   1 +
 lib/PublicInbox/GetlineResponse.pm |  40 ++++++++++
 lib/PublicInbox/GitHTTPBackend.pm  |   4 +-
 lib/PublicInbox/GzipFilter.pm      |   3 +-
 lib/PublicInbox/HTTP.pm            |   8 +-
 lib/PublicInbox/InputPipe.pm       |  12 +--
 lib/PublicInbox/LEI.pm             |   2 +-
 lib/PublicInbox/Qspawn.pm          | 119 ++++++++++++++++++++++++++++-
 8 files changed, 176 insertions(+), 13 deletions(-)
 create mode 100644 lib/PublicInbox/GetlineResponse.pm

diff --git a/MANIFEST b/MANIFEST
index f087621c..420b40a1 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -204,6 +204,7 @@ lib/PublicInbox/Filter/Vger.pm
 lib/PublicInbox/Gcf2.pm
 lib/PublicInbox/Gcf2Client.pm
 lib/PublicInbox/GetlineBody.pm
+lib/PublicInbox/GetlineResponse.pm
 lib/PublicInbox/Git.pm
 lib/PublicInbox/GitAsyncCat.pm
 lib/PublicInbox/GitCredential.pm
diff --git a/lib/PublicInbox/GetlineResponse.pm b/lib/PublicInbox/GetlineResponse.pm
new file mode 100644
index 00000000..290cce74
--- /dev/null
+++ b/lib/PublicInbox/GetlineResponse.pm
@@ -0,0 +1,40 @@
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# For generic PSGI servers (not public-inbox-httpd/netd) which assumes their
+# getline response bodies can be backpressure-aware for slow clients
+# This depends on rpipe being _blocking_ on getline.
+package PublicInbox::GetlineResponse;
+use v5.12;
+
+sub response {
+	my ($qsp) = @_;
+	my ($res, $rbuf);
+	do { # read header synchronously
+		sysread($qsp->{rpipe}, $rbuf, 65536);
+		$res = $qsp->parse_hdr_done($rbuf); # fills $bref
+	} until defined($res);
+	my ($wcb, $filter) = $qsp->yield_pass(undef, $res) or return;
+	my $self = $res->[2] = bless {
+		qsp => $qsp,
+		filter => $filter,
+	}, __PACKAGE__;
+	my ($bref) = @{delete $qsp->{yield_parse_hdr}};
+	$self->{rbuf} = $$bref if $$bref ne '';
+	$wcb->($res);
+}
+
+sub getline {
+	my ($self) = @_;
+	my $rpipe = $self->{qsp}->{rpipe} // do {
+		delete($self->{qsp})->finish;
+		return; # EOF was set on previous call
+	};
+	my $buf = delete($self->{rbuf}) // $rpipe->getline;
+	$buf // delete($self->{qsp}->{rpipe}); # set EOF for next call
+	$self->{filter} ? $self->{filter}->translate($buf) : $buf;
+}
+
+sub close {}
+
+1;
diff --git a/lib/PublicInbox/GitHTTPBackend.pm b/lib/PublicInbox/GitHTTPBackend.pm
index edbc0157..d7e0bced 100644
--- a/lib/PublicInbox/GitHTTPBackend.pm
+++ b/lib/PublicInbox/GitHTTPBackend.pm
@@ -79,7 +79,7 @@ sub serve_dumb {
 	PublicInbox::WwwStatic::response($env, $h, $path, $type);
 }
 
-sub git_parse_hdr { # {parse_hdr} for Qspawn
+sub ghb_parse_hdr { # header parser for Qspawn
 	my ($r, $bref, @dumb_args) = @_;
 	my $res = parse_cgi_headers($r, $bref) or return; # incomplete
 	$res->[0] == 403 ? serve_dumb(@dumb_args) : $res;
@@ -106,7 +106,7 @@ sub serve_smart {
 	$env{PATH_TRANSLATED} = "$git->{git_dir}/$path";
 	my $rdr = input_prepare($env) or return r(500);
 	my $qsp = PublicInbox::Qspawn->new([qw(git http-backend)], \%env, $rdr);
-	$qsp->psgi_return($env, $limiter, \&git_parse_hdr, $env, $git, $path);
+	$qsp->psgi_yield($env, $limiter, \&ghb_parse_hdr, $env, $git, $path);
 }
 
 sub input_prepare {
diff --git a/lib/PublicInbox/GzipFilter.pm b/lib/PublicInbox/GzipFilter.pm
index db8e8397..d6ecd5ba 100644
--- a/lib/PublicInbox/GzipFilter.pm
+++ b/lib/PublicInbox/GzipFilter.pm
@@ -123,9 +123,10 @@ sub http_out ($) {
 	};
 }
 
+# returns undef if HTTP client disconnected, may return 0
+# because ->translate can return ''
 sub write {
 	my $self = shift;
-	# my $ret = bytes::length($_[1]); # XXX does anybody care?
 	http_out($self)->write($self->translate(@_));
 }
 
diff --git a/lib/PublicInbox/HTTP.pm b/lib/PublicInbox/HTTP.pm
index ca162939..edc88fe8 100644
--- a/lib/PublicInbox/HTTP.pm
+++ b/lib/PublicInbox/HTTP.pm
@@ -455,11 +455,12 @@ sub next_step {
 # They may be exposed to the PSGI application when the PSGI app
 # returns a CODE ref for "push"-based responses
 package PublicInbox::HTTP::Chunked;
-use strict;
+use v5.12;
 
 sub write {
 	# ([$http], $buf) = @_;
-	PublicInbox::HTTP::chunked_write($_[0]->[0], $_[1])
+	PublicInbox::HTTP::chunked_write($_[0]->[0], $_[1]);
+	$_[0]->[0]->{sock} ? length($_[1]) : undef;
 }
 
 sub close {
@@ -468,12 +469,13 @@ sub close {
 }
 
 package PublicInbox::HTTP::Identity;
-use strict;
+use v5.12;
 our @ISA = qw(PublicInbox::HTTP::Chunked);
 
 sub write {
 	# ([$http], $buf) = @_;
 	PublicInbox::HTTP::identity_write($_[0]->[0], $_[1]);
+	$_[0]->[0]->{sock} ? length($_[1]) : undef;
 }
 
 1;
diff --git a/lib/PublicInbox/InputPipe.pm b/lib/PublicInbox/InputPipe.pm
index b38d8270..f4d57e7d 100644
--- a/lib/PublicInbox/InputPipe.pm
+++ b/lib/PublicInbox/InputPipe.pm
@@ -39,14 +39,16 @@ sub consume {
 	if ($@) { # regular file (but not w/ select|IO::Poll backends)
 		$self->{-need_rq} = 1;
 		$self->requeue;
-	} elsif (-p $in || -S _) { # O_NONBLOCK for sockets and pipes
+	} elsif (do { no warnings 'unopened'; !stat($in) }) { # ProcessIONBF
+	} elsif (-p _ || -S _) { # O_NONBLOCK for sockets and pipes
 		$in->blocking(0);
 	} elsif (-t $in) { # isatty(3) can't use `_' stat cache
 		unblock_tty($self);
 	}
+	$self;
 }
 
-sub close {
+sub close { # idempotent
 	my ($self) = @_;
 	if (my $t = delete($self->{restore_termios})) {
 		my $fd = fileno($self->{sock} // return);
@@ -60,16 +62,16 @@ sub event_step {
 	my $r = sysread($self->{sock} // return, my $rbuf, 65536);
 	eval {
 		if ($r) {
-			$self->{cb}->(@{$self->{args}}, $rbuf);
+			$self->{cb}->($self, @{$self->{args}}, $rbuf);
 			$self->requeue if $self->{-need_rq};
 		} elsif (defined($r)) { # EOF
-			$self->{cb}->(@{$self->{args}}, '');
+			$self->{cb}->($self, @{$self->{args}}, '');
 			$self->close
 		} elsif ($!{EAGAIN}) { # rely on EPOLLIN
 		} elsif ($!{EINTR}) { # rely on EPOLLIN for sockets/pipes/tty
 			$self->requeue if $self->{-need_rq};
 		} else { # another error
-			$self->{cb}->(@{$self->{args}}, undef);
+			$self->{cb}->($self, @{$self->{args}}, undef);
 			$self->close;
 		}
 	};
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 56e4c001..7bc7b2dc 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -1567,7 +1567,7 @@ sub request_umask {
 }
 
 sub _stdin_cb { # PublicInbox::InputPipe::consume callback for --stdin
-	my ($lei, $cb) = @_; # $_[-1] = $rbuf
+	my (undef, $lei, $cb) = @_; # $_[-1] = $rbuf
 	$_[1] // return $lei->fail("error reading stdin: $!");
 	$lei->{stdin_buf} .= $_[-1];
 	do_env($lei, $cb) if $_[-1] eq '';
diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm
index 9a7e8734..203d8f41 100644
--- a/lib/PublicInbox/Qspawn.pm
+++ b/lib/PublicInbox/Qspawn.pm
@@ -31,6 +31,9 @@ use PublicInbox::GzipFilter;
 use Scalar::Util qw(blessed);
 use PublicInbox::Limiter;
 use PublicInbox::Aspawn qw(run_await);
+use PublicInbox::Syscall qw(EPOLLIN);
+use PublicInbox::InputPipe;
+use Carp qw(carp confess);
 
 # n.b.: we get EAGAIN with public-inbox-httpd, and EINTR on other PSGI servers
 use Errno qw(EAGAIN EINTR);
@@ -61,7 +64,7 @@ sub _do_spawn {
 	if ($start_cb) {
 		eval { # popen_rd may die on EMFILE, ENFILE
 			$self->{rpipe} = popen_rd($cmd, $cmd_env, \%o,
-						\&waitpid_err, $self);
+						\&waitpid_err, $self, \%o);
 			$start_cb->($self); # EPOLL_CTL_ADD may ENOSPC/ENOMEM
 		};
 	} else {
@@ -126,6 +129,20 @@ sub wait_await { # run_await cb
 	waitpid_err($pid, $self, $opt);
 }
 
+sub yield_chunk { # $_[-1] is sysread buffer (or undef)
+	my ($self, $ipipe) = @_;
+	if (!defined($_[-1])) {
+		warn "error reading body: $!";
+	} elsif ($_[-1] eq '') { # normal EOF
+		$self->finish;
+		$self->{qfh}->close;
+	} elsif (defined($self->{qfh}->write($_[-1]))) {
+		return; # continue while HTTP client is reading our writes
+	} # else { # HTTP client disconnected
+	delete $self->{rpipe};
+	$ipipe->close;
+}
+
 sub finish ($;$) {
 	my ($self, $err) = @_;
 	$self->{_err} //= $err; # only for $@
@@ -201,6 +218,39 @@ EOM
 	$ret;
 }
 
+sub yield_pass {
+	my ($self, $ipipe, $res) = @_; # $ipipe = InputPipe
+	my $env = $self->{psgi_env};
+	my $wcb = delete $env->{'qspawn.wcb'} // confess('BUG: no qspawn.wcb');
+	if (ref($res) eq 'CODE') { # chain another command
+		delete $self->{rpipe};
+		$ipipe->close if $ipipe;
+		$res->($wcb);
+		$self->{passed} = 1;
+		return; # all done
+	}
+	confess("BUG: $res unhandled") if ref($res) ne 'ARRAY';
+
+	my $filter = blessed($res->[2]) && $res->[2]->can('attach') ?
+			pop(@$res) : delete($env->{'qspawn.filter'});
+	$filter //= PublicInbox::GzipFilter::qsp_maybe($res->[1], $env);
+
+	if (scalar(@$res) == 3) { # done early (likely error or static file)
+		delete $self->{rpipe};
+		$ipipe->close if $ipipe;
+		$wcb->($res); # all done
+		return;
+	}
+	scalar(@$res) == 2 or confess("BUG: scalar(res) != 2: @$res");
+	return ($wcb, $filter) if !$ipipe; # generic PSGI
+	# streaming response
+	my $qfh = $wcb->($res); # get PublicInbox::HTTP::(Chunked|Identity)
+	$qfh = $filter->attach($qfh) if $filter;
+	my ($bref) = @{delete $self->{yield_parse_hdr}};
+	$qfh->write($$bref) if $$bref ne '';
+	$self->{qfh} = $qfh; # keep $ipipe open
+}
+
 sub psgi_return_init_cb { # this may be PublicInbox::HTTPD::Async {cb}
 	my ($self) = @_;
 	my $r = rd_hdr($self) or return; # incomplete
@@ -257,6 +307,55 @@ sub psgi_return_start { # may run later, much later...
 	}
 }
 
+sub r500 () { [ 500, [], [ "Internal error\n" ] ] }
+
+sub parse_hdr_done ($$) {
+	my ($self) = @_;
+	my $ret;
+	if (defined $_[-1]) {
+		my ($bref, $ph_cb, @ph_arg) = @{$self->{yield_parse_hdr}};
+		$$bref .= $_[-1];
+		$ret = eval { $ph_cb->(length($_[-1]), $bref, @ph_arg) };
+		if ($@) {
+			carp "parse_hdr (@{$self->{cmd}}): $@\n";
+			$ret = r500();
+		} elsif (!$ret && $_[-1] eq '') {
+			carp <<EOM;
+EOF parsing headers from @{$self->{cmd}} ($self->{psgi_env}->{REQUEST_URI})
+EOM
+			$ret = r500();
+		}
+	} else {
+		carp <<EOM;
+E: parsing headers: $! from @{$self->{cmd}} ($self->{psgi_env}->{REQUEST_URI})
+EOM
+		$ret = r500();
+	}
+	$ret; # undef if headers incomplete
+}
+
+sub ipipe_cb { # InputPipe callback
+	my ($ipipe, $self) = @_; # $_[-1] rbuf
+	if ($self->{qfh}) { # already streaming
+		yield_chunk($self, $ipipe, $_[-1]);
+	} elsif (my $res = parse_hdr_done($self, $_[-1])) {
+		yield_pass($self, $ipipe, $res);
+	} # else: headers incomplete, keep reading
+}
+
+sub _yield_start { # may run later, much later...
+	my ($self) = @_;
+	if ($self->{psgi_env}->{'pi-httpd.async'}) {
+		require PublicInbox::ProcessIONBF;
+		my $rpipe = $self->{rpipe};
+		PublicInbox::ProcessIONBF->replace($rpipe);
+		PublicInbox::InputPipe::consume($rpipe, \&ipipe_cb, $self);
+	} else {
+		require PublicInbox::GetlineResponse;
+		PublicInbox::GetlineResponse::response($self);
+	}
+}
+
 # Used for streaming the stdout of one process as a PSGI response.
 #
 # $env is the PSGI env.
@@ -302,4 +401,22 @@ sub psgi_return {
 	}
 }
 
+sub psgi_yield {
+	my ($self, $env, $limiter, @parse_hdr_arg)= @_;
+	$self->{psgi_env} = $env;
+	$self->{yield_parse_hdr} = [ \(my $buf = ''), @parse_hdr_arg ];
+	$limiter ||= $def_limiter ||= PublicInbox::Limiter->new(32);
+
+	# the caller already captured the PSGI write callback from
+	# the PSGI server, so we can call ->start, here:
+	$env->{'qspawn.wcb'} ? start($self, $limiter, \&_yield_start) : sub {
+		# the caller will return this sub to the PSGI server, so
+		# it can set the response callback (that is, for
+		# PublicInbox::HTTP, the chunked_wcb or identity_wcb callback),
+		# but other HTTP servers are supported:
+		$env->{'qspawn.wcb'} = $_[0];
+		start($self, $limiter, \&_yield_start);
+	}
+}
+
 1;

^ permalink raw reply related	[relevance 3%]

* [PATCH 00/26] process management simplifications
@ 2023-10-25  0:29  7% Eric Wong
  2023-10-25  0:29  3% ` [PATCH 09/26] qspawn: introduce new psgi_yield API Eric Wong
  0 siblings, 1 reply; 2+ results
From: Eric Wong @ 2023-10-25  0:29 UTC (permalink / raw)
  To: meta

The convoluted HTTPD/Async code is now gone and replaced
with the simpler InputPipe (originally developed for lei).
Fortunately, the s/psgi_return/psgi_yield/ change went more
smoothly than I expected.

cindex gets some simplifications, too, which will be helpful
since more work is required on that code.  I didn't get a
chance to use the split out Limiter+Qspawn, though...

3/26 is a fairly large philosophy change to use temporary files
over pipes, but it's probably fine as it's a simpler and more
straightforward data flow and only used for small outputs which
can fit in memory, so unlikely to hit disk.

9/26 is the major change which was enabled by a change made
4 years ago.


Eric Wong (26):
  limiter: split out from qspawn
  spawn: support synchronous run_qx
  psgi_qx: use a temporary file rather than pipe
  www_coderepo: capture uses a flattened list
  qspawn: psgi_return allows list for callback args
  qspawn: drop unused err arg for ->event_step
  httpd/async: require IO arg
  xt/check-run: call DS->Reset after all tests
  qspawn: introduce new psgi_yield API
  repo_atom: switch to psgi_yield
  repo_snapshot: psgi_yield
  viewvcs: psgi_yield
  www_altid: switch to psgi_yield
  cgit: switch to psgi_yield
  www_coderepo: psgi_yield
  drop psgi_return, httpd/async and GetlineBody
  qspawn: use WwwStatic for fallbacks and error code
  qspawn: simplify internal argument passing
  cidx_log_p: don't bother with F_SETPIPE_SZ
  cindex: avoid awaitpid for popen
  cindex: use timer for inits
  cindex: start using run_await to simplify code
  cindex: use run_await to read extensions.objectFormat
  cindex: drop XH_PID global
  cindex: use run_await wrapper for git commands
  cindex: use sysread for generating fingerprint

 MANIFEST                           |   5 +-
 lib/PublicInbox/Aspawn.pm          |  34 +++
 lib/PublicInbox/Cgit.pm            |   2 +-
 lib/PublicInbox/CidxLogP.pm        |   3 +-
 lib/PublicInbox/CodeSearchIdx.pm   | 236 +++++++++----------
 lib/PublicInbox/Config.pm          |   4 +-
 lib/PublicInbox/GetlineBody.pm     |  46 ----
 lib/PublicInbox/GetlineResponse.pm |  40 ++++
 lib/PublicInbox/Git.pm             |   6 +
 lib/PublicInbox/GitHTTPBackend.pm  |  17 +-
 lib/PublicInbox/GzipFilter.pm      |   5 +-
 lib/PublicInbox/HTTP.pm            |   8 +-
 lib/PublicInbox/HTTPD.pm           |   5 +-
 lib/PublicInbox/HTTPD/Async.pm     | 109 ---------
 lib/PublicInbox/Inbox.pm           |   4 +-
 lib/PublicInbox/InputPipe.pm       |  12 +-
 lib/PublicInbox/LEI.pm             |   2 +-
 lib/PublicInbox/Limiter.pm         |  47 ++++
 lib/PublicInbox/MailDiff.pm        |   8 +-
 lib/PublicInbox/Qspawn.pm          | 349 +++++++++++------------------
 lib/PublicInbox/RepoAtom.pm        |   6 +-
 lib/PublicInbox/RepoSnapshot.pm    |   2 +-
 lib/PublicInbox/SearchIdx.pm       |  19 +-
 lib/PublicInbox/SolverGit.pm       |  10 +-
 lib/PublicInbox/Spawn.pm           |  69 ++++--
 lib/PublicInbox/ViewVCS.pm         |   7 +-
 lib/PublicInbox/WwwAltId.pm        |   6 +-
 lib/PublicInbox/WwwCoderepo.pm     |  12 +-
 t/dir_idle.t                       |   1 -
 t/fake_inotify.t                   |   2 -
 t/httpd-corner.psgi                |  14 +-
 t/httpd-corner.t                   |  12 +-
 t/qspawn.t                         |   3 +-
 t/spawn.t                          |  13 +-
 xt/check-run.t                     |   2 +
 35 files changed, 515 insertions(+), 605 deletions(-)
 create mode 100644 lib/PublicInbox/Aspawn.pm
 delete mode 100644 lib/PublicInbox/GetlineBody.pm
 create mode 100644 lib/PublicInbox/GetlineResponse.pm
 delete mode 100644 lib/PublicInbox/HTTPD/Async.pm
 create mode 100644 lib/PublicInbox/Limiter.pm

^ permalink raw reply	[relevance 7%]

Results 1-2 of 2 | reverse | options above
-- pct% links below jump to the message on this page, permalinks otherwise --
2023-10-25  0:29  7% [PATCH 00/26] process management simplifications Eric Wong
2023-10-25  0:29  3% ` [PATCH 09/26] qspawn: introduce new psgi_yield API Eric Wong

Code repositories for project(s) associated with this public inbox

	https://80x24.org/public-inbox.git

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).