From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.6 (2021-04-09) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.2 required=3.0 tests=ALL_TRUSTED,AWL,BAYES_00, DKIM_SIGNED,DKIM_VALID,DKIM_VALID_AU,DKIM_VALID_EF shortcircuit=no autolearn=ham autolearn_force=no version=3.4.6 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id D460C1F452 for ; Wed, 25 Oct 2023 15:33:49 +0000 (UTC) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d=80x24.org; s=selector1; t=1698248029; bh=3SMBvT5YiEp4pY+O+e0hr1WQKXj1l0ogthkjgINTunE=; h=From:To:Subject:Date:In-Reply-To:References:From; b=H2LYBtQPwonHxHTdIVbqrfAiIF5uYn3TUCni2hRw4Fu7kuQqQycvO/vEJtZy0kTaS dWNuOa+390tF0vs0oqB6INs0ua4SdSmKHCQM/TQzHZxzlZt1PUF1T4OEequWNUzigR wjNxV07AZMvO5ZKxGK4xn/HxHpHBVL08DuTndgZU= From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 1/3] cindex: store coderepo data asynchronously Date: Wed, 25 Oct 2023 15:33:47 +0000 Message-ID: <20231025153349.3247178-2-e@80x24.org> In-Reply-To: <20231025153349.3247178-1-e@80x24.org> References: <20231025153349.3247178-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: While it's typically fast to store coderepo data, pathological latency on HDDs can let us use that delay to get other work done. --- lib/PublicInbox/CodeSearchIdx.pm | 69 +++++++++++++++++--------------- 1 file changed, 36 insertions(+), 33 deletions(-) diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm index aeee37c0..f2fd28e3 100644 --- a/lib/PublicInbox/CodeSearchIdx.pm +++ b/lib/PublicInbox/CodeSearchIdx.pm @@ -193,8 +193,9 @@ sub progress { $pr->($self->{git} ? ("$self->{git}->{git_dir}: ") : (), @msg, "\n"); } -sub store_repo { # wq_do - returns docid +sub store_repo { # wq_io_do, sends docid back my ($self, $repo) = @_; + my $op_p = delete($self->{0}) // die 'BUG: no {0} op_p'; $self->begin_txn_lazy; $self->{xdb}->delete_document($_) for @{$repo->{to_delete}}; my $doc = $PublicInbox::Search::X{Document}->new; @@ -203,12 +204,10 @@ sub store_repo { # wq_do - returns docid $doc->add_boolean_term('T'.'r'); $doc->add_boolean_term('G'.$_) for @{$repo->{roots}}; $doc->set_data($repo->{fp}); # \n delimited - if ($repo->{docid}) { - $self->{xdb}->replace_document($repo->{docid}, $doc); - $repo->{docid}; - } else { - $self->{xdb}->add_document($doc); - } + my $did = $repo->{docid}; + $did ? $self->{xdb}->replace_document($did, $doc) + : ($did = $self->{xdb}->add_document($doc)); + send($op_p, "repo_stored $did", 0); } sub cidx_ckpoint ($;$) { @@ -322,6 +321,17 @@ sub shard_done { # called via PktOp on shard_index completion $repo_ctx->{shard_ok}->{$n} = 1; } +sub repo_stored { + my ($self, $repo_ctx, $did) = @_; + $did > 0 or die "BUG: $repo_ctx->{repo}->{git_dir}: docid=$did"; + my $next = PublicInbox::OnDestroy->new($$, \&next_repos, $repo_ctx); + my ($c, $p) = PublicInbox::PktOp->pair; + $c->{ops}->{shard_done} = [ $self, $repo_ctx, $next ]; + # shard_done fires when all shards are committed + my @active = keys %{$repo_ctx->{active}}; + $IDX_SHARDS[$_]->wq_io_do('shard_commit', [ $p->{op_p} ]) for @active; +} + sub prune_done { # called via prune_do completion my ($self, $n) = @_; return if $DO_QUIT || !$PRUNE_DONE; @@ -584,37 +594,30 @@ sub index_next ($) { sub next_repos { # OnDestroy cb my ($repo_ctx) = @_; - progress($repo_ctx->{self}, "$repo_ctx->{repo}->{git_dir}: done"); - return if $DO_QUIT; - if ($REPO_CTX) { - $REPO_CTX == $repo_ctx or die "BUG: $REPO_CTX != $repo_ctx"; - $REPO_CTX = undef; - index_next($repo_ctx->{self}); - } + my ($self, $repo, $active) = @$repo_ctx{qw(self repo active)}; + progress($self, "$repo->{git_dir}: done"); + return if $DO_QUIT || !$REPO_CTX; + my $n = grep { ! $repo_ctx->{shard_ok}->{$_} } keys %$active; + die "E: $repo->{git_dir} $n shards failed" if $n; + $REPO_CTX == $repo_ctx or die "BUG: $REPO_CTX != $repo_ctx"; + $REPO_CTX = undef; + index_next($self); } -sub commit_shard { # OnDestroy cb +sub index_done { # OnDestroy cb called when done indexing each code repo my ($repo_ctx) = @_; my ($self, $repo, $active) = @$repo_ctx{qw(self repo active)}; + return if $DO_QUIT; my $n = grep { ! $repo_ctx->{shard_ok}->{$_} } keys %$active; - die "E: $repo->{git_dir} $n shards failed" if $n && !$DO_QUIT; - - $repo_ctx->{shard_ok} = {}; - if (!$DO_QUIT) { - my $id = $IDX_SHARDS[$repo->{shard_n}]->wq_do('store_repo', - $repo); - (!defined($id) || $id <= 0) and - die "E: store_repo $repo->{git_dir}: id=$id"; - $active->{$repo->{shard_n}} = undef; - } - my $next = PublicInbox::OnDestroy->new($$, \&next_repos, $repo_ctx); + die "E: $repo->{git_dir} $n shards failed" if $n; + $repo_ctx->{shard_ok} = {}; # reset for future shard_done + $n = $repo->{shard_n}; + $active->{$n} = undef; my ($c, $p) = PublicInbox::PktOp->pair; - $c->{ops}->{shard_done} = [ $repo_ctx->{self}, $repo_ctx, $next ]; - for my $n (keys %$active) { - $IDX_SHARDS[$n]->wq_io_do('shard_commit', [ $p->{op_p} ]); - } - # shard_done fires when all shards are committed + $c->{ops}->{repo_stored} = [ $self, $repo_ctx ]; + $IDX_SHARDS[$n]->wq_io_do('store_repo', [ $p->{op_p} ], $repo); + # repo_stored will fire once store_repo is done } sub index_repo { # run_git cb @@ -637,10 +640,10 @@ sub index_repo { # run_git cb $repo->{git_dir} = $git->{git_dir}; my $repo_ctx = $REPO_CTX = { self => $self, repo => $repo }; delete $git->{-cidx_gits_fini}; # may fire gits_fini - my $commit_shard = PublicInbox::OnDestroy->new($$, \&commit_shard, + my $index_done = PublicInbox::OnDestroy->new($$, \&index_done, $repo_ctx); my ($c, $p) = PublicInbox::PktOp->pair; - $c->{ops}->{shard_done} = [ $self, $repo_ctx, $commit_shard ]; + $c->{ops}->{shard_done} = [ $self, $repo_ctx, $index_done ]; for my $n (0..$#shard_in) { $shard_in[$n]->flush or die "flush shard[$n]: $!"; -s $shard_in[$n] or next;