diff options
author | Eric Wong <e@80x24.org> | 2021-01-10 12:15:18 +0000 |
---|---|---|
committer | Eric Wong <e@80x24.org> | 2021-01-12 03:51:43 +0000 |
commit | 3019046b3ab9736922762df111d60ef7647e36a3 (patch) | |
tree | 6cc7ec956a0c4e3b392367fa7bced25943dbc7b7 /lib/PublicInbox | |
parent | 7b79c918a5ea79f6adc380ca917b0353475ab29c (diff) | |
download | public-inbox-3019046b3ab9736922762df111d60ef7647e36a3.tar.gz |
It's easier to make the code more generic by transferring all four FDs (std(in|out|err) + socket) instead of omitting stdin. We'll be reading from stdin on some imports, and possibly outputting to stdout, so omitting stdin now would needlessly complicate things. The differences with IO::FDPass "1" code paths and the "4" code paths used by Inline::C and Socket::MsgHdr are far too much to support and test at the moment.
Diffstat (limited to 'lib/PublicInbox')
-rw-r--r-- | lib/PublicInbox/CmdIPC1.pm | 32 | ||||
-rw-r--r-- | lib/PublicInbox/IPC.pm | 45 | ||||
-rw-r--r-- | lib/PublicInbox/LEI.pm | 57 | ||||
-rw-r--r-- | lib/PublicInbox/LeiQuery.pm | 9 | ||||
-rw-r--r-- | lib/PublicInbox/LeiXSearch.pm | 6 | ||||
-rw-r--r-- | lib/PublicInbox/Spawn.pm | 2 |
6 files changed, 57 insertions, 94 deletions
diff --git a/lib/PublicInbox/CmdIPC1.pm b/lib/PublicInbox/CmdIPC1.pm deleted file mode 100644 index de6e54ef..00000000 --- a/lib/PublicInbox/CmdIPC1.pm +++ /dev/null @@ -1,32 +0,0 @@ -# Copyright (C) 2021 all contributors <meta@public-inbox.org> -# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> - -# callers should use PublicInbox::CmdIPC1->can('send_cmd1') (or recv_cmd1) -# 2nd choice for lei(1) front-end and 3rd choice for lei internals -package PublicInbox::CmdIPC1; -use strict; -use v5.10.1; -BEGIN { eval { -require IO::FDPass; # XS, available in all major distros -no warnings 'once'; - -*send_cmd1 = sub ($$$$) { # (sock, fds, buf, flags) = @_; - my ($sock, $fds, undef, $flags) = @_; - for my $fd (@$fds) { - IO::FDPass::send(fileno($sock), $fd) or - die "IO::FDPass::send: $!"; - } - send($sock, $_[2], $flags) or die "send $!"; -}; - -*recv_cmd1 = sub ($$$;$) { - my ($s, undef, $len, $nfds) = @_; - $nfds //= 3; - my @fds = map { IO::FDPass::recv(fileno($s)) } (1..$nfds); - recv($s, $_[1], $len, 0) // die "recv: $!"; - length($_[1]) == 0 ? () : @fds; -}; - -} } # /eval /BEGIN - -1; diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm index 88f81e47..c54fcc64 100644 --- a/lib/PublicInbox/IPC.pm +++ b/lib/PublicInbox/IPC.pm @@ -37,37 +37,16 @@ if ($enc && $dec) { # should be custom ops } // warn("Storable (part of Perl) missing: $@\n"); } -my $recv_cmd1; # PublicInbox::CmdIPC1::recv_cmd1; my $recv_cmd = PublicInbox::Spawn->can('recv_cmd4'); my $send_cmd = PublicInbox::Spawn->can('send_cmd4') // do { require PublicInbox::CmdIPC4; $recv_cmd //= PublicInbox::CmdIPC4->can('recv_cmd4'); PublicInbox::CmdIPC4->can('send_cmd4'); -} // do { - # IO::FDPass only allows sending a single FD at-a-time, which - # means we can't guarantee all packets end up on the same worker, - # so we cap WQ_MAX_WORKERS - require PublicInbox::CmdIPC1; - $recv_cmd1 = PublicInbox::CmdIPC1->can('recv_cmd1'); - $WQ_MAX_WORKERS = 1 if $recv_cmd1; - wq_set_recv_fds(3); - PublicInbox::CmdIPC1->can('send_cmd1'); }; -# needed to tell recv_cmd1 how many times to loop IO::FDPass::recv -sub wq_set_recv_fds { - return unless $recv_cmd1; - my $nfds = pop; - my $sub = sub { - my ($sock, $fds, undef, $flags) = @_; - $recv_cmd1->($sock, $fds, $_[2], $flags, $nfds); - }; - my $self = pop; - if (ref $self) { - $self->{-wq_recv_cmd} = $sub; - } else { - $recv_cmd = $sub; - } +sub wq_set_recv_modes { + my ($self, @modes) = @_; + $self->{-wq_recv_modes} = \@modes; } sub _get_rec ($) { @@ -259,7 +238,9 @@ sub ipc_sibling_atfork_child { sub _close_recvd ($) { my ($self) = @_; - close($_) for (grep { defined } (delete @$self{0..2})); + my $x = $self->{-wq_recv_modes}; + my $end = $x ? $#$x : 2; + close($_) for (grep { defined } (delete @$self{0..$end})); } sub wq_worker_loop ($) { @@ -271,13 +252,12 @@ sub wq_worker_loop ($) { local $SIG{PIPE} = sub { my $cur_sub = $sub; _close_recvd($self); - die(bless(\$cur_sub, __PACKAGE__.'::PIPE')) if $cur_sub; + die(bless(\$cur_sub, 'PublicInbox::SIGPIPE')) if $cur_sub; }; - my $rcv = $self->{-wq_recv_cmd} // $recv_cmd; while (1) { - my (@fds) = $rcv->($s2, $buf, $len) or return; # EOF + my (@fds) = $recv_cmd->($s2, $buf, $len) or return; # EOF my $i = 0; - my @m = @{$self->{wq_open_modes} // [qw( +<&= >&= >&= )]}; + my @m = @{$self->{-wq_recv_modes} // [qw( +<&= >&= >&= )]}; for my $fd (@fds) { my $mode = shift(@m); if (open(my $cmdfh, $mode, $fd)) { @@ -296,7 +276,8 @@ sub wq_worker_loop ($) { undef $sub; # quiet SIG{PIPE} handler die $@ if $@; }; - warn "$$ wq_worker: $@" if $@ && ref $@ ne __PACKAGE__.'::PIPE'; + warn "$$ wq_worker: $@" if $@ && + ref($@) ne 'PublicInbox::SIGPIPE'; # need to close explicitly to avoid warnings after SIGPIPE _close_recvd($self); } @@ -310,8 +291,8 @@ sub wq_do { # always async } else { @$self{0..$#$ios} = @$ios; eval { $self->$sub(@args) }; - warn "wq_do: $@" if $@; - delete @$self{0..$#$ios}; + warn "wq_do: $@" if $@ && ref($@) ne 'PublicInbox::SIGPIPE'; + delete @$self{0..$#$ios}; # don't close } } diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index d19fb311..7313738e 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -277,8 +277,9 @@ sub atfork_prepare_wq { # usage: local %SIG = (%SIG, $lei->atfork_child_wq($wq)); sub atfork_child_wq { my ($self, $wq) = @_; - $self->{sock} //= $wq->{0}; - $self->{$_} //= $wq->{$_} for (0..2); + return () if $self->{0}; # did not fork + $self->{$_} = $wq->{$_} for (0..2); + $self->{sock} = $wq->{3} // die 'BUG: no {sock}'; # may be undef my $oldpipe = $SIG{PIPE}; %PATH2CFG = (); @TO_CLOSE_ATFORK_CHILD = (); @@ -298,11 +299,10 @@ sub atfork_parent_wq { my $env = delete $self->{env}; # env is inherited at fork my $ret = bless { %$self }, ref($self); $self->{env} = $env; - delete @$ret{qw(-lei_store cfg)}; - my $in = delete $ret->{0}; - ($ret, delete($ret->{sock}) // $in, delete @$ret{1, 2}); + delete @$ret{qw(-lei_store cfg pgr)}; + ($ret, delete @$ret{qw(0 1 2 sock)}); } else { - ($self, ($self->{sock} // $self->{0}), @$self{1, 2}); + ($self, @$self{qw(0 1 2 sock)}); } } @@ -641,7 +641,7 @@ sub start_pager { $new_env{MORE} = 'FRX' if $^O eq 'freebsd'; pipe(my ($r, $wpager)) or return warn "pipe: $!"; my $rdr = { 0 => $r, 1 => $self->{1}, 2 => $self->{2} }; - my $pid; + my $pgr = [ undef, @$rdr{1, 2} ]; if (my $sock = $self->{sock}) { # lei(1) process runs it delete @new_env{keys %$env}; # only set iff unset my $buf = "exec 1\0".$pager; @@ -649,12 +649,23 @@ sub start_pager { my $fds = [ map { fileno($_) } @$rdr{0..2} ]; $send_cmd->($sock, $fds, $buf .= "\n", 0); } else { - $pid = spawn([$pager], $env, $rdr); + $pgr->[0] = spawn([$pager], $env, $rdr); } $self->{1} = $wpager; $self->{2} = $wpager if -t $self->{2}; $env->{GIT_PAGER_IN_USE} = 'true'; # we may spawn git - [ $pid, @$rdr{1, 2} ]; + $self->{pgr} = $pgr; +} + +sub stop_pager { + my ($self) = @_; + my $pgr = delete($self->{pgr}) or return; + my $pid = $pgr->[0]; + close $self->{1}; + # {2} may not be redirected + $self->{1} = $pgr->[1]; + $self->{2} = $pgr->[2]; + dwaitpid($pid, undef, $self->{sock}) if $pid; } sub accept_dispatch { # Listener {post_accept} callback @@ -738,11 +749,7 @@ sub lazy_start { my $dev_ino_expect = pack('dd', $st[0], $st[1]); # dev+ino pipe(my ($eof_r, $eof_w)) or die "pipe: $!"; local $oldset = PublicInbox::DS::block_signals(); - if ($nfd == 1) { - require PublicInbox::CmdIPC1; - $send_cmd = PublicInbox::CmdIPC1->can('send_cmd1'); - $recv_cmd = PublicInbox::CmdIPC1->can('recv_cmd1'); - } elsif ($nfd == 4) { + if ($nfd == 4) { $send_cmd = PublicInbox::Spawn->can('send_cmd4'); $recv_cmd = PublicInbox::Spawn->can('recv_cmd4') // do { require PublicInbox::CmdIPC4; @@ -751,7 +758,7 @@ sub lazy_start { }; } $recv_cmd or die <<""; -(Socket::MsgHdr || IO::FDPass || Inline::C) missing/unconfigured (nfd=$nfd); +(Socket::MsgHdr || Inline::C) missing/unconfigured (nfd=$nfd); require PublicInbox::Listener; require PublicInbox::EOFpipe; @@ -839,19 +846,24 @@ sub lazy_start { exit($exit_code // 0); } -# for users w/o IO::FDPass +# for users w/o Socket::Msghdr sub oneshot { my ($main_pkg) = @_; my $exit = $main_pkg->can('exit'); # caller may override exit() local $quit = $exit if $exit; local %PATH2CFG; umask(077) // die("umask(077): $!"); - dispatch((bless { - 0 => *STDIN{GLOB}, - 1 => *STDOUT{GLOB}, - 2 => *STDERR{GLOB}, - env => \%ENV - }, __PACKAGE__), @ARGV); + local $SIG{PIPE} = sub { die(bless(\"$_[0]", 'PublicInbox::SIGPIPE')) }; + eval { + my $self = bless { + 0 => *STDIN{GLOB}, + 1 => *STDOUT{GLOB}, + 2 => *STDERR{GLOB}, + env => \%ENV + }, __PACKAGE__; + dispatch($self, @ARGV); + }; + die $@ if $@ && ref($@) ne 'PublicInbox::SIGPIPE'; } # ensures stdout hits the FS before sock disconnects so a client @@ -859,6 +871,7 @@ sub oneshot { sub DESTROY { my ($self) = @_; $self->{1}->autoflush(1); + stop_pager($self); } 1; diff --git a/lib/PublicInbox/LeiQuery.pm b/lib/PublicInbox/LeiQuery.pm index 6e778785..2f4b99e5 100644 --- a/lib/PublicInbox/LeiQuery.pm +++ b/lib/PublicInbox/LeiQuery.pm @@ -80,14 +80,14 @@ sub lei_q { if ($self->{sock}) { $self->atfork_prepare_wq($lxs); $lxs->wq_workers_start('lei_xsearch', $j, $self->oldset) - // $self->wq_workers($j); + // $lxs->wq_workers($j); } unshift(@srcs, $sto->search) if $opt->{'local'}; my $out = $opt->{output} // '-'; $out = 'json:/dev/stdout' if $out eq '-'; my $isatty = -t $self->{1}; # no forking workers after this - my $pid_old12 = $self->start_pager if $isatty; + $self->start_pager if $isatty; my $json = substr($out, 0, 5) eq 'json:' ? ref(PublicInbox::Config->json)->new : undef; if ($json) { @@ -125,11 +125,6 @@ sub lei_q { # my $wcb = PublicInbox::LeiToMail->write_cb($out, $self); $self->{mset_opt} = \%mset_opt; $lxs->do_query($self, \@srcs); - if ($pid_old12) { # [ pid, stdout, stderr ] - my $pid = $pid_old12->[0]; - $self->{$_} = $pid_old12->[$_] for (1, 2); - dwaitpid($pid, undef, $self->{sock}) if $pid; - } } 1; diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index b4172734..94f7c2bc 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -168,4 +168,10 @@ sub ipc_atfork_child { $self->SUPER::ipc_atfork_child; # PublicInbox::IPC } +sub ipc_atfork_prepare { + my ($self) = @_; + $self->wq_set_recv_modes(qw[<&= >&= >&= +<&=]); + $self->SUPER::ipc_atfork_prepare; # PublicInbox::IPC +} + 1; diff --git a/lib/PublicInbox/Spawn.pm b/lib/PublicInbox/Spawn.pm index b35bf54c..ef822e1b 100644 --- a/lib/PublicInbox/Spawn.pm +++ b/lib/PublicInbox/Spawn.pm @@ -209,7 +209,7 @@ my $fdpass = <<'FDPASS'; #include <sys/socket.h> #if defined(CMSG_SPACE) && defined(CMSG_LEN) -#define SEND_FD_CAPA 3 +#define SEND_FD_CAPA 4 #define SEND_FD_SPACE (SEND_FD_CAPA * sizeof(int)) union my_cmsg { struct cmsghdr hdr; |