From edcf14a75af994821e624c42e3de31079b2ae70a Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Fri, 24 Jul 2020 05:56:04 +0000 Subject: searchidx: support async git check This allows v1 indexing to run while the `cat-file --batch-check' process is waiting on high-latency storage. --- lib/PublicInbox/Git.pm | 72 ++++++++++++++++++++++++++++++++++++-------- lib/PublicInbox/SearchIdx.pm | 23 +++++++++++--- 2 files changed, 78 insertions(+), 17 deletions(-) (limited to 'lib') diff --git a/lib/PublicInbox/Git.pm b/lib/PublicInbox/Git.pm index 265524ff..ffc464eb 100644 --- a/lib/PublicInbox/Git.pm +++ b/lib/PublicInbox/Git.pm @@ -231,26 +231,71 @@ sub cat_file { $result->[0]; } -sub check { - my ($self, $obj) = @_; - _bidi_pipe($self, qw(--batch-check in_c out_c pid_c err_c)); - print { $self->{out_c} } $obj, "\n" or fail($self, "write error: $!"); - my $rbuf = ''; # TODO: async + {chk_rbuf} - chomp(my $line = my_readline($self->{in_c}, \$rbuf)); - my ($hex, $type, $size) = split(' ', $line); - - # Future versions of git.git may show 'ambiguous', but for now, +sub check_async_step ($$) { + my ($self, $inflight_c) = @_; + die 'BUG: inflight empty or odd' if scalar(@$inflight_c) < 3; + my ($req, $cb, $arg) = splice(@$inflight_c, 0, 3); + my $rbuf = delete($self->{rbuf_c}) // \(my $new = ''); + chomp(my $line = my_readline($self->{in_c}, $rbuf)); + my ($hex, $type, $size) = split(/ /, $line); + + # Future versions of git.git may have type=ambiguous, but for now, # we must handle 'dangling' below (and maybe some other oddball # stuff): # https://public-inbox.org/git/20190118033845.s2vlrb3wd3m2jfzu@dcvr/T/ - return if $type eq 'missing' || $type eq 'ambiguous'; - if ($hex eq 'dangling' || $hex eq 'notdir' || $hex eq 'loop') { - my $ret = my_read($self->{in_c}, \$rbuf, $type + 1); + my $ret = my_read($self->{in_c}, $rbuf, $type + 1); fail($self, defined($ret) ? 'read EOF' : "read: $!") if !$ret; - return; } + eval { $cb->($hex, $type, $size, $arg, $self) }; + warn "E: check($req) $@\n" if $@; + $self->{rbuf_c} = $rbuf if $$rbuf ne ''; +} + +sub check_async_wait ($) { + my ($self) = @_; + my $inflight_c = delete $self->{inflight_c} or return; + while (scalar(@$inflight_c)) { + check_async_step($self, $inflight_c); + } +} +sub check_async_begin ($) { + my ($self) = @_; + cleanup($self) if alternates_changed($self); + _bidi_pipe($self, qw(--batch-check in_c out_c pid_c err_c)); + die 'BUG: already in async check' if $self->{inflight_c}; + $self->{inflight_c} = []; +} + +sub check_async ($$$$) { + my ($self, $oid, $cb, $arg) = @_; + my $inflight_c = $self->{inflight_c} // check_async_begin($self); + if (scalar(@$inflight_c) >= MAX_INFLIGHT) { + check_async_step($self, $inflight_c); + } + print { $self->{out_c} } $oid, "\n" or fail($self, "write error: $!"); + push(@$inflight_c, $oid, $cb, $arg); +} + +sub _check_cb { # check_async callback + my ($hex, $type, $size, $result) = @_; + @$result = ($hex, $type, $size); +} + +sub check { + my ($self, $oid) = @_; + my $result = []; + check_async($self, $oid, \&_check_cb, $result); + check_async_wait($self); + my ($hex, $type, $size) = @$result; + + # Future versions of git.git may show 'ambiguous', but for now, + # we must handle 'dangling' below (and maybe some other oddball + # stuff): + # https://public-inbox.org/git/20190118033845.s2vlrb3wd3m2jfzu@dcvr/T/ + return if $type eq 'missing' || $type eq 'ambiguous'; + return if $hex eq 'dangling' || $hex eq 'notdir' || $hex eq 'loop'; ($hex, $type, $size); } @@ -297,6 +342,7 @@ sub cleanup { my ($self) = @_; local $in_cleanup = 1; delete $self->{async_cat}; + check_async_wait($self); cat_async_wait($self); _destroy($self, qw(cat_rbuf in out pid)); _destroy($self, qw(chk_rbuf in_c out_c pid_c err_c)); diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm index 4d2e0da9..39dc1f87 100644 --- a/lib/PublicInbox/SearchIdx.pm +++ b/lib/PublicInbox/SearchIdx.pm @@ -563,6 +563,16 @@ sub too_big ($$) { 1; } +sub ck_size { # check_async cb for -index --max-size=... + my ($oid, $type, $size, $arg, $git) = @_; + (($type // '') eq 'blob') or die "E: bad $oid in $git->{git_dir}"; + if ($size <= $arg->{index_max_size}) { + $git->cat_async($oid, \&index_both, $arg); + } else { + warn "W: skipping $oid ($size > $arg->{index_max_size})\n"; + } +} + # only for v1 sub process_stack { my ($self, $stk, $sync, $batch_cb) = @_; @@ -580,13 +590,17 @@ sub process_stack { $git->cat_async($oid, \&unindex_both, $self); } } + $sync->{index_max_size} = $self->{ibx}->{index_max_size}; while (my ($f, $at, $ct, $oid) = $stk->pop_rec) { if ($f eq 'm') { - $sync->{autime} = $at; - $sync->{cotime} = $ct; - next if too_big($self, $oid); - $git->cat_async($oid, \&index_both, { %$sync }); + my $arg = { %$sync, autime => $at, cotime => $ct }; + if ($sync->{index_max_size}) { + $git->check_async($oid, \&ck_size, $arg); + } else { + $git->cat_async($oid, \&index_both, $arg); + } if ($max <= 0) { + $git->check_async_wait; $git->cat_async_wait; $max = $BATCH_BYTES; $batch_cb->($nr); @@ -595,6 +609,7 @@ sub process_stack { $git->cat_async($oid, \&unindex_both, $self); } } + $git->check_async_wait; $git->cat_async_wait; $batch_cb->($nr, $stk); } -- cgit v1.2.3-24-ge0c7