about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2021-02-21 07:41:32 +0000
committerEric Wong <e@80x24.org>2021-02-21 08:59:32 +0000
commit707d4aca2256f284718c247ef00db64cd976d244 (patch)
treedc3b7100b6a50e0df55528fd7875af28d4f66b5a
parent07bb4d74f25b0c2c14a8762905087be5a0f7e934 (diff)
downloadpublic-inbox-707d4aca2256f284718c247ef00db64cd976d244.tar.gz
We can use this to ensure sharded work doesn't do unexpected
things if workers are added/removed.  We currently don't
increase/decrease workers once a workqueue is started, but
non-lei code (-httpd/imapd) may start doing so.

This also fixes a bug where lei2mail workers could not
be adjusted via --jobs on the command-line.
-rw-r--r--lib/PublicInbox/IPC.pm5
-rw-r--r--lib/PublicInbox/LeiQuery.pm6
-rw-r--r--lib/PublicInbox/LeiXSearch.pm4
3 files changed, 9 insertions, 6 deletions
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 2aeb6462..1fa67d00 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -341,7 +341,7 @@ sub wq_workers_start {
         socketpair($self->{-wq_s1}, $self->{-wq_s2}, AF_UNIX, $SEQPACKET, 0) or
                 die "socketpair: $!";
         $self->ipc_atfork_prepare;
-        $nr_workers //= 4;
+        $nr_workers //= $self->{-wq_nr_workers};
         $nr_workers = $WQ_MAX_WORKERS if $nr_workers > $WQ_MAX_WORKERS;
         my $sigset = $oldset // PublicInbox::DS::block_signals();
         $self->{-wq_workers} = {};
@@ -354,6 +354,7 @@ sub wq_workers_start {
 sub wq_worker_incr { # SIGTTIN handler
         my ($self, $oldset, $fields) = @_;
         $self->{-wq_s2} or return;
+        die "-wq_nr_workers locked" if defined $self->{-wq_nr_workers};
         return if wq_workers($self) >= $WQ_MAX_WORKERS;
         $self->ipc_atfork_prepare;
         my $sigset = $oldset // PublicInbox::DS::block_signals();
@@ -369,6 +370,7 @@ sub wq_exit { # wakes up wq_worker_decr_wait
 sub wq_worker_decr { # SIGTTOU handler, kills first idle worker
         my ($self) = @_;
         return unless wq_workers($self);
+        die "-wq_nr_workers locked" if defined $self->{-wq_nr_workers};
         $self->wq_io_do('wq_exit');
         # caller must call wq_worker_decr_wait in main loop
 }
@@ -376,6 +378,7 @@ sub wq_worker_decr { # SIGTTOU handler, kills first idle worker
 sub wq_worker_decr_wait {
         my ($self, $timeout, $cb, @args) = @_;
         return if $self->{-wq_ppid} != $$; # can't reap siblings or parents
+        die "-wq_nr_workers locked" if defined $self->{-wq_nr_workers};
         my $s1 = $self->{-wq_s1} // croak 'BUG: no wq_s1';
         vec(my $rin = '', fileno($s1), 1) = 1;
         select(my $rout = $rin, undef, undef, $timeout) or
diff --git a/lib/PublicInbox/LeiQuery.pm b/lib/PublicInbox/LeiQuery.pm
index eaf91f2e..398f834f 100644
--- a/lib/PublicInbox/LeiQuery.pm
+++ b/lib/PublicInbox/LeiQuery.pm
@@ -90,14 +90,14 @@ sub lei_q {
                 return $self->fail("`$xj' search jobs must be >= 1");
         }
         $xj ||= $lxs->concurrency($opt); # allow: "--jobs ,$WRITER_ONLY"
-        my $nproc = $lxs->detect_nproc; # don't memoize, schedtool(1) exists
+        my $nproc = $lxs->detect_nproc // 1; # don't memoize, schedtool(1) exists
         $xj = $nproc if $xj > $nproc;
-        $lxs->{jobs} = $xj;
+        $lxs->{-wq_nr_workers} = $xj;
         if (defined($mj) && $mj !~ /\A[1-9][0-9]*\z/) {
                 return $self->fail("`$mj' writer jobs must be >= 1");
         }
-        $self->{l2m}->{jobs} = ($mj // $nproc) if $self->{l2m};
         PublicInbox::LeiOverview->new($self) or return;
+        $self->{l2m}->{-wq_nr_workers} = ($mj // $nproc) if $self->{l2m};
 
         my %mset_opt = map { $_ => $opt->{$_} } qw(threads limit offset);
         $mset_opt{asc} = $opt->{'reverse'} ? 1 : 0;
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index a319b75f..524f4d1c 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -407,7 +407,7 @@ sub do_query {
                 if ($lei->{opt}->{augment} && delete $lei->{early_mua}) {
                         $lei->start_mua;
                 }
-                $l2m->wq_workers_start('lei2mail', $l2m->{jobs},
+                $l2m->wq_workers_start('lei2mail', undef,
                                         $lei->oldset, { lei => $lei });
                 pipe($lei->{startq}, $lei->{au_done}) or die "pipe: $!";
                 # 1031: F_SETPIPE_SZ
@@ -418,7 +418,7 @@ sub do_query {
                 # delete until all lei2mail + lei_xsearch workers are reaped
                 $lei->{git_tmp} = $self->{git_tmp} = git_tmp($self);
         }
-        $self->wq_workers_start('lei_xsearch', $self->{jobs},
+        $self->wq_workers_start('lei_xsearch', undef,
                                 $lei->oldset, { lei => $lei });
         my $op = delete $lei->{pkt_op_c};
         delete $lei->{pkt_op_p};