diff options
author | Eric Wong <e@80x24.org> | 2021-01-10 12:15:12 +0000 |
---|---|---|
committer | Eric Wong <e@80x24.org> | 2021-01-12 03:51:42 +0000 |
commit | 6cc0e6870cb4950c08646769f2a7e30729b7d409 (patch) | |
tree | 13eb8e9cd433b215a8980ecbc6c7d1a741473466 /lib | |
parent | 0c89ebd477d1c7a695a0a0b3023c0d41abe573fa (diff) | |
download | public-inbox-6cc0e6870cb4950c08646769f2a7e30729b7d409.tar.gz |
Actually, sending 4 FDs will be useful for lei internal xsearch work once we start accepting input from stdin. It won't be used with the lightweight lei(1) client, however. For WWW (eventually), a single FD may be enough.
Diffstat (limited to 'lib')
-rw-r--r-- | lib/PublicInbox/CmdIPC1.pm | 16 | ||||
-rw-r--r-- | lib/PublicInbox/CmdIPC4.pm | 12 | ||||
-rw-r--r-- | lib/PublicInbox/IPC.pm | 13 | ||||
-rw-r--r-- | lib/PublicInbox/LeiXSearch.pm | 6 | ||||
-rw-r--r-- | lib/PublicInbox/Spawn.pm | 48 |
5 files changed, 53 insertions, 42 deletions
diff --git a/lib/PublicInbox/CmdIPC1.pm b/lib/PublicInbox/CmdIPC1.pm index 0eed8bed..de6e54ef 100644 --- a/lib/PublicInbox/CmdIPC1.pm +++ b/lib/PublicInbox/CmdIPC1.pm @@ -10,17 +10,19 @@ BEGIN { eval { require IO::FDPass; # XS, available in all major distros no warnings 'once'; -*send_cmd1 = sub ($$$$$$) { # (sock, in, out, err, buf, flags) = @_; - for (1..3) { - IO::FDPass::send(fileno($_[0]), $_[$_]) or +*send_cmd1 = sub ($$$$) { # (sock, fds, buf, flags) = @_; + my ($sock, $fds, undef, $flags) = @_; + for my $fd (@$fds) { + IO::FDPass::send(fileno($sock), $fd) or die "IO::FDPass::send: $!"; } - send($_[0], $_[4], $_[5]) or die "send $!"; + send($sock, $_[2], $flags) or die "send $!"; }; -*recv_cmd1 = sub ($$$) { - my ($s, undef, $len) = @_; - my @fds = map { IO::FDPass::recv(fileno($s)) } (0..2); +*recv_cmd1 = sub ($$$;$) { + my ($s, undef, $len, $nfds) = @_; + $nfds //= 3; + my @fds = map { IO::FDPass::recv(fileno($s)) } (1..$nfds); recv($s, $_[1], $len, 0) // die "recv: $!"; length($_[1]) == 0 ? () : @fds; }; diff --git a/lib/PublicInbox/CmdIPC4.pm b/lib/PublicInbox/CmdIPC4.pm index 90fca62d..c4fcb0d6 100644 --- a/lib/PublicInbox/CmdIPC4.pm +++ b/lib/PublicInbox/CmdIPC4.pm @@ -13,10 +13,12 @@ require Socket::MsgHdr; # XS no warnings 'once'; # 3 FDs per-sendmsg(2) + buffer -*send_cmd4 = sub ($$$$$$) { # (sock, in, out, err, buf, flags) = @_; - my $mh = Socket::MsgHdr->new(buf => $_[4]); - $mh->cmsghdr(SOL_SOCKET, SCM_RIGHTS, pack('iii', @_[1,2,3])); - Socket::MsgHdr::sendmsg($_[0], $mh, $_[5]) or die "sendmsg: $!"; +*send_cmd4 = sub ($$$$) { # (sock, fds, buf, flags) = @_; + my ($sock, $fds, undef, $flags) = @_; + my $mh = Socket::MsgHdr->new(buf => $_[2]); + $mh->cmsghdr(SOL_SOCKET, SCM_RIGHTS, + pack('i' x scalar(@$fds), @$fds)); + Socket::MsgHdr::sendmsg($sock, $mh, $flags) or die "sendmsg: $!"; }; *recv_cmd4 = sub ($$$) { @@ -26,7 +28,7 @@ no warnings 'once'; $_[1] = $mh->buf; return () if $r == 0; my (undef, undef, $data) = $mh->cmsghdr; - unpack('iii', $data); + unpack('i' x (length($data) / 4), $data); }; } } # /eval /BEGIN diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm index be5b2f45..b0a0bfb5 100644 --- a/lib/PublicInbox/IPC.pm +++ b/lib/PublicInbox/IPC.pm @@ -263,16 +263,15 @@ sub wq_worker_loop ($) { } sub wq_do { # always async - my ($self, $sub, $in, $out, $err, @args) = @_; + my ($self, $sub, $ios, @args) = @_; if (my $s1 = $self->{-wq_s1}) { # run in worker - $_ = fileno($_) for ($in, $out, $err); - $send_cmd->($s1, $in, $out, $err, - freeze([$sub, @args]), MSG_EOR); + my $fds = [ map { fileno($_) } @$ios ]; + $send_cmd->($s1, $fds, freeze([$sub, @args]), MSG_EOR); } else { - @$self{0, 1, 2} = ($in, $out, $err); + @$self{0..$#$ios} = @$ios; eval { $self->$sub(@args) }; warn "wq_do: $@" if $@; - delete @$self{0, 1, 2}; + delete @$self{0..$#$ios}; } } @@ -334,7 +333,7 @@ sub wq_worker_decr { # SIGTTOU handler, kills first idle worker my ($self) = @_; my $workers = $self->{-wq_workers} or return; my $s2 = $self->{-wq_s2} // die 'BUG: no wq_s2'; - $self->wq_do('wq_exit', $s2, $s2, $s2); + $self->wq_do('wq_exit', [ $s2, $s2, $s2 ]); $self->{-wq_exit_pending}++; # caller must call wq_worker_decr_wait in main loop } diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index a3010efe..c0df21a8 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -150,14 +150,14 @@ sub do_query { $io[2]->autoflush(1); if ($lei->{opt}->{thread}) { for my $ibxish (@$srcs) { - $self->wq_do('query_thread_mset', @io, $lei, $ibxish); + $self->wq_do('query_thread_mset', \@io, $lei, $ibxish); } } else { - $self->wq_do('query_mset', @io, $lei, $srcs); + $self->wq_do('query_mset', \@io, $lei, $srcs); } # TODO for my $rmt (@{$self->{remotes} // []}) { - $self->wq_do('query_thread_mbox', @io, $lei, $rmt); + $self->wq_do('query_thread_mbox', \@io, $lei, $rmt); } } diff --git a/lib/PublicInbox/Spawn.pm b/lib/PublicInbox/Spawn.pm index 7d0d9597..b35bf54c 100644 --- a/lib/PublicInbox/Spawn.pm +++ b/lib/PublicInbox/Spawn.pm @@ -209,20 +209,22 @@ my $fdpass = <<'FDPASS'; #include <sys/socket.h> #if defined(CMSG_SPACE) && defined(CMSG_LEN) -struct my_3fds { int fds[3]; }; +#define SEND_FD_CAPA 3 +#define SEND_FD_SPACE (SEND_FD_CAPA * sizeof(int)) union my_cmsg { struct cmsghdr hdr; - char pad[sizeof(struct cmsghdr)+ 8 + sizeof(struct my_3fds) + 8]; + char pad[sizeof(struct cmsghdr) + 16 + SEND_FD_SPACE]; }; -int send_cmd4(PerlIO *s, int in, int out, int err, SV *data, int flags) +int send_cmd4(PerlIO *s, SV *svfds, SV *data, int flags) { struct msghdr msg = { 0 }; - struct iovec iov; union my_cmsg cmsg = { 0 }; - int *fdp; - size_t i; STRLEN dlen = 0; + struct iovec iov; + AV *fds = (AV *)SvRV(svfds); + I32 i, nfds = av_len(fds) + 1; + int *fdp; if (SvOK(data)) { iov.iov_base = SvPV(data, dlen); @@ -234,16 +236,22 @@ int send_cmd4(PerlIO *s, int in, int out, int err, SV *data, int flags) } msg.msg_iov = &iov; msg.msg_iovlen = 1; - msg.msg_control = &cmsg.hdr; - msg.msg_controllen = CMSG_SPACE(sizeof(struct my_3fds)); - - cmsg.hdr.cmsg_level = SOL_SOCKET; - cmsg.hdr.cmsg_type = SCM_RIGHTS; - cmsg.hdr.cmsg_len = CMSG_LEN(sizeof(struct my_3fds)); - fdp = (int *)CMSG_DATA(&cmsg.hdr); - *fdp++ = in; - *fdp++ = out; - *fdp++ = err; + if (nfds) { + if (nfds > SEND_FD_CAPA) { + fprintf(stderr, "FIXME: bump SEND_FD_CAPA=%d\n", nfds); + nfds = SEND_FD_CAPA; + } + msg.msg_control = &cmsg.hdr; + msg.msg_controllen = CMSG_SPACE(nfds * sizeof(int)); + cmsg.hdr.cmsg_level = SOL_SOCKET; + cmsg.hdr.cmsg_type = SCM_RIGHTS; + cmsg.hdr.cmsg_len = CMSG_LEN(nfds * sizeof(int)); + fdp = (int *)CMSG_DATA(&cmsg.hdr); + for (i = 0; i < nfds; i++) { + SV **fd = av_fetch(fds, i, 0); + *fdp++ = SvIV(*fd); + } + } return sendmsg(PerlIO_fileno(s), &msg, flags) >= 0; } @@ -263,17 +271,17 @@ void recv_cmd4(PerlIO *s, SV *buf, STRLEN n) msg.msg_iov = &iov; msg.msg_iovlen = 1; msg.msg_control = &cmsg.hdr; - msg.msg_controllen = CMSG_SPACE(sizeof(struct my_3fds)); + msg.msg_controllen = CMSG_SPACE(SEND_FD_SPACE); i = recvmsg(PerlIO_fileno(s), &msg, 0); if (i < 0) croak("recvmsg: %s", strerror(errno)); SvCUR_set(buf, i); if (i > 0 && cmsg.hdr.cmsg_level == SOL_SOCKET && - cmsg.hdr.cmsg_type == SCM_RIGHTS && - cmsg.hdr.cmsg_len == CMSG_LEN(sizeof(struct my_3fds))) { + cmsg.hdr.cmsg_type == SCM_RIGHTS) { + size_t len = cmsg.hdr.cmsg_len; int *fdp = (int *)CMSG_DATA(&cmsg.hdr); - for (i = 0; i < 3; i++) + for (i = 0; CMSG_LEN((i + 1) * sizeof(int)) <= len; i++) Inline_Stack_Push(sv_2mortal(newSViv(*fdp++))); } Inline_Stack_Done; |