user/dev discussion of public-inbox itself
 help / color / mirror / code / Atom feed
From: Eric Wong <e@80x24.org>
To: meta@public-inbox.org
Subject: [PATCH 5/7] ipc: support setting a locked number of WQ workers
Date: Sun, 21 Feb 2021 07:41:32 +0000	[thread overview]
Message-ID: <20210221074134.15084-6-e@80x24.org> (raw)
In-Reply-To: <20210221074134.15084-1-e@80x24.org>

We can use this to ensure sharded work doesn't do unexpected
things if workers are added/removed.  We currently don't
increase/decrease workers once a workqueue is started, but
non-lei code (-httpd/imapd) may start doing so.

This also fixes a bug where lei2mail workers could not
be adjusted via --jobs on the command-line.
---
 lib/PublicInbox/IPC.pm        | 5 ++++-
 lib/PublicInbox/LeiQuery.pm   | 6 +++---
 lib/PublicInbox/LeiXSearch.pm | 4 ++--
 3 files changed, 9 insertions(+), 6 deletions(-)

diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 2aeb6462..1fa67d00 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -341,7 +341,7 @@ sub wq_workers_start {
 	socketpair($self->{-wq_s1}, $self->{-wq_s2}, AF_UNIX, $SEQPACKET, 0) or
 		die "socketpair: $!";
 	$self->ipc_atfork_prepare;
-	$nr_workers //= 4;
+	$nr_workers //= $self->{-wq_nr_workers};
 	$nr_workers = $WQ_MAX_WORKERS if $nr_workers > $WQ_MAX_WORKERS;
 	my $sigset = $oldset // PublicInbox::DS::block_signals();
 	$self->{-wq_workers} = {};
@@ -354,6 +354,7 @@ sub wq_workers_start {
 sub wq_worker_incr { # SIGTTIN handler
 	my ($self, $oldset, $fields) = @_;
 	$self->{-wq_s2} or return;
+	die "-wq_nr_workers locked" if defined $self->{-wq_nr_workers};
 	return if wq_workers($self) >= $WQ_MAX_WORKERS;
 	$self->ipc_atfork_prepare;
 	my $sigset = $oldset // PublicInbox::DS::block_signals();
@@ -369,6 +370,7 @@ sub wq_exit { # wakes up wq_worker_decr_wait
 sub wq_worker_decr { # SIGTTOU handler, kills first idle worker
 	my ($self) = @_;
 	return unless wq_workers($self);
+	die "-wq_nr_workers locked" if defined $self->{-wq_nr_workers};
 	$self->wq_io_do('wq_exit');
 	# caller must call wq_worker_decr_wait in main loop
 }
@@ -376,6 +378,7 @@ sub wq_worker_decr { # SIGTTOU handler, kills first idle worker
 sub wq_worker_decr_wait {
 	my ($self, $timeout, $cb, @args) = @_;
 	return if $self->{-wq_ppid} != $$; # can't reap siblings or parents
+	die "-wq_nr_workers locked" if defined $self->{-wq_nr_workers};
 	my $s1 = $self->{-wq_s1} // croak 'BUG: no wq_s1';
 	vec(my $rin = '', fileno($s1), 1) = 1;
 	select(my $rout = $rin, undef, undef, $timeout) or
diff --git a/lib/PublicInbox/LeiQuery.pm b/lib/PublicInbox/LeiQuery.pm
index eaf91f2e..398f834f 100644
--- a/lib/PublicInbox/LeiQuery.pm
+++ b/lib/PublicInbox/LeiQuery.pm
@@ -90,14 +90,14 @@ sub lei_q {
 		return $self->fail("`$xj' search jobs must be >= 1");
 	}
 	$xj ||= $lxs->concurrency($opt); # allow: "--jobs ,$WRITER_ONLY"
-	my $nproc = $lxs->detect_nproc; # don't memoize, schedtool(1) exists
+	my $nproc = $lxs->detect_nproc // 1; # don't memoize, schedtool(1) exists
 	$xj = $nproc if $xj > $nproc;
-	$lxs->{jobs} = $xj;
+	$lxs->{-wq_nr_workers} = $xj;
 	if (defined($mj) && $mj !~ /\A[1-9][0-9]*\z/) {
 		return $self->fail("`$mj' writer jobs must be >= 1");
 	}
-	$self->{l2m}->{jobs} = ($mj // $nproc) if $self->{l2m};
 	PublicInbox::LeiOverview->new($self) or return;
+	$self->{l2m}->{-wq_nr_workers} = ($mj // $nproc) if $self->{l2m};
 
 	my %mset_opt = map { $_ => $opt->{$_} } qw(threads limit offset);
 	$mset_opt{asc} = $opt->{'reverse'} ? 1 : 0;
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index a319b75f..524f4d1c 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -407,7 +407,7 @@ sub do_query {
 		if ($lei->{opt}->{augment} && delete $lei->{early_mua}) {
 			$lei->start_mua;
 		}
-		$l2m->wq_workers_start('lei2mail', $l2m->{jobs},
+		$l2m->wq_workers_start('lei2mail', undef,
 					$lei->oldset, { lei => $lei });
 		pipe($lei->{startq}, $lei->{au_done}) or die "pipe: $!";
 		# 1031: F_SETPIPE_SZ
@@ -418,7 +418,7 @@ sub do_query {
 		# delete until all lei2mail + lei_xsearch workers are reaped
 		$lei->{git_tmp} = $self->{git_tmp} = git_tmp($self);
 	}
-	$self->wq_workers_start('lei_xsearch', $self->{jobs},
+	$self->wq_workers_start('lei_xsearch', undef,
 				$lei->oldset, { lei => $lei });
 	my $op = delete $lei->{pkt_op_c};
 	delete $lei->{pkt_op_p};

  parent reply	other threads:[~2021-02-21  7:41 UTC|newest]

Thread overview: 8+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2021-02-21  7:41 [PATCH 0/7] "lei q -o imaps://..." support Eric Wong
2021-02-21  7:41 ` [PATCH 1/7] inbox_writable: require PublicInbox::MdirReader Eric Wong
2021-02-21  7:41 ` [PATCH 2/7] lei q: support IMAP/IMAPS --output destinations Eric Wong
2021-02-21  7:41 ` [PATCH 3/7] ipc: add wq_broadcast Eric Wong
2021-02-21  7:41 ` [PATCH 4/7] lei q: move augment into lei2mail workers Eric Wong
2021-02-21  7:41 ` Eric Wong [this message]
2021-02-21  7:41 ` [PATCH 6/7] net_reader: use and accept URIimap objects in more places Eric Wong
2021-02-21  7:41 ` [PATCH 7/7] lei2mail: parallel augment for lock-free stores Eric Wong

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

  List information: https://public-inbox.org/README

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20210221074134.15084-6-e@80x24.org \
    --to=e@80x24.org \
    --cc=meta@public-inbox.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
Code repositories for project(s) associated with this public inbox

	https://80x24.org/public-inbox.git

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).