diff options
-rw-r--r-- | lib/PublicInbox/IPC.pm | 14 | ||||
-rw-r--r-- | lib/PublicInbox/LEI.pm | 34 | ||||
-rw-r--r-- | lib/PublicInbox/LeiQuery.pm | 102 | ||||
-rw-r--r-- | lib/PublicInbox/LeiXSearch.pm | 80 |
4 files changed, 147 insertions, 83 deletions
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm index 8a3120c9..be5b2f45 100644 --- a/lib/PublicInbox/IPC.pm +++ b/lib/PublicInbox/IPC.pm @@ -234,6 +234,9 @@ sub wq_worker_loop ($) { my $len = $self->{wq_req_len} // (4096 * 33); my ($rec, $sub, @args); my $s2 = $self->{-wq_s2} // die 'BUG: no -wq_s2'; + local $SIG{PIPE} = sub { + die(bless(\"$_[0]", __PACKAGE__.'::PIPE')) if $sub; + }; until ($self->{-wq_quit}) { my (@fds) = $recv_cmd->($s2, $buf, $len) or return; # EOF my $i = 0; @@ -242,6 +245,7 @@ sub wq_worker_loop ($) { my $mode = shift(@m); if (open(my $fh, $mode, $fd)) { $self->{$i++} = $fh; + $fh->autoflush(1); } else { die "$$ open($mode$fd) (FD:$i): $!"; } @@ -251,8 +255,10 @@ sub wq_worker_loop ($) { die "thaw error on buffer of size:".length($buf); ($sub, @args) = @$rec; eval { $self->$sub(@args) }; - warn "$$ wq_worker: $@" if $@; - delete @$self{0, 1, 2}; + warn "$$ wq_worker: $@" if $@ && ref $@ ne __PACKAGE__.'::PIPE'; + undef $sub; # quiet SIG{PIPE} handler + # need to close explicitly to avoid warnings after SIGPIPE + close($_) for (delete(@$self{0..2})); } } @@ -284,8 +290,8 @@ sub _wq_worker_start ($$) { PublicInbox::DS::sig_setmask($oldset); my $on_destroy = $self->ipc_atfork_child; eval { wq_worker_loop($self) }; - die "worker $self->{-wq_ident} PID:$$ died: $@\n" if $@; - exit; + warn "worker $self->{-wq_ident} PID:$$ died: $@" if $@; + exit($@ ? 1 : 0); } else { $self->{-wq_workers}->{$pid} = \undef; } diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index 17023191..f8b8cd4a 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -269,6 +269,33 @@ sub fail ($$;$) { undef; } +# usage: local %SIG = (%SIG, $lei->atfork_child_wq($wq)); +sub atfork_child_wq { + my ($self, $wq) = @_; + $self->{sock} //= $wq->{0}; + $self->{$_} //= $wq->{$_} for (0..2); + my $oldpipe = $SIG{PIPE}; + ( + __WARN__ => sub { err($self, @_) }, + PIPE => sub { + $self->x_it(141); + $oldpipe->() if ref($oldpipe) eq 'CODE'; + } + ); +} + +# usage: ($lei, @io) = $lei->atfork_prepare_wq($wq); +sub atfork_prepare_wq { + my ($self, $wq) = @_; + if ($wq->wq_workers) { + my $ret = bless { %$self }, ref($self); + my $in = delete $ret->{0}; + ($ret, delete($ret->{sock}) // $in, delete @$ret{1, 2}); + } else { + ($self, ($self->{sock} // $self->{0}), @$self{1, 2}); + } +} + sub _help ($;$) { my ($self, $errmsg) = @_; my $cmd = $self->{cmd} // 'COMMAND'; @@ -608,8 +635,8 @@ sub start_pager { $self->{1} = $wpager; $self->{2} = $wpager if -t $self->{2}; my $pid = spawn([$pager], $env, $rdr); - dwaitpid($pid, undef, $self->{sock}); $env->{GIT_PAGER_IN_USE} = 'true'; # we may spawn git + [ $pid, @$rdr{1, 2} ]; } sub accept_dispatch { # Listener {post_accept} callback @@ -675,6 +702,8 @@ sub event_step { sub noop {} +our $oldset; sub oldset { $oldset } + # lei(1) calls this when it can't connect sub lazy_start { my ($path, $errno, $nfd) = @_; @@ -691,7 +720,7 @@ sub lazy_start { my @st = stat($path) or die "stat($path): $!"; my $dev_ino_expect = pack('dd', $st[0], $st[1]); # dev+ino pipe(my ($eof_r, $eof_w)) or die "pipe: $!"; - my $oldset = PublicInbox::DS::block_signals(); + local $oldset = PublicInbox::DS::block_signals(); if ($nfd == 1) { require PublicInbox::CmdIPC1; $recv_cmd = PublicInbox::CmdIPC1->can('recv_cmd1'); @@ -737,6 +766,7 @@ sub lazy_start { }; my $sigfd = PublicInbox::Sigfd->new($sig, SFD_NONBLOCK); local %SIG = (%SIG, %$sig) if !$sigfd; + local $SIG{PIPE} = 'IGNORE'; if ($sigfd) { # TODO: use inotify/kqueue to detect unlinked sockets PublicInbox::DS->SetLoopTimeout(5000); } else { diff --git a/lib/PublicInbox/LeiQuery.pm b/lib/PublicInbox/LeiQuery.pm index f69dccad..040c284d 100644 --- a/lib/PublicInbox/LeiQuery.pm +++ b/lib/PublicInbox/LeiQuery.pm @@ -8,7 +8,7 @@ use v5.10.1; use PublicInbox::MID qw($MID_EXTRACT); use POSIX qw(strftime); use PublicInbox::Address qw(pairs); -use PublicInbox::Search qw(get_pct); +use PublicInbox::DS qw(dwaitpid); sub _iso8601 ($) { strftime('%Y-%m-%dT%H:%M:%SZ', gmtime($_[0])) } @@ -61,37 +61,30 @@ sub lei_q { my $sto = $self->_lei_store(1); my $cfg = $self->_lei_cfg(1); my $opt = $self->{opt}; - my $qstr = join(' ', map {; - # Consider spaces in argv to be for phrase search in Xapian. - # In other words, the users should need only care about - # normal shell quotes and not have to learn Xapian quoting. - /\s/ ? (s/\A(\w+:)// ? qq{$1"$_"} : qq{"$_"}) : $_ - } @argv); - $opt->{limit} //= 10000; - my $lxs; require PublicInbox::LeiDedupe; my $dd = PublicInbox::LeiDedupe->new($self); # --local is enabled by default - my @src = $opt->{'local'} ? ($sto->search) : (); + # src: LeiXSearch || LeiSearch || Inbox + my @srcs = $opt->{'local'} ? ($sto->search) : (); + require PublicInbox::LeiXSearch; + my $lxs = PublicInbox::LeiXSearch->new; # --external is enabled by default, but allow --no-external if ($opt->{external} // 1) { - $self->_externals_each(\&_vivify_external, \@src); - # {tid} is not unique between indices, so we have to search - # each src individually - if (!$opt->{thread}) { - require PublicInbox::LeiXSearch; - my $lxs = PublicInbox::LeiXSearch->new; - # local is always first - $lxs->attach_external($_) for @src; - @src = ($lxs); - } + $self->_externals_each(\&_vivify_external, \@srcs); } - my $out = $self->{output} // '-'; + my $j = $opt->{jobs} // scalar(@srcs) > 4 ? 4 : scalar(@srcs); + $j = 1 if !$opt->{thread}; + if ($self->{pid}) { + $lxs->wq_workers_start('lei_xsearch', $j, $self->oldset) + // $self->wq_workers($j); + } + my $out = $opt->{output} // '-'; $out = 'json:/dev/stdout' if $out eq '-'; my $isatty = -t $self->{1}; - $self->start_pager if $isatty; + # no forking workers after this + my $pid_old12 = $self->start_pager if $isatty; my $json = substr($out, 0, 5) eq 'json:' ? ref(PublicInbox::Config->json)->new : undef; if ($json) { @@ -104,10 +97,14 @@ sub lei_q { $json->canonical; } - # src: LeiXSearch || LeiSearch || Inbox my %mset_opt = map { $_ => $opt->{$_} } qw(thread limit offset); - delete $mset_opt{limit} if $opt->{limit} < 0; $mset_opt{asc} = $opt->{'reverse'} ? 1 : 0; + $mset_opt{qstr} = join(' ', map {; + # Consider spaces in argv to be for phrase search in Xapian. + # In other words, the users should need only care about + # normal shell quotes and not have to learn Xapian quoting. + /\s/ ? (s/\A(\w+:)// ? qq{$1"$_"} : qq{"$_"}) : $_ + } @argv); if (defined(my $sort = $opt->{'sort'})) { if ($sort eq 'relevance') { $mset_opt{relevance} = 1; @@ -123,59 +120,12 @@ sub lei_q { # descending docid order $mset_opt{relevance} //= -2 if $opt->{thread}; # my $wcb = PublicInbox::LeiToMail->write_cb($out, $self); - - # even w/o pretty, do the equivalent of a --pretty=oneline - # output so "lei q SEARCH_TERMS | wc -l" can be useful: - my $ORS = $json ? ($opt->{pretty} ? ', ' : ",\n") : "\n"; - my $buf; - - # we can generate too many records to hold in RAM, so we stream - # and fake a JSON array starting here: - $self->out('[') if $json; - my $emit_cb = sub { - my ($smsg) = @_; - delete @$smsg{qw(tid num)}; # only makes sense if single src - chomp($buf = $json->encode(_smsg_unbless($smsg))); - }; - $dd->prepare_dedupe; - for my $src (@src) { - my $srch = $src->search; - my $over = $src->over; - my $smsg_for = $src->can('smsg_for'); # LeiXSearch - my $mo = { %mset_opt }; - my $mset = $srch->mset($qstr, $mo); - my $ctx = {}; - if ($smsg_for) { - for my $it ($mset->items) { - my $smsg = $smsg_for->($srch, $it) or next; - next if $dd->is_smsg_dup($smsg); - $self->out($buf .= $ORS) if defined $buf; - $smsg->{relevance} = get_pct($it); - $emit_cb->($smsg); - } - } else { # --thread - my $ids = $srch->mset_to_artnums($mset, $mo); - $ctx->{ids} = $ids; - my $i = 0; - my %n2p = map { - ($ids->[$i++], get_pct($_)); - } $mset->items; - undef $mset; - while ($over && $over->expand_thread($ctx)) { - for my $n (@{$ctx->{xids}}) { - my $t = $over->get_art($n) or next; - next if $dd->is_smsg_dup($t); - if (my $p = delete $n2p{$t->{num}}) { - $t->{relevance} = $p; - } - $self->out($buf .= $ORS); - $emit_cb->($t); - } - @{$ctx->{xids}} = (); - } - } + $self->{mset_opt} = \%mset_opt; + $lxs->do_query($self, \@srcs); + if ($pid_old12) { + $self->{$_} = $pid_old12->[$_] for (1, 2); + dwaitpid($pid_old12->[0], undef, $self->{sock}); } - $self->out($buf .= "]\n"); # done } 1; diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index b670bc2f..a3010efe 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -7,7 +7,8 @@ package PublicInbox::LeiXSearch; use strict; use v5.10.1; -use parent qw(PublicInbox::LeiSearch); +use parent qw(PublicInbox::LeiSearch PublicInbox::IPC); +use PublicInbox::Search qw(get_pct); sub new { my ($class) = @_; @@ -83,4 +84,81 @@ sub recent { sub over {} +sub _mset_more ($$) { + my ($mset, $mo) = @_; + my $size = $mset->size; + $size && (($mo->{offset} += $size) < ($mo->{limit} // 10000)); +} + +sub query_thread_mset { # for --thread + my ($self, $lei, $ibxish) = @_; + my ($srch, $over) = ($ibxish->search, $ibxish->over); + unless ($srch && $over) { + my $desc = $ibxish->{inboxdir} // $ibxish->{topdir}; + warn "$desc not indexed by Xapian\n"; + return; + } + local %SIG = (%SIG, $lei->atfork_child_wq($self)); + my $mo = { %{$lei->{mset_opt}} }; + my $mset; + do { + $mset = $srch->mset($mo->{qstr}, $mo); + my $ids = $srch->mset_to_artnums($mset, $mo); + my $ctx = { ids => $ids }; + my $i = 0; + my %n2p = map { ($ids->[$i++], get_pct($_)) } $mset->items; + while ($over->expand_thread($ctx)) { + for my $n (@{$ctx->{xids}}) { + my $smsg = $over->get_art($n) or next; + # next if $dd->is_smsg_dup($smsg); TODO + if (my $p = delete $n2p{$smsg->{num}}) { + $smsg->{relevance} = $p; + } + print { $self->{1} } Dumper($smsg); + # $self->out($buf .= $ORS); + # $emit_cb->($smsg); + } + @{$ctx->{xids}} = (); + } + } while (_mset_more($mset, $mo)); +} + +sub query_mset { # non-parallel for non-"--thread" users + my ($self, $lei, $srcs) = @_; + my $mo = { %{$lei->{mset_opt}} }; + my $mset; + local %SIG = (%SIG, $lei->atfork_child_wq($self)); + $self->attach_external($_) for @$srcs; + do { + $mset = $self->mset($mo->{qstr}, $mo); + for my $it ($mset->items) { + my $smsg = smsg_for($self, $it) or next; + # next if $dd->is_smsg_dup($smsg); + $smsg->{relevance} = get_pct($it); + use Data::Dumper; + print { $self->{1} } Dumper($smsg); + # $self->out($buf .= $ORS) if defined $buf; + #$emit_cb->($smsg); + } + } while (_mset_more($mset, $mo)); +} + +sub do_query { + my ($self, $lei_orig, $srcs) = @_; + my ($lei, @io) = $lei_orig->atfork_prepare_wq($self); + $io[1]->autoflush(1); + $io[2]->autoflush(1); + if ($lei->{opt}->{thread}) { + for my $ibxish (@$srcs) { + $self->wq_do('query_thread_mset', @io, $lei, $ibxish); + } + } else { + $self->wq_do('query_mset', @io, $lei, $srcs); + } + # TODO + for my $rmt (@{$self->{remotes} // []}) { + $self->wq_do('query_thread_mbox', @io, $lei, $rmt); + } +} + 1; |