From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id F1BDE1F4B6 for ; Mon, 24 Jun 2019 02:52:58 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 01/57] ds: get rid of {closed} field Date: Mon, 24 Jun 2019 02:52:02 +0000 Message-Id: <20190624025258.25592-2-e@80x24.org> In-Reply-To: <20190624025258.25592-1-e@80x24.org> References: <20190624025258.25592-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: Merely checking the presence of the {sock} field is enough, and having multiple sources of truth increases confusion and the likelihood of bugs. --- lib/PublicInbox/DS.pm | 52 ++++++++++++---------------------- lib/PublicInbox/HTTP.pm | 8 +++--- lib/PublicInbox/HTTPD/Async.pm | 2 +- lib/PublicInbox/NNTP.pm | 30 +++++++++----------- 4 files changed, 37 insertions(+), 55 deletions(-) diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index 2b04886a..f4fe8793 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -28,7 +28,6 @@ use PublicInbox::Syscall qw(:epoll); use fields ('sock', # underlying socket 'wbuf', # arrayref of scalars, scalarrefs, or coderefs to write 'wbuf_off', # offset into first element of wbuf to start writing at - 'closed', # bool: socket is closed 'event_watch', # bitmask of events the client is interested in (POLLIN,OUT,etc.) ); @@ -366,7 +365,7 @@ sub PostEventLoop { $sock->close; # and now we can finally remove the fd from the map. see - # comment above in _cleanup. + # comment above in ->close. 
delete $DescriptorMap{$fd}; } @@ -411,7 +410,6 @@ sub new { $self->{wbuf} = []; $self->{wbuf_off} = 0; - $self->{closed} = 0; my $ev = $self->{event_watch} = POLLERR|POLLHUP|POLLNVAL; @@ -457,28 +455,8 @@ Close the socket. =cut sub close { - my PublicInbox::DS $self = $_[0]; - return if $self->{closed}; - - # this does most of the work of closing us - $self->_cleanup(); - - # defer closing the actual socket until the event loop is done - # processing this round of events. (otherwise we might reuse fds) - if (my $sock = delete $self->{sock}) { - push @ToClose, $sock; - } - - return 0; -} - -### METHOD: _cleanup() -### Called by our closers so we can clean internal data structures. -sub _cleanup { - my PublicInbox::DS $self = $_[0]; - - # we're effectively closed; we have no fd and sock when we leave here - $self->{closed} = 1; + my ($self) = @_; + my $sock = delete $self->{sock} or return; # we need to flush our write buffer, as there may # be self-referential closures (sub { $client->close }) @@ -487,8 +465,8 @@ sub _cleanup { # if we're using epoll, we have to remove this from our epoll fd so we stop getting # notifications about it - if ($HaveEpoll && $self->{sock}) { - my $fd = fileno($self->{sock}); + if ($HaveEpoll) { + my $fd = fileno($sock); epoll_ctl($Epoll, EPOLL_CTL_DEL, $fd, $self->{event_watch}) and confess("EPOLL_CTL_DEL: $!"); } @@ -498,9 +476,15 @@ sub _cleanup { # processing an epoll_wait/etc that returned hundreds of fds, one # of which is not yet processed and is what we're closing. if we # keep it in DescriptorMap, then the event harnesses can just - # looked at $pob->{closed} and ignore it. but if it's an + # looked at $pob->{sock} == undef and ignore it. but if it's an # un-accounted for fd, then it (understandably) freak out a bit # and emit warnings, thinking their state got off. + + # defer closing the actual socket until the event loop is done + # processing this round of events. 
(otherwise we might reuse fds) + push @ToClose, $sock; + + return 0; } =head2 C<< $obj->sock() >> @@ -533,7 +517,7 @@ sub write { # now-dead object does its second write. that is this case. we # just lie and say it worked. it'll be dead soon and won't be # hurt by this lie. - return 1 if $self->{closed}; + return 1 unless $self->{sock}; my $bref; @@ -634,7 +618,7 @@ Turn 'readable' event notification on or off. =cut sub watch_read { my PublicInbox::DS $self = shift; - return if $self->{closed} || !$self->{sock}; + my $sock = $self->{sock} or return; my $val = shift; my $event = $self->{event_watch}; @@ -642,7 +626,7 @@ sub watch_read { $event &= ~POLLIN if ! $val; $event |= POLLIN if $val; - my $fd = fileno($self->{sock}); + my $fd = fileno($sock); # If it changed, set it if ($event != $self->{event_watch}) { if ($HaveKQueue) { @@ -664,14 +648,14 @@ Turn 'writable' event notification on or off. =cut sub watch_write { my PublicInbox::DS $self = shift; - return if $self->{closed} || !$self->{sock}; + my $sock = $self->{sock} or return; my $val = shift; my $event = $self->{event_watch}; $event &= ~POLLOUT if ! $val; $event |= POLLOUT if $val; - my $fd = fileno($self->{sock}); + my $fd = fileno($sock); # If it changed, set it if ($event != $self->{event_watch}) { @@ -728,7 +712,7 @@ sub as_string { my PublicInbox::DS $self = shift; my $rw = "(" . ($self->{event_watch} & POLLIN ? 'R' : '') . ($self->{event_watch} & POLLOUT ? 'W' : '') . ")"; - my $ret = ref($self) . "$rw: " . ($self->{closed} ? "closed" : "open"); + my $ret = ref($self) . "$rw: " . ($self->{sock} ? 
'open' : 'closed'); return $ret; } diff --git a/lib/PublicInbox/HTTP.pm b/lib/PublicInbox/HTTP.pm index 45bf23ec..dff59286 100644 --- a/lib/PublicInbox/HTTP.pm +++ b/lib/PublicInbox/HTTP.pm @@ -33,7 +33,7 @@ sub process_pipelineq () { $pipet = undef; $pipelineq = []; foreach (@$q) { - next if $_->{closed}; + next unless $_->{sock}; rbuf_process($_); } } @@ -70,7 +70,7 @@ sub event_step { # called by PublicInbox::DS my $wbuf = $self->{wbuf}; if (@$wbuf) { $self->write(undef); - return if $self->{closed} || scalar(@$wbuf); + return if !$self->{sock} || scalar(@$wbuf); } # only read more requests if we've drained the write buffer, # otherwise we can be buffering infinitely w/o backpressure @@ -266,7 +266,7 @@ sub getline_cb ($$$) { my $buf = eval { $forward->getline }; if (defined $buf) { $write->($buf); # may close in PublicInbox::DS::write - unless ($self->{closed}) { + if ($self->{sock}) { my $next = $self->{pull}; if (scalar @{$self->{wbuf}}) { $self->write($next); @@ -322,7 +322,7 @@ sub response_write { use constant MSG_MORE => ($^O eq 'linux') ? 
0x8000 : 0; sub more ($$) { my $self = $_[0]; - return if $self->{closed}; + return unless $self->{sock}; if (MSG_MORE && !scalar(@{$self->{wbuf}})) { my $n = send($self->{sock}, $_[1], MSG_MORE); if (defined $n) { diff --git a/lib/PublicInbox/HTTPD/Async.pm b/lib/PublicInbox/HTTPD/Async.pm index 604627ab..261a01e0 100644 --- a/lib/PublicInbox/HTTPD/Async.pm +++ b/lib/PublicInbox/HTTPD/Async.pm @@ -45,7 +45,7 @@ sub main_cb ($$$) { my $r = sysread($self->{sock}, $$bref, 8192); if ($r) { $fh->write($$bref); - unless ($http->{closed}) { # PublicInbox::DS sets this + if ($http->{sock}) { # !closed if (scalar @{$http->{wbuf}}) { $self->watch_read(0); $http->write(restart_read_cb($self)); diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm index 796ac74d..107cbe31 100644 --- a/lib/PublicInbox/NNTP.pm +++ b/lib/PublicInbox/NNTP.pm @@ -57,7 +57,7 @@ sub next_tick () { # maybe there's more pipelined data, or we'll have # to register it for socket-readiness notifications - if (!$nntp->{long_res} && !$nntp->{closed}) { + if (!$nntp->{long_res} && $nntp->{sock}) { check_read($nntp); } } @@ -66,9 +66,8 @@ sub next_tick () { sub update_idle_time ($) { my ($self) = @_; - my $sock = $self->{sock} or return; - my $fd = fileno($sock); - defined $fd and $EXPMAP->{$fd} = [ now(), $self ]; + my $sock = $self->{sock} or return; + $EXPMAP->{fileno($sock)} = [ now(), $self ]; } sub expire_old () { @@ -134,7 +133,7 @@ sub process_line ($$) { my $res = eval { $req->($self, @args) }; my $err = $@; - if ($err && !$self->{closed}) { + if ($err && $self->{sock}) { local $/ = "\n"; chomp($l); err($self, 'error from: %s (%s)', $l, $err); @@ -632,7 +631,7 @@ sub long_response ($$) { my $t0 = now(); $self->{long_res} = sub { my $more = eval { $cb->() }; - if ($@ || $self->{closed}) { + if ($@ || !$self->{sock}) { $self->{long_res} = undef; if ($@) { @@ -640,12 +639,12 @@ sub long_response ($$) { "%s during long response[$fd] - %0.6f", $@, now() - $t0); } - if ($self->{closed}) { - 
out($self, " deferred[$fd] aborted - %0.6f", - now() - $t0); - } else { + if ($self->{sock}) { update_idle_time($self); check_read($self); + } else { + out($self, " deferred[$fd] aborted - %0.6f", + now() - $t0); } } elsif ($more) { # scalar @{$self->{wbuf}}: # no recursion, schedule another call ASAP @@ -930,7 +929,7 @@ sub more ($$) { sub do_write ($$) { my ($self, $data) = @_; my $done = $self->write($data); - return 0 if $self->{closed}; + return 0 unless $self->{sock}; # Do not watch for readability if we have data in the queue, # instead re-enable watching for readability when we can @@ -966,13 +965,13 @@ sub do_more ($$) { sub event_step { my ($self) = @_; - return if $self->{closed}; + return unless $self->{sock}; my $wbuf = $self->{wbuf}; if (@$wbuf) { update_idle_time($self); $self->write(undef); - return if $self->{closed} || scalar(@$wbuf); + return if !$self->{sock} || scalar(@$wbuf); } return if $self->{long_res}; # only read more requests if we've drained the write buffer, @@ -1028,9 +1027,8 @@ sub check_read { sub not_idle_long ($$) { my ($self, $now) = @_; - my $sock = $self->{sock} or return; - defined(my $fd = fileno($sock)) or return; - my $ary = $EXPMAP->{$fd} or return; + my $sock = $self->{sock} or return; + my $ary = $EXPMAP->{fileno($sock)} or return; my $exp_at = $ary->[0] + $EXPTIME; $exp_at > $now; } -- EW