From 584f9a6e1e47fca4041ad5205f631197c2bafc59 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Sat, 16 Oct 2021 09:29:53 +0000 Subject: lei sockets: favor level-triggered epoll for fairness Sigfd->event_step needs priority over script/lei clients, LeiSelfSocket, and everything else. --- lib/PublicInbox/LEI.pm | 20 ++++++++------------ lib/PublicInbox/LeiSelfSocket.pm | 27 +++++++++++---------------- 2 files changed, 19 insertions(+), 28 deletions(-) diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index 876598f9..6b989b33 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -12,13 +12,13 @@ use parent qw(PublicInbox::DS PublicInbox::LeiExternal PublicInbox::LeiQuery); use Getopt::Long (); use Socket qw(AF_UNIX SOCK_SEQPACKET MSG_EOR pack_sockaddr_un); -use Errno qw(EPIPE EAGAIN EINTR ECONNREFUSED ENOENT ECONNRESET); +use Errno qw(EPIPE EAGAIN ECONNREFUSED ENOENT ECONNRESET); use Cwd qw(getcwd); use POSIX qw(strftime); use IO::Handle (); use Fcntl qw(SEEK_SET); use PublicInbox::Config; -use PublicInbox::Syscall qw(EPOLLIN EPOLLET); +use PublicInbox::Syscall qw(EPOLLIN); use PublicInbox::DS qw(now dwaitpid); use PublicInbox::Spawn qw(spawn popen_rd); use PublicInbox::Lock; @@ -1125,16 +1125,12 @@ sub event_step { local %ENV = %{$self->{env}}; local $current_lei = $self; eval { - my $buf; - while (my @fds = $recv_cmd->($self->{sock}, $buf, 4096)) { - if (scalar(@fds) == 1 && !defined($fds[0])) { - return if $! == EAGAIN; - next if $! == EINTR; - last if $! == ECONNRESET; - die "recvmsg: $!"; - } - for (@fds) { open my $rfh, '+<&=', $_ } + my @fds = $recv_cmd->($self->{sock}, my $buf, 4096); + if (scalar(@fds) == 1 && !defined($fds[0])) { + return if $! == EAGAIN; + die "recvmsg: $!" if $! != ECONNRESET; } + for (@fds) { open my $rfh, '+<&=', $_ } if ($buf eq '') { _drop_wq($self); # EOF, client disconnected dclose($self); @@ -1162,7 +1158,7 @@ sub event_step_init { my $sock = $self->{sock} or return; $self->{-event_init_done} //= do { # persist til $ops done $sock->blocking(0); - $self->SUPER::new($sock, EPOLLIN|EPOLLET); + $self->SUPER::new($sock, EPOLLIN); $sock; }; } diff --git a/lib/PublicInbox/LeiSelfSocket.pm b/lib/PublicInbox/LeiSelfSocket.pm index 3d847649..dd64b6cf 100644 --- a/lib/PublicInbox/LeiSelfSocket.pm +++ b/lib/PublicInbox/LeiSelfSocket.pm @@ -10,7 +10,7 @@ use v5.10.1; use parent qw(PublicInbox::DS); use Data::Dumper; $Data::Dumper::Useqq = 1; # should've been the Perl default :P -use PublicInbox::Syscall qw(EPOLLIN EPOLLET); +use PublicInbox::Syscall qw(EPOLLIN); use PublicInbox::Spawn; my $recv_cmd; @@ -20,26 +20,21 @@ sub new { $r->blocking(0); no warnings 'once'; $recv_cmd = $PublicInbox::LEI::recv_cmd; - $self->SUPER::new($r, EPOLLIN|EPOLLET); + $self->SUPER::new($r, EPOLLIN); } sub event_step { my ($self) = @_; - while (1) { - my (@fds) = $recv_cmd->($self->{sock}, my $buf, 4096 * 33); - if (scalar(@fds) == 1 && !defined($fds[0])) { - return if $!{EAGAIN}; - next if $!{EINTR}; - die "recvmsg: $!"; - } - # open so perl can auto-close them: - for my $fd (@fds) { - open(my $newfh, '+<&=', $fd) or die "open +<&=$fd: $!"; - } - return $self->close if $buf eq ''; - warn Dumper({ 'unexpected self msg' => $buf, fds => \@fds }); - # TODO: figure out what to do with these messages... + my (@fds) = $recv_cmd->($self->{sock}, my $buf, 4096 * 33); + if (scalar(@fds) == 1 && !defined($fds[0])) { + return if $!{EAGAIN}; + die "recvmsg: $!" unless $!{ECONNRESET}; + } else { # just in case open so perl can auto-close them: + for (@fds) { open my $fh, '+<&=', $_ }; } + return $self->close if $buf eq ''; + warn Dumper({ 'unexpected self msg' => $buf, fds => \@fds }); + # TODO: figure out what to do with these messages... } 1; -- cgit v1.2.3-24-ge0c7