user/dev discussion of public-inbox itself
 help / color / mirror / code / Atom feed
From: Eric Wong <e@yhbt.net>
To: meta@public-inbox.org
Subject: [PATCH 14/43] mboxgz: do asynchronous git blob retrievals
Date: Sun,  5 Jul 2020 23:27:30 +0000	[thread overview]
Message-ID: <20200705232759.3161-15-e@yhbt.net> (raw)
In-Reply-To: <20200705232759.3161-1-e@yhbt.net>

This lets the -httpd worker process make better use of time
instead of waiting for git-cat-file to respond.  With 4 jobs in
the new test case against a clone of
<https://public-inbox.org/meta/>, a speedup of 10-12% is shown.
Even a single job shows a 2-5% improvement on an SSD.
---
 MANIFEST                  |  1 +
 lib/PublicInbox/HTTP.pm   |  7 +++
 lib/PublicInbox/MboxGz.pm | 69 +++++++++++++++++++++++----
 xt/httpd-async-stream.t   | 99 +++++++++++++++++++++++++++++++++++++++
 4 files changed, 167 insertions(+), 9 deletions(-)
 create mode 100644 xt/httpd-async-stream.t

diff --git a/MANIFEST b/MANIFEST
index dcd7a7e5f..9b0f50203 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -368,6 +368,7 @@ xt/cmp-msgview.t
 xt/eml_check_limits.t
 xt/git-http-backend.t
 xt/git_async_cmp.t
+xt/httpd-async-stream.t
 xt/imapd-mbsync-oimap.t
 xt/imapd-validate.t
 xt/mem-imapd-tls.t
diff --git a/lib/PublicInbox/HTTP.pm b/lib/PublicInbox/HTTP.pm
index 828174653..5844ef440 100644
--- a/lib/PublicInbox/HTTP.pm
+++ b/lib/PublicInbox/HTTP.pm
@@ -488,6 +488,13 @@ sub busy () {
 	($self->{rbuf} || exists($self->{env}) || $self->{wbuf});
 }
 
+# runs $cb on the next iteration of the event loop at earliest
+sub next_step {
+	my ($self, $cb) = @_;
+	return unless exists $self->{sock};
+	$self->requeue if 1 == push(@{$self->{wbuf}}, $cb);
+}
+
 # Chunked and Identity packages are used for writing responses.
 # They may be exposed to the PSGI application when the PSGI app
 # returns a CODE ref for "push"-based responses
diff --git a/lib/PublicInbox/MboxGz.pm b/lib/PublicInbox/MboxGz.pm
index 535ef96c9..8c9010afb 100644
--- a/lib/PublicInbox/MboxGz.pm
+++ b/lib/PublicInbox/MboxGz.pm
@@ -6,6 +6,9 @@ use parent 'PublicInbox::GzipFilter';
 use PublicInbox::Eml;
 use PublicInbox::Hval qw/to_filename/;
 use PublicInbox::Mbox;
+use PublicInbox::GitAsyncCat;
+*msg_hdr = \&PublicInbox::Mbox::msg_hdr;
+*msg_body = \&PublicInbox::Mbox::msg_body;
 
 sub new {
 	my ($class, $ctx, $cb) = @_;
@@ -17,33 +20,81 @@ sub new {
 	}, $class;
 }
 
