From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-Status: No, score=-3.9 required=3.0 tests=ALL_TRUSTED,AWL,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 991791FC0D for ; Sun, 7 Feb 2021 08:52:02 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 11/19] ipc: wq_do => wq_io_do Date: Sun, 7 Feb 2021 08:51:53 +0000 Message-Id: <20210207085201.13871-12-e@80x24.org> In-Reply-To: <20210207085201.13871-1-e@80x24.org> References: <20210207085201.13871-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: We will have a ->wq_do that doesn't pass FDs for I/O. --- lib/PublicInbox/IPC.pm | 12 ++++++------ lib/PublicInbox/LeiImport.pm | 4 ++-- lib/PublicInbox/LeiMirror.pm | 4 ++-- lib/PublicInbox/LeiOverview.pm | 4 ++-- lib/PublicInbox/LeiToMail.pm | 2 +- lib/PublicInbox/LeiXSearch.pm | 10 +++++----- t/ipc.t | 14 +++++++------- xt/stress-sharedkv.t | 6 +++--- 8 files changed, 28 insertions(+), 28 deletions(-) diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm index 7e5a0b16..728f726c 100644 --- a/lib/PublicInbox/IPC.pm +++ b/lib/PublicInbox/IPC.pm @@ -3,10 +3,10 @@ # base class for remote IPC calls and workqueues, requires Storable or Sereal # - ipc_do and ipc_worker_* is for a single worker/producer and uses pipes -# - wq_do and wq_worker* is for a single producer and multiple workers, +# - wq_io_do and wq_worker* is for a single producer and multiple workers, # using SOCK_SEQPACKET for work distribution # use ipc_do when you need work done on a certain process -# use wq_do when your work can be done on any idle worker +# use wq_io_do when your work can be done on any idle worker package PublicInbox::IPC; use strict; use v5.10.1; @@ -248,12 +248,12 @@ sub wq_worker_loop ($) { PublicInbox::DS->Reset; } -sub do_sock_stream { # via wq_do, for big requests +sub do_sock_stream { # via wq_io_do, for big requests my ($self, $len) = @_; recv_and_run($self, delete $self->{0}, $len, 1); } -sub wq_do { # always async +sub wq_io_do { # always async my ($self, $sub, $ios, @args) = @_; if (my $s1 = $self->{-wq_s1}) { # run in worker my $fds = [ map { fileno($_) } @$ios ]; @@ -278,7 +278,7 @@ sub wq_do { # always async } else { @$self{0..$#$ios} = @$ios; eval { $self->$sub(@args) }; - warn "wq_do: $@" if $@; + warn "wq_io_do: $@" if $@; delete @$self{0..$#$ios}; # don't close } } @@ -349,7 +349,7 @@ sub wq_worker_decr { # SIGTTOU handler, kills first idle worker my ($self) = @_; return unless wq_workers($self); my $s2 = $self->{-wq_s2} // die 'BUG: no wq_s2'; - $self->wq_do('wq_exit', [ $s2, $s2, $s2 ]); + $self->wq_io_do('wq_exit', [ $s2, $s2, $s2 ]); # caller must call wq_worker_decr_wait in main loop } diff --git a/lib/PublicInbox/LeiImport.pm b/lib/PublicInbox/LeiImport.pm index 2c7cbf2b..3a99570e 100644 --- a/lib/PublicInbox/LeiImport.pm +++ b/lib/PublicInbox/LeiImport.pm @@ -44,9 +44,9 @@ sub call { # the main "lei import" method $self->wq_workers_start('lei_import', $j, $lei->oldset, {lei => $lei}); my $op = delete $lei->{pkt_op_c}; delete $lei->{pkt_op_p}; - $self->wq_do('import_stdin', []) if $self->{0}; + $self->wq_io_do('import_stdin', []) if $self->{0}; for my $x (@argv) { - $self->wq_do('import_path_url', [], $x); + $self->wq_io_do('import_path_url', [], $x); } $self->wq_close(1); $lei->event_step_init; # wait for shutdowns diff --git a/lib/PublicInbox/LeiMirror.pm b/lib/PublicInbox/LeiMirror.pm index 13795a58..5ba69287 100644 --- a/lib/PublicInbox/LeiMirror.pm +++ b/lib/PublicInbox/LeiMirror.pm @@ -251,7 +251,7 @@ sub start_clone_url { die "TODO: non-HTTP/HTTPS clone of $self->{src} not supported, yet"; } -sub do_mirror { # via wq_do +sub do_mirror { # via wq_io_do my ($self) = @_; my $lei = $self->{lei}; eval { @@ -290,7 +290,7 @@ sub start { $self->wq_workers_start('lei_mirror', 1, $lei->oldset, {lei => $lei}); my $op = delete $lei->{pkt_op_c}; delete $lei->{pkt_op_p}; - $self->wq_do('do_mirror', []); + $self->wq_io_do('do_mirror', []); $self->wq_close(1); $lei->event_step_init; # wait for shutdowns if ($lei->{oneshot}) { diff --git a/lib/PublicInbox/LeiOverview.pm b/lib/PublicInbox/LeiOverview.pm index 24e4c190..dcfb9cc7 100644 --- a/lib/PublicInbox/LeiOverview.pm +++ b/lib/PublicInbox/LeiOverview.pm @@ -23,7 +23,7 @@ my $JSONL = 'ldjson|ndjson|jsonl'; # 3 names for the same thing sub _iso8601 ($) { strftime('%Y-%m-%dT%H:%M:%SZ', gmtime($_[0])) } -# we open this in the parent process before ->wq_do handoff +# we open this in the parent process before ->wq_io_do handoff sub ovv_out_lk_init ($) { my ($self) = @_; my $tmp = File::Temp->new("lei-ovv.dst.$$.lock-XXXXXX", @@ -205,7 +205,7 @@ sub ovv_each_smsg_cb { # runs in wq worker usually sub { my ($smsg, $mitem) = @_; $smsg->{pct} = get_pct($mitem) if $mitem; - $l2m->wq_do('write_mail', [], $git_dir, $smsg); + $l2m->wq_io_do('write_mail', [], $git_dir, $smsg); } } elsif ($self->{fmt} =~ /\A(concat)?json\z/ && $lei->{opt}->{pretty}) { my $EOR = ($1//'') eq 'concat' ? "\n}" : "\n},"; diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm index 4f847221..3f65e9e9 100644 --- a/lib/PublicInbox/LeiToMail.pm +++ b/lib/PublicInbox/LeiToMail.pm @@ -488,7 +488,7 @@ sub poke_dst { } } -sub write_mail { # via ->wq_do +sub write_mail { # via ->wq_io_do my ($self, $git_dir, $smsg) = @_; my $git = $self->{"$$\0$git_dir"} //= PublicInbox::Git->new($git_dir); git_async_cat($git, $smsg->{blob}, \&git_to_mail, diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index 6a1b107b..1ba767c1 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -333,10 +333,10 @@ sub start_query { # always runs in main (lei-daemon) process } if ($lei->{opt}->{thread}) { for my $ibxish (locals($self)) { - $self->wq_do('query_thread_mset', [], $ibxish); + $self->wq_io_do('query_thread_mset', [], $ibxish); } } elsif (locals($self)) { - $self->wq_do('query_mset', []); + $self->wq_io_do('query_mset', []); } my $i = 0; my $q = []; @@ -344,7 +344,7 @@ sub start_query { # always runs in main (lei-daemon) process push @{$q->[$i++ % $MAX_PER_HOST]}, $uri; } for my $uris (@$q) { - $self->wq_do('query_remote_mboxrd', [], $uris); + $self->wq_io_do('query_remote_mboxrd', [], $uris); } } @@ -354,7 +354,7 @@ sub ipc_atfork_child { $self->SUPER::ipc_atfork_child; } -sub query_prepare { # called by wq_do +sub query_prepare { # called by wq_io_do my ($self) = @_; local $0 = "$0 query_prepare"; my $lei = $self->{lei}; @@ -398,7 +398,7 @@ sub do_query { delete $lei->{pkt_op_p}; $l2m->wq_close(1) if $l2m; $lei->event_step_init; # wait for shutdowns - $self->wq_do('query_prepare', []) if $l2m; + $self->wq_io_do('query_prepare', []) if $l2m; start_query($self, $lei); $self->wq_close(1); # lei_xsearch workers stop when done if ($lei->{oneshot}) { diff --git a/t/ipc.t b/t/ipc.t index face5726..345024bd 100644 --- a/t/ipc.t +++ b/t/ipc.t @@ -106,7 +106,7 @@ my $big = do { local $/; <$agpl> } // BAIL_OUT "read: $!"; close $agpl or BAIL_OUT "close: $!"; for my $t ('local', 'worker', 'worker again') { - $ipc->wq_do('test_write_each_fd', [ $wa, $wb, $wc ], 'hello world'); + $ipc->wq_io_do('test_write_each_fd', [ $wa, $wb, $wc ], 'hello world'); my $i = 0; for my $fh ($ra, $rb, $rc) { my $buf = readline($fh); @@ -114,12 +114,12 @@ for my $t ('local', 'worker', 'worker again') { like($buf, qr/\Ai=$i \d+ hello world\z/, "got expected ($t)"); $i++; } - $ipc->wq_do('test_die', [ $wa, $wb, $wc ]); - $ipc->wq_do('test_sha', [ $wa, $wb ], 'hello world'); + $ipc->wq_io_do('test_die', [ $wa, $wb, $wc ]); + $ipc->wq_io_do('test_sha', [ $wa, $wb ], 'hello world'); is(readline($rb), sha1_hex('hello world')."\n", "SHA small ($t)"); { my $bigger = $big x 10; - $ipc->wq_do('test_sha', [ $wa, $wb ], $bigger); + $ipc->wq_io_do('test_sha', [ $wa, $wb ], $bigger); my $exp = sha1_hex($bigger)."\n"; undef $bigger; is(readline($rb), $exp, "SHA big ($t)"); @@ -128,7 +128,7 @@ for my $t ('local', 'worker', 'worker again') { push(@ppids, $ppid); } -# wq_do works across fork (siblings can feed) +# wq_io_do works across fork (siblings can feed) SKIP: { skip 'Socket::MsgHdr or Inline::C missing', 3 if !$ppids[0]; is_deeply(\@ppids, [$$, undef, undef], @@ -136,7 +136,7 @@ SKIP: { my $pid = fork // BAIL_OUT $!; if ($pid == 0) { use POSIX qw(_exit); - $ipc->wq_do('test_write_each_fd', [ $wa, $wb, $wc ], $$); + $ipc->wq_io_do('test_write_each_fd', [ $wa, $wb, $wc ], $$); _exit(0); } else { my $i = 0; @@ -160,7 +160,7 @@ SKIP: { seek($warn, 0, SEEK_SET) or BAIL_OUT; my @warn = <$warn>; is(scalar(@warn), 3, 'warned 3 times'); - like($warn[0], qr/ wq_do: /, '1st warned from wq_do'); + like($warn[0], qr/ wq_io_do: /, '1st warned from wq_do'); like($warn[1], qr/ wq_worker: /, '2nd warned from wq_worker'); is($warn[2], $warn[1], 'worker did not die'); diff --git a/xt/stress-sharedkv.t b/xt/stress-sharedkv.t index 70de9ffc..1773d4bc 100644 --- a/xt/stress-sharedkv.t +++ b/xt/stress-sharedkv.t @@ -15,14 +15,14 @@ my $nr = $ENV{TEST_STRESS_NR} // 100_000; my $ios = []; my $t = timeit(1, sub { for my $i (1..$nr) { - $ipc->wq_do('test_set_maybe', $ios, $skv, $i); - $ipc->wq_do('test_set_maybe', $ios, $skv, $i); + $ipc->wq_io_do('test_set_maybe', $ios, $skv, $i); + $ipc->wq_io_do('test_set_maybe', $ios, $skv, $i); } }); diag "$nr sets done ".timestr($t); for my $w ($ipc->wq_workers) { - $ipc->wq_do('test_skv_done', $ios); + $ipc->wq_io_do('test_skv_done', $ios); } diag "done requested";