From: Eric Wong <e@80x24.org>
To: meta@public-inbox.org
Subject: [PATCH 21/28] cindex: improve granularity of quit checks
Date: Tue, 21 Mar 2023 23:07:36 +0000 [thread overview]
Message-ID: <20230321230743.3020032-21-e@80x24.org> (raw)
In-Reply-To: <20230321230743.3020032-1-e@80x24.org>
This fixes shutdown handling when shard_index() isn't running
and ensures we can shut down the process more quickly.
---
lib/PublicInbox/CodeSearchIdx.pm | 55 ++++++++++++++++++++------------
1 file changed, 34 insertions(+), 21 deletions(-)
diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index f0b506da..4f91e0b6 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -206,6 +206,7 @@ sub shard_index { # via wq_io_do
# local-ized in parent before fork
$TXN_BYTES = $batch_bytes;
local $self->{git} = $git; # for patchid
+ return if $DO_QUIT;
my $rd = $git->popen(@LOG_STDIN, undef, { 0 => $in });
close $in or die "close: $!";
my $nr = 0;
@@ -216,10 +217,10 @@ sub shard_index { # via wq_io_do
my $len;
my $cmt = {};
local $/ = $FS;
- my $buf = <$rd> // return; # leading $FS
+ my $buf = <$rd> // return close($rd); # leading $FS
$buf eq $FS or die "BUG: not LF-NUL: $buf\n";
$self->begin_txn_lazy;
- while (defined($buf = <$rd>)) {
+ while (!$DO_QUIT && defined($buf = <$rd>)) {
chomp($buf);
$/ = "\n";
$len = length($buf);
@@ -234,7 +235,6 @@ sub shard_index { # via wq_io_do
$TXN_BYTES = $batch_bytes - $len;
}
add_commit($self, $cmt);
- last if $DO_QUIT;
++$nr;
if ($TXN_BYTES <= 0) {
cidx_ckpoint($self, "[$n] $nr");
@@ -298,6 +298,7 @@ sub run_todo ($) {
sub need_reap { # post_loop_do
my (undef, $jobs) = @_;
+ return if !$LIVE || $DO_QUIT;
scalar(keys(%$LIVE)) > $jobs;
}
@@ -412,7 +413,7 @@ sub check_existing { # retry_reopen callback
sub partition_refs ($$$) {
my ($self, $git, $refs) = @_; # show-ref --heads --tags --hash output
sysseek($refs, 0, SEEK_SET) or die "seek: $!"; # for rev-list --stdin
- my $fh = $git->popen(qw(rev-list --stdin), undef, { 0 => $refs });
+ my $rfh = $git->popen(qw(rev-list --stdin), undef, { 0 => $refs });
close $refs or die "close: $!";
my ($seen, $nchange) = (0, 0);
my @shard_in = map {
@@ -421,7 +422,7 @@ sub partition_refs ($$$) {
$fh;
} @RDONLY_SHARDS;
- while (defined(my $cmt = <$fh>)) {
+ while (defined(my $cmt = <$rfh>)) {
chomp $cmt;
my $n = hex(substr($cmt, 0, 8)) % scalar(@RDONLY_SHARDS);
if (seen($RDONLY_SHARDS[$n], 'Q'.$cmt)) {
@@ -431,8 +432,13 @@ sub partition_refs ($$$) {
++$nchange;
$seen = 0;
}
+ if ($DO_QUIT) {
+ close($rfh);
+ return ();
+ }
}
- close($fh);
+ close($rfh);
+ return () if $DO_QUIT;
if (!$? || (($? & 127) == POSIX::SIGPIPE && $seen > $SEEN_MAX)) {
$self->{nchange} += $nchange;
progress($self, "$git->{git_dir}: $nchange commits");
@@ -454,9 +460,18 @@ sub shard_commit { # via wq_io_do
sub consumers_open { # post_loop_do
my (undef, $consumers) = @_;
+ return if $DO_QUIT;
scalar(grep { $_->{sock} } values %$consumers);
}
+sub wait_consumers ($$$) {
+ my ($self, $git, $consumers) = @_;
+ local @PublicInbox::DS::post_loop_do = (\&consumers_open, $consumers);
+ PublicInbox::DS::event_loop($MY_SIG, $SIGSET);
+ my $n = grep { ! $self->{-shard_ok}->{$_} } keys %$consumers;
+ die "E: $git->{git_dir} $n shards failed" if $n && !$DO_QUIT;
+}
+
sub commit_used_shards ($$$) {
my ($self, $git, $consumers) = @_;
local $self->{-shard_ok} = {};
@@ -466,15 +481,12 @@ sub commit_used_shards ($$$) {
$IDX_SHARDS[$n]->wq_io_do('shard_commit', [ $p->{op_p} ], $n);
$consumers->{$n} = $c;
}
- local @PublicInbox::DS::post_loop_do = (\&consumers_open, $consumers);
- PublicInbox::DS::event_loop($MY_SIG, $SIGSET);
- my $n = grep { ! $self->{-shard_ok}->{$_} } keys %$consumers;
- die "E: $git->{git_dir} $n shards failed" if $n;
+ wait_consumers($self, $git, $consumers);
}
sub index_repo { # cidx_await cb
my ($self, $git, $roots) = @_;
- return if $git->{-cidx_err};
+ return if $git->{-cidx_err} || $DO_QUIT;
my $repo = delete $git->{-repo} or return;
seek($roots, 0, SEEK_SET) or die "seek: $!";
chomp(my @roots = <$roots>);
@@ -484,31 +496,29 @@ sub index_repo { # cidx_await cb
local $self->{current_info} = $git->{git_dir};
my @shard_in = partition_refs($self, $git, delete($repo->{refs}));
local $self->{-shard_ok} = {}; # [0..$#shard_in] => 1
- my %CONSUMERS;
+ my $consumers = {};
for my $n (0..$#shard_in) {
-s $shard_in[$n] or next;
+ last if $DO_QUIT;
my ($c, $p) = PublicInbox::PktOp->pair;
$c->{ops}->{shard_done} = [ $self ];
$IDX_SHARDS[$n]->wq_io_do('shard_index',
[ $shard_in[$n], $p->{op_p} ],
$git, $n, \@roots);
- $CONSUMERS{$n} = $c;
+ $consumers->{$n} = $c;
}
@shard_in = ();
- local @PublicInbox::DS::post_loop_do = (\&consumers_open, \%CONSUMERS);
- PublicInbox::DS::event_loop($MY_SIG, $SIGSET);
- my $n = grep { ! $self->{-shard_ok}->{$_} } keys %CONSUMERS;
- die "E: $git->{git_dir} $n shards failed" if $n;
+ wait_consumers($self, $git, $consumers);
if ($DO_QUIT) {
- commit_used_shards($self, $git, \%CONSUMERS);
+ commit_used_shards($self, $git, $consumers);
progress($self, "$git->{git_dir}: done");
return;
}
$repo->{git_dir} = $git->{git_dir};
my $id = $IDX_SHARDS[$repo->{shard_n}]->wq_do('store_repo', $repo);
if ($id > 0) {
- $CONSUMERS{$repo->{shard_n}} = undef;
- commit_used_shards($self, $git, \%CONSUMERS);
+ $consumers->{$repo->{shard_n}} = undef;
+ commit_used_shards($self, $git, $consumers);
progress($self, "$git->{git_dir}: done");
return run_todo($self);
}
@@ -585,6 +595,7 @@ sub scan_git_dirs ($) {
$self, $git);
fp_start($self, $git, $prep_repo);
ct_start($self, $git, $prep_repo);
+ last if $DO_QUIT;
}
cidx_reap($self, 0);
}
@@ -674,8 +685,10 @@ sub ipc_atfork_child {
sub shard_done_wait { # awaitpid cb via ipc_worker_reap
my ($pid, $shard, $self) = @_;
+ my $quit_req = delete($shard->{-cidx_quit});
+ return if $DO_QUIT || !$LIVE;
if ($? == 0) { # success
- delete($shard->{-cidx_quit}) // warn 'BUG: {-cidx_quit} unset';
+ $quit_req // warn 'BUG: {-cidx_quit} unset';
return;
}
warn "PID:$pid $shard->{shard} exited with \$?=$?\n";
next prev parent reply other threads:[~2023-03-21 23:07 UTC|newest]
Thread overview: 30+ messages / expand[flat|nested] mbox.gz Atom feed top
2023-03-21 23:07 [PATCH 00/28] cindex coderepo commit indexer Eric Wong
2023-03-21 23:07 ` [PATCH 01/28] ipc: move nproc_shards from v2writable Eric Wong
2023-03-21 23:07 ` [PATCH 02/28] search: relocate all_terms from lei_search Eric Wong
2023-03-21 23:07 ` [PATCH 03/28] admin: hoist out resolve_git_dir Eric Wong
2023-03-21 23:07 ` [PATCH 04/28] admin: ensure resolved GIT_DIR is absolute Eric Wong
2023-03-21 23:07 ` [PATCH 05/28] test_common: create_inbox: use `$!' properly on mkdir failure Eric Wong
2023-03-21 23:07 ` [PATCH 06/28] codesearch: initial cut w/ -cindex tool Eric Wong
2023-03-21 23:07 ` [PATCH 07/28] cindex: parallelize prep phases Eric Wong
2023-03-21 23:07 ` [PATCH 08/28] cindex: use read-only shards during " Eric Wong
2023-03-21 23:07 ` [PATCH 09/28] searchidxshard: improve comment wording Eric Wong
2023-03-21 23:07 ` [PATCH 10/28] cindex: use DS and workqueues for parallelism Eric Wong
2023-03-21 23:07 ` [PATCH 11/28] ds: @post_loop_do replaces SetPostLoopCallback Eric Wong
2023-03-21 23:07 ` [PATCH 12/28] cindex: implement --exclude= like -clone Eric Wong
2023-03-21 23:07 ` [PATCH 13/28] cindex: show shard number in progress message Eric Wong
2023-03-21 23:07 ` [PATCH 14/28] cindex: drop `unchanged' " Eric Wong
2023-03-21 23:07 ` [PATCH 15/28] cindex: handle graceful shutdown by default Eric Wong
2023-03-21 23:07 ` [PATCH 16/28] sigfd: pass signal name rather than number to callback Eric Wong
2023-03-21 23:07 ` [PATCH 17/28] cindex: implement --max-size=SIZE Eric Wong
2023-03-21 23:07 ` [PATCH 18/28] cindex: check for checkpoint before giant messages Eric Wong
2023-03-21 23:07 ` [PATCH 19/28] cindex: truncate or drop body for over-sized commits Eric Wong
2023-03-21 23:07 ` [PATCH 20/28] cindex: attempt to give oldest commits lowest docids Eric Wong
2023-03-21 23:07 ` Eric Wong [this message]
2023-03-21 23:07 ` [PATCH 22/28] spawn: show failing directory for chdir failures Eric Wong
2023-03-21 23:07 ` [PATCH 23/28] cindex: filter out non-existent git directories Eric Wong
2023-03-21 23:07 ` [PATCH 24/28] cindex: add support for --prune Eric Wong
2023-03-21 23:07 ` [PATCH 25/28] cindex: implement reindex Eric Wong
2023-03-21 23:07 ` [PATCH 26/28] cindex: squelch incompatible options Eric Wong
2023-03-21 23:07 ` [PATCH 27/28] cindex: respect existing permissions Eric Wong
2023-03-21 23:07 ` [PATCH 28/28] cindex: ignore SIGPIPE Eric Wong
2023-03-24 10:40 ` [PATCH 29/28] cindex: --prune checkpoints to avoid OOM Eric Wong
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
List information: https://public-inbox.org/README
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20230321230743.3020032-21-e@80x24.org \
--to=e@80x24.org \
--cc=meta@public-inbox.org \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
Code repositories for project(s) associated with this public inbox
https://80x24.org/public-inbox.git
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).