user/dev discussion of public-inbox itself
 help / color / mirror / code / Atom feed
From: Eric Wong <e@80x24.org>
To: meta@public-inbox.org
Subject: [PATCH 11/19] ipc: wq_do => wq_io_do
Date: Sun,  7 Feb 2021 08:51:53 +0000	[thread overview]
Message-ID: <20210207085201.13871-12-e@80x24.org> (raw)
In-Reply-To: <20210207085201.13871-1-e@80x24.org>

We will have a ->wq_do that doesn't pass FDs for I/O.
---
 lib/PublicInbox/IPC.pm         | 12 ++++++------
 lib/PublicInbox/LeiImport.pm   |  4 ++--
 lib/PublicInbox/LeiMirror.pm   |  4 ++--
 lib/PublicInbox/LeiOverview.pm |  4 ++--
 lib/PublicInbox/LeiToMail.pm   |  2 +-
 lib/PublicInbox/LeiXSearch.pm  | 10 +++++-----
 t/ipc.t                        | 14 +++++++-------
 xt/stress-sharedkv.t           |  6 +++---
 8 files changed, 28 insertions(+), 28 deletions(-)

diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 7e5a0b16..728f726c 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -3,10 +3,10 @@
 
 # base class for remote IPC calls and workqueues, requires Storable or Sereal
 # - ipc_do and ipc_worker_* is for a single worker/producer and uses pipes
-# - wq_do and wq_worker* is for a single producer and multiple workers,
+# - wq_io_do and wq_worker* is for a single producer and multiple workers,
 #   using SOCK_SEQPACKET for work distribution
 # use ipc_do when you need work done on a certain process
-# use wq_do when your work can be done on any idle worker
+# use wq_io_do when your work can be done on any idle worker
 package PublicInbox::IPC;
 use strict;
 use v5.10.1;
@@ -248,12 +248,12 @@ sub wq_worker_loop ($) {
 	PublicInbox::DS->Reset;
 }
 
