user/dev discussion of public-inbox itself
 help / color / mirror / code / Atom feed
Search results ordered by [date|relevance]  view[summary|nested|Atom feed]
thread overview below | download mbox.gz: |
* [PATCH 6/7] cindex: implement dump_roots in C++
  2023-08-24  1:22  7% [PATCH 0/7] cindex: optional C++ Xapian helper Eric Wong
@ 2023-08-24  1:22  2% ` Eric Wong
  0 siblings, 0 replies; 2+ results
From: Eric Wong @ 2023-08-24  1:22 UTC (permalink / raw)
  To: meta

It's now just `dump_roots' instead of `dump_shard_roots', since
this doesn't need to be tied to the concept of shards.  I'm
still shaky with C++, but intend to keep using stuff like
hsearch(3) to make life easier for C hackers :P
---
 MANIFEST                              |   1 -
 lib/PublicInbox/CidxDumpShardRoots.pm |  73 ------
 lib/PublicInbox/CidxXapHelperAux.pm   |  10 +-
 lib/PublicInbox/CodeSearchIdx.pm      |  56 ++---
 lib/PublicInbox/XapHelper.pm          |  52 +++-
 lib/PublicInbox/xap_helper.h          | 332 +++++++++++++++++++++++---
 t/xap_helper.t                        |  44 +++-
 7 files changed, 407 insertions(+), 161 deletions(-)
 delete mode 100644 lib/PublicInbox/CidxDumpShardRoots.pm

diff --git a/MANIFEST b/MANIFEST
index 4f61af42..4bccc849 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -162,7 +162,6 @@ lib/PublicInbox/AltId.pm
 lib/PublicInbox/AutoReap.pm
 lib/PublicInbox/Cgit.pm
 lib/PublicInbox/CidxComm.pm
-lib/PublicInbox/CidxDumpShardRoots.pm
 lib/PublicInbox/CidxLogP.pm
 lib/PublicInbox/CidxRecvIbx.pm
 lib/PublicInbox/CidxXapHelperAux.pm
diff --git a/lib/PublicInbox/CidxDumpShardRoots.pm b/lib/PublicInbox/CidxDumpShardRoots.pm
deleted file mode 100644
index 34afa419..00000000
--- a/lib/PublicInbox/CidxDumpShardRoots.pm
+++ /dev/null
@@ -1,73 +0,0 @@
-# Copyright (C) all contributors <meta@public-inbox.org>
-# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
-
-# Intended for PublicInbox::DS::event_loop for -cindex --associate
-# Iterating through mset->items is slow in Perl due to method dispatch
-# and that loop may implemented in C++ using Xapian directly
-package PublicInbox::CidxDumpShardRoots;
-use v5.12;
-use PublicInbox::Lock;
-use PublicInbox::Search qw(xap_terms);
-use Socket qw(MSG_EOR);
-
-sub start {
-	my ($cidx, $root2id, $qry_str) = @_;
-	my $op_p = delete($cidx->{0}) // die 'BUG: no {0} op_p';
-	my $sort_w = delete($cidx->{1}) // die 'BUG: no {1} $w sort pipe';
-	# sort lock is necessary if we have may root ids which cause a
-	# row length to exceed POSIX PIPE_BUF (via `$G' below)
-	my $sort_lk = bless { lock_path => $cidx->tmpdir.'/to_root_id.lock' },
-		'PublicInbox::Lock';
-	$sort_w->autoflush(1);
-	$cidx->begin_txn_lazy; # only using txn to simplify writer subs
-	my $opt = { limit => $cidx->assoc_max_init, relevance => -2 };
-	my $self = bless {
-		cidx => $cidx,
-		op_p => $op_p,
-		iter => 0,
-		mset => $cidx->mset($qry_str, $opt),
-		root2id => $root2id,
-		sort_w => $sort_w,
-		sort_lk => $sort_lk,
-	}, __PACKAGE__;
-	event_step($self);
-}
-
-sub event_step {
-	my ($self) = @_;
-	my $cidx = $self->{cidx};
-	return if $cidx->do_quit;
-	my $last = $self->{mset}->size - 1;
-	my $cur = $self->{iter};
-	my $end = $cur + 9999;
-	$end = $last if $end > $last;
-	$self->{iter} = $end + 1;
-	local $0 = "dumping shard [$cidx->{shard}] $cur..$end";
-	$cidx->progress($0);
-
-	my $root2id = $self->{root2id};
-	my $buf = '';
-	for my $x (($self->{mset}->items)[$cur..$end]) { # FIXME: slow loop
-		my $doc = $x->get_document;
-		my $G = join(' ', map {
-			$root2id->{pack('H*', $_)};
-		} xap_terms('G', $doc));
-		for my $p (@{$cidx->{ASSOC_PFX}}) {
-			$buf .= "$_ $G\n" for (xap_terms($p, $doc));
-		}
-	}
-	$self->{sort_lk}->lock_acquire_fast;
-	print { $self->{sort_w} } $buf or die "print: $!";
-	$self->{sort_lk}->lock_release_fast;
-	$end < $last && !$cidx->do_quit and
-		PublicInbox::DS::requeue($self);
-}
-
-sub DESTROY {
-	my ($self) = @_;
-	return if $self->{cidx}->do_quit;
-	send($self->{op_p},
-		"dump_shard_roots_done $self->{cidx}->{shard}", MSG_EOR);
-}
-
-1;
diff --git a/lib/PublicInbox/CidxXapHelperAux.pm b/lib/PublicInbox/CidxXapHelperAux.pm
index c9a5ddad..f402bde0 100644
--- a/lib/PublicInbox/CidxXapHelperAux.pm
+++ b/lib/PublicInbox/CidxXapHelperAux.pm
@@ -10,12 +10,8 @@ use PublicInbox::Syscall qw(EPOLLIN);
 
 # rpipe connects to req->fp[1] in xap_helper.h
 sub new {
-	my ($cls, $rpipe, $cidx, $pfx, $associate) = @_;
-	my $self = bless {
-		cidx => $cidx,
-		pfx => $pfx,
-		associate => $associate
-	}, $cls;
+	my ($cls, $rpipe, $cidx, $pfx) = @_;
+	my $self = bless { cidx => $cidx, pfx => $pfx }, $cls;
 	$rpipe->blocking(0);
 	$self->SUPER::new($rpipe, EPOLLIN);
 }
