From fdf67396c179a64154eaa6c10ac255d61ed39c01 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 24 Jun 2019 02:52:08 +0000 Subject: ds: split out from ->flush_write and ->write Get rid of the confusing $need_queue variable and all the associated documentation for it. Instead, make it obvious that we're either skipping the write buffer or flushing the write buffer by splitting the sub in two. --- lib/PublicInbox/DS.pm | 141 +++++++++++++++++++++----------------------------- 1 file changed, 58 insertions(+), 83 deletions(-) (limited to 'lib/PublicInbox/DS.pm') diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index 154fd4dd..f1b7bab7 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -480,6 +480,42 @@ sub close { return 0; } +# returns 1 if done, 0 if incomplete +sub flush_write ($) { + my ($self) = @_; + my $sock = $self->{sock} or return 1; + my $wbuf = $self->{wbuf}; + + while (my $bref = $wbuf->[0]) { + my $ref = ref($bref); + if ($ref eq 'SCALAR') { + my $len = bytes::length($$bref); + my $off = $self->{wbuf_off} || 0; + my $to_write = $len - $off; + my $written = syswrite($sock, $$bref, $to_write, $off); + if (defined $written) { + if ($written == $to_write) { + shift @$wbuf; + } else { + $self->{wbuf_off} = $off + $written; + } + next; # keep going until EAGAIN + } elsif ($! == EAGAIN) { + $self->watch_write(1); + } else { + $self->close; + } + return 0; + } else { #($ref eq 'CODE') { + shift @$wbuf; + $bref->(); + } + } # while @$wbuf + + $self->watch_write(0); + 1; # all done +} + =head2 C<< $obj->write( $data ) >> Write the specified data to the underlying handle. I may be scalar, @@ -489,9 +525,8 @@ it returns 1, caller should stop waiting for 'writable' events) =cut sub write { - my PublicInbox::DS $self; - my $data; - ($self, $data) = @_; + my ($self, $data) = @_; + return flush_write($self) unless defined $data; # nobody should be writing to closed sockets, but caller code can # do two writes within an event, have the first fail and @@ -501,91 +536,31 @@ sub write { # just lie and say it worked. it'll be dead soon and won't be # hurt by this lie. my $sock = $self->{sock} or return 1; - - my $bref; - - # just queue data if there's already a wait - my $need_queue; + my $ref = ref $data; + my $bref = $ref ? $data : \$data; my $wbuf = $self->{wbuf}; + if (@$wbuf) { # already buffering, can't write more... + push @$wbuf, $bref; + return 0; + } elsif ($ref eq 'CODE') { + $bref->(); + return 1; + } else { + my $to_write = bytes::length($$bref); + my $written = syswrite($sock, $$bref, $to_write); - if (defined $data) { - $bref = ref $data ? $data : \$data; - if (scalar @$wbuf) { + if (defined $written) { + return 1 if $written == $to_write; + $self->{wbuf_off} = $written; + push @$wbuf, $bref; + return flush_write($self); # try until EAGAIN + } elsif ($! == EAGAIN) { push @$wbuf, $bref; - return 0; - } - - # this flag says we're bypassing the queue system, knowing we're the - # only outstanding write, and hoping we don't ever need to use it. - # if so later, though, we'll need to queue - $need_queue = 1; - } - - WRITE: - while (1) { - return 1 unless $bref ||= $wbuf->[0]; - - my $len; - eval { - $len = length($$bref); # this will die if $bref is a code ref, caught below - }; - if ($@) { - if (UNIVERSAL::isa($bref, "CODE")) { - unless ($need_queue) { - shift @$wbuf; - } - $bref->(); - - # code refs are just run and never get reenqueued - # (they're one-shot), so turn off the flag indicating the - # outstanding data needs queueing. - $need_queue = 0; - - undef $bref; - next WRITE; - } - die "Write error: $@ <$bref>"; - } - - my $off = $self->{wbuf_off} // 0; - my $to_write = $len - $off; - my $written = syswrite($sock, $$bref, $to_write, $off); - - if (! defined $written) { - if ($! == EAGAIN) { - # since connection has stuff to write, it should now be - # interested in pending writes: - if ($need_queue) { - push @$wbuf, $bref; - } - $self->watch_write(1); - return 0; - } - - return $self->close; - } elsif ($written != $to_write) { - if ($need_queue) { - push @$wbuf, $bref; - } - # since connection has stuff to write, it should now be - # interested in pending writes: - $self->{wbuf_off} = $off + $written; $self->watch_write(1); - return 0; - } elsif ($written == $to_write) { - delete $self->{wbuf_off}; - $self->watch_write(0); - - # this was our only write, so we can return immediately - # since we avoided incrementing the buffer size or - # putting it in the buffer. we also know there - # can't be anything else to write. - return 1 if $need_queue; - - shift @$wbuf; - undef $bref; - next WRITE; + } else { + $self->close; } + return 0; } } -- cgit v1.2.3-24-ge0c7