From: Eric Wong <e@80x24.org>
To: meta@public-inbox.org
Subject: [PATCH 05/30] qspawn: replace anonymous $end callbacks w/ event_step
Date: Wed, 25 Dec 2019 07:50:39 +0000 [thread overview]
Message-ID: <20191225075104.22184-6-e@80x24.org> (raw)
In-Reply-To: <20191225075104.22184-1-e@80x24.org>
This will tie into the DS event loop if that's used, but
event_step an be called directly without relying on the
event loop from Apache or other HTTP servers (or PSGI tests).
---
lib/PublicInbox/GetlineBody.pm | 8 +--
lib/PublicInbox/HTTPD/Async.pm | 16 +++---
lib/PublicInbox/Qspawn.pm | 90 ++++++++++++++++++----------------
3 files changed, 61 insertions(+), 53 deletions(-)
diff --git a/lib/PublicInbox/GetlineBody.pm b/lib/PublicInbox/GetlineBody.pm
index f8cdd1b7..750a8c53 100644
--- a/lib/PublicInbox/GetlineBody.pm
+++ b/lib/PublicInbox/GetlineBody.pm
@@ -13,10 +13,11 @@ use strict;
use warnings;
sub new {
- my ($class, $rpipe, $end, $buf, $filter) = @_;
+ my ($class, $rpipe, $end, $end_arg, $buf, $filter) = @_;
bless {
rpipe => $rpipe,
end => $end,
+ end_arg => $end_arg,
buf => $buf,
filter => $filter || 0,
}, $class;
@@ -40,10 +41,9 @@ sub getline {
sub close {
my ($self) = @_;
- my $rpipe = delete $self->{rpipe};
+ my ($rpipe, $end, $end_arg) = delete @$self{qw(rpipe end end_arg)};
close $rpipe if $rpipe;
- my $end = delete $self->{end};
- $end->() if $end;
+ $end->($end_arg) if $end;
}
1;
diff --git a/lib/PublicInbox/HTTPD/Async.pm b/lib/PublicInbox/HTTPD/Async.pm
index 70769bba..ac0ca3df 100644
--- a/lib/PublicInbox/HTTPD/Async.pm
+++ b/lib/PublicInbox/HTTPD/Async.pm
@@ -10,7 +10,7 @@ package PublicInbox::HTTPD::Async;
use strict;
use warnings;
use base qw(PublicInbox::DS);
-use fields qw(cb arg end end_arg);
+use fields qw(cb arg end_obj);
use Errno qw(EAGAIN);
use PublicInbox::Syscall qw(EPOLLIN EPOLLET);
@@ -18,13 +18,13 @@ use PublicInbox::Syscall qw(EPOLLIN EPOLLET);
# $io is a read-only pipe ($rpipe) for now, but may be a
# bidirectional socket in the future.
sub new {
- my ($class, $io, $cb, $arg, $end, $end_arg) = @_;
+ my ($class, $io, $cb, $arg, $end_obj) = @_;
# no $io? call $cb at the top of the next event loop to
# avoid recursion:
unless (defined($io)) {
PublicInbox::DS::requeue($cb);
- die '$end unsupported w/o $io' if $end;
+ die '$end_obj unsupported w/o $io' if $end_obj;
return;
}
@@ -33,8 +33,7 @@ sub new {
$self->SUPER::new($io, EPOLLIN | EPOLLET);
$self->{cb} = $cb; # initial read callback, later replaced by main_cb
$self->{arg} = $arg; # arg for $cb
- $self->{end} = $end; # like END {}, but only for this object
- $self->{end_arg} = $end_arg; # arg for $end
+ $self->{end_obj} = $end_obj; # like END{}, can ->event_step
$self;
}
@@ -98,8 +97,11 @@ sub close {
$self->SUPER::close; # DS::close
# we defer this to the next timer loop since close is deferred
- if (my $end = delete $self->{end}) {
- PublicInbox::DS::requeue($end);
+ if (my $end_obj = delete $self->{end_obj}) {
+ # this calls $end_obj->event_step
+ # (likely PublicInbox::Qspawn::event_step,
+ # NOT PublicInbox::HTTPD::Async::event_step)
+ PublicInbox::DS::requeue($end_obj);
}
}
diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm
index ba980e73..6cb28b9a 100644
--- a/lib/PublicInbox/Qspawn.pm
+++ b/lib/PublicInbox/Qspawn.pm
@@ -94,7 +94,8 @@ sub waitpid_err ($$) {
$err = "W: waitpid($xpid, 0) => $pid: $!";
} # else should not be called with pid == 0
- my $env = delete $self->{psgi_env};
+ my ($env, $qx_cb, $qx_arg, $qx_buf) =
+ delete @$self{qw(psgi_env qx_cb qx_arg qx_buf)};
# done, spawn whatever's in the queue
my $limiter = $self->{limiter};
@@ -112,15 +113,12 @@ sub waitpid_err ($$) {
log_err($env, join(' ', @{$self->{args}}) . ": $err");
}
}
- if (my $fin_cb = delete $self->{fin_cb}) {
- eval { $fin_cb->() }
- }
+ eval { $qx_cb->($qx_buf, $qx_arg) } if $qx_cb;
}
-sub do_waitpid ($;$$) {
- my ($self, $fin_cb) = @_;
+sub do_waitpid ($) {
+ my ($self) = @_;
my $pid = $self->{pid};
- $self->{fin_cb} = $fin_cb;
# PublicInbox::DS may not be loaded
eval { PublicInbox::DS::dwaitpid($pid, \&waitpid_err, $self) };
# done if we're running in PublicInbox::DS::EventLoop
@@ -131,12 +129,14 @@ sub do_waitpid ($;$$) {
}
}
-sub finish ($;$) {
- my ($self, $fin_cb) = @_;
+sub finish ($) {
+ my ($self) = @_;
if (delete $self->{rpipe}) {
- do_waitpid($self, $fin_cb);
- } elsif ($fin_cb) {
- eval { $fin_cb->() };
+ do_waitpid($self);
+ } else {
+ my ($env, $qx_cb, $qx_arg, $qx_buf) =
+ delete @$self{qw(psgi_env qx_cb qx_arg qx_buf)};
+ eval { $qx_cb->($qx_buf, $qx_arg) } if $qx_cb;
}
}
@@ -154,16 +154,14 @@ sub start {
# $env is the PSGI env. As with ``/qx; only use this when output is small
# and safe to slurp.
sub psgi_qx {
- my ($self, $env, $limiter, $qx_cb, $cb_arg) = @_;
+ my ($self, $env, $limiter, $qx_cb, $qx_arg) = @_;
$self->{psgi_env} = $env;
- my $scalar = '';
- open(my $qx, '+>', \$scalar) or die; # PerlIO::scalar
- my $end = sub {
- my $err = $_[0]; # $!
- log_err($env, "psgi_qx: $err") if defined($err);
- finish($self, sub { $qx_cb->(\$scalar, $cb_arg) });
- $qx = undef;
- };
+ my $qx_buf = '';
+ open(my $qx_fh, '+>', \$qx_buf) or die; # PerlIO::scalar
+ $self->{qx_cb} = $qx_cb;
+ $self->{qx_arg} = $qx_arg;
+ $self->{qx_fh} = $qx_fh;
+ $self->{qx_buf} = \$qx_buf;
my $rpipe; # comes from popen_rd
my $async = $env->{'pi-httpd.async'};
my $cb = sub {
@@ -171,24 +169,24 @@ sub psgi_qx {
reread:
$r = sysread($rpipe, $buf, 65536);
if ($async) {
- $async->async_pass($env->{'psgix.io'}, $qx, \$buf);
+ $async->async_pass($env->{'psgix.io'}, $qx_fh, \$buf);
} elsif (defined $r) {
- $r ? $qx->write($buf) : $end->();
+ $r ? $qx_fh->write($buf) : event_step($self, undef);
} else {
return if $! == EAGAIN; # try again when notified
goto reread if $! == EINTR;
- $end->($!);
+ event_step($self, $!);
}
};
$limiter ||= $def_limiter ||= PublicInbox::Qspawn::Limiter->new(32);
$self->start($limiter, sub { # start_cb, may run later, much later...
($rpipe) = @_; # popen_rd result
if ($async) {
- # PublicInbox::HTTPD::Async->new($rpipe, $cb, $end)
- $async = $async->($rpipe, $cb, undef, $end);
+ # PublicInbox::HTTPD::Async->new($rpipe, $cb, $end_obj)
+ $async = $async->($rpipe, $cb, undef, $self);
# $cb will call ->async_pass or ->close
} else { # generic PSGI
- $cb->() while $qx;
+ $cb->() while $self->{qx_fh};
}
});
}
@@ -206,6 +204,17 @@ sub filter_fh ($$) {
});
}
+# this is called on pipe EOF to reap the process, may be called
+# via PublicInbox::DS event loop OR via GetlineBody for generic
+# PSGI servers.
+sub event_step {
+ my ($self, $err) = @_; # $err: $!
+ log_err($self->{psgi_env}, "psgi_{return,qx} $err") if defined($err);
+ finish($self);
+ my ($fh, $qx_fh) = delete(@$self{qw(fh qx_fh)});
+ $fh->close if $fh; # async-only (psgi_return)
+}
+
# Used for streaming the stdout of one process as a PSGI response.
#
# $env is the PSGI env.
@@ -231,14 +240,7 @@ sub filter_fh ($$) {
sub psgi_return {
my ($self, $env, $limiter, $parse_hdr) = @_;
$self->{psgi_env} = $env;
- my ($fh, $rpipe);
- my $end = sub {
- my $err = $_[0]; # $!
- log_err($env, "psgi_return: $err") if defined($err);
- finish($self);
- $fh->close if $fh; # async-only
- };
-
+ my $rpipe;
my $buf = '';
my $rd_hdr = sub {
# typically used for reading CGI headers
@@ -271,21 +273,24 @@ sub psgi_return {
my $filter = delete $env->{'qspawn.filter'};
if (scalar(@$r) == 3) { # error
if ($async) {
- $async->close; # calls rpipe->close and $end
+ # calls rpipe->close && ->event_step
+ $async->close;
} else {
$rpipe->close;
- $end->();
+ event_step($self);
}
$wcb->($r);
} elsif ($async) {
# done reading headers, handoff to read body
- $fh = $wcb->($r); # scalar @$r == 2
+ my $fh = $wcb->($r); # scalar @$r == 2
$fh = filter_fh($fh, $filter) if $filter;
+ $self->{fh} = $fh;
$async->async_pass($env->{'psgix.io'}, $fh, \$buf);
} else { # for synchronous PSGI servers
require PublicInbox::GetlineBody;
- $r->[2] = PublicInbox::GetlineBody->new($rpipe, $end,
- $buf, $filter);
+ $r->[2] = PublicInbox::GetlineBody->new($rpipe,
+ \&event_step, $self,
+ $buf, $filter);
$wcb->($r);
}
@@ -297,8 +302,9 @@ sub psgi_return {
my $start_cb = sub { # may run later, much later...
($rpipe) = @_;
if ($async) {
- # PublicInbox::HTTPD::Async->new($rpipe, $cb, $end)
- $async = $async->($rpipe, $cb, undef, $end);
+ # PublicInbox::HTTPD::Async->new($rpipe, $cb, $cb_arg,
+ # $end_obj)
+ $async = $async->($rpipe, $cb, undef, $self);
# $cb will call ->async_pass or ->close
} else { # generic PSGI
$cb->() while $rd_hdr;
next prev parent reply other threads:[~2019-12-25 7:51 UTC|newest]
Thread overview: 36+ messages / expand[flat|nested] mbox.gz Atom feed top
2019-12-25 7:50 [PATCH 00/30] www: eliminate most per-request closures Eric Wong
2019-12-25 7:50 ` [PATCH 01/30] git: allow async_cat to pass arg to callback Eric Wong
2019-12-25 7:50 ` [PATCH 02/30] httpd/async: support passing arg to callbacks Eric Wong
2019-12-26 7:53 ` Eric Wong
2019-12-25 7:50 ` [PATCH 03/30] qspawn: remove some anonymous subs for psgi_qx Eric Wong
2019-12-25 7:50 ` [PATCH 04/30] qspawn: disambiguate command vs PSGI env Eric Wong
2019-12-25 7:50 ` Eric Wong [this message]
2019-12-25 7:50 ` [PATCH 06/30] msg_iter: provide means to stop using anonymous subs Eric Wong
2019-12-25 7:50 ` [PATCH 07/30] qspawn: reduce local vars, de-anonymize rd_hdr Eric Wong
2019-12-25 7:50 ` [PATCH 08/30] httpd/async: get rid of ephemeral main_cb Eric Wong
2019-12-25 7:50 ` [PATCH 09/30] qspawn: psgi_return: initial cb can be named Eric Wong
2019-12-25 7:50 ` [PATCH 10/30] qspawn: psgi_return_start: hoist out from psgi_return Eric Wong
2019-12-25 7:50 ` [PATCH 11/30] qspawn: psgi_qx: eliminate anonymous subs Eric Wong
2019-12-25 7:50 ` [PATCH 12/30] qspawn: drop "qspawn.filter" support, for now Eric Wong
2019-12-25 7:50 ` [PATCH 13/30] qspawn: psgi_return: allow non-anon parse_hdr callback Eric Wong
2019-12-25 7:50 ` [PATCH 14/30] githttpbackend: split out wwwstatic Eric Wong
2019-12-26 12:50 ` Eric Wong
2019-12-27 10:36 ` Eric Wong
2019-12-25 7:50 ` [PATCH 15/30] www: lazy load Plack::Util Eric Wong
2019-12-25 7:50 ` [PATCH 16/30] mboxgz: pass $ctx to callback to avoid anon subs Eric Wong
2019-12-25 7:50 ` [PATCH 17/30] feed: avoid anonymous subs Eric Wong
2019-12-25 7:50 ` [PATCH 18/30] config: each_inbox: pass user arg to callback Eric Wong
2019-12-26 6:48 ` Eric Wong
2019-12-25 7:50 ` [PATCH 19/30] view: avoid anon sub in stream_thread Eric Wong
2019-12-25 7:50 ` [PATCH 20/30] view: msg_html: stop using an anonymous sub Eric Wong
2019-12-25 7:50 ` [PATCH 21/30] contentid: no " Eric Wong
2019-12-25 7:50 ` [PATCH 22/30] wwwtext: avoid anonymous sub in response Eric Wong
2019-12-25 7:50 ` [PATCH 23/30] searchview: pass named subs to Www*Stream Eric Wong
2019-12-25 7:50 ` [PATCH 24/30] view: thread_html: pass named sub to WwwStream Eric Wong
2019-12-25 7:50 ` [PATCH 25/30] searchview: remove anonymous sub when sorting threads by relevance Eric Wong
2019-12-25 7:51 ` [PATCH 26/30] view: msg_iter calls add_body_text directly Eric Wong
2019-12-25 7:51 ` [PATCH 27/30] wwwattach: avoid anonymous sub for msg_iter Eric Wong
2019-12-25 7:51 ` [PATCH 28/30] viewvcs: avoid anonymous sub for HTML response Eric Wong
2019-12-25 7:51 ` [PATCH 29/30] solvergit: allow passing arg to user-supplied callback Eric Wong
2019-12-28 9:17 ` Eric Wong
2019-12-25 7:51 ` [PATCH 30/30] search: retry_reopen passes user arg to callback Eric Wong
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
List information: http://public-inbox.org/README
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20191225075104.22184-6-e@80x24.org \
--to=e@80x24.org \
--cc=meta@public-inbox.org \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
Code repositories for project(s) associated with this public inbox
https://80x24.org/public-inbox.git
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).