-sub do_sock_stream { # via wq_do, for big requests
+sub do_sock_stream { # via wq_io_do, for big requests
 	my ($self, $len) = @_;
 	recv_and_run($self, delete $self->{0}, $len, 1);
 }
 
-sub wq_do { # always async
+sub wq_io_do { # always async
 	my ($self, $sub, $ios, @args) = @_;
 	if (my $s1 = $self->{-wq_s1}) { # run in worker
 		my $fds = [ map { fileno($_) } @$ios ];
@@ -278,7 +278,7 @@ sub wq_do { # always async
 	} else {
 		@$self{0..$#$ios} = @$ios;
 		eval { $self->$sub(@args) };
-		warn "wq_do: $@" if $@;
+		warn "wq_io_do: $@" if $@;
 		delete @$self{0..$#$ios}; # don't close
 	}
 }
@@ -349,7 +349,7 @@ sub wq_worker_decr { # SIGTTOU handler, kills first idle worker
 	my ($self) = @_;
 	return unless wq_workers($self);
 	my $s2 = $self->{-wq_s2} // die 'BUG: no wq_s2';
-	$self->wq_do('wq_exit', [ $s2, $s2, $s2 ]);
+	$self->wq_io_do('wq_exit', [ $s2, $s2, $s2 ]);
 	# caller must call wq_worker_decr_wait in main loop
 }
 
diff --git a/lib/PublicInbox/LeiImport.pm b/lib/PublicInbox/LeiImport.pm
index 2c7cbf2b..3a99570e 100644
--- a/lib/PublicInbox/LeiImport.pm
+++ b/lib/PublicInbox/LeiImport.pm
@@ -44,9 +44,9 @@ sub call { # the main "lei import" method
 	$self->wq_workers_start('lei_import', $j, $lei->oldset, {lei => $lei});
 	my $op = delete $lei->{pkt_op_c};
 	delete $lei->{pkt_op_p};
-	$self->wq_do('import_stdin', []) if $self->{0};
+	$self->wq_io_do('import_stdin', []) if $self->{0};
 	for my $x (@argv) {
-		$self->wq_do('import_path_url', [], $x);
+		$self->wq_io_do('import_path_url', [], $x);
 	}
 	$self->wq_close(1);
 	$lei->event_step_init; # wait for shutdowns
diff --git a/lib/PublicInbox/LeiMirror.pm b/lib/PublicInbox/LeiMirror.pm
index 13795a58..5ba69287 100644
--- a/lib/PublicInbox/LeiMirror.pm
+++ b/lib/PublicInbox/LeiMirror.pm
@@ -251,7 +251,7 @@ sub start_clone_url {
 	die "TODO: non-HTTP/HTTPS clone of $self->{src} not supported, yet";
 }
 
-sub do_mirror { # via wq_do
+sub do_mirror { # via wq_io_do
 	my ($self) = @_;
 	my $lei = $self->{lei};
 	eval {
@@ -290,7 +290,7 @@ sub start {
 	$self->wq_workers_start('lei_mirror', 1, $lei->oldset, {lei => $lei});
 	my $op = delete $lei->{pkt_op_c};
 	delete $lei->{pkt_op_p};
-	$self->wq_do('do_mirror', []);
+	$self->wq_io_do('do_mirror', []);
 	$self->wq_close(1);
 	$lei->event_step_init; # wait for shutdowns
 	if ($lei->{oneshot}) {
diff --git a/lib/PublicInbox/LeiOverview.pm b/lib/PublicInbox/LeiOverview.pm
index 24e4c190..dcfb9cc7 100644
--- a/lib/PublicInbox/LeiOverview.pm
+++ b/lib/PublicInbox/LeiOverview.pm
@@ -23,7 +23,7 @@ my $JSONL = 'ldjson|ndjson|jsonl'; # 3 names for the same thing
 
 sub _iso8601 ($) { strftime('%Y-%m-%dT%H:%M:%SZ', gmtime($_[0])) }
 
-# we open this in the parent process before ->wq_do handoff
+# we open this in the parent process before ->wq_io_do handoff
 sub ovv_out_lk_init ($) {
 	my ($self) = @_;
 	my $tmp = File::Temp->new("lei-ovv.dst.$$.lock-XXXXXX",
@@ -205,7 +205,7 @@ sub ovv_each_smsg_cb { # runs in wq worker usually
 		sub {
 			my ($smsg, $mitem) = @_;
 			$smsg->{pct} = get_pct($mitem) if $mitem;
-			$l2m->wq_do('write_mail', [], $git_dir, $smsg);
+			$l2m->wq_io_do('write_mail', [], $git_dir, $smsg);
 		}
 	} elsif ($self->{fmt} =~ /\A(concat)?json\z/ && $lei->{opt}->{pretty}) {
 		my $EOR = ($1//'') eq 'concat' ? "\n}" : "\n},";
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index 4f847221..3f65e9e9 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -488,7 +488,7 @@ sub poke_dst {
 	}
 }
 
-sub write_mail { # via ->wq_do
+sub write_mail { # via ->wq_io_do
 	my ($self, $git_dir, $smsg) = @_;
 	my $git = $self->{"$$\0$git_dir"} //= PublicInbox::Git->new($git_dir);
 	git_async_cat($git, $smsg->{blob}, \&git_to_mail,
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index 6a1b107b..1ba767c1 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -333,10 +333,10 @@ sub start_query { # always runs in main (lei-daemon) process
 	}
 	if ($lei->{opt}->{thread}) {
 		for my $ibxish (locals($self)) {
-			$self->wq_do('query_thread_mset', [], $ibxish);
+			$self->wq_io_do('query_thread_mset', [], $ibxish);
 		}
 	} elsif (locals($self)) {
-		$self->wq_do('query_mset', []);
+		$self->wq_io_do('query_mset', []);
 	}
 	my $i = 0;
 	my $q = [];
@@ -344,7 +344,7 @@ sub start_query { # always runs in main (lei-daemon) process
 		push @{$q->[$i++ % $MAX_PER_HOST]}, $uri;
 	}
 	for my $uris (@$q) {
-		$self->wq_do('query_remote_mboxrd', [], $uris);
+		$self->wq_io_do('query_remote_mboxrd', [], $uris);
 	}
 }
 
@@ -354,7 +354,7 @@ sub ipc_atfork_child {
 	$self->SUPER::ipc_atfork_child;
 }
 
-sub query_prepare { # called by wq_do
+sub query_prepare { # called by wq_io_do
 	my ($self) = @_;
 	local $0 = "$0 query_prepare";
 	my $lei = $self->{lei};
@@ -398,7 +398,7 @@ sub do_query {
 	delete $lei->{pkt_op_p};
 	$l2m->wq_close(1) if $l2m;
 	$lei->event_step_init; # wait for shutdowns
-	$self->wq_do('query_prepare', []) if $l2m;
+	$self->wq_io_do('query_prepare', []) if $l2m;
 	start_query($self, $lei);
 	$self->wq_close(1); # lei_xsearch workers stop when done
 	if ($lei->{oneshot}) {
diff --git a/t/ipc.t b/t/ipc.t
index face5726..345024bd 100644
--- a/t/ipc.t
+++ b/t/ipc.t
@@ -106,7 +106,7 @@ my $big = do { local $/; <$agpl> } // BAIL_OUT "read: $!";
 close $agpl or BAIL_OUT "close: $!";
 
 for my $t ('local', 'worker', 'worker again') {
-	$ipc->wq_do('test_write_each_fd', [ $wa, $wb, $wc ], 'hello world');
+	$ipc->wq_io_do('test_write_each_fd', [ $wa, $wb, $wc ], 'hello world');
 	my $i = 0;
 	for my $fh ($ra, $rb, $rc) {
 		my $buf = readline($fh);
@@ -114,12 +114,12 @@ for my $t ('local', 'worker', 'worker again') {
 		like($buf, qr/\Ai=$i \d+ hello world\z/, "got expected ($t)");
 		$i++;
 	}
-	$ipc->wq_do('test_die', [ $wa, $wb, $wc ]);
-	$ipc->wq_do('test_sha', [ $wa, $wb ], 'hello world');
+	$ipc->wq_io_do('test_die', [ $wa, $wb, $wc ]);
+	$ipc->wq_io_do('test_sha', [ $wa, $wb ], 'hello world');
 	is(readline($rb), sha1_hex('hello world')."\n", "SHA small ($t)");
 	{
 		my $bigger = $big x 10;
-		$ipc->wq_do('test_sha', [ $wa, $wb ], $bigger);
+		$ipc->wq_io_do('test_sha', [ $wa, $wb ], $bigger);
 		my $exp = sha1_hex($bigger)."\n";
 		undef $bigger;
 		is(readline($rb), $exp, "SHA big ($t)");
@@ -128,7 +128,7 @@ for my $t ('local', 'worker', 'worker again') {
 	push(@ppids, $ppid);
 }
 
-# wq_do works across fork (siblings can feed)
+# wq_io_do works across fork (siblings can feed)
 SKIP: {
 	skip 'Socket::MsgHdr or Inline::C missing', 3 if !$ppids[0];
 	is_deeply(\@ppids, [$$, undef, undef],
@@ -136,7 +136,7 @@ SKIP: {
 	my $pid = fork // BAIL_OUT $!;
 	if ($pid == 0) {
 		use POSIX qw(_exit);
-		$ipc->wq_do('test_write_each_fd', [ $wa, $wb, $wc ], $$);
+		$ipc->wq_io_do('test_write_each_fd', [ $wa, $wb, $wc ], $$);
 		_exit(0);
 	} else {
 		my $i = 0;
@@ -160,7 +160,7 @@ SKIP: {
 	seek($warn, 0, SEEK_SET) or BAIL_OUT;
 	my @warn = <$warn>;
 	is(scalar(@warn), 3, 'warned 3 times');
-	like($warn[0], qr/ wq_do: /, '1st warned from wq_do');
+	like($warn[0], qr/ wq_io_do: /, '1st warned from wq_do');
 	like($warn[1], qr/ wq_worker: /, '2nd warned from wq_worker');
 	is($warn[2], $warn[1], 'worker did not die');
 
diff --git a/xt/stress-sharedkv.t b/xt/stress-sharedkv.t
index 70de9ffc..1773d4bc 100644
--- a/xt/stress-sharedkv.t
+++ b/xt/stress-sharedkv.t
@@ -15,14 +15,14 @@ my $nr = $ENV{TEST_STRESS_NR} // 100_000;
 my $ios = [];
 my $t = timeit(1, sub {
 	for my $i (1..$nr) {
-		$ipc->wq_do('test_set_maybe', $ios, $skv, $i);
-		$ipc->wq_do('test_set_maybe', $ios, $skv, $i);
+		$ipc->wq_io_do('test_set_maybe', $ios, $skv, $i);
+		$ipc->wq_io_do('test_set_maybe', $ios, $skv, $i);
 	}
 });
 diag "$nr sets done ".timestr($t);
 
 for my $w ($ipc->wq_workers) {
-	$ipc->wq_do('test_skv_done', $ios);
+	$ipc->wq_io_do('test_skv_done', $ios);
 }
 diag "done requested";
 

  parent reply	other threads:[~2021-02-07  8:52 UTC|newest]

Thread overview: 23+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2021-02-07  8:51 [PATCH 00/19] lei import Maildir, remote mboxrd fixes Eric Wong
2021-02-07  8:51 ` [PATCH 01/19] spawn: pi_fork_exec: restore parent sigmask in child Eric Wong
2021-02-07  8:51 ` [PATCH 02/19] spawn: pi_fork_exec: support "pgid" Eric Wong
2021-02-07 23:10   ` dprintf(3) portability? [was [02/19] spawn: pi_fork_exec: support "pgid"] Eric Wong
2021-02-07  8:51 ` [PATCH 03/19] lei add-external: handle interrupts with --mirror Eric Wong
2021-02-07  8:51 ` [PATCH 04/19] spawn_pp: die more consistently in child Eric Wong
2021-02-07  8:51 ` [PATCH 05/19] ipc: do not die inside wq_worker child process Eric Wong
2021-02-07  8:51 ` [PATCH 06/19] ipc: trim down the Storable checks Eric Wong
2021-02-07  8:51 ` [PATCH 07/19] Makefile.PL: depend on IO::Uncompress::Gunzip Eric Wong
2021-02-07  8:51 ` [PATCH 08/19] xapcmd: avoid potential die surprise in children Eric Wong
2021-02-07  8:51 ` [PATCH 09/19] tests: guard setup_public_inboxes for SQLite and Xapian Eric Wong
2021-02-07  8:51 ` [PATCH 10/19] Revert "ipc: add support for asynchronous callbacks" Eric Wong
2021-02-07  8:51 ` Eric Wong [this message]
2021-02-07  8:51 ` [PATCH 12/19] lei: more consistent IPC exit and error handling Eric Wong
2021-02-07  8:51 ` [PATCH 13/19] lei: remove --mua-cmd alias for --mua Eric Wong
2021-02-07  8:51 ` [PATCH 14/19] lei: replace --thread with --threads Eric Wong
2021-02-07  8:51 ` [PATCH 15/19] lei q: improve remote mboxrd UX Eric Wong
2021-02-07  9:32   ` [PATCH 20/19] lei_xsearch: allow quieting regular mset progress, too Eric Wong
2021-02-07  8:51 ` [PATCH 16/19] lei q: SIGWINCH process group with the terminal Eric Wong
2021-02-07  8:51 ` [PATCH 17/19] lei import: support Maildirs Eric Wong
2021-02-07  8:52 ` [PATCH 18/19] imap: avoid unnecessary on-stack delete Eric Wong
2021-02-07  8:52 ` [PATCH 19/19] httpd/async: " Eric Wong
2021-02-07 10:40 ` [PATCH 21/19] lei q: fix arbitrary --mua command handling Eric Wong

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

  List information: https://public-inbox.org/README

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20210207085201.13871-12-e@80x24.org \
    --to=e@80x24.org \
    --cc=meta@public-inbox.org \
    --subject='Re: [PATCH 11/19] ipc: wq_do => wq_io_do' \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

Code repositories for project(s) associated with this inbox:

	https://80x24.org/public-inbox.git

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).