From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id A209D1F5A1 for ; Wed, 25 Dec 2019 07:51:05 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 05/30] qspawn: replace anonymous $end callbacks w/ event_step Date: Wed, 25 Dec 2019 07:50:39 +0000 Message-Id: <20191225075104.22184-6-e@80x24.org> In-Reply-To: <20191225075104.22184-1-e@80x24.org> References: <20191225075104.22184-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: This will tie into the DS event loop if that's used, but event_step can be called directly without relying on the event loop from Apache or other HTTP servers (or PSGI tests). --- lib/PublicInbox/GetlineBody.pm | 8 +-- lib/PublicInbox/HTTPD/Async.pm | 16 +++--- lib/PublicInbox/Qspawn.pm | 90 ++++++++++++++++++---------------- 3 files changed, 61 insertions(+), 53 deletions(-) diff --git a/lib/PublicInbox/GetlineBody.pm b/lib/PublicInbox/GetlineBody.pm index f8cdd1b7..750a8c53 100644 --- a/lib/PublicInbox/GetlineBody.pm +++ b/lib/PublicInbox/GetlineBody.pm @@ -13,10 +13,11 @@ use strict; use warnings; sub new { - my ($class, $rpipe, $end, $buf, $filter) = @_; + my ($class, $rpipe, $end, $end_arg, $buf, $filter) = @_; bless { rpipe => $rpipe, end => $end, + end_arg => $end_arg, buf => $buf, filter => $filter || 0, }, $class; @@ -40,10 +41,9 @@ sub getline { sub close { my ($self) = @_; - my $rpipe = delete $self->{rpipe}; + my ($rpipe, $end, $end_arg) = delete @$self{qw(rpipe end end_arg)}; close $rpipe if $rpipe; - my $end = delete $self->{end}; - $end->() if $end; + $end->($end_arg) if $end; } 1; diff --git a/lib/PublicInbox/HTTPD/Async.pm b/lib/PublicInbox/HTTPD/Async.pm index 
70769bba..ac0ca3df 100644 --- a/lib/PublicInbox/HTTPD/Async.pm +++ b/lib/PublicInbox/HTTPD/Async.pm @@ -10,7 +10,7 @@ package PublicInbox::HTTPD::Async; use strict; use warnings; use base qw(PublicInbox::DS); -use fields qw(cb arg end end_arg); +use fields qw(cb arg end_obj); use Errno qw(EAGAIN); use PublicInbox::Syscall qw(EPOLLIN EPOLLET); @@ -18,13 +18,13 @@ use PublicInbox::Syscall qw(EPOLLIN EPOLLET); # $io is a read-only pipe ($rpipe) for now, but may be a # bidirectional socket in the future. sub new { - my ($class, $io, $cb, $arg, $end, $end_arg) = @_; + my ($class, $io, $cb, $arg, $end_obj) = @_; # no $io? call $cb at the top of the next event loop to # avoid recursion: unless (defined($io)) { PublicInbox::DS::requeue($cb); - die '$end unsupported w/o $io' if $end; + die '$end_obj unsupported w/o $io' if $end_obj; return; } @@ -33,8 +33,7 @@ sub new { $self->SUPER::new($io, EPOLLIN | EPOLLET); $self->{cb} = $cb; # initial read callback, later replaced by main_cb $self->{arg} = $arg; # arg for $cb - $self->{end} = $end; # like END {}, but only for this object - $self->{end_arg} = $end_arg; # arg for $end + $self->{end_obj} = $end_obj; # like END{}, can ->event_step $self; } @@ -98,8 +97,11 @@ sub close { $self->SUPER::close; # DS::close # we defer this to the next timer loop since close is deferred - if (my $end = delete $self->{end}) { - PublicInbox::DS::requeue($end); + if (my $end_obj = delete $self->{end_obj}) { + # this calls $end_obj->event_step + # (likely PublicInbox::Qspawn::event_step, + # NOT PublicInbox::HTTPD::Async::event_step) + PublicInbox::DS::requeue($end_obj); } } diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm index ba980e73..6cb28b9a 100644 --- a/lib/PublicInbox/Qspawn.pm +++ b/lib/PublicInbox/Qspawn.pm @@ -94,7 +94,8 @@ sub waitpid_err ($$) { $err = "W: waitpid($xpid, 0) => $pid: $!"; } # else should not be called with pid == 0 - my $env = delete $self->{psgi_env}; + my ($env, $qx_cb, $qx_arg, $qx_buf) = + delete 
@$self{qw(psgi_env qx_cb qx_arg qx_buf)}; # done, spawn whatever's in the queue my $limiter = $self->{limiter}; @@ -112,15 +113,12 @@ sub waitpid_err ($$) { log_err($env, join(' ', @{$self->{args}}) . ": $err"); } } - if (my $fin_cb = delete $self->{fin_cb}) { - eval { $fin_cb->() } - } + eval { $qx_cb->($qx_buf, $qx_arg) } if $qx_cb; } -sub do_waitpid ($;$$) { - my ($self, $fin_cb) = @_; +sub do_waitpid ($) { + my ($self) = @_; my $pid = $self->{pid}; - $self->{fin_cb} = $fin_cb; # PublicInbox::DS may not be loaded eval { PublicInbox::DS::dwaitpid($pid, \&waitpid_err, $self) }; # done if we're running in PublicInbox::DS::EventLoop @@ -131,12 +129,14 @@ sub do_waitpid ($;$$) { } } -sub finish ($;$) { - my ($self, $fin_cb) = @_; +sub finish ($) { + my ($self) = @_; if (delete $self->{rpipe}) { - do_waitpid($self, $fin_cb); - } elsif ($fin_cb) { - eval { $fin_cb->() }; + do_waitpid($self); + } else { + my ($env, $qx_cb, $qx_arg, $qx_buf) = + delete @$self{qw(psgi_env qx_cb qx_arg qx_buf)}; + eval { $qx_cb->($qx_buf, $qx_arg) } if $qx_cb; } } @@ -154,16 +154,14 @@ sub start { # $env is the PSGI env. As with ``/qx; only use this when output is small # and safe to slurp. sub psgi_qx { - my ($self, $env, $limiter, $qx_cb, $cb_arg) = @_; + my ($self, $env, $limiter, $qx_cb, $qx_arg) = @_; $self->{psgi_env} = $env; - my $scalar = ''; - open(my $qx, '+>', \$scalar) or die; # PerlIO::scalar - my $end = sub { - my $err = $_[0]; # $! 
- log_err($env, "psgi_qx: $err") if defined($err); - finish($self, sub { $qx_cb->(\$scalar, $cb_arg) }); - $qx = undef; - }; + my $qx_buf = ''; + open(my $qx_fh, '+>', \$qx_buf) or die; # PerlIO::scalar + $self->{qx_cb} = $qx_cb; + $self->{qx_arg} = $qx_arg; + $self->{qx_fh} = $qx_fh; + $self->{qx_buf} = \$qx_buf; my $rpipe; # comes from popen_rd my $async = $env->{'pi-httpd.async'}; my $cb = sub { @@ -171,24 +169,24 @@ sub psgi_qx { reread: $r = sysread($rpipe, $buf, 65536); if ($async) { - $async->async_pass($env->{'psgix.io'}, $qx, \$buf); + $async->async_pass($env->{'psgix.io'}, $qx_fh, \$buf); } elsif (defined $r) { - $r ? $qx->write($buf) : $end->(); + $r ? $qx_fh->write($buf) : event_step($self, undef); } else { return if $! == EAGAIN; # try again when notified goto reread if $! == EINTR; - $end->($!); + event_step($self, $!); } }; $limiter ||= $def_limiter ||= PublicInbox::Qspawn::Limiter->new(32); $self->start($limiter, sub { # start_cb, may run later, much later... ($rpipe) = @_; # popen_rd result if ($async) { - # PublicInbox::HTTPD::Async->new($rpipe, $cb, $end) - $async = $async->($rpipe, $cb, undef, $end); + # PublicInbox::HTTPD::Async->new($rpipe, $cb, $end_obj) + $async = $async->($rpipe, $cb, undef, $self); # $cb will call ->async_pass or ->close } else { # generic PSGI - $cb->() while $qx; + $cb->() while $self->{qx_fh}; } }); } @@ -206,6 +204,17 @@ sub filter_fh ($$) { }); } +# this is called on pipe EOF to reap the process, may be called +# via PublicInbox::DS event loop OR via GetlineBody for generic +# PSGI servers. +sub event_step { + my ($self, $err) = @_; # $err: $! + log_err($self->{psgi_env}, "psgi_{return,qx} $err") if defined($err); + finish($self); + my ($fh, $qx_fh) = delete(@$self{qw(fh qx_fh)}); + $fh->close if $fh; # async-only (psgi_return) +} + # Used for streaming the stdout of one process as a PSGI response. # # $env is the PSGI env. 
@@ -231,14 +240,7 @@ sub filter_fh ($$) { sub psgi_return { my ($self, $env, $limiter, $parse_hdr) = @_; $self->{psgi_env} = $env; - my ($fh, $rpipe); - my $end = sub { - my $err = $_[0]; # $! - log_err($env, "psgi_return: $err") if defined($err); - finish($self); - $fh->close if $fh; # async-only - }; - + my $rpipe; my $buf = ''; my $rd_hdr = sub { # typically used for reading CGI headers @@ -271,21 +273,24 @@ sub psgi_return { my $filter = delete $env->{'qspawn.filter'}; if (scalar(@$r) == 3) { # error if ($async) { - $async->close; # calls rpipe->close and $end + # calls rpipe->close && ->event_step + $async->close; } else { $rpipe->close; - $end->(); + event_step($self); } $wcb->($r); } elsif ($async) { # done reading headers, handoff to read body - $fh = $wcb->($r); # scalar @$r == 2 + my $fh = $wcb->($r); # scalar @$r == 2 $fh = filter_fh($fh, $filter) if $filter; + $self->{fh} = $fh; $async->async_pass($env->{'psgix.io'}, $fh, \$buf); } else { # for synchronous PSGI servers require PublicInbox::GetlineBody; - $r->[2] = PublicInbox::GetlineBody->new($rpipe, $end, - $buf, $filter); + $r->[2] = PublicInbox::GetlineBody->new($rpipe, + \&event_step, $self, + $buf, $filter); $wcb->($r); } @@ -297,8 +302,9 @@ sub psgi_return { my $start_cb = sub { # may run later, much later... ($rpipe) = @_; if ($async) { - # PublicInbox::HTTPD::Async->new($rpipe, $cb, $end) - $async = $async->($rpipe, $cb, undef, $end); + # PublicInbox::HTTPD::Async->new($rpipe, $cb, $cb_arg, + # $end_obj) + $async = $async->($rpipe, $cb, undef, $self); # $cb will call ->async_pass or ->close } else { # generic PSGI $cb->() while $rd_hdr;