user/dev discussion of public-inbox itself
 help / color / mirror / code / Atom feed
From: Eric Wong <e@80x24.org>
To: meta@public-inbox.org
Subject: [PATCH 14/22] lei: query: ensure pager exit is instantaneous
Date: Sun, 10 Jan 2021 12:15:11 +0000	[thread overview]
Message-ID: <20210110121519.17044-15-e@80x24.org> (raw)
In-Reply-To: <20210110121519.17044-1-e@80x24.org>

Improve interactivity and user experience by allowing the user
to return to the terminal immediately when the pager is exited
(e.g. hitting the `q' key in less(1)).

This is a massive change which restructures query handling to
allow parallel searches when --thread expansion is in use, and to
offload the search to a single separate worker when --thread is
not in use.

The Xapian query offload changes allow us to reenter the event
loop right away once the search(es) are shipped off to the work
queue workers.

This means the main lei-daemon process can forget the lei(1)
client socket immediately once it's handed off to worker
processes.

We now unblock SIGPIPE in query workers and send an exit(141)
response to the lei(1) client socket to denote SIGPIPE (141 is
128 + 13, the conventional shell exit status for a process killed
by SIGPIPE).

This also allows parallelism for users running "lei q" from
multiple terminals.

JSON output is currently broken (query workers temporarily dump
results with Data::Dumper) and will need to be restructured for
more flexibility and fork-safety.
---
 lib/PublicInbox/IPC.pm        |  14 +++--
 lib/PublicInbox/LEI.pm        |  34 +++++++++++-
 lib/PublicInbox/LeiQuery.pm   | 102 +++++++++-------------------------
 lib/PublicInbox/LeiXSearch.pm |  80 +++++++++++++++++++++++++-
 4 files changed, 147 insertions(+), 83 deletions(-)

diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 8a3120c9..be5b2f45 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -234,6 +234,9 @@ sub wq_worker_loop ($) {
 	my $len = $self->{wq_req_len} // (4096 * 33);
 	my ($rec, $sub, @args);
 	my $s2 = $self->{-wq_s2} // die 'BUG: no -wq_s2';
+	local $SIG{PIPE} = sub {
+		die(bless(\"$_[0]", __PACKAGE__.'::PIPE')) if $sub;
+	};
 	until ($self->{-wq_quit}) {
 		my (@fds) = $recv_cmd->($s2, $buf, $len) or return; # EOF
 		my $i = 0;
@@ -242,6 +245,7 @@ sub wq_worker_loop ($) {
 			my $mode = shift(@m);
 			if (open(my $fh, $mode, $fd)) {
 				$self->{$i++} = $fh;
+				$fh->autoflush(1);
 			} else {
 				die "$$ open($mode$fd) (FD:$i): $!";
 			}
@@ -251,8 +255,10 @@ sub wq_worker_loop ($) {
 			die "thaw error on buffer of size:".length($buf);
 		($sub, @args) = @$rec;
 		eval { $self->$sub(@args) };
-		warn "$$ wq_worker: $@" if $@;
-		delete @$self{0, 1, 2};
+		warn "$$ wq_worker: $@" if $@ && ref $@ ne __PACKAGE__.'::PIPE';
+		undef $sub; # quiet SIG{PIPE} handler
+		# need to close explicitly to avoid warnings after SIGPIPE
+		close($_) for (delete(@$self{0..2}));
 	}
 }
 
@@ -284,8 +290,8 @@ sub _wq_worker_start ($$) {
 		PublicInbox::DS::sig_setmask($oldset);
 		my $on_destroy = $self->ipc_atfork_child;
 		eval { wq_worker_loop($self) };
-		die "worker $self->{-wq_ident} PID:$$ died: $@\n" if $@;
-		exit;
+		warn "worker $self->{-wq_ident} PID:$$ died: $@" if $@;
+		exit($@ ? 1 : 0);
 	} else {
 		$self->{-wq_workers}->{$pid} = \undef;
 	}
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 17023191..f8b8cd4a 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -269,6 +269,33 @@ sub fail ($$;$) {
 	undef;
 }
 
+# usage: local %SIG = (%SIG, $lei->atfork_child_wq($wq));
+sub atfork_child_wq {
+	my ($self, $wq) = @_;
+	$self->{sock} //= $wq->{0};
+	$self->{$_} //= $wq->{$_} for (0..2);
+	my $oldpipe = $SIG{PIPE};
+	(
+		__WARN__ => sub { err($self, @_) },
+		PIPE => sub {
+			$self->x_it(141);
+			$oldpipe->() if ref($oldpipe) eq 'CODE';
+		}
+	);
+}
+
+# usage: ($lei, @io) = $lei->atfork_prepare_wq($wq);
+sub atfork_prepare_wq {
+	my ($self, $wq) = @_;
+	if ($wq->wq_workers) {
+		my $ret = bless { %$self }, ref($self);
+		my $in = delete $ret->{0};
+		($ret, delete($ret->{sock}) // $in, delete @$ret{1, 2});
+	} else {
+		($self, ($self->{sock} // $self->{0}), @$self{1, 2});
+	}
+}
+
 sub _help ($;$) {
 	my ($self, $errmsg) = @_;
 	my $cmd = $self->{cmd} // 'COMMAND';
@@ -608,8 +635,8 @@ sub start_pager {
 	$self->{1} = $wpager;
 	$self->{2} = $wpager if -t $self->{2};
 	my $pid = spawn([$pager], $env, $rdr);
-	dwaitpid($pid, undef, $self->{sock});
 	$env->{GIT_PAGER_IN_USE} = 'true'; # we may spawn git
+	[ $pid, @$rdr{1, 2} ];
 }
 
 sub accept_dispatch { # Listener {post_accept} callback
@@ -675,6 +702,8 @@ sub event_step {
 
 sub noop {}
 
+our $oldset; sub oldset { $oldset }
+
 # lei(1) calls this when it can't connect
 sub lazy_start {
 	my ($path, $errno, $nfd) = @_;
@@ -691,7 +720,7 @@ sub lazy_start {
 	my @st = stat($path) or die "stat($path): $!";
 	my $dev_ino_expect = pack('dd', $st[0], $st[1]); # dev+ino
 	pipe(my ($eof_r, $eof_w)) or die "pipe: $!";
-	my $oldset = PublicInbox::DS::block_signals();
+	local $oldset = PublicInbox::DS::block_signals();
 	if ($nfd == 1) {
 		require PublicInbox::CmdIPC1;
 		$recv_cmd = PublicInbox::CmdIPC1->can('recv_cmd1');
@@ -737,6 +766,7 @@ sub lazy_start {
 	};
 	my $sigfd = PublicInbox::Sigfd->new($sig, SFD_NONBLOCK);
 	local %SIG = (%SIG, %$sig) if !$sigfd;
+	local $SIG{PIPE} = 'IGNORE';
 	if ($sigfd) { # TODO: use inotify/kqueue to detect unlinked sockets
 		PublicInbox::DS->SetLoopTimeout(5000);
 	} else {
diff --git a/lib/PublicInbox/LeiQuery.pm b/lib/PublicInbox/LeiQuery.pm
index f69dccad..040c284d 100644
--- a/lib/PublicInbox/LeiQuery.pm
+++ b/lib/PublicInbox/LeiQuery.pm
@@ -8,7 +8,7 @@ use v5.10.1;
 use PublicInbox::MID qw($MID_EXTRACT);
 use POSIX qw(strftime);
 use PublicInbox::Address qw(pairs);
-use PublicInbox::Search qw(get_pct);
+use PublicInbox::DS qw(dwaitpid);
 
 sub _iso8601 ($) { strftime('%Y-%m-%dT%H:%M:%SZ', gmtime($_[0])) }
 
@@ -61,37 +61,30 @@ sub lei_q {
 	my $sto = $self->_lei_store(1);
 	my $cfg = $self->_lei_cfg(1);
 	my $opt = $self->{opt};
-	my $qstr = join(' ', map {;
-		# Consider spaces in argv to be for phrase search in Xapian.
-		# In other words, the users should need only care about
-		# normal shell quotes and not have to learn Xapian quoting.
-		/\s/ ? (s/\A(\w+:)// ? qq{$1"$_"} : qq{"$_"}) : $_
-	} @argv);
-	$opt->{limit} //= 10000;
-	my $lxs;
 	require PublicInbox::LeiDedupe;
 	my $dd = PublicInbox::LeiDedupe->new($self);
 
 	# --local is enabled by default
-	my @src = $opt->{'local'} ? ($sto->search) : ();
+	# src: LeiXSearch || LeiSearch || Inbox
+	my @srcs = $opt->{'local'} ? ($sto->search) : ();
+	require PublicInbox::LeiXSearch;
+	my $lxs = PublicInbox::LeiXSearch->new;
 
 	# --external is enabled by default, but allow --no-external
 	if ($opt->{external} // 1) {
-		$self->_externals_each(\&_vivify_external, \@src);
-		# {tid} is not unique between indices, so we have to search
-		# each src individually
-		if (!$opt->{thread}) {
-			require PublicInbox::LeiXSearch;
-			my $lxs = PublicInbox::LeiXSearch->new;
-			# local is always first
-			$lxs->attach_external($_) for @src;
-			@src = ($lxs);
-		}
+		$self->_externals_each(\&_vivify_external, \@srcs);
 	}
-	my $out = $self->{output} // '-';
+	my $j = $opt->{jobs} // scalar(@srcs) > 4 ? 4 : scalar(@srcs);
+	$j = 1 if !$opt->{thread};
+	if ($self->{pid}) {
+		$lxs->wq_workers_start('lei_xsearch', $j, $self->oldset)
+			// $self->wq_workers($j);
+	}
+	my $out = $opt->{output} // '-';
 	$out = 'json:/dev/stdout' if $out eq '-';
 	my $isatty = -t $self->{1};
-	$self->start_pager if $isatty;
+	# no forking workers after this
+	my $pid_old12 = $self->start_pager if $isatty;
 	my $json = substr($out, 0, 5) eq 'json:' ?
 		ref(PublicInbox::Config->json)->new : undef;
 	if ($json) {
@@ -104,10 +97,14 @@ sub lei_q {
 		$json->canonical;
 	}
 
-	# src: LeiXSearch || LeiSearch || Inbox
 	my %mset_opt = map { $_ => $opt->{$_} } qw(thread limit offset);
-	delete $mset_opt{limit} if $opt->{limit} < 0;
 	$mset_opt{asc} = $opt->{'reverse'} ? 1 : 0;
+	$mset_opt{qstr} = join(' ', map {;
+		# Consider spaces in argv to be for phrase search in Xapian.
+		# In other words, the users should need only care about
+		# normal shell quotes and not have to learn Xapian quoting.
+		/\s/ ? (s/\A(\w+:)// ? qq{$1"$_"} : qq{"$_"}) : $_
+	} @argv);
 	if (defined(my $sort = $opt->{'sort'})) {
 		if ($sort eq 'relevance') {
 			$mset_opt{relevance} = 1;
@@ -123,59 +120,12 @@ sub lei_q {
 	# descending docid order
 	$mset_opt{relevance} //= -2 if $opt->{thread};
 	# my $wcb = PublicInbox::LeiToMail->write_cb($out, $self);
-
-	# even w/o pretty, do the equivalent of a --pretty=oneline
-	# output so "lei q SEARCH_TERMS | wc -l" can be useful:
-	my $ORS = $json ? ($opt->{pretty} ? ', ' : ",\n") : "\n";
-	my $buf;
-
-	# we can generate too many records to hold in RAM, so we stream
-	# and fake a JSON array starting here:
-	$self->out('[') if $json;
-	my $emit_cb = sub {
-		my ($smsg) = @_;
-		delete @$smsg{qw(tid num)}; # only makes sense if single src
-		chomp($buf = $json->encode(_smsg_unbless($smsg)));
-	};
-	$dd->prepare_dedupe;
-	for my $src (@src) {
-		my $srch = $src->search;
-		my $over = $src->over;
-		my $smsg_for = $src->can('smsg_for'); # LeiXSearch
-		my $mo = { %mset_opt };
-		my $mset = $srch->mset($qstr, $mo);
-		my $ctx = {};
-		if ($smsg_for) {
-			for my $it ($mset->items) {
-				my $smsg = $smsg_for->($srch, $it) or next;
-				next if $dd->is_smsg_dup($smsg);
-				$self->out($buf .= $ORS) if defined $buf;
-				$smsg->{relevance} = get_pct($it);
-				$emit_cb->($smsg);
-			}
-		} else { # --thread
-			my $ids = $srch->mset_to_artnums($mset, $mo);
-			$ctx->{ids} = $ids;
-			my $i = 0;
-			my %n2p = map {
-				($ids->[$i++], get_pct($_));
-			} $mset->items;
-			undef $mset;
-			while ($over && $over->expand_thread($ctx)) {
-				for my $n (@{$ctx->{xids}}) {
-					my $t = $over->get_art($n) or next;
-					next if $dd->is_smsg_dup($t);
-					if (my $p = delete $n2p{$t->{num}}) {
-						$t->{relevance} = $p;
-					}
-					$self->out($buf .= $ORS);
-					$emit_cb->($t);
-				}
-				@{$ctx->{xids}} = ();
-			}
-		}
+	$self->{mset_opt} = \%mset_opt;
+	$lxs->do_query($self, \@srcs);
+	if ($pid_old12) {
+		$self->{$_} = $pid_old12->[$_] for (1, 2);
+		dwaitpid($pid_old12->[0], undef, $self->{sock});
 	}
-	$self->out($buf .= "]\n"); # done
 }
 
 1;
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index b670bc2f..a3010efe 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -7,7 +7,8 @@
 package PublicInbox::LeiXSearch;
 use strict;
 use v5.10.1;
-use parent qw(PublicInbox::LeiSearch);
+use parent qw(PublicInbox::LeiSearch PublicInbox::IPC);
+use PublicInbox::Search qw(get_pct);
 
 sub new {
 	my ($class) = @_;
@@ -83,4 +84,81 @@ sub recent {
 
 sub over {}
 
+sub _mset_more ($$) {
+	my ($mset, $mo) = @_;
+	my $size = $mset->size;
+	$size && (($mo->{offset} += $size) < ($mo->{limit} // 10000));
+}
+
+sub query_thread_mset { # for --thread
+	my ($self, $lei, $ibxish) = @_;
+	my ($srch, $over) = ($ibxish->search, $ibxish->over);
+	unless ($srch && $over) {
+		my $desc = $ibxish->{inboxdir} // $ibxish->{topdir};
+		warn "$desc not indexed by Xapian\n";
+		return;
+	}
+	local %SIG = (%SIG, $lei->atfork_child_wq($self));
+	my $mo = { %{$lei->{mset_opt}} };
+	my $mset;
+	do {
+		$mset = $srch->mset($mo->{qstr}, $mo);
+		my $ids = $srch->mset_to_artnums($mset, $mo);
+		my $ctx = { ids => $ids };
+		my $i = 0;
+		my %n2p = map { ($ids->[$i++], get_pct($_)) } $mset->items;
+		while ($over->expand_thread($ctx)) {
+			for my $n (@{$ctx->{xids}}) {
+				my $smsg = $over->get_art($n) or next;
+				# next if $dd->is_smsg_dup($smsg); TODO
+				if (my $p = delete $n2p{$smsg->{num}}) {
+					$smsg->{relevance} = $p;
+				}
+				print { $self->{1} } Dumper($smsg);
+				# $self->out($buf .= $ORS);
+				# $emit_cb->($smsg);
+			}
+			@{$ctx->{xids}} = ();
+		}
+	} while (_mset_more($mset, $mo));
+}
+
+sub query_mset { # non-parallel for non-"--thread" users
+	my ($self, $lei, $srcs) = @_;
+	my $mo = { %{$lei->{mset_opt}} };
+	my $mset;
+	local %SIG = (%SIG, $lei->atfork_child_wq($self));
+	$self->attach_external($_) for @$srcs;
+	do {
+		$mset = $self->mset($mo->{qstr}, $mo);
+		for my $it ($mset->items) {
+			my $smsg = smsg_for($self, $it) or next;
+			# next if $dd->is_smsg_dup($smsg);
+			$smsg->{relevance} = get_pct($it);
+			use Data::Dumper;
+			print { $self->{1} } Dumper($smsg);
+			# $self->out($buf .= $ORS) if defined $buf;
+			#$emit_cb->($smsg);
+		}
+	} while (_mset_more($mset, $mo));
+}
+
+sub do_query {
+	my ($self, $lei_orig, $srcs) = @_;
+	my ($lei, @io) = $lei_orig->atfork_prepare_wq($self);
+	$io[1]->autoflush(1);
+	$io[2]->autoflush(1);
+	if ($lei->{opt}->{thread}) {
+		for my $ibxish (@$srcs) {
+			$self->wq_do('query_thread_mset', @io, $lei, $ibxish);
+		}
+	} else {
+		$self->wq_do('query_mset', @io, $lei, $srcs);
+	}
+	# TODO
+	for my $rmt (@{$self->{remotes} // []}) {
+		$self->wq_do('query_thread_mbox', @io, $lei, $rmt);
+	}
+}
+
 1;

  parent reply	other threads:[~2021-01-10 12:15 UTC|newest]

Thread overview: 23+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2021-01-10 12:14 [PATCH 00/22] lei query overview views Eric Wong
2021-01-10 12:14 ` [PATCH 01/22] lei query + pagination sorta working Eric Wong
2021-01-10 12:14 ` [PATCH 02/22] lei q: deduplicate smsg Eric Wong
2021-01-10 12:15 ` [PATCH 03/22] ds: block signals when reaping Eric Wong
2021-01-10 12:15 ` [PATCH 04/22] ipc: add support for asynchronous callbacks Eric Wong
2021-01-10 12:15 ` [PATCH 05/22] cmd_ipc: send FDs with buffer payload Eric Wong
2021-01-10 12:15 ` [PATCH 06/22] ipc: avoid excessive evals Eric Wong
2021-01-10 12:15 ` [PATCH 07/22] ipc: work queue support via SOCK_SEQPACKET Eric Wong
2021-01-10 12:15 ` [PATCH 08/22] ipc: eliminate ipc_worker_stop method Eric Wong
2021-01-10 12:15 ` [PATCH 09/22] ipc: wq: support dynamic worker count change Eric Wong
2021-01-10 12:15 ` [PATCH 10/22] ipc: drop -ipc_parent_pid field Eric Wong
2021-01-10 12:15 ` [PATCH 11/22] ipc: DESTROY and wq_workers methods Eric Wong
2021-01-10 12:15 ` [PATCH 12/22] lei: rename $w to $wpager for warning message Eric Wong
2021-01-10 12:15 ` [PATCH 13/22] lei: fix oneshot TTY detection by passing STD*{GLOB} Eric Wong
2021-01-10 12:15 ` Eric Wong [this message]
2021-01-10 12:15 ` [PATCH 15/22] ipc: start supporting sending/receiving more than 3 FDs Eric Wong
2021-01-10 12:15 ` [PATCH 16/22] ipc: fix IO::FDPass use with a worker limit of 1 Eric Wong
2021-01-10 12:15 ` [PATCH 17/22] ipc: drop unused fields, default sighandlers for wq Eric Wong
2021-01-10 12:15 ` [PATCH 18/22] lei: get rid of client {pid} field Eric Wong
2021-01-10 12:15 ` [PATCH 19/22] lei: fork + FD cleanup Eric Wong
2021-01-10 12:15 ` [PATCH 20/22] lei: run pager in client script Eric Wong
2021-01-10 12:15 ` [PATCH 21/22] lei_xsearch: transfer 4 FDs internally, drop IO::FDPass Eric Wong
2021-01-10 12:15 ` [PATCH 22/22] lei: query: restore JSON output overview Eric Wong

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

  List information: https://public-inbox.org/README

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20210110121519.17044-15-e@80x24.org \
    --to=e@80x24.org \
    --cc=meta@public-inbox.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
Code repositories for project(s) associated with this public inbox

	https://80x24.org/public-inbox.git

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).