From dd80dcaa1e46543893de533938a1651639f91f10 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Fri, 30 Jul 2021 12:18:55 +0000 Subject: extindex: -xcpdb and -compact support Since extindex uses Xapian shards in a similar way to v2 inboxes, we'll support -xcpdb (reshard+upgrade) and -compact all the same to give admins tuning+upgrade options. --- lib/PublicInbox/Admin.pm | 40 ++++++++++++++++++++++++++--- lib/PublicInbox/Search.pm | 2 +- lib/PublicInbox/V2Writable.pm | 6 ++--- lib/PublicInbox/Xapcmd.pm | 58 +++++++++++++++++++++++++++---------------- 4 files changed, 78 insertions(+), 28 deletions(-) (limited to 'lib') diff --git a/lib/PublicInbox/Admin.pm b/lib/PublicInbox/Admin.pm index d5f867a2..2534958b 100644 --- a/lib/PublicInbox/Admin.pm +++ b/lib/PublicInbox/Admin.pm @@ -28,6 +28,27 @@ sub setup_signals { }; } +sub resolve_eidxdir { + my ($cd) = @_; + my $try = $cd // '.'; + my $root_dev_ino; + while (1) { # favor v2, first + if (-f "$try/ei.lock") { + return rel2abs_collapsed($try); + } elsif (-d $try) { + my @try = stat _; + $root_dev_ino //= do { + my @root = stat('/') or die "stat /: $!\n"; + "$root[0]\0$root[1]"; + }; + return undef if "$try[0]\0$try[1]" eq $root_dev_ino; + $try .= '/..'; # continue, cd up + } else { + die "`$try' is not a directory\n"; + } + } +} + sub resolve_inboxdir { my ($cd, $ver) = @_; my $try = $cd // '.'; @@ -107,11 +128,24 @@ sub resolve_inboxes ($;$$) { $cfg or die "--all specified, but $cfgfile not readable\n"; @$argv and die "--all specified, but directories specified\n"; } - + my (@old, @ibxs, @eidx); + if ($opt->{-eidx_ok}) { + require PublicInbox::ExtSearchIdx; + my $i = -1; + @$argv = grep { + $i++; + if (defined(my $ei = resolve_eidxdir($_))) { + $ei = PublicInbox::ExtSearchIdx->new($ei, $opt); + push @eidx, $ei; + undef; + } else { + 1; + } + } @$argv; + } my $min_ver = $opt->{-min_inbox_version} || 0; # lookup inboxes by st_dev + st_ino instead of {inboxdir} pathnames, # pathnames are not unique due to symlinks and bind mounts - my (@old, @ibxs); if ($opt->{all}) { $cfg->each_inbox(sub { my ($ibx) = @_; @@ -161,7 +195,7 @@ sub resolve_inboxes ($;$$) { die "-V$min_ver inboxes not supported by $0\n\t", join("\n\t", @old), "\n"; } - @ibxs; + $opt->{-eidx_ok} ? (\@ibxs, \@eidx) : @ibxs; } # TODO: make Devel::Peek optional, only used for daemon diff --git a/lib/PublicInbox/Search.pm b/lib/PublicInbox/Search.pm index 7e19e616..e80a5944 100644 --- a/lib/PublicInbox/Search.pm +++ b/lib/PublicInbox/Search.pm @@ -187,7 +187,7 @@ sub xdir ($;$) { my ($self, $rdonly) = @_; if ($rdonly || !defined($self->{shard})) { $self->{xpfx}; - } else { # v2 only: + } else { # v2 + extindex only: "$self->{xpfx}/$self->{shard}"; } } diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm index 025487d2..1288f47b 100644 --- a/lib/PublicInbox/V2Writable.pm +++ b/lib/PublicInbox/V2Writable.pm @@ -54,14 +54,14 @@ sub nproc_shards ($) { sub count_shards ($) { my ($self) = @_; + # always load existing shards in case core count changes: + # Also, shard count may change while -watch is running if (my $ibx = $self->{ibx}) { - # always load existing shards in case core count changes: - # Also, shard count may change while -watch is running my $srch = $ibx->search or return 0; delete $ibx->{search}; $srch->{nshard} // 0 } else { # ExtSearchIdx - $self->{nshard} ||= scalar($self->xdb_shards_flat); + $self->{nshard} = scalar($self->xdb_shards_flat); } } diff --git a/lib/PublicInbox/Xapcmd.pm b/lib/PublicInbox/Xapcmd.pm index 9791f02c..e37eece5 100644 --- a/lib/PublicInbox/Xapcmd.pm +++ b/lib/PublicInbox/Xapcmd.pm @@ -61,14 +61,14 @@ sub commit_changes ($$$$) { } # trigger ->check_inodes in read-only daemons - syswrite($im->{lockfh}, '.') if $over_chg; + syswrite($im->{lockfh}, '.') if $over_chg && $im; remove_tree(@old_shard); $tmp = undef; if (!$opt->{-coarse_lock}) { $opt->{-skip_lock} = 1; - - if ($im->can('count_shards')) { + $im //= $ibx if $ibx->can('eidx_sync'); + if ($im->can('count_shards')) { # v2w or eidx my $pr = $opt->{-progress}; my $n = $im->count_shards; if (defined $reshard && $n != $reshard) { @@ -83,7 +83,11 @@ sub commit_changes ($$$$) { } my $env = $opt->{-idx_env}; local %ENV = (%ENV, %$env) if $env; - PublicInbox::Admin::index_inbox($ibx, $im, $opt); + if ($ibx->can('eidx_sync')) { + $ibx->eidx_sync($opt); + } else { + PublicInbox::Admin::index_inbox($ibx, $im, $opt); + } } } @@ -103,9 +107,10 @@ sub runnable_or_die ($) { which($exe) or die "$exe not found in PATH\n"; } -sub prepare_reindex ($$$) { - my ($ibx, $im, $opt) = @_; - if ($ibx->version == 1) { +sub prepare_reindex ($$) { + my ($ibx, $opt) = @_; + if ($ibx->can('eidx_sync')) { # no prep needed for ExtSearchIdx + } elsif ($ibx->version == 1) { my $dir = $ibx->search->xdir(1); my $xdb = $PublicInbox::Search::X{Database}->new($dir); if (my $lc = $xdb->get_metadata('last_commit')) { @@ -172,9 +177,14 @@ sub prepare_run { my ($ibx, $opt) = @_; my $tmp = {}; # old shard dir => File::Temp->newdir object or undef my @queue; # ([old//src,newdir]) - list of args for cpdb() or compact() - my $old; - if (my $srch = $ibx->search) { + my ($old, $misc_ok); + if ($ibx->can('eidx_sync')) { + $misc_ok = 1; + $old = $ibx->xdir(1); + } elsif (my $srch = $ibx->search) { $old = $srch->xdir(1); + } + if (defined $old) { -d $old or die "$old does not exist\n"; } my $reshard = $opt->{reshard}; @@ -184,7 +194,7 @@ sub prepare_run { # we want temporary directories to be as deep as possible, # so v2 shards can keep "xap$SCHEMA_VERSION" on a separate FS. - if ($old && $ibx->version == 1) { + if (defined($old) && $ibx->can('version') && $ibx->version == 1) { if (defined $reshard) { warn "--reshard=$reshard ignored for v1 $ibx->{inboxdir}\n"; @@ -196,7 +206,7 @@ sub prepare_run { $tmp->{$old} = $wip; nodatacow_dir($wip->dirname); push @queue, [ $old, $wip ]; - } elsif ($old) { + } elsif (defined $old) { opendir my $dh, $old or die "Failed to opendir $old: $!\n"; my @old_shards; while (defined(my $dn = readdir($dh))) { @@ -204,6 +214,7 @@ sub prepare_run { push @old_shards, $dn; } elsif ($dn eq '.' || $dn eq '..') { } elsif ($dn =~ /\Aover\.sqlite3/) { + } elsif ($dn eq 'misc' && $misc_ok) { } else { warn "W: skipping unknown dir: $old/$dn\n" } @@ -239,19 +250,19 @@ sub check_compact () { runnable_or_die($XAPIAN_COMPACT) } sub _run { # with_umask callback my ($ibx, $cb, $opt) = @_; - my $im = $ibx->importer(0); - $im->lock_acquire; + my $im = $ibx->can('importer') ? $ibx->importer(0) : undef; + ($im // $ibx)->lock_acquire; my ($tmp, $queue) = prepare_run($ibx, $opt); # fine-grained locking if we prepare for reindex if (!$opt->{-coarse_lock}) { - prepare_reindex($ibx, $im, $opt); - $im->lock_release; + prepare_reindex($ibx, $opt); + ($im // $ibx)->lock_release; } - $ibx->cleanup; + $ibx->cleanup if $ibx->can('cleanup'); process_queue($queue, $cb, $opt); - $im->lock_acquire if !$opt->{-coarse_lock}; + ($im // $ibx)->lock_acquire if !$opt->{-coarse_lock}; commit_changes($ibx, $im, $tmp, $opt); } @@ -259,11 +270,16 @@ sub run { my ($ibx, $task, $opt) = @_; # task = 'cpdb' or 'compact' my $cb = \&$task; PublicInbox::Admin::progress_prepare($opt ||= {}); - defined(my $dir = $ibx->{inboxdir}) or die "no inboxdir defined\n"; - -d $dir or die "inboxdir=$dir does not exist\n"; + my $dir; + for my $fld (qw(inboxdir topdir)) { + my $d = $ibx->{$fld} // next; + -d $d or die "$fld=$d does not exist\n"; + $dir = $d; + last; + } check_compact() if $opt->{compact} && $ibx->search; - if (!$opt->{-coarse_lock}) { + if (!$ibx->can('eidx_sync') && !$opt->{-coarse_lock}) { # per-epoch ranges for v2 # v1:{ from => $OID }, v2:{ from => [ $OID, $OID, $OID ] } } $opt->{reindex} = { from => $ibx->version == 1 ? '' : [] }; @@ -393,7 +409,7 @@ sub cpdb ($$) { # cb_spawn callback PublicInbox::SearchIdx::load_xapian_writable() or die; my $XapianDatabase = $PublicInbox::Search::X{Database}; if (ref($old) eq 'ARRAY') { - ($cur_shard) = ($new =~ m!xap[0-9]+/([0-9]+)\b!); + ($cur_shard) = ($new =~ m!(?:xap|ei)[0-9]+/([0-9]+)\b!); defined $cur_shard or die "BUG: could not extract shard # from $new"; $reshard = $opt->{reshard}; -- cgit v1.2.3-24-ge0c7