about summary refs log tree commit homepage
path: root/lib/PublicInbox/Git.pm
diff options
context:
space:
mode:
Diffstat (limited to 'lib/PublicInbox/Git.pm')
-rw-r--r--lib/PublicInbox/Git.pm30
1 files changed, 22 insertions, 8 deletions
diff --git a/lib/PublicInbox/Git.pm b/lib/PublicInbox/Git.pm
index a1af776b..86b80a4e 100644
--- a/lib/PublicInbox/Git.pm
+++ b/lib/PublicInbox/Git.pm
@@ -145,6 +145,7 @@ sub _bidi_pipe {
                 fcntl($out_w, 1031, 4096);
                 fcntl($in_r, 1031, 4096) if $batch eq '--batch-check';
         }
+        $out_w->blocking(0);
         $self->{$out} = $out_w;
         $self->{$in} = $in_r;
 }
@@ -203,7 +204,9 @@ sub cat_async_retry ($$) {
         for (my $i = 0; $i < @$inflight; $i += 3) {
                 $buf .= "$inflight->[$i]\n";
         }
+        $self->{out}->blocking(1); # brand new pipe, should never block
         print { $self->{out} } $buf or $self->fail("write error: $!");
+        $self->{out}->blocking(0);
         my $req = shift @$inflight;
         unshift(@$inflight, \$req); # \$ref to indicate retried
 
@@ -305,13 +308,27 @@ sub check_async_begin ($) {
         $self->{inflight_c} = [];
 }
 
+sub write_all {
+        my ($self, $out, $buf, $read_step, $inflight) = @_;
+        $read_step->($self, $inflight) while @$inflight >= MAX_INFLIGHT;
+        do {
+                my $w = syswrite($out, $buf);
+                if (defined $w) {
+                        return if $w == length($buf);
+                        warn "chop: $w";
+                        substr($buf, 0, $w, ''); # sv_chop
+                } elsif ($! != EAGAIN) {
+                        $self->fail("write: $!");
+                } else { warn "E: $!" }
+                $read_step->($self, $inflight);
+        } while (1);
+}
+
 sub check_async ($$$$) {
         my ($self, $oid, $cb, $arg) = @_;
         my $inflight_c = $self->{inflight_c} // check_async_begin($self);
-        while (scalar(@$inflight_c) >= MAX_INFLIGHT) {
-                check_async_step($self, $inflight_c);
-        }
-        print { $self->{out_c} } $oid, "\n" or $self->fail("write error: $!");
+        write_all($self, $self->{out_c}, $oid."\n",
+                \&check_async_step, $inflight_c);
         push(@$inflight_c, $oid, $cb, $arg);
 }
 
@@ -496,10 +513,7 @@ sub cat_async_begin {
 sub cat_async ($$$;$) {
         my ($self, $oid, $cb, $arg) = @_;
         my $inflight = $self->{inflight} // cat_async_begin($self);
-        while (scalar(@$inflight) >= MAX_INFLIGHT) {
-                cat_async_step($self, $inflight);
-        }
-        print { $self->{out} } $oid, "\n" or $self->fail("write error: $!");
+        write_all($self, $self->{out}, $oid."\n", \&cat_async_step, $inflight);
         push(@$inflight, $oid, $cb, $arg);
 }