about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2021-01-31 22:28:16 -1000
committerEric Wong <e@80x24.org>2021-02-01 11:38:10 +0000
commit21ce1a28915374297829bd05feda0cea52adb324 (patch)
treef564e868b9e3396ca60e9c3897fbaa467cb8f3b2
parentdbf555ee5906e5c778941ee850a88640a026b0ea (diff)
downloadpublic-inbox-21ce1a28915374297829bd05feda0cea52adb324.tar.gz
It doesn't save us any code, and the action-at-a-distance
element was making it confusing to track down actual problems.
Another potential problem was keeping references alive too long.

So do like we would a C100K server and check every write
while still ensuring lei(1) exit with a proper SIGPIPE
iff needed.
-rw-r--r--lib/PublicInbox/IPC.pm10
-rw-r--r--lib/PublicInbox/LEI.pm56
-rw-r--r--lib/PublicInbox/LeiExternal.pm3
-rw-r--r--lib/PublicInbox/LeiOverview.pm33
-rw-r--r--lib/PublicInbox/LeiToMail.pm45
-rw-r--r--lib/PublicInbox/LeiXSearch.pm17
-rw-r--r--t/lei_to_mail.t31
7 files changed, 96 insertions, 99 deletions
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 479c4377..172552b9 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -139,8 +139,10 @@ sub ipc_worker_spawn {
 
 sub ipc_worker_reap { # dwaitpid callback
         my ($self, $pid) = @_;
-        # SIGTERM (15) is our default exit signal
-        warn "PID:$pid died with \$?=$?\n" if $? && ($? & 127) != 15;
+        return if !$?;
+        # TERM(15) is our default exit signal, PIPE(13) is likely w/ pager
+        my $s = $? & 127;
+        warn "PID:$pid died with \$?=$?\n" if $s != 15 && $s != 13;
 }
 
 sub wq_wait_old {
@@ -278,7 +280,7 @@ sub recv_and_run {
         undef $buf;
         my $sub = shift @$args;
         eval { $self->$sub(@$args) };
-        warn "$$ wq_worker: $@" if $@ && ref($@) ne 'PublicInbox::SIGPIPE';
+        warn "$$ wq_worker: $@" if $@;
         delete @$self{0..($nfd-1)};
         $n;
 }
@@ -320,7 +322,7 @@ sub wq_do { # always async
         } else {
                 @$self{0..$#$ios} = @$ios;
                 eval { $self->$sub(@args) };
-                warn "wq_do: $@" if $@ && ref($@) ne 'PublicInbox::SIGPIPE';
+                warn "wq_do: $@" if $@;
                 delete @$self{0..$#$ios}; # don't close
         }
 }
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index ceba16e4..b915bb0c 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -12,7 +12,7 @@ use parent qw(PublicInbox::DS PublicInbox::LeiExternal
         PublicInbox::LeiQuery);
 use Getopt::Long ();
 use Socket qw(AF_UNIX SOCK_SEQPACKET MSG_EOR pack_sockaddr_un);
-use Errno qw(EAGAIN EINTR ECONNREFUSED ENOENT ECONNRESET);
+use Errno qw(EPIPE EAGAIN EINTR ECONNREFUSED ENOENT ECONNRESET);
 use Cwd qw(getcwd);
 use POSIX ();
 use IO::Handle ();
@@ -277,7 +277,11 @@ sub x_it ($$) {
         dump_and_clear_log();
         if (my $sock = $self->{sock}) {
                 send($sock, "x_it $code", MSG_EOR);
-        } elsif (!($code & 127)) { # oneshot, ignore signals
+        } elsif (my $signum = ($code & 127)) { # oneshot, usually SIGPIPE (13)
+                $SIG{PIPE} = 'DEFAULT'; # $SIG{$signum} doesn't work
+                kill $signum, $$;
+                sleep; # wait for signal
+        } else { # oneshot
                 # don't want to end up using $? from child processes
                 for my $f (qw(lxs l2m)) {
                         my $wq = delete $self->{$f} or next;
@@ -287,14 +291,15 @@ sub x_it ($$) {
         }
 }
 
-sub puts ($;@) { print { shift->{1} } map { "$_\n" } @_ }
-
-sub out ($;@) { print { shift->{1} } @_ }
-
 sub err ($;@) {
         my $self = shift;
-        my $err = $self->{2} // ($self->{pgr} // [])->[2] // *STDERR{IO};
-        print $err @_, (substr($_[-1], -1, 1) eq "\n" ? () : "\n");
+        my $err = $self->{2} // ($self->{pgr} // [])->[2] // *STDERR{GLOB};
+        my $eor = (substr($_[-1], -1, 1) eq "\n" ? () : "\n");
+        print $err @_, $eor and return;
+        my $old_err = delete $self->{2};
+        close($old_err) if $! == EPIPE && $old_err;;
+        $err = $self->{2} = ($self->{pgr} // [])->[2] // *STDERR{GLOB};
+        print $err @_, $eor or print STDERR @_, $eor;
 }
 
 sub qerr ($;@) { $_[0]->{opt}->{quiet} or err(shift, @_) }
@@ -306,6 +311,17 @@ sub fail ($$;$) {
         undef;
 }
 
+sub out ($;@) {
+        my $self = shift;
+        return if print { $self->{1} // return } @_; # likely
+        return note_sigpipe($self, 1) if $! == EPIPE;
+        my $err = "error writing to stdout: $!";
+        delete $self->{1};
+        fail($self, $err);
+}
+
+sub puts ($;@) { out(shift, map { "$_\n" } @_) }
+
 sub child_error { # passes non-fatal curl exit codes to user
         my ($self, $child_error) = @_; # child_error is $?
         if (my $sock = $self->{sock}) { # send to lei(1) client
@@ -350,27 +366,23 @@ sub io_restore ($$) {
         }
 }
 
-# usage: my %sig = $lei->atfork_child_wq($wq);
-#         local @SIG{keys %sig} = values %sig;
+# triggers sigpipe_handler
+sub note_sigpipe {
+        my ($self, $fd) = @_;
+        close(delete($self->{$fd})); # explicit close silences Perl warning
+        syswrite($self->{op_pipe}, '!') if $self->{op_pipe};
+        x_it($self, 13);
+}
+
 sub atfork_child_wq {
         my ($self, $wq) = @_;
         io_restore($self, $wq);
+        -p $self->{op_pipe} or die 'BUG: {op_pipe} expected';
         io_restore($self->{l2m}, $wq);
         %PATH2CFG = ();
         undef $errors_log;
         $quit = \&CORE::exit;
-        (PIPE => sub {
-                $self->x_it(13); # SIGPIPE = 13
-                # we need to close explicitly to avoid Perl warning on SIGPIPE
-                for my $i (1, 2) {
-                        next unless $self->{$i} && (-p $self->{$i} || -S _);
-                        close(delete $self->{$i});
-                }
-                # trigger the LeiXSearch $done OpPipe:
-                syswrite($self->{op_pipe}, '!') if $self->{op_pipe};
-                $SIG{PIPE} = 'DEFAULT';
-                die bless(\"$_[0]", 'PublicInbox::SIGPIPE'),
-        });
+        $current_lei = $self; # for SIG{__WARN__}
 }
 
 sub io_extract ($;@) {
diff --git a/lib/PublicInbox/LeiExternal.pm b/lib/PublicInbox/LeiExternal.pm
index bf07c41c..b1176824 100644
--- a/lib/PublicInbox/LeiExternal.pm
+++ b/lib/PublicInbox/LeiExternal.pm
@@ -31,11 +31,10 @@ sub _externals_each {
 
 sub lei_ls_external {
         my ($self, @argv) = @_;
-        my $out = $self->{1};
         my ($OFS, $ORS) = $self->{opt}->{z} ? ("\0", "\0\0") : (" ", "\n");
         $self->_externals_each(sub {
                 my ($loc, $boost_val) = @_;
-                print $out $loc, $OFS, 'boost=', $boost_val, $ORS;
+                $self->out($loc, $OFS, 'boost=', $boost_val, $ORS);
         });
 }
 
diff --git a/lib/PublicInbox/LeiOverview.pm b/lib/PublicInbox/LeiOverview.pm
index fa041457..1d62ffe2 100644
--- a/lib/PublicInbox/LeiOverview.pm
+++ b/lib/PublicInbox/LeiOverview.pm
@@ -107,28 +107,22 @@ sub new {
 sub ovv_begin {
         my ($self, $lei) = @_;
         if ($self->{fmt} eq 'json') {
-                print { $lei->{1} } '[';
+                $lei->out('[');
         } # TODO HTML/Atom/...
 }
 
 # called once by parent (via PublicInbox::EOFpipe)
 sub ovv_end {
         my ($self, $lei) = @_;
-        my $out = $lei->{1} or return;
         if ($self->{fmt} eq 'json') {
                 # JSON doesn't allow trailing commas, and preventing
                 # trailing commas is a PITA when parallelizing outputs
-                print $out "null]\n";
+                $lei->out("null]\n");
         } elsif ($self->{fmt} eq 'concatjson') {
-                print $out "\n";
+                $lei->out("\n");
         }
 }
 
-sub ovv_atfork_child {
-        my ($self) = @_;
-        # reopen dedupe here
-}
-
 # prepares an smsg for JSON
 sub _unbless_smsg {
         my ($smsg, $mitem) = @_;
@@ -168,9 +162,8 @@ sub ovv_atexit_child {
                 $git->async_wait_all;
         }
         if (my $bref = delete $lei->{ovv_buf}) {
-                my $out = $lei->{1} or return;
                 my $lk = $self->lock_for_scope;
-                print $out $$bref;
+                $lei->out($$bref);
         }
 }
 
@@ -268,11 +261,10 @@ sub ovv_each_smsg_cb { # runs in wq worker usually
                                 }
                         } sort keys %$smsg);
                         $buf .= $EOR;
-                        if (length($buf) > 65536) {
-                                my $lk = $self->lock_for_scope;
-                                print { $lei->{1} } $buf;
-                                $buf = '';
-                        }
+                        return if length($buf) < 65536;
+                        my $lk = $self->lock_for_scope;
+                        $lei->out($buf);
+                        $buf = '';
                 }
         } elsif ($json) {
                 my $ORS = $self->{fmt} eq 'json' ? ",\n" : "\n"; # JSONL
@@ -280,11 +272,10 @@ sub ovv_each_smsg_cb { # runs in wq worker usually
                         my ($smsg, $mitem) = @_;
                         return if $dedupe->is_smsg_dup($smsg);
                         $buf .= $json->encode(_unbless_smsg(@_)) . $ORS;
-                        if (length($buf) > 65536) {
-                                my $lk = $self->lock_for_scope;
-                                print { $lei->{1} } $buf;
-                                $buf = '';
-                        }
+                        return if length($buf) < 65536;
+                        my $lk = $self->lock_for_scope;
+                        $lei->out($buf);
+                        $buf = '';
                 }
         } # else { ...
 }
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index 1f6c2a3b..01e7cec5 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -17,7 +17,7 @@ use PublicInbox::GitAsyncCat;
 use Symbol qw(gensym);
 use IO::Handle; # ->autoflush
 use Fcntl qw(SEEK_SET SEEK_END O_CREAT O_EXCL O_WRONLY);
-use Errno qw(EEXIST ESPIPE ENOENT);
+use Errno qw(EEXIST ESPIPE ENOENT EPIPE);
 
 # struggles with short-lived repos, Gcf2Client makes little sense with lei;
 # but we may use in-process libgit2 in the future.
@@ -68,14 +68,16 @@ sub _mbox_hdr_buf ($$$) {
 }
 
 sub atomic_append { # for on-disk destinations (O_APPEND, or O_EXCL)
-        my ($fh, $buf) = @_;
-        defined(my $w = syswrite($fh, $$buf)) or die "write: $!";
-        $w == length($$buf) or die "short write: $w != ".length($$buf);
-}
-
-sub _print_full {
-        my ($fh, $buf) = @_;
-        print $fh $$buf or die "print: $!";
+        my ($lei, $buf) = @_;
+        if (defined(my $w = syswrite($lei->{1} // return, $$buf))) {
+                return if $w == length($$buf);
+                $buf = "short atomic write: $w != ".length($$buf);
+        } elsif ($! == EPIPE) {
+                return $lei->note_sigpipe(1);
+        } else {
+                $buf = "atomic write: $!";
+        }
+        $lei->fail($buf);
 }
 
 sub eml2mboxrd ($;$) {
@@ -248,24 +250,19 @@ sub _mbox_write_cb ($$) {
         my $ovv = $lei->{ovv};
         my $m = 'eml2'.$ovv->{fmt};
         my $eml2mbox = $self->can($m) or die "$self->$m missing";
-        my $out = $lei->{1} // die "no stdout ($m, $ovv->{dst})"; # redirected earlier
-        $out->autoflush(1);
-        my $write = $ovv->{lock_path} ? \&_print_full : \&atomic_append;
+        $lei->{1} // die "no stdout ($m, $ovv->{dst})"; # redirected earlier
+        $lei->{1}->autoflush(1);
+        my $atomic_append = !defined($ovv->{lock_path});
         my $dedupe = $lei->{dedupe};
         $dedupe->prepare_dedupe;
         sub { # for git_to_mail
                 my ($buf, $smsg, $eml) = @_;
-                return unless $out;
                 $eml //= PublicInbox::Eml->new($buf);
-                if (!$dedupe->is_dup($eml, $smsg->{blob})) {
-                        $buf = $eml2mbox->($eml, $smsg);
-                        my $lk = $ovv->lock_for_scope;
-                        eval { $write->($out, $buf) };
-                        if ($@) {
-                                die $@ if ref($@) ne 'PublicInbox::SIGPIPE';
-                                undef $out
-                        }
-                }
+                return if $dedupe->is_dup($eml, $smsg->{blob});
+                $buf = $eml2mbox->($eml, $smsg);
+                return atomic_append($lei, $buf) if $atomic_append;
+                my $lk = $ovv->lock_for_scope;
+                $lei->out($$buf);
         }
 }
 
@@ -467,8 +464,7 @@ sub write_mail { # via ->wq_do
         my ($self, $git_dir, $smsg, $lei) = @_;
         my $not_done = delete $self->{$lei->{each_smsg_not_done}};
         my $wcb = $self->{wcb} //= do { # first message
-                my %sig = $lei->atfork_child_wq($self);
-                @SIG{keys %sig} = values %sig; # not local
+                $lei->atfork_child_wq($self);
                 $self->write_cb($lei);
         };
         my $git = $self->{"$$\0$git_dir"} //= PublicInbox::Git->new($git_dir);
@@ -483,7 +479,6 @@ sub wq_atexit_child {
                 $git->async_wait_all;
         }
         $SIG{__WARN__} = 'DEFAULT';
-        $SIG{PIPE} = 'DEFAULT';
 }
 
 1;
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index e69b637c..de82a7da 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -109,8 +109,7 @@ sub wait_startq ($) {
 sub query_thread_mset { # for --thread
         my ($self, $lei, $ibxish) = @_;
         local $0 = "$0 query_thread_mset";
-        my %sig = $lei->atfork_child_wq($self);
-        local @SIG{keys %sig} = values %sig;
+        $lei->atfork_child_wq($self);
         my $startq = delete $lei->{startq};
 
         my ($srch, $over) = ($ibxish->search, $ibxish->over);
@@ -145,8 +144,7 @@ sub query_thread_mset { # for --thread
 sub query_mset { # non-parallel for non-"--thread" users
         my ($self, $lei) = @_;
         local $0 = "$0 query_mset";
-        my %sig = $lei->atfork_child_wq($self);
-        local @SIG{keys %sig} = values %sig;
+        $lei->atfork_child_wq($self);
         my $startq = delete $lei->{startq};
         my $mo = { %{$lei->{mset_opt}} };
         my $mset;
@@ -187,8 +185,7 @@ sub kill_reap {
 sub query_remote_mboxrd {
         my ($self, $lei, $uris) = @_;
         local $0 = "$0 query_remote_mboxrd";
-        my %sig = $lei->atfork_child_wq($self); # keep $self->{5} startq
-        local @SIG{keys %sig} = values %sig;
+        $lei->atfork_child_wq($self);
         my ($opt, $env) = @$lei{qw(opt env)};
         my @qform = (q => $lei->{mset_opt}->{qstr}, x => 'm');
         push(@qform, t => 1) if $opt->{thread};
@@ -351,9 +348,7 @@ sub start_query { # always runs in main (lei-daemon) process
 sub query_prepare { # called by wq_do
         my ($self, $lei) = @_;
         local $0 = "$0 query_prepare";
-        my %sig = $lei->atfork_child_wq($self);
-        -p $lei->{op_pipe} or die "BUG: \$done pipe expected";
-        local @SIG{keys %sig} = values %sig;
+        $lei->atfork_child_wq($self);
         delete $lei->{l2m}->{-wq_s1};
         eval { $lei->{l2m}->do_augment($lei) };
         $lei->fail($@) if $@;
@@ -363,11 +358,11 @@ sub query_prepare { # called by wq_do
 sub sigpipe_handler { # handles SIGPIPE from l2m/lxs workers
         my ($lei) = @_;
         my $lxs = delete $lei->{lxs};
-        if ($lxs && $lxs->wq_kill_old) {
-                kill 'PIPE', $$;
+        if ($lxs && $lxs->wq_kill_old) { # is this the daemon?
                 $lxs->wq_wait_old;
         }
         close(delete $lei->{1}) if $lei->{1};
+        $lei->x_it(13);
 }
 
 sub do_query {
diff --git a/t/lei_to_mail.t b/t/lei_to_mail.t
index 47c0e3d4..f7535687 100644
--- a/t/lei_to_mail.t
+++ b/t/lei_to_mail.t
@@ -12,6 +12,7 @@ use List::Util qw(shuffle);
 require_mods(qw(DBD::SQLite));
 require PublicInbox::MboxReader;
 require PublicInbox::LeiOverview;
+require PublicInbox::LEI;
 use_ok 'PublicInbox::LeiToMail';
 my $from = "Content-Length: 10\nSubject: x\n\nFrom hell\n";
 my $noeol = "Subject: x\n\nFrom hell";
@@ -73,7 +74,11 @@ for my $mbox (@MBOX) {
 my ($tmpdir, $for_destroy) = tmpdir();
 local $ENV{TMPDIR} = $tmpdir;
 open my $err, '>>', "$tmpdir/lei.err" or BAIL_OUT $!;
-my $lei = { 2 => $err };
+my $lei = bless { 2 => $err }, 'PublicInbox::LEI';
+my $commit = sub {
+        $_[0] = undef; # wcb
+        delete $lei->{1};
+};
 my $buf = <<'EOM';
 From: x@example.com
 Subject: x
@@ -98,9 +103,7 @@ my $wcb_get = sub {
         my $zpipe = $l2m->pre_augment($lei);
         $l2m->do_augment($lei);
         $l2m->post_augment($lei, $zpipe);
-        my $cb = $l2m->write_cb($lei);
-        delete $lei->{1};
-        $cb;
+        $l2m->write_cb($lei);
 };
 
 my $deadbeef = { blob => 'deadbeef', kw => [ qw(seen) ] };
@@ -109,7 +112,7 @@ my $orig = do {
         is(ref $wcb, 'CODE', 'write_cb returned callback');
         ok(-f $fn && !-s _, 'empty file created');
         $wcb->(\(my $dup = $buf), $deadbeef);
-        undef $wcb;
+        $commit->($wcb);
         open my $fh, '<', $fn or BAIL_OUT $!;
         my $raw = do { local $/; <$fh> };
         like($raw, qr/^blah\n/sm, 'wrote content');
@@ -119,7 +122,7 @@ my $orig = do {
         $wcb = $wcb_get->($mbox, $fn);
         ok(-f $fn && !-s _, 'truncated mbox destination');
         $wcb->(\($dup = $buf), $deadbeef);
-        undef $wcb;
+        $commit->($wcb);
         open $fh, '<', $fn or BAIL_OUT $!;
         is(do { local $/; <$fh> }, $raw, 'jobs > 1');
         $raw;
@@ -134,7 +137,7 @@ for my $zsfx (qw(gz bz2 xz)) { # XXX should we support zst, zz, lzo, lzma?
                 my $f = "$fn.$zsfx";
                 my $wcb = $wcb_get->($mbox, $f);
                 $wcb->(\(my $dup = $buf), $deadbeef);
-                undef $wcb;
+                $commit->($wcb);
                 my $uncompressed = xqx([@$dc_cmd, $f]);
                 is($uncompressed, $orig, "$zsfx works unlocked");
 
@@ -142,13 +145,13 @@ for my $zsfx (qw(gz bz2 xz)) { # XXX should we support zst, zz, lzo, lzma?
                 unlink $f or BAIL_OUT "unlink $!";
                 $wcb = $wcb_get->($mbox, $f);
                 $wcb->(\($dup = $buf), $deadbeef);
-                undef $wcb;
+                $commit->($wcb);
                 is(xqx([@$dc_cmd, $f]), $orig, "$zsfx matches with lock");
 
                 local $lei->{opt} = { augment => 1 };
                 $wcb = $wcb_get->($mbox, $f);
                 $wcb->(\($dup = $buf . "\nx\n"), $deadbeef);
-                undef $wcb; # commit
+                $commit->($wcb);
 
                 my $cat = popen_rd([@$dc_cmd, $f]);
                 my @raw;
@@ -160,7 +163,7 @@ for my $zsfx (qw(gz bz2 xz)) { # XXX should we support zst, zz, lzo, lzma?
                 local $lei->{opt} = { augment => 1, jobs => 2 };
                 $wcb = $wcb_get->($mbox, $f);
                 $wcb->(\($dup = $buf . "\ny\n"), $deadbeef);
-                undef $wcb; # commit
+                $commit->($wcb);
 
                 my @raw3;
                 $cat = popen_rd([@$dc_cmd, $f]);
@@ -183,7 +186,7 @@ if ('default deduplication uses content_hash') {
         my $wcb = $wcb_get->('mboxo', $fn);
         $deadbeef->{kw} = [];
         $wcb->(\(my $x = $buf), $deadbeef) for (1..2);
-        undef $wcb; # undef to commit changes
+        $commit->($wcb);
         my $cmp = '';
         open my $fh, '<', $fn or BAIL_OUT $!;
         PublicInbox::MboxReader->mboxo($fh, sub { $cmp .= $as_orig->(@_) });
@@ -192,7 +195,7 @@ if ('default deduplication uses content_hash') {
         local $lei->{opt} = { augment => 1 };
         $wcb = $wcb_get->('mboxo', $fn);
         $wcb->(\($x = $buf . "\nx\n"), $deadbeef) for (1..2);
-        undef $wcb; # undef to commit changes
+        $commit->($wcb);
         open $fh, '<', $fn or BAIL_OUT $!;
         my @x;
         PublicInbox::MboxReader->mboxo($fh, sub { push @x, $as_orig->(@_) });
@@ -206,7 +209,7 @@ if ('default deduplication uses content_hash') {
         local $lei->{1} = $tmp;
         my $wcb = $wcb_get->('mboxrd', '/dev/stdout');
         $wcb->(\(my $x = $buf), $deadbeef);
-        undef $wcb; # commit
+        $commit->($wcb);
         seek($tmp, 0, SEEK_SET) or BAIL_OUT $!;
         my $cmp = '';
         PublicInbox::MboxReader->mboxrd($tmp, sub { $cmp .= $as_orig->(@_) });
@@ -220,7 +223,7 @@ SKIP: { # FIFO support
         my $cat = popen_rd([which('cat'), $fn]);
         my $wcb = $wcb_get->('mboxo', $fn);
         $wcb->(\(my $x = $buf), $deadbeef);
-        undef $wcb; # commit
+        $commit->($wcb);
         my $cmp = '';
         PublicInbox::MboxReader->mboxo($cat, sub { $cmp .= $as_orig->(@_) });
         is($cmp, $buf, 'message written to FIFO');