From: Eric Wong <e@80x24.org>
To: meta@public-inbox.org
Subject: [PATCH 2/2] limiter: refactor to reduce code duplication
Date: Fri, 21 Mar 2025 22:22:30 +0000 [thread overview]
Message-ID: <20250321222231.3276910-3-e@80x24.org> (raw)
In-Reply-To: <20250321222231.3276910-1-e@80x24.org>
PlackLimiter, Qspawn, and ViewVCS all have roughly the same
code around our base Limiter package, so consolidate it into
a new Limiter->may_start method. PlackLimiter loses some
stats (`rejected') as a result, but that information is logged
anyway, and I doubt the customizable error message was worth the effort.
We now have ckhup and 499 (client disconnect) handling for all
PSGI uses of Limiter, as well.
t/qspawn.t needed changes since the queue-advancing logic formerly
in ->finalize now relies on on_destroy; however, none of the existing
PSGI code using Qspawn required changes.
---
lib/PublicInbox/Limiter.pm | 36 ++++++++++++++++++++++--
lib/PublicInbox/PlackLimiter.pm | 48 +++++++++++++------------------
lib/PublicInbox/Qspawn.pm | 50 ++++++++++++---------------------
lib/PublicInbox/ViewVCS.pm | 45 ++++++-----------------------
t/qspawn.t | 6 +++-
5 files changed, 84 insertions(+), 101 deletions(-)
diff --git a/lib/PublicInbox/Limiter.pm b/lib/PublicInbox/Limiter.pm
index fc62d0d4..f90c8b56 100644
--- a/lib/PublicInbox/Limiter.pm
+++ b/lib/PublicInbox/Limiter.pm
@@ -4,6 +4,7 @@
package PublicInbox::Limiter;
use v5.12;
use PublicInbox::Spawn;
+use PublicInbox::OnDestroy;
sub new {
my ($class, $max) = @_;
@@ -61,9 +62,48 @@ EOM
}
}
-sub is_too_busy {
+# claims a slot and runs $start_cb->($ctx, $self); the slot is released
+# (and the next queued job started) when $ctx is freed via on_destroy
+sub _do_start ($$$$) {
+ my ($self, $start_cb, $ctx, $fail_cb) = @_;
+ $ctx->{"limiter.next.$self"} = on_destroy \&_start_next, $self;
+ ++$self->{running};
+ eval { $start_cb->($ctx, $self) };
+ if (my $err = $@) {
+ # {env} is absent for non-PSGI Qspawn and deleted by PlackLimiter
+ my $env = $ctx->{env};
+ my $fh = $env ? $env->{'psgi.errors'} : undef;
+ if ($fh) { print { $fh } "E: $err\n" } else { warn "E: $err\n" }
+ $fail_cb->($ctx, 500, 'internal error');
+ }
+}
+
+sub _start_next { # on_destroy cb
my ($self) = @_;
- scalar(@{$self->{run_queue}}) > ($self->{depth} // 32)
+ --$self->{running};
+ my ($rec, $env, $ck, $start_cb, $ctx, $fail_cb);
+ while (1) { # skip (with 499) queued PSGI clients which disconnected
+ $rec = shift @{$self->{run_queue}} or return;
+ ($start_cb, $ctx, $fail_cb) = @$rec;
+ $env = $ctx->{env} or last; # don't autovivify for non-PSGI users
+ $ck = $env->{'pi-httpd.ckhup'} or last;
+ $ck->($env->{'psgix.io'}->{sock}) or last;
+ $fail_cb->($ctx, 499, 'client disconnected');
+ }
+ _do_start $self, $start_cb, $ctx, $fail_cb;
+}
+
+# runs $start_cb->($ctx, $self) now if a slot is free, otherwise queues it;
+# $fail_cb->($ctx, $code, $msg) gets 503 when the queue is too deep
+sub may_start {
+ my ($self, $start_cb, $ctx, $fail_cb) = @_;
+ if ($self->{running} < $self->{max}) {
+ _do_start $self, $start_cb, $ctx, $fail_cb;
+ } elsif (@{$self->{run_queue}} > ($self->{depth} // 32)) {
+ $fail_cb->($ctx, 503, 'too busy');
+ } else {
+ push @{$self->{run_queue}}, [ $start_cb, $ctx, $fail_cb ];
+ }
}
1;
diff --git a/lib/PublicInbox/PlackLimiter.pm b/lib/PublicInbox/PlackLimiter.pm
index a1cc51dc..2ca0a63f 100644
--- a/lib/PublicInbox/PlackLimiter.pm
+++ b/lib/PublicInbox/PlackLimiter.pm
@@ -4,7 +4,7 @@
package PublicInbox::PlackLimiter;
use v5.12;
use parent qw(Plack::Middleware);
-use PublicInbox::OnDestroy;
+use PublicInbox::Limiter;
sub prepare_app { # called via Plack::Component (used by Plack::Middleware)
my ($self) = @_;
@@ -12,25 +12,17 @@ sub prepare_app { # called via Plack::Component (used by Plack::Middleware)
$self->{max} //= 2;
$self->{run_queue} = [];
$self->{running} = 0;
- $self->{rejected} = 0;
- $self->{message} //= "too busy\n";
}
-sub r503 ($) {
- my @body = ($_[0]->{message});
- ++$_[0]->{rejected};
- [ 503, [ 'Content-Type' => 'text/plain',
- 'Content-Length' => length($body[0]) ], \@body ]
-}
-
-sub next_req { # on_destroy cb
- my ($self) = @_;
- --$self->{running};
- my $env = shift @{$self->{run_queue}} or return;
- my $wcb = delete $env->{'p-i.limiter.wcb'} // die 'BUG: no wcb';
- my $res = eval { call($self, $env) };
- return warn("W: $@") if $@;
- ref($res) eq 'CODE' ? $res->($wcb) : $wcb->($res);
+# limiter->may_start fail_cb: Limiter ignores our return value, so the
+# error must go through the PSGI responder or the client hangs forever
+sub lim_fail {
+ my ($ctx, $code, $msg) = @_;
+ my $wcb = delete $ctx->{psgi_wcb} or
+ return warn("W: $code $msg (no PSGI responder)\n");
+ $msg .= "\n";
+ $wcb->([ $code, [ 'Content-Type' => 'text/plain',
+ 'Content-Length' => length($msg) ], [ $msg ] ]);
}
sub stats ($) {
@@ -39,29 +31,35 @@ sub stats ($) {
my $res = <<EOM;
running: $self->{running}
queued: $nq
-rejected: $self->{rejected}
max: $self->{max}
EOM
[ 200, [ 'Content-Type' => 'text/plain',
'Content-Length' => length($res) ], [ $res ] ]
}
+sub app_call { # limiter->may_start start_cb
+ my ($ctx, $self) = @_;
+ my $env = delete $ctx->{env}; # avoid cyclic ref
+ push @{$env->{'limiter.ctx'}}, $ctx; # handoff limiter.next.$self
+ my $res = eval { $self->app->($env) };
+ if ($@) { # respond rather than leave the client hanging
+ warn "W: $@";
+ return lim_fail($ctx, 500, 'internal error');
+ }
+ my $wcb = delete $ctx->{psgi_wcb}; # don't keep it in {'limiter.ctx'}
+ ref($res) eq 'CODE' ? $res->($wcb) : $wcb->($res);
+}
+
sub call {
my ($self, $env) = @_;
if (defined $self->{stats_match_cb}) {
return stats $self if $self->{stats_match_cb}->($env);
}
return $self->app->($env) if !$self->{match_cb}->($env);
- return r503($self) if @{$self->{run_queue}} > ($self->{depth} // 32);
- if ($self->{running} < $self->{max}) {
- ++$self->{running};
- $env->{'p-i.limiter.next'} = on_destroy \&next_req, $self;
- $self->app->($env);
- } else { # capture write cb from PSGI server and queue up
- sub {
- $env->{'p-i.limiter.wcb'} = $_[0];
- push @{$self->{run_queue}}, $env;
- };
+ sub { # capture write cb from PSGI server
+ my $ctx = { env => $env, psgi_wcb => $_[0] };
+ PublicInbox::Limiter::may_start(
+ $self, \&app_call, $ctx, \&lim_fail);
}
}
diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm
index b882ea53..690860c1 100644
--- a/lib/PublicInbox/Qspawn.pm
+++ b/lib/PublicInbox/Qspawn.pm
@@ -51,16 +51,10 @@ sub new {
}
sub _do_spawn {
- my ($self, $start_cb, $limiter) = @_;
+ my ($self) = @_;
my ($cmd, $cmd_env, $opt) = @{$self->{args}};
- my %o = %{$opt || {}};
- $self->{limiter} = $limiter;
- for my $k (@PublicInbox::Spawn::RLIMITS) {
- $opt->{$k} = $limiter->{$k} // next;
- }
- $self->{-quiet} = 1 if $o{quiet};
- $limiter->{running}++;
- if ($start_cb) {
+ $self->{-quiet} = 1 if $opt->{quiet};
+ if (my $start_cb = delete $self->{-start_cb}) {
eval { # popen_rd may die on EMFILE, ENFILE
$self->{rpipe} = popen_rd($cmd, $cmd_env, $opt,
\&waitpid_err, $self);
@@ -78,7 +72,7 @@ sub psgi_status_err { # Qspawn itself is useful w/o PSGI
PublicInbox::WwwStatic::r($_[0] // 500);
}
-sub _finalize ($) {
+sub finalize ($) {
my ($self) = @_;
if (my $err = $self->{_err}) { # set by finish or waitpid_err
utf8::decode($err);
@@ -102,17 +96,6 @@ sub _finalize ($) {
}
}
-sub finalize ($) {
- my ($self) = @_;
-
- # process is done, spawn whatever's in the queue
- my $limiter = delete $self->{limiter} or return;
- --$limiter->{running};
- my $next = shift @{$limiter->{run_queue}};
- _do_spawn(@$next, $limiter) if $next;
- _finalize $self;
-}
-
sub waitpid_err { # callback for awaitpid
my (undef, $self) = @_; # $_[0]: pid
$self->{_err} = ''; # for defined check in ->finish
@@ -156,17 +139,21 @@ sub finish ($;$) {
finalize($self) if $closed_before || defined($self->{_err});
}
-sub start ($$$) {
+sub _qsp_fail { # limiter fail_cb; {env} is only set for PSGI users
+ my ($self, $code, $msg) = @_;
+ $self->{env}->{'qspawn.fallback'} //= $code if $self->{env}; # likely 503
+ finalize $self;
+}
+
+sub start ($$;$) {
my ($self, $limiter, $start_cb) = @_;
- if ($limiter->{running} < $limiter->{max}) {
- _do_spawn($self, $start_cb, $limiter);
- } elsif ($limiter->is_too_busy) {
- $self->{env}->{'qspawn.fallback'} //= 503 if
- $self->{env};
- _finalize $self;
- } else {
- push @{$limiter->{run_queue}}, [ $self, $start_cb ];
+ $self->{-start_cb} = $start_cb if $start_cb;
+ my %opt; # limiter rlimits to merge into $opt, which may be undef
+ for (@PublicInbox::Spawn::RLIMITS) {
+ $opt{$_} = $limiter->{$_} // next;
}
+ @{$self->{args}->[2] //= {}}{keys %opt} = values %opt if keys %opt;
+ $limiter->may_start(\&_do_spawn, $self, \&_qsp_fail);
}
# Similar to `backtick` or "qx" ("perldoc -f qx"), it calls @qx_cb_arg with
@@ -177,8 +164,7 @@ sub psgi_qx {
my ($self, $env, $limiter, @qx_cb_arg) = @_;
$self->{env} = $env;
$self->{qx_cb_arg} = \@qx_cb_arg;
- $limiter ||= $def_limiter ||= PublicInbox::Limiter->new;
- start($self, $limiter, undef);
+ start($self, $limiter ||= $def_limiter ||= PublicInbox::Limiter->new);
}
sub yield_pass {
diff --git a/lib/PublicInbox/ViewVCS.pm b/lib/PublicInbox/ViewVCS.pm
index 552e3241..e9ed4711 100644
--- a/lib/PublicInbox/ViewVCS.pm
+++ b/lib/PublicInbox/ViewVCS.pm
@@ -636,15 +636,8 @@ sub show_blob { # git->cat_async callback
'</code></pre></td></tr></table>'.dbg_log($ctx), @def);
}
-sub start_solver ($$) {
- my ($ctx, $limiter) = @_;
- $ctx->{-next_solver} = on_destroy \&next_solver, $limiter;
- ++$limiter->{running};
- if (my $ck = $ctx->{env}->{'pi-httpd.ckhup'}) {
- $ck->($ctx->{env}->{'psgix.io'}->{sock}) and
- return html_page $ctx, 499, 'client disconnected';
- }
-
+sub start_solver { # limiter->may_start start_cb, called as ($ctx, $limiter)
+ my ($ctx) = @_;
while (my ($from, $to) = each %QP_MAP) {
my $v = $ctx->{qp}->{$from} // next;
$ctx->{hints}->{$to} = $v if $v ne '';
@@ -662,19 +655,11 @@ sub start_solver ($$) {
$solver->solve(@$ctx{qw(env lh oid_b hints)});
}
-# run the next solver job when done and DESTROY-ed (on_destroy cb)
-sub next_solver {
- my ($limiter) = @_;
- --$limiter->{running};
- my $ctx = shift(@{$limiter->{run_queue}}) // return;
- eval { start_solver $ctx, $limiter };
- return unless $@;
- warn "W: start_solver: $@";
- html_page($ctx, 500) if $ctx->{-wcb};
-}
-
-sub may_start_solver ($) {
- my ($ctx) = @_;
+# GET /$INBOX/$GIT_OBJECT_ID/s/
+# GET /$INBOX/$GIT_OBJECT_ID/s/$FILENAME
+sub show ($$;$) {
+ my ($ctx, $oid_b, $fn) = @_;
+ @$ctx{qw(oid_b fn)} = ($oid_b, $fn);
my $limiter = $ctx->{www}->{pi_cfg}->limiter('-codeblob');
# {solver_limiter} just inherits rlimits from the configurable
@@ -685,23 +670,9 @@ sub may_start_solver ($) {
$l->{$_} = $limiter->{$_} for grep /^RLIMIT_/, keys %$limiter;
$l;
};
- if ($limiter->{running} < $limiter->{max}) {
- start_solver $ctx, $limiter;
- } elsif ($limiter->is_too_busy) {
- html_page $ctx, 503, 'too busy';
- } else {
- push @{$limiter->{run_queue}}, $ctx;
- }
-}
-
-# GET /$INBOX/$GIT_OBJECT_ID/s/
-# GET /$INBOX/$GIT_OBJECT_ID/s/$FILENAME
-sub show ($$;$) {
- my ($ctx, $oid_b, $fn) = @_;
- @$ctx{qw(oid_b fn)} = ($oid_b, $fn);
sub {
$ctx->{-wcb} = $_[0]; # HTTP write callback
- may_start_solver $ctx;
+ $limiter->may_start(\&start_solver, $ctx, \&html_page); # fail_cb: html_page($ctx, $code, $msg)
};
}
diff --git a/t/qspawn.t b/t/qspawn.t
index 507f86a5..dfa33ada 100644
--- a/t/qspawn.t
+++ b/t/qspawn.t
@@ -82,7 +82,11 @@ foreach my $cmd ([qw(sleep 1)], [qw(sh -c), 'sleep 1; false']) {
ok(!finish_err($s), 'no error on sleep');
is_deeply([], \@err, 'no warnings');
}
- ok(!finish_err($_->[0]), "true $_->[1] succeeded") foreach @t;
+ undef $s; # free $s so its on_destroy hook releases its limiter slot
+ for (@t) { # DESTROY in order
+ my ($qsp, $i) = (shift(@$_), shift(@$_)); # move out of @t so $qsp can be freed each iteration
+ ok !finish_err($qsp), "true $i succeeded";
+ }
is_deeply([qw(sleep 0 1 2)], \@run, 'ran in order');
}
next prev parent reply other threads:[~2025-03-21 22:22 UTC|newest]
Thread overview: 4+ messages / expand[flat|nested] mbox.gz Atom feed top
2025-03-21 22:22 [PATCH 0/2] limiter logic deduplication Eric Wong
2025-03-21 22:22 ` [PATCH 1/2] qspawn: rename {psgi_env} => {env} Eric Wong
2025-03-21 22:22 ` Eric Wong [this message]
2025-03-22 10:18 ` [SQUASH 3/2] plack_limiter: fix error response Eric Wong
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
List information: https://public-inbox.org/README
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20250321222231.3276910-3-e@80x24.org \
--to=e@80x24.org \
--cc=meta@public-inbox.org \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
Code repositories for project(s) associated with this public inbox
https://80x24.org/public-inbox.git
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).