From ca05b93cddfa2cc495451410222af3753bfe1b4e Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 24 Jun 2019 02:52:22 +0000 Subject: ds: get rid of event_watch field We don't need to keep track of that field since we always know what events we're interested in when using one-shot wakeups. --- lib/PublicInbox/DS.pm | 64 +++++++++++++++--------------------------- lib/PublicInbox/EvCleanup.pm | 4 +-- lib/PublicInbox/HTTP.pm | 13 +++++---- lib/PublicInbox/HTTPD/Async.pm | 10 ++++--- lib/PublicInbox/NNTP.pm | 25 ++++++----------- lib/PublicInbox/Syscall.pm | 6 ++-- 6 files changed, 50 insertions(+), 72 deletions(-) (limited to 'lib') diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index 9c801214..f5986e55 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -29,8 +29,6 @@ use PublicInbox::Syscall qw(:epoll); use fields ('sock', # underlying socket 'wbuf', # arrayref of coderefs or GLOB refs 'wbuf_off', # offset into first element of wbuf to start writing at - 'event_watch', # bitmask of events the client is interested in - # (EPOLLIN,OUT,etc.) ); use Errno qw(EAGAIN EINVAL); @@ -318,6 +316,17 @@ sub PostEventLoop { return $keep_running; } +# map EPOLL* bits to kqueue EV_* flags for EV_SET +sub kq_flag ($$) { + my ($bit, $ev) = @_; + if ($ev & $bit) { + my $fl = EV_ADD() | EV_ENABLE(); + ($ev & EPOLLONESHOT) ? ($fl|EV_ONESHOT()) : $fl; + } else { + EV_DISABLE(); + } +} + ##################################################################### ### PublicInbox::DS-the-object code ##################################################################### @@ -344,25 +353,21 @@ sub new { Carp::cluck("undef sock and/or fd in PublicInbox::DS->new. sock=" . ($sock || "") . ", fd=" . ($fd || "")) unless $sock && $fd; - $self->{event_watch} = $ev; - _InitPoller(); if ($HaveEpoll) { retry: if (epoll_ctl($Epoll, EPOLL_CTL_ADD, $fd, $ev)) { if ($! == EINVAL && ($ev & EPOLLEXCLUSIVE)) { - $self->{event_watch} = ($ev &= ~EPOLLEXCLUSIVE); + $ev &= ~EPOLLEXCLUSIVE; goto retry; } die "couldn't add epoll watch for $fd: $!\n"; } } elsif ($HaveKQueue) { - my $f = $ev & EPOLLIN ? EV_ENABLE() : EV_DISABLE(); - $KQueue->EV_SET($fd, EVFILT_READ(), EV_ADD() | $f); - $f = $ev & EPOLLOUT ? EV_ENABLE() : EV_DISABLE(); - $KQueue->EV_SET($fd, EVFILT_WRITE(), EV_ADD() | $f); + $KQueue->EV_SET($fd, EVFILT_READ(), EV_ADD() | kq_flag(EPOLLIN, $ev)); + $KQueue->EV_SET($fd, EVFILT_WRITE(), EV_ADD() | kq_flag(EPOLLOUT, $ev)); } Carp::cluck("PublicInbox::DS::new blowing away existing descriptor map for fd=$fd ($DescriptorMap{$fd})") @@ -454,7 +459,7 @@ next_buf: } } elsif ($! == EAGAIN) { $self->{wbuf_off} = $off; - watch_write($self, 1); + watch($self, EPOLLOUT|EPOLLONESHOT); return 0; } else { return $self->close; @@ -467,7 +472,6 @@ next_buf: } # while @$wbuf delete $self->{wbuf}; - $self->watch_write(0); 1; # all done } @@ -544,7 +548,7 @@ sub write { return $self->close; } $self->{wbuf} = [ tmpbuf($bref, $written) ]; - watch_write($self, 1); + watch($self, EPOLLOUT|EPOLLONESHOT); return 0; } } @@ -563,49 +567,27 @@ sub msg_more ($$) { # queue up the unwritten substring: $self->{wbuf} = [ tmpbuf(\($_[1]), $n) ]; - watch_write($self, 1); + watch($self, EPOLLOUT|EPOLLONESHOT); return 0; } } $self->write(\($_[1])); } -sub watch_chg ($$$) { - my ($self, $bits, $set) = @_; +sub watch ($$) { + my ($self, $ev) = @_; my $sock = $self->{sock} or return; - my $cur = $self->{event_watch}; - my $changes = $cur; - if ($set) { - $changes |= $bits; - } else { - $changes &= ~$bits; - } - return if $changes == $cur; my $fd = fileno($sock); if ($HaveEpoll) { - epoll_ctl($Epoll, EPOLL_CTL_MOD, $fd, $changes) and + epoll_ctl($Epoll, EPOLL_CTL_MOD, $fd, $ev) and confess("EPOLL_CTL_MOD $!"); } elsif ($HaveKQueue) { - my $flag = $set ? EV_ENABLE() : EV_DISABLE(); - $KQueue->EV_SET($fd, EVFILT_READ(), $flag) if $bits & EPOLLIN; - $KQueue->EV_SET($fd, EVFILT_WRITE(), $flag) if $bits & EPOLLOUT; + $KQueue->EV_SET($fd, EVFILT_READ(), kq_flag(EPOLLIN, $ev)); + $KQueue->EV_SET($fd, EVFILT_WRITE(), kq_flag(EPOLLOUT, $ev)); } - $self->{event_watch} = $changes; } -=head2 C<< $obj->watch_read( $boolean ) >> - -Turn 'readable' event notification on or off. - -=cut -sub watch_read ($$) { watch_chg($_[0], EPOLLIN, $_[1]) }; - -=head2 C<< $obj->watch_write( $boolean ) >> - -Turn 'writable' event notification on or off. - -=cut -sub watch_write ($$) { watch_chg($_[0], EPOLLOUT, $_[1]) }; +sub watch_in1 ($) { watch($_[0], EPOLLIN | EPOLLONESHOT) } package PublicInbox::DS::Timer; # [$abs_float_firetime, $coderef]; diff --git a/lib/PublicInbox/EvCleanup.pm b/lib/PublicInbox/EvCleanup.pm index d60ac2cc..a9f6167d 100644 --- a/lib/PublicInbox/EvCleanup.pm +++ b/lib/PublicInbox/EvCleanup.pm @@ -6,6 +6,7 @@ package PublicInbox::EvCleanup; use strict; use warnings; use base qw(PublicInbox::DS); +use PublicInbox::Syscall qw(EPOLLOUT EPOLLONESHOT); my $ENABLED; sub enabled { $ENABLED } @@ -59,13 +60,12 @@ sub _run_later () { # Called by PublicInbox::DS sub event_step { my ($self) = @_; - $self->watch_write(0); _run_asap(); } sub _asap_timer () { $singleton ||= once_init(); - $singleton->watch_write(1); + $singleton->watch(EPOLLOUT|EPOLLONESHOT); 1; } diff --git a/lib/PublicInbox/HTTP.pm b/lib/PublicInbox/HTTP.pm index afa71ea5..773d77ba 100644 --- a/lib/PublicInbox/HTTP.pm +++ b/lib/PublicInbox/HTTP.pm @@ -20,6 +20,7 @@ use HTTP::Date qw(time2str); use IO::Handle; require PublicInbox::EvCleanup; PublicInbox::DS->import(qw(msg_more write_in_full)); +use PublicInbox::Syscall qw(EPOLLIN EPOLLONESHOT); use constant { CHUNK_START => -1, # [a-f0-9]+\r\n CHUNK_END => -2, # \r\n @@ -56,7 +57,7 @@ sub http_date () { sub new ($$$) { my ($class, $sock, $addr, $httpd) = @_; my $self = fields::new($class); - $self->SUPER::new($sock, PublicInbox::DS::EPOLLIN()); + $self->SUPER::new($sock, EPOLLIN | EPOLLONESHOT); $self->{httpd} = $httpd; $self->{rbuf} = ''; ($self->{remote_addr}, $self->{remote_port}) = @@ -80,7 +81,8 @@ sub event_step { # called by PublicInbox::DS return $self->close if $r == 0; return rbuf_process($self); } - return if $!{EAGAIN}; # no need to call watch_read(1) again + + return $self->watch_in1 if $!{EAGAIN}; # common for clients to break connections without warning, # would be too noisy to log here: @@ -100,7 +102,7 @@ sub rbuf_process { ($r == -2 && length($self->{rbuf}) > 0x4000)) { return quit($self, 400); } - return $self->watch_read(1) if $r < 0; # incomplete + return $self->watch_in1 if $r < 0; # incomplete $self->{rbuf} = substr($self->{rbuf}, $r); my $len = input_prepare($self, \%env); @@ -143,7 +145,6 @@ sub read_input ($) { sub app_dispatch { my ($self, $input) = @_; - $self->watch_read(0); my $env = $self->{env}; $env->{REMOTE_ADDR} = $self->{remote_addr}; $env->{REMOTE_PORT} = $self->{remote_port}; @@ -236,7 +237,7 @@ sub identity_wcb ($) { sub next_request ($) { my ($self) = @_; if ($self->{rbuf} eq '') { # wait for next request - $self->watch_read(1); + $self->watch_in1; } else { # avoid recursion for pipelined requests push @$pipelineq, $self; $pipet ||= PublicInbox::EvCleanup::asap(*process_pipelineq); @@ -360,7 +361,7 @@ sub recv_err { return $self->close if (defined $r && $r == 0); if ($!{EAGAIN}) { $self->{input_left} = $len; - return; + return $self->watch_in1; } err($self, "error reading for input: $! ($len bytes remaining)"); quit($self, 500); diff --git a/lib/PublicInbox/HTTPD/Async.pm b/lib/PublicInbox/HTTPD/Async.pm index dae62e55..f32ef009 100644 --- a/lib/PublicInbox/HTTPD/Async.pm +++ b/lib/PublicInbox/HTTPD/Async.pm @@ -31,10 +31,12 @@ sub new { $self; } +sub restart_read ($) { $_[0]->watch(PublicInbox::DS::EPOLLIN()) } + # fires after pending writes are complete: sub restart_read_cb ($) { my ($self) = @_; - sub { $self->watch_read(1) } + sub { restart_read($self) } } sub main_cb ($$$) { @@ -46,16 +48,16 @@ sub main_cb ($$$) { $fh->write($$bref); if ($http->{sock}) { # !closed if ($http->{wbuf}) { - $self->watch_read(0); + $self->watch(0); $http->write(restart_read_cb($self)); } - # stay in watch_read, but let other clients + # stay in EPOLLIN, but let other clients # get some work done, too. return; } # fall through to close below... } elsif (!defined $r) { - return if $!{EAGAIN} || $!{EINTR}; + return restart_read($self) if $!{EAGAIN} || $!{EINTR}; } # Done! Error handling will happen in $fh->close diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm index eb1679a7..98f88410 100644 --- a/lib/PublicInbox/NNTP.pm +++ b/lib/PublicInbox/NNTP.pm @@ -24,6 +24,7 @@ use constant { r225 => '225 Headers follow (multi-line)', r430 => '430 No article with that message-id', }; +use PublicInbox::Syscall qw(EPOLLIN EPOLLONESHOT); my @OVERVIEW = qw(Subject From Date Message-ID References Xref); my $OVERVIEW_FMT = join(":\r\n", @OVERVIEW, qw(Bytes Lines)) . ":\r\n"; @@ -52,12 +53,6 @@ sub next_tick () { # pipelined request, we bypassed socket-readiness # checks to get here: event_step($nntp); - - # maybe there's more pipelined data, or we'll have - # to register it for socket-readiness notifications - if (!$nntp->{long_res} && $nntp->{sock}) { - check_read($nntp); - } } } } @@ -97,7 +92,7 @@ sub expire_old () { sub new ($$$) { my ($class, $sock, $nntpd) = @_; my $self = fields::new($class); - $self->SUPER::new($sock, PublicInbox::DS::EPOLLIN()); + $self->SUPER::new($sock, EPOLLIN | EPOLLONESHOT); $self->{nntpd} = $nntpd; res($self, '201 ' . $nntpd->{servername} . ' ready - post via email'); $self->{rbuf} = ''; @@ -624,11 +619,10 @@ sub long_response ($$) { # make sure we disable reading during a long response, # clients should not be sending us stuff and making us do more # work while we are stream a response to them - $self->watch_read(0); my $t0 = now(); $self->{long_res} = sub { my $more = eval { $cb->() }; - if ($@ || !$self->{sock}) { + if ($@ || !$self->{sock}) { # something bad happened... $self->{long_res} = undef; if ($@) { @@ -922,10 +916,6 @@ sub do_write ($$) { my $done = $self->write(\($_[1])); return 0 unless $self->{sock}; - # Do not watch for readability if we have data in the queue, - # instead re-enable watching for readability when we can - $self->watch_read(0) if (!$done || $self->{long_res}); - $done; } @@ -943,7 +933,6 @@ sub event_step { my ($self) = @_; return unless $self->flush_write && $self->{sock}; - return if $self->{long_res}; update_idle_time($self); # only read more requests if we've drained the write buffer, @@ -957,7 +946,7 @@ sub event_step { my $off = length($$rbuf); $r = sysread($self->{sock}, $$rbuf, LINE_MAX, $off); unless (defined $r) { - return if $!{EAGAIN}; + return $self->watch_in1 if $!{EAGAIN}; return $self->close; } return $self->close if $r == 0; @@ -978,6 +967,10 @@ sub event_step { my $len = length($$rbuf); return $self->close if ($len >= LINE_MAX); update_idle_time($self); + + # maybe there's more pipelined data, or we'll have + # to register it for socket-readiness notifications + check_read($self) unless ($self->{long_res} || $self->{wbuf}); } sub check_read { @@ -993,7 +986,7 @@ sub check_read { } else { # no pipelined requests available, let the kernel know # to wake us up if there's more - $self->watch_read(1); # PublicInbox::DS::watch_read + $self->watch_in1; # PublicInbox::DS::watch_in1 } } diff --git a/lib/PublicInbox/Syscall.pm b/lib/PublicInbox/Syscall.pm index 17fd1398..f1988e61 100644 --- a/lib/PublicInbox/Syscall.pm +++ b/lib/PublicInbox/Syscall.pm @@ -24,11 +24,11 @@ $VERSION = "0.25"; @EXPORT_OK = qw(sendfile epoll_ctl epoll_create epoll_wait EPOLLIN EPOLLOUT EPOLL_CTL_ADD EPOLL_CTL_DEL EPOLL_CTL_MOD - EPOLLEXCLUSIVE); + EPOLLONESHOT EPOLLEXCLUSIVE); %EXPORT_TAGS = (epoll => [qw(epoll_ctl epoll_create epoll_wait EPOLLIN EPOLLOUT EPOLL_CTL_ADD EPOLL_CTL_DEL EPOLL_CTL_MOD - EPOLLEXCLUSIVE)], + EPOLLONESHOT EPOLLEXCLUSIVE)], sendfile => [qw(sendfile)], ); @@ -38,7 +38,7 @@ use constant EPOLLOUT => 4; # use constant EPOLLHUP => 16; # use constant EPOLLRDBAND => 128; use constant EPOLLEXCLUSIVE => (1 << 28); -# use constant EPOLLONESHOT => (1 << 30); +use constant EPOLLONESHOT => (1 << 30); # use constant EPOLLET => (1 << 31); use constant EPOLL_CTL_ADD => 1; use constant EPOLL_CTL_DEL => 2; -- cgit v1.2.3-24-ge0c7