about summary refs log tree commit homepage
path: root/lib/PublicInbox/Xapcmd.pm
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2019-05-23 09:37:00 +0000
committerEric Wong <e@80x24.org>2019-05-23 17:43:50 +0000
commitc7380dad6bb9eab26641c5ec2707431ed573de3c (patch)
tree39f97521df166ace6abb6b7140a1b6b82d1b2793 /lib/PublicInbox/Xapcmd.pm
parented52051030b89985a1ec032ed4acc74912b0dd80 (diff)
downloadpublic-inbox-c7380dad6bb9eab26641c5ec2707431ed573de3c.tar.gz
Since -xcpdb is a superset of -compact, we can reuse much of
that code used for driving compact.

For compact (only), this is slightly less memory efficient since
it requires an extra process per-partition, but we get to prefix
the output with the partition name for more readable output.
Diffstat (limited to 'lib/PublicInbox/Xapcmd.pm')
-rw-r--r--lib/PublicInbox/Xapcmd.pm132
1 files changed, 71 insertions, 61 deletions
diff --git a/lib/PublicInbox/Xapcmd.pm b/lib/PublicInbox/Xapcmd.pm
index 06389dd0..488c6165 100644
--- a/lib/PublicInbox/Xapcmd.pm
+++ b/lib/PublicInbox/Xapcmd.pm
@@ -19,7 +19,7 @@ sub commit_changes ($$$) {
 
         my $reindex = $opt->{reindex};
         my $im = $ibx->importer(0);
-        $im->lock_acquire if $reindex;
+        $im->lock_acquire if !$opt->{-coarse_lock};
 
         while (my ($old, $new) = each %$tmp) {
                 my @st = stat($old) or die "failed to stat($old): $!\n";
@@ -40,7 +40,7 @@ sub commit_changes ($$$) {
                 remove_tree($prev) or die "failed to remove $prev: $!\n";
         }
         $tmp->done;
-        if ($reindex) {
+        if (!$opt->{-coarse_lock}) {
                 $opt->{-skip_lock} = 1;
                 PublicInbox::Admin::index_inbox($ibx, $opt);
                 # implicit lock_release
@@ -49,18 +49,13 @@ sub commit_changes ($$$) {
         }
 }
 
-sub xspawn {
-        my ($cmd, $env, $opt) = @_;
-        if (ref($cmd->[0]) eq 'CODE') {
-                my $cb = shift(@$cmd); # $cb = cpdb()
-                defined(my $pid = fork) or die "fork: $!";
-                return $pid if $pid > 0;
-                eval { $cb->($cmd, $env, $opt) };
-                die $@ if $@;
-                exit 0;
-        } else {
-                spawn($cmd, $env, $opt);
-        }
+sub cb_spawn {
+        my ($cb, $args, $opt) = @_; # $cb = cpdb() or compact()
+        defined(my $pid = fork) or die "fork: $!";
+        return $pid if $pid > 0;
+        eval { $cb->($args, $opt) };
+        die $@ if $@;
+        exit 0;
 }
 
 sub runnable_or_die ($) {
@@ -108,29 +103,27 @@ sub same_fs_or_die ($$) {
 }
 
 sub run {
-        my ($ibx, $cmd, $env, $opt) = @_;
+        my ($ibx, $task, $opt) = @_; # task = 'cpdb' or 'compact'
+        my $cb = \&${\"PublicInbox::Xapcmd::$task"};
         progress_prepare($opt ||= {});
         my $dir = $ibx->{mainrepo} or die "no mainrepo in inbox\n";
-        my $exe = $cmd->[0];
         runnable_or_die($XAPIAN_COMPACT) if $opt->{compact};
-
         my $reindex; # v1:{ from => $x40 }, v2:{ from => [ $x40, $x40, .. ] } }
         my $from; # per-epoch ranges
 
-        if (ref($exe) eq 'CODE') {
+        if (!$opt->{-coarse_lock}) {
                 $reindex = $opt->{reindex} = {};
                 $from = $reindex->{from} = [];
                 require Search::Xapian::WritableDatabase;
-        } else {
-                runnable_or_die($exe);
         }
+
         $ibx->umask_prepare;
         my $old = $ibx->search->xdir(1);
         -d $old or die "$old does not exist\n";
 
         my $tmp = PublicInbox::Xtmpdirs->new;
         my $v = $ibx->{version} ||= 1;
-        my @cmds;
+        my @q;
 
         # we want temporary directories to be as deep as possible,
         # so v2 partitions can keep "xap$SCHEMA_VERSION" on a separate FS.
@@ -138,7 +131,7 @@ sub run {
                 my $old_parent = dirname($old);
                 same_fs_or_die($old_parent, $old);
                 $tmp->{$old} = tempdir('xapcmd-XXXXXXXX', DIR => $old_parent);
-                push @cmds, [ @$cmd, $old, $tmp->{$old} ];
+                push @q, [ $old, $tmp->{$old} ];
         } else {
                 opendir my $dh, $old or die "Failed to opendir $old: $!\n";
                 while (defined(my $dn = readdir($dh))) {
@@ -147,7 +140,7 @@ sub run {
                                 my $dst = tempdir($tmpl, DIR => $old);
                                 same_fs_or_die($old, $dst);
                                 my $cur = "$old/$dn";
-                                push @cmds, [@$cmd, $cur, $dst ];
+                                push @q, [ $cur, $dst ];
                                 $tmp->{$cur} = $dst;
                         } elsif ($dn eq '.' || $dn eq '..') {
                         } elsif ($dn =~ /\Aover\.sqlite3/) {
@@ -155,30 +148,31 @@ sub run {
                                 warn "W: skipping unknown dir: $old/$dn\n"
                         }
                 }
-                die "No Xapian parts found in $old\n" unless @cmds;
+                die "No Xapian parts found in $old\n" unless @q;
         }
         my $im = $ibx->importer(0);
-        my $max = $opt->{jobs} || scalar(@cmds);
+        my $max = $opt->{jobs} || scalar(@q);
         $ibx->with_umask(sub {
                 $im->lock_acquire;
 
                 # fine-grained locking if we prepare for reindex
-                if ($reindex) {
+                if (!$opt->{-coarse_lock}) {
                         prepare_reindex($ibx, $reindex);
                         $im->lock_release;
                 }
+
                 delete($ibx->{$_}) for (qw(mm over search)); # cleanup
                 my %pids;
-                while (@cmds) {
-                        while (scalar(keys(%pids)) < $max && scalar(@cmds)) {
-                                my $x = shift @cmds;
-                                $pids{xspawn($x, $env, $opt)} = $x;
+                while (@q) {
+                        while (scalar(keys(%pids)) < $max && scalar(@q)) {
+                                my $args = shift @q;
+                                $pids{cb_spawn($cb, $args, $opt)} = $args;
                         }
 
                         while (scalar keys %pids) {
                                 my $pid = waitpid(-1, 0);
-                                my $x = delete $pids{$pid};
-                                die join(' ', @$x)." failed: $?\n" if $?;
+                                my $args = delete $pids{$pid};
+                                die join(' ', @$args)." failed: $?\n" if $?;
                         }
                 }
                 commit_changes($ibx, $tmp, $opt);
@@ -199,10 +193,51 @@ sub cpdb_retryable ($$) {
         0;
 }
 
+sub progress_pfx ($) {
+        my @p = split('/', $_[0]);
+
+        # return "xap15/0" for v2, or "xapian15" for v1:
+        ($p[-1] =~ /\A\d+\z/) ? "$p[-2]/$p[-1]" : $p[-1];
+}
+
+# xapian-compact wrapper
+sub compact ($$) {
+        my ($args, $opt) = @_;
+        my ($src, $dst) = @$args;
+        my ($r, $w);
+        my $pfx = $opt->{-progress_pfx} ||= progress_pfx($src);
+        my $pr = $opt->{-progress};
+        my $rdr = {};
+
+        foreach my $fd (0..2) {
+                defined(my $dfd = $opt->{$fd}) or next;
+                $rdr->{$fd} = $dfd;
+        }
+        if ($pr) {
+                $pr->("$pfx compacting...\n");
+                $rdr->{1} = fileno($w) if pipe($r, $w);
+        }
+
+        # we rely on --no-renumber to keep docids synched to NNTP
+        my $cmd = [ $XAPIAN_COMPACT, '--no-renumber', $src, $dst ];
+        my $pid = spawn($cmd, undef, $rdr);
+        if ($pr) {
+                close $w or die "close: \$w: $!";
+                foreach (<$r>) {
+                        s/\r/\r$pfx /g;
+                        $pr->("$pfx $_");
+                }
+        }
+        my $rp = waitpid($pid, 0);
+        if ($? || $rp != $pid) {
+                die join(' ', @$cmd)." failed: $? (pid=$pid, reaped=$rp)\n";
+        }
+}
+
 # Like copydatabase(1), this is horribly slow; and it doesn't seem due
 # to the overhead of Perl.
-sub cpdb {
-        my ($args, $env, $opt) = @_;
+sub cpdb ($$) {
+        my ($args, $opt) = @_;
         my ($old, $new) = @$args;
         my $src = Search::Xapian::Database->new($old);
         my $tmp = $opt->{compact} ? "$new.compact" : $new;
@@ -212,9 +247,9 @@ sub cpdb {
         my $creat = Search::Xapian::DB_CREATE();
         my $dst = Search::Xapian::WritableDatabase->new($tmp, $creat);
         my ($it, $end);
-        my $pfx = '';
         my ($nr, $tot, $fmt); # progress output
         my $pr = $opt->{-progress};
+        my $pfx = $opt->{-progress_pfx} = progress_pfx($old);
 
         do {
                 eval {
@@ -227,8 +262,6 @@ sub cpdb {
                         if ($pr) {
                                 $nr = 0;
                                 $tot = $src->get_doccount;
-                                my @p = split('/', $old);
-                                $pfx = "$p[-2]/$p[-1]:";
                                 $fmt = "$pfx % ".length($tot)."u/$tot\n";
                                 $pr->("$pfx copying $tot documents\n");
                         }
@@ -259,32 +292,9 @@ sub cpdb {
 
         $src = $dst = undef; # flushes and closes
 
-        $pr->("$pfx compacting...\n") if $pr;
         # this is probably the best place to do xapian-compact
         # since $dst isn't readable by HTTP or NNTP clients, yet:
-        my $cmd = [ $XAPIAN_COMPACT, '--no-renumber', $tmp, $new ];
-        my $rdr = {};
-        foreach my $fd (0..2) {
-                defined(my $dst = $opt->{$fd}) or next;
-                $rdr->{$fd} = $dst;
-        }
-
-        my ($r, $w);
-        if ($pr && pipe($r, $w)) {
-                $rdr->{1} = fileno($w);
-        }
-        my $pid = spawn($cmd, $env, $rdr);
-        if ($pr) {
-                close $w or die "close: \$w: $!";
-                foreach (<$r>) {
-                        s/\r/\r$pfx /g;
-                        $pr->("$pfx $_");
-                }
-        }
-        my $rp = waitpid($pid, 0);
-        if ($? || $rp != $pid) {
-                die join(' ', @$cmd)." failed: $? (pid=$pid, reaped=$rp)\n";
-        }
+        compact([ $tmp, $new ], $opt);
         remove_tree($tmp) or die "failed to remove $tmp: $!\n";
 }