user/dev discussion of public-inbox itself
 help / color / mirror / code / Atom feed
Search results ordered by [date|relevance]  view[summary|nested|Atom feed]
thread overview below | download mbox.gz: |
* [PATCH 15/28] cindex: handle graceful shutdown by default
  @ 2023-03-21 23:07  6%   ` Eric Wong
  0 siblings, 0 replies; 2+ results
From: Eric Wong @ 2023-03-21 23:07 UTC (permalink / raw)
  To: meta

While individual Xapian shards are consistent due to the use of
Xapian transactions, the data across shards still needs to be
in a consistent state for our search to work.
---
 lib/PublicInbox/CodeSearchIdx.pm | 71 +++++++++++++++++++++-----------
 1 file changed, 48 insertions(+), 23 deletions(-)

diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index 1a472b64..82f90368 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -38,6 +38,8 @@ our (
 	$LIVE_JOBS, # integer
 	$MY_SIG, # like %SIG
 	$SIGSET,
+	$TXN_BYTES, # number of bytes in current shard transaction
+	$DO_QUIT, # signal number
 	@RDONLY_SHARDS, # Xapian::Database
 	@IDX_SHARDS # clones of self
 );
@@ -153,18 +155,14 @@ sub store_repo { # wq_do - returns docid
 sub shard_index { # via wq_io_do
 	my ($self, $git, $n, $roots) = @_;
 	local $self->{current_info} = "$git->{git_dir} [$n]";
-	my ($quit, $cmt);
+	my $cmt;
 	local $self->{roots} = $roots;
 	my $in = delete($self->{0}) // die 'BUG: no {0} input';
 	my $op_p = delete($self->{1}) // die 'BUG: no {1} op_p';
 	my $batch_bytes = $self->{-opt}->{batch_size} //
 				$PublicInbox::SearchIdx::BATCH_BYTES;
-	my $max = $batch_bytes;
-	my $set_quit = sub { $quit = shift };
-	local $SIG{USR1} = sub { $max = -1 }; # similar to `git fast-import'
-	local $SIG{QUIT} = $set_quit;
-	local $SIG{TERM} = $set_quit;
-	local $SIG{INT} = $set_quit;
+	# local-ized in parent before fork
+	$TXN_BYTES = $batch_bytes;
 	local $self->{git} = $git; # for patchid
 	my $rd = $git->popen(@LOG_STDIN, undef, { 0 => $in });
 	close $in or die "close: $!";
