diff options
Diffstat (limited to 'lib/PublicInbox/LeiXSearch.pm')
-rw-r--r-- | lib/PublicInbox/LeiXSearch.pm | 57 |
1 files changed, 56 insertions, 1 deletions
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index 10c25246..d32fe09a 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -14,6 +14,7 @@ use PublicInbox::Import;
 use File::Temp 0.19 (); # 0.19 for ->newdir
 use File::Spec ();
 use PublicInbox::Search qw(xap_terms);
+use PublicInbox::Spawn qw(popen_rd);
 
 sub new {
 	my ($class) = @_;
@@ -169,8 +170,58 @@ sub query_mset { # non-parallel for non-"--thread" users
 	$lei->{ovv}->ovv_atexit_child($lei);
 }
 
+sub each_eml { # callback for MboxReader->mboxrd
+	my ($eml, $self, $lei, $each_smsg) = @_;
+	my $smsg = bless {}, 'PublicInbox::Smsg';
+	$smsg->populate($eml);
+	$smsg->{$_} //= '' for qw(from to cc ds subject references mid);
+	delete @$smsg{qw(From Subject -ds -ts)};
+	if (my $startq = delete($self->{5})) { wait_startq($startq) }
+	return if !$lei->{l2m} && $lei->{dedupe}->is_smsg_dup($smsg);
+	$each_smsg->($smsg, undef, $eml);
+}
+
 sub query_remote_mboxrd {
 	my ($self, $lei, $uri) = @_;
+	local $0 = "$0 query_remote_mboxrd";
+	my %sig = $lei->atfork_child_wq($self); # keep $self->{5} startq
+	local @SIG{keys %sig} = values %sig;
+	my $opt = $lei->{opt};
+	$uri->query_form(q => $lei->{mset_opt}->{qstr}, x => 'm',
+		$opt->{thread} ? (t => 1) : ());
+	my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei, $uri);
+	my $dedupe = $lei->{dedupe} // die 'BUG: {dedupe} missing';
+	$dedupe->prepare_dedupe;
+	my @cmd = qw(curl -XPOST -sSf);
+	my $tor = $opt->{torsocks} //= 'auto';
+	if ($tor eq 'auto' && substr($uri->host, -6) eq '.onion' &&
+		(($lei->{env}->{LD_PRELOAD}//'') !~ /torsocks/)) {
+		unshift @cmd, 'torsocks';
+	} elsif (PublicInbox::Config::git_bool($tor)) {
+		unshift @cmd, 'torsocks';
+	}
+	my $verbose = $opt->{verbose};
+	push @cmd, '-v' if $verbose;
+	push @cmd, $uri->as_string;
+	$lei->err("# @cmd") if $verbose;
+	$? = 0;
+	my $fh = popen_rd(\@cmd, $lei->{env}, { 2 => $lei->{2} });
+	$fh = IO::Uncompress::Gunzip->new($fh);
+	eval {
+		PublicInbox::MboxReader->mboxrd($fh, \&each_eml,
+			$self, $lei, $each_smsg);
+	};
+	return $lei->fail("E: @cmd: $@") if $@;
+	if (($? >> 8) == 22) { # HTTP 404 from curl(1)
+		$uri->query_form(q => $lei->{mset_opt}->{qstr});
+		$lei->err('# no results from '.$uri->as_string);
+	} elsif ($?) {
+		$uri->query_form(q => $lei->{mset_opt}->{qstr});
+		$lei->err('E: '.$uri->as_string);
+		$lei->child_error($?);
+	}
+	undef $each_smsg;
+	$lei->{ovv}->ovv_atexit_child($lei);
 }
 
 sub git {
@@ -230,7 +281,6 @@ sub start_query { # always runs in main (lei-daemon) process
 	} else {
 		$self->wq_do('query_mset', $io, $lei);
 	}
-	# TODO
 	for my $uri (remotes($self)) {
 		$self->wq_do('query_remote_mboxrd', $io, $lei, $uri);
 	}
@@ -263,6 +313,7 @@ sub do_query {
 	my ($lei, @io) = $lei_orig->atfork_parent_wq($self);
 	$io[0] = undef;
 	pipe(my $done, $io[0]) or die "pipe $!";
+	$lei_orig->{1}->autoflush(1);
 
 	$lei_orig->event_step_init; # wait for shutdowns
 	my $done_op = {
@@ -296,6 +347,10 @@ sub do_query {
 
 sub ipc_atfork_prepare {
 	my ($self) = @_;
+	if (exists $self->{remotes}) {
+		require PublicInbox::MboxReader;
+		require IO::Uncompress::Gunzip;
+	}
 	# (0: done_wr, 1: stdout|mbox, 2: stderr,
 	# 3: sock, 4: $l2m->{-wq_s1}, 5: $startq)
 	$self->wq_set_recv_modes(qw[+<&= >&= >&= +<&= +<&= <&=]);