From 42bb1574bf35a18f037d4ca8b0e6696192d301df Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 10 Jun 2019 01:33:30 +0000 Subject: ds: simplify write buffer accounting Keeping track of write_buf_size was redundant and pointless when we can simply check the number of elements in the buffer array. Multiple sources of truth leads to confusion; confusion leads to bugs. Finally, rename the prefixes to 'wbuf' to ensure we loudly (instead of silently) break any external dependencies being ported over from Danga::Socket, as further changes are pending. --- lib/PublicInbox/DS.pm | 41 +++++++++++++++++------------------------ lib/PublicInbox/HTTP.pm | 6 +++--- lib/PublicInbox/HTTPD/Async.pm | 2 +- lib/PublicInbox/NNTP.pm | 8 ++++---- 4 files changed, 25 insertions(+), 32 deletions(-) (limited to 'lib/PublicInbox') diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index 03612ce8..172a9f52 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -27,9 +27,8 @@ use PublicInbox::Syscall qw(:epoll); use fields ('sock', # underlying socket 'fd', # numeric file descriptor - 'write_buf', # arrayref of scalars, scalarrefs, or coderefs to write - 'write_buf_offset', # offset into first array of write_buf to start writing at - 'write_buf_size', # total length of data in all write_buf items + 'wbuf', # arrayref of scalars, scalarrefs, or coderefs to write + 'wbuf_off', # offset into first element of wbuf to start writing at 'closed', # bool: socket is closed 'event_watch', # bitmask of events the client is interested in (POLLIN,OUT,etc.) 
); @@ -449,9 +448,8 @@ sub new { unless $sock && $fd; $self->{fd} = $fd; - $self->{write_buf} = []; - $self->{write_buf_offset} = 0; - $self->{write_buf_size} = 0; + $self->{wbuf} = []; + $self->{wbuf_off} = 0; $self->{closed} = 0; my $ev = $self->{event_watch} = POLLERR|POLLHUP|POLLNVAL; @@ -552,7 +550,7 @@ sub _cleanup { # we need to flush our write buffer, as there may # be self-referential closures (sub { $client->close }) # preventing the object from being destroyed - $self->{write_buf} = []; + @{$self->{wbuf}} = (); # if we're using epoll, we have to remove this from our epoll fd so we stop getting # notifications about it @@ -612,12 +610,12 @@ sub write { # just queue data if there's already a wait my $need_queue; + my $wbuf = $self->{wbuf}; if (defined $data) { $bref = ref $data ? $data : \$data; - if ($self->{write_buf_size}) { - push @{$self->{write_buf}}, $bref; - $self->{write_buf_size} += ref $bref eq "SCALAR" ? length($$bref) : 1; + if (scalar @$wbuf) { + push @$wbuf, $bref; return 0; } @@ -629,7 +627,7 @@ sub write { WRITE: while (1) { - return 1 unless $bref ||= $self->{write_buf}[0]; + return 1 unless $bref ||= $wbuf->[0]; my $len; eval { @@ -638,8 +636,7 @@ sub write { if ($@) { if (UNIVERSAL::isa($bref, "CODE")) { unless ($need_queue) { - $self->{write_buf_size}--; # code refs are worth 1 - shift @{$self->{write_buf}}; + shift @$wbuf; } $bref->(); @@ -654,9 +651,9 @@ sub write { die "Write error: $@ <$bref>"; } - my $to_write = $len - $self->{write_buf_offset}; + my $to_write = $len - $self->{wbuf_off}; my $written = syswrite($self->{sock}, $$bref, $to_write, - $self->{write_buf_offset}); + $self->{wbuf_off}); if (! defined $written) { if ($! 
== EPIPE) { @@ -665,8 +662,7 @@ sub write { # since connection has stuff to write, it should now be # interested in pending writes: if ($need_queue) { - push @{$self->{write_buf}}, $bref; - $self->{write_buf_size} += $len; + push @$wbuf, $bref; } $self->watch_write(1); return 0; @@ -681,19 +677,17 @@ sub write { DebugLevel >= 2 && $self->debugmsg("Wrote PARTIAL %d bytes to %d", $written, $self->{fd}); if ($need_queue) { - push @{$self->{write_buf}}, $bref; - $self->{write_buf_size} += $len; + push @$wbuf, $bref; } # since connection has stuff to write, it should now be # interested in pending writes: - $self->{write_buf_offset} += $written; - $self->{write_buf_size} -= $written; + $self->{wbuf_off} += $written; $self->on_incomplete_write; return 0; } elsif ($written == $to_write) { DebugLevel >= 2 && $self->debugmsg("Wrote ALL %d bytes to %d (nq=%d)", $written, $self->{fd}, $need_queue); - $self->{write_buf_offset} = 0; + $self->{wbuf_off} = 0; $self->watch_write(0); # this was our only write, so we can return immediately @@ -702,8 +696,7 @@ sub write { # can't be anything else to write. return 1 if $need_queue; - $self->{write_buf_size} -= $written; - shift @{$self->{write_buf}}; + shift @$wbuf; undef $bref; next WRITE; } diff --git a/lib/PublicInbox/HTTP.pm b/lib/PublicInbox/HTTP.pm index 977614b4..fd103251 100644 --- a/lib/PublicInbox/HTTP.pm +++ b/lib/PublicInbox/HTTP.pm @@ -260,7 +260,7 @@ sub getline_cb ($$$) { $write->($buf); # may close in PublicInbox::DS::write unless ($self->{closed}) { my $next = $self->{pull}; - if ($self->{write_buf_size}) { + if (scalar @{$self->{wbuf}}) { $self->write($next); } else { PublicInbox::EvCleanup::asap($next); @@ -315,7 +315,7 @@ use constant MSG_MORE => ($^O eq 'linux') ? 
0x8000 : 0; sub more ($$) { my $self = $_[0]; return if $self->{closed}; - if (MSG_MORE && !$self->{write_buf_size}) { + if (MSG_MORE && !scalar(@{$self->{wbuf}})) { my $n = send($self->{sock}, $_[1], MSG_MORE); if (defined $n) { my $nlen = length($_[1]) - $n; @@ -487,7 +487,7 @@ sub close { # for graceful shutdown in PublicInbox::Daemon: sub busy () { my ($self) = @_; - ($self->{rbuf} ne '' || $self->{env} || $self->{write_buf_size}); + ($self->{rbuf} ne '' || $self->{env} || scalar(@{$self->{wbuf}})); } 1; diff --git a/lib/PublicInbox/HTTPD/Async.pm b/lib/PublicInbox/HTTPD/Async.pm index dbe8a84a..60701085 100644 --- a/lib/PublicInbox/HTTPD/Async.pm +++ b/lib/PublicInbox/HTTPD/Async.pm @@ -46,7 +46,7 @@ sub main_cb ($$$) { if ($r) { $fh->write($$bref); unless ($http->{closed}) { # PublicInbox::DS sets this - if ($http->{write_buf_size}) { + if (scalar @{$http->{wbuf}}) { $self->watch_read(0); $http->write(restart_read_cb($self)); } diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm index be80560f..b62c2187 100644 --- a/lib/PublicInbox/NNTP.pm +++ b/lib/PublicInbox/NNTP.pm @@ -619,7 +619,7 @@ sub long_response ($$) { update_idle_time($self); check_read($self); } - } elsif ($more) { # $self->{write_buf_size}: + } elsif ($more) { # scalar @{$self->{wbuf}}: # no recursion, schedule another call ASAP # but only after all pending writes are done update_idle_time($self); @@ -925,7 +925,7 @@ use constant MSG_MORE => ($^O eq 'linux') ? 
0x8000 : 0; sub do_more ($$) { my ($self, $data) = @_; - if (MSG_MORE && !$self->{write_buf_size}) { + if (MSG_MORE && !scalar(@{$self->{wbuf}})) { my $n = send($self->{sock}, $data, MSG_MORE); if (defined $n) { my $dlen = length($data); @@ -1004,8 +1004,8 @@ sub not_idle_long ($$) { # for graceful shutdown in PublicInbox::Daemon: sub busy { my ($self, $now) = @_; - ($self->{rbuf} ne '' || $self->{long_res} || $self->{write_buf_size} || - not_idle_long($self, $now)); + ($self->{rbuf} ne '' || $self->{long_res} || + scalar(@{$self->{wbuf}}) || not_idle_long($self, $now)); } 1; -- cgit v1.2.3-24-ge0c7 From 6fa2b29fcd0477d126ebb7db7f97b334f74bbcbc Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 3 Jun 2019 09:11:00 +0000 Subject: ds: cleanup Errno imports and favor constant comparisons Stop importing unused constants, and favor integer comparisons of `$!' over `$!{EFOO}' hash lookups. Integer comparisons are slightly faster, even: Benchmark: timing 30 iterations of cmp_eq, cmp_ne, hash_hit, hash_miss... cmp_eq: 1 wallclock secs ( 1.61 usr + 0.00 sys = 1.61 CPU) @ 18.63/s (n=30) cmp_ne: 2 wallclock secs ( 1.57 usr + 0.00 sys = 1.57 CPU) @ 19.11/s (n=30) hash_hit: 4 wallclock secs ( 3.85 usr + 0.00 sys = 3.85 CPU) @ 7.79/s (n=30) hash_miss: 4 wallclock secs ( 3.74 usr + 0.00 sys = 3.74 CPU) @ 8.02/s (n=30) #!/usr/bin/perl -w use Benchmark qw(:all); use Errno qw(EAGAIN EINTR); my ($r, $w); pipe($r, $w) or die 'pipe'; require IO::Handle; $r->blocking(0); my $buf; my $n = 30000; timethese(30, { hash_hit => sub { sysread($r, $buf, 1); for (0..$n) { next if $!{EAGAIN}; die 'FAIL'; } } , 'cmp_eq' => sub { sysread($r, $buf, 1); for (0..$n) { next if $! == EAGAIN; die 'FAIL'; } }, hash_miss => sub { sysread($r, $buf, 1); for (0..$n) { die 'FAIL' if $!{EINTR}; } }, 'cmp_ne' => sub { sysread($r, $buf, 1); for (0..$n) { die 'FAIL' if $! 
== EINTR; } }, }); --- lib/PublicInbox/DS.pm | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) (limited to 'lib/PublicInbox') diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index 172a9f52..39f1922f 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -33,8 +33,7 @@ use fields ('sock', # underlying socket 'event_watch', # bitmask of events the client is interested in (POLLIN,OUT,etc.) ); -use Errno qw(EINPROGRESS EWOULDBLOCK EISCONN ENOTSOCK - EPIPE EAGAIN EBADF ECONNRESET ENOPROTOOPT); +use Errno qw(EPIPE EAGAIN ECONNRESET EINVAL); use Carp qw(croak confess); use constant DebugLevel => 0; @@ -462,7 +461,7 @@ sub new { } retry: if (epoll_ctl($Epoll, EPOLL_CTL_ADD, $fd, $ev)) { - if ($!{EINVAL} && ($ev & $EPOLLEXCLUSIVE)) { + if ($! == EINVAL && ($ev & $EPOLLEXCLUSIVE)) { $EPOLLEXCLUSIVE = 0; # old kernel $ev = $self->{event_watch} = EPOLLIN|EPOLLERR|EPOLLHUP; goto retry; @@ -730,7 +729,7 @@ sub read { my $res = sysread($sock, $buf, $req_bytes, 0); DebugLevel >= 2 && $self->debugmsg("sysread = %d; \$! = %d", $res, $!); - if (! $res && $! != EWOULDBLOCK) { + if (! $res && $! != EAGAIN) { # catches 0=conn closed or undef=error DebugLevel >= 2 && $self->debugmsg("Fd \#%d read hit the end of the road.", $self->{fd}); return undef; -- cgit v1.2.3-24-ge0c7 From 30ab5cf82b9d47242640f748a0f9a088ca783e32 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Tue, 4 Jun 2019 12:15:45 +0000 Subject: ds: reduce Errno imports and drop ->close reason ECONNRESET and EPIPE are common on a big Internet filled with unreliable connections, and there's nothing our code can do about it. So no point in wasting code to log them and there are plenty of tracing tools to choose from if such diagnostics are needed. 
--- lib/PublicInbox/DS.pm | 23 +++++------------------ 1 file changed, 5 insertions(+), 18 deletions(-) (limited to 'lib/PublicInbox') diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index 39f1922f..e2aa4b55 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -33,7 +33,7 @@ use fields ('sock', # underlying socket 'event_watch', # bitmask of events the client is interested in (POLLIN,OUT,etc.) ); -use Errno qw(EPIPE EAGAIN ECONNRESET EINVAL); +use Errno qw(EAGAIN EINVAL); use Carp qw(croak confess); use constant DebugLevel => 0; @@ -509,22 +509,15 @@ sub steal_socket { return $sock; } -=head2 C<< $obj->close( [$reason] ) >> +=head2 C<< $obj->close >> -Close the socket. The I<reason> argument will be used in debugging messages. +Close the socket. =cut sub close { my PublicInbox::DS $self = $_[0]; return if $self->{closed}; - # print out debugging info for this close - if (DebugLevel) { - my ($pkg, $filename, $line) = caller; - my $reason = $_[1] || ""; - warn "Closing \#$self->{fd} due to $pkg/$filename/$line ($reason)\n"; - } - # this does most of the work of closing us $self->_cleanup(); @@ -655,9 +648,7 @@ sub write { $self->{wbuf_off}); if (! defined $written) { - if ($! == EPIPE) { - return $self->close("EPIPE"); - } elsif ($! == EAGAIN) { + if ($! == EAGAIN) { # since connection has stuff to write, it should now be # interested in pending writes: if ($need_queue) { @@ -665,13 +656,9 @@ sub write { } $self->watch_write(1); return 0; - } elsif ($!
== ECONNRESET) { - return $self->close("ECONNRESET"); } - DebugLevel >= 1 && $self->debugmsg("Closing connection ($self) due to write error: $!\n"); - - return $self->close("write_error"); + return $self->close; } elsif ($written != $to_write) { DebugLevel >= 2 && $self->debugmsg("Wrote PARTIAL %d bytes to %d", $written, $self->{fd}); -- cgit v1.2.3-24-ge0c7 From dbf0cad365839a99e8582d6e26ce40c02508155d Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 10 Jun 2019 02:07:26 +0000 Subject: ds: remove {fd} field Storing the file descriptor was redundant as we can quickly call fileno($self->{sock}) and not have to store an extra hash table entry. Multiple sources of truth leads to confusion, confusion leads to bugs. --- lib/PublicInbox/DS.pm | 42 +++++++++++++++--------------------------- lib/PublicInbox/NNTP.pm | 10 ++++++---- 2 files changed, 21 insertions(+), 31 deletions(-) (limited to 'lib/PublicInbox') diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index e2aa4b55..7d7baac7 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -26,7 +26,6 @@ use warnings; use PublicInbox::Syscall qw(:epoll); use fields ('sock', # underlying socket - 'fd', # numeric file descriptor 'wbuf', # arrayref of scalars, scalarrefs, or coderefs to write 'wbuf_off', # offset into first element of wbuf to start writing at 'closed', # bool: socket is closed @@ -440,13 +439,12 @@ sub new { my ($self, $sock, $exclusive) = @_; $self = fields::new($self) unless ref $self; - $self->{sock} = $sock; + $self->{sock} = $sock; my $fd = fileno($sock); Carp::cluck("undef sock and/or fd in PublicInbox::DS->new. sock=" . ($sock || "") . ", fd=" . ($fd || "")) unless $sock && $fd; - $self->{fd} = $fd; $self->{wbuf} = []; $self->{wbuf_off} = 0; $self->{closed} = 0; @@ -523,9 +521,8 @@ sub close { # defer closing the actual socket until the event loop is done # processing this round of events. 
(otherwise we might reuse fds) - if ($self->{sock}) { - push @ToClose, $self->{sock}; - $self->{sock} = undef; + if (my $sock = delete $self->{sock}) { + push @ToClose, $sock; } return 0; @@ -546,11 +543,10 @@ sub _cleanup { # if we're using epoll, we have to remove this from our epoll fd so we stop getting # notifications about it - if ($HaveEpoll && $self->{fd}) { - if (epoll_ctl($Epoll, EPOLL_CTL_DEL, $self->{fd}, $self->{event_watch}) != 0) { - # dump_error prints a backtrace so we can try to figure out why this happened - $self->dump_error("epoll_ctl(): failure deleting fd=$self->{fd} during _cleanup(); $! (" . ($!+0) . ")"); - } + if ($HaveEpoll && $self->{sock}) { + my $fd = fileno($self->{sock}); + epoll_ctl($Epoll, EPOLL_CTL_DEL, $fd, $self->{event_watch}) and + confess("EPOLL_CTL_DEL: $!"); } # we explicitly don't delete from DescriptorMap here until we @@ -561,9 +557,6 @@ sub _cleanup { # looked at $pob->{closed} and ignore it. but if it's an # un-accounted for fd, then it (understandably) freak out a bit # and emit warnings, thinking their state got off. - - # and finally get rid of our fd so we can't use it anywhere else - $self->{fd} = undef; } =head2 C<< $obj->sock() >> @@ -660,8 +653,6 @@ sub write { return $self->close; } elsif ($written != $to_write) { - DebugLevel >= 2 && $self->debugmsg("Wrote PARTIAL %d bytes to %d", - $written, $self->{fd}); if ($need_queue) { push @$wbuf, $bref; } @@ -671,8 +662,6 @@ sub write { $self->on_incomplete_write; return 0; } elsif ($written == $to_write) { - DebugLevel >= 2 && $self->debugmsg("Wrote ALL %d bytes to %d (nq=%d)", - $written, $self->{fd}, $need_queue); $self->{wbuf_off} = 0; $self->watch_write(0); @@ -718,7 +707,6 @@ sub read { if (! $res && $! != EAGAIN) { # catches 0=conn closed or undef=error - DebugLevel >= 2 && $self->debugmsg("Fd \#%d read hit the end of the road.", $self->{fd}); return undef; } @@ -779,16 +767,16 @@ sub watch_read { $event &= ~POLLIN if ! 
$val; $event |= POLLIN if $val; + my $fd = fileno($self->{sock}); # If it changed, set it if ($event != $self->{event_watch}) { if ($HaveKQueue) { - $KQueue->EV_SET($self->{fd}, IO::KQueue::EVFILT_READ(), + $KQueue->EV_SET($fd, IO::KQueue::EVFILT_READ(), $val ? IO::KQueue::EV_ENABLE() : IO::KQueue::EV_DISABLE()); } elsif ($HaveEpoll) { - epoll_ctl($Epoll, EPOLL_CTL_MOD, $self->{fd}, $event) - and $self->dump_error("couldn't modify epoll settings for $self->{fd} " . - "from $self->{event_watch} -> $event: $! (" . ($!+0) . ")"); + epoll_ctl($Epoll, EPOLL_CTL_MOD, $fd, $event) and + confess("EPOLL_CTL_MOD: $!"); } $self->{event_watch} = $event; } @@ -808,17 +796,17 @@ sub watch_write { $event &= ~POLLOUT if ! $val; $event |= POLLOUT if $val; + my $fd = fileno($self->{sock}); # If it changed, set it if ($event != $self->{event_watch}) { if ($HaveKQueue) { - $KQueue->EV_SET($self->{fd}, IO::KQueue::EVFILT_WRITE(), + $KQueue->EV_SET($fd, IO::KQueue::EVFILT_WRITE(), $val ? IO::KQueue::EV_ENABLE() : IO::KQueue::EV_DISABLE()); } elsif ($HaveEpoll) { - epoll_ctl($Epoll, EPOLL_CTL_MOD, $self->{fd}, $event) - and $self->dump_error("couldn't modify epoll settings for $self->{fd} " . - "from $self->{event_watch} -> $event: $! (" . ($!+0) . 
")"); + epoll_ctl($Epoll, EPOLL_CTL_MOD, $fd, $event) and + confess "EPOLL_CTL_MOD: $!"; } $self->{event_watch} = $event; } diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm index b62c2187..7729399a 100644 --- a/lib/PublicInbox/NNTP.pm +++ b/lib/PublicInbox/NNTP.pm @@ -66,7 +66,8 @@ sub next_tick () { sub update_idle_time ($) { my ($self) = @_; - my $fd = $self->{fd}; + my $sock = $self->{sock} or return; + my $fd = fileno($sock); defined $fd and $EXPMAP->{$fd} = [ now(), $self ]; } @@ -595,7 +596,7 @@ sub long_response ($$) { my ($self, $cb) = @_; die "BUG: nested long response" if $self->{long_res}; - my $fd = $self->{fd}; + my $fd = fileno($self->{sock}); defined $fd or return; # make sure we disable reading during a long response, # clients should not be sending us stuff and making us do more @@ -963,7 +964,7 @@ sub event_read { my $line = $1; return $self->close if $line =~ /[[:cntrl:]]/s; my $t0 = now(); - my $fd = $self->{fd}; + my $fd = fileno($self->{sock}); $r = eval { process_line($self, $line) }; my $d = $self->{long_res} ? " deferred[$fd]" : ''; @@ -995,7 +996,8 @@ sub check_read { sub not_idle_long ($$) { my ($self, $now) = @_; - defined(my $fd = $self->{fd}) or return; + my $sock = $self->{sock} or return; + defined(my $fd = fileno($sock)) or return; my $ary = $EXPMAP->{$fd} or return; my $exp_at = $ary->[0] + $EXPTIME; $exp_at > $now; -- cgit v1.2.3-24-ge0c7 From 180fe300da41ab0251147055bbb9ae5d9ebd2b5b Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 10 Jun 2019 02:18:44 +0000 Subject: ds: remove steal_socket method We won't be needing it, not even for TLS support. 
--- lib/PublicInbox/DS.pm | 20 -------------------- 1 file changed, 20 deletions(-) (limited to 'lib/PublicInbox') diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index 7d7baac7..5177d1f1 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -487,26 +487,6 @@ retry: ### I N S T A N C E M E T H O D S ##################################################################### -=head2 C<< $obj->steal_socket() >> - -Basically returns our socket and makes it so that we don't try to close it, -but we do remove it from epoll handlers. THIS CLOSES $self. It is the same -thing as calling close, except it gives you the socket to use. - -=cut -sub steal_socket { - my PublicInbox::DS $self = $_[0]; - return if $self->{closed}; - - # cleanup does most of the work of closing this socket - $self->_cleanup(); - - # now undef our internal sock and fd structures so we don't use them - my $sock = $self->{sock}; - $self->{sock} = undef; - return $sock; -} - =head2 C<< $obj->close >> Close the socket. -- cgit v1.2.3-24-ge0c7 From 03ddcf8dd905fc5874115ebb30e7cb6e9bd75a3e Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 10 Jun 2019 02:34:48 +0000 Subject: nntp: use sysread to append to existing buffer We already do this in PublicInbox::HTTP, as it's superior to DS::read in this regard. Initially (when I started writing NNTP.pm, I wanted to use Danga::Socket's read buffering and push_back_read (removed in DS) but quickly figured out it wasn't useful at all for dealing with trickling clients. 
--- lib/PublicInbox/NNTP.pm | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) (limited to 'lib/PublicInbox') diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm index 7729399a..5e66d077 100644 --- a/lib/PublicInbox/NNTP.pm +++ b/lib/PublicInbox/NNTP.pm @@ -954,13 +954,20 @@ sub event_write { sub event_read { my ($self) = @_; use constant LINE_MAX => 512; # RFC 977 section 2.3 - - if (index($self->{rbuf}, "\n") < 0) { - my $buf = $self->read(LINE_MAX) or return $self->close; - $self->{rbuf} .= $$buf; + my $rbuf = \($self->{rbuf}); + my $r; + + if (index($$rbuf, "\n") < 0) { + my $off = length($$rbuf); + $r = sysread($self->{sock}, $$rbuf, LINE_MAX, $off); + unless (defined $r) { + return if $!{EAGAIN}; + return $self->close; + } + return $self->close if $r == 0; } - my $r = 1; - while ($r > 0 && $self->{rbuf} =~ s/\A[ \t\r\n]*([^\r\n]*)\r?\n//) { + $r = 1; + while ($r > 0 && $$rbuf =~ s/\A[ \t\r\n]*([^\r\n]*)\r?\n//) { my $line = $1; return $self->close if $line =~ /[[:cntrl:]]/s; my $t0 = now(); @@ -972,7 +979,7 @@ sub event_read { } return $self->close if $r < 0; - my $len = length($self->{rbuf}); + my $len = length($$rbuf); return $self->close if ($len >= LINE_MAX); update_idle_time($self); } -- cgit v1.2.3-24-ge0c7 From a68d7b5b57fd045e17188ab743905564afbd9b05 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 10 Jun 2019 02:39:24 +0000 Subject: ds: remove read method, here, too Since we stop using it in NNTP, we don't need it at all. --- lib/PublicInbox/DS.pm | 30 ------------------------------ 1 file changed, 30 deletions(-) (limited to 'lib/PublicInbox') diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index 5177d1f1..78ea7227 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -663,36 +663,6 @@ sub on_incomplete_write { $self->watch_write(1); } -=head2 C<< $obj->read( $bytecount ) >> - -Read at most I<bytecount> bytes from the underlying handle; returns scalar -ref on read, or undef on connection closed.
- -=cut -sub read { - my PublicInbox::DS $self = shift; - return if $self->{closed}; - my $bytes = shift; - my $buf; - my $sock = $self->{sock}; - - # if this is too high, perl quits(!!). reports on mailing lists - # don't seem to point to a universal answer. 5MB worked for some, - # crashed for others. 1MB works for more people. let's go with 1MB - # for now. :/ - my $req_bytes = $bytes > 1048576 ? 1048576 : $bytes; - - my $res = sysread($sock, $buf, $req_bytes, 0); - DebugLevel >= 2 && $self->debugmsg("sysread = %d; \$! = %d", $res, $!); - - if (! $res && $! != EAGAIN) { - # catches 0=conn closed or undef=error - return undef; - } - - return \$buf; -} - =head2 (VIRTUAL) C<< $obj->event_read() >> Readable event handler. Concrete deriviatives of PublicInbox::DS should -- cgit v1.2.3-24-ge0c7 From 6feb58ba2e9dee57b474e82e871a8945b537325e Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 10 Jun 2019 03:02:11 +0000 Subject: ds: do not distinguish between POLLHUP and POLLERR In my experience, both are worthless as any normal read/write call path will be wanting to check errors and deal with them appropriately; so we can just call event_read, for now. Eventually, there'll probably be only one callback for dealing with all in/out/err/hup events to simplify logic, especially w.r.t TLS socket negotiation. --- lib/PublicInbox/DS.pm | 44 +++++++++++++++--------------------------- lib/PublicInbox/HTTP.pm | 5 ----- lib/PublicInbox/HTTPD/Async.pm | 2 -- lib/PublicInbox/NNTP.pm | 5 ----- 4 files changed, 16 insertions(+), 40 deletions(-) (limited to 'lib/PublicInbox') diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index 78ea7227..9277981b 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -239,6 +239,18 @@ sub RunTimers { return $timeout; } +# Placeholder callback when we hit POLLERR/POLLHUP or other unrecoverable +# errors. Shouldn't be needed in the future. 
+sub event_end ($) { + my ($self) = @_; + return if $self->{closed}; + $self->{wbuf} = []; + $self->{wbuf_off} = 0; + + # we're screwed if a read handler can't handle POLLERR/POLLHUP-type errors + $self->event_read; +} + ### The epoll-based event loop. Gets installed as EventLoop if IO::Epoll loads ### okay. sub EpollEventLoop { @@ -268,9 +280,8 @@ sub EpollEventLoop { # standard non-profiling codepat $pob->event_read if $state & EPOLLIN && ! $pob->{closed}; $pob->event_write if $state & EPOLLOUT && ! $pob->{closed}; - if ($state & (EPOLLERR|EPOLLHUP)) { - $pob->event_err if $state & EPOLLERR && ! $pob->{closed}; - $pob->event_hup if $state & EPOLLHUP && ! $pob->{closed}; + if ($state & (EPOLLERR|EPOLLHUP) && ! $pob->{closed}) { + event_end($pob); } } return unless PostEventLoop(); @@ -320,8 +331,7 @@ sub PollEventLoop { $pob->event_read if $state & POLLIN && ! $pob->{closed}; $pob->event_write if $state & POLLOUT && ! $pob->{closed}; - $pob->event_err if $state & POLLERR && ! $pob->{closed}; - $pob->event_hup if $state & POLLHUP && ! $pob->{closed}; + event_end($pob) if $state & (POLLERR|POLLHUP) && ! $pob->{closed}; } return unless PostEventLoop(); @@ -357,11 +367,7 @@ sub KQueueEventLoop { $pob->event_read if $filter == IO::KQueue::EVFILT_READ() && !$pob->{closed}; $pob->event_write if $filter == IO::KQueue::EVFILT_WRITE() && !$pob->{closed}; if ($flags == IO::KQueue::EV_EOF() && !$pob->{closed}) { - if ($fflags) { - $pob->event_err; - } else { - $pob->event_hup; - } + event_end($pob); } } return unless PostEventLoop(); @@ -672,24 +678,6 @@ called. =cut sub event_read { die "Base class event_read called for $_[0]\n"; } -=head2 (VIRTUAL) C<< $obj->event_err() >> - -Error event handler. Concrete deriviatives of PublicInbox::DS should -provide an implementation of this. The default implementation will die if -called. - -=cut -sub event_err { die "Base class event_err called for $_[0]\n"; } - -=head2 (VIRTUAL) C<< $obj->event_hup() >> - -'Hangup' event handler. 
Concrete deriviatives of PublicInbox::DS should -provide an implementation of this. The default implementation will die if -called. - -=cut -sub event_hup { die "Base class event_hup called for $_[0]\n"; } - =head2 C<< $obj->event_write() >> Writable event handler. Concrete deriviatives of PublicInbox::DS may wish to diff --git a/lib/PublicInbox/HTTP.pm b/lib/PublicInbox/HTTP.pm index fd103251..4fbc34ee 100644 --- a/lib/PublicInbox/HTTP.pm +++ b/lib/PublicInbox/HTTP.pm @@ -466,11 +466,6 @@ sub quit { $self->close; } -# callbacks for PublicInbox::DS - -sub event_hup { $_[0]->close } -sub event_err { $_[0]->close } - sub close { my $self = shift; my $forward = $self->{forward}; diff --git a/lib/PublicInbox/HTTPD/Async.pm b/lib/PublicInbox/HTTPD/Async.pm index 60701085..4d0c8d5b 100644 --- a/lib/PublicInbox/HTTPD/Async.pm +++ b/lib/PublicInbox/HTTPD/Async.pm @@ -76,8 +76,6 @@ sub async_pass { } sub event_read { $_[0]->{cb}->(@_) } -sub event_hup { $_[0]->{cb}->(@_) } -sub event_err { $_[0]->{cb}->(@_) } sub close { my $self = shift; diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm index 5e66d077..85778c44 100644 --- a/lib/PublicInbox/NNTP.pm +++ b/lib/PublicInbox/NNTP.pm @@ -937,11 +937,6 @@ sub do_more ($$) { do_write($self, $data); } -# callbacks for PublicInbox::DS - -sub event_hup { $_[0]->close } -sub event_err { $_[0]->close } - sub event_write { my ($self) = @_; update_idle_time($self); -- cgit v1.2.3-24-ge0c7 From b1fca235b174d4e3df674fd301e7bfba222a9e97 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 10 Jun 2019 03:21:37 +0000 Subject: ds: stop caring about event flags set by epoll/poll/kqueue If we got something to write, then write it. Otherwise, try reading; and continue dealing with errors which normally occur along the way. Trying to read requests while we need to buffer in luserspace is suicidal from a memory management standpoint. 
The only adjustment needed for existing callers is EvCleanup; where we need to ensure we're always calling the dummy EvCleanup::event_write callback to accomplish nothing. --- lib/PublicInbox/DS.pm | 52 +++++++++++++------------------------------- lib/PublicInbox/EvCleanup.pm | 8 +++++-- 2 files changed, 21 insertions(+), 39 deletions(-) (limited to 'lib/PublicInbox') diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index 9277981b..2f028a36 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -239,15 +239,18 @@ sub RunTimers { return $timeout; } -# Placeholder callback when we hit POLLERR/POLLHUP or other unrecoverable -# errors. Shouldn't be needed in the future. -sub event_end ($) { +sub event_step ($) { my ($self) = @_; return if $self->{closed}; - $self->{wbuf} = []; - $self->{wbuf_off} = 0; - # we're screwed if a read handler can't handle POLLERR/POLLHUP-type errors + my $wbuf = $self->{wbuf}; + if (@$wbuf) { + $self->event_write; + return if $self->{closed} || scalar(@$wbuf); + } + + # only read more requests if we've drained the write buffer, + # otherwise we can be buffering infinitely w/o backpressure $self->event_read; } @@ -270,19 +273,7 @@ sub EpollEventLoop { # that ones in the front triggered unregister-interest actions. if we # can't find the %sock entry, it's because we're no longer interested # in that event. - my PublicInbox::DS $pob = $DescriptorMap{$ev->[0]}; - my $code; - my $state = $ev->[1]; - - DebugLevel >= 1 && $class->DebugMsg("Event: fd=%d (%s), state=%d \@ %s\n", - $ev->[0], ref($pob), $ev->[1], time); - - # standard non-profiling codepat - $pob->event_read if $state & EPOLLIN && ! $pob->{closed}; - $pob->event_write if $state & EPOLLOUT && ! $pob->{closed}; - if ($state & (EPOLLERR|EPOLLHUP) && ! 
$pob->{closed}) { - event_end($pob); - } + event_step($DescriptorMap{$ev->[0]}); } return unless PostEventLoop(); } @@ -327,11 +318,7 @@ sub PollEventLoop { my ($fd, $state) = splice(@poll, 0, 2); next unless $state; - $pob = $DescriptorMap{$fd}; - - $pob->event_read if $state & POLLIN && ! $pob->{closed}; - $pob->event_write if $state & POLLOUT && ! $pob->{closed}; - event_end($pob) if $state & (POLLERR|POLLHUP) && ! $pob->{closed}; + event_step($DescriptorMap{$fd}); } return unless PostEventLoop(); @@ -359,16 +346,7 @@ sub KQueueEventLoop { foreach my $kev (@ret) { my ($fd, $filter, $flags, $fflags) = @$kev; - my PublicInbox::DS $pob = $DescriptorMap{$fd}; - - DebugLevel >= 1 && $class->DebugMsg("Event: fd=%d (%s), flags=%d \@ %s\n", - $fd, ref($pob), $flags, time); - - $pob->event_read if $filter == IO::KQueue::EVFILT_READ() && !$pob->{closed}; - $pob->event_write if $filter == IO::KQueue::EVFILT_WRITE() && !$pob->{closed}; - if ($flags == IO::KQueue::EV_EOF() && !$pob->{closed}) { - event_end($pob); - } + event_step($DescriptorMap{$fd}); } return unless PostEventLoop(); } @@ -672,11 +650,11 @@ sub on_incomplete_write { =head2 (VIRTUAL) C<< $obj->event_read() >> Readable event handler. Concrete deriviatives of PublicInbox::DS should -provide an implementation of this. The default implementation will die if -called. +provide an implementation of this. The default implementation is a noop +if called. 
=cut -sub event_read { die "Base class event_read called for $_[0]\n"; } +sub event_read {} # noop =head2 C<< $obj->event_write() >> diff --git a/lib/PublicInbox/EvCleanup.pm b/lib/PublicInbox/EvCleanup.pm index afed24ff..f76fb681 100644 --- a/lib/PublicInbox/EvCleanup.pm +++ b/lib/PublicInbox/EvCleanup.pm @@ -6,7 +6,6 @@ package PublicInbox::EvCleanup; use strict; use warnings; use base qw(PublicInbox::DS); -use fields qw(rd); my $ENABLED; sub enabled { $ENABLED } @@ -25,7 +24,12 @@ sub once_init () { pipe($r, $w) or die "pipe: $!"; fcntl($w, 1031, 4096) if $^O eq 'linux'; # 1031: F_SETPIPE_SZ $self->SUPER::new($w); - $self->{rd} = $r; # never read, since we never write.. + + # always writable, since PublicInbox::EvCleanup::event_write + # never drains wbuf. We can avoid wasting a hash slot by + # stuffing the read-end of the pipe into the never-to-be-touched + # wbuf + push @{$self->{wbuf}}, $r; $self; } -- cgit v1.2.3-24-ge0c7