From 9eac193c72e1380972f3589cb6b4f36b79183233 Mon Sep 17 00:00:00 2001
From: Eric Wong
Date: Sun, 5 Jul 2020 23:27:37 +0000
Subject: wwwatomstream: support async blob fetch

This allows -httpd to handle other requests while waiting
for git to retrieve and decode blobs. We'll also break apart
t/psgi_v2.t further to ensure tests run against -httpd in
addition to generic PSGI testing.

Using xt/httpd-async-stream.t to test against clones of
meta@public-inbox.org shows a 10-12% performance improvement
with the following env:
TEST_JOBS=1000 TEST_CURL_OPT=--compressed TEST_ENDPOINT=new.atom
---
 lib/PublicInbox/WwwAtomStream.pm | 74 +++++++++++++++++++++++++++++++++++-----
 1 file changed, 65 insertions(+), 9 deletions(-)

(limited to 'lib')

diff --git a/lib/PublicInbox/WwwAtomStream.pm b/lib/PublicInbox/WwwAtomStream.pm
index 9f322d38..58330922 100644
--- a/lib/PublicInbox/WwwAtomStream.pm
+++ b/lib/PublicInbox/WwwAtomStream.pm
@@ -15,9 +15,11 @@ use PublicInbox::Address;
 use PublicInbox::Hval qw(ascii_html mid_href);
 use PublicInbox::MsgTime qw(msg_timestamp);
 use PublicInbox::GzipFilter qw(gzf_maybe);
+use PublicInbox::GitAsyncCat;
 
-# called by PSGI server after getline:
-sub close {}
+# called by generic PSGI server after getline,
+# and also by PublicInbox::HTTP::close
+sub close { !!delete($_[0]->{http_out}) }
 
 sub new {
 	my ($class, $ctx, $cb) = @_;
@@ -27,12 +29,62 @@ sub new {
 	bless $ctx, $class;
 }
 
+# called by PublicInbox::DS::write
+sub atom_async_next {
+	my ($http) = @_; # PublicInbox::HTTP
+	atom_async_step($http->{forward});
+}
+
+# this is public-inbox-httpd-specific
+sub atom_blob_cb { # git->cat_async callback
+	my ($bref, $oid, $type, $size, $ctx) = @_;
+	my $http = $ctx->{env}->{'psgix.io'} or return; # client abort
+	my $smsg = delete $ctx->{smsg} or die 'BUG: no smsg';
+	if (!defined($oid)) {
+		# it's possible to have TOCTOU if an admin runs
+		# public-inbox-(edit|purge), just move onto the next message
+		return $http->next_step(\&atom_async_next);
+	} else {
+		$smsg->{blob} eq $oid or die "BUG: $smsg->{blob} != $oid";
+	}
+	my $buf = feed_entry($ctx, $smsg, PublicInbox::Eml->new($bref));
+	if (my $gzf = $ctx->{gzf}) {
+		$buf = $gzf->translate($buf);
+	}
+	# PublicInbox::HTTP::{Chunked,Identity}::write
+	$ctx->{http_out}->write($buf);
+
+	$http->next_step(\&atom_async_next);
+}
+
+sub atom_async_step { # this is public-inbox-httpd-specific
+	my ($ctx) = @_;
+	if (my $smsg = $ctx->{smsg} = $ctx->{cb}->($ctx)) {
+		git_async_cat($ctx->{-inbox}->git, $smsg->{blob},
+				\&atom_blob_cb, $ctx);
+	} elsif (my $out = delete $ctx->{http_out}) {
+		if (my $gzf = delete $ctx->{gzf}) {
+			$out->write($gzf->zflush);
+		}
+		$out->close;
+	}
+}
+
 sub response {
 	my ($class, $ctx, $code, $cb) = @_;
-	my $h = [ 'Content-Type' => 'application/atom+xml' ];
+	my $res_hdr = [ 'Content-Type' => 'application/atom+xml' ];
 	$class->new($ctx, $cb);
-	$ctx->{gzf} = gzf_maybe($h, $ctx->{env});
-	[ $code, $h, $ctx ]
+	$ctx->{gzf} = gzf_maybe($res_hdr, $ctx->{env});
+	if ($ctx->{env}->{'pi-httpd.async'}) {
+		sub {
+			my ($wcb) = @_; # -httpd provided write callback
+			$ctx->{http_out} = $wcb->([200, $res_hdr]);
+			$ctx->{env}->{'psgix.io'}->{forward} = $ctx;
+			atom_async_step($ctx); # start stepping
+		};
+	} else {
+		[ $code, $res_hdr, $ctx ];
+	}
 }
 
 # called once for each message by PSGI server
@@ -40,8 +92,13 @@ sub getline {
 	my ($self) = @_;
 	my $buf = do {
 		if (my $middle = $self->{cb}) {
-			my $smsg = $middle->($self);
-			feed_entry($self, $smsg) if $smsg;
+			if (my $smsg = $middle->($self)) {
+				my $eml = $self->{-inbox}->smsg_eml($smsg) or
+					return '';
+				feed_entry($self, $smsg, $eml);
+			} else {
+				undef;
+			}
 		}
 	} // (delete($self->{cb}) ? '' : undef);
 
@@ -108,8 +165,7 @@ sub atom_header {
 
 # returns undef or string
 sub feed_entry {
-	my ($ctx, $smsg) = @_;
-	my $eml = $ctx->{-inbox}->smsg_eml($smsg) or return '';
+	my ($ctx, $smsg, $eml) = @_;
 	my $hdr = $eml->header_obj;
 	my $mid = $smsg->{mid};
 	my $irt = PublicInbox::View::in_reply_to($hdr);
--
cgit v1.2.3-24-ge0c7