user/dev discussion of public-inbox itself
* [PATCH 0/2] limiter logic deduplication
@ 2025-03-21 22:22 Eric Wong
  2025-03-21 22:22 ` [PATCH 1/2] qspawn: rename {psgi_env} => {env} Eric Wong
  2025-03-21 22:22 ` [PATCH 2/2] limiter: refactor to reduce code duplication Eric Wong
  0 siblings, 2 replies; 4+ messages in thread
From: Eric Wong @ 2025-03-21 22:22 UTC
  To: meta

Hopefully this makes things easier to understand...

Eric Wong (2):
  qspawn: rename {psgi_env} => {env}
  limiter: refactor to reduce code duplication

 lib/PublicInbox/Limiter.pm      | 36 +++++++++++++++++--
 lib/PublicInbox/PlackLimiter.pm | 48 ++++++++++---------------
 lib/PublicInbox/Qspawn.pm       | 62 +++++++++++++--------------------
 lib/PublicInbox/ViewVCS.pm      | 45 +++++-------------------
 t/qspawn.t                      |  6 +++-
 5 files changed, 90 insertions(+), 107 deletions(-)

* [PATCH 1/2] qspawn: rename {psgi_env} => {env}
  2025-03-21 22:22 [PATCH 0/2] limiter logic deduplication Eric Wong
@ 2025-03-21 22:22 ` Eric Wong
  2025-03-21 22:22 ` [PATCH 2/2] limiter: refactor to reduce code duplication Eric Wong
  1 sibling, 0 replies; 4+ messages in thread
From: Eric Wong @ 2025-03-21 22:22 UTC
  To: meta

There's no need to make a distinction here, and a generic
{env} key will help make the limiter code more flexible and
reusable in the next commit.
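
To illustrate why one key name helps: the dequeue check that the
next patch adds to Limiter reads only $ctx->{env}, so the same code
can serve a Qspawn object, a ViewVCS $ctx, or a plain hash.  This is
a minimal sketch; client_hung_up, FakeQspawn, and the fake ckhup sub
are hypothetical, and only the 'pi-httpd.ckhup' and 'psgix.io' env
keys are taken from the patches.

```perl
use strict;
use warnings;

# Hypothetical helper shaped like the hangup check in the next patch's
# Limiter::_start_next: it only needs $ctx->{env}, whatever $ctx is.
# A true return from the ckhup callback means the client hung up.
sub client_hung_up {
	my ($ctx) = @_;
	my $ck = $ctx->{env}->{'pi-httpd.ckhup'} or return 0; # no check available
	$ck->($ctx->{env}->{'psgix.io'}->{sock}) ? 1 : 0;
}

my $hup_env = {
	'pi-httpd.ckhup' => sub { $_[0]->{closed} }, # fake hangup check
	'psgix.io' => { sock => { closed => 1 } },   # fake socket
};
my $qsp = bless { env => $hup_env }, 'FakeQspawn'; # formerly {psgi_env}
my $vcs_ctx = { env => { REQUEST_URI => '/' } };   # ViewVCS-style $ctx
printf "qspawn gone=%d, viewvcs gone=%d\n",
	client_hung_up($qsp), client_hung_up($vcs_ctx);
```

With {psgi_env} on Qspawn and {env} elsewhere, the helper would need
to know which kind of object it was handed.
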
---
 lib/PublicInbox/Qspawn.pm | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm
index 34e9eff6..b882ea53 100644
--- a/lib/PublicInbox/Qspawn.pm
+++ b/lib/PublicInbox/Qspawn.pm
@@ -88,7 +88,7 @@ sub _finalize ($) {
 		warn "E: @{$self->{args}->[0]}: $err\n" if !$self->{-quiet};
 	}
 
-	my ($env, $qx_cb_arg) = delete @$self{qw(psgi_env qx_cb_arg)};
+	my ($env, $qx_cb_arg) = delete @$self{qw(env qx_cb_arg)};
 	if ($qx_cb_arg) {
 		my $cb = shift @$qx_cb_arg;
 		eval { $cb->($self->{args}->[2]->{1}, @$qx_cb_arg) };
@@ -161,8 +161,8 @@ sub start ($$$) {
 	if ($limiter->{running} < $limiter->{max}) {
 		_do_spawn($self, $start_cb, $limiter);
 	} elsif ($limiter->is_too_busy) {
-		$self->{psgi_env}->{'qspawn.fallback'} //= 503 if
-			$self->{psgi_env};
+		$self->{env}->{'qspawn.fallback'} //= 503 if
+			$self->{env};
 		_finalize $self;
 	} else {
 		push @{$limiter->{run_queue}}, [ $self, $start_cb ];
@@ -175,7 +175,7 @@ sub start ($$$) {
 # and safe to slurp.
 sub psgi_qx {
 	my ($self, $env, $limiter, @qx_cb_arg) = @_;
-	$self->{psgi_env} = $env;
+	$self->{env} = $env;
 	$self->{qx_cb_arg} = \@qx_cb_arg;
 	$limiter ||= $def_limiter ||= PublicInbox::Limiter->new;
 	start($self, $limiter, undef);
@@ -183,7 +183,7 @@ sub psgi_qx {
 
 sub yield_pass {
 	my ($self, $ipipe, $res) = @_; # $ipipe = InputPipe
-	my $env = $self->{psgi_env};
+	my $env = $self->{env};
 	my $wcb = delete $env->{'qspawn.wcb'} // confess('BUG: no qspawn.wcb');
 	if (ref($res) eq 'CODE') { # chain another command
 		delete $self->{rpipe};
@@ -232,7 +232,7 @@ sub parse_hdr_done ($$) {
 		$ret = psgi_status_err();
 	}
 	carp <<EOM if $err;
-E: $err @{$self->{args}->[0]} ($self->{psgi_env}->{REQUEST_URI})
+E: $err @{$self->{args}->[0]} ($self->{env}->{REQUEST_URI})
 EOM
 	$ret; # undef if headers incomplete
 }
@@ -248,7 +248,7 @@ sub ipipe_cb { # InputPipe callback
 
 sub _yield_start { # may run later, much later...
 	my ($self) = @_;
-	if ($self->{psgi_env}->{'pi-httpd.async'}) {
+	if ($self->{env}->{'pi-httpd.async'}) {
 		my $rpipe = $self->{rpipe};
 		PublicInbox::InputPipe::consume($rpipe, \&ipipe_cb, $self);
 	} else {
@@ -283,7 +283,7 @@ sub _yield_start { # may run later, much later...
 
 sub psgi_yield {
 	my ($self, $env, $limiter, @parse_hdr_arg)= @_;
-	$self->{psgi_env} = $env;
+	$self->{env} = $env;
 	$self->{yield_parse_hdr} = [ \(my $buf = ''), @parse_hdr_arg ];
 	$limiter ||= $def_limiter ||= PublicInbox::Limiter->new;
 

* [PATCH 2/2] limiter: refactor to reduce code duplication
  2025-03-21 22:22 [PATCH 0/2] limiter logic deduplication Eric Wong
  2025-03-21 22:22 ` [PATCH 1/2] qspawn: rename {psgi_env} => {env} Eric Wong
@ 2025-03-21 22:22 ` Eric Wong
  2025-03-22 10:18   ` [SQUASH 3/2] plack_limiter: fix error response Eric Wong
  1 sibling, 1 reply; 4+ messages in thread
From: Eric Wong @ 2025-03-21 22:22 UTC
  To: meta

PlackLimiter, Qspawn, and ViewVCS all had roughly the same
code wrapped around our base Limiter package, so consolidate
it into a new Limiter->may_start subroutine.  As a result,
PlackLimiter loses its `rejected' stat, but that's logged
anyway, and I doubt the customizable error message was worth
the effort.
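
The may_start flow can be sketched in isolation: start immediately
if a slot is free, fail with 503 if the queue is too deep, otherwise
queue; a guard stored in $ctx starts the next waiter once $ctx is
freed.  This is a pared-down, hypothetical copy (MiniGuard stands in
for PublicInbox::OnDestroy, MiniLimiter takes an explicit queue depth
instead of defaulting to 32, and the hangup check is omitted).

```perl
use strict;
use warnings;

# MiniGuard: calls $cb->(@arg) once its last reference is dropped,
# standing in for PublicInbox::OnDestroy's on_destroy (not its real code).
package MiniGuard;
sub new { my ($cls, $cb, @arg) = @_; bless [ $cb, @arg ], $cls }
sub DESTROY { my ($cb, @arg) = @{$_[0]}; $cb->(@arg) }

# MiniLimiter: hypothetical, pared-down copy of the may_start flow
# (no RLIMITs, no hangup checks, explicit queue depth).
package MiniLimiter;
sub new {
	my ($cls, $max, $depth) = @_;
	bless { max => $max, depth => $depth, running => 0,
		run_queue => [] }, $cls;
}

sub _do_start {
	my ($self, $start_cb, $ctx, $fail_cb) = @_;
	# the guard lives inside $ctx: freeing $ctx lets the next job run
	$ctx->{"limiter.next.$self"} = MiniGuard->new(\&_start_next, $self);
	++$self->{running};
	eval { $start_cb->($ctx, $self) };
	$fail_cb->($ctx, 500, 'internal error') if $@;
}

sub _start_next { # guard callback: one slot freed, dequeue a waiter
	my ($self) = @_;
	--$self->{running};
	my $rec = shift @{$self->{run_queue}} or return;
	_do_start($self, @$rec);
}

sub may_start {
	my ($self, $start_cb, $ctx, $fail_cb) = @_;
	if ($self->{running} < $self->{max}) {
		_do_start($self, $start_cb, $ctx, $fail_cb);
	} elsif (@{$self->{run_queue}} > $self->{depth}) {
		$fail_cb->($ctx, 503, 'too busy');
	} else {
		push @{$self->{run_queue}}, [ $start_cb, $ctx, $fail_cb ];
	}
}

package main;
my $lim = MiniLimiter->new(1, 0); # 1 running at a time, 1 queued at most
my (@log, @held);
my $start = sub { my ($ctx) = @_; push @log, "start $ctx->{id}"; push @held, $ctx };
my $fail = sub { my ($ctx, $code) = @_; push @log, "fail $ctx->{id} $code" };
for my $id (1..3) {
	$lim->may_start($start, { id => $id }, $fail);
}
shift @held;    # job 1 finishes: its guard fires and starts job 2
print "@log\n"; # start 1 fail 3 503 start 2
@held = ();     # job 2 finishes; the queue is empty, so nothing else runs
```

Each caller only supplies a start_cb and a fail_cb; the running
count, queueing, and 503 decision live in one place.
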

We now have ckhup (hangup check) and 499 (client disconnected)
handling for all PSGI uses of Limiter, as well.
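
The hangup handling amounts to a dequeue loop that fails every
queued request whose client has already gone (499) and starts the
first one still connected.  A minimal sketch, assuming queue entries
shaped like [ $start_cb, $ctx, $fail_cb ] as in may_start;
next_live_job and the fake ckhup sub are hypothetical.

```perl
use strict;
use warnings;

# Hypothetical next_live_job(): the dequeue loop of Limiter::_start_next
# without the start step.  A true ckhup result means the client hung up.
sub next_live_job {
	my ($run_queue) = @_;
	while (my $rec = shift @$run_queue) {
		my (undef, $ctx, $fail_cb) = @$rec;
		my $ck = $ctx->{env}->{'pi-httpd.ckhup'} or return $rec;
		$ck->($ctx->{env}->{'psgix.io'}->{sock}) or return $rec;
		$fail_cb->($ctx, 499, 'client disconnected');
	}
	undef; # queue drained, nothing left to start
}

my @failed;
my $fail = sub { my ($ctx, $code) = @_; push @failed, "$ctx->{id}:$code" };
my $ckhup = sub { $_[0]->{hup} }; # fake: true once the client has gone
my @q = map {
	my ($id, $hup) = @$_;
	my $env = { 'pi-httpd.ckhup' => $ckhup,
		'psgix.io' => { sock => { hup => $hup } } };
	[ sub {}, { id => $id, env => $env }, $fail ];
} ([ 'a', 1 ], [ 'b', 1 ], [ 'c', 0 ], [ 'd', 0 ]);
my $rec = next_live_job(\@q);
print "start $rec->[1]->{id}; 499s: @failed; still queued: ", scalar(@q), "\n";
# start c; 499s: a:499 b:499; still queued: 1
```

Skipping dead clients at dequeue time keeps an abandoned request
from occupying a slot that a waiting client could use.
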

t/qspawn.t changes were required since starting the next queued
process, formerly done in ->finalize, now relies on on_destroy,
but none of the existing PSGI code using Qspawn required changes.
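
Why the test changed: an on_destroy callback fires only when the
last reference to its guard disappears, and _do_start stores the
guard in $ctx, which for Qspawn is the object itself.  Objects still
held in @t would therefore keep the next queued command from
starting, so the test releases them one at a time.  A minimal sketch
of that behavior; Guard is a hypothetical stand-in for
PublicInbox::OnDestroy.

```perl
use strict;
use warnings;

# Hypothetical Guard: runs its callback from DESTROY, like on_destroy.
package Guard;
sub new { my ($cls, $cb) = @_; bless { cb => $cb }, $cls }
sub DESTROY { $_[0]->{cb}->() }

package main;
my @events;
my @t = map { my $i = $_; [ Guard->new(sub { push @events, "done $i" }), $i ] } 0..2;
push @events, 'setup done'; # every guard is still referenced by @t
for (@t) {
	my ($g, $i) = (shift(@$_), shift(@$_)); # now the only reference
	push @events, "checking $i";
} # $g goes out of scope at the end of each pass, firing DESTROY
print join(', ', @events), "\n";
```

Shifting each element out of @t, as the updated test does, makes
destruction (and thus the next start) happen in a predictable order.
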
---
 lib/PublicInbox/Limiter.pm      | 36 ++++++++++++++++++++++--
 lib/PublicInbox/PlackLimiter.pm | 48 +++++++++++++------------------
 lib/PublicInbox/Qspawn.pm       | 50 ++++++++++++---------------------
 lib/PublicInbox/ViewVCS.pm      | 45 ++++++-----------------------
 t/qspawn.t                      |  6 +++-
 5 files changed, 84 insertions(+), 101 deletions(-)

diff --git a/lib/PublicInbox/Limiter.pm b/lib/PublicInbox/Limiter.pm
index fc62d0d4..f90c8b56 100644
--- a/lib/PublicInbox/Limiter.pm
+++ b/lib/PublicInbox/Limiter.pm
@@ -4,6 +4,7 @@
 package PublicInbox::Limiter;
 use v5.12;
 use PublicInbox::Spawn;
+use PublicInbox::OnDestroy;
 
 sub new {
 	my ($class, $max) = @_;
@@ -61,9 +62,40 @@ EOM
 	}
 }
 
-sub is_too_busy {
+sub _do_start ($$$$) {
+	my ($self, $start_cb, $ctx, $fail_cb) = @_;
+	$ctx->{"limiter.next.$self"} = on_destroy \&_start_next, $self;
+	++$self->{running};
+	eval { $start_cb->($ctx, $self) };
+	if ($@) {
+		print { $ctx->{env}->{'psgi.errors'} } "E: $@\n";
+		$fail_cb->($ctx, 500, 'internal error');
+	}
+}
+
+sub _start_next { # on_destroy cb
 	my ($self) = @_;
-	scalar(@{$self->{run_queue}}) > ($self->{depth} // 32)
+	--$self->{running};
+	my ($rec, $ck, $start_cb, $ctx, $fail_cb);
+	while (1) {
+		$rec = shift @{$self->{run_queue}} or return;
+		($start_cb, $ctx, $fail_cb) = @$rec;
+		$ck = $ctx->{env}->{'pi-httpd.ckhup'} or last;
+		$ck->($ctx->{env}->{'psgix.io'}->{sock}) or last;
+		$fail_cb->($ctx, 499, 'client disconnected');
+	}
+	_do_start $self, $start_cb, $ctx, $fail_cb;
+}
+
+sub may_start {
+	my ($self, $start_cb, $ctx, $fail_cb) = @_;
+	if ($self->{running} < $self->{max}) {
+		_do_start $self, $start_cb, $ctx, $fail_cb;
+	} elsif (@{$self->{run_queue}} > ($self->{depth} // 32)) {
+		$fail_cb->($ctx, 503, 'too busy');
+	} else {
+		push @{$self->{run_queue}}, [ $start_cb, $ctx, $fail_cb ];
+	}
 }
 
 1;
diff --git a/lib/PublicInbox/PlackLimiter.pm b/lib/PublicInbox/PlackLimiter.pm
index a1cc51dc..2ca0a63f 100644
--- a/lib/PublicInbox/PlackLimiter.pm
+++ b/lib/PublicInbox/PlackLimiter.pm
@@ -4,7 +4,7 @@
 package PublicInbox::PlackLimiter;
 use v5.12;
 use parent qw(Plack::Middleware);
-use PublicInbox::OnDestroy;
+use PublicInbox::Limiter;
 
 sub prepare_app { # called via Plack::Component (used by Plack::Middleware)
 	my ($self) = @_;
@@ -12,25 +12,12 @@ sub prepare_app { # called via Plack::Component (used by Plack::Middleware)
 	$self->{max} //= 2;
 	$self->{run_queue} = [];
 	$self->{running} = 0;
-	$self->{rejected} = 0;
-	$self->{message} //= "too busy\n";
 }
 
-sub r503 ($) {
-	my @body = ($_[0]->{message});
-	++$_[0]->{rejected};
-	[ 503, [ 'Content-Type' => 'text/plain',
-		'Content-Length' => length($body[0]) ], \@body ]
-}
-
-sub next_req { # on_destroy cb
-	my ($self) = @_;
-	--$self->{running};
-	my $env = shift @{$self->{run_queue}} or return;
-	my $wcb = delete $env->{'p-i.limiter.wcb'} // die 'BUG: no wcb';
-	my $res = eval { call($self, $env) };
-	return warn("W: $@") if $@;
-	ref($res) eq 'CODE' ? $res->($wcb) : $wcb->($res);
+sub lim_fail { # limiter->may_start fail_cb
+	my (undef, $code, $msg) = @_;
+	[ $code, [ 'Content-Type' => 'text/plain',
+		'Content-Length' => length($msg) ], [ $msg ] ]
 }
 
 sub stats ($) {
@@ -39,29 +26,32 @@ sub stats ($) {
 	my $res = <<EOM;
 running: $self->{running}
 queued: $nq
-rejected: $self->{rejected}
 max: $self->{max}
 EOM
 	[ 200, [ 'Content-Type' => 'text/plain',
 		'Content-Length' => length($res) ], [ $res ] ]
 }
 
+sub app_call { # limiter->may_start start_cb
+	my ($ctx, $self) = @_;
+	my $wcb = delete $ctx->{psgi_wcb};
+	my $env = delete $ctx->{env}; # avoid cyclic ref
+	push @{$env->{'limiter.ctx'}}, $ctx; # handoff limiter.next.$self
+	my $res = eval { $self->app->($env) };
+	return warn("W: $@") if $@;
+	ref($res) eq 'CODE' ? $res->($wcb) : $wcb->($res);
+}
+
 sub call {
 	my ($self, $env) = @_;
 	if (defined $self->{stats_match_cb}) {
 		return stats $self if $self->{stats_match_cb}->($env);
 	}
 	return $self->app->($env) if !$self->{match_cb}->($env);
-	return r503($self) if @{$self->{run_queue}} > ($self->{depth} // 32);
-	if ($self->{running} < $self->{max}) {
-		++$self->{running};
-		$env->{'p-i.limiter.next'} = on_destroy \&next_req, $self;
-		$self->app->($env);
-	} else { # capture write cb from PSGI server and queue up
-		sub {
-			$env->{'p-i.limiter.wcb'} = $_[0];
-			push @{$self->{run_queue}}, $env;
-		};
+	sub { # capture write cb from PSGI server
+		my $ctx = { env => $env, psgi_wcb => $_[0] };
+		PublicInbox::Limiter::may_start(
+				$self, \&app_call, $ctx, \&lim_fail);
 	}
 }
 
diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm
index b882ea53..690860c1 100644
--- a/lib/PublicInbox/Qspawn.pm
+++ b/lib/PublicInbox/Qspawn.pm
@@ -51,16 +51,10 @@ sub new {
 }
 
 sub _do_spawn {
-	my ($self, $start_cb, $limiter) = @_;
+	my ($self) = @_;
 	my ($cmd, $cmd_env, $opt) = @{$self->{args}};
-	my %o = %{$opt || {}};
-	$self->{limiter} = $limiter;
-	for my $k (@PublicInbox::Spawn::RLIMITS) {
-		$opt->{$k} = $limiter->{$k} // next;
-	}
-	$self->{-quiet} = 1 if $o{quiet};
-	$limiter->{running}++;
-	if ($start_cb) {
+	$self->{-quiet} = 1 if $opt->{quiet};
+	if (my $start_cb = delete $self->{-start_cb}) {
 		eval { # popen_rd may die on EMFILE, ENFILE
 			$self->{rpipe} = popen_rd($cmd, $cmd_env, $opt,
 							\&waitpid_err, $self);
@@ -78,7 +72,7 @@ sub psgi_status_err { # Qspawn itself is useful w/o PSGI
 	PublicInbox::WwwStatic::r($_[0] // 500);
 }
 
-sub _finalize ($) {
+sub finalize ($) {
 	my ($self) = @_;
 	if (my $err = $self->{_err}) { # set by finish or waitpid_err
 		utf8::decode($err);
@@ -102,17 +96,6 @@ sub _finalize ($) {
 	}
 }
 
-sub finalize ($) {
-	my ($self) = @_;
-
-	# process is done, spawn whatever's in the queue
-	my $limiter = delete $self->{limiter} or return;
-	--$limiter->{running};
-	my $next = shift @{$limiter->{run_queue}};
-	_do_spawn(@$next, $limiter) if $next;
-	_finalize $self;
-}
-
 sub waitpid_err { # callback for awaitpid
 	my (undef, $self) = @_; # $_[0]: pid
 	$self->{_err} = ''; # for defined check in ->finish
@@ -156,17 +139,21 @@ sub finish ($;$) {
 	finalize($self) if $closed_before || defined($self->{_err});
 }
 
-sub start ($$$) {
+sub _qsp_fail { # limiter fail_cb
+	my ($self, $code, $msg) = @_;
+	$self->{env}->{'qspawn.fallback'} //= $code; # likely 503
+	finalize $self;
+}
+
+sub start ($$;$) {
 	my ($self, $limiter, $start_cb) = @_;
-	if ($limiter->{running} < $limiter->{max}) {
-		_do_spawn($self, $start_cb, $limiter);
-	} elsif ($limiter->is_too_busy) {
-		$self->{env}->{'qspawn.fallback'} //= 503 if
-			$self->{env};
-		_finalize $self;
-	} else {
-		push @{$limiter->{run_queue}}, [ $self, $start_cb ];
+	$self->{-start_cb} = $start_cb if $start_cb;
+	my %opt;
+	for (@PublicInbox::Spawn::RLIMITS) {
+		$opt{$_} = $limiter->{$_} // next;
 	}
+	%{$self->{args}->[2]} = (%{$self->{args}->[2]}, %opt) if keys %opt;
+	$limiter->may_start(\&_do_spawn, $self, \&_qsp_fail);
 }
 
 # Similar to `backtick` or "qx" ("perldoc -f qx"), it calls @qx_cb_arg with
@@ -177,8 +164,7 @@ sub psgi_qx {
 	my ($self, $env, $limiter, @qx_cb_arg) = @_;
 	$self->{env} = $env;
 	$self->{qx_cb_arg} = \@qx_cb_arg;
-	$limiter ||= $def_limiter ||= PublicInbox::Limiter->new;
-	start($self, $limiter, undef);
+	start($self, $limiter ||= $def_limiter ||= PublicInbox::Limiter->new);
 }
 
 sub yield_pass {
diff --git a/lib/PublicInbox/ViewVCS.pm b/lib/PublicInbox/ViewVCS.pm
index 552e3241..e9ed4711 100644
--- a/lib/PublicInbox/ViewVCS.pm
+++ b/lib/PublicInbox/ViewVCS.pm
@@ -636,15 +636,8 @@ sub show_blob { # git->cat_async callback
 		'</code></pre></td></tr></table>'.dbg_log($ctx), @def);
 }
 
-sub start_solver ($$) {
-	my ($ctx, $limiter) = @_;
-	$ctx->{-next_solver} = on_destroy \&next_solver, $limiter;
-	++$limiter->{running};
-	if (my $ck = $ctx->{env}->{'pi-httpd.ckhup'}) {
-		$ck->($ctx->{env}->{'psgix.io'}->{sock}) and
-			return html_page $ctx, 499, 'client disconnected';
-	}
-
+sub start_solver {
+	my ($ctx) = @_;
 	while (my ($from, $to) = each %QP_MAP) {
 		my $v = $ctx->{qp}->{$from} // next;
 		$ctx->{hints}->{$to} = $v if $v ne '';
@@ -662,19 +655,11 @@ sub start_solver ($$) {
 	$solver->solve(@$ctx{qw(env lh oid_b hints)});
 }
 
-# run the next solver job when done and DESTROY-ed (on_destroy cb)
-sub next_solver {
-	my ($limiter) = @_;
-	--$limiter->{running};
-	my $ctx = shift(@{$limiter->{run_queue}}) // return;
-	eval { start_solver $ctx, $limiter };
-	return unless $@;
-	warn "W: start_solver: $@";
-	html_page($ctx, 500) if $ctx->{-wcb};
-}
-
-sub may_start_solver ($) {
-	my ($ctx) = @_;
+# GET /$INBOX/$GIT_OBJECT_ID/s/
+# GET /$INBOX/$GIT_OBJECT_ID/s/$FILENAME
+sub show ($$;$) {
+	my ($ctx, $oid_b, $fn) = @_;
+	@$ctx{qw(oid_b fn)} = ($oid_b, $fn);
 	my $limiter = $ctx->{www}->{pi_cfg}->limiter('-codeblob');
 
 	# {solver_limiter} just inherits rlimits from the configurable
@@ -685,23 +670,9 @@ sub may_start_solver ($) {
 		$l->{$_} = $limiter->{$_} for grep /^RLIMIT_/, keys %$limiter;
 		$l;
 	};
-	if ($limiter->{running} < $limiter->{max}) {
-		start_solver $ctx, $limiter;
-	} elsif ($limiter->is_too_busy) {
-		html_page $ctx, 503, 'too busy';
-	} else {
-		push @{$limiter->{run_queue}}, $ctx;
-	}
-}
-
-# GET /$INBOX/$GIT_OBJECT_ID/s/
-# GET /$INBOX/$GIT_OBJECT_ID/s/$FILENAME
-sub show ($$;$) {
-	my ($ctx, $oid_b, $fn) = @_;
-	@$ctx{qw(oid_b fn)} = ($oid_b, $fn);
 	sub {
 		$ctx->{-wcb} = $_[0]; # HTTP write callback
-		may_start_solver $ctx;
+		$limiter->may_start(\&start_solver, $ctx, \&html_page);
 	};
 }
 
diff --git a/t/qspawn.t b/t/qspawn.t
index 507f86a5..dfa33ada 100644
--- a/t/qspawn.t
+++ b/t/qspawn.t
@@ -82,7 +82,11 @@ foreach my $cmd ([qw(sleep 1)], [qw(sh -c), 'sleep 1; false']) {
 		ok(!finish_err($s), 'no error on sleep');
 		is_deeply([], \@err, 'no warnings');
 	}
-	ok(!finish_err($_->[0]), "true $_->[1] succeeded") foreach @t;
+	undef $s;
+	for (@t) { # DESTROY in order
+		my ($qsp, $i) = (shift(@$_), shift(@$_));
+		ok !finish_err($qsp), "true $i succeeded";
+	}
 	is_deeply([qw(sleep 0 1 2)], \@run, 'ran in order');
 }
 

* [SQUASH 3/2] plack_limiter: fix error response
  2025-03-21 22:22 ` [PATCH 2/2] limiter: refactor to reduce code duplication Eric Wong
@ 2025-03-22 10:18   ` Eric Wong
  0 siblings, 0 replies; 4+ messages in thread
From: Eric Wong @ 2025-03-22 10:18 UTC
  To: meta

We still need to remember to call the write callback we captured
from the PSGI server to avoid leaving sockets open indefinitely
with no response.
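
Concretely, inside the delayed-response callback the PSGI server
does not use a returned response triple, so a fail_cb that merely
returns one (as the original lim_fail did) leaves the client
waiting.  A minimal sketch of the fixed behavior, using a
hypothetical responder in place of the PSGI server's write callback.

```perl
use strict;
use warnings;

# fail_cb in the style of the squashed lim_fail: deliver the error
# through the captured write callback instead of returning it.
sub lim_fail {
	my ($ctx, $code, $msg) = @_;
	delete($ctx->{psgi_wcb})->([ $code, [ 'Content-Type' => 'text/plain',
		'Content-Length' => length($msg) ], [ $msg ] ]);
}

# hypothetical responder standing in for the PSGI server's write callback
my @sent;
my $ctx = { env => {}, psgi_wcb => sub { push @sent, $_[0] } };
lim_fail($ctx, 503, 'too busy');
print "$sent[0]->[0] $sent[0]->[2]->[0]\n"; # 503 too busy
```

Deleting {psgi_wcb} as it is used also ensures the callback is
invoked at most once per request.
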
---
 lib/PublicInbox/PlackLimiter.pm | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/lib/PublicInbox/PlackLimiter.pm b/lib/PublicInbox/PlackLimiter.pm
index 2ca0a63f..48039212 100644
--- a/lib/PublicInbox/PlackLimiter.pm
+++ b/lib/PublicInbox/PlackLimiter.pm
@@ -15,9 +15,9 @@ sub prepare_app { # called via Plack::Component (used by Plack::Middleware)
 }
 
 sub lim_fail { # limiter->may_start fail_cb
-	my (undef, $code, $msg) = @_;
-	[ $code, [ 'Content-Type' => 'text/plain',
-		'Content-Length' => length($msg) ], [ $msg ] ]
+	my ($ctx, $code, $msg) = @_;
+	delete($ctx->{psgi_wcb})->([ $code, [ 'Content-Type' => 'text/plain',
+		'Content-Length' => length($msg) ], [ $msg ] ]);
 }
 
 sub stats ($) {

Code repositories for project(s) associated with this public inbox

	https://80x24.org/public-inbox.git

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).