From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id CE7921F885 for ; Sun, 26 Jan 2020 01:17:44 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 3/3] xapcmd: increase scope of lock Date: Sun, 26 Jan 2020 01:17:44 +0000 Message-Id: <20200126011744.6278-4-e@yhbt.net> In-Reply-To: <20200126011744.6278-1-e@yhbt.net> References: <20200126011744.6278-1-e@yhbt.net> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: The old lock scope was only sufficient for protecting against concurrent modifications from the common -mda, -watch, or -learn writers. It was not sufficient for protecting against parallel -compact or -xcpdb invocations from eager admins. Most of the time this only leads to confusing and misleading warning messages, but parallel xcpdb --reshard could lead to errors. --- lib/PublicInbox/Xapcmd.pm | 59 +++++++++++++++++++++------------------ 1 file changed, 32 insertions(+), 27 deletions(-) diff --git a/lib/PublicInbox/Xapcmd.pm b/lib/PublicInbox/Xapcmd.pm index 19c6ff07..2864b0d4 100644 --- a/lib/PublicInbox/Xapcmd.pm +++ b/lib/PublicInbox/Xapcmd.pm @@ -122,7 +122,8 @@ sub same_fs_or_die ($$) { } sub process_queue { - my ($queue, $cb, $max, $opt) = @_; + my ($queue, $cb, $opt) = @_; + my $max = $opt->{jobs} || scalar(@$queue); if ($max <= 1) { while (defined(my $args = shift @$queue)) { $cb->($args, $opt); @@ -152,36 +153,18 @@ sub setup_signals () { $SIG{HUP} = $SIG{PIPE} = $SIG{TERM} = sub { exit(1) }; } -sub run { - my ($ibx, $task, $opt) = @_; # task = 'cpdb' or 'compact' - my $cb = \&${\"PublicInbox::Xapcmd::$task"}; - PublicInbox::Admin::progress_prepare($opt ||= {}); - my $dir = $ibx->{inboxdir} or die "no inboxdir in inbox\n"; - runnable_or_die($XAPIAN_COMPACT) if $opt->{compact}; - my $reindex; # v1:{ from => $x40 }, v2:{ from => [ $x40, $x40, .. ] } } - my $from; # per-epoch ranges - - if (!$opt->{-coarse_lock}) { - $reindex = $opt->{reindex} = {}; - $from = $reindex->{from} = []; - require PublicInbox::SearchIdx; - PublicInbox::SearchIdx::load_xapian_writable(); - } +sub prepare_run { + my ($ibx, $opt) = @_; + my $tmp = {}; # old shard dir => File::Temp->newdir object or undef + my @queue; # ([old//src,newdir]) - list of args for cpdb() or compact() - $ibx->umask_prepare; my $old = $ibx->search->xdir(1); -d $old or die "$old does not exist\n"; - - my $tmp = {}; - my @q; my $reshard = $opt->{reshard}; if (defined $reshard && $reshard <= 0) { die "--reshard must be a positive number\n"; } - local %SIG = %SIG; - setup_signals(); - # we want temporary directories to be as deep as possible, # so v2 shards can keep "xap$SCHEMA_VERSION" on a separate FS. if ($ibx->version == 1) { @@ -194,7 +177,7 @@ sub run { my $v = PublicInbox::Search::SCHEMA_VERSION(); my $wip = File::Temp->newdir("xapian$v-XXXXXXXX", DIR => $dir); $tmp->{$old} = $wip; - push @q, [ $old, $wip ]; + push @queue, [ $old, $wip ]; } else { opendir my $dh, $old or die "Failed to opendir $old: $!\n"; my @old_shards; @@ -223,7 +206,7 @@ sub run { my $wip = File::Temp->newdir($tmpl, DIR => $old); same_fs_or_die($old, $wip->dirname); my $cur = "$old/$dn"; - push @q, [ $src // $cur , $wip ]; + push @queue, [ $src // $cur , $wip ]; $tmp->{$cur} = $wip; } # mark old shards to be unlinked @@ -231,10 +214,32 @@ sub run { $tmp->{$_} ||= undef for @$src; } } - my $max = $opt->{jobs} || scalar(@q); + ($tmp, \@queue); +} + +sub run { + my ($ibx, $task, $opt) = @_; # task = 'cpdb' or 'compact' + my $cb = \&${\"PublicInbox::Xapcmd::$task"}; + PublicInbox::Admin::progress_prepare($opt ||= {}); + defined(my $dir = $ibx->{inboxdir}) or die "no inboxdir defined\n"; + -d $dir or die "inboxdir=$dir does not exist\n"; + runnable_or_die($XAPIAN_COMPACT) if $opt->{compact}; + my $reindex; # v1:{ from => $x40 }, v2:{ from => [ $x40, $x40, .. ] } } + + if (!$opt->{-coarse_lock}) { + $reindex = $opt->{reindex} = {}; + $reindex->{from} = []; # per-epoch ranges + require PublicInbox::SearchIdx; + PublicInbox::SearchIdx::load_xapian_writable(); + } + + local %SIG = %SIG; + setup_signals(); + $ibx->umask_prepare; $ibx->with_umask(sub { my $im = $ibx->importer(0); $im->lock_acquire; + my ($tmp, $queue) = prepare_run($ibx, $opt); # fine-grained locking if we prepare for reindex if (!$opt->{-coarse_lock}) { @@ -243,7 +248,7 @@ sub run { } $ibx->cleanup; - process_queue(\@q, $cb, $max, $opt); + process_queue($queue, $cb, $opt); $im->lock_acquire if !$opt->{-coarse_lock}; commit_changes($ibx, $im, $tmp, $opt); });