diff options
-rw-r--r-- | lib/PublicInbox/DS.pm | 141 |
1 files changed, 58 insertions, 83 deletions
diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index 154fd4dd..f1b7bab7 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -480,6 +480,42 @@ sub close { return 0; } +# returns 1 if done, 0 if incomplete +sub flush_write ($) { + my ($self) = @_; + my $sock = $self->{sock} or return 1; + my $wbuf = $self->{wbuf}; + + while (my $bref = $wbuf->[0]) { + my $ref = ref($bref); + if ($ref eq 'SCALAR') { + my $len = bytes::length($$bref); + my $off = $self->{wbuf_off} || 0; + my $to_write = $len - $off; + my $written = syswrite($sock, $$bref, $to_write, $off); + if (defined $written) { + if ($written == $to_write) { + shift @$wbuf; + } else { + $self->{wbuf_off} = $off + $written; + } + next; # keep going until EAGAIN + } elsif ($! == EAGAIN) { + $self->watch_write(1); + } else { + $self->close; + } + return 0; + } else { #($ref eq 'CODE') { + shift @$wbuf; + $bref->(); + } + } # while @$wbuf + + $self->watch_write(0); + 1; # all done +} + =head2 C<< $obj->write( $data ) >> Write the specified data to the underlying handle. I<data> may be scalar, @@ -489,9 +525,8 @@ it returns 1, caller should stop waiting for 'writable' events) =cut sub write { - my PublicInbox::DS $self; - my $data; - ($self, $data) = @_; + my ($self, $data) = @_; + return flush_write($self) unless defined $data; # nobody should be writing to closed sockets, but caller code can # do two writes within an event, have the first fail and @@ -501,91 +536,31 @@ sub write { # just lie and say it worked. it'll be dead soon and won't be # hurt by this lie. my $sock = $self->{sock} or return 1; - - my $bref; - - # just queue data if there's already a wait - my $need_queue; + my $ref = ref $data; + my $bref = $ref ? $data : \$data; my $wbuf = $self->{wbuf}; + if (@$wbuf) { # already buffering, can't write more... + push @$wbuf, $bref; + return 0; + } elsif ($ref eq 'CODE') { + $bref->(); + return 1; + } else { + my $to_write = bytes::length($$bref); + my $written = syswrite($sock, $$bref, $to_write); - if (defined $data) { - $bref = ref $data ? $data : \$data; - if (scalar @$wbuf) { + if (defined $written) { + return 1 if $written == $to_write; + $self->{wbuf_off} = $written; + push @$wbuf, $bref; + return flush_write($self); # try until EAGAIN + } elsif ($! == EAGAIN) { push @$wbuf, $bref; - return 0; - } - - # this flag says we're bypassing the queue system, knowing we're the - # only outstanding write, and hoping we don't ever need to use it. - # if so later, though, we'll need to queue - $need_queue = 1; - } - - WRITE: - while (1) { - return 1 unless $bref ||= $wbuf->[0]; - - my $len; - eval { - $len = length($$bref); # this will die if $bref is a code ref, caught below - }; - if ($@) { - if (UNIVERSAL::isa($bref, "CODE")) { - unless ($need_queue) { - shift @$wbuf; - } - $bref->(); - - # code refs are just run and never get reenqueued - # (they're one-shot), so turn off the flag indicating the - # outstanding data needs queueing. - $need_queue = 0; - - undef $bref; - next WRITE; - } - die "Write error: $@ <$bref>"; - } - - my $off = $self->{wbuf_off} // 0; - my $to_write = $len - $off; - my $written = syswrite($sock, $$bref, $to_write, $off); - - if (! defined $written) { - if ($! == EAGAIN) { - # since connection has stuff to write, it should now be - # interested in pending writes: - if ($need_queue) { - push @$wbuf, $bref; - } - $self->watch_write(1); - return 0; - } - - return $self->close; - } elsif ($written != $to_write) { - if ($need_queue) { - push @$wbuf, $bref; - } - # since connection has stuff to write, it should now be - # interested in pending writes: - $self->{wbuf_off} = $off + $written; $self->watch_write(1); - return 0; - } elsif ($written == $to_write) { - delete $self->{wbuf_off}; - $self->watch_write(0); - - # this was our only write, so we can return immediately - # since we avoided incrementing the buffer size or - # putting it in the buffer. we also know there - # can't be anything else to write. - return 1 if $need_queue; - - shift @$wbuf; - undef $bref; - next WRITE; + } else { + $self->close; } + return 0; } } |