about summary refs log tree commit homepage
diff options
context:
space:
mode:
-rw-r--r-- lib/PublicInbox/LeiQuery.pm 2
-rw-r--r-- lib/PublicInbox/LeiXSearch.pm 82
2 files changed, 51 insertions, 33 deletions
diff --git a/lib/PublicInbox/LeiQuery.pm b/lib/PublicInbox/LeiQuery.pm
index acab3c2c..a7938e8b 100644
--- a/lib/PublicInbox/LeiQuery.pm
+++ b/lib/PublicInbox/LeiQuery.pm
@@ -26,7 +26,7 @@ sub lei_q {
                 my $cb = $lxs->can('prepare_external');
                 $self->_externals_each($cb, $lxs);
         }
-        my $xj = $opt->{thread} ? $lxs->locals : ($lxs->remotes + 1);
+        my $xj = $lxs->concurrency($opt);
         my $ovv = PublicInbox::LeiOverview->new($self) or return;
         $self->atfork_prepare_wq($lxs);
         $lxs->wq_workers_start('lei_xsearch', $xj, $self->oldset);
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index defe5e67..1c093a94 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -182,25 +182,16 @@ sub each_eml { # callback for MboxReader->mboxrd
 }
 
 sub query_remote_mboxrd {
-        my ($self, $lei, $uri) = @_;
+        my ($self, $lei, $uris) = @_;
         local $0 = "$0 query_remote_mboxrd";
         my %sig = $lei->atfork_child_wq($self); # keep $self->{5} startq
         local @SIG{keys %sig} = values %sig;
-        my $opt = $lei->{opt};
-        $uri->query_form(q => $lei->{mset_opt}->{qstr}, x => 'm',
-                        $opt->{thread} ? (t => 1) : ());
-        my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei, $uri);
+        my ($opt, $env) = @$lei{qw(opt env)};
+        my @qform = (q => $lei->{mset_opt}->{qstr}, x => 'm');
+        push(@qform, t => 1) if $opt->{thread};
         my $dedupe = $lei->{dedupe} // die 'BUG: {dedupe} missing';
         $dedupe->prepare_dedupe;
         my @cmd = qw(curl -XPOST -sSf);
-        $opt->{torsocks} = 'false' if $opt->{'no-torsocks'};
-        my $tor = $opt->{torsocks} //= 'auto';
-        if ($tor eq 'auto' && substr($uri->host, -6) eq '.onion' &&
-                        (($lei->{env}->{LD_PRELOAD}//'') !~ /torsocks/)) {
-                unshift @cmd, 'torsocks';
-        } elsif (PublicInbox::Config::git_bool($tor)) {
-                unshift @cmd, 'torsocks';
-        }
         my $verbose = $opt->{verbose};
         push @cmd, '-v' if $verbose;
         for my $o ($lei->curl_opt) {
@@ -215,25 +206,36 @@ sub query_remote_mboxrd {
                         push @cmd, "--$o";
                 }
         }
-        push @cmd, $uri->as_string;
-        $lei->err("# @cmd") if $verbose;
-        $? = 0;
-        my $fh = popen_rd(\@cmd, $lei->{env}, { 2 => $lei->{2} });
-        $fh = IO::Uncompress::Gunzip->new($fh);
-        eval {
-                PublicInbox::MboxReader->mboxrd($fh, \&each_eml,
-                                                $self, $lei, $each_smsg);
-        };
-        return $lei->fail("E: @cmd: $@") if $@;
-        if (($? >> 8) == 22) { # HTTP 404 from curl(1)
-                $uri->query_form(q => $lei->{mset_opt}->{qstr});
-                $lei->err('# no results from '.$uri->as_string);
-        } elsif ($?) {
-                $uri->query_form(q => $lei->{mset_opt}->{qstr});
-                $lei->err('E: '.$uri->as_string);
-                $lei->child_error($?);
+        $opt->{torsocks} = 'false' if $opt->{'no-torsocks'};
+        my $tor = $opt->{torsocks} //= 'auto';
+        for my $uri (@$uris) {
+                $uri->query_form(@qform);
+                my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei, $uri);
+                my $cmd = [ @cmd, $uri->as_string ];
+                if ($tor eq 'auto' && substr($uri->host, -6) eq '.onion' &&
+                                (($env->{LD_PRELOAD}//'') !~ /torsocks/)) {
+                        unshift @$cmd, 'torsocks';
+                } elsif (PublicInbox::Config::git_bool($tor)) {
+                        unshift @$cmd, 'torsocks';
+                }
+                $lei->err("# @$cmd") if $verbose;
+                $? = 0;
+                my $fh = popen_rd($cmd, $env, { 2 => $lei->{2} });
+                $fh = IO::Uncompress::Gunzip->new($fh);
+                eval {
+                        PublicInbox::MboxReader->mboxrd($fh, \&each_eml, $self,
+                                                        $lei, $each_smsg);
+                };
+                return $lei->fail("E: @$cmd: $@") if $@;
+                if (($? >> 8) == 22) { # HTTP 404 from curl(1)
+                        $uri->query_form(q => $lei->{mset_opt}->{qstr});
+                        $lei->err('# no results from '.$uri->as_string);
+                } elsif ($?) {
+                        $uri->query_form(q => $lei->{mset_opt}->{qstr});
+                        $lei->err('E: '.$uri->as_string);
+                        $lei->child_error($?);
+                }
         }
-        undef $each_smsg;
         $lei->{ovv}->ovv_atexit_child($lei);
 }
 
@@ -292,6 +294,17 @@ sub do_post_augment {
         close $au_done; # triggers wait_startq
 }
 
+my $MAX_PER_HOST = 4; # ceiling concurrency() applies to the remote count
+sub MAX_PER_HOST { $MAX_PER_HOST } # accessor returning the lexical cap
+
+sub concurrency { # worker count: local share plus capped remote share
+        my ($self, $opt) = @_;
+        my $nl = $opt->{thread} ? locals($self) : 1; # 1 unless $opt->{thread}
+        my $nr = remotes($self); # NOTE(review): presumably a count; confirm
+        $nr = $MAX_PER_HOST if $nr > $MAX_PER_HOST; # clamp to the cap
+        $nl + $nr; # last expression is the return value
+}
+
 sub start_query { # always runs in main (lei-daemon) process
         my ($self, $io, $lei) = @_;
         if ($lei->{opt}->{thread}) {
@@ -301,8 +314,13 @@ sub start_query { # always runs in main (lei-daemon) process
         } else {
                 $self->wq_do('query_mset', $io, $lei);
         }
+        my $i = 0;
+        my $q = [];
         for my $uri (remotes($self)) {
-                $self->wq_do('query_remote_mboxrd', $io, $lei, $uri);
+                push @{$q->[$i++ % $MAX_PER_HOST]}, $uri;
+        }
+        for my $uris (@$q) {
+                $self->wq_do('query_remote_mboxrd', $io, $lei, $uris);
         }
         @$io = ();
 }