From mboxrd@z Thu Jan 1 00:00:00 1970
Return-Path:
X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net
X-Spam-Level:
X-Spam-ASN:
X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00
	shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2
Received: from localhost (dcvr.yhbt.net [127.0.0.1])
	by dcvr.yhbt.net (Postfix) with ESMTP id AA9D120383
	for ; Thu, 23 May 2019 09:37:14 +0000 (UTC)
From: Eric Wong
To: meta@public-inbox.org
Subject: [PATCH 25/26] xcpdb|compact: support --jobs/-j flag like gmake(1)
Date: Thu, 23 May 2019 09:37:03 +0000
Message-Id: <20190523093704.18367-26-e@80x24.org>
In-Reply-To: <20190523093704.18367-1-e@80x24.org>
References: <20190523093704.18367-1-e@80x24.org>
MIME-Version: 1.0
Content-Transfer-Encoding: 8bit
List-Id:

We don't have to be tied to the number of partitions in case
we made a bad choice at initialization.

This doesn't affect reindexing, but the copying phase is already
intensive.

And optimize away the extra process when we only have a single
job which won't parallelize.

The wording for the (v2) reindexing phase could be improved, later.
I also plan to allow repartitioning of existing Xapian DBs.
---
 lib/PublicInbox/Xapcmd.pm | 44 +++++++++++++++++++++++++--------------
 1 file changed, 28 insertions(+), 16 deletions(-)

diff --git a/lib/PublicInbox/Xapcmd.pm b/lib/PublicInbox/Xapcmd.pm
index 5b6d06b..a294d53 100644
--- a/lib/PublicInbox/Xapcmd.pm
+++ b/lib/PublicInbox/Xapcmd.pm
@@ -13,7 +13,7 @@ use File::Basename qw(dirname);
 # support testing with dev versions of Xapian which installs
 # commands with a version number suffix (e.g. "xapian-compact-1.5")
 our $XAPIAN_COMPACT = $ENV{XAPIAN_COMPACT} || 'xapian-compact';
-our @COMPACT_OPT = qw(quiet|q blocksize|b=s no-full|n fuller|F);
+our @COMPACT_OPT = qw(jobs|j=i quiet|q blocksize|b=s no-full|n fuller|F);
 
 sub commit_changes ($$$) {
 	my ($ibx, $tmp, $opt) = @_;
@@ -54,8 +54,7 @@ sub cb_spawn {
 	my ($cb, $args, $opt) = @_; # $cb = cpdb() or compact()
 	defined(my $pid = fork) or die "fork: $!";
 	return $pid if $pid > 0;
-	eval { $cb->($args, $opt) };
-	die $@ if $@;
+	$cb->($args, $opt);
 	exit 0;
 }
 
@@ -103,6 +102,31 @@ sub same_fs_or_die ($$) {
 	die "$x and $y reside on different filesystems\n";
 }
 
+sub process_queue {
+	my ($queue, $cb, $max, $opt) = @_;
+	if ($max <= 1) {
+		while (defined(my $args = shift @$queue)) {
+			$cb->($args, $opt);
+		}
+		return;
+	}
+
+	# run in parallel:
+	my %pids;
+	while (@$queue) {
+		while (scalar(keys(%pids)) < $max && scalar(@$queue)) {
+			my $args = shift @$queue;
+			$pids{cb_spawn($cb, $args, $opt)} = $args;
+		}
+
+		while (scalar keys %pids) {
+			my $pid = waitpid(-1, 0);
+			my $args = delete $pids{$pid};
+			die join(' ', @$args)." failed: $?\n" if $?;
+		}
+	}
+}
+
 sub run {
 	my ($ibx, $task, $opt) = @_; # task = 'cpdb' or 'compact'
 	my $cb = \&${\"PublicInbox::Xapcmd::$task"};
@@ -163,19 +187,7 @@ sub run {
 		}
 		delete($ibx->{$_}) for (qw(mm over search)); # cleanup
 
-		my %pids;
-		while (@q) {
-			while (scalar(keys(%pids)) < $max && scalar(@q)) {
-				my $args = shift @q;
-				$pids{cb_spawn($cb, $args, $opt)} = $args;
-			}
-
-			while (scalar keys %pids) {
-				my $pid = waitpid(-1, 0);
-				my $args = delete $pids{$pid};
-				die join(' ', @$args)." failed: $?\n" if $?;
-			}
-		}
+		process_queue(\@q, $cb, $max, $opt);
 		commit_changes($ibx, $tmp, $opt);
 	});
 }
-- 
EW