From 261182531d33e980e4316ca2a7b9ade603e85cf2 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Wed, 18 Jan 2017 07:27:03 +0000 Subject: repobrowse: git snapshot waits for all commands asynchronously This new asynchronous API, psgi_qx, will allow us to take advantage of non-blocking I/O from even small commands; as those may still need to wait for slow operations. --- lib/PublicInbox/Qspawn.pm | 94 +++++++++++++++++++++++++++----- lib/PublicInbox/RepobrowseGitSnapshot.pm | 65 ++++++++++++++-------- 2 files changed, 122 insertions(+), 37 deletions(-) diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm index 11645db5..da770cc0 100644 --- a/lib/PublicInbox/Qspawn.pm +++ b/lib/PublicInbox/Qspawn.pm @@ -63,6 +63,48 @@ sub start { } } +sub _psgi_finish ($$) { + my ($self, $env) = @_; + my $err = $self->finish; + if ($err && !$env->{'qspawn.quiet'}) { + $err = join(' ', @{$self->{args}->[0]}).": $err\n"; + $env->{'psgi.errors'}->print($err); + } +} + +sub psgi_qx { + my ($self, $env, $limiter, $qx_cb) = @_; + my $qx = PublicInbox::Qspawn::Qx->new($qx_cb); + my $end = sub { + _psgi_finish($self, $env); + $qx->close; + $qx = undef; + }; + my $rpipe; + my $async = $env->{'pi-httpd.async'}; + my $cb = sub { + my $r = sysread($rpipe, my $buf, 8192); + if ($async) { + $async->async_pass($env->{'psgix.io'}, $qx, \$buf); + } elsif (defined $r) { + $r ? $qx->write($buf) : $end->(); + } else { + return if $!{EAGAIN} || $!{EINTR}; # loop again + $end->(); + } + }; + $limiter ||= $def_limiter ||= PublicInbox::Qspawn::Limiter->new(32); + $self->start($limiter, sub { # may run later, much later... + ($rpipe) = @_; + if ($async) { + # PublicInbox::HTTPD::Async->new($rpipe, $cb, $end) + $async = $async->($rpipe, $cb, $end); + } else { # generic PSGI + $cb->() while $qx; + } + }); +} + # create a filter for "push"-based streaming PSGI writes used by HTTPD::Async sub filter_fh ($$) { my ($fh, $filter) = @_; @@ -80,11 +122,7 @@ sub psgi_return { my ($self, $env, $limiter, $parse_hdr) = @_; my ($fh, $rpipe); my $end = sub { - my $err = $self->finish; - if ($err && !$env->{'qspawn.quiet'}) { - $err = join(' ', @{$self->{args}->[0]}).": $err\n"; - $env->{'psgi.errors'}->print($err); - } + _psgi_finish($self, $env); $fh->close if $fh; # async-only }; @@ -94,7 +132,7 @@ sub psgi_return { return if !defined($r) && ($!{EINTR} || $!{EAGAIN}); $parse_hdr->($r, \$buf); }; - my $res; + my $res = delete $env->{'qspawn.response'}; my $async = $env->{'pi-httpd.async'}; my $cb = sub { my $r = $rd_hdr->() or return; @@ -120,17 +158,21 @@ sub psgi_return { } }; $limiter ||= $def_limiter ||= PublicInbox::Qspawn::Limiter->new(32); + my $start_cb = sub { # may run later, much later... + ($rpipe) = @_; + if ($async) { + # PublicInbox::HTTPD::Async->new($rpipe, $cb, $end) + $async = $async->($rpipe, $cb, $end); + } else { # generic PSGI + $cb->() while $rd_hdr; + } + }; + + return $self->start($limiter, $start_cb) if $res; + sub { ($res) = @_; - $self->start($limiter, sub { # may run later, much later... - ($rpipe) = @_; - if ($async) { - # PublicInbox::HTTPD::Async->new($rpipe, $cb, $end) - $async = $async->($rpipe, $cb, $end); - } else { # generic PSGI - $cb->() while $rd_hdr; - } - }); + $self->start($limiter, $start_cb); }; } @@ -148,4 +190,26 @@ sub new { }, $class; } +# captures everything into a buffer and executes a callback when done +package PublicInbox::Qspawn::Qx; +use strict; +use warnings; + +sub new { + my ($class, $cb) = @_; + bless [ '', $cb ], $class; +} + +sub write { + $_[0]->[0] .= $_[1]; + undef; +} + +sub close { + my ($self) = @_; + my $cb = $self->[1]; + eval { $cb->(\($self->[0])) }; + undef; +} + 1; diff --git a/lib/PublicInbox/RepobrowseGitSnapshot.pm b/lib/PublicInbox/RepobrowseGitSnapshot.pm index 9e3ff83e..cceb4641 100644 --- a/lib/PublicInbox/RepobrowseGitSnapshot.pm +++ b/lib/PublicInbox/RepobrowseGitSnapshot.pm @@ -46,7 +46,7 @@ sub call_git_snapshot ($$) { # invoked by PublicInbox::RepobrowseBase::call return $self->r(404) if $orig_fn =~ /["\s]/s; return $self->r(404) unless ($ref =~ s/\.($SUFFIX)\z//o); my $fmt = $1; - + my $env = $req->{env}; my $repo_info = $req->{repo_info}; # support disabling certain snapshots types entirely to twart @@ -60,31 +60,52 @@ sub call_git_snapshot ($$) { # invoked by PublicInbox::RepobrowseBase::call return $self->r(404) if $ref =~ /\A-/; my $git = $repo_info->{git}; - my $tree; + my $tree = ''; + my $last_cb = sub { + delete $env->{'repobrowse.tree_cb'}; + delete $env->{'qspawn.quiet'}; + my $pfx = "$repo_info->{snapshot_pfx}-$ref/"; + my $cmd = [ 'git', "--git-dir=$git->{git_dir}", 'archive', + "--prefix=$pfx", "--format=$fmt", $tree ]; + my $rdr = { 2 => $git->err_begin }; + my $qsp = PublicInbox::Qspawn->new($cmd, undef, $rdr); + $qsp->psgi_return($env, undef, sub { + my $r = $_[0]; + return $self->r(500) unless $r; + [ 200, [ 'Content-Type', + $FMT_TYPES{$fmt} || 'application/octet-stream', + 'Content-Disposition', + qq(inline; filename="$orig_fn"), + 'ETag', qq("$tree") ] ]; + }); + }; - # try prefixing "v" or "V" for tag names - foreach my $r ($ref, "v$ref", "V$ref") { - $tree = $git->qx([qw(rev-parse --verify --revs-only), $r], - undef, { 2 => $git->err_begin }); - if (defined $tree) { + my @cmd = ('git', "--git-dir=$git->{git_dir}", + qw(rev-parse --verify --revs-only)); + # try prefixing "v" or "V" for tag names to get the tree + my @refs = ("V$ref", "v$ref", $ref); + $env->{'qspawn.quiet'} = 1; + my $tree_cb = $env->{'repobrowse.tree_cb'} = sub { + my ($ref) = @_; + if (ref($ref) eq 'SCALAR') { + $tree = $$ref; chomp $tree; - last if $tree ne ''; } + return $last_cb->() if $tree ne ''; + unless (scalar(@refs)) { + my $res = delete $env->{'qspawn.response'}; + return $res->($self->r(404)); + } + my $rdr = { 2 => $git->err_begin }; + my $r = pop @refs; + my $qsp = PublicInbox::Qspawn->new([@cmd, $r], undef, $rdr); + $qsp->psgi_qx($env, undef, $env->{'repobrowse.tree_cb'}); + }; + sub { + $env->{'qspawn.response'} = $_[0]; + # kick off the "loop" foreach @refs + $tree_cb->(undef); } - return $self->r(404) if (!defined $tree || $tree eq ''); - - my $pfx = "$repo_info->{snapshot_pfx}-$ref/"; - my $cmd = [ 'git', "--git-dir=$git->{git_dir}", 'archive', - "--prefix=$pfx", "--format=$fmt", $tree ]; - my $qsp = PublicInbox::Qspawn->new($cmd); - $qsp->psgi_return($req->{env}, undef, sub { - my $r = $_[0]; - return $self->r(500) unless $r; - [ 200, [ 'Content-Type', $FMT_TYPES{$fmt} || - 'application/octet-stream', - 'Content-Disposition', qq(inline; filename="$orig_fn"), - 'ETag', qq("$tree") ] ]; - }); } 1; -- cgit v1.2.3-24-ge0c7