about summary refs log tree commit homepage
diff options
context:
space:
mode:
-rw-r--r--lib/PublicInbox/IPC.pm14
-rw-r--r--lib/PublicInbox/LEI.pm34
-rw-r--r--lib/PublicInbox/LeiQuery.pm102
-rw-r--r--lib/PublicInbox/LeiXSearch.pm80
4 files changed, 147 insertions, 83 deletions
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 8a3120c9..be5b2f45 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -234,6 +234,9 @@ sub wq_worker_loop ($) {
         my $len = $self->{wq_req_len} // (4096 * 33);
         my ($rec, $sub, @args);
         my $s2 = $self->{-wq_s2} // die 'BUG: no -wq_s2';
+        local $SIG{PIPE} = sub {
+                die(bless(\"$_[0]", __PACKAGE__.'::PIPE')) if $sub;
+        };
         until ($self->{-wq_quit}) {
                 my (@fds) = $recv_cmd->($s2, $buf, $len) or return; # EOF
                 my $i = 0;
@@ -242,6 +245,7 @@ sub wq_worker_loop ($) {
                         my $mode = shift(@m);
                         if (open(my $fh, $mode, $fd)) {
                                 $self->{$i++} = $fh;
+                                $fh->autoflush(1);
                         } else {
                                 die "$$ open($mode$fd) (FD:$i): $!";
                         }
@@ -251,8 +255,10 @@ sub wq_worker_loop ($) {
                         die "thaw error on buffer of size:".length($buf);
                 ($sub, @args) = @$rec;
                 eval { $self->$sub(@args) };
-                warn "$$ wq_worker: $@" if $@;
-                delete @$self{0, 1, 2};
+                warn "$$ wq_worker: $@" if $@ && ref $@ ne __PACKAGE__.'::PIPE';
+                undef $sub; # quiet SIG{PIPE} handler
+                # need to close explicitly to avoid warnings after SIGPIPE
+                close($_) for (delete(@$self{0..2}));
         }
 }
 
@@ -284,8 +290,8 @@ sub _wq_worker_start ($$) {
                 PublicInbox::DS::sig_setmask($oldset);
                 my $on_destroy = $self->ipc_atfork_child;
                 eval { wq_worker_loop($self) };
-                die "worker $self->{-wq_ident} PID:$$ died: $@\n" if $@;
-                exit;
+                warn "worker $self->{-wq_ident} PID:$$ died: $@" if $@;
+                exit($@ ? 1 : 0);
         } else {
                 $self->{-wq_workers}->{$pid} = \undef;
         }
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 17023191..f8b8cd4a 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -269,6 +269,33 @@ sub fail ($$;$) {
         undef;
 }
 
+# usage: local %SIG = (%SIG, $lei->atfork_child_wq($wq));
+sub atfork_child_wq {
+        my ($self, $wq) = @_;
+        $self->{sock} //= $wq->{0};
+        $self->{$_} //= $wq->{$_} for (0..2);
+        my $oldpipe = $SIG{PIPE};
+        (
+                __WARN__ => sub { err($self, @_) },
+                PIPE => sub {
+                        $self->x_it(141);
+                        $oldpipe->() if ref($oldpipe) eq 'CODE';
+                }
+        );
+}
+
+# usage: ($lei, @io) = $lei->atfork_prepare_wq($wq);
+sub atfork_prepare_wq {
+        my ($self, $wq) = @_;
+        if ($wq->wq_workers) {
+                my $ret = bless { %$self }, ref($self);
+                my $in = delete $ret->{0};
+                ($ret, delete($ret->{sock}) // $in, delete @$ret{1, 2});
+        } else {
+                ($self, ($self->{sock} // $self->{0}), @$self{1, 2});
+        }
+}
+
 sub _help ($;$) {
         my ($self, $errmsg) = @_;
         my $cmd = $self->{cmd} // 'COMMAND';
@@ -608,8 +635,8 @@ sub start_pager {
         $self->{1} = $wpager;
         $self->{2} = $wpager if -t $self->{2};
         my $pid = spawn([$pager], $env, $rdr);
-        dwaitpid($pid, undef, $self->{sock});
         $env->{GIT_PAGER_IN_USE} = 'true'; # we may spawn git
+        [ $pid, @$rdr{1, 2} ];
 }
 
 sub accept_dispatch { # Listener {post_accept} callback
@@ -675,6 +702,8 @@ sub event_step {
 
 sub noop {}
 
+our $oldset; sub oldset { $oldset }
+
 # lei(1) calls this when it can't connect
 sub lazy_start {
         my ($path, $errno, $nfd) = @_;
@@ -691,7 +720,7 @@ sub lazy_start {
         my @st = stat($path) or die "stat($path): $!";
         my $dev_ino_expect = pack('dd', $st[0], $st[1]); # dev+ino
         pipe(my ($eof_r, $eof_w)) or die "pipe: $!";
-        my $oldset = PublicInbox::DS::block_signals();
+        local $oldset = PublicInbox::DS::block_signals();
         if ($nfd == 1) {
                 require PublicInbox::CmdIPC1;
                 $recv_cmd = PublicInbox::CmdIPC1->can('recv_cmd1');
@@ -737,6 +766,7 @@ sub lazy_start {
         };
         my $sigfd = PublicInbox::Sigfd->new($sig, SFD_NONBLOCK);
         local %SIG = (%SIG, %$sig) if !$sigfd;
+        local $SIG{PIPE} = 'IGNORE';
         if ($sigfd) { # TODO: use inotify/kqueue to detect unlinked sockets
                 PublicInbox::DS->SetLoopTimeout(5000);
         } else {
diff --git a/lib/PublicInbox/LeiQuery.pm b/lib/PublicInbox/LeiQuery.pm
index f69dccad..040c284d 100644
--- a/lib/PublicInbox/LeiQuery.pm
+++ b/lib/PublicInbox/LeiQuery.pm
@@ -8,7 +8,7 @@ use v5.10.1;
 use PublicInbox::MID qw($MID_EXTRACT);
 use POSIX qw(strftime);
 use PublicInbox::Address qw(pairs);
-use PublicInbox::Search qw(get_pct);
+use PublicInbox::DS qw(dwaitpid);
 
 sub _iso8601 ($) { strftime('%Y-%m-%dT%H:%M:%SZ', gmtime($_[0])) }
 
@@ -61,37 +61,30 @@ sub lei_q {
         my $sto = $self->_lei_store(1);
         my $cfg = $self->_lei_cfg(1);
         my $opt = $self->{opt};
-        my $qstr = join(' ', map {;
-                # Consider spaces in argv to be for phrase search in Xapian.
-                # In other words, the users should need only care about
-                # normal shell quotes and not have to learn Xapian quoting.
-                /\s/ ? (s/\A(\w+:)// ? qq{$1"$_"} : qq{"$_"}) : $_
-        } @argv);
-        $opt->{limit} //= 10000;
-        my $lxs;
         require PublicInbox::LeiDedupe;
         my $dd = PublicInbox::LeiDedupe->new($self);
 
         # --local is enabled by default
-        my @src = $opt->{'local'} ? ($sto->search) : ();
+        # src: LeiXSearch || LeiSearch || Inbox
+        my @srcs = $opt->{'local'} ? ($sto->search) : ();
+        require PublicInbox::LeiXSearch;
+        my $lxs = PublicInbox::LeiXSearch->new;
 
         # --external is enabled by default, but allow --no-external
         if ($opt->{external} // 1) {
-                $self->_externals_each(\&_vivify_external, \@src);
-                # {tid} is not unique between indices, so we have to search
-                # each src individually
-                if (!$opt->{thread}) {
-                        require PublicInbox::LeiXSearch;
-                        my $lxs = PublicInbox::LeiXSearch->new;
-                        # local is always first
-                        $lxs->attach_external($_) for @src;
-                        @src = ($lxs);
-                }
+                $self->_externals_each(\&_vivify_external, \@srcs);
         }
-        my $out = $self->{output} // '-';
+        my $j = $opt->{jobs} // scalar(@srcs) > 4 ? 4 : scalar(@srcs);
+        $j = 1 if !$opt->{thread};
+        if ($self->{pid}) {
+                $lxs->wq_workers_start('lei_xsearch', $j, $self->oldset)
+                        // $self->wq_workers($j);
+        }
+        my $out = $opt->{output} // '-';
         $out = 'json:/dev/stdout' if $out eq '-';
         my $isatty = -t $self->{1};
-        $self->start_pager if $isatty;
+        # no forking workers after this
+        my $pid_old12 = $self->start_pager if $isatty;
         my $json = substr($out, 0, 5) eq 'json:' ?
                 ref(PublicInbox::Config->json)->new : undef;
         if ($json) {
@@ -104,10 +97,14 @@ sub lei_q {
                 $json->canonical;
         }
 
-        # src: LeiXSearch || LeiSearch || Inbox
         my %mset_opt = map { $_ => $opt->{$_} } qw(thread limit offset);
-        delete $mset_opt{limit} if $opt->{limit} < 0;
         $mset_opt{asc} = $opt->{'reverse'} ? 1 : 0;
+        $mset_opt{qstr} = join(' ', map {;
+                # Consider spaces in argv to be for phrase search in Xapian.
+                # In other words, the users should need only care about
+                # normal shell quotes and not have to learn Xapian quoting.
+                /\s/ ? (s/\A(\w+:)// ? qq{$1"$_"} : qq{"$_"}) : $_
+        } @argv);
         if (defined(my $sort = $opt->{'sort'})) {
                 if ($sort eq 'relevance') {
                         $mset_opt{relevance} = 1;
@@ -123,59 +120,12 @@ sub lei_q {
         # descending docid order
         $mset_opt{relevance} //= -2 if $opt->{thread};
         # my $wcb = PublicInbox::LeiToMail->write_cb($out, $self);
-
-        # even w/o pretty, do the equivalent of a --pretty=oneline
-        # output so "lei q SEARCH_TERMS | wc -l" can be useful:
-        my $ORS = $json ? ($opt->{pretty} ? ', ' : ",\n") : "\n";
-        my $buf;
-
-        # we can generate too many records to hold in RAM, so we stream
-        # and fake a JSON array starting here:
-        $self->out('[') if $json;
-        my $emit_cb = sub {
-                my ($smsg) = @_;
-                delete @$smsg{qw(tid num)}; # only makes sense if single src
-                chomp($buf = $json->encode(_smsg_unbless($smsg)));
-        };
-        $dd->prepare_dedupe;
-        for my $src (@src) {
-                my $srch = $src->search;
-                my $over = $src->over;
-                my $smsg_for = $src->can('smsg_for'); # LeiXSearch
-                my $mo = { %mset_opt };
-                my $mset = $srch->mset($qstr, $mo);
-                my $ctx = {};
-                if ($smsg_for) {
-                        for my $it ($mset->items) {
-                                my $smsg = $smsg_for->($srch, $it) or next;
-                                next if $dd->is_smsg_dup($smsg);
-                                $self->out($buf .= $ORS) if defined $buf;
-                                $smsg->{relevance} = get_pct($it);
-                                $emit_cb->($smsg);
-                        }
-                } else { # --thread
-                        my $ids = $srch->mset_to_artnums($mset, $mo);
-                        $ctx->{ids} = $ids;
-                        my $i = 0;
-                        my %n2p = map {
-                                ($ids->[$i++], get_pct($_));
-                        } $mset->items;
-                        undef $mset;
-                        while ($over && $over->expand_thread($ctx)) {
-                                for my $n (@{$ctx->{xids}}) {
-                                        my $t = $over->get_art($n) or next;
-                                        next if $dd->is_smsg_dup($t);
-                                        if (my $p = delete $n2p{$t->{num}}) {
-                                                $t->{relevance} = $p;
-                                        }
-                                        $self->out($buf .= $ORS);
-                                        $emit_cb->($t);
-                                }
-                                @{$ctx->{xids}} = ();
-                        }
-                }
+        $self->{mset_opt} = \%mset_opt;
+        $lxs->do_query($self, \@srcs);
+        if ($pid_old12) {
+                $self->{$_} = $pid_old12->[$_] for (1, 2);
+                dwaitpid($pid_old12->[0], undef, $self->{sock});
         }
-        $self->out($buf .= "]\n"); # done
 }
 
 1;
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index b670bc2f..a3010efe 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -7,7 +7,8 @@
 package PublicInbox::LeiXSearch;
 use strict;
 use v5.10.1;
-use parent qw(PublicInbox::LeiSearch);
+use parent qw(PublicInbox::LeiSearch PublicInbox::IPC);
+use PublicInbox::Search qw(get_pct);
 
 sub new {
         my ($class) = @_;
@@ -83,4 +84,81 @@ sub recent {
 
 sub over {}
 
+sub _mset_more ($$) {
+        my ($mset, $mo) = @_;
+        my $size = $mset->size;
+        $size && (($mo->{offset} += $size) < ($mo->{limit} // 10000));
+}
+
+sub query_thread_mset { # for --thread
+        my ($self, $lei, $ibxish) = @_;
+        my ($srch, $over) = ($ibxish->search, $ibxish->over);
+        unless ($srch && $over) {
+                my $desc = $ibxish->{inboxdir} // $ibxish->{topdir};
+                warn "$desc not indexed by Xapian\n";
+                return;
+        }
+        local %SIG = (%SIG, $lei->atfork_child_wq($self));
+        my $mo = { %{$lei->{mset_opt}} };
+        my $mset;
+        do {
+                $mset = $srch->mset($mo->{qstr}, $mo);
+                my $ids = $srch->mset_to_artnums($mset, $mo);
+                my $ctx = { ids => $ids };
+                my $i = 0;
+                my %n2p = map { ($ids->[$i++], get_pct($_)) } $mset->items;
+                while ($over->expand_thread($ctx)) {
+                        for my $n (@{$ctx->{xids}}) {
+                                my $smsg = $over->get_art($n) or next;
+                                # next if $dd->is_smsg_dup($smsg); TODO
+                                if (my $p = delete $n2p{$smsg->{num}}) {
+                                        $smsg->{relevance} = $p;
+                                }
+                                print { $self->{1} } Dumper($smsg);
+                                # $self->out($buf .= $ORS);
+                                # $emit_cb->($smsg);
+                        }
+                        @{$ctx->{xids}} = ();
+                }
+        } while (_mset_more($mset, $mo));
+}
+
+sub query_mset { # non-parallel for non-"--thread" users
+        my ($self, $lei, $srcs) = @_;
+        my $mo = { %{$lei->{mset_opt}} };
+        my $mset;
+        local %SIG = (%SIG, $lei->atfork_child_wq($self));
+        $self->attach_external($_) for @$srcs;
+        do {
+                $mset = $self->mset($mo->{qstr}, $mo);
+                for my $it ($mset->items) {
+                        my $smsg = smsg_for($self, $it) or next;
+                        # next if $dd->is_smsg_dup($smsg);
+                        $smsg->{relevance} = get_pct($it);
+                        use Data::Dumper;
+                        print { $self->{1} } Dumper($smsg);
+                        # $self->out($buf .= $ORS) if defined $buf;
+                        #$emit_cb->($smsg);
+                }
+        } while (_mset_more($mset, $mo));
+}
+
+sub do_query {
+        my ($self, $lei_orig, $srcs) = @_;
+        my ($lei, @io) = $lei_orig->atfork_prepare_wq($self);
+        $io[1]->autoflush(1);
+        $io[2]->autoflush(1);
+        if ($lei->{opt}->{thread}) {
+                for my $ibxish (@$srcs) {
+                        $self->wq_do('query_thread_mset', @io, $lei, $ibxish);
+                }
+        } else {
+                $self->wq_do('query_mset', @io, $lei, $srcs);
+        }
+        # TODO
+        for my $rmt (@{$self->{remotes} // []}) {
+                $self->wq_do('query_thread_mbox', @io, $lei, $rmt);
+        }
+}
+
 1;