@@ -36,7 +32,7 @@ sub event_step {
 	my @lines = split(/^/m, $buf);
 	$self->{buf} = pop @lines if substr($lines[-1], -1) ne "\n";
 	for my $l (@lines) {
-		if ($l =~ /\Amset\.size=[0-9]+\n\z/) {
+		if ($l =~ /\Amset\.size=[0-9]+ nr_out=[0-9]+\n\z/) {
 			delete $self->{cidx}->{PENDING}->{$pfx};
 			$self->{cidx}->index_next;
 		}
diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index 4a41b1da..404d6826 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -87,7 +87,6 @@ our (
 	@AWK, @COMM, @SORT, # awk(1), comm(1), sort(1) commands
 	@ASSOC_PFX, # any combination of XDFID, XDFPRE, XDFPOST
 	$QRY_STR, # common query string for both code and inbox associations
-	@DUMP_SHARD_ROOTS_OK, # for associate
 	$DUMP_IBX_WPIPE, # goes to sort(1)
 	@ID2ROOT,
 );
@@ -505,14 +504,6 @@ sub shard_commit { # via wq_io_do
 	send($op_p, "shard_done $self->{shard}", MSG_EOR);
 }
 
-sub dump_shard_roots_done { # via PktOp on dump_shard_roots completion
-	my ($self, $associate, $n) = @_;
-	return if $DO_QUIT;
-	progress($self, "dump_shard_roots [$n] done");
-	$DUMP_SHARD_ROOTS_OK[$n] = 1;
-	# may run associate()
-}
-
 sub assoc_max_init ($) {
 	my ($self) = @_;
 	my $max = $self->{-opt}->{'associate-max'} // $ASSOC_MAX;
@@ -520,38 +511,39 @@ sub assoc_max_init ($) {
 	$max < 0 ? ((2 ** 31) - 1) : $max;
 }
 
-# dump the patchids of each shard: $XDFID $ROOT1 $ROOT2..
-sub dump_shard_roots { # via wq_io_do for associate
-	my ($self, $root2id, $qry_str) = @_;
-	PublicInbox::CidxDumpShardRoots::start($self, $root2id, $qry_str);
-}
-
 sub dump_roots_once {
 	my ($self, $associate) = @_;
 	$associate // die 'BUG: no $associate';
 	$TODO{associating} = 1; # keep shards_active() happy
 	progress($self, 'dumping IDs from coderepos');
 	local $self->{xdb};
-	@ID2ROOT = map { pack('H*', $_) } $self->all_terms('G');
-	my $id = 0;
-	my %root2id = map { $_ => $id++ } @ID2ROOT;
-	# dump_shard_roots | sort -k1,1 | OFS=' ' uniq_fold >to_root_id
+	@ID2ROOT = $self->all_terms('G');
+	my $root2id = "$TMPDIR/root2id";
+	open my $fh, '>', $root2id or die "open($root2id): $!";
+	my $nr = -1;
+	for (@ID2ROOT) { print $fh $_, "\0", ++$nr, "\0" } # mmap-friendly
+	close $fh or die "close: $!";
+	# dump_roots | sort -k1,1 | OFS=' ' uniq_fold >to_root_id
 	pipe(my ($sort_r, $sort_w)) or die "pipe: $!";
 	pipe(my ($fold_r, $fold_w)) or die "pipe: $!";
 	my @sort = (@SORT, '-k1,1');
 	my $dst = "$TMPDIR/to_root_id";
-	open my $fh, '>', $dst or die "open($dst): $!";
+	open $fh, '>', $dst or die "open($dst): $!";
 	my $env = { %$CMD_ENV, OFS => ' ' };
 	my $sort_pid = spawn(\@sort, $CMD_ENV, { 0 => $sort_r, 1 => $fold_w });
 	my $fold_pid = spawn(\@UNIQ_FOLD, $env, { 0 => $fold_r, 1 => $fh });
 	awaitpid($sort_pid, \&cmd_done, \@sort, $associate);
 	awaitpid($fold_pid, \&cmd_done, [@UNIQ_FOLD, '(shards)'], $associate);
-	my ($c, $p) = PublicInbox::PktOp->pair;
-	$c->{ops}->{dump_shard_roots_done} = [ $self, $associate ];
-	my @arg = ('dump_shard_roots', [ $p->{op_p}, $sort_w ],
-			\%root2id, $QRY_STR);
-	$_->wq_io_do(@arg) for @IDX_SHARDS;
-	progress($self, 'waiting on dump_shard_roots sort');
+	my @arg = ((map { ('-A', $_) } @ASSOC_PFX), '-c',
+		'-m', assoc_max_init($self), $root2id, $QRY_STR);
+	for my $d ($self->shard_dirs) {
+		pipe(my ($err_r, $err_w)) or die "pipe: $!";
+		$XHC->mkreq([$sort_w, $err_w], qw(dump_roots -d), $d, @arg);
+		my $desc = "dump_roots $d";
+		$self->{PENDING}->{$desc} = $associate;
+		PublicInbox::CidxXapHelperAux->new($err_r, $self, $desc);
+	}
+	progress($self, 'waiting on dump_roots sort');
 }
 
 sub dump_ibx { # sends to xap_helper.h
@@ -563,8 +555,8 @@ sub dump_ibx { # sends to xap_helper.h
 	pipe(my ($r, $w)) or die "pipe: $!";
 	$XHC->mkreq([$DUMP_IBX_WPIPE, $w], @cmd);
 	my $ekey = $ibx->eidx_key;
-	$self->{PENDING}->{$ekey} = undef;
-	PublicInbox::CidxXapHelperAux->new($r, $self, $ekey, $TODO{associate});
+	$self->{PENDING}->{$ekey} = $TODO{associate};
+	PublicInbox::CidxXapHelperAux->new($r, $self, $ekey);
 }
 
 sub dump_ibx_start {
@@ -885,8 +877,7 @@ sub associate {
 	my ($self) = @_;
 	return if $DO_QUIT;
 	@IDX_SHARDS or return warn("# aborting on no shards\n");
-	grep(defined, @DUMP_SHARD_ROOTS_OK) == @IDX_SHARDS or
-		die "E: shards not dumped properly\n";
+	unlink("$TMPDIR/root2id");
 	my @pending = keys %{$self->{PENDING}};
 	die "E: pending=@pending jobs not done\n" if @pending;
 	progress($self, 'associating...');
@@ -906,7 +897,7 @@ sub associate {
 		my $nr = $score{$k};
 		my ($ibx_id, $root) = split(/ /, $k);
 		my $ekey = $IBX[$ibx_id]->eidx_key;
-		$root = unpack('H*', $ID2ROOT[$root]);
+		$root = $ID2ROOT[$root];
 		progress($self, "$ekey => $root has $nr matches");
 	}
 	delete $TODO{associating}; # break out of shards_active()
@@ -1048,7 +1039,6 @@ sub cidx_read_comm { # via PublicInbox::CidxComm::event_step
 sub init_associate_prefork ($) {
 	my ($self) = @_;
 	return unless $self->{-opt}->{associate};
-	require PublicInbox::CidxDumpShardRoots;
 	require PublicInbox::CidxXapHelperAux;
 	require PublicInbox::XapClient;
 	$self->{-pi_cfg} = PublicInbox::Config->new;
@@ -1120,7 +1110,7 @@ sub cidx_run { # main entry point
 	local ($DO_QUIT, $REINDEX, $TXN_BYTES, @GIT_DIR_GONE, @PRUNE_QUEUE,
 		$REPO_CTX, %ALT_FH, $TMPDIR, @AWK, @COMM, $CMD_ENV,
 		%TODO, @IBXQ, @IBX, @JOIN, @ASSOC_PFX, $DUMP_IBX_WPIPE,
-		@ID2ROOT, @DUMP_SHARD_ROOTS_OK, $XH_PID, $XHC, @SORT);
+		@ID2ROOT, $XH_PID, $XHC, @SORT);
 	local $BATCH_BYTES = $self->{-opt}->{batch_size} //
 				$PublicInbox::SearchIdx::BATCH_BYTES;
 	local $self->{ASSOC_PFX} = \@ASSOC_PFX;
diff --git a/lib/PublicInbox/XapHelper.pm b/lib/PublicInbox/XapHelper.pm
index bf2f99a2..c80be810 100644
--- a/lib/PublicInbox/XapHelper.pm
+++ b/lib/PublicInbox/XapHelper.pm
@@ -8,7 +8,9 @@ use Getopt::Long (); # good API even if we only use short options
 our $GLP = Getopt::Long::Parser->new;
 $GLP->configure(qw(require_order bundling no_ignore_case no_auto_abbrev));
 use PublicInbox::Search qw(xap_terms);
+use PublicInbox::CodeSearch;
 use PublicInbox::IPC;
+use Fcntl qw(LOCK_UN LOCK_EX);
 my $X = \%PublicInbox::Search::X;
 our (%SRCH, %PIDS, $parent_pid);
 our $stderr = \*STDERR;
@@ -44,15 +46,63 @@ sub cmd_dump_ibx {
 	my $mset = $req->{srch}->mset($qry_str, $opt);
 	my $out = $req->{0};
 	$out->autoflush(1);
+	my $nr = 0;
 	for my $it ($mset->items) {
 		my $doc = $it->get_document;
 		for my $p (@pfx) {
 			for (xap_terms($p, $doc)) {
 				print $out "$_ $ibx_id\n" or die "print: $!";
+				++$nr;
 			}
 		}
 	}
-	if (my $err = $req->{1}) { say $err 'mset.size=', $mset->size }
+	if (my $err = $req->{1}) {
+		say $err 'mset.size='.$mset->size.' nr_out='.$nr
+	}
+}
+
+sub cmd_dump_roots {
+	my ($req, $root2id_file, $qry_str) = @_;
+	$qry_str // return
+		warn('usage: dump_roots [OPTIONS] ROOT2ID_FILE QRY_STR');
+	my @pfx = @{$req->{A}} or return warn('dump_roots requires -A PREFIX');
+	open my $fh, '<', $root2id_file or die "open($root2id_file): $!";
+	my %root2id; # record format: $OIDHEX "\0" uint32_t
+	my @x = split(/\0/, do { local $/; <$fh> } // die "readline: $!");
+	while (@x) {
+		my $oidhex = shift @x;
+		$root2id{$oidhex} = shift @x;
+	}
+	my $opt = { relevance => -1, limit => $req->{'m'},
+			offset => $req->{o} // 0 };
+	my $mset = $req->{srch}->mset($qry_str, $opt);
+	$req->{0}->autoflush(1);
+	my $buf = '';
+	my $nr = 0;
+	for my $it ($mset->items) {
+		my $doc = $it->get_document;
+		my $G = join(' ', map { $root2id{$_} } xap_terms('G', $doc));
+		for my $p (@pfx) {
+			for (xap_terms($p, $doc)) {
+				$buf .= "$_ $G\n";
+				++$nr;
+			}
+		}
+		if (!($nr & 0x3fff)) {
+			flock($fh, LOCK_EX) or die "flock: $!";
+			print { $req->{0} } $buf or die "print: $!";
+			flock($fh, LOCK_UN) or die "flock: $!";
+			$buf = '';
+		}
+	}
+	if ($buf ne '') {
+		flock($fh, LOCK_EX) or die "flock: $!";
+		print { $req->{0} } $buf or die "print: $!";
+		flock($fh, LOCK_UN) or die "flock: $!";
+	}
+	if (my $err = $req->{1}) {
+		say $err 'mset.size='.$mset->size.' nr_out='.$nr
+	}
 }
 
 sub dispatch {
diff --git a/lib/PublicInbox/xap_helper.h b/lib/PublicInbox/xap_helper.h
index 52db92b7..c9b4e0cc 100644
--- a/lib/PublicInbox/xap_helper.h
+++ b/lib/PublicInbox/xap_helper.h
@@ -6,12 +6,16 @@
  * this is not linked to Perl in any way.
  * C (not C++) is used as much as possible to lower the contribution
  * barrier for hackers who mainly know C (this includes the maintainer).
+ * Yes, that means we use C stdlib stuff like hsearch and open_memstream
+ * instead of their equivalents in the C++ stdlib :P
  * Everything here is an unstable internal API of public-inbox and
  * NOT intended for ordinary users; only public-inbox hackers
  */
 #ifndef _ALL_SOURCE
 #	define _ALL_SOURCE
 #endif
+#include <sys/file.h>
+#include <sys/mman.h>
 #include <sys/resource.h>
 #include <sys/socket.h>
 #include <sys/stat.h>
@@ -80,6 +84,7 @@ struct req { // argv and pfxv point into global rbuf
 	unsigned long long max;
 	unsigned long long off;
 	unsigned long timeout_sec;
+	size_t nr_out;
 	long sort_col; // value column, negative means BoolWeight
 	int argc;
 	int pfxc;
@@ -96,6 +101,28 @@ struct worker {
 	unsigned nr;
 };
 
+#define SPLIT2ARGV(dst,buf,len) split2argv(dst,buf,len,MY_ARRAY_SIZE(dst))
+static size_t split2argv(char **dst, char *buf, size_t len, size_t limit)
+{
+	if (buf[0] == 0 || len == 0 || buf[len - 1] != 0) {
+		warnx("bogus argument given");
+		return 0;
+	}
+	size_t nr = 0;
+	char *c = buf;
+	for (size_t i = 1; i < len; i++) {
+		if (!buf[i]) {
+			dst[nr++] = c;
+			c = buf + i + 1;
+		}
+		if (nr == limit) {
+			warnx("too many args: %zu", nr);
+			return 0;
+		}
+	}
+	return (long)nr;
+}
+
 static bool has_threadid(const struct srch *srch)
 {
 	return srch->db->get_metadata("has_threadid") == "1";
@@ -118,8 +145,12 @@ static Xapian::Enquire prep_enquire(const struct req *req)
 
 static Xapian::MSet enquire_mset(struct req *req, Xapian::Enquire *enq)
 {
-	if (!req->max)
-		req->max = 50;
+	if (!req->max) {
+		switch (sizeof(Xapian::doccount)) {
+		case 4: req->max = UINT_MAX; break;
+		default: req->max = ULLONG_MAX;
+		}
+	}
 	for (int i = 0; i < 9; i++) {
 		try {
 			Xapian::MSet mset = enq->get_mset(req->off, req->max);
@@ -131,13 +162,13 @@ static Xapian::MSet enquire_mset(struct req *req, Xapian::Enquire *enq)
 	return enq->get_mset(req->off, req->max);
 }
 
+// for v1, v2, and extindex
 static Xapian::MSet mail_mset(struct req *req, const char *qry_str)
 {
 	struct srch *srch = req->srch;
 	Xapian::Query qry = srch->qp->parse_query(qry_str, srch->qp_flags);
 	if (req->Oeidx_key) {
 		req->Oeidx_key[0] = 'O'; // modifies static rbuf
-		fprintf(stderr, "dbg eidxkey:%s>\n", req->Oeidx_key);
 		qry = Xapian::Query(Xapian::Query::OP_FILTER, qry,
 					Xapian::Query(req->Oeidx_key));
 	}
@@ -150,6 +181,21 @@ static Xapian::MSet mail_mset(struct req *req, const char *qry_str)
 	return enquire_mset(req, &enq);
 }
 
+// for cindex
+static Xapian::MSet commit_mset(struct req *req, const char *qry_str)
+{
+	struct srch *srch = req->srch;
+	Xapian::Query qry = srch->qp->parse_query(qry_str, srch->qp_flags);
+	// TODO: git_dir + roots_filter
+
+	// we only want commits:
+	qry = Xapian::Query(Xapian::Query::OP_FILTER, qry,
+				Xapian::Query("T" "c"));
+	Xapian::Enquire enq = prep_enquire(req);
+	enq.set_query(qry);
+	return enquire_mset(req, &enq);
+}
+
 static bool starts_with(const std::string *s, const char *pfx, size_t pfx_len)
 {
 	return s->size() >= pfx_len && !memcmp(pfx, s->c_str(), pfx_len);
@@ -165,9 +211,11 @@ static void dump_ibx_term(struct req *req, const char *pfx,
 	for (cur.skip_to(pfx); cur != end; cur++) {
 		std::string tn = *cur;
 
-		if (starts_with(&tn, pfx, pfx_len))
+		if (starts_with(&tn, pfx, pfx_len)) {
 			fprintf(req->fp[0], "%s %s\n",
 				tn.c_str() + pfx_len, ibx_id);
+			++req->nr_out;
+		}
 	}
 }
 
@@ -194,7 +242,6 @@ static bool cmd_dump_ibx(struct req *req)
 	}
 	req->asc = true;
 	req->sort_col = -1;
-	req->max = (unsigned long long)req->srch->db->get_doccount();
 	Xapian::MSet mset = mail_mset(req, req->argv[optind + 1]);
 	for (Xapian::MSetIterator i = mset.begin(); i != mset.end(); i++) {
 		try {
@@ -208,8 +255,244 @@ static bool cmd_dump_ibx(struct req *req)
 		}
 	}
 	if (req->fp[1])
-		fprintf(req->fp[1], "mset.size=%llu\n",
-			(unsigned long long)mset.size());
+		fprintf(req->fp[1], "mset.size=%llu nr_out=%zu\n",
+			(unsigned long long)mset.size(), req->nr_out);
+	return true;
+}
+
+struct fbuf {
+	FILE *fp;
+	char *ptr;
+	size_t len;
+};
+
+struct dump_roots_tmp {
+	struct stat sb;
+	void *mm_ptr;
+	char **entries;
+	struct fbuf wbuf;
+	int root2id_fd;
+};
+
+#define CLEANUP_FBUF __attribute__((__cleanup__(fbuf_ensure)))
+static void fbuf_ensure(void *ptr)
+{
+	struct fbuf *fbuf = (struct fbuf *)ptr;
+	if (fbuf->fp && fclose(fbuf->fp))
+		perror("fclose(fbuf->fp)");
+	fbuf->fp = NULL;
+	free(fbuf->ptr);
+}
+
+static bool fbuf_init(struct fbuf *fbuf)
+{
+	assert(!fbuf->ptr);
+	fbuf->fp = open_memstream(&fbuf->ptr, &fbuf->len);
+	if (fbuf->fp) return true;
+	perror("open_memstream(fbuf)");
+	return false;
+}
+
+static void xclose(int fd)
+{
+	if (close(fd) < 0 && errno != EINTR)
+		err(EXIT_FAILURE, "BUG: close");
+}
+
+#define CLEANUP_DUMP_ROOTS __attribute__((__cleanup__(dump_roots_ensure)))
+static void dump_roots_ensure(void *ptr)
+{
+	struct dump_roots_tmp *drt = (struct dump_roots_tmp *)ptr;
+	if (drt->root2id_fd >= 0)
+		xclose(drt->root2id_fd);
+	hdestroy(); // idempotent
+	if (drt->mm_ptr && munmap(drt->mm_ptr, drt->sb.st_size))
+		err(EXIT_FAILURE, "BUG: munmap");
+	free(drt->entries);
+	fbuf_ensure(&drt->wbuf);
+}
+
+static bool root2ids_str(struct fbuf *root_ids, struct dump_roots_tmp *drt,
+			Xapian::Document *doc)
+{
+	if (!fbuf_init(root_ids)) return false;
+
+	bool ok = true;
+	Xapian::TermIterator cur = doc->termlist_begin();
+	Xapian::TermIterator end = doc->termlist_end();
+	ENTRY e, *ep;
+	for (cur.skip_to("G"); cur != end; cur++) {
+		std::string tn = *cur;
+		if (!starts_with(&tn, "G", 1))
+			continue;
+		union { const char *in; char *out; } u;
+		u.in = tn.c_str() + 1;
+		e.key = u.out;
+		ep = hsearch(e, FIND);
+		if (!ep) {
+			warnx("hsearch miss `%s'", e.key);
+			return false;
+		}
+		// ep->data is a NUL-terminated string matching /[0-9]+/
+		fputc(' ', root_ids->fp);
+		fputs((const char *)ep->data, root_ids->fp);
+	}
+	fputc('\n', root_ids->fp);
+	if (ferror(root_ids->fp) | fclose(root_ids->fp)) {
+		perror("ferror|fclose(root_ids)");
+		ok = false;
+	}
+	root_ids->fp = NULL;
+	return ok;
+}
+
+// writes term values matching @pfx for a given @doc, ending the line
+// with the contents of @root_ids
+static void dump_roots_term(struct req *req, const char *pfx,
+				struct dump_roots_tmp *drt,
+				struct fbuf *root_ids,
+				Xapian::Document *doc)
+{
+	Xapian::TermIterator cur = doc->termlist_begin();
+	Xapian::TermIterator end = doc->termlist_end();
+	size_t pfx_len = strlen(pfx);
+
+	for (cur.skip_to(pfx); cur != end; cur++) {
+		std::string tn = *cur;
+		if (!starts_with(&tn, pfx, pfx_len))
+			continue;
+		fputs(tn.c_str() + pfx_len, drt->wbuf.fp);
+		fwrite(root_ids->ptr, root_ids->len, 1, drt->wbuf.fp);
+		++req->nr_out;
+	}
+}
+
+// we may have lines which exceed PIPE_BUF, so we do our own
+// buffering and rely on flock(2), here
+static bool dump_roots_flush(struct req *req, struct dump_roots_tmp *drt)
+{
+	char *p;
+	int fd = fileno(req->fp[0]);
+	bool ok = true;
+
+	if (!drt->wbuf.fp) return true;
+	if (fd < 0) err(EXIT_FAILURE, "BUG: fileno");
+	if (fclose(drt->wbuf.fp)) {
+		warn("fclose(drt->wbuf.fp)"); // malloc failure?
+		return false;
+	}
+	drt->wbuf.fp = NULL;
+	if (!drt->wbuf.len) goto done_free;
+	if (flock(drt->root2id_fd, LOCK_EX)) {
+		perror("LOCK_EX");
+		return false;
+	}
+	p = drt->wbuf.ptr;
+	do {
+		ssize_t n = write(fd, p, drt->wbuf.len);
+		if (n > 0) {
+			drt->wbuf.len -= n;
+			p += n;
+		} else {
+			perror(n ? "write" : "write (zero bytes)");
+			return false;
+		}
+	} while (drt->wbuf.len);
+	if (flock(drt->root2id_fd, LOCK_UN)) {
+		perror("LOCK_UN");
+		return false;
+	}
+done_free:
+	free(drt->wbuf.ptr);
+	drt->wbuf.ptr = NULL;
+	return ok;
+}
+
+static bool cmd_dump_roots(struct req *req)
+{
+	CLEANUP_DUMP_ROOTS struct dump_roots_tmp drt { .root2id_fd = -1 };
+	if ((optind + 1) >= req->argc) {
+		warnx("usage: dump_roots [OPTIONS] ROOT2ID_FILE QRY_STR");
+		return false; // need file + qry_str
+	}
+	if (!req->pfxc) {
+		warnx("dump_roots requires -A PREFIX");
+		return false;
+	}
+	const char *root2id_file = req->argv[optind];
+	drt.root2id_fd = open(root2id_file, O_RDONLY);
+	if (drt.root2id_fd < 0) {
+		warn("open(%s)", root2id_file);
+		return false;
+	}
+	if (fstat(drt.root2id_fd, &drt.sb)) {
+		warn("fstat(%s)", root2id_file);
+		return false;
+	}
+	// each entry is at least 43 bytes ({OIDHEX}\0{INT}\0),
+	// so /32 overestimates the number of expected entries by
+	// ~25% (as recommended by Linux hcreate(3) manpage)
+	size_t est = (drt.sb.st_size / 32) + 1;
+	if ((uint64_t)drt.sb.st_size > (uint64_t)SIZE_MAX) {
+		warnx("%s size too big (%lld bytes > %zu)", root2id_file,
+			(long long)drt.sb.st_size, SIZE_MAX);
+		return false;
+	}
+	drt.mm_ptr = mmap(NULL, drt.sb.st_size, PROT_READ,
+				MAP_PRIVATE, drt.root2id_fd, 0);
+	if (drt.mm_ptr == MAP_FAILED) {
+		warn("mmap(%s)", root2id_file);
+		return false;
+	}
+	drt.entries = (char **)calloc(est * 2, sizeof(char *));
+	if (!drt.entries) {
+		warn("calloc(%zu * 2, %zu)", est, sizeof(char *));
+		return false;
+	}
+	size_t tot = split2argv(drt.entries, (char *)drt.mm_ptr,
+				drt.sb.st_size, est * 2);
+	if (tot <= 0) return false; // split2argv already warned on error
+	if (!hcreate(est)) {
+		warn("hcreate(%zu)", est);
+		return false;
+	}
+	for (size_t i = 0; i < tot; ) {
+		ENTRY e;
+		e.key = drt.entries[i++];
+		e.data = drt.entries[i++];
+		if (!hsearch(e, ENTER)) {
+			warn("hsearch(%s => %s, ENTER)", e.key,
+				(const char *)e.data);
+			return false;
+		}
+	}
+	req->asc = true;
+	req->sort_col = -1;
+	Xapian::MSet mset = commit_mset(req, req->argv[optind + 1]);
+	for (Xapian::MSetIterator i = mset.begin(); i != mset.end(); i++) {
+		CLEANUP_FBUF struct fbuf root_ids = { 0 };
+		if (!drt.wbuf.fp && !fbuf_init(&drt.wbuf))
+			return false;
+		try {
+			Xapian::Document doc = i.get_document();
+			if (!root2ids_str(&root_ids, &drt, &doc))
+				return false;
+			for (int p = 0; p < req->pfxc; p++)
+				dump_roots_term(req, req->pfxv[p], &drt,
+						&root_ids, &doc);
+		} catch (const Xapian::Error & e) {
+			fprintf(orig_err, "W: %s (#%ld)\n",
+				e.get_description().c_str(), (long)(*i));
+			continue;
+		}
+		if (!(req->nr_out & 0x3fff) && !dump_roots_flush(req, &drt))
+			return false;
+	}
+	if (!dump_roots_flush(req, &drt))
+		return false;
+	if (req->fp[1])
+		fprintf(req->fp[1], "mset.size=%llu nr_out=%zu\n",
+			(unsigned long long)mset.size(), req->nr_out);
 	return true;
 }
 
@@ -228,7 +511,8 @@ static const struct cmd_entry {
 	cmd fn;
 } cmds[] = { // should be small enough to not need bsearch || gperf
 	// most common commands first
-	CMD(dump_ibx),
+	CMD(dump_ibx), // many inboxes
+	CMD(dump_roots), // per-cidx shard
 	CMD(test_inspect), // least common commands last
 };
 
@@ -240,12 +524,6 @@ union my_cmsg {
 	char pad[sizeof(struct cmsghdr) + 16 + RECV_FD_SPACE];
 };
 
-static void xclose(int fd)
-{
-	if (close(fd) < 0 && errno != EINTR)
-		err(EXIT_FAILURE, "BUG: close");
-}
-
 static bool recv_req(struct req *req, char *rbuf, size_t *len)
 {
 	union my_cmsg cmsg = { 0 };
@@ -306,28 +584,6 @@ static bool recv_req(struct req *req, char *rbuf, size_t *len)
 	return false;
 }
 
-#define SPLIT2ARGV(dst,buf,len) split2argv(dst,buf,len,MY_ARRAY_SIZE(dst))
-static int split2argv(char **dst, char *buf, size_t len, size_t limit)
-{
-	if (buf[0] == 0 || len == 0 || buf[len - 1] != 0) {
-		warnx("bogus argument given");
-		return 0;
-	}
-	size_t nr = 0;
-	char *c = buf;
-	for (size_t i = 1; i < len; i++) {
-		if (!buf[i]) {
-			dst[nr++] = c;
-			c = buf + i + 1;
-		}
-		if (nr == limit) {
-			warnx("too many args: %zu", nr);
-			return 0;
-		}
-	}
-	return (int)nr;
-}
-
 static int srch_cmp(const void *pa, const void *pb) // for tfind|tsearch
 {
 	const struct srch *a = (const struct srch *)pa;
@@ -355,7 +611,7 @@ static bool srch_init(struct req *req)
 	char *dirv[MY_ARG_MAX];
 	int i;
 	struct srch *srch = req->srch;
-	int dirc = SPLIT2ARGV(dirv, srch->paths, (size_t)srch->paths_len);
+	int dirc = (int)SPLIT2ARGV(dirv, srch->paths, (size_t)srch->paths_len);
 	const unsigned FLAG_PHRASE = Xapian::QueryParser::FLAG_PHRASE;
 	srch->qp_flags = FLAG_PHRASE |
 			Xapian::QueryParser::FLAG_BOOLEAN |
@@ -538,7 +794,7 @@ static void recv_loop(void) // worker process loop
 				perror("W: setlinebuf(req.fp[1])");
 			stderr = req.fp[1];
 		}
-		req.argc = SPLIT2ARGV(req.argv, rbuf, len);
+		req.argc = (int)SPLIT2ARGV(req.argv, rbuf, len);
 		if (req.argc > 0)
 			dispatch(&req);
 		if (ferror(req.fp[0]) | fclose(req.fp[0]))
diff --git a/t/xap_helper.t b/t/xap_helper.t
index f00a845a..92da2e6d 100644
--- a/t/xap_helper.t
+++ b/t/xap_helper.t
@@ -91,7 +91,7 @@ my $test = sub {
 	my $res = do { local $/; <$r> };
 	is(join('', @res), $res, 'got identical response w/ error pipe');
 	my $stats = do { local $/; <$err_rd> };
-	is($stats, "mset.size=6\n", 'mset.size reported');
+	is($stats, "mset.size=6 nr_out=6\n", 'mset.size reported');
 
 	if ($arg[-1] !~ /\('-j0'\)/) {
 		kill('KILL', $cinfo{pid});
@@ -105,12 +105,14 @@ my $test = sub {
 };
 my $ar;
 
-$ar = $test->(qw[-MPublicInbox::XapHelper -e
-		PublicInbox::XapHelper::start('-j0')]);
-$ar = $test->(qw[-MPublicInbox::XapHelper -e
-		PublicInbox::XapHelper::start('-j1')]);
-
-my @NO_CXX = (0);
+my @NO_CXX;
+if (!$ENV{TEST_XH_CXX_ONLY}) {
+	$ar = $test->(qw[-MPublicInbox::XapHelper -e
+			PublicInbox::XapHelper::start('-j0')]);
+	$ar = $test->(qw[-MPublicInbox::XapHelper -e
+			PublicInbox::XapHelper::start('-j1')]);
+	push @NO_CXX, 0;
+}
 SKIP: {
 	eval {
 		require PublicInbox::XapHelperCxx;
@@ -125,6 +127,20 @@ SKIP: {
 			PublicInbox::XapHelperCxx::start('-j1')]);
 };
 
+require PublicInbox::CodeSearch;
+my $cs_int = PublicInbox::CodeSearch->new("$crepo/public-inbox-cindex");
+my $root2id_file = "$tmp/root2id";
+my @id2root;
+{
+	open my $fh, '>', $root2id_file;
+	my $i = -1;
+	for ($cs_int->all_terms('G')) {
+		print $fh $_, "\0", ++$i, "\0";
+		$id2root[$i] = $_;
+	}
+	close $fh;
+}
+
 for my $n (@NO_CXX) {
 	local $ENV{PI_NO_CXX} = $n;
 	my ($xhc, $pid) = PublicInbox::XapClient::start_helper('-j0');
@@ -141,7 +157,19 @@ for my $n (@NO_CXX) {
 	my $res = do { local $/; <$r> };
 	is($res, "$dfid 9\n$mid 9\n", "got expected result ($xhc->{impl})");
 	my $err = do { local $/; <$err_r> };
-	is($err, "mset.size=1\n", "got expected status ($xhc->{impl})");
+	is($err, "mset.size=1 nr_out=2\n", "got expected status ($xhc->{impl})");
+
+	pipe($err_r, $err_w);
+	$r = $xhc->mkreq([ undef, $err_w ], qw(dump_roots -c -A XDFID),
+			(map { ('-d', $_) } @int),
+			$root2id_file, 'dt:19700101'.'000000..');
+	close $err_w;
+	my @res = <$r>;
+	is(scalar(@res), 5, 'got expected rows');
+	is(scalar(@res), scalar(grep(/\A[0-9a-f]{40,} [0-9]+\n\z/, @res)),
+		'entries match format');
+	$err = do { local $/; <$err_r> };
+	is($err, "mset.size=6 nr_out=5\n", "got expected status ($xhc->{impl})");
 }
 
 done_testing;

^ permalink raw reply related	[relevance 2%]

* [PATCH 0/7] cindex: optional C++ Xapian helper
@ 2023-08-24  1:22  7% Eric Wong
  2023-08-24  1:22  2% ` [PATCH 6/7] cindex: implement dump_roots in C++ Eric Wong
  0 siblings, 1 reply; 2+ results
From: Eric Wong @ 2023-08-24  1:22 UTC (permalink / raw)
  To: meta

Associating inboxes with coderepos is an extremely expensive
operation, especially for Perl (even with XS or SWIG) as Perl's
method dispatch overhead to dump data out of Xapian becomes
noticeable.

The actual association is fast with POSIX sort(1) and join(1);
but getting the necessary data out of Xapian to join on is
expensive as neither quest(1) nor xapian-delve(1) are suitable
for this task.

The actual association data isn't stored or usable anywhere
yet, and some of the associations are too loose to be useful.
More work is required on that point...

The association could probably be faster with rculfhash (from
Userspace-RCU), but I don't think it's worth the maintenance and
installation overhead for this (though I intend to use rculfhash
for the FUSE shim).

These performance problems weren't as noticeable in the past
since our other Xapian uses spent significant amounts of time
retrieving document data from SQLite and blobs from git.

Using the C++ implementation of xap_helper.h allows a full join
(without limits or date ranges) of lore + git.kernel.org repos
within one hour on my ancient system while the Perl+(XS|SWIG)
implementation took roughly 8 hours.

Eric Wong (7):
  search: hoist out shards_dir for future use
  cindex: read-only association dump
  cindex: add --show-roots switch
  introduce optional C++ xap_helper
  cindex: fix sorting and uniqueness
  cindex: implement dump_roots in C++
  xap_helper: reopen+retry in MSetIterator loops

 MANIFEST                            |   7 +
 lib/PublicInbox/CidxRecvIbx.pm      |  46 ++
 lib/PublicInbox/CidxXapHelperAux.pm |  44 ++
 lib/PublicInbox/CodeSearch.pm       |  54 +-
 lib/PublicInbox/CodeSearchIdx.pm    | 349 ++++++++--
 lib/PublicInbox/Config.pm           |   2 +-
 lib/PublicInbox/Isearch.pm          |   5 +
 lib/PublicInbox/Search.pm           |  92 ++-
 lib/PublicInbox/XapClient.pm        |  50 ++
 lib/PublicInbox/XapHelper.pm        | 226 +++++++
 lib/PublicInbox/XapHelperCxx.pm     |  93 +++
 lib/PublicInbox/xap_helper.h        | 947 ++++++++++++++++++++++++++++
 script/public-inbox-cindex          |   4 +-
 t/xap_helper.t                      | 175 +++++
 14 files changed, 2021 insertions(+), 73 deletions(-)
 create mode 100644 lib/PublicInbox/CidxRecvIbx.pm
 create mode 100644 lib/PublicInbox/CidxXapHelperAux.pm
 create mode 100644 lib/PublicInbox/XapClient.pm
 create mode 100644 lib/PublicInbox/XapHelper.pm
 create mode 100644 lib/PublicInbox/XapHelperCxx.pm
 create mode 100644 lib/PublicInbox/xap_helper.h
 create mode 100644 t/xap_helper.t

^ permalink raw reply	[relevance 7%]

Results 1-2 of 2 | reverse | options above
-- pct% links below jump to the message on this page, permalinks otherwise --
2023-08-24  1:22  7% [PATCH 0/7] cindex: optional C++ Xapian helper Eric Wong
2023-08-24  1:22  2% ` [PATCH 6/7] cindex: implement dump_roots in C++ Eric Wong

Code repositories for project(s) associated with this public inbox

	https://80x24.org/public-inbox.git

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).