about summary refs log tree commit homepage
path: root/lib/PublicInbox/DS.pm
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2019-06-24 02:52:08 +0000
committerEric Wong <e@80x24.org>2019-06-24 05:26:25 +0000
commitfdf67396c179a64154eaa6c10ac255d61ed39c01 (patch)
treef077a630b81365869ffcbe630f2183737d3d8a1f /lib/PublicInbox/DS.pm
parentddba176a763dd7f36e3aa53b87907c6226207efa (diff)
downloadpublic-inbox-fdf67396c179a64154eaa6c10ac255d61ed39c01.tar.gz
ds: split out from ->flush_write and ->write
Get rid of the confusing $need_queue variable and all
the associated documentation for it.  Instead, make it
obvious that we're either skipping the write buffer or
flushing the write buffer by splitting the sub in two.
Diffstat (limited to 'lib/PublicInbox/DS.pm')
-rw-r--r--lib/PublicInbox/DS.pm141
1 files changed, 58 insertions, 83 deletions
diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index 154fd4dd..f1b7bab7 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -480,6 +480,42 @@ sub close {
     return 0;
 }
 
+# returns 1 if done, 0 if incomplete
+sub flush_write ($) {
+    my ($self) = @_;
+    my $sock = $self->{sock} or return 1;
+    my $wbuf = $self->{wbuf};
+
+    while (my $bref = $wbuf->[0]) {
+        my $ref = ref($bref);
+        if ($ref eq 'SCALAR') {
+            my $len = bytes::length($$bref);
+            my $off = $self->{wbuf_off} || 0;
+            my $to_write = $len - $off;
+            my $written = syswrite($sock, $$bref, $to_write, $off);
+            if (defined $written) {
+                if ($written == $to_write) {
+                    shift @$wbuf;
+                } else {
+                    $self->{wbuf_off} = $off + $written;
+                }
+                next; # keep going until EAGAIN
+            } elsif ($! == EAGAIN) {
+                $self->watch_write(1);
+            } else {
+                $self->close;
+            }
+            return 0;
+        } else { #($ref eq 'CODE') {
+            shift @$wbuf;
+            $bref->();
+        }
+    } # while @$wbuf
+
+    $self->watch_write(0);
+    1; # all done
+}
+
 =head2 C<< $obj->write( $data ) >>
 
 Write the specified data to the underlying handle.  I<data> may be scalar,
@@ -489,9 +525,8 @@ it returns 1, caller should stop waiting for 'writable' events)
 
 =cut
 sub write {
-    my PublicInbox::DS $self;
-    my $data;
-    ($self, $data) = @_;
+    my ($self, $data) = @_;
+    return flush_write($self) unless defined $data;
 
     # nobody should be writing to closed sockets, but caller code can
     # do two writes within an event, have the first fail and
@@ -501,91 +536,31 @@ sub write {
     # just lie and say it worked.  it'll be dead soon and won't be
     # hurt by this lie.
     my $sock = $self->{sock} or return 1;
-
-    my $bref;
-
-    # just queue data if there's already a wait
-    my $need_queue;
+    my $ref = ref $data;
+    my $bref = $ref ? $data : \$data;
     my $wbuf = $self->{wbuf};
+    if (@$wbuf) { # already buffering, can't write more...
+        push @$wbuf, $bref;
+        return 0;
+    } elsif ($ref eq 'CODE') {
+        $bref->();
+        return 1;
+    } else {
+        my $to_write = bytes::length($$bref);
+        my $written = syswrite($sock, $$bref, $to_write);
 
-    if (defined $data) {
-        $bref = ref $data ? $data : \$data;
-        if (scalar @$wbuf) {
+        if (defined $written) {
+            return 1 if $written == $to_write;
+            $self->{wbuf_off} = $written;
+            push @$wbuf, $bref;
+            return flush_write($self); # try until EAGAIN
+        } elsif ($! == EAGAIN) {
             push @$wbuf, $bref;
-            return 0;
-        }
-
-        # this flag says we're bypassing the queue system, knowing we're the
-        # only outstanding write, and hoping we don't ever need to use it.
-        # if so later, though, we'll need to queue
-        $need_queue = 1;
-    }
-
-  WRITE:
-    while (1) {
-        return 1 unless $bref ||= $wbuf->[0];
-
-        my $len;
-        eval {
-            $len = length($$bref); # this will die if $bref is a code ref, caught below
-        };
-        if ($@) {
-            if (UNIVERSAL::isa($bref, "CODE")) {
-                unless ($need_queue) {
-                    shift @$wbuf;
-                }
-                $bref->();
-
-                # code refs are just run and never get reenqueued
-                # (they're one-shot), so turn off the flag indicating the
-                # outstanding data needs queueing.
-                $need_queue = 0;
-
-                undef $bref;
-                next WRITE;
-            }
-            die "Write error: $@ <$bref>";
-        }
-
-        my $off = $self->{wbuf_off} // 0;
-        my $to_write = $len - $off;
-        my $written = syswrite($sock, $$bref, $to_write, $off);
-
-        if (! defined $written) {
-            if ($! == EAGAIN) {
-                # since connection has stuff to write, it should now be
-                # interested in pending writes:
-                if ($need_queue) {
-                    push @$wbuf, $bref;
-                }
-                $self->watch_write(1);
-                return 0;
-            }
-
-            return $self->close;
-        } elsif ($written != $to_write) {
-            if ($need_queue) {
-                push @$wbuf, $bref;
-            }
-            # since connection has stuff to write, it should now be
-            # interested in pending writes:
-            $self->{wbuf_off} = $off + $written;
             $self->watch_write(1);
-            return 0;
-        } elsif ($written == $to_write) {
-            delete $self->{wbuf_off};
-            $self->watch_write(0);
-
-            # this was our only write, so we can return immediately
-            # since we avoided incrementing the buffer size or
-            # putting it in the buffer.  we also know there
-            # can't be anything else to write.
-            return 1 if $need_queue;
-
-            shift @$wbuf;
-            undef $bref;
-            next WRITE;
+        } else {
+            $self->close;
         }
+        return 0;
     }
 }