user/dev discussion of public-inbox itself
 help / color / mirror / code / Atom feed
From: Eric Wong <e@80x24.org>
To: meta@public-inbox.org
Subject: [PATCH 07/28] cindex: parallelize prep phases
Date: Tue, 21 Mar 2023 23:07:22 +0000	[thread overview]
Message-ID: <20230321230743.3020032-7-e@80x24.org> (raw)
In-Reply-To: <20230321230743.3020032-1-e@80x24.org>

Listing refs, fingerprinting and root scanning can all be
parallelized to reduce runtime on SMP systems.

We'll use DESTROY-based dependency management with
parallelization (as in LeiMirror) to handle ref listing and
fingerprinting, then serialize Xapian DB access to check
against the existing fingerprint.

We'll also delay root listing until we get a fingerprint
mismatch to speed up no-op indexing.
---
 lib/PublicInbox/CodeSearchIdx.pm | 197 +++++++++++++++++++++----------
 1 file changed, 132 insertions(+), 65 deletions(-)

diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index 218338da..a926886e 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -26,7 +26,10 @@ use PublicInbox::SHA qw(sha256_hex);
 use PublicInbox::Search qw(xap_terms);
 use PublicInbox::SearchIdx qw(add_val);
 use PublicInbox::Config;
-use PublicInbox::Spawn qw(run_die);
+use PublicInbox::Spawn qw(spawn);
+use PublicInbox::OnDestroy;
+our $LIVE; # pid => callback
+our $LIVE_JOBS;
 
 # stop walking history if we see >$SEEN_MAX existing commits, this assumes
 # branches don't diverge by more than this number of commits...
@@ -106,26 +109,27 @@ sub progress {
 	$pr->($self->{git} ? ("$self->{git}->{git_dir}: ") : (), @msg, "\n");
 }
 
