about summary refs log tree commit homepage
path: root/lib/PublicInbox
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2021-01-10 12:15:18 +0000
committerEric Wong <e@80x24.org>2021-01-12 03:51:43 +0000
commit3019046b3ab9736922762df111d60ef7647e36a3 (patch)
tree6cc7ec956a0c4e3b392367fa7bced25943dbc7b7 /lib/PublicInbox
parent7b79c918a5ea79f6adc380ca917b0353475ab29c (diff)
downloadpublic-inbox-3019046b3ab9736922762df111d60ef7647e36a3.tar.gz
It's easier to make the code more generic by transferring
all four FDs (std(in|out|err) + socket) instead of omitting
stdin.

We'll be reading from stdin on some imports, and possibly
outputting to stdout, so omitting stdin now would needlessly
complicate things.

The differences with IO::FDPass "1" code paths and the "4"
code paths used by Inline::C and Socket::MsgHdr are far too
much to support and test at the moment.
Diffstat (limited to 'lib/PublicInbox')
-rw-r--r--lib/PublicInbox/CmdIPC1.pm32
-rw-r--r--lib/PublicInbox/IPC.pm45
-rw-r--r--lib/PublicInbox/LEI.pm57
-rw-r--r--lib/PublicInbox/LeiQuery.pm9
-rw-r--r--lib/PublicInbox/LeiXSearch.pm6
-rw-r--r--lib/PublicInbox/Spawn.pm2
6 files changed, 57 insertions, 94 deletions
diff --git a/lib/PublicInbox/CmdIPC1.pm b/lib/PublicInbox/CmdIPC1.pm
deleted file mode 100644
index de6e54ef..00000000
--- a/lib/PublicInbox/CmdIPC1.pm
+++ /dev/null
@@ -1,32 +0,0 @@
-# Copyright (C) 2021 all contributors <meta@public-inbox.org>
-# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
-
-# callers should use PublicInbox::CmdIPC1->can('send_cmd1') (or recv_cmd1)
-# 2nd choice for lei(1) front-end and 3rd choice for lei internals
-package PublicInbox::CmdIPC1;
-use strict;
-use v5.10.1;
-BEGIN { eval {
-require IO::FDPass; # XS, available in all major distros
-no warnings 'once';
-
-*send_cmd1 = sub ($$$$) { # (sock, fds, buf, flags) = @_;
-        my ($sock, $fds, undef, $flags) = @_;
-        for my $fd (@$fds) {
-                IO::FDPass::send(fileno($sock), $fd) or
-                                        die "IO::FDPass::send: $!";
-        }
-        send($sock, $_[2], $flags) or die "send $!";
-};
-
-*recv_cmd1 = sub ($$$;$) {
-        my ($s, undef, $len, $nfds) = @_;
-        $nfds //= 3;
-        my @fds = map { IO::FDPass::recv(fileno($s)) } (1..$nfds);
-        recv($s, $_[1], $len, 0) // die "recv: $!";
-        length($_[1]) == 0 ? () : @fds;
-};
-
-} } # /eval /BEGIN
-
-1;
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 88f81e47..c54fcc64 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -37,37 +37,16 @@ if ($enc && $dec) { # should be custom ops
         } // warn("Storable (part of Perl) missing: $@\n");
 }
 
-my $recv_cmd1; # PublicInbox::CmdIPC1::recv_cmd1;
 my $recv_cmd = PublicInbox::Spawn->can('recv_cmd4');
 my $send_cmd = PublicInbox::Spawn->can('send_cmd4') // do {
         require PublicInbox::CmdIPC4;
         $recv_cmd //= PublicInbox::CmdIPC4->can('recv_cmd4');
         PublicInbox::CmdIPC4->can('send_cmd4');
-} // do {
-        # IO::FDPass only allows sending a single FD at-a-time, which
-        # means we can't guarantee all packets end up on the same worker,
-        # so we cap WQ_MAX_WORKERS
-        require PublicInbox::CmdIPC1;
-        $recv_cmd1 = PublicInbox::CmdIPC1->can('recv_cmd1');
-        $WQ_MAX_WORKERS = 1 if $recv_cmd1;
-        wq_set_recv_fds(3);
-        PublicInbox::CmdIPC1->can('send_cmd1');
 };
 