+# this is public-inbox-httpd-specific
+sub mboxgz_blob_cb { # git->cat_async callback
+	my ($bref, $oid, $type, $size, $self) = @_;
+	my $http = $self->{ctx}->{env}->{'psgix.io'} or return; # client abort
+	my $smsg = delete $self->{smsg} or die 'BUG: no smsg';
+	if (!defined($oid)) {
+		# it's possible to have TOCTOU if an admin runs
+		# public-inbox-(edit|purge), just move onto the next message
+		return $http->next_step(\&async_next);
+	} else {
+		$smsg->{blob} eq $oid or die "BUG: $smsg->{blob} != $oid";
+	}
+	$self->zmore(msg_hdr($self->{ctx},
+				PublicInbox::Eml->new($bref)->header_obj,
+				$smsg->{mid}));
+
+	# PublicInbox::HTTP::{Chunked,Identity}::write
+	$self->{http_out}->write($self->translate(msg_body($$bref)));
+
+	$http->next_step(\&async_next);
+}
+
+# this is public-inbox-httpd-specific
+sub async_step ($) {
+	my ($self) = @_;
+	if (my $smsg = $self->{smsg} = $self->{cb}->($self->{ctx})) {
+		git_async_cat($self->{ctx}->{-inbox}->git, $smsg->{blob},
+				\&mboxgz_blob_cb, $self);
+	} elsif (my $out = delete $self->{http_out}) {
+		$out->write($self->zflush);
+		$out->close;
+	}
+}
+
+# called by PublicInbox::DS::write
+sub async_next {
+	my ($http) = @_; # PublicInbox::HTTP
+	async_step($http->{forward});
+}
+
+# called by PublicInbox::HTTP::close, or any other PSGI server
+sub close { !!delete($_[0]->{http_out}) }
+
 sub response {
 	my ($class, $ctx, $cb, $fn) = @_;
-	my $body = $class->new($ctx, $cb);
+	my $self = $class->new($ctx, $cb);
 	# http://www.iana.org/assignments/media-types/application/gzip
 	$fn = defined($fn) && $fn ne '' ? to_filename($fn) : 'no-subject';
 	my $h = [ qw(Content-Type application/gzip),
 		'Content-Disposition', "inline; filename=$fn.mbox.gz" ];
-	[ 200, $h, $body ];
+	if ($ctx->{env}->{'pi-httpd.async'}) {
+		sub {
+			my ($wcb) = @_; # -httpd provided write callback
+			$self->{http_out} = $wcb->([200, $h]);
+			$self->{ctx}->{env}->{'psgix.io'}->{forward} = $self;
+			async_step($self); # start stepping
+		};
+	} else { # generic PSGI
+		[ 200, $h, $self ];
+	}
 }
 
-# called by Plack::Util::foreach or similar
+# called by Plack::Util::foreach or similar (generic PSGI)
 sub getline {
 	my ($self) = @_;
 	my $ctx = $self->{ctx} or return;
 	while (my $smsg = $self->{cb}->($ctx)) {
 		my $mref = $ctx->{-inbox}->msg_by_smsg($smsg) or next;
 		my $h = PublicInbox::Eml->new($mref)->header_obj;
-		$self->zmore(
-			PublicInbox::Mbox::msg_hdr($ctx, $h, $smsg->{mid})
-		);
-		return $self->translate(PublicInbox::Mbox::msg_body($$mref));
+		$self->zmore(msg_hdr($ctx, $h, $smsg->{mid}));
+		return $self->translate(msg_body($$mref));
 	}
 	# signal that we're done and can return undef next call:
 	delete $self->{ctx};
 	$self->zflush;
 }
 
-sub close {} # noop
-
 1;
