From mboxrd@z Thu Jan 1 00:00:00 1970
Return-Path:
X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net
X-Spam-Level:
X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2
Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id BC5BE1F934 for ; Sat, 2 Jan 2021 09:13:44 +0000 (UTC)
From: Eric Wong
To: meta@public-inbox.org
Subject: [PATCH 1/6] processpipe: allow synchronous close to set $?
Date: Fri, 1 Jan 2021 19:13:39 -1400
Message-Id: <20210102091344.13477-2-e@80x24.org>
In-Reply-To: <20210102091344.13477-1-e@80x24.org>
References: <20210102091344.13477-1-e@80x24.org>
MIME-Version: 1.0
Content-Transfer-Encoding: 8bit
List-Id:

To get rid of the ugly $PublicInbox::DS::in_loop localization
in MboxReader, we'll distinguish between ->CLOSE and ->DESTROY
with ProcessPipe.

If we end up closing via ->DESTROY, we'll assume the caller will
want to deal with $? asynchronously via the event loop
(or not even care about $?).

If we hit ->CLOSE directly, we'll assume the caller called
close() and wants to check $? synchronously.

Note: wantarray doesn't seem to propagate into tied methods,
otherwise I'd be relying on that.
--- lib/PublicInbox/MboxReader.pm | 3 --- lib/PublicInbox/ProcessPipe.pm | 45 +++++++++++++++++++++------------- lib/PublicInbox/Spawn.pm | 10 ++++---- t/mbox_reader.t | 17 +++++++++++++ t/spawn.t | 38 ++++++++++++++++++++++++++++ 5 files changed, 88 insertions(+), 25 deletions(-) diff --git a/lib/PublicInbox/MboxReader.pm b/lib/PublicInbox/MboxReader.pm index 1894756d..59ce4fb6 100644 --- a/lib/PublicInbox/MboxReader.pm +++ b/lib/PublicInbox/MboxReader.pm @@ -5,7 +5,6 @@ package PublicInbox::MboxReader; use strict; use v5.10.1; -use PublicInbox::DS (); # localize $in_loop for error detection :< use Data::Dumper; $Data::Dumper::Useqq = 1; # should've been the default, for bad data @@ -14,7 +13,6 @@ my $from_strict = sub _mbox_from { my ($mbfh, $from_re, $eml_cb, @arg) = @_; - local $PublicInbox::DS::in_loop; # disable dwaitpid my $buf = ''; my @raw; while (defined(my $r = read($mbfh, $buf, 65536, length($buf)))) { @@ -75,7 +73,6 @@ sub _extract_hdr { sub _mbox_cl ($$$;@) { my ($mbfh, $uxs_from, $eml_cb, @arg) = @_; - local $PublicInbox::DS::in_loop; # disable dwaitpid my $buf = ''; while (defined(my $r = read($mbfh, $buf, 65536, length($buf)))) { if ($r == 0) { # detect "curl --fail" diff --git a/lib/PublicInbox/ProcessPipe.pm b/lib/PublicInbox/ProcessPipe.pm index 336d5ac4..400a22f3 100644 --- a/lib/PublicInbox/ProcessPipe.pm +++ b/lib/PublicInbox/ProcessPipe.pm @@ -6,10 +6,12 @@ package PublicInbox::ProcessPipe; use strict; use v5.10.1; use PublicInbox::DS qw(dwaitpid); +use Carp qw(carp); sub TIEHANDLE { my ($class, $pid, $fh, $cb, $arg) = @_; - bless { pid => $pid, fh => $fh, cb => $cb, arg => $arg }, $class; + bless { pid => $pid, fh => $fh, ppid => $$, cb => $cb, arg => $arg }, + $class; } sub READ { read($_[0]->{fh}, $_[1], $_[2], $_[3] || 0) } @@ -26,32 +28,41 @@ sub PRINT { print { $self->{fh} } @_; } -sub adjust_ret { # dwaitpid callback - my ($retref, $pid) = @_; - $$retref = '' if $? 
-} +sub FILENO { fileno($_[0]->{fh}) } -sub CLOSE { - my $fh = delete($_[0]->{fh}); - my $ret = defined $fh ? close($fh) : ''; - my ($pid, $cb, $arg) = delete @{$_[0]}{qw(pid cb arg)}; - if (defined $pid) { - unless ($cb) { - $cb = \&adjust_ret; - $arg = \$ret; +sub _close ($;$) { + my ($self, $wait) = @_; + my $fh = delete $self->{fh}; + my $ret = defined($fh) ? close($fh) : ''; + my ($pid, $cb, $arg) = delete @$self{qw(pid cb arg)}; + return $ret unless defined($pid) && $self->{ppid} == $$; + if ($wait) { # caller cares about the exit status: + my $wp = waitpid($pid, 0); + if ($wp == $pid) { + $ret = '' if $?; + if ($cb) { + eval { $cb->($arg, $pid) }; + carp "E: cb(arg, $pid): $@" if $@; + } + } else { + carp "waitpid($pid, 0) = $wp, \$!=$!, \$?=$?"; } + } else { # caller just undef-ed it, let event loop deal with it dwaitpid $pid, $cb, $arg; } $ret; } -sub FILENO { fileno($_[0]->{fh}) } +# if caller uses close(), assume they want to check $? immediately so +# we'll waitpid() synchronously. n.b. wantarray doesn't seem to +# propagate `undef' down to tied methods, otherwise I'd rely on that. +sub CLOSE { _close($_[0], 1) } +# if relying on DESTROY, assume the caller doesn't care about $? 
and +# we can let the event loop call waitpid() whenever it gets SIGCHLD sub DESTROY { - CLOSE(@_); + _close($_[0]); undef; } -sub pid { $_[0]->{pid} } - 1; diff --git a/lib/PublicInbox/Spawn.pm b/lib/PublicInbox/Spawn.pm index 1ee40503..762a0549 100644 --- a/lib/PublicInbox/Spawn.pm +++ b/lib/PublicInbox/Spawn.pm @@ -295,14 +295,14 @@ sub spawn ($;$$) { } sub popen_rd { - my ($cmd, $env, $opts) = @_; + my ($cmd, $env, $opt) = @_; pipe(my ($r, $w)) or die "pipe: $!\n"; - $opts ||= {}; - $opts->{1} = fileno($w); - my $pid = spawn($cmd, $env, $opts); + $opt ||= {}; + $opt->{1} = fileno($w); + my $pid = spawn($cmd, $env, $opt); return ($r, $pid) if wantarray; my $ret = gensym; - tie *$ret, 'PublicInbox::ProcessPipe', $pid, $r; + tie *$ret, 'PublicInbox::ProcessPipe', $pid, $r, @$opt{qw(cb arg)}; $ret; } diff --git a/t/mbox_reader.t b/t/mbox_reader.t index 53458ec2..4ea2ae29 100644 --- a/t/mbox_reader.t +++ b/t/mbox_reader.t @@ -72,4 +72,21 @@ for my $fmt (@mbox) { $check_fmt->($fmt) } s/\n/\r\n/sg for (values %raw); for my $fmt (@mbox) { $check_fmt->($fmt) } +SKIP: { + use PublicInbox::Spawn qw(popen_rd); + use Time::HiRes qw(alarm); + my $fh = popen_rd([ $^X, '-E', <<'' ]); +say "From x@y Fri Oct 2 00:00:00 1993"; +print "a: b\n\n", "x" x 70000, "\n\n"; +say "From x@y Fri Oct 2 00:00:00 2010"; +print "Final: bit\n\n", "Incomplete\n\n"; +exit 1 + + my @x; + eval { $reader->mboxrd($fh, sub { push @x, shift->as_string }) }; + like($@, qr/error closing mbox/, 'detects error reading from pipe'); + is(scalar(@x), 1, 'only saw one message'); + is(scalar(grep(/Final/, @x)), 0, 'no incomplete bit'); +} + done_testing; diff --git a/t/spawn.t b/t/spawn.t index 552bba33..d97e13a6 100644 --- a/t/spawn.t +++ b/t/spawn.t @@ -98,6 +98,44 @@ EOF isnt($?, 0, '$? 
set properly: '.$?); } +{ # ->CLOSE vs ->DESTROY waitpid caller distinction + my @c; + my $fh = popen_rd(['true'], undef, { cb => sub { @c = caller } }); + ok(close($fh), '->CLOSE fired and successful'); + ok(scalar(@c), 'callback fired by ->CLOSE'); + ok(grep(!m[/PublicInbox/DS\.pm\z], @c), 'callback not invoked by DS'); + + @c = (); + $fh = popen_rd(['true'], undef, { cb => sub { @c = caller } }); + undef $fh; # ->DESTROY + ok(scalar(@c), 'callback fired by ->DESTROY'); + ok(grep(!m[/PublicInbox/ProcessPipe\.pm\z], @c), + 'callback not invoked by ProcessPipe'); +} + +{ # children don't wait on siblings + use POSIX qw(_exit); + pipe(my ($r, $w)) or BAIL_OUT $!; + my $cb = sub { warn "x=$$\n" }; + my $fh = popen_rd(['cat'], undef, { 0 => $r, cb => $cb }); + my $pp = tied *$fh; + my $pid = fork // BAIL_OUT $!; + local $SIG{__WARN__} = sub { _exit(1) }; + if ($pid == 0) { + local $SIG{__DIE__} = sub { _exit(2) }; + undef $fh; + _exit(0); + } + waitpid($pid, 0); + is($?, 0, 'forked process exited'); + my @w; + local $SIG{__WARN__} = sub { push @w, @_ }; + close $w; + close $fh; + is($?, 0, 'cat exited'); + is_deeply(\@w, [ "x=$$\n" ], 'callback fired from owner'); +} + SKIP: { eval { require BSD::Resource;