* [PATCH 15/28] cindex: handle graceful shutdown by default
@ 2023-03-21 23:07 6% ` Eric Wong
0 siblings, 0 replies; 2+ results
From: Eric Wong @ 2023-03-21 23:07 UTC (permalink / raw)
To: meta
While individual Xapian shards are consistent due to the use of
Xapian transactions, the data across shards still needs to be
in a consistent state for our search to work.
---
lib/PublicInbox/CodeSearchIdx.pm | 71 +++++++++++++++++++++-----------
1 file changed, 48 insertions(+), 23 deletions(-)
diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index 1a472b64..82f90368 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -38,6 +38,8 @@ our (
$LIVE_JOBS, # integer
$MY_SIG, # like %SIG
$SIGSET,
+ $TXN_BYTES, # number of bytes in current shard transaction
+ $DO_QUIT, # signal number
@RDONLY_SHARDS, # Xapian::Database
@IDX_SHARDS # clones of self
);
@@ -153,18 +155,14 @@ sub store_repo { # wq_do - returns docid
sub shard_index { # via wq_io_do
my ($self, $git, $n, $roots) = @_;
local $self->{current_info} = "$git->{git_dir} [$n]";
- my ($quit, $cmt);
+ my $cmt;
local $self->{roots} = $roots;
my $in = delete($self->{0}) // die 'BUG: no {0} input';
my $op_p = delete($self->{1}) // die 'BUG: no {1} op_p';
my $batch_bytes = $self->{-opt}->{batch_size} //
$PublicInbox::SearchIdx::BATCH_BYTES;
- my $max = $batch_bytes;
- my $set_quit = sub { $quit = shift };
- local $SIG{USR1} = sub { $max = -1 }; # similar to `git fast-import'
- local $SIG{QUIT} = $set_quit;
- local $SIG{TERM} = $set_quit;
- local $SIG{INT} = $set_quit;
+ # local-ized in parent before fork
+ $TXN_BYTES = $batch_bytes;
local $self->{git} = $git; # for patchid
my $rd = $git->popen(@LOG_STDIN, undef, { 0 => $in });
close $in or die "close: $!";
@@ -179,22 +177,23 @@ sub shard_index { # via wq_io_do
$self->begin_txn_lazy;
while (defined($buf = <$rd>)) {
chomp($buf);
- $max -= length($buf);
+ $TXN_BYTES -= length($buf);
@$cmt{@FMT} = split(/\n/, $buf, scalar(@FMT));
$/ = "\n";
add_commit($self, $cmt);
- last if $quit; # likely SIGPIPE
+ last if $DO_QUIT;
++$nr;
- if ($max <= 0 && !$PublicInbox::Search::X{CLOEXEC_UNSET}) {
+ if ($TXN_BYTES <= 0 && !$PublicInbox::Search::X{CLOEXEC_UNSET}) {
progress($self, "[$n] $nr");
$self->{xdb}->commit_transaction;
- $max = $batch_bytes;
+ $TXN_BYTES = $batch_bytes;
$self->{xdb}->begin_transaction;
}
$/ = $FS;
}
close($rd);
- if (!$? || ($quit && ($? & 127) == POSIX::SIGPIPE)) {
+ if (!$? || ($DO_QUIT && (($? & 127) == $DO_QUIT ||
+ ($? & 127) == POSIX::SIGPIPE))) {
send($op_p, "shard_done $n", MSG_EOR);
} else {
warn "E: git @LOG_STDIN: \$?=$?\n";
@@ -254,7 +253,7 @@ sub need_reap { # post_loop_do
sub cidx_reap ($$) {
my ($self, $jobs) = @_;
while (run_todo($self)) {}
- local @PublicInbox::DS::post_loop_do = \(&need_reap, $jobs);
+ local @PublicInbox::DS::post_loop_do = (\&need_reap, $jobs);
while (need_reap(undef, $jobs)) {
PublicInbox::DS::event_loop($MY_SIG, $SIGSET);
}
@@ -263,7 +262,7 @@ sub cidx_reap ($$) {
sub cidx_await_cb { # awaitpid cb
my ($pid, $cb, $self, $git, @args) = @_;
- return if !$LIVE; # premature shutdown
+ return if !$LIVE || $DO_QUIT;
my $cmd = delete $LIVE->{$pid} // die 'BUG: no $cmd';
PublicInbox::DS::enqueue_reap() if !keys(%$LIVE); # once more for PLC
if ($?) {
@@ -283,7 +282,7 @@ sub cidx_await ($$$$$@) {
# only care about --heads (branches) and --tags, and not even their names
sub fp_start ($$$) {
my ($self, $git, $prep_repo) = @_;
- return if !$LIVE; # premature exit
+ return if !$LIVE || $DO_QUIT;
cidx_reap($self, $LIVE_JOBS);
open my $refs, '+>', undef or die "open: $!";
my $cmd = ['git', "--git-dir=$git->{git_dir}",
@@ -305,7 +304,7 @@ sub fp_fini { # cidx_await cb
sub ct_start ($$$) {
my ($self, $git, $prep_repo) = @_;
- return if !$LIVE; # premature exit
+ return if !$LIVE || $DO_QUIT;
cidx_reap($self, $LIVE_JOBS);
my $cmd = [ 'git', "--git-dir=$git->{git_dir}",
qw[for-each-ref --sort=-committerdate
@@ -325,7 +324,7 @@ sub ct_fini { # cidx_await cb
# TODO: also index gitweb.owner and the full fingerprint for grokmirror?
sub prep_repo ($$) {
my ($self, $git) = @_;
- return if !$LIVE || $git->{-cidx_err}; # premature exit
+ return if !$LIVE || $DO_QUIT || $git->{-cidx_err};
my $repo = $git->{-repo} // die 'BUG: no {-repo}';
if (!defined($repo->{ct})) {
warn "W: $git->{git_dir} has no commits, skipping\n";
@@ -449,6 +448,11 @@ sub index_repo { # cidx_await cb
PublicInbox::DS::event_loop($MY_SIG, $SIGSET);
my $n = grep { ! $self->{-shard_ok}->{$_} } keys %CONSUMERS;
die "E: $git->{git_dir} $n shards failed" if $n;
+ if ($DO_QUIT) {
+ commit_used_shards($self, $git, \%CONSUMERS);
+ progress($self, "$git->{git_dir}: done");
+ return;
+ }
$repo->{git_dir} = $git->{git_dir};
my $id = $IDX_SHARDS[$repo->{shard_n}]->wq_do('store_repo', $repo);
if ($id > 0) {
@@ -462,7 +466,7 @@ sub index_repo { # cidx_await cb
sub get_roots ($$) {
my ($self, $git) = @_;
- return if !$LIVE; # premature exit
+ return if !$LIVE || $DO_QUIT;
my $refs = $git->{-repo}->{refs} // die 'BUG: no {-repo}->{refs}';
sysseek($refs, 0, SEEK_SET) or die "seek: $!";
open my $roots, '+>', undef or die "open: $!";
@@ -489,6 +493,10 @@ sub load_existing ($) { # for -u/--update
@$dirs = grep { !$uniq{$_}++ } @$dirs;
}
+# SIG handlers:
+sub shard_quit { $DO_QUIT = POSIX->can("SIG$_[0]")->() }
+sub shard_usr1 { $TXN_BYTES = -1 }
+
sub cidx_init ($) {
my ($self) = @_;
my $dir = $self->{cidx_dir};
@@ -498,12 +506,13 @@ sub cidx_init ($) {
}
$self->lock_acquire;
my @shards;
+ local $TXN_BYTES;
for my $n (0..($self->{nshard} - 1)) {
my $shard = bless { %$self, shard => $n }, ref($self);
delete @$shard{qw(lockfh lock_path)};
$shard->idx_acquire;
$shard->idx_release;
- $shard->wq_workers_start("shard[$n]", 1, undef, {
+ $shard->wq_workers_start("shard[$n]", 1, $SIGSET, {
siblings => \@shards, # for ipc_atfork_child
}, \&shard_done_wait, $self);
push @shards, $shard;
@@ -533,6 +542,15 @@ sub shards_active { # post_loop_do
scalar(grep { $_->{-cidx_quit} } @IDX_SHARDS);
}
+# signal handlers
+sub kill_shards { $_->wq_kill(@_) for @IDX_SHARDS }
+
+sub parent_quit {
+ $DO_QUIT = $_[0];
+ kill_shards(@_);
+ warn "# SIG$_[0] received, quitting...\n";
+}
+
sub cidx_run { # main entry point
my ($self) = @_;
local $self->{todo} = [];
@@ -541,13 +559,15 @@ sub cidx_run { # main entry point
my $restore = PublicInbox::OnDestroy->new($$,
\&PublicInbox::DS::sig_setmask, $SIGSET);
local $LIVE = {};
+ local $DO_QUIT;
local @IDX_SHARDS = cidx_init($self);
local $self->{current_info} = '';
- my $cb = $SIG{__WARN__} || \&CORE::warn;
local $MY_SIG = {
CHLD => \&PublicInbox::DS::enqueue_reap,
- INT => sub { exit },
+ USR1 => \&kill_shards,
};
+ $MY_SIG->{$_} = \&parent_quit for qw(TERM QUIT INT);
+ my $cb = $SIG{__WARN__} || \&CORE::warn;
local $SIG{__WARN__} = sub {
my $m = shift @_;
$self->{current_info} eq '' or
@@ -594,14 +614,19 @@ sub cidx_run { # main entry point
sub ipc_atfork_child {
my ($self) = @_;
$self->SUPER::ipc_atfork_child;
+ $SIG{USR1} = \&shard_usr1;
+ $SIG{$_} = \&shard_quit for qw(INT TERM QUIT);
my $x = delete $self->{siblings} // die 'BUG: no {siblings}';
$_->wq_close for @$x;
+ undef;
}
sub shard_done_wait { # awaitpid cb via ipc_worker_reap
my ($pid, $shard, $self) = @_;
- delete($shard->{-cidx_quit}) // warn 'BUG: {-cidx_quit} unset';
- return unless $?;
+ if ($? == 0) { # success
+ delete($shard->{-cidx_quit}) // warn 'BUG: {-cidx_quit} unset';
+ return;
+ }
warn "PID:$pid $shard->{shard} exited with \$?=$?\n";
++$self->{shard_err} if defined($self->{shard_err});
}
^ permalink raw reply related [relevance 6%]
* [PATCH 00/28] cindex coderepo commit indexer
@ 2023-03-21 23:07 7% Eric Wong
0 siblings, 1 reply; 2+ results
From: Eric Wong @ 2023-03-21 23:07 UTC (permalink / raw)
To: meta
Not wired up to WWW or lei yet, but indexing + pruning of
commits works.
I'm not sure if indexing (root) tree OIDs or committer
names+emails is worth it, since I don't think those are very
important terms to search for.
I first wanted to shoehorn this into extindex, but I think it
works better as a separate Xapian schema.
It allows both internal indexes ($GIT_DIR/public-inbox-cindex)
for unforked repos, as well as extindex-style external index
to encompass several projects.
The indexer is structured a bit more nicely than existing
indexers since I'm relying more on OnDestroy and `local'.
I would like to trickle some of these improvements back to
the mail indexers at some point.
--prune and --reindex currently block incremental updates, which
isn't great since both take a while for giant Xapian DBs.
Pruning is pretty important since it's much more common for coderepos
(e.g. `seen' branch of git.git).
`lei cq' will probably be a new command which behaves
similarly to `lei q -f text', but takes `git log' options
for output...
Eric Wong (28):
ipc: move nproc_shards from v2writable
search: relocate all_terms from lei_search
admin: hoist out resolve_git_dir
admin: ensure resolved GIT_DIR is absolute
test_common: create_inbox: use `$!' properly on mkdir failure
codesearch: initial cut w/ -cindex tool
cindex: parallelize prep phases
cindex: use read-only shards during prep phases
searchidxshard: improve comment wording
cindex: use DS and workqueues for parallelism
ds: @post_loop_do replaces SetPostLoopCallback
cindex: implement --exclude= like -clone
cindex: show shard number in progress message
cindex: drop `unchanged' progress message
cindex: handle graceful shutdown by default
sigfd: pass signal name rather than number to callback
cindex: implement --max-size=SIZE
cindex: check for checkpoint before giant messages
cindex: truncate or drop body for over-sized commits
cindex: attempt to give oldest commits lowest docids
cindex: improve granularity of quit checks
spawn: show failing directory for chdir failures
cindex: filter out non-existent git directories
cindex: add support for --prune
cindex: implement reindex
cindex: squelch incompatible options
cindex: respect existing permissions
cindex: ignore SIGPIPE
MANIFEST | 4 +
lib/PublicInbox/Admin.pm | 18 +-
lib/PublicInbox/CodeSearch.pm | 121 +++++
lib/PublicInbox/CodeSearchIdx.pm | 835 ++++++++++++++++++++++++++++++
lib/PublicInbox/Config.pm | 2 +-
lib/PublicInbox/DS.pm | 30 +-
lib/PublicInbox/Daemon.pm | 4 +-
lib/PublicInbox/ExtSearchIdx.pm | 2 +-
lib/PublicInbox/IPC.pm | 33 +-
lib/PublicInbox/LEI.pm | 4 +-
lib/PublicInbox/LeiSearch.pm | 14 -
lib/PublicInbox/MiscIdx.pm | 2 +-
lib/PublicInbox/Search.pm | 77 ++-
lib/PublicInbox/SearchIdx.pm | 88 ++--
lib/PublicInbox/SearchIdxShard.pm | 7 +-
lib/PublicInbox/Sigfd.pm | 10 +-
lib/PublicInbox/Spawn.pm | 6 +-
lib/PublicInbox/SpawnPP.pm | 2 +-
lib/PublicInbox/TestCommon.pm | 47 +-
lib/PublicInbox/V2Writable.pm | 26 +-
lib/PublicInbox/Watch.pm | 2 +-
script/public-inbox-cindex | 86 +++
script/public-inbox-convert | 2 +-
t/cindex.t | 134 +++++
t/dir_idle.t | 6 +-
t/ds-leak.t | 8 +-
t/imapd.t | 6 +-
t/nntpd.t | 2 +-
t/sigfd.t | 7 +-
t/watch_maildir.t | 8 +-
xt/mem-imapd-tls.t | 7 +-
xt/mem-nntpd-tls.t | 8 +-
xt/net_writer-imap.t | 4 +-
33 files changed, 1424 insertions(+), 188 deletions(-)
create mode 100644 lib/PublicInbox/CodeSearch.pm
create mode 100644 lib/PublicInbox/CodeSearchIdx.pm
create mode 100755 script/public-inbox-cindex
create mode 100644 t/cindex.t
^ permalink raw reply [relevance 7%]
Results 1-2 of 2 | reverse | options above
-- pct% links below jump to the message on this page, permalinks otherwise --
2023-03-21 23:07 7% [PATCH 00/28] cindex coderepo commit indexer Eric Wong
2023-03-21 23:07 ` [PATCH 01/28] ipc: move nproc_shards from v2writable Eric Wong
2023-03-21 23:07 6% ` [PATCH 15/28] cindex: handle graceful shutdown by default Eric Wong
Code repositories for project(s) associated with this public inbox
https://80x24.org/public-inbox.git
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).