From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 75A192141F for ; Sun, 27 Jan 2019 04:03:43 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 06/14] qspawn: implement psgi_qx Date: Sun, 27 Jan 2019 04:03:33 +0000 Message-Id: <20190127040341.26107-7-e@80x24.org> In-Reply-To: <20190127040341.26107-1-e@80x24.org> References: <20190127040341.26107-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: This new asynchronous API will allow us to take advantage of non-blocking I/O even for small commands, as those may still need to wait for slow operations. --- lib/PublicInbox/Qspawn.pm | 89 ++++++++++++++++++++++++++++++++------- 1 file changed, 74 insertions(+), 15 deletions(-) diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm index 96fbf38..6859a8a 100644 --- a/lib/PublicInbox/Qspawn.pm +++ b/lib/PublicInbox/Qspawn.pm @@ -61,6 +61,48 @@ sub start { } } +sub _psgi_finish ($$) { + my ($self, $env) = @_; + my $err = $self->finish; + if ($err && !$env->{'qspawn.quiet'}) { + $err = join(' ', @{$self->{args}->[0]}).": $err\n"; + $env->{'psgi.errors'}->print($err); + } +} + +sub psgi_qx { + my ($self, $env, $limiter, $qx_cb) = @_; + my $qx = PublicInbox::Qspawn::Qx->new; + my $end = sub { + _psgi_finish($self, $env); + eval { $qx_cb->($qx) }; + $qx = undef; + }; + my $rpipe; + my $async = $env->{'pi-httpd.async'}; + my $cb = sub { + my $r = sysread($rpipe, my $buf, 8192); + if ($async) { + $async->async_pass($env->{'psgix.io'}, $qx, \$buf); + } elsif (defined $r) { + $r ? 
$qx->write($buf) : $end->(); + } else { + return if $!{EAGAIN} || $!{EINTR}; # loop again + $end->(); + } + }; + $limiter ||= $def_limiter ||= PublicInbox::Qspawn::Limiter->new(32); + $self->start($limiter, sub { # may run later, much later... + ($rpipe) = @_; + if ($async) { + # PublicInbox::HTTPD::Async->new($rpipe, $cb, $end) + $async = $async->($rpipe, $cb, $end); + } else { # generic PSGI + $cb->() while $qx; + } + }); +} + # create a filter for "push"-based streaming PSGI writes used by HTTPD::Async sub filter_fh ($$) { my ($fh, $filter) = @_; @@ -78,11 +120,7 @@ sub psgi_return { my ($self, $env, $limiter, $parse_hdr) = @_; my ($fh, $rpipe); my $end = sub { - my $err = $self->finish; - if ($err && !$env->{'qspawn.quiet'}) { - $err = join(' ', @{$self->{args}->[0]}).": $err\n"; - $env->{'psgi.errors'}->print($err); - } + _psgi_finish($self, $env); $fh->close if $fh; # async-only }; @@ -92,7 +130,7 @@ sub psgi_return { return if !defined($r) && ($!{EINTR} || $!{EAGAIN}); $parse_hdr->($r, \$buf); }; - my $res; + my $res = delete $env->{'qspawn.response'}; my $async = $env->{'pi-httpd.async'}; my $cb = sub { my $r = $rd_hdr->() or return; @@ -118,17 +156,21 @@ sub psgi_return { } }; $limiter ||= $def_limiter ||= PublicInbox::Qspawn::Limiter->new(32); + my $start_cb = sub { # may run later, much later... + ($rpipe) = @_; + if ($async) { + # PublicInbox::HTTPD::Async->new($rpipe, $cb, $end) + $async = $async->($rpipe, $cb, $end); + } else { # generic PSGI + $cb->() while $rd_hdr; + } + }; + + return $self->start($limiter, $start_cb) if $res; + sub { ($res) = @_; - $self->start($limiter, sub { # may run later, much later... 
- ($rpipe) = @_; - if ($async) { - # PublicInbox::HTTPD::Async->new($rpipe, $cb, $end) - $async = $async->($rpipe, $cb, $end); - } else { # generic PSGI - $cb->() while $rd_hdr; - } - }); + $self->start($limiter, $start_cb); }; } @@ -146,4 +188,21 @@ sub new { }, $class; } +# captures everything into a buffer and executes a callback when done +package PublicInbox::Qspawn::Qx; +use strict; +use warnings; + +sub new { + my ($class) = @_; + my $buf = ''; + bless \$buf, $class; +} + +# called by PublicInbox::HTTPD::Async ($fh->write) +sub write { + ${$_[0]} .= $_[1]; + undef; +} + 1; -- EW