diff --git a/xt/httpd-async-stream.t b/xt/httpd-async-stream.t
new file mode 100644
index 000000000..29bcb6125
--- /dev/null
+++ b/xt/httpd-async-stream.t
@@ -0,0 +1,99 @@
+#!perl -w
+# Copyright (C) 2020 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+# Expensive test: concurrent all.mbox.gz downloads from -httpd must be identical.
+use strict;
+use Test::More;
+use PublicInbox::TestCommon;
+use PublicInbox::DS qw(now);
+use PublicInbox::Spawn qw(which popen_rd);
+use Digest::MD5;
+use POSIX qw(_exit);
+my $inboxdir = $ENV{GIANT_INBOX_DIR};
+plan skip_all => "GIANT_INBOX_DIR not defined for $0" unless $inboxdir;
+my $curl = which('curl') or plan skip_all => "curl(1) missing for $0";
+my ($tmpdir, $for_destroy) = tmpdir();
+require_mods(qw(DBD::SQLite));
+my $JOBS = $ENV{TEST_JOBS} // 4;
+diag "TEST_JOBS=$JOBS";
+
+my $make_local_server = sub {
+	my $pi_config = "$tmpdir/config";
+	open my $fh, '>', $pi_config or die "open($pi_config): $!";
+	print $fh <<"" or die "print $pi_config: $!";
+[publicinbox "test"]
+inboxdir = $inboxdir
+address = test\@example.com
+
+	close $fh or die "close($pi_config): $!";
+	my ($out, $err) = ("$tmpdir/out", "$tmpdir/err");
+	for ($out, $err) {
+		open my $fh, '>', $_ or die "truncate: $!";
+	}
+	my $http = tcp_server();
+	my $rdr = { 3 => $http };
+
+	# not using multiple workers, here, since we want to increase
+	# the chance of tripping concurrency bugs within PublicInbox/HTTP*.pm
+	my $cmd = [ '-httpd', "--stdout=$out", "--stderr=$err", '-W0' ];
+	my $host_port = $http->sockhost.':'.$http->sockport;
+	push @$cmd, "-lhttp://$host_port";
+	my $url = "$host_port/test/all.mbox.gz";
+	print STDERR "# CMD ". join(' ', @$cmd). "\n";
+	my $env = { PI_CONFIG => $pi_config };
+	(start_script($cmd, $env, $rdr), $url);
+};
+
+my ($td, $url) = $make_local_server->();
+
+my $do_get_all = sub {
+	my ($job) = @_;
+	local $SIG{__DIE__} = sub { print STDERR $job, ': ', @_; _exit(1) };
+	my $dig = Digest::MD5->new;
+	my ($buf, $nr);
+	my $bytes = 0;
+	my $t0 = now();
+	my ($rd, $pid) = popen_rd([$curl, qw(-HHost:example.com -sSf), $url]);
+	while (1) {
+		$nr = sysread($rd, $buf, 65536);
+		last if !$nr;
+		$dig->add($buf);
+		$bytes += $nr;
+	}
+	my $res = $dig->hexdigest;
+	my $elapsed = sprintf('%0.3f', now() - $t0);
+	close $rd or die "close curl failed: $!\n";
+	waitpid($pid, 0) == $pid or die "waitpid failed: $!\n";
+	$? == 0 or die "curl failed: $?\n";
+	print STDERR "# $job $$ ($?) $res (${elapsed}s) $bytes bytes\n";
+	$res;
+};
+
+my (%pids, %res);
+for my $job (1..$JOBS) {
+	pipe(my ($r, $w)) or die;
+	my $pid = fork;
+	if ($pid == 0) {
+		close $r or die;
+		my $res = $do_get_all->($job);
+		print $w $res or die;
+		close $w or die;
+		_exit(0);
+	}
+	close $w or die;
+	$pids{$pid} = [ $job, $r ];
+}
+
+while (scalar keys %pids) {
+	my $pid = waitpid(-1, 0) or next;
+	my $child = delete $pids{$pid} or next;
+	my ($job, $rpipe) = @$child;
+	is($?, 0, "$job done");
+	my $sum = do { local $/; <$rpipe> };
+	push @{$res{$sum}}, $job;
+}
+is(scalar keys %res, 1, 'all got the same result');
+$td->kill;
+$td->join;
+is($?, 0, 'no error on -httpd exit');
+done_testing;

  parent reply	other threads:[~2020-07-05 23:28 UTC|newest]

