From 1661ff8e21f3cb1df1a3fc00d917f404f4eae734 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Sat, 7 Oct 2023 21:24:08 +0000 Subject: rename ProcessPipe to ProcessIO Since we deal with pipes (of either direction) and bidirectional stream sockets for this class, it's better to remove the `Pipe' from the name and replace it with `IO' to communicate that it works for any form of IO::Handle-like object tied to a process. --- MANIFEST | 2 +- lib/PublicInbox/Gcf2Client.pm | 4 +- lib/PublicInbox/Git.pm | 4 +- lib/PublicInbox/HTTPD/Async.pm | 2 +- lib/PublicInbox/LeiRediff.pm | 2 +- lib/PublicInbox/LeiToMail.pm | 4 +- lib/PublicInbox/ProcessIO.pm | 83 +++++++++++++++++++++++++++++++++++++ lib/PublicInbox/ProcessPipe.pm | 85 -------------------------------------- lib/PublicInbox/Qspawn.pm | 4 +- lib/PublicInbox/Spamcheck/Spamc.pm | 2 +- lib/PublicInbox/Spawn.pm | 6 +-- t/spawn.t | 4 +- 12 files changed, 100 insertions(+), 102 deletions(-) create mode 100644 lib/PublicInbox/ProcessIO.pm delete mode 100644 lib/PublicInbox/ProcessPipe.pm diff --git a/MANIFEST b/MANIFEST index 689c6bf6..c972818f 100644 --- a/MANIFEST +++ b/MANIFEST @@ -318,7 +318,7 @@ lib/PublicInbox/OverIdx.pm lib/PublicInbox/POP3.pm lib/PublicInbox/POP3D.pm lib/PublicInbox/PktOp.pm -lib/PublicInbox/ProcessPipe.pm +lib/PublicInbox/ProcessIO.pm lib/PublicInbox/Qspawn.pm lib/PublicInbox/Reply.pm lib/PublicInbox/RepoAtom.pm diff --git a/lib/PublicInbox/Gcf2Client.pm b/lib/PublicInbox/Gcf2Client.pm index 4a0348b4..f63a0335 100644 --- a/lib/PublicInbox/Gcf2Client.pm +++ b/lib/PublicInbox/Gcf2Client.pm @@ -10,7 +10,7 @@ use PublicInbox::Gcf2; # fails if Inline::C or libgit2-dev isn't available use PublicInbox::Spawn qw(spawn); use Socket qw(AF_UNIX SOCK_STREAM); use PublicInbox::Syscall qw(EPOLLIN); -use PublicInbox::ProcessPipe; +use PublicInbox::ProcessIO; use autodie qw(socketpair); # fields: @@ -33,7 +33,7 @@ sub new { my $cmd = [$^X, $^W ? ('-w') : (), qw[-MPublicInbox::Gcf2 -e PublicInbox::Gcf2::loop]]; my $pid = spawn($cmd, $env, $opt); - my $sock = PublicInbox::ProcessPipe->maybe_new($pid, $s1); + my $sock = PublicInbox::ProcessIO->maybe_new($pid, $s1); $self->{inflight} = []; $self->{epwatch} = \undef; # for Git->cleanup $self->SUPER::new($sock, EPOLLIN); diff --git a/lib/PublicInbox/Git.pm b/lib/PublicInbox/Git.pm index 0fd621e1..94d5dcee 100644 --- a/lib/PublicInbox/Git.pm +++ b/lib/PublicInbox/Git.pm @@ -165,7 +165,7 @@ sub _sock_cmd { $self->fail("tmpfile($id): $!"); } my $pid = spawn(\@cmd, undef, $opt); - $self->{sock} = PublicInbox::ProcessPipe->maybe_new($pid, $s1); + $self->{sock} = PublicInbox::ProcessIO->maybe_new($pid, $s1); } sub poll_in ($) { IO::Poll::_poll($RDTIMEO, fileno($_[0]), my $ev = POLLIN) } @@ -626,7 +626,7 @@ sub cleanup_if_unlinked { my $ret = 0; for my $obj ($self, ($self->{ck} // ())) { my $sock = $obj->{sock} // next; - my PublicInbox::ProcessPipe $pp = tied *$sock; # ProcessPipe + my PublicInbox::ProcessIO $pp = tied *$sock; # ProcessIO my $pid = $pp->{pid} // next; open my $fh, '<', "/proc/$pid/maps" or return cleanup($self, 1); while (<$fh>) { diff --git a/lib/PublicInbox/HTTPD/Async.pm b/lib/PublicInbox/HTTPD/Async.pm index 7bbab1e1..b9d2159c 100644 --- a/lib/PublicInbox/HTTPD/Async.pm +++ b/lib/PublicInbox/HTTPD/Async.pm @@ -37,7 +37,7 @@ sub new { arg => $arg, # arg for $cb end_obj => $end_obj, # like END{}, can ->event_step }, $class; - my $pp = tied *$io; # ProcessPipe + my $pp = tied *$io; # ProcessIO $pp->{fh}->blocking(0) // die "$io->blocking(0): $!"; $self->SUPER::new($io, EPOLLIN); } diff --git a/lib/PublicInbox/LeiRediff.pm b/lib/PublicInbox/LeiRediff.pm index a886931c..b894342b 100644 --- a/lib/PublicInbox/LeiRediff.pm +++ b/lib/PublicInbox/LeiRediff.pm @@ -152,7 +152,7 @@ sub requote ($$) { # $^X (perl) is overkill, but maybe there's a weird system w/o sed my ($w, $pid) = popen_wr([$^X, '-pe', "s/^/$pfx/"], $lei->{env}, $opt); $w->autoflush(1); - binmode $w, ':utf8'; # incompatible with ProcessPipe due to syswrite + binmode $w, ':utf8'; # incompatible with ProcessIO due to syswrite $lei->{1} = $w; PublicInbox::OnDestroy->new(\&wait_requote, $lei, $pid, $old_1); } diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm index f239da82..f56ad330 100644 --- a/lib/PublicInbox/LeiToMail.pm +++ b/lib/PublicInbox/LeiToMail.pm @@ -7,7 +7,7 @@ use strict; use v5.10.1; use parent qw(PublicInbox::IPC); use PublicInbox::Eml; -use PublicInbox::ProcessPipe; +use PublicInbox::ProcessIO; use PublicInbox::Spawn qw(spawn); use IO::Handle; # ->autoflush use Fcntl qw(SEEK_SET SEEK_END O_CREAT O_EXCL O_WRONLY); @@ -162,7 +162,7 @@ sub _post_augment_mbox { # open a compressor process from top-level lei-daemon my ($r, $w) = @{delete $lei->{zpipe}}; my $rdr = { 0 => $r, 1 => $lei->{1}, 2 => $lei->{2}, pgid => 0 }; my $pid = spawn($cmd, undef, $rdr); - $lei->{1} = PublicInbox::ProcessPipe->maybe_new($pid, $w, { + $lei->{1} = PublicInbox::ProcessIO->maybe_new($pid, $w, { cb_arg => [\&reap_compress, $lei, $cmd, $lei->{1} ] }); } diff --git a/lib/PublicInbox/ProcessIO.pm b/lib/PublicInbox/ProcessIO.pm new file mode 100644 index 00000000..eeb66139 --- /dev/null +++ b/lib/PublicInbox/ProcessIO.pm @@ -0,0 +1,83 @@ +# Copyright (C) all contributors +# License: AGPL-3.0+ + +# a tied handle for auto reaping of children tied to a pipe or socket, +# see perltie(1) for details. +package PublicInbox::ProcessIO; +use v5.12; +use PublicInbox::DS qw(awaitpid); +use Symbol qw(gensym); + +sub maybe_new { + my ($cls, $pid, $fh, $opt) = @_; + return ($fh, $pid) if wantarray; + my $s = gensym; + tie *$s, $cls, $pid, $fh, @{$opt->{cb_arg} // []}; + $s; +} + +sub waitcb { # awaitpid callback + my ($pid, $err_ref, $cb, @args) = @_; + $$err_ref = $?; # sets >{pp_chld_err} for _close + $cb->($pid, @args) if $cb; +} + +sub TIEHANDLE { + my ($cls, $pid, $fh, @cb_arg) = @_; + my $self = bless { pid => $pid, fh => $fh, ppid => $$ }, $cls; + # we share $err (and not $self) with awaitpid to avoid a ref cycle + $self->{pp_chld_err} = \(my $err); + awaitpid($pid, \&waitcb, \$err, @cb_arg); + $self; +} + +# for IO::Uncompress::Gunzip +sub BINMODE { + my $self = shift; + binmode($self->{fh}, @_); +} + +sub READ { read($_[0]->{fh}, $_[1], $_[2], $_[3] || 0) } + +sub READLINE { readline($_[0]->{fh}) } + +sub WRITE { + use bytes qw(length); + syswrite($_[0]->{fh}, $_[1], $_[2] // length($_[1]), $_[3] // 0); +} + +sub PRINT { + my $self = shift; + print { $self->{fh} } @_; +} + +sub FILENO { fileno($_[0]->{fh}) } + +sub _close ($;$) { + my ($self, $wait) = @_; + my ($fh, $pid) = delete(@$self{qw(fh pid)}); + my $ret = (defined($fh) && $wait) ? close($fh) : ($fh = ''); + return $ret unless defined($pid) && $self->{ppid} == $$; + if ($wait) { # caller cares about the exit status: + # synchronous wait via defined(wantarray) on awaitpid: + defined(${$self->{pp_chld_err}}) or $wait = awaitpid($pid); + ($? = ${$self->{pp_chld_err}}) and $ret = ''; + } else { + awaitpid($pid); # depends on $in_loop or not + } + $ret; +} + +# if caller uses close(), assume they want to check $? immediately so +# we'll waitpid() synchronously. n.b. wantarray doesn't seem to +# propagate `undef' down to tied methods, otherwise I'd rely on that. +sub CLOSE { _close($_[0], 1) } + +# if relying on DESTROY, assume the caller doesn't care about $? and +# we can let the event loop call waitpid() whenever it gets SIGCHLD +sub DESTROY { + _close($_[0]); + undef; +} + +1; diff --git a/lib/PublicInbox/ProcessPipe.pm b/lib/PublicInbox/ProcessPipe.pm deleted file mode 100644 index ba2c1ecb..00000000 --- a/lib/PublicInbox/ProcessPipe.pm +++ /dev/null @@ -1,85 +0,0 @@ -# Copyright (C) all contributors -# License: AGPL-3.0+ - -# a tied handle for auto reaping of children tied to a read-only pipe, see perltie(1) -# DO NOT use this as-is for bidirectional pipes/sockets (e.g. in PublicInbox::Git), -# both ends of the pipe must be at the same level of the Perl object hierarchy -# to ensure orderly destruction. -package PublicInbox::ProcessPipe; -use v5.12; -use PublicInbox::DS qw(awaitpid); -use Symbol qw(gensym); - -sub maybe_new { - my ($cls, $pid, $fh, $opt) = @_; - return ($fh, $pid) if wantarray; - my $s = gensym; - tie *$s, $cls, $pid, $fh, @{$opt->{cb_arg} // []}; - $s; -} - -sub waitcb { # awaitpid callback - my ($pid, $err_ref, $cb, @args) = @_; - $$err_ref = $?; # sets >{pp_chld_err} for _close - $cb->($pid, @args) if $cb; -} - -sub TIEHANDLE { - my ($cls, $pid, $fh, @cb_arg) = @_; - my $self = bless { pid => $pid, fh => $fh, ppid => $$ }, $cls; - # we share $err (and not $self) with awaitpid to avoid a ref cycle - $self->{pp_chld_err} = \(my $err); - awaitpid($pid, \&waitcb, \$err, @cb_arg); - $self; -} - -# for IO::Uncompress::Gunzip -sub BINMODE { - my $self = shift; - binmode($self->{fh}, @_); -} - -sub READ { read($_[0]->{fh}, $_[1], $_[2], $_[3] || 0) } - -sub READLINE { readline($_[0]->{fh}) } - -sub WRITE { - use bytes qw(length); - syswrite($_[0]->{fh}, $_[1], $_[2] // length($_[1]), $_[3] // 0); -} - -sub PRINT { - my $self = shift; - print { $self->{fh} } @_; -} - -sub FILENO { fileno($_[0]->{fh}) } - -sub _close ($;$) { - my ($self, $wait) = @_; - my ($fh, $pid) = delete(@$self{qw(fh pid)}); - my $ret = (defined($fh) && $wait) ? close($fh) : ($fh = ''); - return $ret unless defined($pid) && $self->{ppid} == $$; - if ($wait) { # caller cares about the exit status: - # synchronous wait via defined(wantarray) on awaitpid: - defined(${$self->{pp_chld_err}}) or $wait = awaitpid($pid); - ($? = ${$self->{pp_chld_err}}) and $ret = ''; - } else { - awaitpid($pid); # depends on $in_loop or not - } - $ret; -} - -# if caller uses close(), assume they want to check $? immediately so -# we'll waitpid() synchronously. n.b. wantarray doesn't seem to -# propagate `undef' down to tied methods, otherwise I'd rely on that. -sub CLOSE { _close($_[0], 1) } - -# if relying on DESTROY, assume the caller doesn't care about $? and -# we can let the event loop call waitpid() whenever it gets SIGCHLD -sub DESTROY { - _close($_[0]); - undef; -} - -1; diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm index 5e4fd5cb..ea7ae647 100644 --- a/lib/PublicInbox/Qspawn.pm +++ b/lib/PublicInbox/Qspawn.pm @@ -124,7 +124,7 @@ sub finish ($;$) { # we can safely finalize if pipe was closed before, or if # {_err} is defined by waitpid_err. Deleting {rpipe} will - # trigger PublicInbox::ProcessPipe::DESTROY -> waitpid_err, + # trigger PublicInbox::ProcessIO::DESTROY -> waitpid_err, # but it may not fire right away if inside the event loop. my $closed_before = !delete($self->{rpipe}); finalize($self) if $closed_before || defined($self->{_err}); @@ -251,7 +251,7 @@ sub psgi_return_init_cb { # this may be PublicInbox::HTTPD::Async {cb} if (ref($r) ne 'ARRAY' || scalar(@$r) == 3) { # error if ($async) { # calls rpipe->close && ->event_step $async->close; # PublicInbox::HTTPD::Async::close - } else { # generic PSGI, use PublicInbox::ProcessPipe::CLOSE + } else { # generic PSGI, use PublicInbox::ProcessIO::CLOSE delete($self->{rpipe})->close; event_step($self); } diff --git a/lib/PublicInbox/Spamcheck/Spamc.pm b/lib/PublicInbox/Spamcheck/Spamc.pm index 726866c8..cba33a66 100644 --- a/lib/PublicInbox/Spamcheck/Spamc.pm +++ b/lib/PublicInbox/Spamcheck/Spamc.pm @@ -27,7 +27,7 @@ sub spamcheck { $out = \$buf; } $$out = do { local $/; <$fh> }; - close $fh; # PublicInbox::ProcessPipe::CLOSE + close $fh; # PublicInbox::ProcessIO::CLOSE ($? || $$out eq '') ? 0 : 1; } diff --git a/lib/PublicInbox/Spawn.pm b/lib/PublicInbox/Spawn.pm index 4c7e0f80..cb8b21c6 100644 --- a/lib/PublicInbox/Spawn.pm +++ b/lib/PublicInbox/Spawn.pm @@ -21,7 +21,7 @@ use PublicInbox::Lock; use Fcntl qw(SEEK_SET); use IO::Handle (); use Carp qw(croak); -use PublicInbox::ProcessPipe; +use PublicInbox::ProcessIO; our @EXPORT_OK = qw(which spawn popen_rd popen_wr run_die run_wait); our @RLIMITS = qw(RLIMIT_CPU RLIMIT_CORE RLIMIT_DATA); @@ -368,13 +368,13 @@ sub spawn ($;$$) { sub popen_rd { my ($cmd, $env, $opt) = @_; pipe(my $r, local $opt->{1}) or die "pipe: $!\n"; - PublicInbox::ProcessPipe->maybe_new(spawn($cmd, $env, $opt), $r, $opt) + PublicInbox::ProcessIO->maybe_new(spawn($cmd, $env, $opt), $r, $opt) } sub popen_wr { my ($cmd, $env, $opt) = @_; pipe(local $opt->{0}, my $w) or die "pipe: $!\n"; - PublicInbox::ProcessPipe->maybe_new(spawn($cmd, $env, $opt), $w, $opt) + PublicInbox::ProcessIO->maybe_new(spawn($cmd, $env, $opt), $w, $opt) } sub run_wait ($;$$) { diff --git a/t/spawn.t b/t/spawn.t index 04589437..be5aaf9f 100644 --- a/t/spawn.t +++ b/t/spawn.t @@ -149,8 +149,8 @@ EOF $fh = popen_rd(['true'], undef, { cb_arg => [sub { @c = caller }] }); undef $fh; # ->DESTROY ok(scalar(@c), 'callback fired by ->DESTROY'); - ok(grep(!m[/PublicInbox/ProcessPipe\.pm\z], @c), - 'callback not invoked by ProcessPipe'); + ok(grep(!m[/PublicInbox/ProcessIO\.pm\z], @c), + 'callback not invoked by ProcessIO'); } { # children don't wait on siblings -- cgit v1.2.3-24-ge0c7