diff options
Diffstat (limited to 'lib/PublicInbox/DS.pm')
-rw-r--r-- | lib/PublicInbox/DS.pm | 235 |
1 files changed, 59 insertions, 176 deletions
diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index 03612ce8..2f028a36 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -26,16 +26,13 @@ use warnings; use PublicInbox::Syscall qw(:epoll); use fields ('sock', # underlying socket - 'fd', # numeric file descriptor - 'write_buf', # arrayref of scalars, scalarrefs, or coderefs to write - 'write_buf_offset', # offset into first array of write_buf to start writing at - 'write_buf_size', # total length of data in all write_buf items + 'wbuf', # arrayref of scalars, scalarrefs, or coderefs to write + 'wbuf_off', # offset into first element of wbuf to start writing at 'closed', # bool: socket is closed 'event_watch', # bitmask of events the client is interested in (POLLIN,OUT,etc.) ); -use Errno qw(EINPROGRESS EWOULDBLOCK EISCONN ENOTSOCK - EPIPE EAGAIN EBADF ECONNRESET ENOPROTOOPT); +use Errno qw(EAGAIN EINVAL); use Carp qw(croak confess); use constant DebugLevel => 0; @@ -242,6 +239,21 @@ sub RunTimers { return $timeout; } +sub event_step ($) { + my ($self) = @_; + return if $self->{closed}; + + my $wbuf = $self->{wbuf}; + if (@$wbuf) { + $self->event_write; + return if $self->{closed} || scalar(@$wbuf); + } + + # only read more requests if we've drained the write buffer, + # otherwise we can be buffering infinitely w/o backpressure + $self->event_read; +} + ### The epoll-based event loop. Gets installed as EventLoop if IO::Epoll loads ### okay. sub EpollEventLoop { @@ -261,20 +273,7 @@ sub EpollEventLoop { # that ones in the front triggered unregister-interest actions. if we # can't find the %sock entry, it's because we're no longer interested # in that event. - my PublicInbox::DS $pob = $DescriptorMap{$ev->[0]}; - my $code; - my $state = $ev->[1]; - - DebugLevel >= 1 && $class->DebugMsg("Event: fd=%d (%s), state=%d \@ %s\n", - $ev->[0], ref($pob), $ev->[1], time); - - # standard non-profiling codepat - $pob->event_read if $state & EPOLLIN && ! $pob->{closed}; - $pob->event_write if $state & EPOLLOUT && ! $pob->{closed}; - if ($state & (EPOLLERR|EPOLLHUP)) { - $pob->event_err if $state & EPOLLERR && ! $pob->{closed}; - $pob->event_hup if $state & EPOLLHUP && ! $pob->{closed}; - } + event_step($DescriptorMap{$ev->[0]}); } return unless PostEventLoop(); } @@ -319,12 +318,7 @@ sub PollEventLoop { my ($fd, $state) = splice(@poll, 0, 2); next unless $state; - $pob = $DescriptorMap{$fd}; - - $pob->event_read if $state & POLLIN && ! $pob->{closed}; - $pob->event_write if $state & POLLOUT && ! $pob->{closed}; - $pob->event_err if $state & POLLERR && ! $pob->{closed}; - $pob->event_hup if $state & POLLHUP && ! $pob->{closed}; + event_step($DescriptorMap{$fd}); } return unless PostEventLoop(); @@ -352,20 +346,7 @@ sub KQueueEventLoop { foreach my $kev (@ret) { my ($fd, $filter, $flags, $fflags) = @$kev; - my PublicInbox::DS $pob = $DescriptorMap{$fd}; - - DebugLevel >= 1 && $class->DebugMsg("Event: fd=%d (%s), flags=%d \@ %s\n", - $fd, ref($pob), $flags, time); - - $pob->event_read if $filter == IO::KQueue::EVFILT_READ() && !$pob->{closed}; - $pob->event_write if $filter == IO::KQueue::EVFILT_WRITE() && !$pob->{closed}; - if ($flags == IO::KQueue::EV_EOF() && !$pob->{closed}) { - if ($fflags) { - $pob->event_err; - } else { - $pob->event_hup; - } - } + event_step($DescriptorMap{$fd}); } return unless PostEventLoop(); } @@ -442,16 +423,14 @@ sub new { my ($self, $sock, $exclusive) = @_; $self = fields::new($self) unless ref $self; - $self->{sock} = $sock; + $self->{sock} = $sock; my $fd = fileno($sock); Carp::cluck("undef sock and/or fd in PublicInbox::DS->new. sock=" . ($sock || "") . ", fd=" . ($fd || "")) unless $sock && $fd; - $self->{fd} = $fd; - $self->{write_buf} = []; - $self->{write_buf_offset} = 0; - $self->{write_buf_size} = 0; + $self->{wbuf} = []; + $self->{wbuf_off} = 0; $self->{closed} = 0; my $ev = $self->{event_watch} = POLLERR|POLLHUP|POLLNVAL; @@ -464,7 +443,7 @@ sub new { } retry: if (epoll_ctl($Epoll, EPOLL_CTL_ADD, $fd, $ev)) { - if ($!{EINVAL} && ($ev & $EPOLLEXCLUSIVE)) { + if ($! == EINVAL && ($ev & $EPOLLEXCLUSIVE)) { $EPOLLEXCLUSIVE = 0; # old kernel $ev = $self->{event_watch} = EPOLLIN|EPOLLERR|EPOLLHUP; goto retry; @@ -492,50 +471,22 @@ retry: ### I N S T A N C E M E T H O D S ##################################################################### -=head2 C<< $obj->steal_socket() >> +=head2 C<< $obj->close >> -Basically returns our socket and makes it so that we don't try to close it, -but we do remove it from epoll handlers. THIS CLOSES $self. It is the same -thing as calling close, except it gives you the socket to use. - -=cut -sub steal_socket { - my PublicInbox::DS $self = $_[0]; - return if $self->{closed}; - - # cleanup does most of the work of closing this socket - $self->_cleanup(); - - # now undef our internal sock and fd structures so we don't use them - my $sock = $self->{sock}; - $self->{sock} = undef; - return $sock; -} - -=head2 C<< $obj->close( [$reason] ) >> - -Close the socket. The I<reason> argument will be used in debugging messages. +Close the socket. =cut sub close { my PublicInbox::DS $self = $_[0]; return if $self->{closed}; - # print out debugging info for this close - if (DebugLevel) { - my ($pkg, $filename, $line) = caller; - my $reason = $_[1] || ""; - warn "Closing \#$self->{fd} due to $pkg/$filename/$line ($reason)\n"; - } - # this does most of the work of closing us $self->_cleanup(); # defer closing the actual socket until the event loop is done # processing this round of events. (otherwise we might reuse fds) - if ($self->{sock}) { - push @ToClose, $self->{sock}; - $self->{sock} = undef; + if (my $sock = delete $self->{sock}) { + push @ToClose, $sock; } return 0; @@ -552,15 +503,14 @@ sub _cleanup { # we need to flush our write buffer, as there may # be self-referential closures (sub { $client->close }) # preventing the object from being destroyed - $self->{write_buf} = []; + @{$self->{wbuf}} = (); # if we're using epoll, we have to remove this from our epoll fd so we stop getting # notifications about it - if ($HaveEpoll && $self->{fd}) { - if (epoll_ctl($Epoll, EPOLL_CTL_DEL, $self->{fd}, $self->{event_watch}) != 0) { - # dump_error prints a backtrace so we can try to figure out why this happened - $self->dump_error("epoll_ctl(): failure deleting fd=$self->{fd} during _cleanup(); $! (" . ($!+0) . ")"); - } + if ($HaveEpoll && $self->{sock}) { + my $fd = fileno($self->{sock}); + epoll_ctl($Epoll, EPOLL_CTL_DEL, $fd, $self->{event_watch}) and + confess("EPOLL_CTL_DEL: $!"); } # we explicitly don't delete from DescriptorMap here until we @@ -571,9 +521,6 @@ sub _cleanup { # looked at $pob->{closed} and ignore it. but if it's an # un-accounted for fd, then it (understandably) freak out a bit # and emit warnings, thinking their state got off. - - # and finally get rid of our fd so we can't use it anywhere else - $self->{fd} = undef; } =head2 C<< $obj->sock() >> @@ -612,12 +559,12 @@ sub write { # just queue data if there's already a wait my $need_queue; + my $wbuf = $self->{wbuf}; if (defined $data) { $bref = ref $data ? $data : \$data; - if ($self->{write_buf_size}) { - push @{$self->{write_buf}}, $bref; - $self->{write_buf_size} += ref $bref eq "SCALAR" ? length($$bref) : 1; + if (scalar @$wbuf) { + push @$wbuf, $bref; return 0; } @@ -629,7 +576,7 @@ sub write { WRITE: while (1) { - return 1 unless $bref ||= $self->{write_buf}[0]; + return 1 unless $bref ||= $wbuf->[0]; my $len; eval { @@ -638,8 +585,7 @@ sub write { if ($@) { if (UNIVERSAL::isa($bref, "CODE")) { unless ($need_queue) { - $self->{write_buf_size}--; # code refs are worth 1 - shift @{$self->{write_buf}}; + shift @$wbuf; } $bref->(); @@ -654,46 +600,33 @@ sub write { die "Write error: $@ <$bref>"; } - my $to_write = $len - $self->{write_buf_offset}; + my $to_write = $len - $self->{wbuf_off}; my $written = syswrite($self->{sock}, $$bref, $to_write, - $self->{write_buf_offset}); + $self->{wbuf_off}); if (! defined $written) { - if ($! == EPIPE) { - return $self->close("EPIPE"); - } elsif ($! == EAGAIN) { + if ($! == EAGAIN) { # since connection has stuff to write, it should now be # interested in pending writes: if ($need_queue) { - push @{$self->{write_buf}}, $bref; - $self->{write_buf_size} += $len; + push @$wbuf, $bref; } $self->watch_write(1); return 0; - } elsif ($! == ECONNRESET) { - return $self->close("ECONNRESET"); } - DebugLevel >= 1 && $self->debugmsg("Closing connection ($self) due to write error: $!\n"); - - return $self->close("write_error"); + return $self->close; } elsif ($written != $to_write) { - DebugLevel >= 2 && $self->debugmsg("Wrote PARTIAL %d bytes to %d", - $written, $self->{fd}); if ($need_queue) { - push @{$self->{write_buf}}, $bref; - $self->{write_buf_size} += $len; + push @$wbuf, $bref; } # since connection has stuff to write, it should now be # interested in pending writes: - $self->{write_buf_offset} += $written; - $self->{write_buf_size} -= $written; + $self->{wbuf_off} += $written; $self->on_incomplete_write; return 0; } elsif ($written == $to_write) { - DebugLevel >= 2 && $self->debugmsg("Wrote ALL %d bytes to %d (nq=%d)", - $written, $self->{fd}, $need_queue); - $self->{write_buf_offset} = 0; + $self->{wbuf_off} = 0; $self->watch_write(0); # this was our only write, so we can return immediately @@ -702,8 +635,7 @@ sub write { # can't be anything else to write. return 1 if $need_queue; - $self->{write_buf_size} -= $written; - shift @{$self->{write_buf}}; + shift @$wbuf; undef $bref; next WRITE; } @@ -715,63 +647,14 @@ sub on_incomplete_write { $self->watch_write(1); } -=head2 C<< $obj->read( $bytecount ) >> - -Read at most I<bytecount> bytes from the underlying handle; returns scalar -ref on read, or undef on connection closed. - -=cut -sub read { - my PublicInbox::DS $self = shift; - return if $self->{closed}; - my $bytes = shift; - my $buf; - my $sock = $self->{sock}; - - # if this is too high, perl quits(!!). reports on mailing lists - # don't seem to point to a universal answer. 5MB worked for some, - # crashed for others. 1MB works for more people. let's go with 1MB - # for now. :/ - my $req_bytes = $bytes > 1048576 ? 1048576 : $bytes; - - my $res = sysread($sock, $buf, $req_bytes, 0); - DebugLevel >= 2 && $self->debugmsg("sysread = %d; \$! = %d", $res, $!); - - if (! $res && $! != EWOULDBLOCK) { - # catches 0=conn closed or undef=error - DebugLevel >= 2 && $self->debugmsg("Fd \#%d read hit the end of the road.", $self->{fd}); - return undef; - } - - return \$buf; -} - =head2 (VIRTUAL) C<< $obj->event_read() >> Readable event handler. Concrete deriviatives of PublicInbox::DS should -provide an implementation of this. The default implementation will die if -called. - -=cut -sub event_read { die "Base class event_read called for $_[0]\n"; } - -=head2 (VIRTUAL) C<< $obj->event_err() >> - -Error event handler. Concrete deriviatives of PublicInbox::DS should -provide an implementation of this. The default implementation will die if -called. - -=cut -sub event_err { die "Base class event_err called for $_[0]\n"; } - -=head2 (VIRTUAL) C<< $obj->event_hup() >> - -'Hangup' event handler. Concrete deriviatives of PublicInbox::DS should -provide an implementation of this. The default implementation will die if -called. +provide an implementation of this. The default implementation is a noop +if called. =cut -sub event_hup { die "Base class event_hup called for $_[0]\n"; } +sub event_read {} # noop =head2 C<< $obj->event_write() >> @@ -800,16 +683,16 @@ sub watch_read { $event &= ~POLLIN if ! $val; $event |= POLLIN if $val; + my $fd = fileno($self->{sock}); # If it changed, set it if ($event != $self->{event_watch}) { if ($HaveKQueue) { - $KQueue->EV_SET($self->{fd}, IO::KQueue::EVFILT_READ(), + $KQueue->EV_SET($fd, IO::KQueue::EVFILT_READ(), $val ? IO::KQueue::EV_ENABLE() : IO::KQueue::EV_DISABLE()); } elsif ($HaveEpoll) { - epoll_ctl($Epoll, EPOLL_CTL_MOD, $self->{fd}, $event) - and $self->dump_error("couldn't modify epoll settings for $self->{fd} " . - "from $self->{event_watch} -> $event: $! (" . ($!+0) . ")"); + epoll_ctl($Epoll, EPOLL_CTL_MOD, $fd, $event) and + confess("EPOLL_CTL_MOD: $!"); } $self->{event_watch} = $event; } @@ -829,17 +712,17 @@ sub watch_write { $event &= ~POLLOUT if ! $val; $event |= POLLOUT if $val; + my $fd = fileno($self->{sock}); # If it changed, set it if ($event != $self->{event_watch}) { if ($HaveKQueue) { - $KQueue->EV_SET($self->{fd}, IO::KQueue::EVFILT_WRITE(), + $KQueue->EV_SET($fd, IO::KQueue::EVFILT_WRITE(), $val ? IO::KQueue::EV_ENABLE() : IO::KQueue::EV_DISABLE()); } elsif ($HaveEpoll) { - epoll_ctl($Epoll, EPOLL_CTL_MOD, $self->{fd}, $event) - and $self->dump_error("couldn't modify epoll settings for $self->{fd} " . - "from $self->{event_watch} -> $event: $! (" . ($!+0) . ")"); + epoll_ctl($Epoll, EPOLL_CTL_MOD, $fd, $event) and + confess "EPOLL_CTL_MOD: $!"; } $self->{event_watch} = $event; } |