user/dev discussion of public-inbox itself
 help / color / mirror / code / Atom feed
From: Eric Wong <e@80x24.org>
To: meta@public-inbox.org
Subject: [PATCH 1/9] lei q: start ->mset while query_prepare runs
Date: Tue, 19 Jan 2021 09:34:27 +0000	[thread overview]
Message-ID: <20210119093435.17955-2-e@80x24.org> (raw)
In-Reply-To: <20210119093435.17955-1-e@80x24.org>

We don't need the result of query_prepare (for augmenting or
mass unlinking) until we're ready to deduplicate and write
results to the filesystem.  Starting ->mset while query_prepare
is still running ought to let us hide some of the cost of
extremely expensive Xapian searches on multi-device/multi-core
systems.
---
 lib/PublicInbox/LEI.pm        |  2 +-
 lib/PublicInbox/LeiToMail.pm  |  3 +-
 lib/PublicInbox/LeiXSearch.pm | 54 ++++++++++++++++++++---------------
 lib/PublicInbox/Spawn.pm      |  2 +-
 4 files changed, 35 insertions(+), 26 deletions(-)

diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 6b6ee0f5..4b1dc673 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -293,7 +293,7 @@ sub atfork_child_wq {
 	my ($sock, $l2m_wq_s1);
 	(@$self{qw(0 1 2)}, $sock, $l2m_wq_s1) = delete(@$wq{0..4});
 	$self->{sock} = $sock if -S $sock;
-	$self->{l2m}->{-wq_s1} = $l2m_wq_s1 if $l2m_wq_s1;
+	$self->{l2m}->{-wq_s1} = $l2m_wq_s1 if $l2m_wq_s1 && -S $l2m_wq_s1;
 	%PATH2CFG = ();
 	$quit = \&CORE::exit;
 	@TO_CLOSE_ATFORK_CHILD = ();
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index dcf6d8a3..a1dce550 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -440,6 +440,7 @@ sub lock_free {
 
 sub write_mail { # via ->wq_do
 	my ($self, $git_dir, $oid, $lei, $kw) = @_;
+	my $not_done = delete $self->{4}; # write end of {each_smsg_done}
 	my $wcb = $self->{wcb} //= do { # first message
 		my %sig = $lei->atfork_child_wq($self);
 		@SIG{keys %sig} = values %sig; # not local
@@ -447,7 +448,7 @@ sub write_mail { # via ->wq_do
 		$self->write_cb($lei);
 	};
 	my $git = $self->{"$$\0$git_dir"} //= PublicInbox::Git->new($git_dir);
-	$git->cat_async($oid, \&git_to_mail, [ $wcb, $kw ]);
+	$git->cat_async($oid, \&git_to_mail, [ $wcb, $kw, $not_done ]);
 }
 
 sub ipc_atfork_prepare {
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index dc5cf3b6..73fd17f4 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -94,8 +94,17 @@ sub _mset_more ($$) {
 	$size && (($mo->{offset} += $size) < ($mo->{limit} // 10000));
 }
 
+# $startq will EOF when query_prepare is done augmenting and allow
+# query_mset and query_thread_mset to proceed.
+sub wait_startq ($) {
+	my ($startq) = @_;
+	$_[0] = undef;
+	read($startq, my $query_prepare_done, 1);
+}
+
 sub query_thread_mset { # for --thread
 	my ($self, $lei, $ibxish) = @_;
+	my $startq = delete $self->{5};
 	my %sig = $lei->atfork_child_wq($self);
 	local @SIG{keys %sig} = values %sig;
 
@@ -119,6 +128,7 @@ sub query_thread_mset { # for --thread
 		while ($over->expand_thread($ctx)) {
 			for my $n (@{$ctx->{xids}}) {
 				my $smsg = $over->get_art($n) or next;
+				wait_startq($startq) if $startq;
 				next if $dedupe->is_smsg_dup($smsg);
 				my $mitem = delete $n2item{$smsg->{num}};
 				$each_smsg->($smsg, $mitem);
@@ -132,6 +142,7 @@ sub query_thread_mset { # for --thread
 
 sub query_mset { # non-parallel for non-"--thread" users
 	my ($self, $lei, $srcs) = @_;
+	my $startq = delete $self->{5};
 	my %sig = $lei->atfork_child_wq($self);
 	local @SIG{keys %sig} = values %sig;
 	my $mo = { %{$lei->{mset_opt}} };
@@ -144,6 +155,7 @@ sub query_mset { # non-parallel for non-"--thread" users
 		$mset = $self->mset($mo->{qstr}, $mo);
 		for my $it ($mset->items) {
 			my $smsg = smsg_for($self, $it) or next;
+			wait_startq($startq) if $startq;
 			next if $dedupe->is_smsg_dup($smsg);
 			$each_smsg->($smsg, $it);
 		}
@@ -207,47 +219,42 @@ sub start_query { # always runs in main (lei-daemon) process
 	@$io = ();
 }
 
-sub query_prepare { # wq_do
+sub query_prepare { # for wq_do,
 	my ($self, $lei) = @_;
 	my %sig = $lei->atfork_child_wq($self);
 	local @SIG{keys %sig} = values %sig;
-	if (my $l2m = $lei->{l2m}) {
-		eval { $l2m->do_augment($lei) };
-		return $lei->fail($@) if $@;
-	}
-	# trigger PublicInbox::OpPipe->event_step
-	my $qry_status_wr = $lei->{0} or
-		return $lei->fail('BUG: qry_status_wr missing');
-	$qry_status_wr->autoflush(1);
-	print $qry_status_wr '.' or # this should never fail...
-		return $lei->fail("BUG? print qry_status_wr: $!");
+	eval { $lei->{l2m}->do_augment($lei) };
+	$lei->fail($@) if $@;
 }
 
 sub do_query {
 	my ($self, $lei_orig, $srcs) = @_;
 	my ($lei, @io) = $lei_orig->atfork_parent_wq($self);
 	$io[0] = undef;
-	pipe(my $qry_status_rd, $io[0]) or die "pipe $!";
+	pipe(my $done, $io[0]) or die "pipe $!";
 
 	$lei_orig->event_step_init; # wait for shutdowns
-	my $op_map = { '' => [ \&query_done, $self, $lei_orig ] };
+	my $done_op = { '' => [ \&query_done, $self, $lei_orig ] };
 	my $in_loop = exists $lei_orig->{sock};
-	my $opp = PublicInbox::OpPipe->new($qry_status_rd, $op_map, $in_loop);
+	$done = PublicInbox::OpPipe->new($done, $done_op, $in_loop);
 	my $l2m = $lei->{l2m};
 	if ($l2m) {
 		$l2m->pre_augment($lei_orig); # may redirect $lei->{1} for mbox
 		$io[1] = $lei_orig->{1};
-		$op_map->{'.'} = [ \&start_query, $self, \@io, $lei, $srcs ];
-		$self->wq_do('query_prepare', \@io, $lei);
-		$opp->event_step if !$in_loop;
-	} else {
-		start_query($self, \@io, $lei, $srcs);
+		my @l2m_io = (undef, @io[1..$#io]);
+		pipe(my $startq, $l2m_io[0]) or die "pipe: $!";
+		$self->wq_do('query_prepare', \@l2m_io, $lei);
+		$io[4] //= *STDERR{GLOB};
+		die "BUG: unexpected \$io[5]: $io[5]" if $io[5];
+		fcntl($startq, 1031, 4096) if $^O eq 'linux'; # F_SETPIPE_SZ
+		$io[5] = $startq;
 	}
+	start_query($self, \@io, $lei, $srcs);
 	unless ($in_loop) {
 		my @pids = $self->wq_close;
 		# for the $lei->atfork_child_wq PIPE handler:
-		$op_map->{'!'} = [ \&CORE::kill, 'TERM', @pids ];
-		$opp->event_step;
+		$done_op->{'!'} = [ \&CORE::kill, 'TERM', @pids ];
+		$done->event_step;
 		my $ipc_worker_reap = $self->can('ipc_worker_reap');
 		if (my $l2m_pids = delete $self->{l2m_pids}) {
 			dwaitpid($_, $ipc_worker_reap, $l2m) for @$l2m_pids;
@@ -258,8 +265,9 @@ sub do_query {
 
 sub ipc_atfork_prepare {
 	my ($self) = @_;
-	# (qry_status_wr, stdout|mbox, stderr, 3: sock, 4: $l2m->{-wq_s1})
-	$self->wq_set_recv_modes(qw[+<&= >&= >&= +<&= +<&=]);
+	# (0: qry_status_wr, 1: stdout|mbox, 2: stderr,
+	#  3: sock, 4: $l2m->{-wq_s1}, 5: $startq)
+	$self->wq_set_recv_modes(qw[+<&= >&= >&= +<&= +<&= <&=]);
 	$self->SUPER::ipc_atfork_prepare; # PublicInbox::IPC
 }
 
diff --git a/lib/PublicInbox/Spawn.pm b/lib/PublicInbox/Spawn.pm
index b03f2d59..376d2190 100644
--- a/lib/PublicInbox/Spawn.pm
+++ b/lib/PublicInbox/Spawn.pm
@@ -209,7 +209,7 @@ my $fdpass = <<'FDPASS';
 #include <sys/socket.h>
 
 #if defined(CMSG_SPACE) && defined(CMSG_LEN)
-#define SEND_FD_CAPA 5
+#define SEND_FD_CAPA 6
 #define SEND_FD_SPACE (SEND_FD_CAPA * sizeof(int))
 union my_cmsg {
 	struct cmsghdr hdr;

  reply	other threads:[~2021-01-19  9:34 UTC|newest]

Thread overview: 19+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2021-01-19  9:34 [PATCH 0/9] lei bugfixes and error handling Eric Wong
2021-01-19  9:34 ` Eric Wong [this message]
2021-01-19  9:34 ` [PATCH 2/9] lei q: fix SIGPIPE handling from lei2mail workers Eric Wong
2021-01-19  9:34 ` [PATCH 3/9] lei q: do not spawn MUA early Eric Wong
2021-01-19  9:34 ` [PATCH 4/9] lei: write daemon errors to the sock directory Eric Wong
2021-01-19  9:34 ` [PATCH 5/9] lei q: fix augment of compressed mailboxes Eric Wong
2021-01-19  9:34 ` [PATCH 6/9] lei_overview: do not write if $lei->{1} is gone Eric Wong
2021-01-19  9:34 ` [PATCH 7/9] t/lei: fix double-running of socket test with oneshot Eric Wong
2021-01-19  9:34 ` [PATCH 8/9] lei: test some likely errors due to misuse Eric Wong
2021-01-19  9:34 ` [PATCH 9/9] lei_overview: start implementing format detection Eric Wong
2021-01-20  5:04 ` [PATCH 0/7] lei: fixes piled higher and deeper Eric Wong
2021-01-20  5:16   ` misnumbered, should be [PATCH 10/9]..[PATCH 16/9] :x Eric Wong
2021-01-20  5:04 ` [PATCH 1/7] lei: allow more mbox inode types Eric Wong
2021-01-20  5:04 ` [PATCH 2/7] lei: exit code in oneshot mode Eric Wong
2021-01-20  5:04 ` [PATCH 3/7] overidx: eidx_prep: fix leftover dbh reference Eric Wong
2021-01-20  5:04 ` [PATCH 4/7] lei q: cleanup store initialization Eric Wong
2021-01-20  5:04 ` [PATCH 5/7] lei: dump and clear errors.log in daemon mode Eric Wong
2021-01-20  5:04 ` [PATCH 6/7] lei_xsearch: keep l2m->{-wq_s1} while preparing query Eric Wong
2021-01-20  5:04 ` [PATCH 7/7] lei_to_mail: call PublicInbox::IPC::DESTROY Eric Wong

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

  List information: https://public-inbox.org/README

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20210119093435.17955-2-e@80x24.org \
    --to=e@80x24.org \
    --cc=meta@public-inbox.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
Code repositories for project(s) associated with this public inbox

	https://80x24.org/public-inbox.git

This is a public inbox; see the mirroring instructions
for how to clone and mirror all data and code used for this inbox,
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).