From cff308df82cda8370e98c0c9c6a3704209362a60 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Tue, 2 Feb 2021 11:47:02 +0000 Subject: lei q: support --jobs [SEARCHERS],[WRITERS] This comma-delimited parameter allows controlling the number of lei_xsearch and lei2mail worker processes. With the change to make IPC wq_* work use the event loop, it's now safe to run fewer worker processes for searching with no risk of deadlocks. MAX_PER_HOST isn't configurable yet for remote hosts, and maybe it shouldn't be due to potential for abuse. --- lib/PublicInbox/IPC.pm | 19 +++++++++++++++++++ lib/PublicInbox/LEI.pm | 5 ++++- lib/PublicInbox/LeiQuery.pm | 14 ++++++++++++-- lib/PublicInbox/LeiXSearch.pm | 1 - lib/PublicInbox/V2Writable.pm | 22 ++-------------------- 5 files changed, 37 insertions(+), 24 deletions(-) (limited to 'lib') diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm index 50de1bed..3873649b 100644 --- a/lib/PublicInbox/IPC.pm +++ b/lib/PublicInbox/IPC.pm @@ -466,4 +466,23 @@ sub DESTROY { # Sereal doesn't have dclone sub deep_clone { ipc_thaw(ipc_freeze($_[-1])) } +sub detect_nproc () { + # _SC_NPROCESSORS_ONLN = 84 on both Linux glibc and musl + return POSIX::sysconf(84) if $^O eq 'linux'; + return POSIX::sysconf(58) if $^O eq 'freebsd'; + # TODO: more OSes + + # getconf(1) is POSIX, but *NPROCESSORS* vars are not + for (qw(_NPROCESSORS_ONLN NPROCESSORS_ONLN)) { + `getconf $_ 2>/dev/null` =~ /^(\d+)$/ and return $1; + } + for my $nproc (qw(nproc gnproc)) { # GNU coreutils nproc + `$nproc 2>/dev/null` =~ /^(\d+)$/ and return $1; + } + + # should we bother with `sysctl hw.ncpu`? Those only give + # us total processor count, not online processor count. 
+ undef +} + 1; diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index 2c512c5e..9afc90cf 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -104,7 +104,7 @@ our %CMD = ( # sorted in order of importance/use: 'q' => [ 'SEARCH_TERMS...', 'search for messages matching terms', qw( save-as=s output|mfolder|o=s format|f=s dedupe|d=s thread|t augment|a sort|s=s reverse|r offset=i remote! local! external! pretty - include|I=s@ exclude=s@ only=s@ + include|I=s@ exclude=s@ only=s@ jobs|j=s mua-cmd|mua=s no-torsocks torsocks=s verbose|v quiet|q received-after=s received-before=s sent-after=s sent-since=s), PublicInbox::LeiQuery::curl_opt(), opt_dash('limit|n=i', '[0-9]+') ], @@ -236,6 +236,9 @@ my %OPTDESC = ( 'q only=s@' => [ 'URL_OR_PATHNAME', 'only use specified external(s) for search' ], +'q jobs=s' => [ '[SEARCH_JOBS][,WRITER_JOBS]', + 'control number of search and writer jobs' ], + 'ls-query format|f=s' => $ls_format, 'ls-external format|f=s' => $ls_format, diff --git a/lib/PublicInbox/LeiQuery.pm b/lib/PublicInbox/LeiQuery.pm index ca214ca1..72a67c24 100644 --- a/lib/PublicInbox/LeiQuery.pm +++ b/lib/PublicInbox/LeiQuery.pm @@ -17,6 +17,7 @@ sub lei_q { my ($self, @argv) = @_; require PublicInbox::LeiXSearch; require PublicInbox::LeiOverview; + require PublicInbox::V2Writable; PublicInbox::Config->json; # preload before forking my $opt = $self->{opt}; # prepare any number of LeiXSearch || LeiSearch || Inbox || URL @@ -53,13 +54,22 @@ sub lei_q { unless ($lxs->locals || $lxs->remotes) { return $self->fail('no local or remote inboxes to search'); } - my $xj = $lxs->concurrency($opt); + my ($xj, $mj) = split(/,/, $opt->{jobs} // ''); + if (defined($xj) && $xj ne '' && $xj !~ /\A[1-9][0-9]*\z/) { + return $self->fail("`$xj' search jobs must be >= 1"); + } + $xj ||= $lxs->concurrency($opt); # allow: "--jobs ,$WRITER_ONLY" + my $nproc = $lxs->detect_nproc; # don't memoize, schedtool(1) exists + $xj = $nproc if $xj > $nproc; 
PublicInbox::LeiOverview->new($self) or return; $self->atfork_prepare_wq($lxs); $lxs->wq_workers_start('lei_xsearch', $xj, $self->oldset); delete $lxs->{-ipc_atfork_child_close}; if (my $l2m = $self->{l2m}) { - my $mj = 4; # TODO: configurable + if (defined($mj) && $mj !~ /\A[1-9][0-9]*\z/) { + return $self->fail("`$mj' writer jobs must be >= 1"); + } + $mj //= $nproc; $self->atfork_prepare_wq($l2m); $l2m->wq_workers_start('lei2mail', $mj, $self->oldset); delete $l2m->{-ipc_atfork_child_close}; diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index 57a18075..37bd233e 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -350,7 +350,6 @@ sub do_post_augment { } my $MAX_PER_HOST = 4; -sub MAX_PER_HOST { $MAX_PER_HOST } sub concurrency { my ($self, $opt) = @_; diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm index 35b7fe30..cbd4f003 100644 --- a/lib/PublicInbox/V2Writable.pm +++ b/lib/PublicInbox/V2Writable.pm @@ -8,6 +8,7 @@ use strict; use v5.10.1; use parent qw(PublicInbox::Lock); use PublicInbox::SearchIdxShard; +use PublicInbox::IPC; use PublicInbox::Eml; use PublicInbox::Git; use PublicInbox::Import; @@ -35,32 +36,13 @@ our $PACKING_FACTOR = 0.4; # to increase Xapian shards our $NPROC_MAX_DEFAULT = 4; -sub detect_nproc () { - # _SC_NPROCESSORS_ONLN = 84 on both Linux glibc and musl - return POSIX::sysconf(84) if $^O eq 'linux'; - return POSIX::sysconf(58) if $^O eq 'freebsd'; - # TODO: more OSes - - # getconf(1) is POSIX, but *NPROCESSORS* vars are not - for (qw(_NPROCESSORS_ONLN NPROCESSORS_ONLN)) { - `getconf $_ 2>/dev/null` =~ /^(\d+)$/ and return $1; - } - for my $nproc (qw(nproc gnproc)) { # GNU coreutils nproc - `$nproc 2>/dev/null` =~ /^(\d+)$/ and return $1; - } - - # should we bother with `sysctl hw.ncpu`? Those only give - # us total processor count, not online processor count. 
- undef -} - sub nproc_shards ($) { my ($creat_opt) = @_; my $n = $creat_opt->{nproc} if ref($creat_opt) eq 'HASH'; $n //= $ENV{NPROC}; if (!$n) { # assume 2 cores if not detectable or zero - state $NPROC_DETECTED = detect_nproc() || 2; + state $NPROC_DETECTED = PublicInbox::IPC::detect_nproc() || 2; $n = $NPROC_DETECTED; $n = $NPROC_MAX_DEFAULT if $n > $NPROC_MAX_DEFAULT; } -- cgit v1.2.3-24-ge0c7