about summary refs log tree commit homepage
path: root/lib
diff options
context:
space:
mode:
author: Eric Wong <e@80x24.org>, 2019-05-23 09:36:52 +0000
committer: Eric Wong <e@80x24.org>, 2019-05-23 17:43:50 +0000
commit: 0c6d38221ec9fbf9d7c7e0329252b10f17ab7a27 (patch)
tree: ec8826ca0cfa8c63a636cacc4ba35fe7e658975b /lib
parent: a1c3fb1bbef319a0af8ab63380495984c3a4ee18 (diff)
download: public-inbox-0c6d38221ec9fbf9d7c7e0329252b10f17ab7a27.tar.gz
Copying an entire Xapian DB takes a long time, so update our
reindexing code to support partial reindexing, snapshot the
pre-copydatabase git revisions, perform the lengthy copy,
and do a partial reindex when the copy + renames are done.
Diffstat (limited to 'lib')
-rw-r--r-- lib/PublicInbox/Admin.pm 2
-rw-r--r-- lib/PublicInbox/SearchIdx.pm 10
-rw-r--r-- lib/PublicInbox/V2Writable.pm 21
-rw-r--r-- lib/PublicInbox/Xapcmd.pm 58
4 files changed, 80 insertions, 11 deletions
diff --git a/lib/PublicInbox/Admin.pm b/lib/PublicInbox/Admin.pm
index 94f47abb..34aa3129 100644
--- a/lib/PublicInbox/Admin.pm
+++ b/lib/PublicInbox/Admin.pm
@@ -141,7 +141,7 @@ sub index_inbox {
         if (ref($ibx) && ($ibx->{version} || 1) == 2) {
                 eval { require PublicInbox::V2Writable };
                 die "v2 requirements not met: $@\n" if $@;
-                my $v2w = eval {
+                my $v2w = eval { $ibx->importer(0) } || eval {
                         PublicInbox::V2Writable->new($ibx, {nproc=>$jobs});
                 };
                 if (defined $jobs) {
diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm
index 114420e4..0aeeb6bc 100644
--- a/lib/PublicInbox/SearchIdx.pm
+++ b/lib/PublicInbox/SearchIdx.pm
@@ -542,8 +542,10 @@ sub do_cat_mail {
         $@ ? undef : $mime;
 }
 
+# called by public-inbox-index
 sub index_sync {
         my ($self, $opts) = @_;
+        delete $self->{lock_path} if $opts->{-skip_lock};
         $self->{-inbox}->with_umask(sub { $self->_index_sync($opts) })
 }
 
@@ -692,6 +694,12 @@ sub _last_x_commit {
         $lx;
 }
 
+sub reindex_from ($$) {
+        my ($reindex, $last_commit) = @_;
+        return $last_commit unless $reindex;
+        ref($reindex) eq 'HASH' ? $reindex->{from} : '';
+}
+
 # indexes all unindexed messages (v1 only)
 sub _index_sync {
         my ($self, $opts) = @_;
@@ -705,7 +713,7 @@ sub _index_sync {
         do {
                 $xlog = undef;
                 $last_commit = _last_x_commit($self, $mm);
-                $lx = $opts->{reindex} ? '' : $last_commit;
+                $lx = reindex_from($opts->{reindex}, $last_commit);
 
                 $self->{over}->rollback_lazy;
                 $self->{over}->disconnect;
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index 3dd606ea..1ee19b21 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -238,7 +238,7 @@ sub idx_part {
 
 # idempotent
 sub idx_init {
-        my ($self) = @_;
+        my ($self, $opt) = @_;
         return if $self->{idx_parts};
         my $ibx = $self->{-inbox};
 
@@ -264,7 +264,7 @@ sub idx_init {
         my $over = $self->{over};
         $ibx->umask_prepare;
         $ibx->with_umask(sub {
-                $self->lock_acquire;
+                $self->lock_acquire unless ($opt && $opt->{-skip_lock});
                 $over->create;
 
                 # -compact can change partition count while -watch is idle
@@ -924,6 +924,19 @@ sub unindex {
                 qw(-c gc.reflogExpire=now gc --prune=all)]);
 }
 
+sub index_ranges ($$$) {
+        my ($self, $reindex, $epoch_max) = @_;
+        return last_commits($self, $epoch_max) unless $reindex;
+
+        return [] if ref($reindex) ne 'HASH';
+
+        my $ranges = $reindex->{from}; # arrayref;
+        if (ref($ranges) ne 'ARRAY') {
+                die 'BUG: $reindex->{from} not an ARRAY';
+        }
+        $ranges;
+}
+
 # called for public-inbox-index
 sub index_sync {
         my ($self, $opts) = @_;
@@ -931,10 +944,10 @@ sub index_sync {
         my $epoch_max;
         my $latest = git_dir_latest($self, \$epoch_max);
         return unless defined $latest;
-        $self->idx_init; # acquire lock
+        $self->idx_init($opts); # acquire lock
         my $mm_tmp = $self->{mm}->tmp_clone;
         my $reindex = $opts->{reindex};
-        my $ranges = $reindex ? [] : $self->last_commits($epoch_max);
+        my $ranges = index_ranges($self, $reindex, $epoch_max);
 
         my $high = $self->{mm}->num_highwater();
         my $regen = $self->index_prepare($opts, $epoch_max, $ranges);
diff --git a/lib/PublicInbox/Xapcmd.pm b/lib/PublicInbox/Xapcmd.pm
index d2de8743..4555340a 100644
--- a/lib/PublicInbox/Xapcmd.pm
+++ b/lib/PublicInbox/Xapcmd.pm
@@ -5,6 +5,7 @@ use strict;
 use warnings;
 use PublicInbox::Spawn qw(which spawn);
 use PublicInbox::Over;
+use PublicInbox::Search;
 use File::Temp qw(tempdir);
 use File::Path qw(remove_tree);
 
@@ -12,20 +13,33 @@ use File::Path qw(remove_tree);
 # commands with a version number suffix (e.g. "xapian-compact-1.5")
 our $XAPIAN_COMPACT = $ENV{XAPIAN_COMPACT} || 'xapian-compact';
 
-sub commit_changes ($$$) {
-        my ($im, $old, $new) = @_;
+sub commit_changes ($$$$) {
+        my ($ibx, $old, $new, $opt) = @_;
+
+        my $reindex = $opt->{reindex};
+        my $im = $ibx->importer(0);
+        $im->lock_acquire if $reindex;
+
         my @st = stat($old) or die "failed to stat($old): $!\n";
 
         my $over = "$old/over.sqlite3";
         if (-f $over) {
                 $over = PublicInbox::Over->new($over);
                 $over->connect->sqlite_backup_to_file("$new/over.sqlite3");
+                $over = undef;
         }
         rename($old, "$new/old") or die "rename $old => $new/old: $!\n";
         chmod($st[2] & 07777, $new) or die "chmod $old: $!\n";
         rename($new, $old) or die "rename $new => $old: $!\n";
-        $im->lock_release;
         remove_tree("$old/old") or die "failed to remove $old/old: $!\n";
+
+        if ($reindex) {
+                $opt->{-skip_lock} = 1;
+                PublicInbox::Admin::index_inbox($ibx, $opt);
+                # implicit lock_release
+        } else {
+                $im->lock_release;
+        }
 }
 
 sub xspawn {
@@ -47,6 +61,27 @@ sub runnable_or_die ($) {
         which($exe) or die "$exe not found in PATH\n";
 }
 
+sub prepare_reindex ($$) {
+        my ($ibx, $reindex) = @_;
+        if ($ibx->{version} == 1) {
+                my $dir = $ibx->search->xdir(1);
+                my $xdb = Search::Xapian::Database->new($dir);
+                if (my $lc = $xdb->get_metadata('last_commit')) {
+                        $reindex->{from} = $lc;
+                }
+        } else { # v2
+                my $v2w = $ibx->importer(0);
+                my $max;
+                $v2w->git_dir_latest(\$max) or return;
+                my $from = $reindex->{from};
+                my $mm = $ibx->mm;
+                my $v = PublicInbox::Search::SCHEMA_VERSION();
+                foreach my $i (0..$max) {
+                        $from->[$i] = $mm->last_commit_xap($v, $i);
+                }
+        }
+}
+
 sub run {
         my ($ibx, $cmd, $env, $opt) = @_;
         $opt ||= {};
@@ -54,8 +89,14 @@ sub run {
         my $exe = $cmd->[0];
         my $pfx = $exe;
         runnable_or_die($XAPIAN_COMPACT) if $opt->{compact};
+
+        my $reindex; # v1:{ from => $x40 }, v2:{ from => [ $x40, $x40, .. ] } }
+        my $from; # per-epoch ranges
+
         if (ref($exe) eq 'CODE') {
                 $pfx = 'CODE';
+                $reindex = $opt->{reindex} = {};
+                $from = $reindex->{from} = [];
                 require Search::Xapian::WritableDatabase;
         } else {
                 runnable_or_die($exe);
@@ -64,7 +105,7 @@ sub run {
         my $old = $ibx->search->xdir(1);
         -d $old or die "$old does not exist\n";
         my $new = tempdir("$pfx-XXXXXXXX", DIR => $dir);
-        my $v = $ibx->{version} || 1;
+        my $v = $ibx->{version} ||= 1;
         my @cmds;
         if ($v == 1) {
                 push @cmds, [@$cmd, $old, $new];
@@ -85,6 +126,13 @@ sub run {
         my $max = $opt->{jobs} || scalar(@cmds);
         $ibx->with_umask(sub {
                 $im->lock_acquire;
+
+                # fine-grained locking if we prepare for reindex
+                if ($reindex) {
+                        prepare_reindex($ibx, $reindex);
+                        $im->lock_release;
+                }
+                delete($ibx->{$_}) for (qw(mm over search)); # cleanup
                 my %pids;
                 while (@cmds) {
                         while (scalar(keys(%pids)) < $max && scalar(@cmds)) {
@@ -98,7 +146,7 @@ sub run {
                                 die join(' ', @$x)." failed: $?\n" if $?;
                         }
                 }
-                commit_changes($im, $old, $new);
+                commit_changes($ibx, $old, $new, $opt);
         });
 }