about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2021-01-10 12:15:11 +0000
committerEric Wong <e@80x24.org>2021-01-12 03:51:42 +0000
commit0c89ebd477d1c7a695a0a0b3023c0d41abe573fa (patch)
tree756c531c331dd7c048465448e61da51d61320696
parent372ff2ba6467e8fcea3eb19e5527a5fc398802f9 (diff)
downloadpublic-inbox-0c89ebd477d1c7a695a0a0b3023c0d41abe573fa.tar.gz
Improve interactivity and user experience by allowing the user
to return to the terminal immediately when the pager is exited
(e.g. hitting the `q' key in less(1)).

This is a massive change which restructures query handling to
allow parallel search when --thread expansion is in use and
offloading to a separate worker when --thread is not in use.

The Xapian query offload changes allow us to reenter the event
loop right away once the search(es) are shipped off to the work
queue workers.

This means the main lei-daemon process can forget the lei(1)
client socket immediately once it's handed off to worker
processes.

We now unblock SIGPIPE in query workers and send an exit(141)
response to the lei(1) client socket to denote SIGPIPE.

This also allows parallelization for users using "lei q" from
multiple terminals.

JSON output is currently broken and will need to be restructured
for more flexibility and fork-safety.
-rw-r--r--lib/PublicInbox/IPC.pm14
-rw-r--r--lib/PublicInbox/LEI.pm34
-rw-r--r--lib/PublicInbox/LeiQuery.pm102
-rw-r--r--lib/PublicInbox/LeiXSearch.pm80
4 files changed, 147 insertions, 83 deletions
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 8a3120c9..be5b2f45 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -234,6 +234,9 @@ sub wq_worker_loop ($) {
         my $len = $self->{wq_req_len} // (4096 * 33);
         my ($rec, $sub, @args);
         my $s2 = $self->{-wq_s2} // die 'BUG: no -wq_s2';
+        local $SIG{PIPE} = sub {
+                die(bless(\"$_[0]", __PACKAGE__.'::PIPE')) if $sub;
+        };
         until ($self->{-wq_quit}) {
                 my (@fds) = $recv_cmd->($s2, $buf, $len) or return; # EOF
                 my $i = 0;
@@ -242,6 +245,7 @@ sub wq_worker_loop ($) {
                         my $mode = shift(@m);
                         if (open(my $fh, $mode, $fd)) {
                                 $self->{$i++} = $fh;
+                                $fh->autoflush(1);
                         } else {
                                 die "$$ open($mode$fd) (FD:$i): $!";
                         }
@@ -251,8 +255,10 @@ sub wq_worker_loop ($) {
                         die "thaw error on buffer of size:".length($buf);
                 ($sub, @args) = @$rec;
                 eval { $self->$sub(@args) };
-                warn "$$ wq_worker: $@" if $@;
-                delete @$self{0, 1, 2};
+                warn "$$ wq_worker: $@" if $@ && ref $@ ne __PACKAGE__.'::PIPE';
+                undef $sub; # quiet SIG{PIPE} handler
+                # need to close explicitly to avoid warnings after SIGPIPE
+                close($_) for (delete(@$self{0..2}));
         }
 }
 
@@ -284,8 +290,8 @@ sub _wq_worker_start ($$) {
                 PublicInbox::DS::sig_setmask($oldset);
                 my $on_destroy = $self->ipc_atfork_child;
                 eval { wq_worker_loop($self) };
-                die "worker $self->{-wq_ident} PID:$$ died: $@\n" if $@;
-                exit;
+                warn "worker $self->{-wq_ident} PID:$$ died: $@" if $@;
+                exit($@ ? 1 : 0);
         } else {
                 $self->{-wq_workers}->{$pid} = \undef;
         }
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 17023191..f8b8cd4a 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -269,6 +269,33 @@ sub fail ($$;$) {
         undef;
 }
 
+# usage: local %SIG = (%SIG, $lei->atfork_child_wq($wq));
+sub atfork_child_wq {
+        my ($self, $wq) = @_;
+        $self->{sock} //= $wq->{0};
+        $self->{$_} //= $wq->{$_} for (0..2);
+        my $oldpipe = $SIG{PIPE};
+        (
+                __WARN__ => sub { err($self, @_) },
+                PIPE => sub {
+                        $self->x_it(141);
+                        $oldpipe->() if ref($oldpipe) eq 'CODE';
+                }
+        );
+}
+
+# usage: ($lei, @io) = $lei->atfork_prepare_wq($wq);
+sub atfork_prepare_wq {
+        my ($self, $wq) = @_;
+        if ($wq->wq_workers) {
+                my $ret = bless { %$self }, ref($self);
+                my $in = delete $ret->{0};
+                ($ret, delete($ret->{sock}) // $in, delete @$ret{1, 2});
+        } else {
+                ($self, ($self->{sock} // $self->{0}), @$self{1, 2});
+        }
+}
+
 sub _help ($;$) {
         my ($self, $errmsg) = @_;
         my $cmd = $self->{cmd} // 'COMMAND';
@@ -608,8 +635,8 @@ sub start_pager {
         $self->{1} = $wpager;
         $self->{2} = $wpager if -t $self->{2};
         my $pid = spawn([$pager], $env, $rdr);
-        dwaitpid($pid, undef, $self->{sock});
         $env->{GIT_PAGER_IN_USE} = 'true'; # we may spawn git
+        [ $pid, @$rdr{1, 2} ];
 }
 
 sub accept_dispatch { # Listener {post_accept} callback
@@ -675,6 +702,8 @@ sub event_step {
 
 sub noop {}
 
+our $oldset; sub oldset { $oldset }
+
 # lei(1) calls this when it can't connect
 sub lazy_start {
         my ($path, $errno, $nfd) = @_;
@@ -691,7 +720,7 @@ sub lazy_start {
         my @st = stat($path) or die "stat($path): $!";
         my $dev_ino_expect = pack('dd', $st[0], $st[1]); # dev+ino
         pipe(my ($eof_r, $eof_w)) or die "pipe: $!";
-        my $oldset = PublicInbox::DS::block_signals();
+        local $oldset = PublicInbox::DS::block_signals();
         if ($nfd == 1) {
                 require PublicInbox::CmdIPC1;
                 $recv_cmd = PublicInbox::CmdIPC1->can('recv_cmd1');
@@ -737,6 +766,7 @@ sub lazy_start {
         };
         my $sigfd = PublicInbox::Sigfd->new($sig, SFD_NONBLOCK);
         local %SIG = (%SIG, %$sig) if !$sigfd;
+        local $SIG{PIPE} = 'IGNORE';
         if ($sigfd) { # TODO: use inotify/kqueue to detect unlinked sockets
                 PublicInbox::DS->SetLoopTimeout(5000);
         } else {
diff --git a/lib/PublicInbox/LeiQuery.pm b/lib/PublicInbox/LeiQuery.pm
index f69dccad..040c284d 100644
--- a/lib/PublicInbox/LeiQuery.pm
+++ b/lib/PublicInbox/LeiQuery.pm
@@ -8,7 +8,7 @@ use v5.10.1;
 use PublicInbox::MID qw($MID_EXTRACT);
 use POSIX qw(strftime);
 use PublicInbox::Address qw(pairs);
-use PublicInbox::Search qw(get_pct);
+use PublicInbox::DS qw(dwaitpid);
 
 sub _iso8601 ($) { strftime('%Y-%m-%dT%H:%M:%SZ', gmtime($_[0])) }
 
@@ -61,37 +61,30 @@ sub lei_q {
         my $sto = $self->_lei_store(1);
         my $cfg = $self->_lei_cfg(1);
         my $opt = $self->{opt};
-        my $qstr = join(' ', map {;
-                # Consider spaces in argv to be for phrase search in Xapian.
-                # In other words, the users should need only care about
-                # normal shell quotes and not have to learn Xapian quoting.
-                /\s/ ? (s/\A(\w+:)// ? qq{$1"$_"} : qq{"$_"}) : $_
-        } @argv);
-        $opt->{limit} //= 10000;
-        my $lxs;
         require PublicInbox::LeiDedupe;
         my $dd = PublicInbox::LeiDedupe->new($self);
 
         # --local is enabled by default
-        my @src = $opt->{'local'} ? ($sto->search) : ();
+        # src: LeiXSearch || LeiSearch || Inbox
+        my @srcs = $opt->{'local'} ? ($sto->search) : ();
+        require PublicInbox::LeiXSearch;
+        my $lxs = PublicInbox::LeiXSearch->new;
 
         # --external is enabled by default, but allow --no-external
         if ($opt->{external} // 1) {
-                $self->_externals_each(\&_vivify_external, \@src);
-                # {tid} is not unique between indices, so we have to search
-                # each src individually
-                if (!$opt->{thread}) {
-                        require PublicInbox::LeiXSearch;
-                        my $lxs = PublicInbox::LeiXSearch->new;
-                        # local is always first
-                        $lxs->attach_external($_) for @src;
-                        @src = ($lxs);
-                }
+                $self->_externals_each(\&_vivify_external, \@srcs);
         }
-        my $out = $self->{output} // '-';
+        my $j = $opt->{jobs} // scalar(@srcs) > 4 ? 4 : scalar(@srcs);
+        $j = 1 if !$opt->{thread};
+        if ($self->{pid}) {
+                $lxs->wq_workers_start('lei_xsearch', $j, $self->oldset)
+                        // $self->wq_workers($j);
+        }
+        my $out = $opt->{output} // '-';
         $out = 'json:/dev/stdout' if $out eq '-';
         my $isatty = -t $self->{1};
-        $self->start_pager if $isatty;
+        # no forking workers after this
+        my $pid_old12 = $self->start_pager if $isatty;
         my $json = substr($out, 0, 5) eq 'json:' ?
                 ref(PublicInbox::Config->json)->new : undef;
         if ($json) {
@@ -104,10 +97,14 @@ sub lei_q {
                 $json->canonical;
         }
 
-        # src: LeiXSearch || LeiSearch || Inbox
         my %mset_opt = map { $_ => $opt->{$_} } qw(thread limit offset);
-        delete $mset_opt{limit} if $opt->{limit} < 0;
         $mset_opt{asc} = $opt->{'reverse'} ? 1 : 0;
+        $mset_opt{qstr} = join(' ', map {;
+                # Consider spaces in argv to be for phrase search in Xapian.
+                # In other words, the users should need only care about
+                # normal shell quotes and not have to learn Xapian quoting.
+                /\s/ ? (s/\A(\w+:)// ? qq{$1"$_"} : qq{"$_"}) : $_
+        } @argv);
         if (defined(my $sort = $opt->{'sort'})) {
                 if ($sort eq 'relevance') {
                         $mset_opt{relevance} = 1;
@@ -123,59 +120,12 @@ sub lei_q {
         # descending docid order
         $mset_opt{relevance} //= -2 if $opt->{thread};
         # my $wcb = PublicInbox::LeiToMail->write_cb($out, $self);
-
-        # even w/o pretty, do the equivalent of a --pretty=oneline
-        # output so "lei q SEARCH_TERMS | wc -l" can be useful:
-        my $ORS = $json ? ($opt->{pretty} ? ', ' : ",\n") : "\n";
-        my $buf;
-
-        # we can generate too many records to hold in RAM, so we stream
-        # and fake a JSON array starting here:
-        $self->out('[') if $json;
-        my $emit_cb = sub {
-                my ($smsg) = @_;
-                delete @$smsg{qw(tid num)}; # only makes sense if single src
-                chomp($buf = $json->encode(_smsg_unbless($smsg)));
-        };
-        $dd->prepare_dedupe;
-        for my $src (@src) {
-                my $srch = $src->search;
-                my $over = $src->over;
-                my $smsg_for = $src->can('smsg_for'); # LeiXSearch
-                my $mo = { %mset_opt };
-                my $mset = $srch->mset($qstr, $mo);
-                my $ctx = {};
-                if ($smsg_for) {
-                        for my $it ($mset->items) {
-                                my $smsg = $smsg_for->($srch, $it) or next;
-                                next if $dd->is_smsg_dup($smsg);
-                                $self->out($buf .= $ORS) if defined $buf;
-                                $smsg->{relevance} = get_pct($it);
-                                $emit_cb->($smsg);
-                        }
-                } else { # --thread
-                        my $ids = $srch->mset_to_artnums($mset, $mo);
-                        $ctx->{ids} = $ids;
-                        my $i = 0;
-                        my %n2p = map {
-                                ($ids->[$i++], get_pct($_));
-                        } $mset->items;
-                        undef $mset;
-                        while ($over && $over->expand_thread($ctx)) {
-                                for my $n (@{$ctx->{xids}}) {
-                                        my $t = $over->get_art($n) or next;
-                                        next if $dd->is_smsg_dup($t);
-                                        if (my $p = delete $n2p{$t->{num}}) {
-                                                $t->{relevance} = $p;
-                                        }
-                                        $self->out($buf .= $ORS);
-                                        $emit_cb->($t);
-                                }
-                                @{$ctx->{xids}} = ();
-                        }
-                }
+        $self->{mset_opt} = \%mset_opt;
+        $lxs->do_query($self, \@srcs);
+        if ($pid_old12) {
+                $self->{$_} = $pid_old12->[$_] for (1, 2);
+                dwaitpid($pid_old12->[0], undef, $self->{sock});
         }
-        $self->out($buf .= "]\n"); # done
 }
 
 1;
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index b670bc2f..a3010efe 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -7,7 +7,8 @@
 package PublicInbox::LeiXSearch;
 use strict;
 use v5.10.1;
-use parent qw(PublicInbox::LeiSearch);
+use parent qw(PublicInbox::LeiSearch PublicInbox::IPC);
+use PublicInbox::Search qw(get_pct);
 
 sub new {
         my ($class) = @_;
@@ -83,4 +84,81 @@ sub recent {
 
 sub over {}
 
+sub _mset_more ($$) {
+        my ($mset, $mo) = @_;
+        my $size = $mset->size;
+        $size && (($mo->{offset} += $size) < ($mo->{limit} // 10000));
+}
+
+sub query_thread_mset { # for --thread
+        my ($self, $lei, $ibxish) = @_;
+        my ($srch, $over) = ($ibxish->search, $ibxish->over);
+        unless ($srch && $over) {
+                my $desc = $ibxish->{inboxdir} // $ibxish->{topdir};
+                warn "$desc not indexed by Xapian\n";
+                return;
+        }
+        local %SIG = (%SIG, $lei->atfork_child_wq($self));
+        my $mo = { %{$lei->{mset_opt}} };
+        my $mset;
+        do {
+                $mset = $srch->mset($mo->{qstr}, $mo);
+                my $ids = $srch->mset_to_artnums($mset, $mo);
+                my $ctx = { ids => $ids };
+                my $i = 0;
+                my %n2p = map { ($ids->[$i++], get_pct($_)) } $mset->items;
+                while ($over->expand_thread($ctx)) {
+                        for my $n (@{$ctx->{xids}}) {
+                                my $smsg = $over->get_art($n) or next;
+                                # next if $dd->is_smsg_dup($smsg); TODO
+                                if (my $p = delete $n2p{$smsg->{num}}) {
+                                        $smsg->{relevance} = $p;
+                                }
+                                print { $self->{1} } Dumper($smsg);
+                                # $self->out($buf .= $ORS);
+                                # $emit_cb->($smsg);
+                        }
+                        @{$ctx->{xids}} = ();
+                }
+        } while (_mset_more($mset, $mo));
+}
+
+sub query_mset { # non-parallel for non-"--thread" users
+        my ($self, $lei, $srcs) = @_;
+        my $mo = { %{$lei->{mset_opt}} };
+        my $mset;
+        local %SIG = (%SIG, $lei->atfork_child_wq($self));
+        $self->attach_external($_) for @$srcs;
+        do {
+                $mset = $self->mset($mo->{qstr}, $mo);
+                for my $it ($mset->items) {
+                        my $smsg = smsg_for($self, $it) or next;
+                        # next if $dd->is_smsg_dup($smsg);
+                        $smsg->{relevance} = get_pct($it);
+                        use Data::Dumper;
+                        print { $self->{1} } Dumper($smsg);
+                        # $self->out($buf .= $ORS) if defined $buf;
+                        #$emit_cb->($smsg);
+                }
+        } while (_mset_more($mset, $mo));
+}
+
+sub do_query {
+        my ($self, $lei_orig, $srcs) = @_;
+        my ($lei, @io) = $lei_orig->atfork_prepare_wq($self);
+        $io[1]->autoflush(1);
+        $io[2]->autoflush(1);
+        if ($lei->{opt}->{thread}) {
+                for my $ibxish (@$srcs) {
+                        $self->wq_do('query_thread_mset', @io, $lei, $ibxish);
+                }
+        } else {
+                $self->wq_do('query_mset', @io, $lei, $srcs);
+        }
+        # TODO
+        for my $rmt (@{$self->{remotes} // []}) {
+                $self->wq_do('query_thread_mbox', @io, $lei, $rmt);
+        }
+}
+
 1;