about summary refs log tree commit homepage
path: root/lib
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2017-01-18 07:27:03 +0000
committerEric Wong <e@80x24.org>2019-01-22 03:40:18 +0000
commita6c9e09129dfc34fbd1b9bc757dc201fb9ff77c3 (patch)
treecc636ed1ec6535ad56d247f71ce5b87d1c308b43 /lib
parent891c6f0d5c47e93166cfd8b29785ebb1f0545119 (diff)
downloadpublic-inbox-a6c9e09129dfc34fbd1b9bc757dc201fb9ff77c3.tar.gz
This new asynchronous API, will allow us to take
advantage of non-blocking I/O from even small commands;
as those may still need to wait for slow operations.
Diffstat (limited to 'lib')
-rw-r--r--lib/PublicInbox/Qspawn.pm89
1 files changed, 74 insertions, 15 deletions
diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm
index 96fbf38d..6859a8af 100644
--- a/lib/PublicInbox/Qspawn.pm
+++ b/lib/PublicInbox/Qspawn.pm
@@ -61,6 +61,48 @@ sub start {
         }
 }
 
+sub _psgi_finish ($$) {
+        my ($self, $env) = @_;
+        my $err = $self->finish;
+        if ($err && !$env->{'qspawn.quiet'}) {
+                $err = join(' ', @{$self->{args}->[0]}).": $err\n";
+                $env->{'psgi.errors'}->print($err);
+        }
+}
+
+sub psgi_qx {
+        my ($self, $env, $limiter, $qx_cb) = @_;
+        my $qx = PublicInbox::Qspawn::Qx->new;
+        my $end = sub {
+                _psgi_finish($self, $env);
+                eval { $qx_cb->($qx) };
+                $qx = undef;
+        };
+        my $rpipe;
+        my $async = $env->{'pi-httpd.async'};
+        my $cb = sub {
+                my $r = sysread($rpipe, my $buf, 8192);
+                if ($async) {
+                        $async->async_pass($env->{'psgix.io'}, $qx, \$buf);
+                } elsif (defined $r) {
+                        $r ? $qx->write($buf) : $end->();
+                } else {
+                        return if $!{EAGAIN} || $!{EINTR}; # loop again
+                        $end->();
+                }
+        };
+        $limiter ||= $def_limiter ||= PublicInbox::Qspawn::Limiter->new(32);
+        $self->start($limiter, sub { # may run later, much later...
+                ($rpipe) = @_;
+                if ($async) {
+                # PublicInbox::HTTPD::Async->new($rpipe, $cb, $end)
+                        $async = $async->($rpipe, $cb, $end);
+                } else { # generic PSGI
+                        $cb->() while $qx;
+                }
+        });
+}
+
 # create a filter for "push"-based streaming PSGI writes used by HTTPD::Async
 sub filter_fh ($$) {
         my ($fh, $filter) = @_;
@@ -78,11 +120,7 @@ sub psgi_return {
         my ($self, $env, $limiter, $parse_hdr) = @_;
         my ($fh, $rpipe);
         my $end = sub {
-                my $err = $self->finish;
-                if ($err && !$env->{'qspawn.quiet'}) {
-                        $err = join(' ', @{$self->{args}->[0]}).": $err\n";
-                        $env->{'psgi.errors'}->print($err);
-                }
+                _psgi_finish($self, $env);
                 $fh->close if $fh; # async-only
         };
 
@@ -92,7 +130,7 @@ sub psgi_return {
                 return if !defined($r) && ($!{EINTR} || $!{EAGAIN});
                 $parse_hdr->($r, \$buf);
         };
-        my $res;
+        my $res = delete $env->{'qspawn.response'};
         my $async = $env->{'pi-httpd.async'};
         my $cb = sub {
                 my $r = $rd_hdr->() or return;
@@ -118,17 +156,21 @@ sub psgi_return {
                 }
         };
         $limiter ||= $def_limiter ||= PublicInbox::Qspawn::Limiter->new(32);
+        my $start_cb = sub { # may run later, much later...
+                ($rpipe) = @_;
+                if ($async) {
+                        # PublicInbox::HTTPD::Async->new($rpipe, $cb, $end)
+                        $async = $async->($rpipe, $cb, $end);
+                } else { # generic PSGI
+                        $cb->() while $rd_hdr;
+                }
+        };
+
+        return $self->start($limiter, $start_cb) if $res;
+
         sub {
                 ($res) = @_;
-                $self->start($limiter, sub { # may run later, much later...
-                        ($rpipe) = @_;
-                        if ($async) {
-                        # PublicInbox::HTTPD::Async->new($rpipe, $cb, $end)
-                                $async = $async->($rpipe, $cb, $end);
-                        } else { # generic PSGI
-                                $cb->() while $rd_hdr;
-                        }
-                });
+                $self->start($limiter, $start_cb);
         };
 }
 
@@ -146,4 +188,21 @@ sub new {
         }, $class;
 }
 
+# captures everything into a buffer and executes a callback when done
+package PublicInbox::Qspawn::Qx;
+use strict;
+use warnings;
+
+sub new {
+        my ($class) = @_;
+        my $buf = '';
+        bless \$buf, $class;
+}
+
+# called by PublicInbox::HTTPD::Async ($fh->write)
+sub write {
+        ${$_[0]} .= $_[1];
+        undef;
+}
+
 1;