about summary refs log tree commit homepage
path: root/lib/PublicInbox
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2019-06-24 02:52:22 +0000
committerEric Wong <e@80x24.org>2019-06-24 05:26:26 +0000
commitca05b93cddfa2cc495451410222af3753bfe1b4e (patch)
treea6a1b6c437341c82ab3545eeec926956ae76f05c /lib/PublicInbox
parentdbcdabe601cfb29c8b7d5f169be9bf560d656a42 (diff)
downloadpublic-inbox-ca05b93cddfa2cc495451410222af3753bfe1b4e.tar.gz
We don't need to keep track of that field since we always
know what events we're interested in when using one-shot
wakeups.
Diffstat (limited to 'lib/PublicInbox')
-rw-r--r--lib/PublicInbox/DS.pm64
-rw-r--r--lib/PublicInbox/EvCleanup.pm4
-rw-r--r--lib/PublicInbox/HTTP.pm13
-rw-r--r--lib/PublicInbox/HTTPD/Async.pm10
-rw-r--r--lib/PublicInbox/NNTP.pm25
-rw-r--r--lib/PublicInbox/Syscall.pm6
6 files changed, 50 insertions, 72 deletions
diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index 9c801214..f5986e55 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -29,8 +29,6 @@ use PublicInbox::Syscall qw(:epoll);
 use fields ('sock',              # underlying socket
             'wbuf',              # arrayref of coderefs or GLOB refs
             'wbuf_off',  # offset into first element of wbuf to start writing at
-            'event_watch',       # bitmask of events the client is interested in
-                                 # (EPOLLIN,OUT,etc.)
             );
 
 use Errno  qw(EAGAIN EINVAL);
@@ -318,6 +316,17 @@ sub PostEventLoop {
     return $keep_running;
 }
 
+# map EPOLL* bits to kqueue EV_* flags for EV_SET
+sub kq_flag ($$) {
+    my ($bit, $ev) = @_;
+    if ($ev & $bit) {
+        my $fl = EV_ADD() | EV_ENABLE();
+        ($ev & EPOLLONESHOT) ? ($fl|EV_ONESHOT()) : $fl;
+    } else {
+        EV_DISABLE();
+    }
+}
+
 #####################################################################
 ### PublicInbox::DS-the-object code
 #####################################################################
@@ -344,25 +353,21 @@ sub new {
     Carp::cluck("undef sock and/or fd in PublicInbox::DS->new.  sock=" . ($sock || "") . ", fd=" . ($fd || ""))
         unless $sock && $fd;
 
-    $self->{event_watch} = $ev;
-
     _InitPoller();
 
     if ($HaveEpoll) {
 retry:
         if (epoll_ctl($Epoll, EPOLL_CTL_ADD, $fd, $ev)) {
             if ($! == EINVAL && ($ev & EPOLLEXCLUSIVE)) {
-                $self->{event_watch} = ($ev &= ~EPOLLEXCLUSIVE);
+                $ev &= ~EPOLLEXCLUSIVE;
                 goto retry;
             }
             die "couldn't add epoll watch for $fd: $!\n";
         }
     }
     elsif ($HaveKQueue) {
-        my $f = $ev & EPOLLIN ? EV_ENABLE() : EV_DISABLE();
-        $KQueue->EV_SET($fd, EVFILT_READ(), EV_ADD() | $f);
-        $f = $ev & EPOLLOUT ? EV_ENABLE() : EV_DISABLE();
-        $KQueue->EV_SET($fd, EVFILT_WRITE(), EV_ADD() | $f);
+        $KQueue->EV_SET($fd, EVFILT_READ(), EV_ADD() | kq_flag(EPOLLIN, $ev));
+        $KQueue->EV_SET($fd, EVFILT_WRITE(), EV_ADD() | kq_flag(EPOLLOUT, $ev));
     }
 
     Carp::cluck("PublicInbox::DS::new blowing away existing descriptor map for fd=$fd ($DescriptorMap{$fd})")
@@ -454,7 +459,7 @@ next_buf:
                     }
                 } elsif ($! == EAGAIN) {
                     $self->{wbuf_off} = $off;
-                    watch_write($self, 1);
+                    watch($self, EPOLLOUT|EPOLLONESHOT);
                     return 0;
                 } else {
                     return $self->close;
@@ -467,7 +472,6 @@ next_buf:
     } # while @$wbuf
 
     delete $self->{wbuf};
-    $self->watch_write(0);
     1; # all done
 }
 