Thread overview: 44+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2020-07-05 23:27 [PATCH 00/43] www: async git cat-file w/ -httpd Eric Wong
2020-07-05 23:27 ` [PATCH 01/43] gzipfilter: minor cleanups Eric Wong
2020-07-05 23:27 ` [PATCH 02/43] wwwstream: oneshot: perform gzip without middleware Eric Wong
2020-07-05 23:27 ` [PATCH 03/43] www*stream: gzip ->getline responses Eric Wong
2020-07-05 23:27 ` [PATCH 04/43] wwwtext: gzip text/plain responses, as well Eric Wong
2020-07-05 23:27 ` [PATCH 05/43] wwwtext: switch to html_oneshot Eric Wong
2020-07-05 23:27 ` [PATCH 06/43] www: need: use WwwStream::html_oneshot Eric Wong
2020-07-05 23:27 ` [PATCH 07/43] wwwlisting: use GzipFilter for HTML Eric Wong
2020-07-05 23:27 ` [PATCH 08/43] gzipfilter: replace Compress::Raw::Deflate usages Eric Wong
2020-07-05 23:27 ` [PATCH 09/43] {gzip,noop}filter: ->zmore returns undef, always Eric Wong
2020-07-05 23:27 ` [PATCH 10/43] mbox: remove html_oneshot import Eric Wong
2020-07-05 23:27 ` [PATCH 11/43] wwwstatic: support gzipped directory listings Eric Wong
2020-07-05 23:27 ` [PATCH 12/43] qspawn: learn to gzip streaming responses Eric Wong
2020-07-05 23:27 ` [PATCH 13/43] stop auto-loading Plack::Middleware::Deflater Eric Wong
2020-07-05 23:27 ` Eric Wong [this message]
2020-07-05 23:27 ` [PATCH 15/43] mboxgz: reduce hash depth Eric Wong
2020-07-05 23:27 ` [PATCH 16/43] mbox: async blob fetch for "single message" raw mboxrd Eric Wong
2020-07-05 23:27 ` [PATCH 17/43] wwwatomstream: simplify feed_update callers Eric Wong
2020-07-05 23:27 ` [PATCH 18/43] wwwatomstream: use PublicInbox::Inbox->modified for feed_updated Eric Wong
2020-07-05 23:27 ` [PATCH 19/43] wwwatomstream: reuse $ctx as $self Eric Wong
2020-07-05 23:27 ` [PATCH 20/43] xt/httpd-async-stream: allow more options Eric Wong
2020-07-05 23:27 ` [PATCH 21/43] wwwatomstream: support async blob fetch Eric Wong
2020-07-05 23:27 ` [PATCH 22/43] wwwstream: reduce object graph depth Eric Wong
2020-07-05 23:27 ` [PATCH 23/43] wwwstream: reduce blob fetch paths for ->getline Eric Wong
2020-07-05 23:27 ` [PATCH 24/43] www: start making gzipfilter the parent response class Eric Wong
2020-07-05 23:27 ` [PATCH 25/43] remove unused/redundant zlib-related imports Eric Wong
2020-07-05 23:27 ` [PATCH 26/43] wwwstream: use parent.pm and no warnings Eric Wong
2020-07-05 23:27 ` [PATCH 27/43] wwwstream: subclass off GzipFilter Eric Wong
2020-07-05 23:27 ` [PATCH 28/43] view: make /$INBOX/$MSGID/ permalink async Eric Wong
2020-07-05 23:27 ` [PATCH 29/43] view: /$INBOX/$MSGID/t/ reads blobs asynchronously Eric Wong
2020-07-05 23:27 ` [PATCH 30/43] view: update /$INBOX/$MSGID/T/ to be async Eric Wong
2020-07-05 23:27 ` [PATCH 31/43] feed: generate_i: eliminate pointless loop Eric Wong
2020-07-05 23:27 ` [PATCH 32/43] feed: /$INBOX/new.html fetches blobs asynchronously Eric Wong
2020-07-05 23:27 ` [PATCH 33/43] ssearchview: /$INBOX/?q=$QUERY&x=t uses async blobs Eric Wong
2020-07-05 23:27 ` [PATCH 34/43] view: eml_entry: reduce parameters Eric Wong
2020-07-05 23:27 ` [PATCH 35/43] view: /$INBOX/$MSGID/t/: avoid extra hash lookup in eml case Eric Wong
2020-07-05 23:27 ` [PATCH 36/43] wwwstream: eliminate ::response, use html_oneshot Eric Wong
2020-07-05 23:27 ` [PATCH 37/43] www: update internal docs Eric Wong
2020-07-05 23:27 ` [PATCH 38/43] view: simplify eml_entry callers further Eric Wong
2020-07-05 23:27 ` [PATCH 39/43] wwwtext: simplify gzf_maybe use Eric Wong
2020-07-05 23:27 ` [PATCH 40/43] wwwattach: support async blob retrievals Eric Wong
2020-07-05 23:27 ` [PATCH 41/43] gzipfilter: drop HTTP connection on bugs or data corruption Eric Wong
2020-07-05 23:27 ` [PATCH 42/43] daemon: warn on missing blobs Eric Wong
2020-07-05 23:27 ` [PATCH 43/43] gzipfilter: check http->{forward} for client disconnects Eric Wong

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

  List information: https://public-inbox.org/README

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20200705232759.3161-15-e@yhbt.net \
    --to=e@yhbt.net \
    --cc=meta@public-inbox.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link.

  Be sure your reply has a Subject: header at the top and a blank line
  before the message body.

Code repositories for project(s) associated with this public inbox

	https://80x24.org/public-inbox.git

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).