diff options
-rw-r--r-- | lib/PublicInbox/Gcf2.pm | 4 | ||||
-rw-r--r-- | lib/PublicInbox/Gcf2Client.pm | 72 | ||||
-rw-r--r-- | lib/PublicInbox/GitAsyncCat.pm | 58 | ||||
-rw-r--r-- | t/gcf2_client.t | 10 |
4 files changed, 70 insertions, 74 deletions
diff --git a/lib/PublicInbox/Gcf2.pm b/lib/PublicInbox/Gcf2.pm index 0d5c8c57..01b83c96 100644 --- a/lib/PublicInbox/Gcf2.pm +++ b/lib/PublicInbox/Gcf2.pm @@ -75,9 +75,9 @@ sub add_alt ($$) { $gcf2->add_alternate($objdir); } -# Usage: $^X -MPublicInbox::Gcf2 -e 'PublicInbox::Gcf2::loop()' +# Usage: $^X -MPublicInbox::Gcf2 -e PublicInbox::Gcf2::loop # (see lib/PublicInbox/Gcf2Client.pm) -sub loop { +sub loop () { my $gcf2 = new(); my %seen; STDERR->autoflush(1); diff --git a/lib/PublicInbox/Gcf2Client.pm b/lib/PublicInbox/Gcf2Client.pm index 2022293d..397774f9 100644 --- a/lib/PublicInbox/Gcf2Client.pm +++ b/lib/PublicInbox/Gcf2Client.pm @@ -6,33 +6,35 @@ package PublicInbox::Gcf2Client; use strict; use parent qw(PublicInbox::DS); use PublicInbox::Git; -use PublicInbox::Spawn qw(popen_rd); -use IO::Handle (); -use PublicInbox::Syscall qw(EPOLLONESHOT); -use PublicInbox::DS qw(dwaitpid); +use PublicInbox::Gcf2; # fails if Inline::C or libgit2-dev isn't available +use PublicInbox::Spawn qw(spawn); +use Socket qw(AF_UNIX SOCK_STREAM); +use PublicInbox::Syscall qw(EPOLLIN EPOLLET); # fields: -# async_cat => GitAsyncCat ref (read-only pipe) -# sock => writable pipe to Gcf2::loop -# in => pipe we read from +# sock => socket to Gcf2::loop +# The rest of these fields are compatible with what PublicInbox::Git +# uses code-sharing # pid => PID of Gcf2::loop process -# owner_pid => process which spawned {pid} +# pid.owner => process which spawned {pid} +# in => same as {sock}, for compatibility with PublicInbox::Git +# inflight => array (see PublicInbox::Git) +# cat_rbuf => scalarref, may be non-existent or empty sub new { my ($rdr) = @_; my $self = bless {}, __PACKAGE__; # ensure the child process has the same @INC we do: my $env = { PERL5LIB => join(':', @INC) }; - my ($out_r, $out_w); - pipe($out_r, $out_w) or die "pipe failed: $!"; + my ($s1, $s2); + socketpair($s1, $s2, AF_UNIX, SOCK_STREAM, 0) or die "socketpair $!"; $rdr //= {}; - $rdr->{0} = $out_r; - my $cmd = [$^X, qw[-MPublicInbox::Gcf2 -e PublicInbox::Gcf2::loop()]]; - $self->{owner_pid} = $$; - @$self{qw(in pid)} = popen_rd($cmd, $env, $rdr); - fcntl($out_w, 1031, 4096) if $^O eq 'linux'; # 1031: F_SETPIPE_SZ - $out_w->autoflush(1); - $out_w->blocking(0); + $rdr->{0} = $rdr->{1} = $s2; + my $cmd = [$^X, qw[-MPublicInbox::Gcf2 -e PublicInbox::Gcf2::loop]]; + $self->{'pid.owner'} = $$; + $self->{pid} = spawn($cmd, $env, $rdr); + $s1->blocking(0); $self->{inflight} = []; - $self->SUPER::new($out_w, EPOLLONESHOT); # detect errors once + $self->{in} = $s1; + $self->SUPER::new($s1, EPOLLIN|EPOLLET); } sub fail { @@ -41,43 +43,43 @@ sub fail { PublicInbox::Git::fail($self, @_); } -sub cat_async ($$$;$) { +sub gcf2_async ($$$;$) { my ($self, $req, $cb, $arg) = @_; - my $inflight = $self->{inflight}; + my $inflight = $self->{inflight} or return $self->close; # {wbuf} is rare, I hope: cat_async_step($self, $inflight) if $self->{wbuf}; - if (!$self->write(\"$req\n")) { - $self->fail("gcf2c write: $!") if !$self->{sock}; - } + $self->fail("gcf2c write: $!") if !$self->write($req) && !$self->{sock}; push @$inflight, $req, $cb, $arg; } # ensure PublicInbox::Git::cat_async_step never calls cat_async_retry sub alternates_changed {} -# this is the write-only end of a pipe, DS->EventLoop will call this +# DS->EventLoop will call this sub event_step { my ($self) = @_; $self->flush_write; - $self->close if !$self->{in}; # process died -} + $self->close if !$self->{in} || !$self->{sock}; # process died + my $inflight = $self->{inflight}; + if ($inflight && @$inflight) { + cat_async_step($self, $inflight); + return $self->close unless $self->{in}; # process died -no warnings 'once'; + # ok, more to do, requeue for fairness + $self->requeue if @$inflight || exists($self->{cat_rbuf}); + } +} sub DESTROY { my ($self) = @_; - delete $self->{in}; - # GitAsyncCat::event_step may reap us with WNOHANG, too - my $pid = delete $self->{pid} or return; - if ($$ == $self->{owner_pid}) { - PublicInbox::DS->in_loop ? $self->close : delete($self->{sock}); - dwaitpid $pid; - } + delete $self->{sock}; # if outside EventLoop + PublicInbox::Git::DESTROY($self); } -# used by GitAsyncCat +no warnings 'once'; + *cat_async_step = \&PublicInbox::Git::cat_async_step; 1; diff --git a/lib/PublicInbox/GitAsyncCat.pm b/lib/PublicInbox/GitAsyncCat.pm index b4d1cd69..7d1a13db 100644 --- a/lib/PublicInbox/GitAsyncCat.pm +++ b/lib/PublicInbox/GitAsyncCat.pm @@ -15,59 +15,53 @@ our $GCF2C; # singleton PublicInbox::Gcf2Client sub close { my ($self) = @_; - - if (my $gitish = delete $self->{gitish}) { - PublicInbox::Git::cat_async_abort($gitish); + if (my $git = delete $self->{git}) { + $git->cat_async_abort; } $self->SUPER::close; # PublicInbox::DS::close } sub event_step { my ($self) = @_; - my $gitish = $self->{gitish} or return; - return $self->close if ($gitish->{in} // 0) != ($self->{sock} // 1); - my $inflight = $gitish->{inflight}; + my $git = $self->{git} or return; + return $self->close if ($git->{in} // 0) != ($self->{sock} // 1); + my $inflight = $git->{inflight}; if ($inflight && @$inflight) { - $gitish->cat_async_step($inflight); + $git->cat_async_step($inflight); # child death? - if (($gitish->{in} // 0) != ($self->{sock} // 1)) { + if (($git->{in} // 0) != ($self->{sock} // 1)) { $self->close; - } elsif (@$inflight || exists $gitish->{cat_rbuf}) { + } elsif (@$inflight || exists $git->{cat_rbuf}) { # ok, more to do, requeue for fairness $self->requeue; } - } elsif ((my $pid = waitpid($gitish->{pid}, WNOHANG)) > 0) { + } elsif ((my $pid = waitpid($git->{pid}, WNOHANG)) > 0) { # May happen if the child process is killed by a BOFH # (or segfaults) - delete $gitish->{pid}; - warn "E: gitish $pid exited with \$?=$?\n"; + delete $git->{pid}; + warn "E: git $pid exited with \$?=$?\n"; $self->close; } } sub git_async_cat ($$$$) { my ($git, $oid, $cb, $arg) = @_; - my $gitish = $GCF2C //= eval { - require PublicInbox::Gcf2; + if ($GCF2C //= eval { require PublicInbox::Gcf2Client; PublicInbox::Gcf2Client::new(); - } // 0; # 0: do not retry if libgit2 or Inline::C are missing - if ($gitish) { # Gcf2 active, {inflight} may be unset due to errors - $GCF2C->{inflight} or - $gitish = $GCF2C = PublicInbox::Gcf2Client::new(); - $oid .= " $git->{git_dir}"; - } else { - $gitish = $git; + } // 0) { # 0: do not retry if libgit2 or Inline::C are missing + $GCF2C->gcf2_async(\"$oid $git->{git_dir}\n", $cb, $arg); + \undef; + } else { # read-only end of git-cat-file pipe + $git->cat_async($oid, $cb, $arg); + $git->{async_cat} //= do { + my $self = bless { git => $git }, __PACKAGE__; + $git->{in}->blocking(0); + $self->SUPER::new($git->{in}, EPOLLIN|EPOLLET); + \undef; # this is a true ref() + }; } - $gitish->cat_async($oid, $cb, $arg); - $gitish->{async_cat} //= do { - # read-only end of pipe (Gcf2Client is write-only end) - my $self = bless { gitish => $gitish }, __PACKAGE__; - $gitish->{in}->blocking(0); - $self->SUPER::new($gitish->{in}, EPOLLIN|EPOLLET); - \undef; # this is a true ref() - }; } # this is safe to call inside $cb, but not guaranteed to enqueue @@ -75,9 +69,9 @@ sub git_async_cat ($$$$) { sub git_async_prefetch { my ($git, $oid, $cb, $arg) = @_; if ($GCF2C) { - if ($GCF2C->{async_cat} && !$GCF2C->{wbuf}) { - $oid .= " $git->{git_dir}"; - return $GCF2C->cat_async($oid, $cb, $arg); + if (!$GCF2C->{wbuf}) { + $oid .= " $git->{git_dir}\n"; + return $GCF2C->gcf2_async(\$oid, $cb, $arg); # true } } elsif ($git->{async_cat} && (my $inflight = $git->{inflight})) { # we could use MAX_INFLIGHT here w/o the halving, diff --git a/t/gcf2_client.t b/t/gcf2_client.t index bae94c77..6d059cad 100644 --- a/t/gcf2_client.t +++ b/t/gcf2_client.t @@ -28,7 +28,7 @@ my $err_f = "$tmpdir/err"; PublicInbox::DS->Reset; open my $err, '>>', $err_f or BAIL_OUT $!; my $gcf2c = PublicInbox::Gcf2Client::new({ 2 => $err }); - $gcf2c->cat_async("$tree $git_a", sub { + $gcf2c->gcf2_async(\"$tree $git_a\n", sub { my ($bref, $oid, $type, $size, $arg) = @_; is($oid, $tree, 'got expected OID'); is($size, 30, 'got expected length'); @@ -44,7 +44,7 @@ my $err_f = "$tmpdir/err"; is($estr, '', 'nothing in stderr'); my $trunc = substr($tree, 0, 39); - $gcf2c->cat_async("$trunc $git_a", sub { + $gcf2c->gcf2_async(\"$trunc $git_a\n", sub { my ($bref, $oid, $type, $size, $arg) = @_; is(undef, $bref, 'missing bref is undef'); is($oid, $trunc, 'truncated OID printed'); @@ -63,7 +63,7 @@ my $err_f = "$tmpdir/err"; PublicInbox::DS->Reset; open $err, '>', $err_f or BAIL_OUT $!; $gcf2c = PublicInbox::Gcf2Client::new({ 2 => $err }); - $gcf2c->cat_async("$tree $git_b", sub { + $gcf2c->gcf2_async(\"$tree $git_b\n", sub { my ($bref, $oid, $type, $size, $arg) = @_; is(undef, $bref, 'missing bref from alt is undef'); $called++; @@ -78,7 +78,7 @@ my $err_f = "$tmpdir/err"; print $alt "$git_a/objects\n" or BAIL_OUT $!; close $alt or BAIL_OUT; my $expect = xqx(['git', "--git-dir=$git_a", qw(cat-file tree), $tree]); - $gcf2c->cat_async("$tree $git_a", sub { + $gcf2c->gcf2_async(\"$tree $git_a\n", sub { my ($bref, $oid, $type, $size, $arg) = @_; is($oid, $tree, 'oid match on alternates retry'); is($$bref, $expect, 'tree content matched'); @@ -86,5 +86,5 @@ my $err_f = "$tmpdir/err"; }); $gcf2c->cat_async_step($gcf2c->{inflight}); } -is($called, 4, 'cat_async callbacks hit'); +is($called, 4, 'gcf2_async callbacks hit'); done_testing; |