From c7380dad6bb9eab26641c5ec2707431ed573de3c Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Thu, 23 May 2019 09:37:00 +0000 Subject: compact: reuse infrastructure from xcpdb Since -xcpdb is a superset of -compact, we can reuse much of that code used for driving compact. For compact (only), this is slightly less memory efficient since it requires an extra process per-partition, but we get to prefix the output with the partition name for more readable output. --- lib/PublicInbox/Xapcmd.pm | 132 ++++++++++++++++++++++++-------------------- script/public-inbox-compact | 6 +- script/public-inbox-xcpdb | 3 +- 3 files changed, 76 insertions(+), 65 deletions(-) diff --git a/lib/PublicInbox/Xapcmd.pm b/lib/PublicInbox/Xapcmd.pm index 06389dd0..488c6165 100644 --- a/lib/PublicInbox/Xapcmd.pm +++ b/lib/PublicInbox/Xapcmd.pm @@ -19,7 +19,7 @@ sub commit_changes ($$$) { my $reindex = $opt->{reindex}; my $im = $ibx->importer(0); - $im->lock_acquire if $reindex; + $im->lock_acquire if !$opt->{-coarse_lock}; while (my ($old, $new) = each %$tmp) { my @st = stat($old) or die "failed to stat($old): $!\n"; @@ -40,7 +40,7 @@ sub commit_changes ($$$) { remove_tree($prev) or die "failed to remove $prev: $!\n"; } $tmp->done; - if ($reindex) { + if (!$opt->{-coarse_lock}) { $opt->{-skip_lock} = 1; PublicInbox::Admin::index_inbox($ibx, $opt); # implicit lock_release @@ -49,18 +49,13 @@ sub commit_changes ($$$) { } } -sub xspawn { - my ($cmd, $env, $opt) = @_; - if (ref($cmd->[0]) eq 'CODE') { - my $cb = shift(@$cmd); # $cb = cpdb() - defined(my $pid = fork) or die "fork: $!"; - return $pid if $pid > 0; - eval { $cb->($cmd, $env, $opt) }; - die $@ if $@; - exit 0; - } else { - spawn($cmd, $env, $opt); - } +sub cb_spawn { + my ($cb, $args, $opt) = @_; # $cb = cpdb() or compact() + defined(my $pid = fork) or die "fork: $!"; + return $pid if $pid > 0; + eval { $cb->($args, $opt) }; + die $@ if $@; + exit 0; } sub runnable_or_die ($) { @@ -108,29 +103,27 @@ sub same_fs_or_die ($$) { } sub run { - my ($ibx, $cmd, $env, $opt) = @_; + my ($ibx, $task, $opt) = @_; # task = 'cpdb' or 'compact' + my $cb = \&${\"PublicInbox::Xapcmd::$task"}; progress_prepare($opt ||= {}); my $dir = $ibx->{mainrepo} or die "no mainrepo in inbox\n"; - my $exe = $cmd->[0]; runnable_or_die($XAPIAN_COMPACT) if $opt->{compact}; - my $reindex; # v1:{ from => $x40 }, v2:{ from => [ $x40, $x40, .. ] } } my $from; # per-epoch ranges - if (ref($exe) eq 'CODE') { + if (!$opt->{-coarse_lock}) { $reindex = $opt->{reindex} = {}; $from = $reindex->{from} = []; require Search::Xapian::WritableDatabase; - } else { - runnable_or_die($exe); } + $ibx->umask_prepare; my $old = $ibx->search->xdir(1); -d $old or die "$old does not exist\n"; my $tmp = PublicInbox::Xtmpdirs->new; my $v = $ibx->{version} ||= 1; - my @cmds; + my @q; # we want temporary directories to be as deep as possible, # so v2 partitions can keep "xap$SCHEMA_VERSION" on a separate FS. @@ -138,7 +131,7 @@ sub run { my $old_parent = dirname($old); same_fs_or_die($old_parent, $old); $tmp->{$old} = tempdir('xapcmd-XXXXXXXX', DIR => $old_parent); - push @cmds, [ @$cmd, $old, $tmp->{$old} ]; + push @q, [ $old, $tmp->{$old} ]; } else { opendir my $dh, $old or die "Failed to opendir $old: $!\n"; while (defined(my $dn = readdir($dh))) { @@ -147,7 +140,7 @@ sub run { my $dst = tempdir($tmpl, DIR => $old); same_fs_or_die($old, $dst); my $cur = "$old/$dn"; - push @cmds, [@$cmd, $cur, $dst ]; + push @q, [ $cur, $dst ]; $tmp->{$cur} = $dst; } elsif ($dn eq '.' || $dn eq '..') { } elsif ($dn =~ /\Aover\.sqlite3/) { @@ -155,30 +148,31 @@ sub run { warn "W: skipping unknown dir: $old/$dn\n" } } - die "No Xapian parts found in $old\n" unless @cmds; + die "No Xapian parts found in $old\n" unless @q; } my $im = $ibx->importer(0); - my $max = $opt->{jobs} || scalar(@cmds); + my $max = $opt->{jobs} || scalar(@q); $ibx->with_umask(sub { $im->lock_acquire; # fine-grained locking if we prepare for reindex - if ($reindex) { + if (!$opt->{-coarse_lock}) { prepare_reindex($ibx, $reindex); $im->lock_release; } + delete($ibx->{$_}) for (qw(mm over search)); # cleanup my %pids; - while (@cmds) { - while (scalar(keys(%pids)) < $max && scalar(@cmds)) { - my $x = shift @cmds; - $pids{xspawn($x, $env, $opt)} = $x; + while (@q) { + while (scalar(keys(%pids)) < $max && scalar(@q)) { + my $args = shift @q; + $pids{cb_spawn($cb, $args, $opt)} = $args; } while (scalar keys %pids) { my $pid = waitpid(-1, 0); - my $x = delete $pids{$pid}; - die join(' ', @$x)." failed: $?\n" if $?; + my $args = delete $pids{$pid}; + die join(' ', @$args)." failed: $?\n" if $?; } } commit_changes($ibx, $tmp, $opt); @@ -199,10 +193,51 @@ sub cpdb_retryable ($$) { 0; } +sub progress_pfx ($) { + my @p = split('/', $_[0]); + + # return "xap15/0" for v2, or "xapian15" for v1: + ($p[-1] =~ /\A\d+\z/) ? "$p[-2]/$p[-1]" : $p[-1]; +} + +# xapian-compact wrapper +sub compact ($$) { + my ($args, $opt) = @_; + my ($src, $dst) = @$args; + my ($r, $w); + my $pfx = $opt->{-progress_pfx} ||= progress_pfx($src); + my $pr = $opt->{-progress}; + my $rdr = {}; + + foreach my $fd (0..2) { + defined(my $dfd = $opt->{$fd}) or next; + $rdr->{$fd} = $dfd; + } + if ($pr) { + $pr->("$pfx compacting...\n"); + $rdr->{1} = fileno($w) if pipe($r, $w); + } + + # we rely on --no-renumber to keep docids synched to NNTP + my $cmd = [ $XAPIAN_COMPACT, '--no-renumber', $src, $dst ]; + my $pid = spawn($cmd, undef, $rdr); + if ($pr) { + close $w or die "close: \$w: $!"; + foreach (<$r>) { + s/\r/\r$pfx /g; + $pr->("$pfx $_"); + } + } + my $rp = waitpid($pid, 0); + if ($? || $rp != $pid) { + die join(' ', @$cmd)." failed: $? (pid=$pid, reaped=$rp)\n"; + } +} + # Like copydatabase(1), this is horribly slow; and it doesn't seem due # to the overhead of Perl. -sub cpdb { - my ($args, $env, $opt) = @_; +sub cpdb ($$) { + my ($args, $opt) = @_; my ($old, $new) = @$args; my $src = Search::Xapian::Database->new($old); my $tmp = $opt->{compact} ? "$new.compact" : $new; @@ -212,9 +247,9 @@ sub cpdb { my $creat = Search::Xapian::DB_CREATE(); my $dst = Search::Xapian::WritableDatabase->new($tmp, $creat); my ($it, $end); - my $pfx = ''; my ($nr, $tot, $fmt); # progress output my $pr = $opt->{-progress}; + my $pfx = $opt->{-progress_pfx} = progress_pfx($old); do { eval { @@ -227,8 +262,6 @@ sub cpdb { if ($pr) { $nr = 0; $tot = $src->get_doccount; - my @p = split('/', $old); - $pfx = "$p[-2]/$p[-1]:"; $fmt = "$pfx % ".length($tot)."u/$tot\n"; $pr->("$pfx copying $tot documents\n"); } @@ -259,32 +292,9 @@ sub cpdb { $src = $dst = undef; # flushes and closes - $pr->("$pfx compacting...\n") if $pr; # this is probably the best place to do xapian-compact # since $dst isn't readable by HTTP or NNTP clients, yet: - my $cmd = [ $XAPIAN_COMPACT, '--no-renumber', $tmp, $new ]; - my $rdr = {}; - foreach my $fd (0..2) { - defined(my $dst = $opt->{$fd}) or next; - $rdr->{$fd} = $dst; - } - - my ($r, $w); - if ($pr && pipe($r, $w)) { - $rdr->{1} = fileno($w); - } - my $pid = spawn($cmd, $env, $rdr); - if ($pr) { - close $w or die "close: \$w: $!"; - foreach (<$r>) { - s/\r/\r$pfx /g; - $pr->("$pfx $_"); - } - } - my $rp = waitpid($pid, 0); - if ($? || $rp != $pid) { - die join(' ', @$cmd)." failed: $? (pid=$pid, reaped=$rp)\n"; - } + compact([ $tmp, $new ], $opt); remove_tree($tmp) or die "failed to remove $tmp: $!\n"; } diff --git a/script/public-inbox-compact b/script/public-inbox-compact index 709fb92a..4f58d5af 100755 --- a/script/public-inbox-compact +++ b/script/public-inbox-compact @@ -3,14 +3,16 @@ # License: AGPL-3.0+ use strict; use warnings; +use Getopt::Long qw(:config gnu_getopt no_ignore_case auto_abbrev); use PublicInbox::InboxWritable; use PublicInbox::Xapcmd; use PublicInbox::Admin; PublicInbox::Admin::require_or_die('-index'); my $usage = "Usage: public-inbox-compact REPO_DIR\n"; +my $opt = { compact => 1, -coarse_lock => 1 }; +GetOptions($opt, qw(quiet|q)) or die "bad command-line args\n$usage"; my @ibxs = PublicInbox::Admin::resolve_inboxes(\@ARGV) or die $usage; foreach (@ibxs) { my $ibx = PublicInbox::InboxWritable->new($_); - # we rely on --no-renumber to keep docids synched to NNTP - PublicInbox::Xapcmd::run($ibx, [qw(xapian-compact --no-renumber)]); + PublicInbox::Xapcmd::run($ibx, 'compact', $opt); } diff --git a/script/public-inbox-xcpdb b/script/public-inbox-xcpdb index 5b66337b..bda7be0c 100755 --- a/script/public-inbox-xcpdb +++ b/script/public-inbox-xcpdb @@ -11,9 +11,8 @@ my $usage = "Usage: public-inbox-xcpdb INBOX_DIR\n"; my $opt = {}; GetOptions($opt, qw(compact quiet|q)) or die "bad command-line args\n$usage"; my @ibxs = PublicInbox::Admin::resolve_inboxes(\@ARGV) or die $usage; -my $cmd = [ \&PublicInbox::Xapcmd::cpdb ]; foreach (@ibxs) { my $ibx = PublicInbox::InboxWritable->new($_); # we rely on --no-renumber to keep docids synched to NNTP - PublicInbox::Xapcmd::run($ibx, $cmd, undef, $opt); + PublicInbox::Xapcmd::run($ibx, 'cpdb', $opt); } -- cgit v1.2.3-24-ge0c7