@@ -544,7 +548,7 @@ sub write {
             return $self->close;
         }
         $self->{wbuf} = [ tmpbuf($bref, $written) ];
-        watch_write($self, 1);
+        watch($self, EPOLLOUT|EPOLLONESHOT);
         return 0;
     }
 }
@@ -563,49 +567,27 @@ sub msg_more ($$) {
 
             # queue up the unwritten substring:
             $self->{wbuf} = [ tmpbuf(\($_[1]), $n) ];
-            watch_write($self, 1);
+            watch($self, EPOLLOUT|EPOLLONESHOT);
             return 0;
         }
     }
     $self->write(\($_[1]));
 }
 
-sub watch_chg ($$$) {
-    my ($self, $bits, $set) = @_;
+sub watch ($$) {
+    my ($self, $ev) = @_;
     my $sock = $self->{sock} or return;
-    my $cur = $self->{event_watch};
-    my $changes = $cur;
-    if ($set) {
-        $changes |= $bits;
-    } else {
-        $changes &= ~$bits;
-    }
-    return if $changes == $cur;
     my $fd = fileno($sock);
     if ($HaveEpoll) {
-        epoll_ctl($Epoll, EPOLL_CTL_MOD, $fd, $changes) and
+        epoll_ctl($Epoll, EPOLL_CTL_MOD, $fd, $ev) and
             confess("EPOLL_CTL_MOD $!");
     } elsif ($HaveKQueue) {
-        my $flag = $set ? EV_ENABLE() : EV_DISABLE();
-        $KQueue->EV_SET($fd, EVFILT_READ(), $flag) if $bits & EPOLLIN;
-        $KQueue->EV_SET($fd, EVFILT_WRITE(), $flag) if $bits & EPOLLOUT;
+        $KQueue->EV_SET($fd, EVFILT_READ(), kq_flag(EPOLLIN, $ev));
+        $KQueue->EV_SET($fd, EVFILT_WRITE(), kq_flag(EPOLLOUT, $ev));
     }
-    $self->{event_watch} = $changes;
 }
 
-=head2 C<< $obj->watch_read( $boolean ) >>
-
-Turn 'readable' event notification on or off.
-
-=cut
-sub watch_read ($$) { watch_chg($_[0], EPOLLIN, $_[1]) };
-
-=head2 C<< $obj->watch_write( $boolean ) >>
-
-Turn 'writable' event notification on or off.
-
-=cut
-sub watch_write ($$) { watch_chg($_[0], EPOLLOUT, $_[1]) };
+sub watch_in1 ($) { watch($_[0], EPOLLIN | EPOLLONESHOT) }
 
 package PublicInbox::DS::Timer;
 # [$abs_float_firetime, $coderef];
diff --git a/lib/PublicInbox/EvCleanup.pm b/lib/PublicInbox/EvCleanup.pm
index d60ac2cc..a9f6167d 100644
--- a/lib/PublicInbox/EvCleanup.pm
+++ b/lib/PublicInbox/EvCleanup.pm
@@ -6,6 +6,7 @@ package PublicInbox::EvCleanup;
 use strict;
 use warnings;
 use base qw(PublicInbox::DS);
+use PublicInbox::Syscall qw(EPOLLOUT EPOLLONESHOT);
 
 my $ENABLED;
 sub enabled { $ENABLED }
@@ -59,13 +60,12 @@ sub _run_later () {
 # Called by PublicInbox::DS
 sub event_step {
         my ($self) = @_;
-        $self->watch_write(0);
         _run_asap();
 }
 
 sub _asap_timer () {
         $singleton ||= once_init();
-        $singleton->watch_write(1);
+        $singleton->watch(EPOLLOUT|EPOLLONESHOT);
         1;
 }
 
diff --git a/lib/PublicInbox/HTTP.pm b/lib/PublicInbox/HTTP.pm
index afa71ea5..773d77ba 100644
--- a/lib/PublicInbox/HTTP.pm
+++ b/lib/PublicInbox/HTTP.pm
@@ -20,6 +20,7 @@ use HTTP::Date qw(time2str);
 use IO::Handle;
 require PublicInbox::EvCleanup;
 PublicInbox::DS->import(qw(msg_more write_in_full));
+use PublicInbox::Syscall qw(EPOLLIN EPOLLONESHOT);
 use constant {
         CHUNK_START => -1,   # [a-f0-9]+\r\n
         CHUNK_END => -2,     # \r\n
@@ -56,7 +57,7 @@ sub http_date () {
 sub new ($$$) {
         my ($class, $sock, $addr, $httpd) = @_;
         my $self = fields::new($class);
-        $self->SUPER::new($sock, PublicInbox::DS::EPOLLIN());
+        $self->SUPER::new($sock, EPOLLIN | EPOLLONESHOT);
         $self->{httpd} = $httpd;
         $self->{rbuf} = '';
         ($self->{remote_addr}, $self->{remote_port}) =
@@ -80,7 +81,8 @@ sub event_step { # called by PublicInbox::DS
                 return $self->close if $r == 0;
                 return rbuf_process($self);
         }
-        return if $!{EAGAIN}; # no need to call watch_read(1) again
+
+        return $self->watch_in1 if $!{EAGAIN};
 
         # common for clients to break connections without warning,
         # would be too noisy to log here:
@@ -100,7 +102,7 @@ sub rbuf_process {
                         ($r == -2 && length($self->{rbuf}) > 0x4000)) {
                 return quit($self, 400);
         }
-        return $self->watch_read(1) if $r < 0; # incomplete
+        return $self->watch_in1 if $r < 0; # incomplete
         $self->{rbuf} = substr($self->{rbuf}, $r);
 
         my $len = input_prepare($self, \%env);
@@ -143,7 +145,6 @@ sub read_input ($) {
 
 sub app_dispatch {
         my ($self, $input) = @_;
-        $self->watch_read(0);
         my $env = $self->{env};
         $env->{REMOTE_ADDR} = $self->{remote_addr};
         $env->{REMOTE_PORT} = $self->{remote_port};
@@ -236,7 +237,7 @@ sub identity_wcb ($) {
 sub next_request ($) {
         my ($self) = @_;
         if ($self->{rbuf} eq '') { # wait for next request
-                $self->watch_read(1);
+                $self->watch_in1;
         } else { # avoid recursion for pipelined requests
                 push @$pipelineq, $self;
                 $pipet ||= PublicInbox::EvCleanup::asap(*process_pipelineq);
@@ -360,7 +361,7 @@ sub recv_err {
         return $self->close if (defined $r && $r == 0);
         if ($!{EAGAIN}) {
                 $self->{input_left} = $len;
-                return;
+                return $self->watch_in1;
         }
         err($self, "error reading for input: $! ($len bytes remaining)");
         quit($self, 500);
diff --git a/lib/PublicInbox/HTTPD/Async.pm b/lib/PublicInbox/HTTPD/Async.pm
index dae62e55..f32ef009 100644
--- a/lib/PublicInbox/HTTPD/Async.pm
+++ b/lib/PublicInbox/HTTPD/Async.pm
@@ -31,10 +31,12 @@ sub new {
         $self;
 }
 
+sub restart_read ($) { $_[0]->watch(PublicInbox::DS::EPOLLIN()) }
+
 # fires after pending writes are complete:
 sub restart_read_cb ($) {
         my ($self) = @_;
-        sub { $self->watch_read(1) }
+        sub { restart_read($self) }
 }
 
 sub main_cb ($$$) {
@@ -46,16 +48,16 @@ sub main_cb ($$$) {
                         $fh->write($$bref);
                         if ($http->{sock}) { # !closed
                                 if ($http->{wbuf}) {
-                                        $self->watch_read(0);
+                                        $self->watch(0);
                                         $http->write(restart_read_cb($self));
                                 }
-                                # stay in watch_read, but let other clients
+                                # stay in EPOLLIN, but let other clients
                                 # get some work done, too.
                                 return;
                         }
                         # fall through to close below...
                 } elsif (!defined $r) {
-                        return if $!{EAGAIN} || $!{EINTR};
+                        return restart_read($self) if $!{EAGAIN} || $!{EINTR};
                 }
 
                 # Done! Error handling will happen in $fh->close
diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm
index eb1679a7..98f88410 100644
--- a/lib/PublicInbox/NNTP.pm
+++ b/lib/PublicInbox/NNTP.pm
@@ -24,6 +24,7 @@ use constant {
         r225 =>        '225 Headers follow (multi-line)',
         r430 => '430 No article with that message-id',
 };
+use PublicInbox::Syscall qw(EPOLLIN EPOLLONESHOT);
 
 my @OVERVIEW = qw(Subject From Date Message-ID References Xref);
 my $OVERVIEW_FMT = join(":\r\n", @OVERVIEW, qw(Bytes Lines)) . ":\r\n";
@@ -52,12 +53,6 @@ sub next_tick () {
                         # pipelined request, we bypassed socket-readiness
                         # checks to get here:
                         event_step($nntp);
-
-                        # maybe there's more pipelined data, or we'll have
-                        # to register it for socket-readiness notifications
-                        if (!$nntp->{long_res} && $nntp->{sock}) {
-                                check_read($nntp);
-                        }
                 }
         }
 }
@@ -97,7 +92,7 @@ sub expire_old () {
 sub new ($$$) {
         my ($class, $sock, $nntpd) = @_;
         my $self = fields::new($class);
-        $self->SUPER::new($sock, PublicInbox::DS::EPOLLIN());
+        $self->SUPER::new($sock, EPOLLIN | EPOLLONESHOT);
         $self->{nntpd} = $nntpd;
         res($self, '201 ' . $nntpd->{servername} . ' ready - post via email');
         $self->{rbuf} = '';
@@ -624,11 +619,10 @@ sub long_response ($$) {
         # make sure we disable reading during a long response,
         # clients should not be sending us stuff and making us do more
         # work while we are stream a response to them
-        $self->watch_read(0);
         my $t0 = now();
         $self->{long_res} = sub {
                 my $more = eval { $cb->() };
-                if ($@ || !$self->{sock}) {
+                if ($@ || !$self->{sock}) { # something bad happened...
                         $self->{long_res} = undef;
 
                         if ($@) {
@@ -922,10 +916,6 @@ sub do_write ($$) {
         my $done = $self->write(\($_[1]));
         return 0 unless $self->{sock};
 
-        # Do not watch for readability if we have data in the queue,
-        # instead re-enable watching for readability when we can
-        $self->watch_read(0) if (!$done || $self->{long_res});
-
         $done;
 }
 
@@ -943,7 +933,6 @@ sub event_step {
         my ($self) = @_;
 
         return unless $self->flush_write && $self->{sock};
-        return if $self->{long_res};
 
         update_idle_time($self);
         # only read more requests if we've drained the write buffer,
@@ -957,7 +946,7 @@ sub event_step {
                 my $off = length($$rbuf);
                 $r = sysread($self->{sock}, $$rbuf, LINE_MAX, $off);
                 unless (defined $r) {
-                        return if $!{EAGAIN};
+                        return $self->watch_in1 if $!{EAGAIN};
                         return $self->close;
                 }
                 return $self->close if $r == 0;
@@ -978,6 +967,10 @@ sub event_step {
         my $len = length($$rbuf);
         return $self->close if ($len >= LINE_MAX);
         update_idle_time($self);
+
+        # maybe there's more pipelined data, or we'll have
+        # to register it for socket-readiness notifications
+        check_read($self) unless ($self->{long_res} || $self->{wbuf});
 }
 
 sub check_read {
@@ -993,7 +986,7 @@ sub check_read {
         } else {
                 # no pipelined requests available, let the kernel know
                 # to wake us up if there's more
-                $self->watch_read(1); # PublicInbox::DS::watch_read
+                $self->watch_in1; # PublicInbox::DS::watch_in1
         }
 }
 
diff --git a/lib/PublicInbox/Syscall.pm b/lib/PublicInbox/Syscall.pm
index 17fd1398..f1988e61 100644
--- a/lib/PublicInbox/Syscall.pm
+++ b/lib/PublicInbox/Syscall.pm
@@ -24,11 +24,11 @@ $VERSION     = "0.25";
 @EXPORT_OK   = qw(sendfile epoll_ctl epoll_create epoll_wait
                   EPOLLIN EPOLLOUT
                   EPOLL_CTL_ADD EPOLL_CTL_DEL EPOLL_CTL_MOD
-                  EPOLLEXCLUSIVE);
+                  EPOLLONESHOT EPOLLEXCLUSIVE);
 %EXPORT_TAGS = (epoll => [qw(epoll_ctl epoll_create epoll_wait
                              EPOLLIN EPOLLOUT
                              EPOLL_CTL_ADD EPOLL_CTL_DEL EPOLL_CTL_MOD
-                             EPOLLEXCLUSIVE)],
+                             EPOLLONESHOT EPOLLEXCLUSIVE)],
                 sendfile => [qw(sendfile)],
                 );
 
@@ -38,7 +38,7 @@ use constant EPOLLOUT      => 4;
 # use constant EPOLLHUP      => 16;
 # use constant EPOLLRDBAND   => 128;
 use constant EPOLLEXCLUSIVE => (1 << 28);
-# use constant EPOLLONESHOT => (1 << 30);
+use constant EPOLLONESHOT => (1 << 30);
 # use constant EPOLLET => (1 << 31);
 use constant EPOLL_CTL_ADD => 1;
 use constant EPOLL_CTL_DEL => 2;