user/dev discussion of public-inbox itself
 help / color / mirror / code / Atom feed
Search results ordered by [date|relevance]  view[summary|nested|Atom feed]
thread overview below | download mbox.gz: |
* [PATCH 5/7] ipc: support setting a locked number of WQ workers
  2021-02-21  7:41  6% [PATCH 0/7] "lei q -o imaps://..." support Eric Wong
@ 2021-02-21  7:41  7% ` Eric Wong
  0 siblings, 0 replies; 2+ results
From: Eric Wong @ 2021-02-21  7:41 UTC (permalink / raw)
  To: meta

We can use this to ensure sharded work doesn't do unexpected
things if workers are added/removed.  We currently don't
increase/decrease workers once a workqueue is started, but
non-lei code (-httpd/imapd) may start doing so.

This also fixes a bug where lei2mail workers could not
be adjusted via --jobs on the command-line.
---
 lib/PublicInbox/IPC.pm        | 5 ++++-
 lib/PublicInbox/LeiQuery.pm   | 6 +++---
 lib/PublicInbox/LeiXSearch.pm | 4 ++--
 3 files changed, 9 insertions(+), 6 deletions(-)

diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 2aeb6462..1fa67d00 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -341,7 +341,7 @@ sub wq_workers_start {
 	socketpair($self->{-wq_s1}, $self->{-wq_s2}, AF_UNIX, $SEQPACKET, 0) or
 		die "socketpair: $!";
 	$self->ipc_atfork_prepare;
-	$nr_workers //= 4;
+	$nr_workers //= $self->{-wq_nr_workers};
 	$nr_workers = $WQ_MAX_WORKERS if $nr_workers > $WQ_MAX_WORKERS;
 	my $sigset = $oldset // PublicInbox::DS::block_signals();
 	$self->{-wq_workers} = {};
@@ -354,6 +354,7 @@ sub wq_workers_start {
 sub wq_worker_incr { # SIGTTIN handler
 	my ($self, $oldset, $fields) = @_;
 	$self->{-wq_s2} or return;
+	die "-wq_nr_workers locked" if defined $self->{-wq_nr_workers};
 	return if wq_workers($self) >= $WQ_MAX_WORKERS;
 	$self->ipc_atfork_prepare;
 	my $sigset = $oldset // PublicInbox::DS::block_signals();
@@ -369,6 +370,7 @@ sub wq_exit { # wakes up wq_worker_decr_wait
 sub wq_worker_decr { # SIGTTOU handler, kills first idle worker
 	my ($self) = @_;
 	return unless wq_workers($self);
+	die "-wq_nr_workers locked" if defined $self->{-wq_nr_workers};
 	$self->wq_io_do('wq_exit');
 	# caller must call wq_worker_decr_wait in main loop
 }
@@ -376,6 +378,7 @@ sub wq_worker_decr { # SIGTTOU handler, kills first idle worker
 sub wq_worker_decr_wait {
 	my ($self, $timeout, $cb, @args) = @_;
 	return if $self->{-wq_ppid} != $$; # can't reap siblings or parents
+	die "-wq_nr_workers locked" if defined $self->{-wq_nr_workers};
 	my $s1 = $self->{-wq_s1} // croak 'BUG: no wq_s1';
 	vec(my $rin = '', fileno($s1), 1) = 1;
 	select(my $rout = $rin, undef, undef, $timeout) or
diff --git a/lib/PublicInbox/LeiQuery.pm b/lib/PublicInbox/LeiQuery.pm
index eaf91f2e..398f834f 100644
--- a/lib/PublicInbox/LeiQuery.pm
+++ b/lib/PublicInbox/LeiQuery.pm
@@ -90,14 +90,14 @@ sub lei_q {
 		return $self->fail("`$xj' search jobs must be >= 1");
 	}
 	$xj ||= $lxs->concurrency($opt); # allow: "--jobs ,$WRITER_ONLY"
-	my $nproc = $lxs->detect_nproc; # don't memoize, schedtool(1) exists
+	my $nproc = $lxs->detect_nproc // 1; # don't memoize, schedtool(1) exists
 	$xj = $nproc if $xj > $nproc;
-	$lxs->{jobs} = $xj;
+	$lxs->{-wq_nr_workers} = $xj;
 	if (defined($mj) && $mj !~ /\A[1-9][0-9]*\z/) {
 		return $self->fail("`$mj' writer jobs must be >= 1");
 	}
