about summary refs log tree commit homepage
path: root/lib
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2021-02-02 11:47:02 +0000
committerEric Wong <e@80x24.org>2021-02-03 07:32:45 +0000
commitcff308df82cda8370e98c0c9c6a3704209362a60 (patch)
tree4632b2a438f2fca6369f079064d771f7497fa5f3 /lib
parent9ff9755999351a582c857c38e2996b3ab7eb7481 (diff)
downloadpublic-inbox-cff308df82cda8370e98c0c9c6a3704209362a60.tar.gz
This comma-delimited parameter allows controlling the number or
lei_xsearch and lei2mail worker processes.  With the change
to make IPC wq_* work use the event loop, it's now safe to
run fewer worker processes for searching with no risk of
deadlocks.

MAX_PER_HOST isn't configurable yet for remote hosts,
and maybe it shouldn't be due to potential for abuse.
Diffstat (limited to 'lib')
-rw-r--r--lib/PublicInbox/IPC.pm19
-rw-r--r--lib/PublicInbox/LEI.pm5
-rw-r--r--lib/PublicInbox/LeiQuery.pm14
-rw-r--r--lib/PublicInbox/LeiXSearch.pm1
-rw-r--r--lib/PublicInbox/V2Writable.pm22
5 files changed, 37 insertions, 24 deletions
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 50de1bed..3873649b 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -466,4 +466,23 @@ sub DESTROY {
 # Sereal doesn't have dclone
 sub deep_clone { ipc_thaw(ipc_freeze($_[-1])) }
 
+sub detect_nproc () {
+        # _SC_NPROCESSORS_ONLN = 84 on both Linux glibc and musl
+        return POSIX::sysconf(84) if $^O eq 'linux';
+        return POSIX::sysconf(58) if $^O eq 'freebsd';
+        # TODO: more OSes
+
+        # getconf(1) is POSIX, but *NPROCESSORS* vars are not
+        for (qw(_NPROCESSORS_ONLN NPROCESSORS_ONLN)) {
+                `getconf $_ 2>/dev/null` =~ /^(\d+)$/ and return $1;
+        }
+        for my $nproc (qw(nproc gnproc)) { # GNU coreutils nproc
+                `$nproc 2>/dev/null` =~ /^(\d+)$/ and return $1;
+        }
+
+        # should we bother with `sysctl hw.ncpu`?  Those only give
+        # us total processor count, not online processor count.
+        undef
+}
+
 1;
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 2c512c5e..9afc90cf 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -104,7 +104,7 @@ our %CMD = ( # sorted in order of importance/use:
 'q' => [ 'SEARCH_TERMS...', 'search for messages matching terms', qw(
         save-as=s output|mfolder|o=s format|f=s dedupe|d=s thread|t augment|a
         sort|s=s reverse|r offset=i remote! local! external! pretty
-        include|I=s@ exclude=s@ only=s@
+        include|I=s@ exclude=s@ only=s@ jobs|j=s
         mua-cmd|mua=s no-torsocks torsocks=s verbose|v quiet|q
         received-after=s received-before=s sent-after=s sent-since=s),
         PublicInbox::LeiQuery::curl_opt(), opt_dash('limit|n=i', '[0-9]+') ],
@@ -236,6 +236,9 @@ my %OPTDESC = (
 'q        only=s@' => [ 'URL_OR_PATHNAME',
                 'only use specified external(s) for search' ],
 
+'q        jobs=s'        => [ '[SEARCH_JOBS][,WRITER_JOBS]',
+                'control number of search and writer jobs' ],
+
 'ls-query        format|f=s' => $ls_format,
 'ls-external        format|f=s' => $ls_format,
 
diff --git a/lib/PublicInbox/LeiQuery.pm b/lib/PublicInbox/LeiQuery.pm
index ca214ca1..72a67c24 100644
--- a/lib/PublicInbox/LeiQuery.pm
+++ b/lib/PublicInbox/LeiQuery.pm
@@ -17,6 +17,7 @@ sub lei_q {
         my ($self, @argv) = @_;
         require PublicInbox::LeiXSearch;
         require PublicInbox::LeiOverview;
+        require PublicInbox::V2Writable;
         PublicInbox::Config->json; # preload before forking
         my $opt = $self->{opt};
         # prepare any number of LeiXSearch || LeiSearch || Inbox || URL
@@ -53,13 +54,22 @@ sub lei_q {
         unless ($lxs->locals || $lxs->remotes) {
                 return $self->fail('no local or remote inboxes to search');
         }
-        my $xj = $lxs->concurrency($opt);
+        my ($xj, $mj) = split(/,/, $opt->{jobs} // '');
+        if (defined($xj) && $xj ne '' && $xj !~ /\A[1-9][0-9]*\z/) {
+                return $self->fail("`$xj' search jobs must be >= 1");
+        }
+        $xj ||= $lxs->concurrency($opt); # allow: "--jobs ,$WRITER_ONLY"
+        my $nproc = $lxs->detect_nproc; # don't memoize, schedtool(1) exists
+        $xj = $nproc if $xj > $nproc;
         PublicInbox::LeiOverview->new($self) or return;
         $self->atfork_prepare_wq($lxs);
         $lxs->wq_workers_start('lei_xsearch', $xj, $self->oldset);
         delete $lxs->{-ipc_atfork_child_close};
         if (my $l2m = $self->{l2m}) {
-                my $mj = 4; # TODO: configurable
+                if (defined($mj) && $mj !~ /\A[1-9][0-9]*\z/) {
+                        return $self->fail("`$mj' writer jobs must be >= 1");
+                }
+                $mj //= $nproc;
                 $self->atfork_prepare_wq($l2m);
                 $l2m->wq_workers_start('lei2mail', $mj, $self->oldset);
                 delete $l2m->{-ipc_atfork_child_close};
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index 57a18075..37bd233e 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -350,7 +350,6 @@ sub do_post_augment {
 }
 
 my $MAX_PER_HOST = 4;
-sub MAX_PER_HOST { $MAX_PER_HOST }
 
 sub concurrency {
         my ($self, $opt) = @_;
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index 35b7fe30..cbd4f003 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -8,6 +8,7 @@ use strict;
 use v5.10.1;
 use parent qw(PublicInbox::Lock);
 use PublicInbox::SearchIdxShard;
+use PublicInbox::IPC;
 use PublicInbox::Eml;
 use PublicInbox::Git;
 use PublicInbox::Import;
@@ -35,32 +36,13 @@ our $PACKING_FACTOR = 0.4;
 # to increase Xapian shards
 our $NPROC_MAX_DEFAULT = 4;
 
-sub detect_nproc () {
-        # _SC_NPROCESSORS_ONLN = 84 on both Linux glibc and musl
-        return POSIX::sysconf(84) if $^O eq 'linux';
-        return POSIX::sysconf(58) if $^O eq 'freebsd';
-        # TODO: more OSes
-
-        # getconf(1) is POSIX, but *NPROCESSORS* vars are not
-        for (qw(_NPROCESSORS_ONLN NPROCESSORS_ONLN)) {
-                `getconf $_ 2>/dev/null` =~ /^(\d+)$/ and return $1;
-        }
-        for my $nproc (qw(nproc gnproc)) { # GNU coreutils nproc
-                `$nproc 2>/dev/null` =~ /^(\d+)$/ and return $1;
-        }
-
-        # should we bother with `sysctl hw.ncpu`?  Those only give
-        # us total processor count, not online processor count.
-        undef
-}
-
 sub nproc_shards ($) {
         my ($creat_opt) = @_;
         my $n = $creat_opt->{nproc} if ref($creat_opt) eq 'HASH';
         $n //= $ENV{NPROC};
         if (!$n) {
                 # assume 2 cores if not detectable or zero
-                state $NPROC_DETECTED = detect_nproc() || 2;
+                state $NPROC_DETECTED = PublicInbox::IPC::detect_nproc() || 2;
                 $n = $NPROC_DETECTED;
                 $n = $NPROC_MAX_DEFAULT if $n > $NPROC_MAX_DEFAULT;
         }