-# needed to tell recv_cmd1 how many times to loop IO::FDPass::recv
-sub wq_set_recv_fds {
-        return unless $recv_cmd1;
-        my $nfds = pop;
-        my $sub = sub {
-                my ($sock, $fds, undef, $flags) = @_;
-                $recv_cmd1->($sock, $fds, $_[2], $flags, $nfds);
-        };
-        my $self = pop;
-        if (ref $self) {
-                $self->{-wq_recv_cmd} = $sub;
-        } else {
-                $recv_cmd = $sub;
-        }
+sub wq_set_recv_modes {
+        my ($self, @modes) = @_;
+        $self->{-wq_recv_modes} = \@modes;
 }
 
 sub _get_rec ($) {
@@ -259,7 +238,9 @@ sub ipc_sibling_atfork_child {
 
 sub _close_recvd ($) {
         my ($self) = @_;
-        close($_) for (grep { defined } (delete @$self{0..2}));
+        my $x = $self->{-wq_recv_modes};
+        my $end = $x ? $#$x : 2;
+        close($_) for (grep { defined } (delete @$self{0..$end}));
 }
 
 sub wq_worker_loop ($) {
@@ -271,13 +252,12 @@ sub wq_worker_loop ($) {
         local $SIG{PIPE} = sub {
                 my $cur_sub = $sub;
                 _close_recvd($self);
-                die(bless(\$cur_sub, __PACKAGE__.'::PIPE')) if $cur_sub;
+                die(bless(\$cur_sub, 'PublicInbox::SIGPIPE')) if $cur_sub;
         };
-        my $rcv = $self->{-wq_recv_cmd} // $recv_cmd;
         while (1) {
-                my (@fds) = $rcv->($s2, $buf, $len) or return; # EOF
+                my (@fds) = $recv_cmd->($s2, $buf, $len) or return; # EOF
                 my $i = 0;
-                my @m = @{$self->{wq_open_modes} // [qw( +<&= >&= >&= )]};
+                my @m = @{$self->{-wq_recv_modes} // [qw( +<&= >&= >&= )]};
                 for my $fd (@fds) {
                         my $mode = shift(@m);
                         if (open(my $cmdfh, $mode, $fd)) {
@@ -296,7 +276,8 @@ sub wq_worker_loop ($) {
                         undef $sub; # quiet SIG{PIPE} handler
                         die $@ if $@;
                 };
-                warn "$$ wq_worker: $@" if $@ && ref $@ ne __PACKAGE__.'::PIPE';
+                warn "$$ wq_worker: $@" if $@ &&
+                                        ref($@) ne 'PublicInbox::SIGPIPE';
                 # need to close explicitly to avoid warnings after SIGPIPE
                 _close_recvd($self);
         }
@@ -310,8 +291,8 @@ sub wq_do { # always async
         } else {
                 @$self{0..$#$ios} = @$ios;
                 eval { $self->$sub(@args) };
-                warn "wq_do: $@" if $@;
-                delete @$self{0..$#$ios};
+                warn "wq_do: $@" if $@ && ref($@) ne 'PublicInbox::SIGPIPE';
+                delete @$self{0..$#$ios}; # don't close
         }
 }
 
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index d19fb311..7313738e 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -277,8 +277,9 @@ sub atfork_prepare_wq {
 # usage: local %SIG = (%SIG, $lei->atfork_child_wq($wq));
 sub atfork_child_wq {
         my ($self, $wq) = @_;
-        $self->{sock} //= $wq->{0};
-        $self->{$_} //= $wq->{$_} for (0..2);
+        return () if $self->{0}; # did not fork
+        $self->{$_} = $wq->{$_} for (0..2);
+        $self->{sock} = $wq->{3} // die 'BUG: no {sock}'; # may be undef
         my $oldpipe = $SIG{PIPE};
         %PATH2CFG = ();
         @TO_CLOSE_ATFORK_CHILD = ();
@@ -298,11 +299,10 @@ sub atfork_parent_wq {
                 my $env = delete $self->{env}; # env is inherited at fork
                 my $ret = bless { %$self }, ref($self);
                 $self->{env} = $env;
-                delete @$ret{qw(-lei_store cfg)};
-                my $in = delete $ret->{0};
-                ($ret, delete($ret->{sock}) // $in, delete @$ret{1, 2});
+                delete @$ret{qw(-lei_store cfg pgr)};
+                ($ret, delete @$ret{qw(0 1 2 sock)});
         } else {
-                ($self, ($self->{sock} // $self->{0}), @$self{1, 2});
+                ($self, @$self{qw(0 1 2 sock)});
         }
 }
 
@@ -641,7 +641,7 @@ sub start_pager {
         $new_env{MORE} = 'FRX' if $^O eq 'freebsd';
         pipe(my ($r, $wpager)) or return warn "pipe: $!";
         my $rdr = { 0 => $r, 1 => $self->{1}, 2 => $self->{2} };
-        my $pid;
+        my $pgr = [ undef, @$rdr{1, 2} ];
         if (my $sock = $self->{sock}) { # lei(1) process runs it
                 delete @new_env{keys %$env}; # only set iff unset
                 my $buf = "exec 1\0".$pager;
@@ -649,12 +649,23 @@ sub start_pager {
                 my $fds = [ map { fileno($_) } @$rdr{0..2} ];
                 $send_cmd->($sock, $fds, $buf .= "\n", 0);
         } else {
-                $pid = spawn([$pager], $env, $rdr);
+                $pgr->[0] = spawn([$pager], $env, $rdr);
         }
         $self->{1} = $wpager;
         $self->{2} = $wpager if -t $self->{2};
         $env->{GIT_PAGER_IN_USE} = 'true'; # we may spawn git
-        [ $pid, @$rdr{1, 2} ];
+        $self->{pgr} = $pgr;
+}
+
+sub stop_pager {
+        my ($self) = @_;
+        my $pgr = delete($self->{pgr}) or return;
+        my $pid = $pgr->[0];
+        close $self->{1};
+        # {2} may not be redirected
+        $self->{1} = $pgr->[1];
+        $self->{2} = $pgr->[2];
+        dwaitpid($pid, undef, $self->{sock}) if $pid;
 }
 
 sub accept_dispatch { # Listener {post_accept} callback
@@ -738,11 +749,7 @@ sub lazy_start {
         my $dev_ino_expect = pack('dd', $st[0], $st[1]); # dev+ino
         pipe(my ($eof_r, $eof_w)) or die "pipe: $!";
         local $oldset = PublicInbox::DS::block_signals();
-        if ($nfd == 1) {
-                require PublicInbox::CmdIPC1;
-                $send_cmd = PublicInbox::CmdIPC1->can('send_cmd1');
-                $recv_cmd = PublicInbox::CmdIPC1->can('recv_cmd1');
-        } elsif ($nfd == 4) {
+        if ($nfd == 4) {
                 $send_cmd = PublicInbox::Spawn->can('send_cmd4');
                 $recv_cmd = PublicInbox::Spawn->can('recv_cmd4') // do {
                         require PublicInbox::CmdIPC4;
@@ -751,7 +758,7 @@ sub lazy_start {
                 };
         }
         $recv_cmd or die <<"";
-(Socket::MsgHdr || IO::FDPass || Inline::C) missing/unconfigured (nfd=$nfd);
+(Socket::MsgHdr || Inline::C) missing/unconfigured (nfd=$nfd);
 
         require PublicInbox::Listener;
         require PublicInbox::EOFpipe;
@@ -839,19 +846,24 @@ sub lazy_start {
         exit($exit_code // 0);
 }
 
-# for users w/o IO::FDPass
+# for users w/o Socket::Msghdr
 sub oneshot {
         my ($main_pkg) = @_;
         my $exit = $main_pkg->can('exit'); # caller may override exit()
         local $quit = $exit if $exit;
         local %PATH2CFG;
         umask(077) // die("umask(077): $!");
-        dispatch((bless {
-                0 => *STDIN{GLOB},
-                1 => *STDOUT{GLOB},
-                2 => *STDERR{GLOB},
-                env => \%ENV
-        }, __PACKAGE__), @ARGV);
+        local $SIG{PIPE} = sub { die(bless(\"$_[0]", 'PublicInbox::SIGPIPE')) };
+        eval {
+                my $self = bless {
+                        0 => *STDIN{GLOB},
+                        1 => *STDOUT{GLOB},
+                        2 => *STDERR{GLOB},
+                        env => \%ENV
+                }, __PACKAGE__;
+                dispatch($self, @ARGV);
+        };
+        die $@ if $@ && ref($@) ne 'PublicInbox::SIGPIPE';
 }
 
 # ensures stdout hits the FS before sock disconnects so a client
@@ -859,6 +871,7 @@ sub oneshot {
 sub DESTROY {
         my ($self) = @_;
         $self->{1}->autoflush(1);
+        stop_pager($self);
 }
 
 1;
diff --git a/lib/PublicInbox/LeiQuery.pm b/lib/PublicInbox/LeiQuery.pm
index 6e778785..2f4b99e5 100644
--- a/lib/PublicInbox/LeiQuery.pm
+++ b/lib/PublicInbox/LeiQuery.pm
@@ -80,14 +80,14 @@ sub lei_q {
         if ($self->{sock}) {
                 $self->atfork_prepare_wq($lxs);
                 $lxs->wq_workers_start('lei_xsearch', $j, $self->oldset)
-                        // $self->wq_workers($j);
+                        // $lxs->wq_workers($j);
         }
         unshift(@srcs, $sto->search) if $opt->{'local'};
         my $out = $opt->{output} // '-';
         $out = 'json:/dev/stdout' if $out eq '-';
         my $isatty = -t $self->{1};
         # no forking workers after this
-        my $pid_old12 = $self->start_pager if $isatty;
+        $self->start_pager if $isatty;
         my $json = substr($out, 0, 5) eq 'json:' ?
                 ref(PublicInbox::Config->json)->new : undef;
         if ($json) {
@@ -125,11 +125,6 @@ sub lei_q {
         # my $wcb = PublicInbox::LeiToMail->write_cb($out, $self);
         $self->{mset_opt} = \%mset_opt;
         $lxs->do_query($self, \@srcs);
-        if ($pid_old12) { # [ pid, stdout, stderr ]
-                my $pid = $pid_old12->[0];
-                $self->{$_} = $pid_old12->[$_] for (1, 2);
-                dwaitpid($pid, undef, $self->{sock}) if $pid;
-        }
 }
 
 1;
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index b4172734..94f7c2bc 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -168,4 +168,10 @@ sub ipc_atfork_child {
         $self->SUPER::ipc_atfork_child; # PublicInbox::IPC
 }
 
+sub ipc_atfork_prepare {
+        my ($self) = @_;
+        $self->wq_set_recv_modes(qw[<&= >&= >&= +<&=]);
+        $self->SUPER::ipc_atfork_prepare; # PublicInbox::IPC
+}
+
 1;
diff --git a/lib/PublicInbox/Spawn.pm b/lib/PublicInbox/Spawn.pm
index b35bf54c..ef822e1b 100644
--- a/lib/PublicInbox/Spawn.pm
+++ b/lib/PublicInbox/Spawn.pm
@@ -209,7 +209,7 @@ my $fdpass = <<'FDPASS';
 #include <sys/socket.h>
 
 #if defined(CMSG_SPACE) && defined(CMSG_LEN)
-#define SEND_FD_CAPA 3
+#define SEND_FD_CAPA 4
 #define SEND_FD_SPACE (SEND_FD_CAPA * sizeof(int))
 union my_cmsg {
         struct cmsghdr hdr;