diff options
author | Eric Wong <e@80x24.org> | 2021-01-10 12:15:19 +0000 |
---|---|---|
committer | Eric Wong <e@80x24.org> | 2021-01-12 03:51:43 +0000 |
commit | b0898d18efbc8f646b736088f9600b87be88f91e (patch) | |
tree | 724cf4c2649eaf880087b2fbe2bf2b47ecde1675 /lib/PublicInbox/LeiXSearch.pm | |
parent | 3019046b3ab9736922762df111d60ef7647e36a3 (diff) | |
download | public-inbox-b0898d18efbc8f646b736088f9600b87be88f91e.tar.gz |
This internal API is better suited for fork-friendliness (but locking + dedupe still need to be re-added). Normal "json" is the default, though stream-friendly "concatjson" and "jsonl" (AKA "ndjson" AKA "ldjson") all seem to be working (though the tests aren't working yet). For normal "json", the biggest downside is the necessity of a trailing "null" element at the end of the array because of parallel processes, since (AFAIK) regular JSON doesn't allow trailing commas, unlike JavaScript.
Diffstat (limited to 'lib/PublicInbox/LeiXSearch.pm')
-rw-r--r-- | lib/PublicInbox/LeiXSearch.pm | 25 |
1 files changed, 14 insertions, 11 deletions
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index 94f7c2bc..c030b2b2 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -8,7 +8,6 @@ package PublicInbox::LeiXSearch; use strict; use v5.10.1; use parent qw(PublicInbox::LeiSearch PublicInbox::IPC); -use PublicInbox::Search qw(get_pct); use Sys::Syslog qw(syslog); sub new { @@ -102,26 +101,26 @@ sub query_thread_mset { # for --thread } my $mo = { %{$lei->{mset_opt}} }; my $mset; + my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei); do { $mset = $srch->mset($mo->{qstr}, $mo); my $ids = $srch->mset_to_artnums($mset, $mo); my $ctx = { ids => $ids }; my $i = 0; - my %n2p = map { ($ids->[$i++], get_pct($_)) } $mset->items; + my %n2item = map { ($ids->[$i++], $_) } $mset->items; while ($over->expand_thread($ctx)) { for my $n (@{$ctx->{xids}}) { my $smsg = $over->get_art($n) or next; # next if $dd->is_smsg_dup($smsg); TODO - if (my $p = delete $n2p{$smsg->{num}}) { - $smsg->{relevance} = $p; - } - print { $self->{1} } Dumper($smsg); + my $mitem = delete $n2item{$smsg->{num}}; + $each_smsg->($smsg, $mitem); # $self->out($buf .= $ORS); # $emit_cb->($smsg); } @{$ctx->{xids}} = (); } } while (_mset_more($mset, $mo)); + $lei->{ovv}->ovv_atexit_child($lei); } sub query_mset { # non-parallel for non-"--thread" users @@ -130,23 +129,24 @@ sub query_mset { # non-parallel for non-"--thread" users my $mset; local %SIG = (%SIG, $lei->atfork_child_wq($self)); $self->attach_external($_) for @$srcs; + my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei); do { $mset = $self->mset($mo->{qstr}, $mo); for my $it ($mset->items) { my $smsg = smsg_for($self, $it) or next; # next if $dd->is_smsg_dup($smsg); - $smsg->{relevance} = get_pct($it); - use Data::Dumper; - print { $self->{1} } Dumper($smsg); + $each_smsg->($smsg, $it); # $self->out($buf .= $ORS) if defined $buf; #$emit_cb->($smsg); } } while (_mset_more($mset, $mo)); + $lei->{ovv}->ovv_atexit_child($lei); } sub do_query { - my 
($self, $lei_orig, $srcs) = @_; + my ($self, $lei_orig, $qry_done, $srcs) = @_; my ($lei, @io) = $lei_orig->atfork_parent_wq($self); + $io[0] = $qry_done; # don't need stdin $io[1]->autoflush(1); $io[2]->autoflush(1); if ($lei->{opt}->{thread}) { @@ -160,6 +160,9 @@ sub do_query { for my $rmt (@{$self->{remotes} // []}) { $self->wq_do('query_thread_mbox', \@io, $lei, $rmt); } + + # sent off to children, they will drop remaining references to it + close $qry_done; } sub ipc_atfork_child { @@ -170,7 +173,7 @@ sub ipc_atfork_child { sub ipc_atfork_prepare { my ($self) = @_; - $self->wq_set_recv_modes(qw[<&= >&= >&= +<&=]); + $self->wq_set_recv_modes(qw[+<&= >&= >&= +<&=]); $self->SUPER::ipc_atfork_prepare; # PublicInbox::IPC } |