From d78f50649a5545d66a61b5465ca7f5ce4be398ea Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Sat, 19 Sep 2020 09:37:14 +0000 Subject: gcf2: wire up read-only daemons and rm -gcf2 script It seems easiest to have a singleton Gcf2Client client object per daemon worker for all inboxes to use. This reduces overall FD usage from pipes. The `public-inbox-gcf2' command + manpage are gone and a `$^X' one-liner is used, instead. This saves inodes for internal commands and hopefully makes it easier to avoid mismatched PERL5LIB include paths (as noticed during development :x). We'll also make the existing cat-file process management infrastructure more resilient to BOFHs on process killing sprees (or in case our libgit2-based code fails on us). (Rare) PublicInbox::WWW PSGI users NOT using public-inbox-httpd won't automatically benefit from this change, and extra configuration will be required (to be documented later). --- lib/PublicInbox/Daemon.pm | 11 ++++++++ lib/PublicInbox/Gcf2.pm | 36 ++++++++++++++++++++++++-- lib/PublicInbox/Gcf2Client.pm | 59 ++++++++++++++++++++++++++++++++---------- lib/PublicInbox/Git.pm | 31 ++++++++++++++-------- lib/PublicInbox/GitAsyncCat.pm | 55 ++++++++++++++++++++++++++++++++++++--- lib/PublicInbox/IMAP.pm | 2 +- 6 files changed, 163 insertions(+), 31 deletions(-) (limited to 'lib') diff --git a/lib/PublicInbox/Daemon.pm b/lib/PublicInbox/Daemon.pm index b929ec2a..1520f8f2 100644 --- a/lib/PublicInbox/Daemon.pm +++ b/lib/PublicInbox/Daemon.pm @@ -19,6 +19,7 @@ use PublicInbox::Syscall qw($SFD_NONBLOCK); require PublicInbox::Listener; use PublicInbox::EOFpipe; use PublicInbox::Sigfd; +use PublicInbox::GitAsyncCat; my @CMD; my ($set_user, $oldset); my (@cfg_listen, $stdout, $stderr, $group, $user, $pid_file, $daemonize); @@ -652,6 +653,16 @@ sub run ($$$;$) { daemon_prepare($default); my $af_default = $default =~ /:8080\z/ ? 
'httpready' : undef; my $for_destroy = daemonize(); + + # this wastes a bit of memory for non-PublicInbox::WWW -httpd users + # oh well... + eval { + require PublicInbox::Gcf2; + require PublicInbox::Gcf2Client; + }; + local $PublicInbox::GitAsyncCat::GCF2C = + PublicInbox::Gcf2Client::new() if !$@; + daemon_loop($refresh, $post_accept, $tlsd, $af_default); PublicInbox::DS->Reset; # ->DESTROY runs when $for_destroy goes out-of-scope diff --git a/lib/PublicInbox/Gcf2.pm b/lib/PublicInbox/Gcf2.pm index fe76b1fd..7983c841 100644 --- a/lib/PublicInbox/Gcf2.pm +++ b/lib/PublicInbox/Gcf2.pm @@ -1,12 +1,13 @@ # Copyright (C) 2020 all contributors # License: AGPL-3.0+ -# backend for public-inbox-gcf2(1) (git-cat-file based on libgit2, -# other libgit2 stuff may go here, too) +# backend for a git-cat-file-workalike based on libgit2, +# other libgit2 stuff may go here, too. package PublicInbox::Gcf2; use strict; use PublicInbox::Spawn qw(which popen_rd); use Fcntl qw(LOCK_EX); +use IO::Handle; # autoflush my (%CFG, $c_src, $lockfh); BEGIN { # PublicInbox::Spawn will set PERL_INLINE_DIRECTORY @@ -54,4 +55,35 @@ use Inline C => $c_src; undef $c_src; undef %CFG; undef $lockfh; + +# Usage: $^X -MPublicInbox::Gcf2 -e 'PublicInbox::Gcf2::loop()' +# (see lib/PublicInbox/Gcf2Client.pm) +sub loop { + my $gcf2 = new(); + STDERR->autoflush(1); + STDOUT->autoflush(1); + + while (<STDIN>) { + chomp; + my ($oid, $git_dir) = split(/ /, $_, 2); + $gcf2->add_alternate("$git_dir/objects"); + if (!$gcf2->cat_oid(1, $oid)) { + # retry once if missing. 
We only get unabbreviated OIDs + # from SQLite or Xapian DBs, here, so malicious clients + # can't trigger excessive retries: + warn "I: $$ $oid missing, retrying in $git_dir\n"; + + $gcf2 = new(); + $gcf2->add_alternate("$git_dir/objects"); + + if ($gcf2->cat_oid(1, $oid)) { + warn "I: $$ $oid found after retry\n"; + } else { + warn "W: $$ $oid missing after retry\n"; + print "$oid missing\n"; # mimic git-cat-file + } + } + } +} + 1; diff --git a/lib/PublicInbox/Gcf2Client.pm b/lib/PublicInbox/Gcf2Client.pm index 30f85c71..42ff1bf3 100644 --- a/lib/PublicInbox/Gcf2Client.pm +++ b/lib/PublicInbox/Gcf2Client.pm @@ -1,29 +1,62 @@ # Copyright (C) 2020 all contributors # License: AGPL-3.0+ -# connects public-inbox processes to public-inbox-gcf2(1) +# connects public-inbox processes to PublicInbox::Gcf2::loop() package PublicInbox::Gcf2Client; use strict; -use parent 'PublicInbox::Git'; +use parent qw(PublicInbox::DS); +use PublicInbox::Git; use PublicInbox::Spawn qw(popen_rd); use IO::Handle (); +use PublicInbox::Syscall qw(EPOLLONESHOT EPOLLOUT); +# fields: +# async_cat => GitAsyncCat ref (read-only pipe) +# sock => writable pipe to Gcf2::loop -sub new { - my ($rdr) = @_; - my $self = bless {}, __PACKAGE__; +sub new { bless($_[0] // {}, __PACKAGE__) } + +sub gcf2c_begin ($) { + my ($self) = @_; + # ensure the child process has the same @INC we do: + my $env = { PERL5LIB => join(':', @INC) }; my ($out_r, $out_w); - pipe($out_r, $out_w) or $self->fail("pipe failed: $!"); - $rdr //= {}; - $rdr->{0} = $out_r; - @$self{qw(in pid)} = popen_rd(['public-inbox-gcf2'], undef, $rdr); - $self->{inflight} = []; - $self->{out} = $out_w; + pipe($out_r, $out_w) or die "pipe failed: $!"; + my $rdr = { 0 => $out_r, 2 => $self->{2} }; + my $cmd = [$^X, qw[-MPublicInbox::Gcf2 -e PublicInbox::Gcf2::loop()]]; + @$self{qw(in pid)} = popen_rd($cmd, $env, $rdr); fcntl($out_w, 1031, 4096) if $^O eq 'linux'; # 1031: F_SETPIPE_SZ $out_w->autoflush(1); - $self; + $out_w->blocking(0); + 
$self->SUPER::new($out_w, 0); # EPOLL_CTL_ADD (a bit wasteful :x) + $self->{inflight} = []; +} + +sub fail { + my $self = shift; + $self->close; # PublicInbox::DS::close + PublicInbox::Git::fail($self, @_); +} + +sub cat_async ($$$;$) { + my ($self, $req, $cb, $arg) = @_; + my $inflight = $self->{inflight} // gcf2c_begin($self); + + # rare, I hope: + cat_async_step($self, $inflight) if $self->{wbuf}; + + $self->write(\"$req\n") or $self->fail("gcf2c write: $!"); + push @$inflight, $req, $cb, $arg; } -# always false, since -gcf2 retries internally +# ensure PublicInbox::Git::cat_async_step never calls cat_async_retry sub alternates_changed {} +no warnings 'once'; + +# this is the write-only end of a pipe, DS->EventLoop will call this +*event_step = \&PublicInbox::DS::flush_write; + +# used by GitAsyncCat +*cat_async_step = \&PublicInbox::Git::cat_async_step; + 1; diff --git a/lib/PublicInbox/Git.pm b/lib/PublicInbox/Git.pm index 6bb82b6b..2323cecc 100644 --- a/lib/PublicInbox/Git.pm +++ b/lib/PublicInbox/Git.pm @@ -185,11 +185,12 @@ sub cat_async_step ($$) { my $rbuf = delete($self->{cat_rbuf}) // \(my $new = ''); my ($bref, $oid, $type, $size); my $head = my_readline($self->{in}, $rbuf); + # ->fail may be called via Gcf2Client.pm if ($head =~ /^([0-9a-f]{40,}) (\S+) ([0-9]+)$/) { ($oid, $type, $size) = ($1, $2, $3 + 0); $bref = my_read($self->{in}, $rbuf, $size + 1) or - fail($self, defined($bref) ? 'read EOF' : "read: $!"); - chop($$bref) eq "\n" or fail($self, 'LF missing after blob'); + $self->fail(defined($bref) ? 'read EOF' : "read: $!"); + chop($$bref) eq "\n" or $self->fail('LF missing after blob'); } elsif ($head =~ s/ missing\n//s) { $oid = $head; # ref($req) indicates it's already been retried @@ -201,7 +202,7 @@ sub cat_async_step ($$) { $type = 'missing'; $oid = ref($req) ? 
$$req : $req if $oid eq ''; } else { - fail($self, "Unexpected result from async git cat-file: $head"); + $self->fail("Unexpected result from async git cat-file: $head"); } eval { $cb->($bref, $oid, $type, $size, $arg) }; $self->{cat_rbuf} = $rbuf if $$rbuf ne ''; @@ -304,10 +305,12 @@ sub check { sub _destroy { my ($self, $rbuf, $in, $out, $pid, $err) = @_; - my $p = delete $self->{$pid} or return; delete @$self{($rbuf, $in, $out)}; delete $self->{$err} if $err; # `err_c' + # GitAsyncCat::event_step may delete {pid} + my $p = delete $self->{$pid} or return; + # PublicInbox::DS may not be loaded eval { PublicInbox::DS::dwaitpid($p, undef, undef) }; waitpid($p, 0) if $@; # wait synchronously if not in event loop @@ -315,14 +318,21 @@ sub _destroy { sub cat_async_abort ($) { my ($self) = @_; - my $inflight = delete $self->{inflight} or die 'BUG: not in async'; + if (my $inflight = delete $self->{inflight}) { + while (@$inflight) { + my ($req, $cb, $arg) = splice(@$inflight, 0, 3); + $req =~ s/ .*//; # drop git_dir for Gcf2Client + eval { $cb->(undef, $req, undef, undef, $arg) }; + warn "E: $req: $@ (in abort)\n" if $@; + } + } cleanup($self); } sub fail { my ($self, $msg) = @_; - $self->{inflight} ? cat_async_abort($self) : cleanup($self); - croak("git $self->{git_dir}: $msg"); + cat_async_abort($self); + croak(ref($self) . ' ' . ($self->{git_dir} // '') . 
": $msg"); } sub popen { @@ -352,6 +362,7 @@ sub cleanup { !!($self->{pid} || $self->{pid_c}); } + # assuming a well-maintained repo, this should be a somewhat # accurate estimation of its size # TODO: show this in the WWW UI as a hint to potential cloners @@ -397,7 +408,7 @@ sub pub_urls { sub cat_async_begin { my ($self) = @_; cleanup($self) if $self->alternates_changed; - batch_prepare($self); + $self->batch_prepare; die 'BUG: already in async' if $self->{inflight}; $self->{inflight} = []; } @@ -413,11 +424,9 @@ sub cat_async ($$$;$) { push(@$inflight, $oid, $cb, $arg); } -# this is safe to call inside $cb, but not guaranteed to enqueue -# returns true if successful, undef if not. sub async_prefetch { my ($self, $oid, $cb, $arg) = @_; - if (defined($self->{async_cat}) && (my $inflight = $self->{inflight})) { + if (my $inflight = $self->{inflight}) { # we could use MAX_INFLIGHT here w/o the halving, # but lets not allow one client to monopolize a git process if (scalar(@$inflight) < int(MAX_INFLIGHT/2)) { diff --git a/lib/PublicInbox/GitAsyncCat.pm b/lib/PublicInbox/GitAsyncCat.pm index 8a54c608..b9dbe0cc 100644 --- a/lib/PublicInbox/GitAsyncCat.pm +++ b/lib/PublicInbox/GitAsyncCat.pm @@ -11,23 +11,49 @@ package PublicInbox::GitAsyncCat; use strict; use parent qw(PublicInbox::DS Exporter); +use POSIX qw(WNOHANG); use PublicInbox::Syscall qw(EPOLLIN EPOLLET); -our @EXPORT = qw(git_async_cat); +our @EXPORT = qw(git_async_cat git_async_prefetch); +use PublicInbox::Git (); + +our $GCF2C; # singleton PublicInbox::Gcf2Client + +sub close { + my ($self) = @_; + + if (my $gitish = delete $self->{gitish}) { + PublicInbox::Git::cat_async_abort($gitish); + } + $self->SUPER::close; # PublicInbox::DS::close +} sub event_step { my ($self) = @_; - my $gitish = $self->{gitish}; + my $gitish = $self->{gitish} or return; return $self->close if ($gitish->{in} // 0) != ($self->{sock} // 1); my $inflight = $gitish->{inflight}; if ($inflight && @$inflight) { 
$gitish->cat_async_step($inflight); - $self->requeue if @$inflight || exists $gitish->{cat_rbuf}; + + # child death? + if (($gitish->{in} // 0) != ($self->{sock} // 1)) { + $self->close; + } elsif (@$inflight || exists $gitish->{cat_rbuf}) { + # ok, more to do, requeue for fairness + $self->requeue; + } + } elsif ((my $pid = waitpid($gitish->{pid}, WNOHANG)) > 0) { + # May happen if the child process is killed by a BOFH + # (or segfaults) + delete $gitish->{pid}; + warn "E: gitish $pid exited with \$?=$?\n"; + $self->close; } } sub git_async_cat ($$$$) { my ($git, $oid, $cb, $arg) = @_; - my $gitish = $git->{gcf2c}; # PublicInbox::Gcf2Client + my $gitish = $GCF2C; if ($gitish) { $oid .= " $git->{git_dir}"; } else { @@ -41,4 +67,25 @@ sub git_async_cat ($$$$) { }; } +# this is safe to call inside $cb, but not guaranteed to enqueue +# returns true if successful, undef if not. +sub git_async_prefetch { + my ($git, $oid, $cb, $arg) = @_; + if ($GCF2C) { + if ($GCF2C->{async_cat} && !$GCF2C->{wbuf}) { + $oid .= " $git->{git_dir}"; + return $GCF2C->cat_async($oid, $cb, $arg); + } + } elsif ($git->{async_cat} && (my $inflight = $git->{inflight})) { + # we could use MAX_INFLIGHT here w/o the halving, + # but lets not allow one client to monopolize a git process + if (@$inflight < int(PublicInbox::Git::MAX_INFLIGHT/2)) { + print { $git->{out} } $oid, "\n" or + $git->fail("write error: $!"); + return push(@$inflight, $oid, $cb, $arg); + } + } + undef; +} + 1; diff --git a/lib/PublicInbox/IMAP.pm b/lib/PublicInbox/IMAP.pm index 47c08aea..a861282f 100644 --- a/lib/PublicInbox/IMAP.pm +++ b/lib/PublicInbox/IMAP.pm @@ -626,7 +626,7 @@ sub fetch_blob_cb { # called by git->cat_async via git_async_cat } my $pre; if (!$self->{wbuf} && (my $nxt = $msgs->[0])) { - $pre = $self->{ibx}->git->async_prefetch($nxt->{blob}, + $pre = git_async_prefetch($self->{ibx}->git, $nxt->{blob}, \&fetch_blob_cb, $fetch_arg); } fetch_run_ops($self, $smsg, $bref, $ops, $partial); -- cgit v1.2.3-24-ge0c7