about summary refs log tree commit homepage
path: root/lib/PublicInbox/Qspawn.pm
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2019-01-30 07:40:35 +0000
committerEric Wong <e@80x24.org>2019-01-30 07:40:35 +0000
commitcf439203c31836e4c48f632f902aa58db6924972 (patch)
treea46bf739c5318db857783a1b2773ed51ad2f0fde /lib/PublicInbox/Qspawn.pm
parent41bb8a962d4d2d8f6e7ec9988908aaf7d55f09c7 (diff)
parent11f984fa36647b0ab107596bcec2dd9b963f1379 (diff)
downloadpublic-inbox-cf439203c31836e4c48f632f902aa58db6924972.tar.gz
* origin/viewvcs: (66 commits)
  solvergit: deal with alternative diff prefixes
  solvergit: extract mode from diff headers properly
  solvergit: avoid "Wide character" warnings
  solvergit: do not show full path names to "git apply"
  css/216dark: add comments and tweak highlight colors
  viewvcs: avoid segfault with highlight.pm at shutdown
  solvergit: do not solve blobs twice
  t/check-www-inbox: disable history
  t/check-www-inbox: don't follow mboxes
  t/check-www-inbox: replace IPC::Run with PublicInbox::Spawn
  hval: add src_escape for highlight post-processing
  viewvcs: wire up syntax-highlighting for blobs
  hlmod: disable enclosing <pre> tag
  t/hl_mod: extra check to ensure we escape HTML
  wwwhighlight: read_in_full returns undef on errors
  solver: crank up max patches to 9999
  viewvcs: do not show final error message twice
  qspawn: decode $? for user-friendliness
  solver: reduce "git apply" invocations
  solver: hold patches in temporary directory
  ...
Diffstat (limited to 'lib/PublicInbox/Qspawn.pm')
-rw-r--r--lib/PublicInbox/Qspawn.pm143
1 files changed, 142 insertions, 1 deletions
diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm
index 3500f8a4..913fac81 100644
--- a/lib/PublicInbox/Qspawn.pm
+++ b/lib/PublicInbox/Qspawn.pm
@@ -9,6 +9,8 @@ package PublicInbox::Qspawn;
 use strict;
 use warnings;
 use PublicInbox::Spawn qw(popen_rd);
