From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 2DBC81F4BD for ; Mon, 24 Jun 2019 02:53:00 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 07/57] ds: split out from ->flush_write and ->write Date: Mon, 24 Jun 2019 02:52:08 +0000 Message-Id: <20190624025258.25592-8-e@80x24.org> In-Reply-To: <20190624025258.25592-1-e@80x24.org> References: <20190624025258.25592-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: Get rid of the confusing $need_queue variable and all the associated documentation for it. Instead, make it obvious that we're either skipping the write buffer or flushing the write buffer by splitting the sub in two. --- lib/PublicInbox/DS.pm | 141 +++++++++++++++++------------------------- 1 file changed, 58 insertions(+), 83 deletions(-) diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index 154fd4dd..f1b7bab7 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -480,6 +480,42 @@ sub close { return 0; } +# returns 1 if done, 0 if incomplete +sub flush_write ($) { + my ($self) = @_; + my $sock = $self->{sock} or return 1; + my $wbuf = $self->{wbuf}; + + while (my $bref = $wbuf->[0]) { + my $ref = ref($bref); + if ($ref eq 'SCALAR') { + my $len = bytes::length($$bref); + my $off = $self->{wbuf_off} || 0; + my $to_write = $len - $off; + my $written = syswrite($sock, $$bref, $to_write, $off); + if (defined $written) { + if ($written == $to_write) { + shift @$wbuf; + } else { + $self->{wbuf_off} = $off + $written; + } + next; # keep going until EAGAIN + } elsif ($! 
== EAGAIN) { + $self->watch_write(1); + } else { + $self->close; + } + return 0; + } else { #($ref eq 'CODE') { + shift @$wbuf; + $bref->(); + } + } # while @$wbuf + + $self->watch_write(0); + 1; # all done +} + =head2 C<< $obj->write( $data ) >> Write the specified data to the underlying handle. I<data> may be scalar, @@ -489,9 +525,8 @@ it returns 1, caller should stop waiting for 'writable' events) =cut sub write { - my PublicInbox::DS $self; - my $data; - ($self, $data) = @_; + my ($self, $data) = @_; + return flush_write($self) unless defined $data; # nobody should be writing to closed sockets, but caller code can # do two writes within an event, have the first fail and @@ -501,91 +536,31 @@ sub write { # just lie and say it worked. it'll be dead soon and won't be # hurt by this lie. my $sock = $self->{sock} or return 1; - - my $bref; - - # just queue data if there's already a wait - my $need_queue; + my $ref = ref $data; + my $bref = $ref ? $data : \$data; my $wbuf = $self->{wbuf}; + if (@$wbuf) { # already buffering, can't write more... + push @$wbuf, $bref; + return 0; + } elsif ($ref eq 'CODE') { + $bref->(); + return 1; + } else { + my $to_write = bytes::length($$bref); + my $written = syswrite($sock, $$bref, $to_write); - if (defined $data) { - $bref = ref $data ? $data : \$data; - if (scalar @$wbuf) { + if (defined $written) { + return 1 if $written == $to_write; + $self->{wbuf_off} = $written; + push @$wbuf, $bref; + return flush_write($self); # try until EAGAIN + } elsif ($! == EAGAIN) { push @$wbuf, $bref; - return 0; - } - - # this flag says we're bypassing the queue system, knowing we're the - # only outstanding write, and hoping we don't ever need to use it. 
- # if so later, though, we'll need to queue - $need_queue = 1; - } - - WRITE: - while (1) { - return 1 unless $bref ||= $wbuf->[0]; - - my $len; - eval { - $len = length($$bref); # this will die if $bref is a code ref, caught below - }; - if ($@) { - if (UNIVERSAL::isa($bref, "CODE")) { - unless ($need_queue) { - shift @$wbuf; - } - $bref->(); - - # code refs are just run and never get reenqueued - # (they're one-shot), so turn off the flag indicating the - # outstanding data needs queueing. - $need_queue = 0; - - undef $bref; - next WRITE; - } - die "Write error: $@ <$bref>"; - } - - my $off = $self->{wbuf_off} // 0; - my $to_write = $len - $off; - my $written = syswrite($sock, $$bref, $to_write, $off); - - if (! defined $written) { - if ($! == EAGAIN) { - # since connection has stuff to write, it should now be - # interested in pending writes: - if ($need_queue) { - push @$wbuf, $bref; - } - $self->watch_write(1); - return 0; - } - - return $self->close; - } elsif ($written != $to_write) { - if ($need_queue) { - push @$wbuf, $bref; - } - # since connection has stuff to write, it should now be - # interested in pending writes: - $self->{wbuf_off} = $off + $written; $self->watch_write(1); - return 0; - } elsif ($written == $to_write) { - delete $self->{wbuf_off}; - $self->watch_write(0); - - # this was our only write, so we can return immediately - # since we avoided incrementing the buffer size or - # putting it in the buffer. we also know there - # can't be anything else to write. - return 1 if $need_queue; - - shift @$wbuf; - undef $bref; - next WRITE; + } else { + $self->close; } + return 0; } } -- EW