about summary refs log tree commit homepage
path: root/lib/PublicInbox/Xapcmd.pm
diff options
authorEric Wong <e@80x24.org>2019-06-13 00:29:37 +0000
committerEric Wong <e@80x24.org>2019-06-14 01:31:25 +0000
commite9eb3af852778a67533e9579b14695763535d262 (patch)
tree09478c84234a4a7f99def3bbae8f4f7e19c390fe /lib/PublicInbox/Xapcmd.pm
parente665a4fa317bf9ceea812bc9ca3f486ec722dfea (diff)
v2 repos are sometimes created on machines where CPU
parallelization exceeds the capability of the storage devices.

In that case, users may reshard the Xapian DB to any smaller,
positive integer to avoid excessive overhead and contention when
bottlenecked by slow storage.

Resharding can also be used to increase shard count after
hardware upgrades.
Diffstat (limited to 'lib/PublicInbox/Xapcmd.pm')
1 files changed, 167 insertions, 45 deletions
diff --git a/lib/PublicInbox/Xapcmd.pm b/lib/PublicInbox/Xapcmd.pm
index 5d8c35f4..e1c6fe3a 100644
--- a/lib/PublicInbox/Xapcmd.pm
+++ b/lib/PublicInbox/Xapcmd.pm
@@ -17,34 +17,66 @@ our @COMPACT_OPT = qw(jobs|j=i quiet|q blocksize|b=s no-full|n fuller|F);
 sub commit_changes ($$$) {
         my ($ibx, $tmp, $opt) = @_;
+        my $new_parts = $opt->{reshard};
         my $reindex = $opt->{reindex};
         my $im = $ibx->importer(0);
         $im->lock_acquire if !$opt->{-coarse_lock};
         $SIG{INT} or die 'BUG: $SIG{INT} not handled';
+        my @old_part;
         while (my ($old, $new) = each %$tmp) {
-                my @st = stat($old) or die "failed to stat($old): $!\n";
+                my @st = stat($old);
+                if (!@st && !defined($opt->{reshard})) {
+                        die "failed to stat($old): $!";
+                }
                 my $over = "$old/over.sqlite3";
                 if (-f $over) { # only for v1, v2 over is untouched
+                        defined $new or die "BUG: $over exists when culling v2";
                         $over = PublicInbox::Over->new($over);
                         my $tmp_over = "$new/over.sqlite3";
                         $over = undef;
-                chmod($st[2] & 07777, $new) or die "chmod $old: $!\n";
+                if (!defined($new)) { # culled partition
+                        push @old_part, $old;
+                        next;
+                }
+                if (@st) {
+                        chmod($st[2] & 07777, $new) or die "chmod $old: $!\n";
+                        rename($old, "$new/old") or
+                                        die "rename $old => $new/old: $!\n";
+                }
                 # Xtmpdir->DESTROY won't remove $new after this:
-                rename($old, "$new/old") or die "rename $old => $new/old: $!\n";
                 rename($new, $old) or die "rename $new => $old: $!\n";
-                my $prev = "$old/old";
-                remove_tree($prev) or die "failed to remove $prev: $!\n";
+                if (@st) {
+                        my $prev = "$old/old";
+                        remove_tree($prev) or
+                                die "failed to remove $prev: $!\n";
+                }
+        remove_tree(@old_part);
         if (!$opt->{-coarse_lock}) {
                 $opt->{-skip_lock} = 1;
+                if ($im->can('count_partitions')) {
+                        my $pr = $opt->{-progress};
+                        my $n = $im->count_partitions;
+                        if (defined $new_parts && $n != $new_parts) {
+                                die
+"BUG: counted $n partitions after repartioning to $new_parts";
+                        }
+                        my $prev = $im->{partitions};
+                        if ($pr && $prev != $n) {
+                                $pr->("partition count changed: $prev => $n\n");
+                                $im->{partitions} = $n;
+                        }
+                }
                 PublicInbox::Admin::index_inbox($ibx, $opt);
                 # implicit lock_release
         } else {
@@ -139,32 +171,59 @@ sub run {
         my $tmp = PublicInbox::Xtmpdirs->new;
         my $v = $ibx->{version} ||= 1;
         my @q;
+        my $new_parts = $opt->{reshard};
+        if (defined $new_parts && $new_parts <= 0) {
+                die "--reshard must be a positive number\n";
+        }
         # we want temporary directories to be as deep as possible,
         # so v2 partitions can keep "xap$SCHEMA_VERSION" on a separate FS.
         if ($v == 1) {
+                if (defined $new_parts) {
+                        warn
+"--reshard=$new_parts ignored for v1 $ibx->{mainrepo}\n";
+                }
                 my $old_parent = dirname($old);
                 same_fs_or_die($old_parent, $old);
                 my $v = PublicInbox::Search::SCHEMA_VERSION();
-                $tmp->{$old} = tempdir("xapian$v-XXXXXXXX", DIR => $old_parent);
-                push @q, [ $old, $tmp->{$old} ];
+                my $wip = tempdir("xapian$v-XXXXXXXX", DIR => $old_parent);
+                $tmp->{$old} = $wip;
+                push @q, [ $old, $wip ];
         } else {
                 opendir my $dh, $old or die "Failed to opendir $old: $!\n";
+                my @old_parts;
                 while (defined(my $dn = readdir($dh))) {
                         if ($dn =~ /\A[0-9]+\z/) {
-                                my $tmpl = "$dn-XXXXXXXX";
-                                my $dst = tempdir($tmpl, DIR => $old);
-                                same_fs_or_die($old, $dst);
-                                my $cur = "$old/$dn";
-                                push @q, [ $cur, $dst ];
-                                $tmp->{$cur} = $dst;
+                                push @old_parts, $dn;
                         } elsif ($dn eq '.' || $dn eq '..') {
                         } elsif ($dn =~ /\Aover\.sqlite3/) {
                         } else {
                                 warn "W: skipping unknown dir: $old/$dn\n"
-                die "No Xapian parts found in $old\n" unless @q;
+                die "No Xapian parts found in $old\n" unless @old_parts;
+                my ($src, $max_part);
+                if (!defined($new_parts) || $new_parts == scalar(@old_parts)) {
+                        # 1:1 copy
+                        $max_part = scalar(@old_parts) - 1;
+                } else {
+                        # M:N copy
+                        $max_part = $new_parts - 1;
+                        $src = [ map { "$old/$_" } @old_parts ];
+                }
+                foreach my $dn (0..$max_part) {
+                        my $tmpl = "$dn-XXXXXXXX";
+                        my $wip = tempdir($tmpl, DIR => $old);
+                        same_fs_or_die($old, $wip);
+                        my $cur = "$old/$dn";
+                        push @q, [ $src // $cur , $wip ];
+                        $tmp->{$cur} = $wip;
+                }
+                # mark old parts to be unlinked
+                if ($src) {
+                        $tmp->{$_} ||= undef for @$src;
+                }
         my $im = $ibx->importer(0);
         my $max = $opt->{jobs} || scalar(@q);
@@ -245,12 +304,74 @@ sub compact ($$) {
+sub cpdb_loop ($$$;$$) {
+        my ($src, $dst, $pr_data, $cur_part, $new_parts) = @_;
+        my ($pr, $fmt, $nr, $pfx);
+        if ($pr_data) {
+                $pr = $pr_data->{pr};
+                $fmt = $pr_data->{fmt};
+                $nr = \($pr_data->{nr});
+                $pfx = $pr_data->{pfx};
+        }
+        my ($it, $end);
+        do {
+                eval {
+                        $it = $src->postlist_begin('');
+                        $end = $src->postlist_end('');
+                };
+        } while (cpdb_retryable($src, $pfx));
+        do {
+                eval {
+                        for (; $it != $end; $it++) {
+                                my $docid = $it->get_docid;
+                                if (defined $new_parts) {
+                                        my $dst_part = $docid % $new_parts;
+                                        next if $dst_part != $cur_part;
+                                }
+                                my $doc = $src->get_document($docid);
+                                $dst->replace_document($docid, $doc);
+                                if ($pr_data && !(++$$nr  & 1023)) {
+                                        $pr->(sprintf($fmt, $$nr));
+                                }
+                        }
+                        # unlike copydatabase(1), we don't copy spelling
+                        # and synonym data (or other user metadata) since
+                        # the Perl APIs don't expose iterators for them
+                        # (and public-inbox does not use those features)
+                };
+        } while (cpdb_retryable($src, $pfx));
 # Like copydatabase(1), this is horribly slow; and it doesn't seem due
 # to the overhead of Perl.
 sub cpdb ($$) {
         my ($args, $opt) = @_;
         my ($old, $new) = @$args;
-        my $src = Search::Xapian::Database->new($old);
+        my ($src, $cur_part);
+        my $new_parts;
+        if (ref($old) eq 'ARRAY') {
+                ($cur_part) = ($new =~ m!xap[0-9]+/([0-9]+)\b!);
+                defined $cur_part or
+                        die "BUG: could not extract partition # from $new";
+                $new_parts = $opt->{reshard};
+                defined $new_parts or die 'BUG: got array src w/o --partition';
+                # repartitioning, M:N copy means have full read access
+                foreach (@$old) {
+                        if ($src) {
+                                my $sub = Search::Xapian::Database->new($_);
+                                $src->add_database($sub);
+                        } else {
+                                $src = Search::Xapian::Database->new($_);
+                        }
+                }
+        } else {
+                $src = Search::Xapian::Database->new($old);
+        }
         my ($xtmp, $tmp);
         if ($opt->{compact}) {
                 my $newdir = dirname($new);
@@ -266,10 +387,9 @@ sub cpdb ($$) {
         # of other bugs:
         my $creat = Search::Xapian::DB_CREATE();
         my $dst = Search::Xapian::WritableDatabase->new($tmp, $creat);
-        my ($it, $end);
-        my ($nr, $tot, $fmt); # progress output
         my $pr = $opt->{-progress};
         my $pfx = $opt->{-progress_pfx} = progress_pfx($new);
+        my $pr_data = { pr => $pr, pfx => $pfx, nr => 0 } if $pr;
         do {
                 eval {
@@ -284,38 +404,39 @@ sub cpdb ($$) {
                                         $dst->set_metadata('indexlevel', $l);
-                        $it = $src->postlist_begin('');
-                        $end = $src->postlist_end('');
-                        if ($pr) {
-                                $nr = 0;
-                                $tot = $src->get_doccount;
-                                $fmt = "$pfx % ".length($tot)."u/$tot\n";
-                                $pr->("$pfx copying $tot documents\n");
-                        }
-                };
-        } while (cpdb_retryable($src, $pfx));
-        do {
-                eval {
-                        while ($it != $end) {
-                                my $docid = $it->get_docid;
-                                my $doc = $src->get_document($docid);
-                                $dst->replace_document($docid, $doc);
-                                $it->inc;
-                                if ($pr && !(++$nr & 1023)) {
-                                        $pr->(sprintf($fmt, $nr));
+                        if ($pr_data) {
+                                my $tot = $src->get_doccount;
+                                # we can only estimate when repartitioning,
+                                # because removed spam causes slight imbalance
+                                my $est = '';
+                                if (defined $cur_part && $new_parts > 1) {
+                                        $tot = int($tot/$new_parts);
+                                        $est = 'around ';
+                                my $fmt = "$pfx % ".length($tot)."u/$tot\n";
+                                $pr->("$pfx copying $est$tot documents\n");
+                                $pr_data->{fmt} = $fmt;
+                                $pr_data->{total} = $tot;
-                        # unlike copydatabase(1), we don't copy spelling
-                        # and synonym data (or other user metadata) since
-                        # the Perl APIs don't expose iterators for them
-                        # (and public-inbox does not use those features)
         } while (cpdb_retryable($src, $pfx));
-        $pr->(sprintf($fmt, $nr)) if $pr;
+        if (defined $new_parts) {
+                # we rely on document IDs matching NNTP article number,
+                # so we can't have the combined DB support rewriting
+                # document IDs.  Thus we iterate through each shard
+                # individually.
+                $src = undef;
+                foreach (@$old) {
+                        my $old = Search::Xapian::Database->new($_);
+                        cpdb_loop($old, $dst, $pr_data, $cur_part, $new_parts);
+                }
+        } else {
+                cpdb_loop($src, $dst, $pr_data);
+        }
+        $pr->(sprintf($pr_data->{fmt}, $pr_data->{nr})) if $pr;
         return unless $xtmp;
         $src = $dst = undef; # flushes and closes
@@ -360,6 +481,7 @@ sub DESTROY {
         my $owner_pid = delete $owner{"$self"} or return;
         return if $owner_pid != $$;
         foreach my $new (values %$self) {
+                defined $new or next; # may be undef if repartitioning
                 remove_tree($new) unless -d "$new/old";