-sub store_repo ($$) {
-	my ($self, $repo) = @_;
+sub store_repo ($$$) {
+	my ($self, $git, $repo) = @_;
 	my $xdb = delete($repo->{shard})->idx_acquire;
 	$xdb->begin_transaction;
+	for (@{$repo->{to_delete}}) { $xdb->delete_document($_) } # XXX needed?
 	if (defined $repo->{id}) {
 		my $doc = $xdb->get_document($repo->{id}) //
-			die "$self->{git}->{git_dir} doc #$repo->{id} gone";
+			die "$git->{git_dir} doc #$repo->{id} gone";
 		add_val($doc, PublicInbox::CodeSearch::CT, $repo->{ct});
-		my %new = map { $_ => undef } @{$self->{roots}};
+		my %new = map { $_ => undef } @{$repo->{roots}};
 		my $old = xap_terms('G', $doc);
 		delete @new{keys %$old};
 		$doc->add_boolean_term('G'.$_) for keys %new;
-		delete @$old{@{$self->{roots}}};
+		delete @$old{@{$repo->{roots}}};
 		$doc->remove_term('G'.$_) for keys %$old;
 		$doc->set_data($repo->{fp});
 		$xdb->replace_document($repo->{id}, $doc);
 	} else {
 		my $new = $PublicInbox::Search::X{Document}->new;
 		add_val($new, PublicInbox::CodeSearch::CT, $repo->{ct});
-		$new->add_boolean_term("P$self->{git}->{git_dir}");
+		$new->add_boolean_term("P$git->{git_dir}");
 		$new->add_boolean_term('T'.'r');
 		$new->add_boolean_term('G'.$_) for @{$repo->{roots}};
 		$new->set_data($repo->{fp}); # \n delimited
@@ -201,75 +205,98 @@ sub docids_by_postlist ($$) { # consider moving to PublicInbox::Search
 	@ids;
 }
 
-sub get_roots ($$) {
-	my ($self, $refs) = @_;
-	my @roots = $self->{git}->qx([qw(rev-list --stdin --max-parents=0)],
-		undef, { 0 => $refs });
-	die "git rev-list \$?=$?" if $?;
-	sysseek($refs, 0, SEEK_SET) or die "seek: $!"; # for rev-list --stdin
-	chomp(@roots);
-	scalar(@roots) ? \@roots : undef;
+sub cidx_reap ($$) {
+	my ($self, $jobs) = @_;
+	while (keys(%$LIVE) >= $jobs) {
+		my $pid = waitpid(-1, 0) // die "waitpid(-1): $!";
+		last if $pid < 0;
+		if (my $x = delete $LIVE->{$pid}) {
+			my $cb = shift @$x;
+			$cb->(@$x) if $cb;
+		} else {
+			warn "reaped unknown PID=$pid ($?)\n";
+		}
+	}
 }
 
 # this is different from the grokmirror-compatible fingerprint since we
 # only care about --heads (branches) and --tags, and not even their names
-sub cidx_fp ($) {
-	my ($self) = @_;
+sub fp_start ($$$) {
+	my ($self, $git, $prep_repo) = @_;
+	return if !$LIVE; # premature exit
+	cidx_reap($self, $LIVE_JOBS);
 	open my $refs, '+>', undef or die "open: $!";
-	run_die(['git', "--git-dir=$self->{git}->{git_dir}",
+	my $pid = spawn(['git', "--git-dir=$git->{git_dir}",
 		qw(show-ref --heads --tags --hash)], undef, { 1 => $refs });
+	$git->{-repo}->{refs} = $refs;
+	$LIVE->{$pid} = [ \&fp_fini, $self, $git, $prep_repo ];
+}
+
+sub fp_fini {
+	my ($self, $git, $prep_repo) = @_;
+	my $refs = $git->{-repo}->{refs} // die 'BUG: no {-repo}->{refs}';
 	seek($refs, 0, SEEK_SET) or die "seek: $!";
 	my $buf;
 	my $dig = PublicInbox::SHA->new(256);
 	while (read($refs, $buf, 65536)) { $dig->add($buf) }
-	sysseek($refs, 0, SEEK_SET) or die "seek: $!"; # for rev-list --stdin
-	($dig->hexdigest, $refs);
+	$git->{-repo}->{fp} = $dig->hexdigest;
 }
 
-# TODO: should we also index gitweb.owner and the full fingerprint for grokmirror?
-sub prep_git_dir ($) {
-	my ($self) = @_;
-	my $git_dir = $self->{git}->{git_dir};
-	my $ct = $self->{git}->qx([qw[for-each-ref
-		--sort=-committerdate --format=%(committerdate:raw) --count=1
+sub ct_start ($$$) {
+	my ($self, $git, $prep_repo) = @_;
+	return if !$LIVE; # premature exit
+	cidx_reap($self, $LIVE_JOBS);
+	my ($rd, $pid) = $git->popen([qw[for-each-ref --sort=-committerdate
+		--format=%(committerdate:raw) --count=1
 		refs/heads/ refs/tags/]]);
-	my $repo = {};
-	@$repo{qw(fp refs)} = cidx_fp($self);
-	$repo->{roots} = get_roots($self, $repo->{refs});
-	if (!$repo->{roots} || !defined($ct)) {
-		warn "W: $git_dir has no root commits, skipping\n";
+	$LIVE->{$pid} = [ \&ct_fini, $self, $git, $rd, $prep_repo ];
+}
+
+sub ct_fini {
+	my ($self, $git, $rd, $prep_repo) = @_;
+	defined(my $ct = <$rd>) or return;
+	$ct =~ s/\s+.*\z//s; # drop TZ + LF
+	$git->{-repo}->{ct} = $ct + 0;
+}
+
+# TODO: also index gitweb.owner and the full fingerprint for grokmirror?
+sub prep_repo ($$) {
+	my ($self, $git) = @_;
+	return if !$LIVE; # premature exit
+	my $repo = $git->{-repo} // die 'BUG: no {-repo}';
+	my $git_dir = $git->{git_dir};
+	if (!defined($repo->{ct})) {
+		warn "W: $git_dir has no commits, skipping\n";
+		delete $git->{-repo};
 		return;
 	}
-	$ct =~ s/ .*\z//s; # drop TZ
-	$repo->{ct} = $ct + 0;
 	my $n = git_dir_hash($git_dir) % $self->{nshard};
 	my $shard = $repo->{shard} = bless { %$self, shard => $n }, ref($self);
 	delete @$shard{qw(lockfh lock_path)};
 	local $shard->{xdb};
 	my $xdb = $shard->idx_acquire;
 	my @docids = docids_by_postlist($shard, 'P'.$git_dir);
-	my $docid = shift(@docids) // return $repo;
+	my $docid = shift(@docids) // return get_roots($self, $git);
 	if (@docids) {
 		warn "BUG: $git_dir indexed multiple times, culling\n";
-		$xdb->begin_transaction;
-		for (@docids) { $xdb->delete_document($_) }
-		$xdb->commit_transaction;
+		$repo->{to_delete} = \@docids; # XXX needed?
 	}
 	my $doc = $xdb->get_document($docid) //
 		die "BUG: no #$docid ($git_dir)";
 	my $old_fp = $doc->get_data;
 	if ($old_fp eq $repo->{fp}) { # no change
-		progress($self, 'unchanged');
+		progress($self, "$git_dir unchanged");
+		delete $git->{-repo};
 		return;
 	}
 	$repo->{id} = $docid;
-	$repo;
+	get_roots($self, $git);
 }
 
-sub partition_refs ($$) {
-	my ($self, $refs) = @_; # show-ref --heads --tags --hash output
-	my $fh = $self->{git}->popen(qw(rev-list --stdin), undef,
-					{ 0 => $refs });
+sub partition_refs ($$$) {
+	my ($self, $git, $refs) = @_; # show-ref --heads --tags --hash output
+	sysseek($refs, 0, SEEK_SET) or die "seek: $!"; # for rev-list --stdin
+	my $fh = $git->popen(qw(rev-list --stdin), undef, { 0 => $refs });
 	close $refs or die "close: $!";
 	local $self->{xdb};
 	my $xdb = $self->{-opt}->{reindex} ? undef : $self->xdb;
@@ -292,22 +319,27 @@ sub partition_refs ($$) {
 	close($fh);
 	if (!$? || (($? & 127) == POSIX::SIGPIPE && $seen > $SEEN_MAX)) {
 		$self->{nchange} += $nchange;
-		progress($self, "$nchange commits");
+		progress($self, "$git->{git_dir}: $nchange commits");
 		for my $fh (@shard_in) {
 			$fh->flush or die "flush: $!";
 			sysseek($fh, 0, SEEK_SET) or die "seek: $!";
 		}
 		return @shard_in;
 	}
-	die "git-rev-list: \$?=$?\n";
+	die "git --git-dir=$git->{git_dir} rev-list: \$?=$?\n";
 }
 
-sub index_git_dir ($$) {
-	my ($self, $git_dir) = @_;
-	local $self->{git} = PublicInbox::Git->new($git_dir); # for ->patch_id
-	my $repo = prep_git_dir($self) or return;
-	local $self->{current_info} = $git_dir;
-	my @shard_in = partition_refs($self, delete($repo->{refs}));
+sub index_repo {
+	my ($self, $git, $roots) = @_;
+	return if !$LIVE; # premature exit
+	my $repo = delete $git->{-repo} or return;
+	seek($roots, 0, SEEK_SET) or die "seek: $!";
+	chomp(my @roots = <$roots>);
+	close($roots) or die "close: $!";
+	@roots or return warn("E: $git->{git_dir} has no root commits\n");
+	$repo->{roots} = \@roots;
+	local $self->{current_info} = $git->{git_dir};
+	my @shard_in = partition_refs($self, $git, delete($repo->{refs}));
 	my %pids;
 	my $fwd_kill = sub {
 		my ($sig) = @_;
@@ -323,12 +355,13 @@ sub index_git_dir ($$) {
 		my $pid = fork // die "fork: $!";
 		if ($pid == 0) { # no RNG use, here
 			$0 = "code index [$n]";
+			$self->{git} = $git;
 			$self->{shard} = $n;
 			$self->{current_info} = "$self->{current_info} [$n]";
 			delete @$self{qw(lockfh lock_path)};
 			my $in = $shard_in[$n];
 			@shard_in = ();
-			$self->{roots} = delete $repo->{roots};
+			$self->{roots} = \@roots;
 			undef $repo;
 			eval { shard_worker($self, $in, $sigset) };
 			warn "E: $@" if $@;
@@ -339,18 +372,41 @@ sub index_git_dir ($$) {
 	}
 	PublicInbox::DS::sig_setmask($sigset);
 	@shard_in = ();
-	my $err;
+	my ($err, @todo);
 	while (keys %pids) {
-		my $pid = waitpid(-1, 0) or last;
-		my $j = delete $pids{$pid} // "unknown PID:$pid";
-		next if $? == 0;
-		warn "PID:$pid $j exited with \$?=$?\n";
-		$err = 1;
+		my $pid = waitpid(-1, 0) // die "waitpid: $!";
+		if (my $j = delete $pids{$pid}) {
+			next if $? == 0;
+			warn "PID:$pid $j exited with \$?=$?\n";
+			$err = 1;
+		} elsif (my $todo = delete $LIVE->{$pid}) {
+			warn "PID:$pid exited with \$?=$?\n" if $?;
+			push @todo, $todo;
+		} else {
+			warn "reaped unknown PID=$pid ($?)\n";
+		}
 	}
 	die "subprocess(es) failed\n" if $err;
-	store_repo($self, $repo);
-	progress($self, 'done');
+	store_repo($self, $git, $repo);
+	progress($self, "$git->{git_dir}: done");
 	# TODO: check fp afterwards?
+	while (my $x = shift @todo) {
+		my $cb = shift @$x;
+		$cb->(@$x) if $cb;
+	}
+}
+
+sub get_roots ($$) {
+	my ($self, $git) = @_;
+	return if !$LIVE; # premature exit
+	cidx_reap($self, $LIVE_JOBS);
+	my $refs = $git->{-repo}->{refs} // die 'BUG: no {-repo}->{refs}';
+	sysseek($refs, 0, SEEK_SET) or die "seek: $!";
+	open my $roots, '+>', undef or die "open: $!";
+	my $pid = spawn(['git', "--git-dir=$git->{git_dir}",
+			qw(rev-list --stdin --max-parents=0)],
+			undef, { 0 => $refs, 1 => $roots });
+	$LIVE->{$pid} = [ \&index_repo, $self, $git, $roots ];
 }
 
 # for PublicInbox::SearchIdx::patch_id and with_umask
@@ -389,6 +445,21 @@ W: memory usage may be high for large indexing runs
 EOM
 }
 
+sub scan_git_dirs ($) {
+	my ($self) = @_;
+	local $LIVE_JOBS = $self->{-opt}->{jobs} //
+			PublicInbox::IPC::detect_nproc() // 2;
+	local $LIVE = {};
+	for (@{$self->{git_dirs}}) {
+		my $git = PublicInbox::Git->new($_);
+		my $prep_repo = PublicInbox::OnDestroy->new($$, \&prep_repo,
+							$self, $git);
+		fp_start($self, $git, $prep_repo);
+		ct_start($self, $git, $prep_repo);
+	}
+	cidx_reap($self, 0);
+}
+
 sub cidx_run {
 	my ($self) = @_;
 	cidx_init($self);
@@ -414,11 +485,7 @@ sub cidx_run {
 	}
 	local $self->{nchange} = 0;
 	# do_prune($self) if $self->{-opt}->{prune}; TODO
-	if ($self->{-opt}->{scan} // 1) {
-		for my $gd (@{$self->{git_dirs}}) {
-			index_git_dir($self, $gd);
-		}
-	}
+	scan_git_dirs($self) if $self->{-opt}->{scan} // 1;
 	$self->lock_release(!!$self->{nchange});
 }
 

  parent reply	other threads:[~2023-03-21 23:07 UTC|newest]

Thread overview: 30+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2023-03-21 23:07 [PATCH 00/28] cindex coderepo commit indexer Eric Wong
2023-03-21 23:07 ` [PATCH 01/28] ipc: move nproc_shards from v2writable Eric Wong
2023-03-21 23:07   ` [PATCH 02/28] search: relocate all_terms from lei_search Eric Wong
2023-03-21 23:07   ` [PATCH 03/28] admin: hoist out resolve_git_dir Eric Wong
2023-03-21 23:07   ` [PATCH 04/28] admin: ensure resolved GIT_DIR is absolute Eric Wong
2023-03-21 23:07   ` [PATCH 05/28] test_common: create_inbox: use `$!' properly on mkdir failure Eric Wong
2023-03-21 23:07   ` [PATCH 06/28] codesearch: initial cut w/ -cindex tool Eric Wong
2023-03-21 23:07   ` Eric Wong [this message]
2023-03-21 23:07   ` [PATCH 08/28] cindex: use read-only shards during prep phases Eric Wong
2023-03-21 23:07   ` [PATCH 09/28] searchidxshard: improve comment wording Eric Wong
2023-03-21 23:07   ` [PATCH 10/28] cindex: use DS and workqueues for parallelism Eric Wong
2023-03-21 23:07   ` [PATCH 11/28] ds: @post_loop_do replaces SetPostLoopCallback Eric Wong
2023-03-21 23:07   ` [PATCH 12/28] cindex: implement --exclude= like -clone Eric Wong
2023-03-21 23:07   ` [PATCH 13/28] cindex: show shard number in progress message Eric Wong
2023-03-21 23:07   ` [PATCH 14/28] cindex: drop `unchanged' " Eric Wong
2023-03-21 23:07   ` [PATCH 15/28] cindex: handle graceful shutdown by default Eric Wong
2023-03-21 23:07   ` [PATCH 16/28] sigfd: pass signal name rather than number to callback Eric Wong
2023-03-21 23:07   ` [PATCH 17/28] cindex: implement --max-size=SIZE Eric Wong
2023-03-21 23:07   ` [PATCH 18/28] cindex: check for checkpoint before giant messages Eric Wong
2023-03-21 23:07   ` [PATCH 19/28] cindex: truncate or drop body for over-sized commits Eric Wong
2023-03-21 23:07   ` [PATCH 20/28] cindex: attempt to give oldest commits lowest docids Eric Wong
2023-03-21 23:07   ` [PATCH 21/28] cindex: improve granularity of quit checks Eric Wong
2023-03-21 23:07   ` [PATCH 22/28] spawn: show failing directory for chdir failures Eric Wong
2023-03-21 23:07   ` [PATCH 23/28] cindex: filter out non-existent git directories Eric Wong
2023-03-21 23:07   ` [PATCH 24/28] cindex: add support for --prune Eric Wong
2023-03-21 23:07   ` [PATCH 25/28] cindex: implement reindex Eric Wong
2023-03-21 23:07   ` [PATCH 26/28] cindex: squelch incompatible options Eric Wong
2023-03-21 23:07   ` [PATCH 27/28] cindex: respect existing permissions Eric Wong
2023-03-21 23:07   ` [PATCH 28/28] cindex: ignore SIGPIPE Eric Wong
2023-03-24 10:40     ` [PATCH 29/28] cindex: --prune checkpoints to avoid OOM Eric Wong

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

  List information: https://public-inbox.org/README

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20230321230743.3020032-7-e@80x24.org \
    --to=e@80x24.org \
    --cc=meta@public-inbox.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link.
Be sure your reply has a Subject: header at the top and a blank line before the message body.
Code repositories for project(s) associated with this public inbox

	https://80x24.org/public-inbox.git

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).