+require Plack::Util;
+my $def_limiter;
 
 sub new ($$$;) {
         my ($class, $cmd, $env, $opt) = @_;
@@ -28,13 +30,22 @@ sub _do_spawn {
         $cb->($self->{rpipe});
 }
 
+sub child_err ($) {
+        my ($child_error) = @_; # typically $?
+        my $exitstatus = ($child_error >> 8) or return;
+        my $sig = $child_error & 127;
+        my $msg = "exit status=$exitstatus";
+        $msg .= " signal=$sig" if $sig;
+        $msg;
+}
+
 sub finish ($) {
         my ($self) = @_;
         my $limiter = $self->{limiter};
         my $running;
         if (delete $self->{rpipe}) {
                 my $pid = delete $self->{pid};
-                $self->{err} = $pid == waitpid($pid, 0) ? $? :
+                $self->{err} = $pid == waitpid($pid, 0) ? child_err($?) :
                                 "PID:$pid still running?";
                 $running = --$limiter->{running};
         }
@@ -59,6 +70,119 @@ sub start {
         }
 }
 
+sub _psgi_finish ($$) {
+        my ($self, $env) = @_;
+        my $err = $self->finish;
+        if ($err && !$env->{'qspawn.quiet'}) {
+                $err = join(' ', @{$self->{args}->[0]}).": $err\n";
+                $env->{'psgi.errors'}->print($err);
+        }
+}
+
+sub psgi_qx {
+        my ($self, $env, $limiter, $qx_cb) = @_;
+        my $qx = PublicInbox::Qspawn::Qx->new;
+        my $end = sub {
+                _psgi_finish($self, $env);
+                eval { $qx_cb->($qx) };
+                $qx = undef;
+        };
+        my $rpipe;
+        my $async = $env->{'pi-httpd.async'};
+        my $cb = sub {
+                my $r = sysread($rpipe, my $buf, 8192);
+                if ($async) {
+                        $async->async_pass($env->{'psgix.io'}, $qx, \$buf);
+                } elsif (defined $r) {
+                        $r ? $qx->write($buf) : $end->();
+                } else {
+                        return if $!{EAGAIN} || $!{EINTR}; # loop again
+                        $end->();
+                }
+        };
+        $limiter ||= $def_limiter ||= PublicInbox::Qspawn::Limiter->new(32);
+        $self->start($limiter, sub { # may run later, much later...
+                ($rpipe) = @_;
+                if ($async) {
+                # PublicInbox::HTTPD::Async->new($rpipe, $cb, $end)
+                        $async = $async->($rpipe, $cb, $end);
+                } else { # generic PSGI
+                        $cb->() while $qx;
+                }
+        });
+}
+
+# create a filter for "push"-based streaming PSGI writes used by HTTPD::Async
+sub filter_fh ($$) {
+        my ($fh, $filter) = @_;
+        Plack::Util::inline_object(
+                close => sub {
+                        $fh->write($filter->(undef));
+                        $fh->close;
+                },
+                write => sub {
+                        $fh->write($filter->($_[0]));
+                });
+}
+
+sub psgi_return {
+        my ($self, $env, $limiter, $parse_hdr) = @_;
+        my ($fh, $rpipe);
+        my $end = sub {
+                _psgi_finish($self, $env);
+                $fh->close if $fh; # async-only
+        };
+
+        my $buf = '';
+        my $rd_hdr = sub {
+                my $r = sysread($rpipe, $buf, 1024, length($buf));
+                return if !defined($r) && ($!{EINTR} || $!{EAGAIN});
+                $parse_hdr->($r, \$buf);
+        };
+        my $res = delete $env->{'qspawn.response'};
+        my $async = $env->{'pi-httpd.async'};
+        my $cb = sub {
+                my $r = $rd_hdr->() or return;
+                $rd_hdr = undef;
+                my $filter = delete $env->{'qspawn.filter'};
+                if (scalar(@$r) == 3) { # error
+                        if ($async) {
+                                $async->close; # calls rpipe->close and $end
+                        } else {
+                                $rpipe->close;
+                                $end->();
+                        }
+                        $res->($r);
+                } elsif ($async) {
+                        $fh = $res->($r); # scalar @$r == 2
+                        $fh = filter_fh($fh, $filter) if $filter;
+                        $async->async_pass($env->{'psgix.io'}, $fh, \$buf);
+                } else { # for synchronous PSGI servers
+                        require PublicInbox::GetlineBody;
+                        $r->[2] = PublicInbox::GetlineBody->new($rpipe, $end,
+                                                                $buf, $filter);
+                        $res->($r);
+                }
+        };
+        $limiter ||= $def_limiter ||= PublicInbox::Qspawn::Limiter->new(32);
+        my $start_cb = sub { # may run later, much later...
+                ($rpipe) = @_;
+                if ($async) {
+                        # PublicInbox::HTTPD::Async->new($rpipe, $cb, $end)
+                        $async = $async->($rpipe, $cb, $end);
+                } else { # generic PSGI
+                        $cb->() while $rd_hdr;
+                }
+        };
+
+        return $self->start($limiter, $start_cb) if $res;
+
+        sub {
+                ($res) = @_;
+                $self->start($limiter, $start_cb);
+        };
+}
+
 package PublicInbox::Qspawn::Limiter;
 use strict;
 use warnings;
@@ -73,4 +197,21 @@ sub new {
         }, $class;
 }
 
+# captures everything into a buffer and executes a callback when done
+package PublicInbox::Qspawn::Qx;
+use strict;
+use warnings;
+
+sub new {
+        my ($class) = @_;
+        my $buf = '';
+        bless \$buf, $class;
+}
+
+# called by PublicInbox::HTTPD::Async ($fh->write)
+sub write {
+        ${$_[0]} .= $_[1];
+        undef;
+}
+
 1;