about summary refs log tree commit homepage
path: root/lib/PublicInbox/Qspawn.pm
diff options
context:
space:
mode:
Diffstat (limited to 'lib/PublicInbox/Qspawn.pm')
-rw-r--r--lib/PublicInbox/Qspawn.pm143
1 files changed, 142 insertions, 1 deletions
diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm
index 3500f8a4..913fac81 100644
--- a/lib/PublicInbox/Qspawn.pm
+++ b/lib/PublicInbox/Qspawn.pm
@@ -9,6 +9,8 @@ package PublicInbox::Qspawn;
 use strict;
 use warnings;
 use PublicInbox::Spawn qw(popen_rd);
+require Plack::Util;
+my $def_limiter;
 
 sub new ($$$;) {
         my ($class, $cmd, $env, $opt) = @_;
@@ -28,13 +30,22 @@ sub _do_spawn {
         $cb->($self->{rpipe});
 }
 
+sub child_err ($) {
+        my ($child_error) = @_; # typically $?
+        my $exitstatus = ($child_error >> 8) or return;
+        my $sig = $child_error & 127;
+        my $msg = "exit status=$exitstatus";
+        $msg .= " signal=$sig" if $sig;
+        $msg;
+}
+
 sub finish ($) {
         my ($self) = @_;
         my $limiter = $self->{limiter};
         my $running;
         if (delete $self->{rpipe}) {
                 my $pid = delete $self->{pid};
-                $self->{err} = $pid == waitpid($pid, 0) ? $? :
+                $self->{err} = $pid == waitpid($pid, 0) ? child_err($?) :
                                 "PID:$pid still running?";
                 $running = --$limiter->{running};
         }
@@ -59,6 +70,119 @@ sub start {
         }
 }
 
+sub _psgi_finish ($$) {
+        my ($self, $env) = @_;
+        my $err = $self->finish;
+        if ($err && !$env->{'qspawn.quiet'}) {
+                $err = join(' ', @{$self->{args}->[0]}).": $err\n";
+                $env->{'psgi.errors'}->print($err);
+        }
+}
+
+sub psgi_qx {
+        my ($self, $env, $limiter, $qx_cb) = @_;
+        my $qx = PublicInbox::Qspawn::Qx->new;
+        my $end = sub {
+                _psgi_finish($self, $env);
+                eval { $qx_cb->($qx) };
+                $qx = undef;
+        };
+        my $rpipe;
+        my $async = $env->{'pi-httpd.async'};
+        my $cb = sub {
+                my $r = sysread($rpipe, my $buf, 8192);
+                if ($async) {
+                        $async->async_pass($env->{'psgix.io'}, $qx, \$buf);
+                } elsif (defined $r) {
+                        $r ? $qx->write($buf) : $end->();
+                } else {
+                        return if $!{EAGAIN} || $!{EINTR}; # loop again
+                        $end->();
+                }
+        };
+        $limiter ||= $def_limiter ||= PublicInbox::Qspawn::Limiter->new(32);
+        $self->start($limiter, sub { # may run later, much later...
+                ($rpipe) = @_;
+                if ($async) {
+                # PublicInbox::HTTPD::Async->new($rpipe, $cb, $end)
+                        $async = $async->($rpipe, $cb, $end);
+                } else { # generic PSGI
+                        $cb->() while $qx;
+                }
+        });
+}
+
+# create a filter for "push"-based streaming PSGI writes used by HTTPD::Async
+sub filter_fh ($$) {
+        my ($fh, $filter) = @_;
+        Plack::Util::inline_object(
+                close => sub {
+                        $fh->write($filter->(undef));
+                        $fh->close;
+                },
+                write => sub {
+                        $fh->write($filter->($_[0]));
+                });
+}
+
+sub psgi_return {
+        my ($self, $env, $limiter, $parse_hdr) = @_;
+        my ($fh, $rpipe);
+        my $end = sub {
+                _psgi_finish($self, $env);
+                $fh->close if $fh; # async-only
+        };
+
+        my $buf = '';
+        my $rd_hdr = sub {
+                my $r = sysread($rpipe, $buf, 1024, length($buf));
+                return if !defined($r) && ($!{EINTR} || $!{EAGAIN});
+                $parse_hdr->($r, \$buf);
+        };
+        my $res = delete $env->{'qspawn.response'};
+        my $async = $env->{'pi-httpd.async'};
+        my $cb = sub {
+                my $r = $rd_hdr->() or return;
+                $rd_hdr = undef;
+                my $filter = delete $env->{'qspawn.filter'};
+                if (scalar(@$r) == 3) { # error
+                        if ($async) {
+                                $async->close; # calls rpipe->close and $end
+                        } else {
+                                $rpipe->close;
+                                $end->();
+                        }
+                        $res->($r);
+                } elsif ($async) {
+                        $fh = $res->($r); # scalar @$r == 2
+                        $fh = filter_fh($fh, $filter) if $filter;
+                        $async->async_pass($env->{'psgix.io'}, $fh, \$buf);
+                } else { # for synchronous PSGI servers
+                        require PublicInbox::GetlineBody;
+                        $r->[2] = PublicInbox::GetlineBody->new($rpipe, $end,
+                                                                $buf, $filter);
+                        $res->($r);
+                }
+        };
+        $limiter ||= $def_limiter ||= PublicInbox::Qspawn::Limiter->new(32);
+        my $start_cb = sub { # may run later, much later...
+                ($rpipe) = @_;
+                if ($async) {
+                        # PublicInbox::HTTPD::Async->new($rpipe, $cb, $end)
+                        $async = $async->($rpipe, $cb, $end);
+                } else { # generic PSGI
+                        $cb->() while $rd_hdr;
+                }
+        };
+
+        return $self->start($limiter, $start_cb) if $res;
+
+        sub {
+                ($res) = @_;
+                $self->start($limiter, $start_cb);
+        };
+}
+
 package PublicInbox::Qspawn::Limiter;
 use strict;
 use warnings;
@@ -73,4 +197,21 @@ sub new {
         }, $class;
 }
 
+# captures everything into a buffer and executes a callback when done
+package PublicInbox::Qspawn::Qx;
+use strict;
+use warnings;
+
+sub new {
+        my ($class) = @_;
+        my $buf = '';
+        bless \$buf, $class;
+}
+
+# called by PublicInbox::HTTPD::Async ($fh->write)
+sub write {
+        ${$_[0]} .= $_[1];
+        undef;
+}
+
 1;