From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.6 (2021-04-09) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.2 required=3.0 tests=ALL_TRUSTED,BAYES_00, DKIM_SIGNED,DKIM_VALID,DKIM_VALID_AU,DKIM_VALID_EF shortcircuit=no autolearn=ham autolearn_force=no version=3.4.6 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id D61111F4E5 for ; Tue, 21 Mar 2023 23:07:44 +0000 (UTC) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d=80x24.org; s=selector1; t=1679440064; bh=RgHF8dRJfK9GDmHJTk71rrDEOmIWhivRj4tJINkqI1M=; h=From:To:Subject:Date:In-Reply-To:References:From; b=O3Hovl/MgzYBpPuu+H/FSZ57FbIkVec9MVS8eQfwx60iTXr0AVADx8Rp+/XpVwtLj awHBQTuoQdJUefTghtBm3jqv1vKr/iVxkFD7CecdeYqZg7WfPlluFvgbyS488lsmKi 9NFUU4fh1tP0yLjkIIxAlsV9JT8iAs4T4GkKdfHA= From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 07/28] cindex: parallelize prep phases Date: Tue, 21 Mar 2023 23:07:22 +0000 Message-Id: <20230321230743.3020032-7-e@80x24.org> In-Reply-To: <20230321230743.3020032-1-e@80x24.org> References: <20230321230701.3019936-1-e@80x24.org> <20230321230743.3020032-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: Listing refs, fingerprinting and root scanning can all be parallelized to reduce runtime on SMP systems. We'll use DESTROY-based dependency management with parallelization as in LeiMirror to handle ref listing and fingerprinting before serializing Xapian DB access to check against the existing fingerprint. We'll also delay root listing until we get a fingerprint mismatch to speed up no-op indexing. 
--- lib/PublicInbox/CodeSearchIdx.pm | 197 +++++++++++++++++++++---------- 1 file changed, 132 insertions(+), 65 deletions(-) diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm index 218338da..a926886e 100644 --- a/lib/PublicInbox/CodeSearchIdx.pm +++ b/lib/PublicInbox/CodeSearchIdx.pm @@ -26,7 +26,10 @@ use PublicInbox::SHA qw(sha256_hex); use PublicInbox::Search qw(xap_terms); use PublicInbox::SearchIdx qw(add_val); use PublicInbox::Config; -use PublicInbox::Spawn qw(run_die); +use PublicInbox::Spawn qw(spawn); +use PublicInbox::OnDestroy; +our $LIVE; # pid => callback +our $LIVE_JOBS; # stop walking history if we see >$SEEN_MAX existing commits, this assumes # branches don't diverge by more than this number of commits... @@ -106,26 +109,27 @@ sub progress { $pr->($self->{git} ? ("$self->{git}->{git_dir}: ") : (), @msg, "\n"); } -sub store_repo ($$) { - my ($self, $repo) = @_; +sub store_repo ($$$) { + my ($self, $git, $repo) = @_; my $xdb = delete($repo->{shard})->idx_acquire; $xdb->begin_transaction; + for (@{$repo->{to_delete}}) { $xdb->delete_document($_) } # XXX needed? 
if (defined $repo->{id}) { my $doc = $xdb->get_document($repo->{id}) // - die "$self->{git}->{git_dir} doc #$repo->{id} gone"; + die "$git->{git_dir} doc #$repo->{id} gone"; add_val($doc, PublicInbox::CodeSearch::CT, $repo->{ct}); - my %new = map { $_ => undef } @{$self->{roots}}; + my %new = map { $_ => undef } @{$repo->{roots}}; my $old = xap_terms('G', $doc); delete @new{keys %$old}; $doc->add_boolean_term('G'.$_) for keys %new; - delete @$old{@{$self->{roots}}}; + delete @$old{@{$repo->{roots}}}; $doc->remove_term('G'.$_) for keys %$old; $doc->set_data($repo->{fp}); $xdb->replace_document($repo->{id}, $doc); } else { my $new = $PublicInbox::Search::X{Document}->new; add_val($new, PublicInbox::CodeSearch::CT, $repo->{ct}); - $new->add_boolean_term("P$self->{git}->{git_dir}"); + $new->add_boolean_term("P$git->{git_dir}"); $new->add_boolean_term('T'.'r'); $new->add_boolean_term('G'.$_) for @{$repo->{roots}}; $new->set_data($repo->{fp}); # \n delimited @@ -201,75 +205,98 @@ sub docids_by_postlist ($$) { # consider moving to PublicInbox::Search @ids; } -sub get_roots ($$) { - my ($self, $refs) = @_; - my @roots = $self->{git}->qx([qw(rev-list --stdin --max-parents=0)], - undef, { 0 => $refs }); - die "git rev-list \$?=$?" if $?; - sysseek($refs, 0, SEEK_SET) or die "seek: $!"; # for rev-list --stdin - chomp(@roots); - scalar(@roots) ? 
\@roots : undef; +sub cidx_reap ($$) { + my ($self, $jobs) = @_; + while (keys(%$LIVE) >= $jobs) { + my $pid = waitpid(-1, 0) // die "waitpid(-1): $!"; + last if $pid < 0; + if (my $x = delete $LIVE->{$pid}) { + my $cb = shift @$x; + $cb->(@$x) if $cb; + } else { + warn "reaped unknown PID=$pid ($?)\n"; + } + } } # this is different from the grokmirror-compatible fingerprint since we # only care about --heads (branches) and --tags, and not even their names -sub cidx_fp ($) { - my ($self) = @_; +sub fp_start ($$$) { + my ($self, $git, $prep_repo) = @_; + return if !$LIVE; # premature exit + cidx_reap($self, $LIVE_JOBS); open my $refs, '+>', undef or die "open: $!"; - run_die(['git', "--git-dir=$self->{git}->{git_dir}", + my $pid = spawn(['git', "--git-dir=$git->{git_dir}", qw(show-ref --heads --tags --hash)], undef, { 1 => $refs }); + $git->{-repo}->{refs} = $refs; + $LIVE->{$pid} = [ \&fp_fini, $self, $git, $prep_repo ]; +} + +sub fp_fini { + my ($self, $git, $prep_repo) = @_; + my $refs = $git->{-repo}->{refs} // die 'BUG: no {-repo}->{refs}'; seek($refs, 0, SEEK_SET) or die "seek: $!"; my $buf; my $dig = PublicInbox::SHA->new(256); while (read($refs, $buf, 65536)) { $dig->add($buf) } - sysseek($refs, 0, SEEK_SET) or die "seek: $!"; # for rev-list --stdin - ($dig->hexdigest, $refs); + $git->{-repo}->{fp} = $dig->hexdigest; } -# TODO: should we also index gitweb.owner and the full fingerprint for grokmirror? 
-sub prep_git_dir ($) { - my ($self) = @_; - my $git_dir = $self->{git}->{git_dir}; - my $ct = $self->{git}->qx([qw[for-each-ref - --sort=-committerdate --format=%(committerdate:raw) --count=1 +sub ct_start ($$$) { + my ($self, $git, $prep_repo) = @_; + return if !$LIVE; # premature exit + cidx_reap($self, $LIVE_JOBS); + my ($rd, $pid) = $git->popen([qw[for-each-ref --sort=-committerdate + --format=%(committerdate:raw) --count=1 refs/heads/ refs/tags/]]); - my $repo = {}; - @$repo{qw(fp refs)} = cidx_fp($self); - $repo->{roots} = get_roots($self, $repo->{refs}); - if (!$repo->{roots} || !defined($ct)) { - warn "W: $git_dir has no root commits, skipping\n"; + $LIVE->{$pid} = [ \&ct_fini, $self, $git, $rd, $prep_repo ]; +} + +sub ct_fini { + my ($self, $git, $rd, $prep_repo) = @_; + defined(my $ct = <$rd>) or return; + $ct =~ s/\s+.*\z//s; # drop TZ + LF + $git->{-repo}->{ct} = $ct + 0; +} + +# TODO: also index gitweb.owner and the full fingerprint for grokmirror? +sub prep_repo ($$) { + my ($self, $git) = @_; + return if !$LIVE; # premature exit + my $repo = $git->{-repo} // die 'BUG: no {-repo}'; + my $git_dir = $git->{git_dir}; + if (!defined($repo->{ct})) { + warn "W: $git_dir has no commits, skipping\n"; + delete $git->{-repo}; return; } - $ct =~ s/ .*\z//s; # drop TZ - $repo->{ct} = $ct + 0; my $n = git_dir_hash($git_dir) % $self->{nshard}; my $shard = $repo->{shard} = bless { %$self, shard => $n }, ref($self); delete @$shard{qw(lockfh lock_path)}; local $shard->{xdb}; my $xdb = $shard->idx_acquire; my @docids = docids_by_postlist($shard, 'P'.$git_dir); - my $docid = shift(@docids) // return $repo; + my $docid = shift(@docids) // return get_roots($self, $git); if (@docids) { warn "BUG: $git_dir indexed multiple times, culling\n"; - $xdb->begin_transaction; - for (@docids) { $xdb->delete_document($_) } - $xdb->commit_transaction; + $repo->{to_delete} = \@docids; # XXX needed? 
} my $doc = $xdb->get_document($docid) // die "BUG: no #$docid ($git_dir)"; my $old_fp = $doc->get_data; if ($old_fp eq $repo->{fp}) { # no change - progress($self, 'unchanged'); + progress($self, "$git_dir unchanged"); + delete $git->{-repo}; return; } $repo->{id} = $docid; - $repo; + get_roots($self, $git); } -sub partition_refs ($$) { - my ($self, $refs) = @_; # show-ref --heads --tags --hash output - my $fh = $self->{git}->popen(qw(rev-list --stdin), undef, - { 0 => $refs }); +sub partition_refs ($$$) { + my ($self, $git, $refs) = @_; # show-ref --heads --tags --hash output + sysseek($refs, 0, SEEK_SET) or die "seek: $!"; # for rev-list --stdin + my $fh = $git->popen(qw(rev-list --stdin), undef, { 0 => $refs }); close $refs or die "close: $!"; local $self->{xdb}; my $xdb = $self->{-opt}->{reindex} ? undef : $self->xdb; @@ -292,22 +319,27 @@ sub partition_refs ($$) { close($fh); if (!$? || (($? & 127) == POSIX::SIGPIPE && $seen > $SEEN_MAX)) { $self->{nchange} += $nchange; - progress($self, "$nchange commits"); + progress($self, "$git->{git_dir}: $nchange commits"); for my $fh (@shard_in) { $fh->flush or die "flush: $!"; sysseek($fh, 0, SEEK_SET) or die "seek: $!"; } return @shard_in; } - die "git-rev-list: \$?=$?\n"; + die "git --git-dir=$git->{git_dir} rev-list: \$?=$?\n"; } -sub index_git_dir ($$) { - my ($self, $git_dir) = @_; - local $self->{git} = PublicInbox::Git->new($git_dir); # for ->patch_id - my $repo = prep_git_dir($self) or return; - local $self->{current_info} = $git_dir; - my @shard_in = partition_refs($self, delete($repo->{refs})); +sub index_repo { + my ($self, $git, $roots) = @_; + return if !$LIVE; # premature exit + my $repo = delete $git->{-repo} or return; + seek($roots, 0, SEEK_SET) or die "seek: $!"; + chomp(my @roots = <$roots>); + close($roots) or die "close: $!"; + @roots or return warn("E: $git->{git_dir} has no root commits\n"); + $repo->{roots} = \@roots; + local $self->{current_info} = $git->{git_dir}; + my @shard_in = 
partition_refs($self, $git, delete($repo->{refs})); my %pids; my $fwd_kill = sub { my ($sig) = @_; @@ -323,12 +355,13 @@ sub index_git_dir ($$) { my $pid = fork // die "fork: $!"; if ($pid == 0) { # no RNG use, here $0 = "code index [$n]"; + $self->{git} = $git; $self->{shard} = $n; $self->{current_info} = "$self->{current_info} [$n]"; delete @$self{qw(lockfh lock_path)}; my $in = $shard_in[$n]; @shard_in = (); - $self->{roots} = delete $repo->{roots}; + $self->{roots} = \@roots; undef $repo; eval { shard_worker($self, $in, $sigset) }; warn "E: $@" if $@; @@ -339,18 +372,41 @@ sub index_git_dir ($$) { } PublicInbox::DS::sig_setmask($sigset); @shard_in = (); - my $err; + my ($err, @todo); while (keys %pids) { - my $pid = waitpid(-1, 0) or last; - my $j = delete $pids{$pid} // "unknown PID:$pid"; - next if $? == 0; - warn "PID:$pid $j exited with \$?=$?\n"; - $err = 1; + my $pid = waitpid(-1, 0) // die "waitpid: $!"; + if (my $j = delete $pids{$pid}) { + next if $? == 0; + warn "PID:$pid $j exited with \$?=$?\n"; + $err = 1; + } elsif (my $todo = delete $LIVE->{$pid}) { + warn "PID:$pid exited with \$?=$?\n" if $?; + push @todo, $todo; + } else { + warn "reaped unknown PID=$pid ($?)\n"; + } } die "subprocess(es) failed\n" if $err; - store_repo($self, $repo); - progress($self, 'done'); + store_repo($self, $git, $repo); + progress($self, "$git->{git_dir}: done"); # TODO: check fp afterwards? 
+ while (my $x = shift @todo) { + my $cb = shift @$x; + $cb->(@$x) if $cb; + } +} + +sub get_roots ($$) { + my ($self, $git) = @_; + return if !$LIVE; # premature exit + cidx_reap($self, $LIVE_JOBS); + my $refs = $git->{-repo}->{refs} // die 'BUG: no {-repo}->{refs}'; + sysseek($refs, 0, SEEK_SET) or die "seek: $!"; + open my $roots, '+>', undef or die "open: $!"; + my $pid = spawn(['git', "--git-dir=$git->{git_dir}", + qw(rev-list --stdin --max-parents=0)], + undef, { 0 => $refs, 1 => $roots }); + $LIVE->{$pid} = [ \&index_repo, $self, $git, $roots ]; } # for PublicInbox::SearchIdx::patch_id and with_umask @@ -389,6 +445,21 @@ W: memory usage may be high for large indexing runs EOM } +sub scan_git_dirs ($) { + my ($self) = @_; + local $LIVE_JOBS = $self->{-opt}->{jobs} // + PublicInbox::IPC::detect_nproc() // 2; + local $LIVE = {}; + for (@{$self->{git_dirs}}) { + my $git = PublicInbox::Git->new($_); + my $prep_repo = PublicInbox::OnDestroy->new($$, \&prep_repo, + $self, $git); + fp_start($self, $git, $prep_repo); + ct_start($self, $git, $prep_repo); + } + cidx_reap($self, 0); +} + sub cidx_run { my ($self) = @_; cidx_init($self); @@ -414,11 +485,7 @@ sub cidx_run { } local $self->{nchange} = 0; # do_prune($self) if $self->{-opt}->{prune}; TODO - if ($self->{-opt}->{scan} // 1) { - for my $gd (@{$self->{git_dirs}}) { - index_git_dir($self, $gd); - } - } + scan_git_dirs($self) if $self->{-opt}->{scan} // 1; $self->lock_release(!!$self->{nchange}); }