-	$self->{l2m}->{jobs} = ($mj // $nproc) if $self->{l2m};
 	PublicInbox::LeiOverview->new($self) or return;
+	$self->{l2m}->{-wq_nr_workers} = ($mj // $nproc) if $self->{l2m};
 
 	my %mset_opt = map { $_ => $opt->{$_} } qw(threads limit offset);
 	$mset_opt{asc} = $opt->{'reverse'} ? 1 : 0;
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index a319b75f..524f4d1c 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -407,7 +407,7 @@ sub do_query {
 		if ($lei->{opt}->{augment} && delete $lei->{early_mua}) {
 			$lei->start_mua;
 		}
-		$l2m->wq_workers_start('lei2mail', $l2m->{jobs},
+		$l2m->wq_workers_start('lei2mail', undef,
 					$lei->oldset, { lei => $lei });
 		pipe($lei->{startq}, $lei->{au_done}) or die "pipe: $!";
 		# 1031: F_SETPIPE_SZ
@@ -418,7 +418,7 @@ sub do_query {
 		# delete until all lei2mail + lei_xsearch workers are reaped
 		$lei->{git_tmp} = $self->{git_tmp} = git_tmp($self);
 	}
-	$self->wq_workers_start('lei_xsearch', $self->{jobs},
+	$self->wq_workers_start('lei_xsearch', undef,
 				$lei->oldset, { lei => $lei });
 	my $op = delete $lei->{pkt_op_c};
 	delete $lei->{pkt_op_p};

^ permalink raw reply related	[relevance 7%]

* [PATCH 0/7] "lei q -o imaps://..." support
@ 2021-02-21  7:41  6% Eric Wong
  2021-02-21  7:41  7% ` [PATCH 5/7] ipc: support setting a locked number of WQ workers Eric Wong
  0 siblings, 1 reply; 2+ results
From: Eric Wong @ 2021-02-21  7:41 UTC (permalink / raw)
  To: meta

-a/--augment dedupe is now parallel for both Maildirs and IMAP
stores (probably not worth the serialization cost for mbox*).

LeiAuth remains inefficient, unfortunately; but wq_broadcast
has been added to address it in the future.

The parallelization work for IMAP for "lei q" can also be done
for "lei convert" and "lei import", but it'll probably be opt-in
in case people care about preserving UID order.

Eric Wong (7):
  inbox_writable: require PublicInbox::MdirReader
  lei q: support IMAP/IMAPS --output destinations
  ipc: add wq_broadcast
  lei q: move augment into lei2mail workers
  ipc: support setting a locked number of WQ workers
  net_reader: use and accept URIimap objects in more places
  lei2mail: parallel augment for lock-free stores

 lib/PublicInbox/IPC.pm           |  35 +++++++--
 lib/PublicInbox/InboxWritable.pm |   1 +
 lib/PublicInbox/LeiAuth.pm       |   2 +-
 lib/PublicInbox/LeiOverview.pm   |   7 +-
 lib/PublicInbox/LeiQuery.pm      |  24 +++++--
 lib/PublicInbox/LeiToMail.pm     |  93 ++++++++++++++++++++++--
 lib/PublicInbox/LeiXSearch.pm    |  48 ++++++-------
 lib/PublicInbox/NetReader.pm     |  75 +++++++++++---------
 lib/PublicInbox/NetWriter.pm     |  12 ++++
 lib/PublicInbox/WQWorker.pm      |   8 +--
 lib/PublicInbox/Watch.pm         |  11 +--
 t/ipc.t                          |  39 +++++-----
 t/lei-externals.t                |   3 +-
 xt/net_writer-imap.t             | 118 ++++++++++++++++++++++++++++---
 14 files changed, 362 insertions(+), 114 deletions(-)


^ permalink raw reply	[relevance 6%]

Results 1-2 of 2 | reverse | options above
-- pct% links below jump to the message on this page, permalinks otherwise --
2021-02-21  7:41  6% [PATCH 0/7] "lei q -o imaps://..." support Eric Wong
2021-02-21  7:41  7% ` [PATCH 5/7] ipc: support setting a locked number of WQ workers Eric Wong

Code repositories for project(s) associated with this public inbox

	https://80x24.org/public-inbox.git

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).