user/dev discussion of public-inbox itself
* [PATCH 0/7] cindex: optional C++ Xapian helper
From: Eric Wong @ 2023-08-24  1:22 UTC (permalink / raw)
  To: meta

Associating inboxes with coderepos is an extremely expensive
operation, especially in Perl (even with XS or SWIG bindings),
since Perl's method dispatch overhead becomes noticeable when
dumping data out of Xapian.

The association itself is fast with POSIX sort(1) and join(1),
but getting the necessary data out of Xapian to join on is
expensive, since neither quest(1) nor xapian-delve(1) is suitable
for this task.
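For readers unfamiliar with join(1): PATCH 2/7 writes two files,
to_ibx_id ("$PFX $IBX_ID") and to_root_id ("$PFX $ROOT_IDS..."),
both sorted on the first field, and join(1) emits one line per
matching prefix.  The following is a minimal in-memory Perl sketch
of that merge join, assuming small inputs; the real files are far
too large for memory, which is why the series uses sort(1) and
join(1).  merge_join is a hypothetical helper, not part of
public-inbox.

```perl
use v5.12;
use warnings;

# first space-separated field of a line (the join key, e.g. a patchid)
sub key_of { (split(/ /, $_[0], 2))[0] }

# everything after the first field ('' if there is nothing)
sub rest_of { (split(/ /, $_[0], 2))[1] // '' }

# Hypothetical helper: merge-join two arrayrefs of lines that are
# already sorted by their first field (as `sort -k1,1' does under
# LC_ALL=C).  Like join(1), each output line is the key, then the
# remaining fields from the left input, then those from the right.
# Duplicate keys produce every left x right pairing, as join(1) does.
sub merge_join {
	my ($left, $right) = @_;
	my ($i, $j, @out) = (0, 0);
	while ($i < @$left && $j < @$right) {
		my ($lk, $rk) = (key_of($left->[$i]), key_of($right->[$j]));
		if ($lk lt $rk) { ++$i; next }
		if ($lk gt $rk) { ++$j; next }
		# find the end of the run of equal keys on both sides
		my ($ie, $je) = ($i, $j);
		++$ie while $ie < @$left && key_of($left->[$ie]) eq $lk;
		++$je while $je < @$right && key_of($right->[$je]) eq $rk;
		for my $l (@$left[$i .. $ie - 1]) {
			for my $r (@$right[$j .. $je - 1]) {
				push @out, join(' ', grep { length }
						$lk, rest_of($l), rest_of($r));
			}
		}
		($i, $j) = ($ie, $je);
	}
	\@out;
}
```

For example, joining [ 'bbb 0', 'bbb 1' ] with [ 'bbb 5 7' ] yields
'bbb 0 5 7' and 'bbb 1 5 7': every inbox sharing the prefix is paired
with every root commit listed for it.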

The association data isn't stored or usable anywhere yet, and
some of the associations are too loose to be useful.  More work
is required on that point.
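One way to tighten loose associations is to count how many
prefixes (e.g. patchids) each inbox/root-commit pair shares and to
drop pairs below a minimum.  PATCH 2/7 computes such counts and
reads an `assoc-min' option defaulting to 10, but it does not filter
on it yet.  The sketch below is an assumption, not what this series
does; score_pairs and the threshold are hypothetical.

```perl
use v5.12;
use warnings;

# Hypothetical helper: $joined holds join(1) output lines of the form
# "PFX IBX_ID ROOT_ID..." (one or more root IDs per line).
# Returns a hashref of "IBX_ID ROOT_ID" => number of shared prefixes,
# keeping only pairs whose count reaches $min.
sub score_pairs {
	my ($joined, $min) = @_;
	my %score;
	for my $line (@$joined) {
		# split ' ' splits on runs of whitespace and drops the trailing
		# newline, so "5\n" and "5" count as the same root ID
		my (undef, $ibx_id, @root_ids) = split(' ', $line);
		++$score{"$ibx_id $_"} for @root_ids;
	}
	my %keep = map { ($_ => $score{$_}) }
		grep { $score{$_} >= $min } keys %score;
	\%keep;
}
```

Raising the minimum trades recall for precision.  A pair that shares
only one or two patchids is more likely to be a coincidence, such as
the same patch being cross-posted, than a real inbox <-> coderepo
relationship.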

The association could probably be faster with rculfhash (from
Userspace-RCU), but I don't think it's worth the maintenance and
installation overhead for this (though I intend to use rculfhash
for the FUSE shim).

These performance problems weren't as noticeable in the past
since our other uses of Xapian spend significant amounts of time
retrieving document data from SQLite and blobs from git.

Using the C++ implementation in xap_helper.h allows a full join
(without limits or date ranges) of lore + git.kernel.org repos
within one hour on my ancient system, while the Perl+(XS|SWIG)
implementation took roughly 8 hours.

Eric Wong (7):
  search: hoist out shards_dir for future use
  cindex: read-only association dump
  cindex: add --show-roots switch
  introduce optional C++ xap_helper
  cindex: fix sorting and uniqueness
  cindex: implement dump_roots in C++
  xap_helper: reopen+retry in MSetIterator loops

 MANIFEST                            |   7 +
 lib/PublicInbox/CidxRecvIbx.pm      |  46 ++
 lib/PublicInbox/CidxXapHelperAux.pm |  44 ++
 lib/PublicInbox/CodeSearch.pm       |  54 +-
 lib/PublicInbox/CodeSearchIdx.pm    | 349 ++++++++--
 lib/PublicInbox/Config.pm           |   2 +-
 lib/PublicInbox/Isearch.pm          |   5 +
 lib/PublicInbox/Search.pm           |  92 ++-
 lib/PublicInbox/XapClient.pm        |  50 ++
 lib/PublicInbox/XapHelper.pm        | 226 +++++++
 lib/PublicInbox/XapHelperCxx.pm     |  93 +++
 lib/PublicInbox/xap_helper.h        | 947 ++++++++++++++++++++++++++++
 script/public-inbox-cindex          |   4 +-
 t/xap_helper.t                      | 175 +++++
 14 files changed, 2021 insertions(+), 73 deletions(-)
 create mode 100644 lib/PublicInbox/CidxRecvIbx.pm
 create mode 100644 lib/PublicInbox/CidxXapHelperAux.pm
 create mode 100644 lib/PublicInbox/XapClient.pm
 create mode 100644 lib/PublicInbox/XapHelper.pm
 create mode 100644 lib/PublicInbox/XapHelperCxx.pm
 create mode 100644 lib/PublicInbox/xap_helper.h
 create mode 100644 t/xap_helper.t


* [PATCH 2/7] cindex: read-only association dump
From: Eric Wong @ 2023-08-24  1:22 UTC (permalink / raw)
  To: meta

This will eventually allow associating coderepos with inboxes
and vice versa, avoiding the need for tedious manual
configuration via publicinbox.*.coderepo directives.

I'm not sure yet how this should be stored for WWW, but storing
it is required since a full association across lore and
git.kernel.org takes about 8 hours.
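The header comment added to CodeSearchIdx.pm below describes how the
association works: each inbox and each root commit OID is given a
non-negative integer index, which keeps the intermediate files
compact.  Here is a minimal Perl sketch of the root-commit half of
that mapping, assuming a plain list of hex OIDs as input.
build_root_ids and id2hex are hypothetical helpers, not public-inbox
APIs.

```perl
use v5.12;
use warnings;

# Hypothetical helper: map hex root commit OIDs to dense integer IDs.
# OIDs are stored in binary via pack('H*'), which halves their size
# compared to hex; the patch keeps @ID2ROOT in the same form.
sub build_root_ids {
	my @id2root = map { pack('H*', $_) } @_;
	my $id = 0;
	my %root2id = map { ($_ => $id++) } @id2root;
	return (\@id2root, \%root2id);
}

# Convert an integer ID back to a hex OID, e.g. for progress messages
sub id2hex {
	my ($id2root, $id) = @_;
	unpack('H*', $id2root->[$id]);
}
```

Lookups in %root2id are keyed by the binary OID.  A caller holding a
hex term must therefore pack('H*', ...) it first, as
CidxDumpShardRoots.pm does for its `G' terms.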
---
 MANIFEST                              |   3 +
 lib/PublicInbox/CidxDumpIbx.pm        |  59 +++++
 lib/PublicInbox/CidxDumpShardRoots.pm |  73 ++++++
 lib/PublicInbox/CidxRecvIbx.pm        |  46 ++++
 lib/PublicInbox/CodeSearchIdx.pm      | 306 ++++++++++++++++++++++----
 lib/PublicInbox/Config.pm             |   2 +-
 lib/PublicInbox/Search.pm             |   2 +-
 script/public-inbox-cindex            |   4 +-
 8 files changed, 452 insertions(+), 43 deletions(-)
 create mode 100644 lib/PublicInbox/CidxDumpIbx.pm
 create mode 100644 lib/PublicInbox/CidxDumpShardRoots.pm
 create mode 100644 lib/PublicInbox/CidxRecvIbx.pm

diff --git a/MANIFEST b/MANIFEST
index 1001ca08..162e3038 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -162,7 +162,10 @@ lib/PublicInbox/AltId.pm
 lib/PublicInbox/AutoReap.pm
 lib/PublicInbox/Cgit.pm
 lib/PublicInbox/CidxComm.pm
+lib/PublicInbox/CidxDumpIbx.pm
+lib/PublicInbox/CidxDumpShardRoots.pm
 lib/PublicInbox/CidxLogP.pm
+lib/PublicInbox/CidxRecvIbx.pm
 lib/PublicInbox/CmdIPC4.pm
 lib/PublicInbox/CodeSearch.pm
 lib/PublicInbox/CodeSearchIdx.pm
diff --git a/lib/PublicInbox/CidxDumpIbx.pm b/lib/PublicInbox/CidxDumpIbx.pm
new file mode 100644
index 00000000..e1bc273d
--- /dev/null
+++ b/lib/PublicInbox/CidxDumpIbx.pm
@@ -0,0 +1,59 @@
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# Intended for PublicInbox::DS::event_loop for -cindex --associate
+# Iterating through mset->items is slow in Perl due to method dispatch
+# and that loop may be implemented in C++ using Xapian directly
+package PublicInbox::CidxDumpIbx;
+use v5.12;
+use PublicInbox::Search qw(xap_terms);
+use PublicInbox::DS;
+use Socket qw(MSG_EOR);
+
+sub start {
+	my ($rcvibx, $ibx_id) = @_;
+	my $cidx = $rcvibx->{cidx};
+	my $ibx = $cidx->{IBX}->[$ibx_id] // die "BUG: no IBX[$ibx_id]";
+	my $self = bless { rcvibx => $rcvibx, ekey => $ibx->eidx_key,
+		ibx_id => $ibx_id }, __PACKAGE__;
+	$self->{srch} = $ibx->isrch // do {
+		warn("W: $self->{ekey} has no search index (ignoring)\n");
+		return undef;
+	};
+	my $opt = { limit => $cidx->assoc_max_init, relevance => -2 };
+	$self->{mset} = $self->{srch}->mset($rcvibx->{qry_str}, $opt);
+	$self->{iter} = 0;
+	event_step($self);
+}
+
+sub event_step {
+	my ($self) = @_;
+	my $rcvibx = $self->{rcvibx} // die 'BUG: no rcvibx';
+	return if $rcvibx->{cidx}->do_quit;
+	my $last = $self->{mset}->size - 1;
+	my $cur = $self->{iter};
+	my $end = $cur + 9999;
+	if ($end >= $last) {
+		send($rcvibx->{op_p}, 'index_next', MSG_EOR);
+		$end = $last;
+	}
+	$self->{iter} = $end + 1;
+	local $0 = "dumping $self->{ekey} $cur..$end";
+
+	my $sort_w = $rcvibx->{sort_w};
+	my $ibx_id = $self->{ibx_id};
+	local $0 = "dumping $self->{ekey} $cur..$end";
+	$rcvibx->{cidx}->progress($0);
+	for my $x (($self->{mset}->items)[$cur..$end]) { # FIXME: slow loop
+		my $doc = $x->get_document;
+		for my $p (@{$rcvibx->{cidx}->{ASSOC_PFX}}) {
+			for (xap_terms($p, $doc)) {
+				print $sort_w "$_ $ibx_id\n" or die "print: $!";
+			}
+		}
+	}
+	$end < $last && !$rcvibx->{cidx}->do_quit and
+		PublicInbox::DS::requeue($self);
+}
+
+1;
diff --git a/lib/PublicInbox/CidxDumpShardRoots.pm b/lib/PublicInbox/CidxDumpShardRoots.pm
new file mode 100644
index 00000000..34afa419
--- /dev/null
+++ b/lib/PublicInbox/CidxDumpShardRoots.pm
@@ -0,0 +1,73 @@
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# Intended for PublicInbox::DS::event_loop for -cindex --associate
+# Iterating through mset->items is slow in Perl due to method dispatch
+# and that loop may be implemented in C++ using Xapian directly
+package PublicInbox::CidxDumpShardRoots;
+use v5.12;
+use PublicInbox::Lock;
+use PublicInbox::Search qw(xap_terms);
+use Socket qw(MSG_EOR);
+
+sub start {
+	my ($cidx, $root2id, $qry_str) = @_;
+	my $op_p = delete($cidx->{0}) // die 'BUG: no {0} op_p';
+	my $sort_w = delete($cidx->{1}) // die 'BUG: no {1} $w sort pipe';
+	# sort lock is necessary if we have many root ids which cause a
+	# row length to exceed POSIX PIPE_BUF (via `$G' below)
+	my $sort_lk = bless { lock_path => $cidx->tmpdir.'/to_root_id.lock' },
+		'PublicInbox::Lock';
+	$sort_w->autoflush(1);
+	$cidx->begin_txn_lazy; # only using txn to simplify writer subs
+	my $opt = { limit => $cidx->assoc_max_init, relevance => -2 };
+	my $self = bless {
+		cidx => $cidx,
+		op_p => $op_p,
+		iter => 0,
+		mset => $cidx->mset($qry_str, $opt),
+		root2id => $root2id,
+		sort_w => $sort_w,
+		sort_lk => $sort_lk,
+	}, __PACKAGE__;
+	event_step($self);
+}
+
+sub event_step {
+	my ($self) = @_;
+	my $cidx = $self->{cidx};
+	return if $cidx->do_quit;
+	my $last = $self->{mset}->size - 1;
+	my $cur = $self->{iter};
+	my $end = $cur + 9999;
+	$end = $last if $end > $last;
+	$self->{iter} = $end + 1;
+	local $0 = "dumping shard [$cidx->{shard}] $cur..$end";
+	$cidx->progress($0);
+
+	my $root2id = $self->{root2id};
+	my $buf = '';
+	for my $x (($self->{mset}->items)[$cur..$end]) { # FIXME: slow loop
+		my $doc = $x->get_document;
+		my $G = join(' ', map {
+			$root2id->{pack('H*', $_)};
+		} xap_terms('G', $doc));
+		for my $p (@{$cidx->{ASSOC_PFX}}) {
+			$buf .= "$_ $G\n" for (xap_terms($p, $doc));
+		}
+	}
+	$self->{sort_lk}->lock_acquire_fast;
+	print { $self->{sort_w} } $buf or die "print: $!";
+	$self->{sort_lk}->lock_release_fast;
+	$end < $last && !$cidx->do_quit and
+		PublicInbox::DS::requeue($self);
+}
+
+sub DESTROY {
+	my ($self) = @_;
+	return if $self->{cidx}->do_quit;
+	send($self->{op_p},
+		"dump_shard_roots_done $self->{cidx}->{shard}", MSG_EOR);
+}
+
+1;
diff --git a/lib/PublicInbox/CidxRecvIbx.pm b/lib/PublicInbox/CidxRecvIbx.pm
new file mode 100644
index 00000000..6add8e54
--- /dev/null
+++ b/lib/PublicInbox/CidxRecvIbx.pm
@@ -0,0 +1,46 @@
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+#
+# dumps all per-inbox info for -cindex --associate
+# integrated into the event loop for signalfd SIGINT handling
+package PublicInbox::CidxRecvIbx;
+use v5.12;
+use parent qw(PublicInbox::DS);
+use PublicInbox::Syscall qw(EPOLLIN EPOLLEXCLUSIVE);
+use Socket qw(MSG_EOR);
+use PublicInbox::CidxDumpIbx;
+
+sub new {
+	my ($cls, $cidx, $qry_str) = @_;
+	my ($op_p, $r_ibx, $sort_w) = delete @$cidx{0..2};
+	$op_p // die 'BUG: no $op_p';
+	$r_ibx // die 'BUG: no $r_ibx';
+	$sort_w // die 'BUG: no $sort_w';
+	my $self = bless {}, $cls;
+	$self->SUPER::new($r_ibx, EPOLLIN|EPOLLEXCLUSIVE);
+	$self->{cidx} = $cidx;
+	$self->{sort_w} = $sort_w;
+	$self->{op_p} = $op_p; # PublicInbox::CidxDumpIbx uses this
+	$self->{qry_str} = $qry_str;
+	# writes to this pipe are never longer than POSIX PIPE_BUF,
+	# so rely on POSIX atomicity guarantees
+	$sort_w->autoflush(1);
+	$self;
+}
+
+sub event_step {
+	my ($self) = @_;
+	recv($self->{sock}, my $ibx_id, 25, 0) // die "recv: $!";
+	return $self->close if $ibx_id eq '' || $self->{cidx}->do_quit;
+	PublicInbox::CidxDumpIbx::start($self, $ibx_id);
+}
+
+sub close {
+	my ($self) = @_;
+	$self->{cidx}->do_quit or
+		send($self->{op_p},
+			"recv_ibx_done $self->{cidx}->{shard}", MSG_EOR);
+	$self->SUPER::close; # PublicInbox::DS::close
+}
+
+1;
diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index ba14e52a..2480dbd2 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -11,6 +11,26 @@
 #
 # We shard repos using the first 32-bits of sha256($ABS_GIT_DIR)
 #
+# --associate joins root commits of coderepos to inboxes based on prefixes.
+#
+# Internally, each inbox is assigned a non-negative integer index ($IBX_ID),
+# and each root commit object ID (SHA-1/SHA-256 hex) is also assigned
+# a non-negative integer index ($ROOT_COMMIT_OID_ID).
+#
+# associate dumps to 2 intermediate files in $TMPDIR:
+#
+# * to_root_id - each line is of the format:
+#
+#	$PFX $ROOT_COMMIT_OID_ID
+#
+# * to_ibx_id - each line is of the format:
+#
+#	$PFX $IBX_ID
+#
+# In both cases, $PFX is typically the value of the patchid (XDFID) but it
+# can be configured to use any combination of patchid, dfpre, dfpost or
+# dfblob.
+#
 # See PublicInbox::CodeSearch (read-only API) for more
 package PublicInbox::CodeSearchIdx;
 use v5.12;
@@ -25,14 +45,14 @@ use File::Spec ();
 use PublicInbox::SHA qw(sha256_hex);
 use PublicInbox::Search qw(xap_terms);
 use PublicInbox::SearchIdx qw(add_val);
-use PublicInbox::Config qw(glob2re);
+use PublicInbox::Config qw(glob2re rel2abs_collapsed);
 use PublicInbox::Spawn qw(which spawn popen_rd);
 use PublicInbox::OnDestroy;
 use PublicInbox::CidxLogP;
 use PublicInbox::CidxComm;
 use PublicInbox::Git qw(%OFMT2HEXLEN);
 use PublicInbox::Compat qw(uniqstr);
-use Socket qw(MSG_EOR);
+use Socket qw(MSG_EOR AF_UNIX SOCK_SEQPACKET);
 use Carp ();
 our (
 	$LIVE, # pid => cmd
@@ -55,8 +75,15 @@ our (
 	%ALT_FH, # hexlen => tmp IO for TMPDIR git alternates
 	$TMPDIR, # File::Temp->newdir object for prune
 	@PRUNE_QUEUE, # GIT_DIRs to prepare for pruning
-	$PRUNE_ENV, # env for awk(1), comm(1), sort(1) commands during prune
+	%TODO, @IBXQ, @IBX,
+	@JOIN, # join(1) command for associate
+	$CMD_ENV, # env for awk(1), comm(1), sort(1) commands during prune
 	@AWK, @COMM, @SORT, # awk(1), comm(1), sort(1) commands
+	@ASSOC_PFX, # any combination of XDFID, XDFPRE, XDFPOST
+	$QRY_STR, # common query string for both code and inbox associations
+	$IBXDIR_FEED, # SOCK_SEQPACKET
+	@DUMP_SHARD_ROOTS_OK, @RECV_IBX_OK, # for associate
+	@ID2ROOT,
 );
 
 # stop walking history if we see >$SEEN_MAX existing commits, this assumes
@@ -64,6 +91,9 @@ our (
 # git walks commits quickly if it doesn't have to read trees
 our $SEEN_MAX = 100000;
 
+# window for commits/emails to determine an inbox <-> coderepo association
+my $ASSOC_MAX = 50000;
+
 our @PRUNE_BATCH = qw(git _ cat-file --batch-all-objects --batch-check);
 
 # TODO: do we care about committer name + email? or tree OID?
@@ -455,6 +485,91 @@ sub shard_commit { # via wq_io_do
 	send($op_p, "shard_done $self->{shard}", MSG_EOR);
 }
 
+sub dump_shard_roots_done { # via PktOp on dump_shard_roots completion
+	my ($self, $associate, $n) = @_;
+	return if $DO_QUIT;
+	progress($self, "dump_shard_roots [$n] done");
+	$DUMP_SHARD_ROOTS_OK[$n] = 1;
+	# may run associate()
+}
+
+sub assoc_max_init ($) {
+	my ($self) = @_;
+	my $max = $self->{-opt}->{'associate-max'} // $ASSOC_MAX;
+	$max = $ASSOC_MAX if !$max;
+	$max < 0 ? ((2 ** 31) - 1) : $max;
+}
+
+# dump the patchids of each shard: $XDFID $ROOT1 $ROOT2..
+sub dump_shard_roots { # via wq_io_do for associate
+	my ($self, $root2id, $qry_str) = @_;
+	PublicInbox::CidxDumpShardRoots::start($self, $root2id, $qry_str);
+}
+
+sub dump_roots_once {
+	my ($self, $associate) = @_;
+	$associate // die 'BUG: no $associate';
+	$TODO{associating} = 1; # keep shards_active() happy
+	progress($self, 'dumping IDs from coderepos');
+	local $self->{xdb};
+	@ID2ROOT = map { pack('H*', $_) } $self->all_terms('G');
+	my $id = 0;
+	my %root2id = map { $_ => $id++ } @ID2ROOT;
+	pipe(my ($r, $w)) or die "pipe: $!";
+	my @sort = (@SORT, '-k1,1');
+	my $dst = "$TMPDIR/to_root_id";
+	open my $fh, '>', $dst or die "open($dst): $!";
+	my $sort_pid = spawn(\@sort, $CMD_ENV, { 0 => $r, 1 => $fh });
+	close $r or die "close: $!";
+	awaitpid($sort_pid, \&cmd_done, \@sort, $associate);
+	my ($c, $p) = PublicInbox::PktOp->pair;
+	$c->{ops}->{dump_shard_roots_done} = [ $self, $associate ];
+	my @arg = ('dump_shard_roots', [ $p->{op_p}, $w ], \%root2id, $QRY_STR);
+	$_->wq_io_do(@arg) for @IDX_SHARDS;
+	progress($self, 'waiting on dump_shard_roots sort');
+}
+
+sub recv_ibx_done { # via PktOp on recv_ibx completion
+	my ($self, $pid, $n) = @_;
+	return if $DO_QUIT;
+	progress($self, "recv_ibx [$n] done");
+	$RECV_IBX_OK[$n] = 1;
+}
+
+# causes a worker to become a dumper for inbox/extindex
+sub recv_ibx { # wq_io_do
+	my ($self, $qry_str) = @_;
+	PublicInbox::CidxRecvIbx->new($self, $qry_str);
+}
+
+sub dump_ibx { # sends to PublicInbox::CidxRecvIbx::event_step
+	my ($self, $id_dir) = @_; # id_dir: "$IBX_ID=$INBOXDIR"
+	my $n = length($id_dir);
+	my $w = send($IBXDIR_FEED, $id_dir, MSG_EOR) // die "send: $!";
+	$n == $w or die "send($id_dir) $w != $n";
+}
+
+# repurpose shard workers to dump inbox patchids with perfect balance
+sub dump_ibx_start {
+	my ($self, $associate) = @_;
+	pipe(my ($sort_r, $sort_w)) or die "pipe: $!";
+	my @sort = (@SORT, '-k1,1');
+	my $dst = "$TMPDIR/to_ibx_id";
+	open my $fh, '>', $dst or die "open($dst): $!";
+	my $sort_pid = spawn(\@sort, $CMD_ENV, { 0 => $sort_r, 1 => $fh });
+	close $sort_r or die "close: $!";
+	awaitpid($sort_pid, \&cmd_done, \@sort, $associate);
+
+	my ($r, $w);
+	socketpair($r, $w, AF_UNIX, SOCK_SEQPACKET, 0) or die "socketpair: $!";
+	my ($c, $p) = PublicInbox::PktOp->pair;
+	$c->{ops}->{recv_ibx_done} = [ $self, $associate ];
+	$c->{ops}->{index_next} = [ $self ];
+	my $io = [ $p->{op_p}, $r, $sort_w ];
+	$_->wq_io_do('recv_ibx', $io, $QRY_STR) for @IDX_SHARDS;
+	$IBXDIR_FEED = $w;
+}
+
 sub index_next ($) {
 	my ($self) = @_;
 	return if $DO_QUIT;
@@ -466,6 +581,12 @@ sub index_next ($) {
 							$self, $git);
 		fp_start($self, $git, $prep_repo);
 		ct_start($self, $git, $prep_repo);
+	} elsif ($TMPDIR) {
+		delete $TODO{dump_ibx_start}; # runs OnDestroy once
+		return dump_ibx($self, shift @IBXQ) if @IBXQ;
+		progress($self, 'done dumping inboxes') if $IBXDIR_FEED;
+		undef $IBXDIR_FEED; # done dumping inboxes, dump roots
+		dump_roots_once($self, delete($TODO{associate}) // return);
 	}
 	# else: wait for shards_active (post_loop_do) callback
 }
@@ -502,7 +623,7 @@ sub commit_shard { # OnDestroy cb
 	for my $n (keys %$active) {
 		$IDX_SHARDS[$n]->wq_io_do('shard_commit', [ $p->{op_p} ]);
 	}
-	undef $p; # shard_done fires when all shards are committed
+	# shard_done fires when all shards are committed
 }
 
 sub index_repo { # cidx_await cb
@@ -628,8 +749,8 @@ EOM
 
 sub scan_git_dirs ($) {
 	my ($self) = @_;
-	@$GIT_TODO = @{$self->{git_dirs}};
-	index_next($self) for (1..$LIVE_JOBS);
+	my $n = @$GIT_TODO = @{$self->{git_dirs}};
+	progress($self, "scanning $n code repositories...");
 }
 
 sub prune_do { # via wq_io_do in IDX_SHARDS
@@ -661,7 +782,7 @@ sub shards_active { # post_loop_do
 	return if grep(defined, $PRUNE_DONE, $GIT_TODO, $IDX_TODO, $LIVE) != 4;
 	return 1 if grep(defined, @$PRUNE_DONE) != @IDX_SHARDS;
 	return 1 if scalar(@$GIT_TODO) || scalar(@$IDX_TODO) || $REPO_CTX;
-	return 1 if keys(%$LIVE);
+	return 1 if keys(%$LIVE) || @IBXQ || keys(%TODO);
 	for my $s (grep { $_->{-wq_s1} } @IDX_SHARDS) {
 		$s->{-cidx_quit} = 1 if defined($s->{-wq_s1});
 		$s->wq_close; # may recurse via awaitpid outside of event_loop
@@ -674,6 +795,7 @@ sub kill_shards { $_->wq_kill(@_) for (@IDX_SHARDS) }
 
 sub parent_quit {
 	$DO_QUIT = POSIX->can("SIG$_[0]")->();
+	$IBXDIR_FEED = undef;
 	kill_shards(@_);
 	warn "# SIG$_[0] received, quitting...\n";
 }
@@ -717,6 +839,7 @@ sub prep_alternate_end { # awaitpid callback for config extensions.objectFormat
 E: ignoring objdir=$objdir, unknown extensions.objectFormat=$fmt
 EOM
 	unless ($ALT_FH{$hexlen}) {
+		require PublicInbox::Import;
 		my $git_dir = "$TMPDIR/hexlen$hexlen.git";
 		PublicInbox::Import::init_bare($git_dir, 'cidx-all', $fmt);
 		my $f = "$git_dir/objects/info/alternates";
@@ -739,52 +862,102 @@ sub prep_alternate_start {
 	awaitpid($pid, \&prep_alternate_end, $o, $out, $run_prune);
 }
 
-sub prune_cmd_done { # awaitpid cb for sort, xapian-delve, sed failures
-	my ($pid, $cmd, $run_prune) = @_;
+sub cmd_done { # awaitpid cb for sort, xapian-delve, sed failures
+	my ($pid, $cmd, $run_on_destroy) = @_;
 	$? and die "@$cmd failed: \$?=$?";
+	# $run_on_destroy calls associate() or run_prune()
+}
+
+# runs once all inboxes and shards are dumped via OnDestroy
+sub associate {
+	my ($self) = @_;
+	return if $DO_QUIT;
+	@IDX_SHARDS or return warn("# aborting on no shards\n");
+	grep(defined, @DUMP_SHARD_ROOTS_OK) == @IDX_SHARDS or
+		die "E: shards not dumped properly\n";
+	grep(defined, @RECV_IBX_OK) == @IDX_SHARDS or
+		die "E: inboxes not dumped properly\n";
+	progress($self, 'associating...');
+	my @join = ('time', @JOIN, 'to_ibx_id', 'to_root_id');
+	my $rd = popen_rd(\@join, $CMD_ENV, { -C => "$TMPDIR" });
+	my %score;
+	while (<$rd>) { # PFX ibx_id root_id
+		my (undef, $ibx_id, @root_id) = split(' ', $_);
+		++$score{"$ibx_id $_"} for @root_id;
+	}
+	close $rd or die "@join failed: \$?=$?";
+	my $min = $self->{-opt}->{'assoc-min'} // 10;
+	progress($self, scalar(keys %score).' potential pairings...');
+	for my $k (keys %score) {
+		my $nr = $score{$k};
+		my ($ibx_id, $root) = split(/ /, $k);
+		my $ekey = $IBX[$ibx_id]->eidx_key;
+		$root = unpack('H*', $ID2ROOT[$root]);
+		progress($self, "$ekey => $root has $nr matches");
+	}
+	delete $TODO{associating}; # break out of shards_active()
+	# TODO
+	warn "# Waiting for $TMPDIR/cont @JOIN";
+	system "ls -Rl $TMPDIR >&2";
+	system "wc -l $TMPDIR/to_*_id >&2";
+	#sleep(1) until -f "$TMPDIR/cont";
+	# warn "# Waiting for $TMPDIR/cont";
+	# sleep(1) until -f "$TMPDIR/cont";
+}
+
+sub require_progs {
+	my $op = shift;
+	while (my ($x, $argv) = splice(@_, 0, 2)) {
+		my $e = $x;
+		$e =~ tr/a-z-/A-Z_/;
+		my $c = $ENV{$e} // $x;
+		$argv->[0] //= which($c) // die "E: `$x' required for --$op\n";
+	}
+}
+
+sub init_associate_postfork ($) {
+	my ($self) = @_;
+	return unless $self->{-opt}->{associate};
+	require_progs('associate', join => \@JOIN);
+	$QRY_STR = $self->{-opt}->{'associate-date-range'} // '1.year.ago..';
+	substr($QRY_STR, 0, 0) = 'dt:';
+	scalar(@{$self->{git_dirs} //  []}) or die <<EOM;
+E: no coderepos to associate
+EOM
+	my $approx_git = PublicInbox::Git->new($self->{git_dirs}->[0]); # ugh
+	$self->query_approxidate($approx_git, $QRY_STR); # in-place
+	$TODO{associate} = PublicInbox::OnDestroy->new($$, \&associate, $self);
+	$TODO{dump_ibx_start} = PublicInbox::OnDestroy->new($$,
+				\&dump_ibx_start, $self, $TODO{associate});
+	my $id = -1;
+	@IBXQ = map { ++$id } @IBX;
 }
 
 sub init_prune ($) {
 	my ($self) = @_;
 	return (@$PRUNE_DONE = map { 1 } @IDX_SHARDS) if !$self->{-opt}->{prune};
 
-	require File::Temp;
-	require PublicInbox::Import;
-	$TMPDIR = File::Temp->newdir('cidx-all-git-XXXX', TMPDIR => 1);
-
 	# Dealing with millions of commits here at once, so use faster tools.
 	# xapian-delve is nearly an order-of-magnitude faster than Xapian Perl
 	# bindings.  sed/awk are faster than Perl for simple stream ops, and
 	# sort+comm are more memory-efficient with gigantic lists
 	my @delve = (undef, qw(-A Q -1));
 	my @sed = (undef, '-ne', 's/^Q//p');
-	@SORT = (undef, '-u');
 	@COMM = (undef, qw(-2 -3 indexed_commits -));
 	@AWK = (undef, '$2 == "commit" { print $1 }'); # --batch-check output
-	my @x = ('xapian-delve' => \@delve, sed => \@sed,
-		sort => \@SORT, comm => \@COMM, awk => \@AWK);
-	while (my ($x, $argv) = splice(@x, 0, 2)) {
-		my $e = $x;
-		$e =~ tr/a-z-/A-Z_/;
-		my $c = $ENV{$e} // $x;
-		$argv->[0] = which($c) // die "E: `$x' required for --prune\n";
-	}
+	require_progs('prune', 'xapian-delve' => \@delve, sed => \@sed,
+			comm => \@COMM, awk => \@AWK);
 	for (0..$#IDX_SHARDS) { push @delve, "$self->{xpfx}/$_" }
-	for (qw(parallel compress-program buffer-size)) { # GNU sort options
-		my $v = $self->{-opt}->{"sort-$_"};
-		push @SORT, "--$_=$v" if defined $v;
-	}
 	my $run_prune = PublicInbox::OnDestroy->new($$, \&run_prune, $self);
 	pipe(my ($sed_in, $delve_out)) or die "pipe: $!";
 	pipe(my ($sort_in, $sed_out)) or die "pipe: $!";
 	open(my $sort_out, '+>', "$TMPDIR/indexed_commits") or die "open: $!";
-	$PRUNE_ENV = { TMPDIR => "$TMPDIR", LC_ALL => 'C', LANG => 'C' };
-	my $pid = spawn(\@SORT, $PRUNE_ENV, { 0 => $sort_in, 1 => $sort_out });
-	awaitpid($pid, \&prune_cmd_done, \@SORT, $run_prune);
-	$pid = spawn(\@sed, $PRUNE_ENV, { 0 => $sed_in, 1 => $sed_out });
-	awaitpid($pid, \&prune_cmd_done, \@sed, $run_prune);
+	my $pid = spawn(\@SORT, $CMD_ENV, { 0 => $sort_in, 1 => $sort_out });
+	awaitpid($pid, \&cmd_done, \@SORT, $run_prune);
+	$pid = spawn(\@sed, $CMD_ENV, { 0 => $sed_in, 1 => $sed_out });
+	awaitpid($pid, \&cmd_done, \@sed, $run_prune);
 	$pid = spawn(\@delve, undef, { 1 => $delve_out });
-	awaitpid($pid, \&prune_cmd_done, \@delve, $run_prune);
+	awaitpid($pid, \&cmd_done, \@delve, $run_prune);
 	@PRUNE_QUEUE = @{$self->{git_dirs}};
 	for (1..$LIVE_JOBS) {
 		prep_alternate_start(shift(@PRUNE_QUEUE) // last, $run_prune);
@@ -809,14 +982,14 @@ sub run_prune { # OnDestroy when `git config extensions.objectFormat' are done
 	pipe(my ($awk_in, $batch_out)) or die "pipe: $!";
 	pipe(my ($sort_in, $awk_out)) or die "pipe: $!";
 	pipe(my ($comm_in, $sort_out)) or die "pipe: $!";
-	my $awk_pid = spawn(\@AWK, $PRUNE_ENV, { 0 => $awk_in, 1 => $awk_out });
-	my $sort_pid = spawn(\@SORT, $PRUNE_ENV,
+	my $awk_pid = spawn(\@AWK, $CMD_ENV, { 0 => $awk_in, 1 => $awk_out });
+	my $sort_pid = spawn(\@SORT, $CMD_ENV,
 				{ 0 => $sort_in, 1 => $sort_out });
-	my ($comm_rd, $comm_pid) = popen_rd(\@COMM, $PRUNE_ENV,
+	my ($comm_rd, $comm_pid) = popen_rd(\@COMM, $CMD_ENV,
 				{ 0 => $comm_in, -C => "$TMPDIR" });
-	awaitpid($awk_pid, \&prune_cmd_done, \@AWK);
-	awaitpid($sort_pid, \&prune_cmd_done, \@SORT);
-	awaitpid($comm_pid, \&prune_cmd_done, \@COMM);
+	awaitpid($awk_pid, \&cmd_done, \@AWK);
+	awaitpid($sort_pid, \&cmd_done, \@SORT);
+	awaitpid($comm_pid, \&cmd_done, \@COMM);
 	PublicInbox::CidxComm->new($comm_rd, $self); # calls cidx_read_comm
 	my $git_ver = PublicInbox::Git::git_version();
 	push @PRUNE_BATCH, '--buffer' if $git_ver ge v2.6;
@@ -856,6 +1029,35 @@ sub cidx_read_comm { # via PublicInbox::CidxComm::event_step
 	for (@gone) { close $_ or die "close: $!" };
 }
 
+sub init_associate_prefork ($) {
+	my ($self) = @_;
+	return unless $self->{-opt}->{associate};
+	require PublicInbox::CidxRecvIbx;
+	require PublicInbox::CidxDumpShardRoots;
+	$self->{-pi_cfg} = PublicInbox::Config->new;
+	my @unknown;
+	my @pfx = @{$self->{-opt}->{'associate-prefixes'} // [ 'patchid' ]};
+	for (map { split(/\s*,\s*/) } @pfx) {
+		my $v = $PublicInbox::Search::PATCH_BOOL_COMMON{$_} //
+			do { push(@unknown, $_); next };
+		push(@ASSOC_PFX, split(/ /, $v));
+	}
+	die <<EOM if @unknown;
+--associate-prefixes contains unsupported prefixes: @unknown
+EOM
+	@ASSOC_PFX = uniqstr @ASSOC_PFX;
+	my %incl = map {
+		rel2abs_collapsed($_) => undef;
+	} (@{$self->{-opt}->{include} // []});
+	$self->{-pi_cfg}->each_inbox(\&_prep_ibx, $self, \%incl);
+}
+
+sub _prep_ibx { # each_inbox callback
+	my ($ibx, $self, $incl) = @_;
+	($self->{-opt}->{all} || exists($incl->{$ibx->{inboxdir}})) and
+		push @{$self->{IBX}}, $ibx;
+}
+
 sub cidx_run { # main entry point
 	my ($self) = @_;
 	my $restore_umask = prep_umask($self);
@@ -868,10 +1070,27 @@ sub cidx_run { # main entry point
 	local $IDX_TODO = [];
 	local $GIT_TODO = [];
 	local ($DO_QUIT, $REINDEX, $TXN_BYTES, @GIT_DIR_GONE, @PRUNE_QUEUE,
-		$REPO_CTX, %ALT_FH, $TMPDIR, @AWK, @COMM, @SORT, $PRUNE_ENV);
+		$REPO_CTX, %ALT_FH, $TMPDIR, @AWK, @COMM, $CMD_ENV,
+		%TODO, @IBXQ, @IBX, @JOIN, @ASSOC_PFX, $IBXDIR_FEED, @ID2ROOT,
+		@DUMP_SHARD_ROOTS_OK, @RECV_IBX_OK);
 	local $BATCH_BYTES = $self->{-opt}->{batch_size} //
 				$PublicInbox::SearchIdx::BATCH_BYTES;
-	local @IDX_SHARDS = cidx_init($self);
+	local @SORT = (undef, '-u');
+	local $self->{IBX} = \@IBX;
+	local $self->{ASSOC_PFX} = \@ASSOC_PFX;
+	local $self->{-pi_cfg};
+	if (grep { $_ } @{$self->{-opt}}{qw(prune associate)}) {
+		require File::Temp;
+		$TMPDIR = File::Temp->newdir('cidx-all-git-XXXX', TMPDIR => 1);
+		$CMD_ENV = { TMPDIR => "$TMPDIR", LC_ALL => 'C', LANG => 'C' };
+		require_progs('(prune|associate)', sort => \@SORT);
+		for (qw(parallel compress-program buffer-size)) { # GNU sort
+			my $v = $self->{-opt}->{"sort-$_"};
+			push @SORT, "--$_=$v" if defined $v;
+		}
+		init_associate_prefork($self)
+	}
+	local @IDX_SHARDS = cidx_init($self); # forks workers
 	local $self->{current_info} = '';
 	local $MY_SIG = {
 		CHLD => \&PublicInbox::DS::enqueue_reap,
@@ -919,7 +1138,9 @@ sub cidx_run { # main entry point
 			PublicInbox::IPC::detect_nproc() || 2;
 	local @RDONLY_XDB = $self->xdb_shards_flat;
 	init_prune($self);
+	init_associate_postfork($self);
 	scan_git_dirs($self) if $self->{-opt}->{scan} // 1;
+	index_next($self) for (1..$LIVE_JOBS);
 
 	# FreeBSD ignores/discards SIGCHLD while signals are blocked and
 	# EVFILT_SIGNAL is inactive, so we pretend we have a SIGCHLD pending
@@ -954,4 +1175,9 @@ sub shard_done_wait { # awaitpid cb via ipc_worker_reap
 	PublicInbox::DS::enqueue_reap() if !shards_active(); # once more for PLC
 }
 
+sub do_quit { $DO_QUIT }
+
+sub tmpdir { $TMPDIR }
+
+
 1;
diff --git a/lib/PublicInbox/Config.pm b/lib/PublicInbox/Config.pm
index 2f1b4122..0a6b210f 100644
--- a/lib/PublicInbox/Config.pm
+++ b/lib/PublicInbox/Config.pm
@@ -11,7 +11,7 @@ package PublicInbox::Config;
 use strict;
 use v5.10.1;
 use parent qw(Exporter);
-our @EXPORT_OK = qw(glob2re);
+our @EXPORT_OK = qw(glob2re rel2abs_collapsed);
 use PublicInbox::Inbox;
 use PublicInbox::Spawn qw(popen_rd);
 our $LD_PRELOAD = $ENV{LD_PRELOAD}; # only valid at startup
diff --git a/lib/PublicInbox/Search.pm b/lib/PublicInbox/Search.pm
index 1559d9b3..d5b0bceb 100644
--- a/lib/PublicInbox/Search.pm
+++ b/lib/PublicInbox/Search.pm
@@ -197,7 +197,7 @@ sub xdir ($;$) {
 	my ($self, $rdonly) = @_;
 	if ($rdonly || !defined($self->{shard})) {
 		$self->{xpfx};
-	} else { # v2 + extindex only:
+	} else { # v2, extindex, cindex only:
 		"$self->{xpfx}/$self->{shard}";
 	}
 }
diff --git a/script/public-inbox-cindex b/script/public-inbox-cindex
index 2f7796e7..888c8b10 100755
--- a/script/public-inbox-cindex
+++ b/script/public-inbox-cindex
@@ -26,8 +26,10 @@ See public-inbox-cindex(1) man page for full documentation.
 EOF
 my $opt = { fsync => 1, scan => 1 }; # --no-scan is hidden
 GetOptions($opt, qw(quiet|q verbose|v+ reindex jobs|j=i fsync|sync! dangerous
-		indexlevel|index-level|L=s
+		indexlevel|index-level|L=s associate associate-max=i
+		associate-date-range=s associate-prefixes=s@
 		batch_size|batch-size=s max_size|max-size=s
+		include|I=s@ only=s@ all
 		project-list=s exclude=s@
 		sort-parallel=s sort-compress-program=s sort-buffer-size=s
 		d=s update|u scan! prune dry-run|n C=s@ help|h))



Code repositories for project(s) associated with this public inbox

	https://80x24.org/public-inbox.git
