diff options
Diffstat (limited to 'lib/PublicInbox/DS.pm')
-rw-r--r-- | lib/PublicInbox/DS.pm | 108 |
1 files changed, 13 insertions, 95 deletions
diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index 9142f210..03612ce8 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -5,8 +5,14 @@ # # This is a fork of the (for now) unmaintained Danga::Socket 1.61. # Unused features will be removed, and updates will be made to take -# advantage of newer kernels - +# advantage of newer kernels. +# +# API changes to diverge from Danga::Socket will happen to better +# accomodate new features and improve scalability. Do not expect +# this to be a stable API like Danga::Socket. +# Bugs encountered (and likely fixed) are reported to +# bug-Danga-Socket@rt.cpan.org and visible at: +# https://rt.cpan.org/Public/Dist/Display.html?Name=Danga-Socket package PublicInbox::DS; use strict; use bytes; @@ -24,10 +30,8 @@ use fields ('sock', # underlying socket 'write_buf', # arrayref of scalars, scalarrefs, or coderefs to write 'write_buf_offset', # offset into first array of write_buf to start writing at 'write_buf_size', # total length of data in all write_buf items - 'write_set_watch', # bool: true if we internally set watch_write rather than by a subclass 'closed', # bool: socket is closed 'event_watch', # bitmask of events the client is interested in (POLLIN,OUT,etc.) - 'writer_func', # subref which does writing. must return bytes written (or undef) and set $! on errors ); use Errno qw(EINPROGRESS EWOULDBLOCK EISCONN ENOTSOCK @@ -54,7 +58,6 @@ our ( @ToClose, # sockets to close when event loop is done $PostLoopCallback, # subref to call at the end of each loop, if defined (global) - %PLCMap, # fd (num) -> PostLoopCallback (per-object) $LoopTimeout, # timeout of event loop in milliseconds $DoneInit, # if we've done the one-time module init yet @@ -81,7 +84,6 @@ sub Reset { @Timers = (); $PostLoopCallback = undef; - %PLCMap = (); $DoneInit = 0; # NOTE kqueue is close-on-fork, and we don't account for it, yet @@ -95,24 +97,6 @@ sub Reset { *EventLoop = *FirstTimeEventLoop; } -=head2 C<< CLASS->HaveEpoll() >> - -Returns a true value if this class will use IO::Epoll for async IO. - -=cut -sub HaveEpoll { - _InitPoller(); - return $HaveEpoll; -} - -=head2 C<< CLASS->ToClose() >> - -Return the list of sockets that are awaiting close() at the end of the -current event loop. - -=cut -sub ToClose { return @ToClose; } - =head2 C<< CLASS->SetLoopTimeout( $timeout ) >> Set the loop timeout for the event loop to some value in milliseconds. @@ -270,7 +254,6 @@ sub EpollEventLoop { # get up to 1000 events my $evcount = epoll_wait($Epoll, 1000, $timeout, \@events); - EVENT: for ($i=0; $i<$evcount; $i++) { my $ev = $events[$i]; @@ -282,16 +265,6 @@ sub EpollEventLoop { my $code; my $state = $ev->[1]; - # if we didn't find a Perlbal::Socket subclass for that fd, try other - # pseudo-registered (above) fds. - if (! $pob) { - my $fd = $ev->[0]; - warn "epoll() returned fd $fd w/ state $state for which we have no mapping. removing.\n"; - epoll_ctl($Epoll, EPOLL_CTL_DEL, $fd, 0); - POSIX::close($fd); - next; - } - DebugLevel >= 1 && $class->DebugMsg("Event: fd=%d (%s), state=%d \@ %s\n", $ev->[0], ref($pob), $ev->[1], time); @@ -348,10 +321,6 @@ sub PollEventLoop { $pob = $DescriptorMap{$fd}; - if (!$pob) { - next; - } - $pob->event_read if $state & POLLIN && ! $pob->{closed}; $pob->event_write if $state & POLLOUT && ! $pob->{closed}; $pob->event_err if $state & POLLERR && ! $pob->{closed}; @@ -384,11 +353,6 @@ sub KQueueEventLoop { foreach my $kev (@ret) { my ($fd, $filter, $flags, $fflags) = @$kev; my PublicInbox::DS $pob = $DescriptorMap{$fd}; - if (!$pob) { - warn "kevent() returned fd $fd for which we have no mapping. removing.\n"; - POSIX::close($fd); # close deletes the kevent entry - next; - } DebugLevel >= 1 && $class->DebugMsg("Event: fd=%d (%s), flags=%d \@ %s\n", $fd, ref($pob), $flags, time); @@ -423,18 +387,8 @@ The callback function will be passed two parameters: \%DescriptorMap sub SetPostLoopCallback { my ($class, $ref) = @_; - if (ref $class) { - # per-object callback - my PublicInbox::DS $self = $class; - if (defined $ref && ref $ref eq 'CODE') { - $PLCMap{$self->{fd}} = $ref; - } else { - delete $PLCMap{$self->{fd}}; - } - } else { - # global callback - $PostLoopCallback = (defined $ref && ref $ref eq 'CODE') ? $ref : undef; - } + # global callback + $PostLoopCallback = (defined $ref && ref $ref eq 'CODE') ? $ref : undef; } # Internal function: run the post-event callback, send read events @@ -460,11 +414,6 @@ sub PostEventLoop { # or global) cancels it my $keep_running = 1; - # per-object post-loop-callbacks - for my $plc (values %PLCMap) { - $keep_running &&= $plc->(\%DescriptorMap); - } - # now we're at the very end, call callback if defined if (defined $PostLoopCallback) { $keep_running &&= $PostLoopCallback->(\%DescriptorMap); @@ -614,10 +563,6 @@ sub _cleanup { } } - # now delete from mappings. this fd no longer belongs to us, so we don't want - # to get alerts for it if it becomes writable/readable/etc. - delete $PLCMap{$self->{fd}}; - # we explicitly don't delete from DescriptorMap here until we # actually close the socket, as we might be in the middle of # processing an epoll_wait/etc that returned hundreds of fds, one @@ -641,18 +586,6 @@ sub sock { return $self->{sock}; } -=head2 C<< $obj->set_writer_func( CODEREF ) >> - -Sets a function to use instead of C<syswrite()> when writing data to the socket. - -=cut -sub set_writer_func { - my PublicInbox::DS $self = shift; - my $wtr = shift; - Carp::croak("Not a subref") unless !defined $wtr || UNIVERSAL::isa($wtr, "CODE"); - $self->{writer_func} = $wtr; -} - =head2 C<< $obj->write( $data ) >> Write the specified data to the underlying handle. I<data> may be scalar, @@ -722,12 +655,8 @@ sub write { } my $to_write = $len - $self->{write_buf_offset}; - my $written; - if (my $wtr = $self->{writer_func}) { - $written = $wtr->($bref, $to_write, $self->{write_buf_offset}); - } else { - $written = syswrite($self->{sock}, $$bref, $to_write, $self->{write_buf_offset}); - } + my $written = syswrite($self->{sock}, $$bref, $to_write, + $self->{write_buf_offset}); if (! defined $written) { if ($! == EPIPE) { @@ -739,7 +668,6 @@ sub write { push @{$self->{write_buf}}, $bref; $self->{write_buf_size} += $len; } - $self->{write_set_watch} = 1 unless $self->{event_watch} & POLLOUT; $self->watch_write(1); return 0; } elsif ($! == ECONNRESET) { @@ -766,11 +694,7 @@ sub write { DebugLevel >= 2 && $self->debugmsg("Wrote ALL %d bytes to %d (nq=%d)", $written, $self->{fd}, $need_queue); $self->{write_buf_offset} = 0; - - if ($self->{write_set_watch}) { - $self->watch_write(0); - $self->{write_set_watch} = 0; - } + $self->watch_write(0); # this was our only write, so we can return immediately # since we avoided incrementing the buffer size or @@ -788,7 +712,6 @@ sub write { sub on_incomplete_write { my PublicInbox::DS $self = shift; - $self->{write_set_watch} = 1 unless $self->{event_watch} & POLLOUT; $self->watch_write(1); } @@ -907,11 +830,6 @@ sub watch_write { $event &= ~POLLOUT if ! $val; $event |= POLLOUT if $val; - if ($val && caller ne __PACKAGE__) { - # A subclass registered interest, it's now responsible for this. - $self->{write_set_watch} = 0; - } - # If it changed, set it if ($event != $self->{event_watch}) { if ($HaveKQueue) { |