From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-Status: No, score=-3.9 required=3.0 tests=ALL_TRUSTED,AWL,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 929881FF9E for ; Sun, 10 Jan 2021 12:15:20 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 14/22] lei: query: ensure pager exit is instantaneous Date: Sun, 10 Jan 2021 12:15:11 +0000 Message-Id: <20210110121519.17044-15-e@80x24.org> In-Reply-To: <20210110121519.17044-1-e@80x24.org> References: <20210110121519.17044-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: Improve interactivity and user experience by allowing the user to return to the terminal immediately when the pager is exited (e.g. hitting the `q' key in less(1)). This is a massive change which restructures query handling to allow parallel searches when --thread expansion is in use, and to offload the search to a separate worker when --thread is not in use. The Xapian query offload changes allow us to reenter the event loop right away once the search(es) are shipped off to the work queue workers. This means the main lei-daemon process can forget the lei(1) client socket immediately once it's handed off to worker processes. We now unblock SIGPIPE in query workers and send an exit(141) response (128 + SIGPIPE, the conventional shell exit status for a process killed by SIGPIPE) to the lei(1) client socket to denote SIGPIPE. This also allows parallelization for users running "lei q" from multiple terminals. JSON output is currently broken and will need to be restructured for more flexibility and fork-safety. 
--- lib/PublicInbox/IPC.pm | 14 +++-- lib/PublicInbox/LEI.pm | 34 +++++++++++- lib/PublicInbox/LeiQuery.pm | 102 +++++++++------------------------- lib/PublicInbox/LeiXSearch.pm | 80 +++++++++++++++++++++++++- 4 files changed, 147 insertions(+), 83 deletions(-) diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm index 8a3120c9..be5b2f45 100644 --- a/lib/PublicInbox/IPC.pm +++ b/lib/PublicInbox/IPC.pm @@ -234,6 +234,9 @@ sub wq_worker_loop ($) { my $len = $self->{wq_req_len} // (4096 * 33); my ($rec, $sub, @args); my $s2 = $self->{-wq_s2} // die 'BUG: no -wq_s2'; + local $SIG{PIPE} = sub { + die(bless(\"$_[0]", __PACKAGE__.'::PIPE')) if $sub; + }; until ($self->{-wq_quit}) { my (@fds) = $recv_cmd->($s2, $buf, $len) or return; # EOF my $i = 0; @@ -242,6 +245,7 @@ sub wq_worker_loop ($) { my $mode = shift(@m); if (open(my $fh, $mode, $fd)) { $self->{$i++} = $fh; + $fh->autoflush(1); } else { die "$$ open($mode$fd) (FD:$i): $!"; } @@ -251,8 +255,10 @@ sub wq_worker_loop ($) { die "thaw error on buffer of size:".length($buf); ($sub, @args) = @$rec; eval { $self->$sub(@args) }; - warn "$$ wq_worker: $@" if $@; - delete @$self{0, 1, 2}; + warn "$$ wq_worker: $@" if $@ && ref $@ ne __PACKAGE__.'::PIPE'; + undef $sub; # quiet SIG{PIPE} handler + # need to close explicitly to avoid warnings after SIGPIPE + close($_) for (delete(@$self{0..2})); } } @@ -284,8 +290,8 @@ sub _wq_worker_start ($$) { PublicInbox::DS::sig_setmask($oldset); my $on_destroy = $self->ipc_atfork_child; eval { wq_worker_loop($self) }; - die "worker $self->{-wq_ident} PID:$$ died: $@\n" if $@; - exit; + warn "worker $self->{-wq_ident} PID:$$ died: $@" if $@; + exit($@ ? 
1 : 0); } else { $self->{-wq_workers}->{$pid} = \undef; } diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index 17023191..f8b8cd4a 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -269,6 +269,33 @@ sub fail ($$;$) { undef; } +# usage: local %SIG = (%SIG, $lei->atfork_child_wq($wq)); +sub atfork_child_wq { + my ($self, $wq) = @_; + $self->{sock} //= $wq->{0}; + $self->{$_} //= $wq->{$_} for (0..2); + my $oldpipe = $SIG{PIPE}; + ( + __WARN__ => sub { err($self, @_) }, + PIPE => sub { + $self->x_it(141); + $oldpipe->() if ref($oldpipe) eq 'CODE'; + } + ); +} + +# usage: ($lei, @io) = $lei->atfork_prepare_wq($wq); +sub atfork_prepare_wq { + my ($self, $wq) = @_; + if ($wq->wq_workers) { + my $ret = bless { %$self }, ref($self); + my $in = delete $ret->{0}; + ($ret, delete($ret->{sock}) // $in, delete @$ret{1, 2}); + } else { + ($self, ($self->{sock} // $self->{0}), @$self{1, 2}); + } +} + sub _help ($;$) { my ($self, $errmsg) = @_; my $cmd = $self->{cmd} // 'COMMAND'; @@ -608,8 +635,8 @@ sub start_pager { $self->{1} = $wpager; $self->{2} = $wpager if -t $self->{2}; my $pid = spawn([$pager], $env, $rdr); - dwaitpid($pid, undef, $self->{sock}); $env->{GIT_PAGER_IN_USE} = 'true'; # we may spawn git + [ $pid, @$rdr{1, 2} ]; } sub accept_dispatch { # Listener {post_accept} callback @@ -675,6 +702,8 @@ sub event_step { sub noop {} +our $oldset; sub oldset { $oldset } + # lei(1) calls this when it can't connect sub lazy_start { my ($path, $errno, $nfd) = @_; @@ -691,7 +720,7 @@ sub lazy_start { my @st = stat($path) or die "stat($path): $!"; my $dev_ino_expect = pack('dd', $st[0], $st[1]); # dev+ino pipe(my ($eof_r, $eof_w)) or die "pipe: $!"; - my $oldset = PublicInbox::DS::block_signals(); + local $oldset = PublicInbox::DS::block_signals(); if ($nfd == 1) { require PublicInbox::CmdIPC1; $recv_cmd = PublicInbox::CmdIPC1->can('recv_cmd1'); @@ -737,6 +766,7 @@ sub lazy_start { }; my $sigfd = PublicInbox::Sigfd->new($sig, SFD_NONBLOCK); local %SIG 
= (%SIG, %$sig) if !$sigfd; + local $SIG{PIPE} = 'IGNORE'; if ($sigfd) { # TODO: use inotify/kqueue to detect unlinked sockets PublicInbox::DS->SetLoopTimeout(5000); } else { diff --git a/lib/PublicInbox/LeiQuery.pm b/lib/PublicInbox/LeiQuery.pm index f69dccad..040c284d 100644 --- a/lib/PublicInbox/LeiQuery.pm +++ b/lib/PublicInbox/LeiQuery.pm @@ -8,7 +8,7 @@ use v5.10.1; use PublicInbox::MID qw($MID_EXTRACT); use POSIX qw(strftime); use PublicInbox::Address qw(pairs); -use PublicInbox::Search qw(get_pct); +use PublicInbox::DS qw(dwaitpid); sub _iso8601 ($) { strftime('%Y-%m-%dT%H:%M:%SZ', gmtime($_[0])) } @@ -61,37 +61,30 @@ sub lei_q { my $sto = $self->_lei_store(1); my $cfg = $self->_lei_cfg(1); my $opt = $self->{opt}; - my $qstr = join(' ', map {; - # Consider spaces in argv to be for phrase search in Xapian. - # In other words, the users should need only care about - # normal shell quotes and not have to learn Xapian quoting. - /\s/ ? (s/\A(\w+:)// ? qq{$1"$_"} : qq{"$_"}) : $_ - } @argv); - $opt->{limit} //= 10000; - my $lxs; require PublicInbox::LeiDedupe; my $dd = PublicInbox::LeiDedupe->new($self); # --local is enabled by default - my @src = $opt->{'local'} ? ($sto->search) : (); + # src: LeiXSearch || LeiSearch || Inbox + my @srcs = $opt->{'local'} ? ($sto->search) : (); + require PublicInbox::LeiXSearch; + my $lxs = PublicInbox::LeiXSearch->new; # --external is enabled by default, but allow --no-external if ($opt->{external} // 1) { - $self->_externals_each(\&_vivify_external, \@src); - # {tid} is not unique between indices, so we have to search - # each src individually - if (!$opt->{thread}) { - require PublicInbox::LeiXSearch; - my $lxs = PublicInbox::LeiXSearch->new; - # local is always first - $lxs->attach_external($_) for @src; - @src = ($lxs); - } + $self->_externals_each(\&_vivify_external, \@srcs); } - my $out = $self->{output} // '-'; + my $j = $opt->{jobs} // scalar(@srcs) > 4 ? 
4 : scalar(@srcs); + $j = 1 if !$opt->{thread}; + if ($self->{pid}) { + $lxs->wq_workers_start('lei_xsearch', $j, $self->oldset) + // $self->wq_workers($j); + } + my $out = $opt->{output} // '-'; $out = 'json:/dev/stdout' if $out eq '-'; my $isatty = -t $self->{1}; - $self->start_pager if $isatty; + # no forking workers after this + my $pid_old12 = $self->start_pager if $isatty; my $json = substr($out, 0, 5) eq 'json:' ? ref(PublicInbox::Config->json)->new : undef; if ($json) { @@ -104,10 +97,14 @@ sub lei_q { $json->canonical; } - # src: LeiXSearch || LeiSearch || Inbox my %mset_opt = map { $_ => $opt->{$_} } qw(thread limit offset); - delete $mset_opt{limit} if $opt->{limit} < 0; $mset_opt{asc} = $opt->{'reverse'} ? 1 : 0; + $mset_opt{qstr} = join(' ', map {; + # Consider spaces in argv to be for phrase search in Xapian. + # In other words, the users should need only care about + # normal shell quotes and not have to learn Xapian quoting. + /\s/ ? (s/\A(\w+:)// ? qq{$1"$_"} : qq{"$_"}) : $_ + } @argv); if (defined(my $sort = $opt->{'sort'})) { if ($sort eq 'relevance') { $mset_opt{relevance} = 1; @@ -123,59 +120,12 @@ sub lei_q { # descending docid order $mset_opt{relevance} //= -2 if $opt->{thread}; # my $wcb = PublicInbox::LeiToMail->write_cb($out, $self); - - # even w/o pretty, do the equivalent of a --pretty=oneline - # output so "lei q SEARCH_TERMS | wc -l" can be useful: - my $ORS = $json ? ($opt->{pretty} ? 
', ' : ",\n") : "\n"; - my $buf; - - # we can generate too many records to hold in RAM, so we stream - # and fake a JSON array starting here: - $self->out('[') if $json; - my $emit_cb = sub { - my ($smsg) = @_; - delete @$smsg{qw(tid num)}; # only makes sense if single src - chomp($buf = $json->encode(_smsg_unbless($smsg))); - }; - $dd->prepare_dedupe; - for my $src (@src) { - my $srch = $src->search; - my $over = $src->over; - my $smsg_for = $src->can('smsg_for'); # LeiXSearch - my $mo = { %mset_opt }; - my $mset = $srch->mset($qstr, $mo); - my $ctx = {}; - if ($smsg_for) { - for my $it ($mset->items) { - my $smsg = $smsg_for->($srch, $it) or next; - next if $dd->is_smsg_dup($smsg); - $self->out($buf .= $ORS) if defined $buf; - $smsg->{relevance} = get_pct($it); - $emit_cb->($smsg); - } - } else { # --thread - my $ids = $srch->mset_to_artnums($mset, $mo); - $ctx->{ids} = $ids; - my $i = 0; - my %n2p = map { - ($ids->[$i++], get_pct($_)); - } $mset->items; - undef $mset; - while ($over && $over->expand_thread($ctx)) { - for my $n (@{$ctx->{xids}}) { - my $t = $over->get_art($n) or next; - next if $dd->is_smsg_dup($t); - if (my $p = delete $n2p{$t->{num}}) { - $t->{relevance} = $p; - } - $self->out($buf .= $ORS); - $emit_cb->($t); - } - @{$ctx->{xids}} = (); - } - } + $self->{mset_opt} = \%mset_opt; + $lxs->do_query($self, \@srcs); + if ($pid_old12) { + $self->{$_} = $pid_old12->[$_] for (1, 2); + dwaitpid($pid_old12->[0], undef, $self->{sock}); } - $self->out($buf .= "]\n"); # done } 1; diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index b670bc2f..a3010efe 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -7,7 +7,8 @@ package PublicInbox::LeiXSearch; use strict; use v5.10.1; -use parent qw(PublicInbox::LeiSearch); +use parent qw(PublicInbox::LeiSearch PublicInbox::IPC); +use PublicInbox::Search qw(get_pct); sub new { my ($class) = @_; @@ -83,4 +84,81 @@ sub recent { sub over {} +sub _mset_more ($$) { + my 
($mset, $mo) = @_; + my $size = $mset->size; + $size && (($mo->{offset} += $size) < ($mo->{limit} // 10000)); +} + +sub query_thread_mset { # for --thread + my ($self, $lei, $ibxish) = @_; + my ($srch, $over) = ($ibxish->search, $ibxish->over); + unless ($srch && $over) { + my $desc = $ibxish->{inboxdir} // $ibxish->{topdir}; + warn "$desc not indexed by Xapian\n"; + return; + } + local %SIG = (%SIG, $lei->atfork_child_wq($self)); + my $mo = { %{$lei->{mset_opt}} }; + my $mset; + do { + $mset = $srch->mset($mo->{qstr}, $mo); + my $ids = $srch->mset_to_artnums($mset, $mo); + my $ctx = { ids => $ids }; + my $i = 0; + my %n2p = map { ($ids->[$i++], get_pct($_)) } $mset->items; + while ($over->expand_thread($ctx)) { + for my $n (@{$ctx->{xids}}) { + my $smsg = $over->get_art($n) or next; + # next if $dd->is_smsg_dup($smsg); TODO + if (my $p = delete $n2p{$smsg->{num}}) { + $smsg->{relevance} = $p; + } + print { $self->{1} } Dumper($smsg); + # $self->out($buf .= $ORS); + # $emit_cb->($smsg); + } + @{$ctx->{xids}} = (); + } + } while (_mset_more($mset, $mo)); +} + +sub query_mset { # non-parallel for non-"--thread" users + my ($self, $lei, $srcs) = @_; + my $mo = { %{$lei->{mset_opt}} }; + my $mset; + local %SIG = (%SIG, $lei->atfork_child_wq($self)); + $self->attach_external($_) for @$srcs; + do { + $mset = $self->mset($mo->{qstr}, $mo); + for my $it ($mset->items) { + my $smsg = smsg_for($self, $it) or next; + # next if $dd->is_smsg_dup($smsg); + $smsg->{relevance} = get_pct($it); + use Data::Dumper; + print { $self->{1} } Dumper($smsg); + # $self->out($buf .= $ORS) if defined $buf; + #$emit_cb->($smsg); + } + } while (_mset_more($mset, $mo)); +} + +sub do_query { + my ($self, $lei_orig, $srcs) = @_; + my ($lei, @io) = $lei_orig->atfork_prepare_wq($self); + $io[1]->autoflush(1); + $io[2]->autoflush(1); + if ($lei->{opt}->{thread}) { + for my $ibxish (@$srcs) { + $self->wq_do('query_thread_mset', @io, $lei, $ibxish); + } + } else { + $self->wq_do('query_mset', @io, 
$lei, $srcs); + } + # TODO + for my $rmt (@{$self->{remotes} // []}) { + $self->wq_do('query_thread_mbox', @io, $lei, $rmt); + } +} + 1;