@@ -179,22 +177,23 @@ sub shard_index { # via wq_io_do
 	$self->begin_txn_lazy;
 	while (defined($buf = <$rd>)) {
 		chomp($buf);
-		$max -= length($buf);
+		$TXN_BYTES -= length($buf);
 		@$cmt{@FMT} = split(/\n/, $buf, scalar(@FMT));
 		$/ = "\n";
 		add_commit($self, $cmt);
-		last if $quit; # likely SIGPIPE
+		last if $DO_QUIT;
 		++$nr;
-		if ($max <= 0 && !$PublicInbox::Search::X{CLOEXEC_UNSET}) {
+		if ($TXN_BYTES <= 0 && !$PublicInbox::Search::X{CLOEXEC_UNSET}) {
 			progress($self, "[$n] $nr");
 			$self->{xdb}->commit_transaction;
-			$max = $batch_bytes;
+			$TXN_BYTES = $batch_bytes;
 			$self->{xdb}->begin_transaction;
 		}
 		$/ = $FS;
 	}
 	close($rd);
-	if (!$? || ($quit && ($? & 127) == POSIX::SIGPIPE)) {
+	if (!$? || ($DO_QUIT && (($? & 127) == $DO_QUIT ||
+				($? & 127) == POSIX::SIGPIPE))) {
 		send($op_p, "shard_done $n", MSG_EOR);
 	} else {
 		warn "E: git @LOG_STDIN: \$?=$?\n";
@@ -254,7 +253,7 @@ sub need_reap { # post_loop_do
 sub cidx_reap ($$) {
 	my ($self, $jobs) = @_;
 	while (run_todo($self)) {}
-	local @PublicInbox::DS::post_loop_do = \(&need_reap, $jobs);
+	local @PublicInbox::DS::post_loop_do = (\&need_reap, $jobs);
 	while (need_reap(undef, $jobs)) {
 		PublicInbox::DS::event_loop($MY_SIG, $SIGSET);
 	}
@@ -263,7 +262,7 @@ sub cidx_reap ($$) {
 
 sub cidx_await_cb { # awaitpid cb
 	my ($pid, $cb, $self, $git, @args) = @_;
-	return if !$LIVE; # premature shutdown
+	return if !$LIVE || $DO_QUIT;
 	my $cmd = delete $LIVE->{$pid} // die 'BUG: no $cmd';
 	PublicInbox::DS::enqueue_reap() if !keys(%$LIVE); # once more for PLC
 	if ($?) {
@@ -283,7 +282,7 @@ sub cidx_await ($$$$$@) {
 # only care about --heads (branches) and --tags, and not even their names
 sub fp_start ($$$) {
 	my ($self, $git, $prep_repo) = @_;
-	return if !$LIVE; # premature exit
+	return if !$LIVE || $DO_QUIT;
 	cidx_reap($self, $LIVE_JOBS);
 	open my $refs, '+>', undef or die "open: $!";
 	my $cmd = ['git', "--git-dir=$git->{git_dir}",
@@ -305,7 +304,7 @@ sub fp_fini { # cidx_await cb
 
 sub ct_start ($$$) {
 	my ($self, $git, $prep_repo) = @_;
-	return if !$LIVE; # premature exit
+	return if !$LIVE || $DO_QUIT;
 	cidx_reap($self, $LIVE_JOBS);
 	my $cmd = [ 'git', "--git-dir=$git->{git_dir}",
 		qw[for-each-ref --sort=-committerdate
@@ -325,7 +324,7 @@ sub ct_fini { # cidx_await cb
 # TODO: also index gitweb.owner and the full fingerprint for grokmirror?
 sub prep_repo ($$) {
 	my ($self, $git) = @_;
-	return if !$LIVE || $git->{-cidx_err}; # premature exit
+	return if !$LIVE || $DO_QUIT || $git->{-cidx_err};
 	my $repo = $git->{-repo} // die 'BUG: no {-repo}';
 	if (!defined($repo->{ct})) {
 		warn "W: $git->{git_dir} has no commits, skipping\n";
@@ -449,6 +448,11 @@ sub index_repo { # cidx_await cb
 	PublicInbox::DS::event_loop($MY_SIG, $SIGSET);
 	my $n = grep { ! $self->{-shard_ok}->{$_} } keys %CONSUMERS;
 	die "E: $git->{git_dir} $n shards failed" if $n;
+	if ($DO_QUIT) {
+		commit_used_shards($self, $git, \%CONSUMERS);
+		progress($self, "$git->{git_dir}: done");
+		return;
+	}
 	$repo->{git_dir} = $git->{git_dir};
 	my $id = $IDX_SHARDS[$repo->{shard_n}]->wq_do('store_repo', $repo);
 	if ($id > 0) {
@@ -462,7 +466,7 @@ sub index_repo { # cidx_await cb
 
 sub get_roots ($$) {
 	my ($self, $git) = @_;
-	return if !$LIVE; # premature exit
+	return if !$LIVE || $DO_QUIT;
 	my $refs = $git->{-repo}->{refs} // die 'BUG: no {-repo}->{refs}';
 	sysseek($refs, 0, SEEK_SET) or die "seek: $!";
 	open my $roots, '+>', undef or die "open: $!";
@@ -489,6 +493,10 @@ sub load_existing ($) { # for -u/--update
 	@$dirs = grep { !$uniq{$_}++ } @$dirs;
 }
 
+# SIG handlers:
+sub shard_quit { $DO_QUIT = POSIX->can("SIG$_[0]")->() }
+sub shard_usr1 { $TXN_BYTES = -1 }
+
 sub cidx_init ($) {
 	my ($self) = @_;
 	my $dir = $self->{cidx_dir};
@@ -498,12 +506,13 @@ sub cidx_init ($) {
 	}
 	$self->lock_acquire;
 	my @shards;
+	local $TXN_BYTES;
 	for my $n (0..($self->{nshard} - 1)) {
 		my $shard = bless { %$self, shard => $n }, ref($self);
 		delete @$shard{qw(lockfh lock_path)};
 		$shard->idx_acquire;
 		$shard->idx_release;
-		$shard->wq_workers_start("shard[$n]", 1, undef, {
+		$shard->wq_workers_start("shard[$n]", 1, $SIGSET, {
 			siblings => \@shards, # for ipc_atfork_child
 		}, \&shard_done_wait, $self);
 		push @shards, $shard;
@@ -533,6 +542,15 @@ sub shards_active { # post_loop_do
 	scalar(grep { $_->{-cidx_quit} } @IDX_SHARDS);
 }
 
+# signal handlers
+sub kill_shards { $_->wq_kill(@_) for @IDX_SHARDS }
+
+sub parent_quit {
+	$DO_QUIT = $_[0];
+	kill_shards(@_);
+	warn "# SIG$_[0] received, quitting...\n";
+}
+
 sub cidx_run { # main entry point
 	my ($self) = @_;
 	local $self->{todo} = [];
@@ -541,13 +559,15 @@ sub cidx_run { # main entry point
 	my $restore = PublicInbox::OnDestroy->new($$,
 		\&PublicInbox::DS::sig_setmask, $SIGSET);
 	local $LIVE = {};
+	local $DO_QUIT;
 	local @IDX_SHARDS = cidx_init($self);
 	local $self->{current_info} = '';
-	my $cb = $SIG{__WARN__} || \&CORE::warn;
 	local $MY_SIG = {
 		CHLD => \&PublicInbox::DS::enqueue_reap,
-		INT => sub { exit },
+		USR1 => \&kill_shards,
 	};
+	$MY_SIG->{$_} = \&parent_quit for qw(TERM QUIT INT);
+	my $cb = $SIG{__WARN__} || \&CORE::warn;
 	local $SIG{__WARN__} = sub {
 		my $m = shift @_;
 		$self->{current_info} eq '' or
@@ -594,14 +614,19 @@ sub cidx_run { # main entry point
 sub ipc_atfork_child {
 	my ($self) = @_;
 	$self->SUPER::ipc_atfork_child;
+	$SIG{USR1} = \&shard_usr1;
+	$SIG{$_} = \&shard_quit for qw(INT TERM QUIT);
 	my $x = delete $self->{siblings} // die 'BUG: no {siblings}';
 	$_->wq_close for @$x;
+	undef;
 }
 
 sub shard_done_wait { # awaitpid cb via ipc_worker_reap
 	my ($pid, $shard, $self) = @_;
-	delete($shard->{-cidx_quit}) // warn 'BUG: {-cidx_quit} unset';
-	return unless $?;
+	if ($? == 0) { # success
+		delete($shard->{-cidx_quit}) // warn 'BUG: {-cidx_quit} unset';
+		return;
+	}
 	warn "PID:$pid $shard->{shard} exited with \$?=$?\n";
 	++$self->{shard_err} if defined($self->{shard_err});
 }

^ permalink raw reply related	[relevance 6%]

* [PATCH 00/28] cindex coderepo commit indexer
@ 2023-03-21 23:07  7% Eric Wong
    0 siblings, 1 reply; 2+ results
From: Eric Wong @ 2023-03-21 23:07 UTC (permalink / raw)
  To: meta

Not wired up to WWW or lei yet, but indexing + pruning of
commits works.

I'm not sure if indexing (root) tree OIDs or committer
names+emails is worth it, since I don't think those are very
important terms to search for.

I first wanted to shoehorn this into extindex, but I think it
works better as a separate Xapian schema.

It allows both internal indexes ($GIT_DIR/public-inbox-cindex)
for unforked repos, as well as extindex-style external index
to encompass several projects.

The indexer is structured a bit more nicely than existing
indexers since I'm relying more on OnDestroy and `local'.
I would like to trickle some of these improvements back to
the mail indexers at some point.

--prune and --reindex currently block incremental updates, which
isn't great since both take a while for giant Xapian DBs.

Pruning is pretty important since it's much more common for coderepos
(e.g. `seen' branch of git.git)

`lei cq' will probably be a new command which behaves
similarly to `lei q -f text', but takes `git log' options
for output...

Eric Wong (28):
  ipc: move nproc_shards from v2writable
  search: relocate all_terms from lei_search
  admin: hoist out resolve_git_dir
  admin: ensure resolved GIT_DIR is absolute
  test_common: create_inbox: use `$!' properly on mkdir failure
  codesearch: initial cut w/ -cindex tool
  cindex: parallelize prep phases
  cindex: use read-only shards during prep phases
  searchidxshard: improve comment wording
  cindex: use DS and workqueues for parallelism
  ds: @post_loop_do replaces SetPostLoopCallback
  cindex: implement --exclude= like -clone
  cindex: show shard number in progress message
  cindex: drop `unchanged' progress message
  cindex: handle graceful shutdown by default
  sigfd: pass signal name rather than number to callback
  cindex: implement --max-size=SIZE
  cindex: check for checkpoint before giant messages
  cindex: truncate or drop body for over-sized commits
  cindex: attempt to give oldest commits lowest docids
  cindex: improve granularity of quit checks
  spawn: show failing directory for chdir failures
  cindex: filter out non-existent git directories
  cindex: add support for --prune
  cindex: implement reindex
  cindex: squelch incompatible options
  cindex: respect existing permissions
  cindex: ignore SIGPIPE

 MANIFEST                          |   4 +
 lib/PublicInbox/Admin.pm          |  18 +-
 lib/PublicInbox/CodeSearch.pm     | 121 +++++
 lib/PublicInbox/CodeSearchIdx.pm  | 835 ++++++++++++++++++++++++++++++
 lib/PublicInbox/Config.pm         |   2 +-
 lib/PublicInbox/DS.pm             |  30 +-
 lib/PublicInbox/Daemon.pm         |   4 +-
 lib/PublicInbox/ExtSearchIdx.pm   |   2 +-
 lib/PublicInbox/IPC.pm            |  33 +-
 lib/PublicInbox/LEI.pm            |   4 +-
 lib/PublicInbox/LeiSearch.pm      |  14 -
 lib/PublicInbox/MiscIdx.pm        |   2 +-
 lib/PublicInbox/Search.pm         |  77 ++-
 lib/PublicInbox/SearchIdx.pm      |  88 ++--
 lib/PublicInbox/SearchIdxShard.pm |   7 +-
 lib/PublicInbox/Sigfd.pm          |  10 +-
 lib/PublicInbox/Spawn.pm          |   6 +-
 lib/PublicInbox/SpawnPP.pm        |   2 +-
 lib/PublicInbox/TestCommon.pm     |  47 +-
 lib/PublicInbox/V2Writable.pm     |  26 +-
 lib/PublicInbox/Watch.pm          |   2 +-
 script/public-inbox-cindex        |  86 +++
 script/public-inbox-convert       |   2 +-
 t/cindex.t                        | 134 +++++
 t/dir_idle.t                      |   6 +-
 t/ds-leak.t                       |   8 +-
 t/imapd.t                         |   6 +-
 t/nntpd.t                         |   2 +-
 t/sigfd.t                         |   7 +-
 t/watch_maildir.t                 |   8 +-
 xt/mem-imapd-tls.t                |   7 +-
 xt/mem-nntpd-tls.t                |   8 +-
 xt/net_writer-imap.t              |   4 +-
 33 files changed, 1424 insertions(+), 188 deletions(-)
 create mode 100644 lib/PublicInbox/CodeSearch.pm
 create mode 100644 lib/PublicInbox/CodeSearchIdx.pm
 create mode 100755 script/public-inbox-cindex
 create mode 100644 t/cindex.t

^ permalink raw reply	[relevance 7%]

Results 1-2 of 2 | reverse | options above
-- pct% links below jump to the message on this page, permalinks otherwise --
2023-03-21 23:07  7% [PATCH 00/28] cindex coderepo commit indexer Eric Wong
2023-03-21 23:07     ` [PATCH 01/28] ipc: move nproc_shards from v2writable Eric Wong
2023-03-21 23:07  6%   ` [PATCH 15/28] cindex: handle graceful shutdown by default Eric Wong

Code repositories for project(s) associated with this public inbox

	https://80x24.org/public-inbox.git

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).