From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 9B7901F934 for ; Sun, 24 Jan 2021 11:46:55 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 1/9] lei q: limit concurrency to 4 remote connections Date: Sun, 24 Jan 2021 04:46:47 -0700 Message-Id: <20210124114655.12815-2-e@80x24.org> In-Reply-To: <20210124114655.12815-1-e@80x24.org> References: <20210124114655.12815-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: Unfortunately, this isn't a per-host limit yet; nevertheless, it reduces load on existing PublicInbox::WWW instances, since requesting an mboxrd is one of the more expensive operations. --- lib/PublicInbox/LeiQuery.pm | 2 +- lib/PublicInbox/LeiXSearch.pm | 82 +++++++++++++++++++++-------------- 2 files changed, 51 insertions(+), 33 deletions(-) diff --git a/lib/PublicInbox/LeiQuery.pm b/lib/PublicInbox/LeiQuery.pm index acab3c2c..a7938e8b 100644 --- a/lib/PublicInbox/LeiQuery.pm +++ b/lib/PublicInbox/LeiQuery.pm @@ -26,7 +26,7 @@ sub lei_q { my $cb = $lxs->can('prepare_external'); $self->_externals_each($cb, $lxs); } - my $xj = $opt->{thread} ? 
$lxs->locals : ($lxs->remotes + 1); + my $xj = $lxs->concurrency($opt); my $ovv = PublicInbox::LeiOverview->new($self) or return; $self->atfork_prepare_wq($lxs); $lxs->wq_workers_start('lei_xsearch', $xj, $self->oldset); diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index defe5e67..1c093a94 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -182,25 +182,16 @@ sub each_eml { # callback for MboxReader->mboxrd } sub query_remote_mboxrd { - my ($self, $lei, $uri) = @_; + my ($self, $lei, $uris) = @_; local $0 = "$0 query_remote_mboxrd"; my %sig = $lei->atfork_child_wq($self); # keep $self->{5} startq local @SIG{keys %sig} = values %sig; - my $opt = $lei->{opt}; - $uri->query_form(q => $lei->{mset_opt}->{qstr}, x => 'm', - $opt->{thread} ? (t => 1) : ()); - my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei, $uri); + my ($opt, $env) = @$lei{qw(opt env)}; + my @qform = (q => $lei->{mset_opt}->{qstr}, x => 'm'); + push(@qform, t => 1) if $opt->{thread}; my $dedupe = $lei->{dedupe} // die 'BUG: {dedupe} missing'; $dedupe->prepare_dedupe; my @cmd = qw(curl -XPOST -sSf); - $opt->{torsocks} = 'false' if $opt->{'no-torsocks'}; - my $tor = $opt->{torsocks} //= 'auto'; - if ($tor eq 'auto' && substr($uri->host, -6) eq '.onion' && - (($lei->{env}->{LD_PRELOAD}//'') !~ /torsocks/)) { - unshift @cmd, 'torsocks'; - } elsif (PublicInbox::Config::git_bool($tor)) { - unshift @cmd, 'torsocks'; - } my $verbose = $opt->{verbose}; push @cmd, '-v' if $verbose; for my $o ($lei->curl_opt) { @@ -215,25 +206,36 @@ sub query_remote_mboxrd { push @cmd, "--$o"; } } - push @cmd, $uri->as_string; - $lei->err("# @cmd") if $verbose; - $? = 0; - my $fh = popen_rd(\@cmd, $lei->{env}, { 2 => $lei->{2} }); - $fh = IO::Uncompress::Gunzip->new($fh); - eval { - PublicInbox::MboxReader->mboxrd($fh, \&each_eml, - $self, $lei, $each_smsg); - }; - return $lei->fail("E: @cmd: $@") if $@; - if (($? 
>> 8) == 22) { # HTTP 404 from curl(1) - $uri->query_form(q => $lei->{mset_opt}->{qstr}); - $lei->err('# no results from '.$uri->as_string); - } elsif ($?) { - $uri->query_form(q => $lei->{mset_opt}->{qstr}); - $lei->err('E: '.$uri->as_string); - $lei->child_error($?); + $opt->{torsocks} = 'false' if $opt->{'no-torsocks'}; + my $tor = $opt->{torsocks} //= 'auto'; + for my $uri (@$uris) { + $uri->query_form(@qform); + my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei, $uri); + my $cmd = [ @cmd, $uri->as_string ]; + if ($tor eq 'auto' && substr($uri->host, -6) eq '.onion' && + (($env->{LD_PRELOAD}//'') !~ /torsocks/)) { + unshift @$cmd, 'torsocks'; + } elsif (PublicInbox::Config::git_bool($tor)) { + unshift @$cmd, 'torsocks'; + } + $lei->err("# @$cmd") if $verbose; + $? = 0; + my $fh = popen_rd($cmd, $env, { 2 => $lei->{2} }); + $fh = IO::Uncompress::Gunzip->new($fh); + eval { + PublicInbox::MboxReader->mboxrd($fh, \&each_eml, $self, + $lei, $each_smsg); + }; + return $lei->fail("E: @$cmd: $@") if $@; + if (($? >> 8) == 22) { # HTTP 404 from curl(1) + $uri->query_form(q => $lei->{mset_opt}->{qstr}); + $lei->err('# no results from '.$uri->as_string); + } elsif ($?) { + $uri->query_form(q => $lei->{mset_opt}->{qstr}); + $lei->err('E: '.$uri->as_string); + $lei->child_error($?); + } } - undef $each_smsg; $lei->{ovv}->ovv_atexit_child($lei); } @@ -292,6 +294,17 @@ sub do_post_augment { close $au_done; # triggers wait_startq } +my $MAX_PER_HOST = 4; +sub MAX_PER_HOST { $MAX_PER_HOST } + +sub concurrency { + my ($self, $opt) = @_; + my $nl = $opt->{thread} ? 
locals($self) : 1; + my $nr = remotes($self); + $nr = $MAX_PER_HOST if $nr > $MAX_PER_HOST; + $nl + $nr; +} + sub start_query { # always runs in main (lei-daemon) process my ($self, $io, $lei) = @_; if ($lei->{opt}->{thread}) { @@ -301,8 +314,13 @@ sub start_query { # always runs in main (lei-daemon) process } else { $self->wq_do('query_mset', $io, $lei); } + my $i = 0; + my $q = []; for my $uri (remotes($self)) { - $self->wq_do('query_remote_mboxrd', $io, $lei, $uri); + push @{$q->[$i++ % $MAX_PER_HOST]}, $uri; + } + for my $uris (@$q) { + $self->wq_do('query_remote_mboxrd', $io, $lei, $uris); } @$io = (); }