From 9eac193c72e1380972f3589cb6b4f36b79183233 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Sun, 5 Jul 2020 23:27:37 +0000 Subject: wwwatomstream: support async blob fetch This allows -httpd to handle other requests while waiting for git to retrieve and decode blobs. We'll also break apart t/psgi_v2.t further to ensure tests run against -httpd in addition to generic PSGI testing. Using xt/httpd-async-stream.t to test against clones of meta@public-inbox.org shows a 10-12% performance improvement with the following env: TEST_JOBS=1000 TEST_CURL_OPT=--compressed TEST_ENDPOINT=new.atom --- Documentation/mknews.perl | 7 ++-- lib/PublicInbox/WwwAtomStream.pm | 74 +++++++++++++++++++++++++++++----- t/psgi_v2.t | 86 +++++++++++++++++++++++++--------------- 3 files changed, 123 insertions(+), 44 deletions(-) diff --git a/Documentation/mknews.perl b/Documentation/mknews.perl index ba049d9e..2d22d147 100755 --- a/Documentation/mknews.perl +++ b/Documentation/mknews.perl @@ -147,9 +147,10 @@ EOF } sub mime2atom { - my ($out, $astream, $mime, $ctx) = @_; - my $smsg = bless { mime => $mime }, 'PublicInbox::Smsg'; - if (defined(my $str = $astream->feed_entry($smsg))) { + my ($out, $astream, $eml, $ctx) = @_; + my $smsg = bless {}, 'PublicInbox::Smsg'; + $smsg->populate($eml); + if (defined(my $str = $astream->feed_entry($smsg, $eml))) { print $out $str or die; } } diff --git a/lib/PublicInbox/WwwAtomStream.pm b/lib/PublicInbox/WwwAtomStream.pm index 9f322d38..58330922 100644 --- a/lib/PublicInbox/WwwAtomStream.pm +++ b/lib/PublicInbox/WwwAtomStream.pm @@ -15,9 +15,11 @@ use PublicInbox::Address; use PublicInbox::Hval qw(ascii_html mid_href); use PublicInbox::MsgTime qw(msg_timestamp); use PublicInbox::GzipFilter qw(gzf_maybe); +use PublicInbox::GitAsyncCat; -# called by PSGI server after getline: -sub close {} +# called by generic PSGI server after getline, +# and also by PublicInbox::HTTP::close +sub close { !!delete($_[0]->{http_out}) } sub new { my ($class, $ctx, $cb) = @_; @@ -27,12 +29,62 @@ sub new { bless $ctx, $class; } +# called by PublicInbox::DS::write +sub atom_async_next { + my ($http) = @_; # PublicInbox::HTTP + atom_async_step($http->{forward}); +} + +# this is public-inbox-httpd-specific +sub atom_blob_cb { # git->cat_async callback + my ($bref, $oid, $type, $size, $ctx) = @_; + my $http = $ctx->{env}->{'psgix.io'} or return; # client abort + my $smsg = delete $ctx->{smsg} or die 'BUG: no smsg'; + if (!defined($oid)) { + # it's possible to have TOCTOU if an admin runs + # public-inbox-(edit|purge), just move onto the next message + return $http->next_step(\&atom_async_next); + } else { + $smsg->{blob} eq $oid or die "BUG: $smsg->{blob} != $oid"; + } + my $buf = feed_entry($ctx, $smsg, PublicInbox::Eml->new($bref)); + if (my $gzf = $ctx->{gzf}) { + $buf = $gzf->translate($buf); + } + # PublicInbox::HTTP::{Chunked,Identity}::write + $ctx->{http_out}->write($buf); + + $http->next_step(\&atom_async_next); +} + +sub atom_async_step { # this is public-inbox-httpd-specific + my ($ctx) = @_; + if (my $smsg = $ctx->{smsg} = $ctx->{cb}->($ctx)) { + git_async_cat($ctx->{-inbox}->git, $smsg->{blob}, + \&atom_blob_cb, $ctx); + } elsif (my $out = delete $ctx->{http_out}) { + if (my $gzf = delete $ctx->{gzf}) { + $out->write($gzf->zflush); + } + $out->close; + } +} + sub response { my ($class, $ctx, $code, $cb) = @_; - my $h = [ 'Content-Type' => 'application/atom+xml' ]; + my $res_hdr = [ 'Content-Type' => 'application/atom+xml' ]; $class->new($ctx, $cb); - $ctx->{gzf} = gzf_maybe($h, $ctx->{env}); - [ $code, $h, $ctx ] + $ctx->{gzf} = gzf_maybe($res_hdr, $ctx->{env}); + if ($ctx->{env}->{'pi-httpd.async'}) { + sub { + my ($wcb) = @_; # -httpd provided write callback + $ctx->{http_out} = $wcb->([200, $res_hdr]); + $ctx->{env}->{'psgix.io'}->{forward} = $ctx; + atom_async_step($ctx); # start stepping + }; + } else { + [ $code, $res_hdr, $ctx ]; + } } # called once for each message by PSGI server @@ -40,8 +92,13 @@ sub getline { my ($self) = @_; my $buf = do { if (my $middle = $self->{cb}) { - my $smsg = $middle->($self); - feed_entry($self, $smsg) if $smsg; + if (my $smsg = $middle->($self)) { + my $eml = $self->{-inbox}->smsg_eml($smsg) or + return ''; + feed_entry($self, $smsg, $eml); + } else { + undef; + } } } // (delete($self->{cb}) ? '' : undef); @@ -108,8 +165,7 @@ sub atom_header { # returns undef or string sub feed_entry { - my ($ctx, $smsg) = @_; - my $eml = $ctx->{-inbox}->smsg_eml($smsg) or return ''; + my ($ctx, $smsg, $eml) = @_; my $hdr = $eml->header_obj; my $mid = $smsg->{mid}; my $irt = PublicInbox::View::in_reply_to($hdr); diff --git a/t/psgi_v2.t b/t/psgi_v2.t index 90a710a4..4ab9601c 100644 --- a/t/psgi_v2.t +++ b/t/psgi_v2.t @@ -14,6 +14,36 @@ use_ok($_) for (qw(HTTP::Request::Common Plack::Test)); use_ok 'PublicInbox::WWW'; use_ok 'PublicInbox::V2Writable'; my ($inboxdir, $for_destroy) = tmpdir(); +my $cfgpath = "$inboxdir/$$.config"; +SKIP: { + require_mods(qw(Plack::Test::ExternalServer), 1); + open my $fh, '>', $cfgpath or BAIL_OUT $!; + print $fh < $cfgpath }; + my $sock = tcp_server() or die; + my ($out, $err) = map { "$inboxdir/std$_.log" } qw(out err); + my $cmd = [ qw(-httpd -W0), "--stdout=$out", "--stderr=$err" ]; + my $td = start_script($cmd, $env, { 3 => $sock }); + my ($h, $p) = ($sock->sockhost, $sock->sockport); + local $ENV{PLACK_TEST_EXTERNALSERVER_URI} = "http://$h:$p"; + Plack::Test::ExternalServer::test_psgi(client => $client); + $td->join('TERM'); + open my $fh, '<', $err or BAIL_OUT $!; + is(do { local $/; <$fh> }, '', 'no errors'); + } +}; + my $ibx = { inboxdir => $inboxdir, name => 'test-v2writable', @@ -60,7 +90,7 @@ EOF my $config = PublicInbox::Config->new(\$cfg); my $www = PublicInbox::WWW->new($config); my ($res, $raw, @from_); -test_psgi(sub { $www->call(@_) }, sub { +my $client0 = sub { my ($cb) = @_; $res = $cb->(GET('/v2test/description')); like($res->content, qr!\$INBOX_DIR/description missing!, @@ -90,7 +120,9 @@ test_psgi(sub { $www->call(@_) }, sub { @bodies = ($res->content =~ /^(hello [^<]+)$/mg); is_deeply(\@bodies, [ "hello world!\n", "hello world\n" ], 'new.html ordering is chronological'); -}); +}; +test_psgi(sub { $www->call(@_) }, $client0); +$run_httpd->($client0, 9); $mime->header_set('Message-Id', 'a-mid@b'); $mime->body_set("hello ghosts\n"); @@ -103,7 +135,7 @@ $mids = mids($mime->header_obj); my $third = $mids->[-1]; $im->done; -my $client = sub { +my $client1 = sub { my ($cb) = @_; $res = $cb->(GET("/v2test/$third/raw")); $raw = $res->content; @@ -196,32 +228,10 @@ my $client = sub { like($raw, qr/\b3\+ messages\b/, 'thread overview shown'); }; -test_psgi(sub { $www->call(@_) }, $client); -SKIP: { - require_mods(qw(Plack::Test::ExternalServer), 37); - my $cfgpath = "$inboxdir/$$.config"; - open my $fh, '>', $cfgpath or BAIL_OUT $!; - print $fh < $cfgpath }; - my $sock = tcp_server() or die; - my ($out, $err) = map { "$inboxdir/std$_.log" } qw(out err); - my $cmd = [ qw(-httpd -W0), "--stdout=$out", "--stderr=$err" ]; - my $td = start_script($cmd, $env, { 3 => $sock }); - my ($h, $p) = ($sock->sockhost, $sock->sockport); - local $ENV{PLACK_TEST_EXTERNALSERVER_URI} = "http://$h:$p"; - Plack::Test::ExternalServer::test_psgi(client => $client); - $td->join('TERM'); - open $fh, '<', $err or BAIL_OUT $!; - is(do { local $/; <$fh> }, '', 'no errors'); -}; +test_psgi(sub { $www->call(@_) }, $client1); +$run_httpd->($client1, 38); -test_psgi(sub { $www->call(@_) }, sub { - my ($cb) = @_; +{ my $exp = [ qw( ) ]; $mime->header_set('Message-Id', @$exp); $mime->header_set('Subject', '4th dupe'); @@ -230,10 +240,12 @@ test_psgi(sub { $www->call(@_) }, sub { $im->done; my @h = $mime->header('Message-ID'); is_deeply($exp, \@h, 'reused existing Message-ID'); - $config->each_inbox(sub { $_[0]->search->reopen }); +} - $res = $cb->(GET('/v2test/new.atom')); +my $client2 = sub { + my ($cb) = @_; + my $res = $cb->(GET('/v2test/new.atom')); my @ids = ($res->content =~ m!urn:uuid:([^<]+)!sg); my %ids; $ids{$_}++ for @ids; @@ -256,7 +268,11 @@ test_psgi(sub { $www->call(@_) }, sub { is($res->code, 200, 'got info refs for dumb clones w/ .git suffix'); $res = $cb->(GET('/v2test/info/refs')); is($res->code, 404, 'v2 git URL w/o shard fails'); +}; +test_psgi(sub { $www->call(@_) }, $client2); +$run_httpd->($client2, 8); +{ # ensure conflicted attachments can be resolved foreach my $body (qw(old new)) { $mime = eml_load "t/psgi_v2-$body.eml"; @@ -264,7 +280,11 @@ test_psgi(sub { $www->call(@_) }, sub { } $im->done; $config->each_inbox(sub { $_[0]->search->reopen }); - $res = $cb->(GET('/v2test/a@dup/')); +} + +my $client3 = sub { + my ($cb) = @_; + my $res = $cb->(GET('/v2test/a@dup/')); my @links = ($res->content =~ m!"\.\./([^/]+/2-attach\.txt)\"!g); is(scalar(@links), 2, 'both attachment links exist'); isnt($links[0], $links[1], 'attachment links are different'); @@ -276,7 +296,9 @@ test_psgi(sub { $www->call(@_) }, sub { } $res = $cb->(GET('/v2test/?t=1970'.'01'.'01'.'000000')); is($res->code, 404, '404 for out-of-range t= param'); -}); +}; +test_psgi(sub { $www->call(@_) }, $client3); +$run_httpd->($client3, 4); done_testing(); -- cgit v1.2.3-24-ge0c7