From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-Status: No, score=-3.1 required=3.0 tests=ALL_TRUSTED,AWL,BAYES_00, URIBL_SBL,URIBL_SBL_A shortcircuit=no autolearn=no autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 2A34E1FFAB for ; Sun, 10 Jan 2021 12:15:21 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 21/22] lei_xsearch: transfer 4 FDs internally, drop IO::FDPass Date: Sun, 10 Jan 2021 12:15:18 +0000 Message-Id: <20210110121519.17044-22-e@80x24.org> In-Reply-To: <20210110121519.17044-1-e@80x24.org> References: <20210110121519.17044-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: It's easier to make the code more generic by transferring all four FDs (std(in|out|err) + socket) instead of omitting stdin. We'll be reading from stdin on some imports, and possibly outputting to stdout, so omitting stdin now would needlessly complicate things. The differences between the IO::FDPass "1" code paths and the "4" code paths used by Inline::C and Socket::MsgHdr are far too great to support and test at the moment.
--- MANIFEST | 1 - lib/PublicInbox/CmdIPC1.pm | 32 -------------------- lib/PublicInbox/IPC.pm | 45 ++++++++------------------- lib/PublicInbox/LEI.pm | 57 +++++++++++++++++++++-------------- lib/PublicInbox/LeiQuery.pm | 9 ++---- lib/PublicInbox/LeiXSearch.pm | 6 ++++ lib/PublicInbox/Spawn.pm | 2 +- script/lei | 9 ++---- t/cmd_ipc.t | 9 ------ t/ipc.t | 53 +++++++++++++++++--------------- t/lei.t | 6 ++-- 11 files changed, 89 insertions(+), 140 deletions(-) delete mode 100644 lib/PublicInbox/CmdIPC1.pm diff --git a/MANIFEST b/MANIFEST index 62c14cd2..caddd8df 100644 --- a/MANIFEST +++ b/MANIFEST @@ -109,7 +109,6 @@ lib/PublicInbox/Admin.pm lib/PublicInbox/AdminEdit.pm lib/PublicInbox/AltId.pm lib/PublicInbox/Cgit.pm -lib/PublicInbox/CmdIPC1.pm lib/PublicInbox/CmdIPC4.pm lib/PublicInbox/CompressNoop.pm lib/PublicInbox/Config.pm diff --git a/lib/PublicInbox/CmdIPC1.pm b/lib/PublicInbox/CmdIPC1.pm deleted file mode 100644 index de6e54ef..00000000 --- a/lib/PublicInbox/CmdIPC1.pm +++ /dev/null @@ -1,32 +0,0 @@ -# Copyright (C) 2021 all contributors -# License: AGPL-3.0+ - -# callers should use PublicInbox::CmdIPC1->can('send_cmd1') (or recv_cmd1) -# 2nd choice for lei(1) front-end and 3rd choice for lei internals -package PublicInbox::CmdIPC1; -use strict; -use v5.10.1; -BEGIN { eval { -require IO::FDPass; # XS, available in all major distros -no warnings 'once'; - -*send_cmd1 = sub ($$$$) { # (sock, fds, buf, flags) = @_; - my ($sock, $fds, undef, $flags) = @_; - for my $fd (@$fds) { - IO::FDPass::send(fileno($sock), $fd) or - die "IO::FDPass::send: $!"; - } - send($sock, $_[2], $flags) or die "send $!"; -}; - -*recv_cmd1 = sub ($$$;$) { - my ($s, undef, $len, $nfds) = @_; - $nfds //= 3; - my @fds = map { IO::FDPass::recv(fileno($s)) } (1..$nfds); - recv($s, $_[1], $len, 0) // die "recv: $!"; - length($_[1]) == 0 ? 
() : @fds; -}; - -} } # /eval /BEGIN - -1; diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm index 88f81e47..c54fcc64 100644 --- a/lib/PublicInbox/IPC.pm +++ b/lib/PublicInbox/IPC.pm @@ -37,37 +37,16 @@ if ($enc && $dec) { # should be custom ops } // warn("Storable (part of Perl) missing: $@\n"); } -my $recv_cmd1; # PublicInbox::CmdIPC1::recv_cmd1; my $recv_cmd = PublicInbox::Spawn->can('recv_cmd4'); my $send_cmd = PublicInbox::Spawn->can('send_cmd4') // do { require PublicInbox::CmdIPC4; $recv_cmd //= PublicInbox::CmdIPC4->can('recv_cmd4'); PublicInbox::CmdIPC4->can('send_cmd4'); -} // do { - # IO::FDPass only allows sending a single FD at-a-time, which - # means we can't guarantee all packets end up on the same worker, - # so we cap WQ_MAX_WORKERS - require PublicInbox::CmdIPC1; - $recv_cmd1 = PublicInbox::CmdIPC1->can('recv_cmd1'); - $WQ_MAX_WORKERS = 1 if $recv_cmd1; - wq_set_recv_fds(3); - PublicInbox::CmdIPC1->can('send_cmd1'); }; -# needed to tell recv_cmd1 how many times to loop IO::FDPass::recv -sub wq_set_recv_fds { - return unless $recv_cmd1; - my $nfds = pop; - my $sub = sub { - my ($sock, $fds, undef, $flags) = @_; - $recv_cmd1->($sock, $fds, $_[2], $flags, $nfds); - }; - my $self = pop; - if (ref $self) { - $self->{-wq_recv_cmd} = $sub; - } else { - $recv_cmd = $sub; - } +sub wq_set_recv_modes { + my ($self, @modes) = @_; + $self->{-wq_recv_modes} = \@modes; } sub _get_rec ($) { @@ -259,7 +238,9 @@ sub ipc_sibling_atfork_child { sub _close_recvd ($) { my ($self) = @_; - close($_) for (grep { defined } (delete @$self{0..2})); + my $x = $self->{-wq_recv_modes}; + my $end = $x ? 
$#$x : 2; + close($_) for (grep { defined } (delete @$self{0..$end})); } sub wq_worker_loop ($) { @@ -271,13 +252,12 @@ sub wq_worker_loop ($) { local $SIG{PIPE} = sub { my $cur_sub = $sub; _close_recvd($self); - die(bless(\$cur_sub, __PACKAGE__.'::PIPE')) if $cur_sub; + die(bless(\$cur_sub, 'PublicInbox::SIGPIPE')) if $cur_sub; }; - my $rcv = $self->{-wq_recv_cmd} // $recv_cmd; while (1) { - my (@fds) = $rcv->($s2, $buf, $len) or return; # EOF + my (@fds) = $recv_cmd->($s2, $buf, $len) or return; # EOF my $i = 0; - my @m = @{$self->{wq_open_modes} // [qw( +<&= >&= >&= )]}; + my @m = @{$self->{-wq_recv_modes} // [qw( +<&= >&= >&= )]}; for my $fd (@fds) { my $mode = shift(@m); if (open(my $cmdfh, $mode, $fd)) { @@ -296,7 +276,8 @@ sub wq_worker_loop ($) { undef $sub; # quiet SIG{PIPE} handler die $@ if $@; }; - warn "$$ wq_worker: $@" if $@ && ref $@ ne __PACKAGE__.'::PIPE'; + warn "$$ wq_worker: $@" if $@ && + ref($@) ne 'PublicInbox::SIGPIPE'; # need to close explicitly to avoid warnings after SIGPIPE _close_recvd($self); } @@ -310,8 +291,8 @@ sub wq_do { # always async } else { @$self{0..$#$ios} = @$ios; eval { $self->$sub(@args) }; - warn "wq_do: $@" if $@; - delete @$self{0..$#$ios}; + warn "wq_do: $@" if $@ && ref($@) ne 'PublicInbox::SIGPIPE'; + delete @$self{0..$#$ios}; # don't close } } diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index d19fb311..7313738e 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -277,8 +277,9 @@ sub atfork_prepare_wq { # usage: local %SIG = (%SIG, $lei->atfork_child_wq($wq)); sub atfork_child_wq { my ($self, $wq) = @_; - $self->{sock} //= $wq->{0}; - $self->{$_} //= $wq->{$_} for (0..2); + return () if $self->{0}; # did not fork + $self->{$_} = $wq->{$_} for (0..2); + $self->{sock} = $wq->{3} // die 'BUG: no {sock}'; # may be undef my $oldpipe = $SIG{PIPE}; %PATH2CFG = (); @TO_CLOSE_ATFORK_CHILD = (); @@ -298,11 +299,10 @@ sub atfork_parent_wq { my $env = delete $self->{env}; # env is inherited at 
fork my $ret = bless { %$self }, ref($self); $self->{env} = $env; - delete @$ret{qw(-lei_store cfg)}; - my $in = delete $ret->{0}; - ($ret, delete($ret->{sock}) // $in, delete @$ret{1, 2}); + delete @$ret{qw(-lei_store cfg pgr)}; + ($ret, delete @$ret{qw(0 1 2 sock)}); } else { - ($self, ($self->{sock} // $self->{0}), @$self{1, 2}); + ($self, @$self{qw(0 1 2 sock)}); } } @@ -641,7 +641,7 @@ sub start_pager { $new_env{MORE} = 'FRX' if $^O eq 'freebsd'; pipe(my ($r, $wpager)) or return warn "pipe: $!"; my $rdr = { 0 => $r, 1 => $self->{1}, 2 => $self->{2} }; - my $pid; + my $pgr = [ undef, @$rdr{1, 2} ]; if (my $sock = $self->{sock}) { # lei(1) process runs it delete @new_env{keys %$env}; # only set iff unset my $buf = "exec 1\0".$pager; @@ -649,12 +649,23 @@ sub start_pager { my $fds = [ map { fileno($_) } @$rdr{0..2} ]; $send_cmd->($sock, $fds, $buf .= "\n", 0); } else { - $pid = spawn([$pager], $env, $rdr); + $pgr->[0] = spawn([$pager], $env, $rdr); } $self->{1} = $wpager; $self->{2} = $wpager if -t $self->{2}; $env->{GIT_PAGER_IN_USE} = 'true'; # we may spawn git - [ $pid, @$rdr{1, 2} ]; + $self->{pgr} = $pgr; +} + +sub stop_pager { + my ($self) = @_; + my $pgr = delete($self->{pgr}) or return; + my $pid = $pgr->[0]; + close $self->{1}; + # {2} may not be redirected + $self->{1} = $pgr->[1]; + $self->{2} = $pgr->[2]; + dwaitpid($pid, undef, $self->{sock}) if $pid; } sub accept_dispatch { # Listener {post_accept} callback @@ -738,11 +749,7 @@ sub lazy_start { my $dev_ino_expect = pack('dd', $st[0], $st[1]); # dev+ino pipe(my ($eof_r, $eof_w)) or die "pipe: $!"; local $oldset = PublicInbox::DS::block_signals(); - if ($nfd == 1) { - require PublicInbox::CmdIPC1; - $send_cmd = PublicInbox::CmdIPC1->can('send_cmd1'); - $recv_cmd = PublicInbox::CmdIPC1->can('recv_cmd1'); - } elsif ($nfd == 4) { + if ($nfd == 4) { $send_cmd = PublicInbox::Spawn->can('send_cmd4'); $recv_cmd = PublicInbox::Spawn->can('recv_cmd4') // do { require PublicInbox::CmdIPC4; @@ -751,7 +758,7 @@ 
sub lazy_start { }; } $recv_cmd or die <<""; -(Socket::MsgHdr || IO::FDPass || Inline::C) missing/unconfigured (nfd=$nfd); +(Socket::MsgHdr || Inline::C) missing/unconfigured (nfd=$nfd); require PublicInbox::Listener; require PublicInbox::EOFpipe; @@ -839,19 +846,24 @@ sub lazy_start { exit($exit_code // 0); } -# for users w/o IO::FDPass +# for users w/o Socket::Msghdr sub oneshot { my ($main_pkg) = @_; my $exit = $main_pkg->can('exit'); # caller may override exit() local $quit = $exit if $exit; local %PATH2CFG; umask(077) // die("umask(077): $!"); - dispatch((bless { - 0 => *STDIN{GLOB}, - 1 => *STDOUT{GLOB}, - 2 => *STDERR{GLOB}, - env => \%ENV - }, __PACKAGE__), @ARGV); + local $SIG{PIPE} = sub { die(bless(\"$_[0]", 'PublicInbox::SIGPIPE')) }; + eval { + my $self = bless { + 0 => *STDIN{GLOB}, + 1 => *STDOUT{GLOB}, + 2 => *STDERR{GLOB}, + env => \%ENV + }, __PACKAGE__; + dispatch($self, @ARGV); + }; + die $@ if $@ && ref($@) ne 'PublicInbox::SIGPIPE'; } # ensures stdout hits the FS before sock disconnects so a client @@ -859,6 +871,7 @@ sub oneshot { sub DESTROY { my ($self) = @_; $self->{1}->autoflush(1); + stop_pager($self); } 1; diff --git a/lib/PublicInbox/LeiQuery.pm b/lib/PublicInbox/LeiQuery.pm index 6e778785..2f4b99e5 100644 --- a/lib/PublicInbox/LeiQuery.pm +++ b/lib/PublicInbox/LeiQuery.pm @@ -80,14 +80,14 @@ sub lei_q { if ($self->{sock}) { $self->atfork_prepare_wq($lxs); $lxs->wq_workers_start('lei_xsearch', $j, $self->oldset) - // $self->wq_workers($j); + // $lxs->wq_workers($j); } unshift(@srcs, $sto->search) if $opt->{'local'}; my $out = $opt->{output} // '-'; $out = 'json:/dev/stdout' if $out eq '-'; my $isatty = -t $self->{1}; # no forking workers after this - my $pid_old12 = $self->start_pager if $isatty; + $self->start_pager if $isatty; my $json = substr($out, 0, 5) eq 'json:' ? 
ref(PublicInbox::Config->json)->new : undef; if ($json) { @@ -125,11 +125,6 @@ sub lei_q { # my $wcb = PublicInbox::LeiToMail->write_cb($out, $self); $self->{mset_opt} = \%mset_opt; $lxs->do_query($self, \@srcs); - if ($pid_old12) { # [ pid, stdout, stderr ] - my $pid = $pid_old12->[0]; - $self->{$_} = $pid_old12->[$_] for (1, 2); - dwaitpid($pid, undef, $self->{sock}) if $pid; - } } 1; diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index b4172734..94f7c2bc 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -168,4 +168,10 @@ sub ipc_atfork_child { $self->SUPER::ipc_atfork_child; # PublicInbox::IPC } +sub ipc_atfork_prepare { + my ($self) = @_; + $self->wq_set_recv_modes(qw[<&= >&= >&= +<&=]); + $self->SUPER::ipc_atfork_prepare; # PublicInbox::IPC +} + 1; diff --git a/lib/PublicInbox/Spawn.pm b/lib/PublicInbox/Spawn.pm index b35bf54c..ef822e1b 100644 --- a/lib/PublicInbox/Spawn.pm +++ b/lib/PublicInbox/Spawn.pm @@ -209,7 +209,7 @@ my $fdpass = <<'FDPASS'; #include #if defined(CMSG_SPACE) && defined(CMSG_LEN) -#define SEND_FD_CAPA 3 +#define SEND_FD_CAPA 4 #define SEND_FD_SPACE (SEND_FD_CAPA * sizeof(int)) union my_cmsg { struct cmsghdr hdr; diff --git a/script/lei b/script/lei index aac8fa94..5c32ab88 100755 --- a/script/lei +++ b/script/lei @@ -8,11 +8,6 @@ use PublicInbox::CmdIPC4; my $narg = 4; my $recv_cmd = PublicInbox::CmdIPC4->can('recv_cmd4'); my $send_cmd = PublicInbox::CmdIPC4->can('send_cmd4') // do { - require PublicInbox::CmdIPC1; # 2nd choice - $narg = 1; - $recv_cmd = PublicInbox::CmdIPC1->can('recv_cmd1'); - PublicInbox::CmdIPC1->can('send_cmd1'); -} // do { require PublicInbox::Spawn; # takes ~50ms even if built *sigh* $narg = 4; $recv_cmd = PublicInbox::Spawn->can('recv_cmd4'); @@ -77,7 +72,7 @@ Falling back to (slow) one-shot mode $pwd = $cwd; } 1; -}) { # (Socket::MsgHdr|IO::FDPass|Inline::C), $sock, $pwd are all available: +}) { # (Socket::MsgHdr|Inline::C), $sock, $pwd are all 
available: local $ENV{PWD} = $pwd; my $buf = join("\0", scalar(@ARGV), @ARGV); while (my ($k, $v) = each %ENV) { $buf .= "\0$k=$v" } @@ -94,7 +89,7 @@ Falling back to (slow) one-shot mode die $buf; } } -} else { # for systems lacking Socket::MsgHdr, IO::FDPass or Inline::C +} else { # for systems lacking Socket::MsgHdr or Inline::C warn $@ if $@; require PublicInbox::LEI; PublicInbox::LEI::oneshot(__PACKAGE__); diff --git a/t/cmd_ipc.t b/t/cmd_ipc.t index 22f73c19..0a0a4e00 100644 --- a/t/cmd_ipc.t +++ b/t/cmd_ipc.t @@ -79,13 +79,4 @@ SKIP: { } } -SKIP: { - require_mods('IO::FDPass', 13); - require_ok 'PublicInbox::CmdIPC1'; - $send = PublicInbox::CmdIPC1->can('send_cmd1'); - $recv = PublicInbox::CmdIPC1->can('recv_cmd1'); - $do_test->(SOCK_STREAM, 0, 'IO::FDPass stream'); - $do_test->($SOCK_SEQPACKET, MSG_EOR, 'IO::FDPass seqpacket'); -} - done_testing; diff --git a/t/ipc.t b/t/ipc.t index fd290809..22423a78 100644 --- a/t/ipc.t +++ b/t/ipc.t @@ -6,6 +6,7 @@ use v5.10.1; use Test::More; use PublicInbox::TestCommon; use Fcntl qw(SEEK_SET); +require_mods(qw(Storable||Sereal)); require_ok 'PublicInbox::IPC'; state $once = eval <<''; package PublicInbox::IPC; @@ -94,8 +95,7 @@ my $test = sub { }; $test->('local'); -SKIP: { - require_mods(qw(Storable||Sereal), 16); +{ my $pid = $ipc->ipc_worker_spawn('test worker'); ok($pid > 0 && kill(0, $pid), 'worker spawned and running'); defined($pid) or BAIL_OUT 'no spawn, no test'; @@ -112,7 +112,7 @@ SKIP: { $ipc->ipc_worker_stop; # idempotent # work queues -$ipc->{wq_open_modes} = [qw( >&= >&= >&= )]; +$ipc->wq_set_recv_modes(qw( >&= >&= >&= )); pipe(my ($ra, $wa)) or BAIL_OUT $!; pipe(my ($rb, $wb)) or BAIL_OUT $!; pipe(my ($rc, $wc)) or BAIL_OUT $!; @@ -136,7 +136,7 @@ for my $t ('local', 'worker', 'worker again') { # wq_do works across fork (siblings can feed) SKIP: { - skip 'Socket::MsgHdr, IO::FDPass, Inline::C missing', 7 if !$ppids[0]; + skip 'Socket::MsgHdr or Inline::C missing', 3 if !$ppids[0]; is_deeply(\@ppids, 
[$$, undef, undef], 'parent pid returned in wq_workers_start'); my $pid = fork // BAIL_OUT $!; @@ -161,28 +161,31 @@ SKIP: { } $ipc->wq_close; -seek($warn, 0, SEEK_SET) or BAIL_OUT; -my @warn = <$warn>; -is(scalar(@warn), 3, 'warned 3 times'); -like($warn[0], qr/ wq_do: /, '1st warned from wq_do'); -like($warn[1], qr/ wq_worker: /, '2nd warned from wq_worker'); -is($warn[2], $warn[1], 'worker did not die'); - -$SIG{__WARN__} = 'DEFAULT'; -is($ipc->wq_workers_start('wq', 1), $$, 'workers started again'); -is($ipc->wq_workers, 1, '1 worker started'); SKIP: { - $ipc->WQ_MAX_WORKERS > 1 or - skip 'Inline::C or Socket::MsgHdr not available', 4; - $ipc->wq_worker_incr; - is($ipc->wq_workers, 2, 'worker count bumped'); - $ipc->wq_worker_decr; - $ipc->wq_worker_decr_wait(10); - is($ipc->wq_workers, 1, 'worker count lowered'); - is($ipc->wq_workers(2), 2, 'worker count set'); - is($ipc->wq_workers, 2, 'worker count stayed set'); + skip 'Socket::MsgHdr or Inline::C missing', 11 if !$ppids[0]; + seek($warn, 0, SEEK_SET) or BAIL_OUT; + my @warn = <$warn>; + is(scalar(@warn), 3, 'warned 3 times'); + like($warn[0], qr/ wq_do: /, '1st warned from wq_do'); + like($warn[1], qr/ wq_worker: /, '2nd warned from wq_worker'); + is($warn[2], $warn[1], 'worker did not die'); + + $SIG{__WARN__} = 'DEFAULT'; + is($ipc->wq_workers_start('wq', 1), $$, 'workers started again'); + is($ipc->wq_workers, 1, '1 worker started'); + SKIP: { + $ipc->WQ_MAX_WORKERS > 1 or + skip 'Inline::C or Socket::MsgHdr not available', 4; + $ipc->wq_worker_incr; + is($ipc->wq_workers, 2, 'worker count bumped'); + $ipc->wq_worker_decr; + $ipc->wq_worker_decr_wait(10); + is($ipc->wq_workers, 1, 'worker count lowered'); + is($ipc->wq_workers(2), 2, 'worker count set'); + is($ipc->wq_workers, 2, 'worker count stayed set'); + } + $ipc->wq_close; + is($ipc->wq_workers, undef, 'workers undef after close'); } -$ipc->wq_close; -is($ipc->wq_workers, undef, 'workers undef after close'); done_testing; diff --git a/t/lei.t 
b/t/lei.t index 992800a5..6819f182 100644 --- a/t/lei.t +++ b/t/lei.t @@ -208,13 +208,11 @@ if ($ENV{TEST_LEI_ONESHOT}) { SKIP: { # real socket require_mods(qw(Cwd), my $nr = 105); - my $nfd = eval { require Socket::MsgHdr; 4 } // - eval { require IO::FDPass; 1 } // do { + my $nfd = eval { require Socket::MsgHdr; 4 } // do { require PublicInbox::Spawn; PublicInbox::Spawn->can('send_cmd4') ? 4 : undef; } // - skip 'Socket::MsgHdr, IO::FDPass or Inline::C missing or unconfigured', - $nr; + skip 'Socket::MsgHdr or Inline::C missing or unconfigured', $nr; local $ENV{XDG_RUNTIME_DIR} = "$home/xdg_run"; my $sock = "$ENV{XDG_RUNTIME_DIR}/lei/$nfd.sock";