about summary refs log tree commit homepage
path: root/lib/PublicInbox/LeiQuery.pm
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2021-01-10 12:15:11 +0000
committerEric Wong <e@80x24.org>2021-01-12 03:51:42 +0000
commit0c89ebd477d1c7a695a0a0b3023c0d41abe573fa (patch)
tree756c531c331dd7c048465448e61da51d61320696 /lib/PublicInbox/LeiQuery.pm
parent372ff2ba6467e8fcea3eb19e5527a5fc398802f9 (diff)
downloadpublic-inbox-0c89ebd477d1c7a695a0a0b3023c0d41abe573fa.tar.gz
Improve interactivity and user experience by allowing the user
to return to the terminal immediately when the pager is exited
(e.g. hitting the `q' key in less(1)).

This is a massive change which restructures query handling to
allow parallel search when --thread expansion is in use and
offloading to a separate worker when --thread is not in use.

The Xapian query offload changes allow us to reenter the event
loop right away once the search(es) are shipped off to the work
queue workers.

This means the main lei-daemon process can forget the lei(1)
client socket immediately once it's handed off to worker
processes.

We now unblock SIGPIPE in query workers and send an exit(141)
response to the lei(1) client socket to denote SIGPIPE.

This also allows parallelization for users using "lei q" from
multiple terminals.

JSON output is currently broken and will need to be restructured
for more flexibility and fork-safety.
Diffstat (limited to 'lib/PublicInbox/LeiQuery.pm')
-rw-r--r--lib/PublicInbox/LeiQuery.pm102
1 files changed, 26 insertions, 76 deletions
diff --git a/lib/PublicInbox/LeiQuery.pm b/lib/PublicInbox/LeiQuery.pm
index f69dccad..040c284d 100644
--- a/lib/PublicInbox/LeiQuery.pm
+++ b/lib/PublicInbox/LeiQuery.pm
@@ -8,7 +8,7 @@ use v5.10.1;
 use PublicInbox::MID qw($MID_EXTRACT);
 use POSIX qw(strftime);
 use PublicInbox::Address qw(pairs);
-use PublicInbox::Search qw(get_pct);
+use PublicInbox::DS qw(dwaitpid);
 
 sub _iso8601 ($) { strftime('%Y-%m-%dT%H:%M:%SZ', gmtime($_[0])) }
 
@@ -61,37 +61,30 @@ sub lei_q {
         my $sto = $self->_lei_store(1);
         my $cfg = $self->_lei_cfg(1);
         my $opt = $self->{opt};
-        my $qstr = join(' ', map {;
-                # Consider spaces in argv to be for phrase search in Xapian.
-                # In other words, the users should need only care about
-                # normal shell quotes and not have to learn Xapian quoting.
-                /\s/ ? (s/\A(\w+:)// ? qq{$1"$_"} : qq{"$_"}) : $_
-        } @argv);
-        $opt->{limit} //= 10000;
-        my $lxs;
         require PublicInbox::LeiDedupe;
         my $dd = PublicInbox::LeiDedupe->new($self);
 
         # --local is enabled by default
-        my @src = $opt->{'local'} ? ($sto->search) : ();
+        # src: LeiXSearch || LeiSearch || Inbox
+        my @srcs = $opt->{'local'} ? ($sto->search) : ();
+        require PublicInbox::LeiXSearch;
+        my $lxs = PublicInbox::LeiXSearch->new;
 
         # --external is enabled by default, but allow --no-external
         if ($opt->{external} // 1) {
-                $self->_externals_each(\&_vivify_external, \@src);
-                # {tid} is not unique between indices, so we have to search
-                # each src individually
-                if (!$opt->{thread}) {
-                        require PublicInbox::LeiXSearch;
-                        my $lxs = PublicInbox::LeiXSearch->new;
-                        # local is always first
-                        $lxs->attach_external($_) for @src;
-                        @src = ($lxs);
-                }
+                $self->_externals_each(\&_vivify_external, \@srcs);
         }
-        my $out = $self->{output} // '-';
+        my $j = $opt->{jobs} // scalar(@srcs) > 4 ? 4 : scalar(@srcs);
+        $j = 1 if !$opt->{thread};
+        if ($self->{pid}) {
+                $lxs->wq_workers_start('lei_xsearch', $j, $self->oldset)
+                        // $self->wq_workers($j);
+        }
+        my $out = $opt->{output} // '-';
         $out = 'json:/dev/stdout' if $out eq '-';
         my $isatty = -t $self->{1};
-        $self->start_pager if $isatty;
+        # no forking workers after this
+        my $pid_old12 = $self->start_pager if $isatty;
         my $json = substr($out, 0, 5) eq 'json:' ?
                 ref(PublicInbox::Config->json)->new : undef;
         if ($json) {
@@ -104,10 +97,14 @@ sub lei_q {
                 $json->canonical;
         }
 
-        # src: LeiXSearch || LeiSearch || Inbox
         my %mset_opt = map { $_ => $opt->{$_} } qw(thread limit offset);
-        delete $mset_opt{limit} if $opt->{limit} < 0;
         $mset_opt{asc} = $opt->{'reverse'} ? 1 : 0;
+        $mset_opt{qstr} = join(' ', map {;
+                # Consider spaces in argv to be for phrase search in Xapian.
+                # In other words, the users should need only care about
+                # normal shell quotes and not have to learn Xapian quoting.
+                /\s/ ? (s/\A(\w+:)// ? qq{$1"$_"} : qq{"$_"}) : $_
+        } @argv);
         if (defined(my $sort = $opt->{'sort'})) {
                 if ($sort eq 'relevance') {
                         $mset_opt{relevance} = 1;
@@ -123,59 +120,12 @@ sub lei_q {
         # descending docid order
         $mset_opt{relevance} //= -2 if $opt->{thread};
         # my $wcb = PublicInbox::LeiToMail->write_cb($out, $self);
-
-        # even w/o pretty, do the equivalent of a --pretty=oneline
-        # output so "lei q SEARCH_TERMS | wc -l" can be useful:
-        my $ORS = $json ? ($opt->{pretty} ? ', ' : ",\n") : "\n";
-        my $buf;
-
-        # we can generate too many records to hold in RAM, so we stream
-        # and fake a JSON array starting here:
-        $self->out('[') if $json;
-        my $emit_cb = sub {
-                my ($smsg) = @_;
-                delete @$smsg{qw(tid num)}; # only makes sense if single src
-                chomp($buf = $json->encode(_smsg_unbless($smsg)));
-        };
-        $dd->prepare_dedupe;
-        for my $src (@src) {
-                my $srch = $src->search;
-                my $over = $src->over;
-                my $smsg_for = $src->can('smsg_for'); # LeiXSearch
-                my $mo = { %mset_opt };
-                my $mset = $srch->mset($qstr, $mo);
-                my $ctx = {};
-                if ($smsg_for) {
-                        for my $it ($mset->items) {
-                                my $smsg = $smsg_for->($srch, $it) or next;
-                                next if $dd->is_smsg_dup($smsg);
-                                $self->out($buf .= $ORS) if defined $buf;
-                                $smsg->{relevance} = get_pct($it);
-                                $emit_cb->($smsg);
-                        }
-                } else { # --thread
-                        my $ids = $srch->mset_to_artnums($mset, $mo);
-                        $ctx->{ids} = $ids;
-                        my $i = 0;
-                        my %n2p = map {
-                                ($ids->[$i++], get_pct($_));
-                        } $mset->items;
-                        undef $mset;
-                        while ($over && $over->expand_thread($ctx)) {
-                                for my $n (@{$ctx->{xids}}) {
-                                        my $t = $over->get_art($n) or next;
-                                        next if $dd->is_smsg_dup($t);
-                                        if (my $p = delete $n2p{$t->{num}}) {
-                                                $t->{relevance} = $p;
-                                        }
-                                        $self->out($buf .= $ORS);
-                                        $emit_cb->($t);
-                                }
-                                @{$ctx->{xids}} = ();
-                        }
-                }
+        $self->{mset_opt} = \%mset_opt;
+        $lxs->do_query($self, \@srcs);
+        if ($pid_old12) {
+                $self->{$_} = $pid_old12->[$_] for (1, 2);
+                dwaitpid($pid_old12->[0], undef, $self->{sock});
         }
-        $self->out($buf .= "]\n"); # done
 }
 
 1;