From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-Status: No, score=-3.9 required=3.0 tests=ALL_TRUSTED,AWL,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id A4D531FF9F for ; Sun, 10 Jan 2021 12:15:20 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 15/22] ipc: start supporting sending/receiving more than 3 FDs Date: Sun, 10 Jan 2021 12:15:12 +0000 Message-Id: <20210110121519.17044-16-e@80x24.org> In-Reply-To: <20210110121519.17044-1-e@80x24.org> References: <20210110121519.17044-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: Actually, sending 4 FDs will be useful for lei internal xsearch work once we start accepting input from stdin. It won't be used with the lightweight lei(1) client, however. For WWW (eventually), a single FD may be enough. --- lib/PublicInbox/CmdIPC1.pm | 16 +++++++----- lib/PublicInbox/CmdIPC4.pm | 12 +++++---- lib/PublicInbox/IPC.pm | 13 +++++----- lib/PublicInbox/LeiXSearch.pm | 6 ++--- lib/PublicInbox/Spawn.pm | 48 ++++++++++++++++++++--------------- script/lei | 2 +- t/cmd_ipc.t | 5 ++-- t/ipc.t | 6 ++--- 8 files changed, 60 insertions(+), 48 deletions(-) diff --git a/lib/PublicInbox/CmdIPC1.pm b/lib/PublicInbox/CmdIPC1.pm index 0eed8bed..de6e54ef 100644 --- a/lib/PublicInbox/CmdIPC1.pm +++ b/lib/PublicInbox/CmdIPC1.pm @@ -10,17 +10,19 @@ BEGIN { eval { require IO::FDPass; # XS, available in all major distros no warnings 'once'; -*send_cmd1 = sub ($$$$$$) { # (sock, in, out, err, buf, flags) = @_; - for (1..3) { - IO::FDPass::send(fileno($_[0]), $_[$_]) or +*send_cmd1 = sub ($$$$) { # (sock, fds, buf, flags) = @_; + my ($sock, $fds, undef, $flags) = @_; + for my $fd (@$fds) { + IO::FDPass::send(fileno($sock), $fd) or die "IO::FDPass::send: $!"; } - send($_[0], $_[4], $_[5]) or die "send $!"; + send($sock, $_[2], $flags) or die "send $!"; }; -*recv_cmd1 = sub ($$$) { - my ($s, undef, $len) = @_; - my @fds = map { IO::FDPass::recv(fileno($s)) } (0..2); +*recv_cmd1 = sub ($$$;$) { + my ($s, undef, $len, $nfds) = @_; + $nfds //= 3; + my @fds = map { IO::FDPass::recv(fileno($s)) } (1..$nfds); recv($s, $_[1], $len, 0) // die "recv: $!"; length($_[1]) == 0 ? () : @fds; }; diff --git a/lib/PublicInbox/CmdIPC4.pm b/lib/PublicInbox/CmdIPC4.pm index 90fca62d..c4fcb0d6 100644 --- a/lib/PublicInbox/CmdIPC4.pm +++ b/lib/PublicInbox/CmdIPC4.pm @@ -13,10 +13,12 @@ require Socket::MsgHdr; # XS no warnings 'once'; # 3 FDs per-sendmsg(2) + buffer -*send_cmd4 = sub ($$$$$$) { # (sock, in, out, err, buf, flags) = @_; - my $mh = Socket::MsgHdr->new(buf => $_[4]); - $mh->cmsghdr(SOL_SOCKET, SCM_RIGHTS, pack('iii', @_[1,2,3])); - Socket::MsgHdr::sendmsg($_[0], $mh, $_[5]) or die "sendmsg: $!"; +*send_cmd4 = sub ($$$$) { # (sock, fds, buf, flags) = @_; + my ($sock, $fds, undef, $flags) = @_; + my $mh = Socket::MsgHdr->new(buf => $_[2]); + $mh->cmsghdr(SOL_SOCKET, SCM_RIGHTS, + pack('i' x scalar(@$fds), @$fds)); + Socket::MsgHdr::sendmsg($sock, $mh, $flags) or die "sendmsg: $!"; }; *recv_cmd4 = sub ($$$) { @@ -26,7 +28,7 @@ no warnings 'once'; $_[1] = $mh->buf; return () if $r == 0; my (undef, undef, $data) = $mh->cmsghdr; - unpack('iii', $data); + unpack('i' x (length($data) / 4), $data); }; } } # /eval /BEGIN diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm index be5b2f45..b0a0bfb5 100644 --- a/lib/PublicInbox/IPC.pm +++ b/lib/PublicInbox/IPC.pm @@ -263,16 +263,15 @@ sub wq_worker_loop ($) { } sub wq_do { # always async - my ($self, $sub, $in, $out, $err, @args) = @_; + my ($self, $sub, $ios, @args) = @_; if (my $s1 = $self->{-wq_s1}) { # run in worker - $_ = fileno($_) for ($in, $out, $err); - $send_cmd->($s1, $in, $out, $err, - freeze([$sub, @args]), MSG_EOR); + my $fds = [ map { fileno($_) } @$ios ]; + $send_cmd->($s1, $fds, freeze([$sub, @args]), MSG_EOR); } else { - @$self{0, 1, 2} = ($in, $out, $err); + @$self{0..$#$ios} = @$ios; eval { $self->$sub(@args) }; warn "wq_do: $@" if $@; - delete @$self{0, 1, 2}; + delete @$self{0..$#$ios}; } } @@ -334,7 +333,7 @@ sub wq_worker_decr { # SIGTTOU handler, kills first idle worker my ($self) = @_; my $workers = $self->{-wq_workers} or return; my $s2 = $self->{-wq_s2} // die 'BUG: no wq_s2'; - $self->wq_do('wq_exit', $s2, $s2, $s2); + $self->wq_do('wq_exit', [ $s2, $s2, $s2 ]); $self->{-wq_exit_pending}++; # caller must call wq_worker_decr_wait in main loop } diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index a3010efe..c0df21a8 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -150,14 +150,14 @@ sub do_query { $io[2]->autoflush(1); if ($lei->{opt}->{thread}) { for my $ibxish (@$srcs) { - $self->wq_do('query_thread_mset', @io, $lei, $ibxish); + $self->wq_do('query_thread_mset', \@io, $lei, $ibxish); } } else { - $self->wq_do('query_mset', @io, $lei, $srcs); + $self->wq_do('query_mset', \@io, $lei, $srcs); } # TODO for my $rmt (@{$self->{remotes} // []}) { - $self->wq_do('query_thread_mbox', @io, $lei, $rmt); + $self->wq_do('query_thread_mbox', \@io, $lei, $rmt); } } diff --git a/lib/PublicInbox/Spawn.pm b/lib/PublicInbox/Spawn.pm index 7d0d9597..b35bf54c 100644 --- a/lib/PublicInbox/Spawn.pm +++ b/lib/PublicInbox/Spawn.pm @@ -209,20 +209,22 @@ my $fdpass = <<'FDPASS'; #include #if defined(CMSG_SPACE) && defined(CMSG_LEN) -struct my_3fds { int fds[3]; }; +#define SEND_FD_CAPA 3 +#define SEND_FD_SPACE (SEND_FD_CAPA * sizeof(int)) union my_cmsg { struct cmsghdr hdr; - char pad[sizeof(struct cmsghdr)+ 8 + sizeof(struct my_3fds) + 8]; + char pad[sizeof(struct cmsghdr) + 16 + SEND_FD_SPACE]; }; -int send_cmd4(PerlIO *s, int in, int out, int err, SV *data, int flags) +int send_cmd4(PerlIO *s, SV *svfds, SV *data, int flags) { struct msghdr msg = { 0 }; - struct iovec iov; union my_cmsg cmsg = { 0 }; - int *fdp; - size_t i; STRLEN dlen = 0; + struct iovec iov; + AV *fds = (AV *)SvRV(svfds); + I32 i, nfds = av_len(fds) + 1; + int *fdp; if (SvOK(data)) { iov.iov_base = SvPV(data, dlen); @@ -234,16 +236,22 @@ int send_cmd4(PerlIO *s, int in, int out, int err, SV *data, int flags) } msg.msg_iov = &iov; msg.msg_iovlen = 1; - msg.msg_control = &cmsg.hdr; - msg.msg_controllen = CMSG_SPACE(sizeof(struct my_3fds)); - - cmsg.hdr.cmsg_level = SOL_SOCKET; - cmsg.hdr.cmsg_type = SCM_RIGHTS; - cmsg.hdr.cmsg_len = CMSG_LEN(sizeof(struct my_3fds)); - fdp = (int *)CMSG_DATA(&cmsg.hdr); - *fdp++ = in; - *fdp++ = out; - *fdp++ = err; + if (nfds) { + if (nfds > SEND_FD_CAPA) { + fprintf(stderr, "FIXME: bump SEND_FD_CAPA=%d\n", nfds); + nfds = SEND_FD_CAPA; + } + msg.msg_control = &cmsg.hdr; + msg.msg_controllen = CMSG_SPACE(nfds * sizeof(int)); + cmsg.hdr.cmsg_level = SOL_SOCKET; + cmsg.hdr.cmsg_type = SCM_RIGHTS; + cmsg.hdr.cmsg_len = CMSG_LEN(nfds * sizeof(int)); + fdp = (int *)CMSG_DATA(&cmsg.hdr); + for (i = 0; i < nfds; i++) { + SV **fd = av_fetch(fds, i, 0); + *fdp++ = SvIV(*fd); + } + } return sendmsg(PerlIO_fileno(s), &msg, flags) >= 0; } @@ -263,17 +271,17 @@ void recv_cmd4(PerlIO *s, SV *buf, STRLEN n) msg.msg_iov = &iov; msg.msg_iovlen = 1; msg.msg_control = &cmsg.hdr; - msg.msg_controllen = CMSG_SPACE(sizeof(struct my_3fds)); + msg.msg_controllen = CMSG_SPACE(SEND_FD_SPACE); i = recvmsg(PerlIO_fileno(s), &msg, 0); if (i < 0) croak("recvmsg: %s", strerror(errno)); SvCUR_set(buf, i); if (i > 0 && cmsg.hdr.cmsg_level == SOL_SOCKET && - cmsg.hdr.cmsg_type == SCM_RIGHTS && - cmsg.hdr.cmsg_len == CMSG_LEN(sizeof(struct my_3fds))) { + cmsg.hdr.cmsg_type == SCM_RIGHTS) { + size_t len = cmsg.hdr.cmsg_len; int *fdp = (int *)CMSG_DATA(&cmsg.hdr); - for (i = 0; i < 3; i++) + for (i = 0; CMSG_LEN((i + 1) * sizeof(int)) <= len; i++) Inline_Stack_Push(sv_2mortal(newSViv(*fdp++))); } Inline_Stack_Done; diff --git a/script/lei b/script/lei index d954b9eb..5e30f4d7 100755 --- a/script/lei +++ b/script/lei @@ -67,7 +67,7 @@ Falling back to (slow) one-shot mode $buf .= "\0\0"; select $sock; $| = 1; # unbuffer selected $sock - $send_cmd->($sock, 0, 1, 2, $buf, 0); + $send_cmd->($sock, [ 0, 1, 2 ], $buf, 0); while ($buf = <$sock>) { $buf =~ /\Aexit=([0-9]+)\n\z/ and exit($1 + 0); die $buf; diff --git a/t/cmd_ipc.t b/t/cmd_ipc.t index b9f4d128..22f73c19 100644 --- a/t/cmd_ipc.t +++ b/t/cmd_ipc.t @@ -17,7 +17,8 @@ my $do_test = sub { SKIP: { my ($s1, $s2); my $src = 'some payload' x 40; socketpair($s1, $s2, AF_UNIX, $type, 0) or BAIL_OUT $!; - $send->($s1, fileno($r), fileno($w), fileno($s1), $src, $flag); + my $sfds = [ fileno($r), fileno($w), fileno($s1) ]; + $send->($s1, $sfds, $src, $flag); my (@fds) = $recv->($s2, my $buf, length($src) + 1); is($buf, $src, 'got buffer payload '.$desc); my ($r1, $w1, $s1a); @@ -39,7 +40,7 @@ my $do_test = sub { SKIP: { if (defined($SOCK_SEQPACKET) && $type == $SOCK_SEQPACKET) { $r1 = $w1 = $s1a = undef; $src = (',' x 1023) . '-' .('.' x 1024); - $send->($s1, fileno($r), fileno($w), fileno($s1), $src, $flag); + $send->($s1, $sfds, $src, $flag); (@fds) = $recv->($s2, $buf, 1024); is($buf, (',' x 1023) . '-', 'silently truncated buf'); $opens->(); diff --git a/t/ipc.t b/t/ipc.t index 903294c5..d2b6ad4f 100644 --- a/t/ipc.t +++ b/t/ipc.t @@ -121,7 +121,7 @@ $warn->autoflush(0); local $SIG{__WARN__} = sub { print $warn "PID:$$ ", @_ }; my @ppids; for my $t ('local', 'worker', 'worker again') { - $ipc->wq_do('test_write_each_fd', $wa, $wb, $wc, 'hello world'); + $ipc->wq_do('test_write_each_fd', [ $wa, $wb, $wc ], 'hello world'); my $i = 0; for my $fh ($ra, $rb, $rc) { my $buf = readline($fh); @@ -129,7 +129,7 @@ for my $t ('local', 'worker', 'worker again') { like($buf, qr/\Ai=$i \d+ hello world\z/, "got expected ($t)"); $i++; } - $ipc->wq_do('test_die', $wa, $wb, $wc); + $ipc->wq_do('test_die', [ $wa, $wb, $wc ]); my $ppid = $ipc->wq_workers_start('wq', 1); push(@ppids, $ppid); } @@ -142,7 +142,7 @@ SKIP: { my $pid = fork // BAIL_OUT $!; if ($pid == 0) { use POSIX qw(_exit); - $ipc->wq_do('test_write_each_fd', $wa, $wb, $wc, $$); + $ipc->wq_do('test_write_each_fd', [ $wa, $wb, $wc ], $$); _exit(0); } else { my $i = 0;