diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/PublicInbox/Admin.pm | 4
-rw-r--r-- | lib/PublicInbox/AdminEdit.pm | 12
-rw-r--r-- | lib/PublicInbox/Inbox.pm | 18
-rw-r--r-- | lib/PublicInbox/InboxWritable.pm | 4
-rw-r--r-- | lib/PublicInbox/Search.pm | 14
-rw-r--r-- | lib/PublicInbox/SearchIdx.pm | 19
-rw-r--r-- | lib/PublicInbox/SearchIdxShard.pm (renamed from lib/PublicInbox/SearchIdxPart.pm) | 30
-rw-r--r-- | lib/PublicInbox/V2Writable.pm | 109
-rw-r--r-- | lib/PublicInbox/WWW.pm | 12
-rw-r--r-- | lib/PublicInbox/WwwListing.pm | 2
-rw-r--r-- | lib/PublicInbox/WwwStream.pm | 12
-rw-r--r-- | lib/PublicInbox/Xapcmd.pm | 96
12 files changed, 165 insertions, 167 deletions
diff --git a/lib/PublicInbox/Admin.pm b/lib/PublicInbox/Admin.pm index 8a2f2043..29388ad6 100644 --- a/lib/PublicInbox/Admin.pm +++ b/lib/PublicInbox/Admin.pm @@ -204,10 +204,10 @@ sub index_inbox { if ($jobs == 0) { $v2w->{parallel} = 0; } else { - my $n = $v2w->{partitions}; + my $n = $v2w->{shards}; if ($jobs != ($n + 1)) { warn -"Unable to respect --jobs=$jobs, inbox was created with $n partitions\n"; +"Unable to respect --jobs=$jobs, inbox was created with $n shards\n"; } } } diff --git a/lib/PublicInbox/AdminEdit.pm b/lib/PublicInbox/AdminEdit.pm index 169feba0..2e2a8629 100644 --- a/lib/PublicInbox/AdminEdit.pm +++ b/lib/PublicInbox/AdminEdit.pm @@ -29,15 +29,15 @@ sub check_editable ($) { # $ibx->{search} is populated by $ibx->over call my $xdir_ro = $ibx->{search}->xdir(1); - my $npart = 0; - foreach my $part (<$xdir_ro/*>) { - if (-d $part && $part =~ m!/[0-9]+\z!) { + my $nshard = 0; + foreach my $shard (<$xdir_ro/*>) { + if (-d $shard && $shard =~ m!/[0-9]+\z!) { my $bytes = 0; - $bytes += -s $_ foreach glob("$part/*"); - $npart++ if $bytes; + $bytes += -s $_ foreach glob("$shard/*"); + $nshard++ if $bytes; } } - if ($npart) { + if ($nshard) { PublicInbox::Admin::require_or_die('-search'); } else { # somebody could "rm -r" all the Xapian directories; diff --git a/lib/PublicInbox/Inbox.pm b/lib/PublicInbox/Inbox.pm index 10f716ca..c0eb640f 100644 --- a/lib/PublicInbox/Inbox.pm +++ b/lib/PublicInbox/Inbox.pm @@ -125,11 +125,11 @@ sub new { bless $opts, $class; } -sub git_part { - my ($self, $part) = @_; +sub git_epoch { + my ($self, $epoch) = @_; ($self->{version} || 1) == 2 or return; - $self->{"$part.git"} ||= eval { - my $git_dir = "$self->{mainrepo}/git/$part.git"; + $self->{"$epoch.git"} ||= eval { + my $git_dir = "$self->{mainrepo}/git/$epoch.git"; my $g = PublicInbox::Git->new($git_dir); $g->{-httpbackend_limiter} = $self->{-httpbackend_limiter}; # no cleanup needed, we never cat-file off this, only clone @@ -149,13 +149,13 @@ sub git { }; } -sub 
max_git_part { +sub max_git_epoch { my ($self) = @_; my $v = $self->{version}; return unless defined($v) && $v == 2; - my $part = $self->{-max_git_part}; + my $cur = $self->{-max_git_epoch}; my $changed = git($self)->alternates_changed; - if (!defined($part) || $changed) { + if (!defined($cur) || $changed) { $self->git->cleanup if $changed; my $gits = "$self->{mainrepo}/git"; if (opendir my $dh, $gits) { @@ -164,12 +164,12 @@ sub max_git_part { $git_dir =~ m!\A([0-9]+)\.git\z! or next; $max = $1 if $1 > $max; } - $part = $self->{-max_git_part} = $max if $max >= 0; + $cur = $self->{-max_git_epoch} = $max if $max >= 0; } else { warn "opendir $gits failed: $!\n"; } } - $part; + $cur; } sub mm { diff --git a/lib/PublicInbox/InboxWritable.pm b/lib/PublicInbox/InboxWritable.pm index 116f423b..f00141d2 100644 --- a/lib/PublicInbox/InboxWritable.pm +++ b/lib/PublicInbox/InboxWritable.pm @@ -31,7 +31,7 @@ sub new { } sub init_inbox { - my ($self, $partitions, $skip_epoch, $skip_artnum) = @_; + my ($self, $shards, $skip_epoch, $skip_artnum) = @_; # TODO: honor skip_artnum my $v = $self->{version} || 1; if ($v == 1) { @@ -39,7 +39,7 @@ sub init_inbox { PublicInbox::Import::init_bare($dir); } else { my $v2w = importer($self); - $v2w->init_inbox($partitions, $skip_epoch, $skip_artnum); + $v2w->init_inbox($shards, $skip_epoch, $skip_artnum); } } diff --git a/lib/PublicInbox/Search.pm b/lib/PublicInbox/Search.pm index 098c97cd..60fc861a 100644 --- a/lib/PublicInbox/Search.pm +++ b/lib/PublicInbox/Search.pm @@ -131,9 +131,9 @@ sub xdir ($;$) { my $dir = "$self->{mainrepo}/xap" . 
SCHEMA_VERSION; return $dir if $rdonly; - my $part = $self->{partition}; - defined $part or die "partition not given"; - $dir .= "/$part"; + my $shard = $self->{shard}; + defined $shard or die "shard not given"; + $dir .= "/$shard"; } } @@ -143,15 +143,15 @@ sub _xdb ($) { my ($xdb, $slow_phrase); my $qpf = \($self->{qp_flags} ||= $QP_FLAGS); if ($self->{version} >= 2) { - foreach my $part (<$dir/*>) { - -d $part && $part =~ m!/[0-9]+\z! or next; - my $sub = Search::Xapian::Database->new($part); + foreach my $shard (<$dir/*>) { + -d $shard && $shard =~ m!/[0-9]+\z! or next; + my $sub = Search::Xapian::Database->new($shard); if ($xdb) { $xdb->add_database($sub); } else { $xdb = $sub; } - $slow_phrase ||= -f "$part/iamchert"; + $slow_phrase ||= -f "$shard/iamchert"; } } else { $slow_phrase = -f "$dir/iamchert"; diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm index a088ce75..665f673a 100644 --- a/lib/PublicInbox/SearchIdx.pm +++ b/lib/PublicInbox/SearchIdx.pm @@ -29,7 +29,7 @@ use constant { my $xapianlevels = qr/\A(?:full|medium)\z/; sub new { - my ($class, $ibx, $creat, $part) = @_; + my ($class, $ibx, $creat, $shard) = @_; ref $ibx or die "BUG: expected PublicInbox::Inbox object: $ibx"; my $levels = qr/\A(?:full|medium|basic)\z/; my $mainrepo = $ibx->{mainrepo}; @@ -62,9 +62,9 @@ sub new { my $dir = $self->xdir; $self->{over} = PublicInbox::OverIdx->new("$dir/over.sqlite3"); } elsif ($version == 2) { - defined $part or die "partition is required for v2\n"; - # partition is a number - $self->{partition} = $part; + defined $shard or die "shard is required for v2\n"; + # shard is a number + $self->{shard} = $shard; $self->{lock_path} = undef; } else { die "unsupported inbox version=$version\n"; @@ -102,8 +102,8 @@ sub _xdb_acquire { $self->lock_acquire; # don't create empty Xapian directories if we don't need Xapian - my $is_part = defined($self->{partition}); - if (!$is_part || ($is_part && need_xapian($self))) { + my $is_shard = 
defined($self->{shard}); + if (!$is_shard || ($is_shard && need_xapian($self))) { File::Path::mkpath($dir); } } @@ -797,7 +797,7 @@ sub remote_close { sub remote_remove { my ($self, $oid, $mid) = @_; if (my $w = $self->{w}) { - # triggers remove_by_oid in a partition + # triggers remove_by_oid in a shard print $w "D $oid $mid\n" or die "failed to write remove $!"; } else { $self->begin_txn_lazy; @@ -824,9 +824,10 @@ sub commit_txn_lazy { $self->{-inbox}->with_umask(sub { if (my $xdb = $self->{xdb}) { - # store 'indexlevel=medium' in v2 part=0 and v1 (only part) + # store 'indexlevel=medium' in v2 shard=0 and + # v1 (only one shard) # This metadata is read by Admin::detect_indexlevel: - if (!$self->{partition} # undef or 0, not >0 + if (!$self->{shard} # undef or 0, not >0 && $self->{indexlevel} eq 'medium') { $xdb->set_metadata('indexlevel', 'medium'); } diff --git a/lib/PublicInbox/SearchIdxPart.pm b/lib/PublicInbox/SearchIdxShard.pm index 51d81a0a..15ec6578 100644 --- a/lib/PublicInbox/SearchIdxPart.pm +++ b/lib/PublicInbox/SearchIdxShard.pm @@ -1,25 +1,25 @@ # Copyright (C) 2018 all contributors <meta@public-inbox.org> # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> -# used to interface with a single Xapian partition in V2 repos. -# See L<public-inbox-v2-format(5)> for more info on how we partition Xapian -package PublicInbox::SearchIdxPart; +# used to interface with a single Xapian shard in V2 repos. 
+# See L<public-inbox-v2-format(5)> for more info on how we shard Xapian +package PublicInbox::SearchIdxShard; use strict; use warnings; use base qw(PublicInbox::SearchIdx); sub new { - my ($class, $v2writable, $part) = @_; - my $self = $class->SUPER::new($v2writable->{-inbox}, 1, $part); + my ($class, $v2writable, $shard) = @_; + my $self = $class->SUPER::new($v2writable->{-inbox}, 1, $shard); # create the DB before forking: $self->_xdb_acquire; $self->_xdb_release; - $self->spawn_worker($v2writable, $part) if $v2writable->{parallel}; + $self->spawn_worker($v2writable, $shard) if $v2writable->{parallel}; $self; } sub spawn_worker { - my ($self, $v2writable, $part) = @_; + my ($self, $v2writable, $shard) = @_; my ($r, $w); pipe($r, $w) or die "pipe failed: $!\n"; binmode $r, ':raw'; @@ -35,8 +35,8 @@ sub spawn_worker { # speeds V2Writable batch imports across 8 cores by nearly 20% fcntl($r, 1031, 1048576) if $^O eq 'linux'; - eval { partition_worker_loop($self, $r, $part, $bnote) }; - die "worker $part died: $@\n" if $@; + eval { shard_worker_loop($self, $r, $shard, $bnote) }; + die "worker $shard died: $@\n" if $@; die "unexpected MM $self->{mm}" if $self->{mm}; exit; } @@ -45,14 +45,14 @@ sub spawn_worker { close $r or die "failed to close: $!"; } -sub partition_worker_loop ($$$$) { - my ($self, $r, $part, $bnote) = @_; - $0 = "pi-v2-partition[$part]"; +sub shard_worker_loop ($$$$) { + my ($self, $r, $shard, $bnote) = @_; + $0 = "pi-v2-shard[$shard]"; my $current_info = ''; my $warn_cb = $SIG{__WARN__} || sub { print STDERR @_ }; local $SIG{__WARN__} = sub { chomp $current_info; - $warn_cb->("[$part] $current_info: ", @_); + $warn_cb->("[$shard] $current_info: ", @_); }; $self->begin_txn_lazy; while (my $line = $r->getline) { @@ -64,7 +64,7 @@ sub partition_worker_loop ($$$$) { } elsif ($line eq "barrier\n") { $self->commit_txn_lazy; # no need to lock < 512 bytes is atomic under POSIX - print $bnote "barrier $part\n" or + print $bnote "barrier $shard\n" or die 
"write failed for barrier $!\n"; } elsif ($line =~ /\AD ([a-f0-9]{40,}) (.+)\n\z/s) { my ($oid, $mid) = ($1, $2); @@ -89,7 +89,7 @@ sub index_raw { my ($self, $bytes, $msgref, $artnum, $oid, $mid0, $mime) = @_; if (my $w = $self->{w}) { print $w "$bytes $artnum $oid $mid0\n", $$msgref or die - "failed to write partition $!\n"; + "failed to write shard $!\n"; $w->flush or die "failed to flush: $!\n"; } else { $$msgref = undef; diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm index 76e61e86..2b3ffa63 100644 --- a/lib/PublicInbox/V2Writable.pm +++ b/lib/PublicInbox/V2Writable.pm @@ -7,7 +7,7 @@ package PublicInbox::V2Writable; use strict; use warnings; use base qw(PublicInbox::Lock); -use PublicInbox::SearchIdxPart; +use PublicInbox::SearchIdxShard; use PublicInbox::MIME; use PublicInbox::Git; use PublicInbox::Import; @@ -24,14 +24,14 @@ use IO::Handle; my $PACKING_FACTOR = 0.4; # SATA storage lags behind what CPUs are capable of, so relying on -# nproc(1) can be misleading and having extra Xapian partions is a +# nproc(1) can be misleading and having extra Xapian shards is a # waste of FDs and space. It can also lead to excessive IO latency # and slow things down. Users on NVME or other fast storage can # use the NPROC env or switches in our script/public-inbox-* programs -# to increase Xapian partitions. +# to increase Xapian shards our $NPROC_MAX_DEFAULT = 4; -sub nproc_parts ($) { +sub nproc_shards ($) { my ($creat_opt) = @_; if (ref($creat_opt) eq 'HASH') { if (defined(my $n = $creat_opt->{nproc})) { @@ -52,24 +52,24 @@ sub nproc_parts ($) { $n < 1 ? 
1 : $n; } -sub count_partitions ($) { +sub count_shards ($) { my ($self) = @_; - my $nparts = 0; + my $n = 0; my $xpfx = $self->{xpfx}; - # always load existing partitions in case core count changes: - # Also, partition count may change while -watch is running - # due to -compact + # always load existing shards in case core count changes: + # Also, shard count may change while -watch is running + # due to "xcpdb --reshard" if (-d $xpfx) { - foreach my $part (<$xpfx/*>) { - -d $part && $part =~ m!/[0-9]+\z! or next; + foreach my $shard (<$xpfx/*>) { + -d $shard && $shard =~ m!/[0-9]+\z! or next; eval { - Search::Xapian::Database->new($part)->close; - $nparts++; + Search::Xapian::Database->new($shard)->close; + $n++; }; } } - $nparts; + $n; } sub new { @@ -103,7 +103,7 @@ sub new { rotate_bytes => int((1024 * 1024 * 1024) / $PACKING_FACTOR), last_commit => [], # git repo -> commit }; - $self->{partitions} = count_partitions($self) || nproc_parts($creat); + $self->{shards} = count_shards($self) || nproc_shards($creat); bless $self, $class; } @@ -134,12 +134,10 @@ sub add { sub do_idx ($$$$$$$) { my ($self, $msgref, $mime, $len, $num, $oid, $mid0) = @_; $self->{over}->add_overview($mime, $len, $num, $oid, $mid0); - my $npart = $self->{partitions}; - my $part = $num % $npart; - my $idx = idx_part($self, $part); + my $idx = idx_shard($self, $num % $self->{shards}); $idx->index_raw($len, $msgref, $num, $oid, $mid0, $mime); my $n = $self->{transact_bytes} += $len; - $n >= (PublicInbox::SearchIdx::BATCH_BYTES * $npart); + $n >= (PublicInbox::SearchIdx::BATCH_BYTES * $self->{shards}); } sub _add { @@ -252,15 +250,15 @@ sub num_for_harder { $num; } -sub idx_part { - my ($self, $part) = @_; - $self->{idx_parts}->[$part]; +sub idx_shard { + my ($self, $shard_i) = @_; + $self->{idx_shards}->[$shard_i]; } # idempotent sub idx_init { my ($self, $opt) = @_; - return if $self->{idx_parts}; + return if $self->{idx_shards}; my $ibx = $self->{-inbox}; # do not leak read-only FDs to 
child processes, we only have these @@ -288,19 +286,19 @@ sub idx_init { $self->lock_acquire unless ($opt && $opt->{-skip_lock}); $over->create; - # -compact can change partition count while -watch is idle - my $nparts = count_partitions($self); - if ($nparts && $nparts != $self->{partitions}) { - $self->{partitions} = $nparts; + # xcpdb can change shard count while -watch is idle + my $nshards = count_shards($self); + if ($nshards && $nshards != $self->{shards}) { + $self->{shards} = $nshards; } - # need to create all parts before initializing msgmap FD - my $max = $self->{partitions} - 1; + # need to create all shards before initializing msgmap FD + my $max = $self->{shards} - 1; - # idx_parts must be visible to all forked processes - my $idx = $self->{idx_parts} = []; + # idx_shards must be visible to all forked processes + my $idx = $self->{idx_shards} = []; for my $i (0..$max) { - push @$idx, PublicInbox::SearchIdxPart->new($self, $i); + push @$idx, PublicInbox::SearchIdxShard->new($self, $i); } # Now that all subprocesses are up, we can open the FDs @@ -370,7 +368,6 @@ sub rewrite_internal ($$;$$$) { } my $over = $self->{over}; my $cids = content_ids($old_mime); - my $parts = $self->{idx_parts}; my $removed; my $mids = mids($old_mime->header_obj); @@ -559,7 +556,7 @@ W: $list $rewritten->{rewrites}; } -sub last_commit_part ($$;$) { +sub last_epoch_commit ($$;$) { my ($self, $i, $cmt) = @_; my $v = PublicInbox::Search::SCHEMA_VERSION(); $self->{mm}->last_commit_xap($v, $i, $cmt); @@ -572,7 +569,7 @@ sub set_last_commits ($) { foreach my $i (0..$epoch_max) { defined(my $cmt = $last_commit->[$i]) or next; $last_commit->[$i] = undef; - last_commit_part($self, $i, $cmt); + last_epoch_commit($self, $i, $cmt); } } @@ -590,7 +587,7 @@ sub barrier_wait { while (scalar keys %$barrier) { defined(my $l = $r->getline) or die "EOF on barrier_wait: $!"; $l =~ /\Abarrier (\d+)/ or die "bad line on barrier_wait: $l"; - delete $barrier->{$1} or die "bad part[$1] on barrier 
wait"; + delete $barrier->{$1} or die "bad shard[$1] on barrier wait"; } } @@ -605,8 +602,8 @@ sub checkpoint ($;$) { $im->checkpoint; } } - my $parts = $self->{idx_parts}; - if ($parts) { + my $shards = $self->{idx_shards}; + if ($shards) { my $dbh = $self->{mm}->{dbh}; # SQLite msgmap data is second in importance @@ -617,19 +614,19 @@ sub checkpoint ($;$) { # Now deal with Xapian if ($wait) { - my $barrier = $self->barrier_init(scalar @$parts); + my $barrier = $self->barrier_init(scalar @$shards); - # each partition needs to issue a barrier command - $_->remote_barrier for @$parts; + # each shard needs to issue a barrier command + $_->remote_barrier for @$shards; - # wait for each Xapian partition + # wait for each Xapian shard $self->barrier_wait($barrier); } else { - $_->remote_commit for @$parts; + $_->remote_commit for @$shards; } # last_commit is special, don't commit these until - # remote partitions are done: + # remote shards are done: $dbh->begin_work; set_last_commits($self); $dbh->commit; @@ -652,14 +649,14 @@ sub done { checkpoint($self); my $mm = delete $self->{mm}; $mm->{dbh}->commit if $mm; - my $parts = delete $self->{idx_parts}; - if ($parts) { - $_->remote_close for @$parts; + my $shards = delete $self->{idx_shards}; + if ($shards) { + $_->remote_close for @$shards; } $self->{over}->disconnect; delete $self->{bnote}; $self->{transact_bytes} = 0; - $self->lock_release if $parts; + $self->lock_release if $shards; $self->{-inbox}->git->cleanup; } @@ -827,8 +824,8 @@ sub atfork_child { my ($self) = @_; my $fh = delete $self->{reindex_pipe}; close $fh if $fh; - if (my $parts = $self->{idx_parts}) { - $_->atfork_child foreach @$parts; + if (my $shards = $self->{idx_shards}) { + $_->atfork_child foreach @$shards; } if (my $im = $self->{im}) { $im->atfork_child; @@ -930,13 +927,13 @@ sub reindex_oid ($$$$) { # only update last_commit for $i on reindex iff newer than current sub update_last_commit ($$$$) { my ($self, $git, $i, $cmt) = @_; - my $last = 
last_commit_part($self, $i); + my $last = last_epoch_commit($self, $i); if (defined $last && is_ancestor($git, $last, $cmt)) { my @cmd = (qw(rev-list --count), "$last..$cmt"); chomp(my $n = $git->qx(@cmd)); return if $n ne '' && $n == 0; } - last_commit_part($self, $i, $cmt); + last_epoch_commit($self, $i, $cmt); } sub git_dir_n ($$) { "$_[0]->{-inbox}->{mainrepo}/git/$_[1].git" } @@ -945,7 +942,7 @@ sub last_commits ($$) { my ($self, $epoch_max) = @_; my $heads = []; for (my $i = $epoch_max; $i >= 0; $i--) { - $heads->[$i] = last_commit_part($self, $i); + $heads->[$i] = last_epoch_commit($self, $i); } $heads; } @@ -1016,7 +1013,7 @@ sub sync_prepare ($$$) { for (my $i = $epoch_max; $i >= 0; $i--) { die 'BUG: already indexing!' if $self->{reindex_pipe}; my $git_dir = git_dir_n($self, $i); - -d $git_dir or next; # missing parts are fine + -d $git_dir or next; # missing epochs are fine my $git = PublicInbox::Git->new($git_dir); if ($reindex_heads) { $head = $reindex_heads->[$i] or next; @@ -1051,7 +1048,7 @@ sub sync_prepare ($$$) { sub unindex_oid_remote ($$$) { my ($self, $oid, $mid) = @_; - $_->remote_remove($oid, $mid) foreach @{$self->{idx_parts}}; + $_->remote_remove($oid, $mid) foreach @{$self->{idx_shards}}; $self->{over}->remove_oid($oid, $mid); } @@ -1126,7 +1123,7 @@ sub index_epoch ($$$) { my $git_dir = git_dir_n($self, $i); die 'BUG: already reindexing!' if $self->{reindex_pipe}; - -d $git_dir or return; # missing parts are fine + -d $git_dir or return; # missing epochs are fine fill_alternates($self, $i); my $git = PublicInbox::Git->new($git_dir); if (my $unindex_range = delete $sync->{unindex_range}->{$i}) { diff --git a/lib/PublicInbox/WWW.pm b/lib/PublicInbox/WWW.pm index e4682636..9021cb52 100644 --- a/lib/PublicInbox/WWW.pm +++ b/lib/PublicInbox/WWW.pm @@ -76,9 +76,9 @@ sub call { if ($method eq 'POST') { if ($path_info =~ m!$INBOX_RE/(?:(?:git/)?([0-9]+)(?:\.git)?/)? 
(git-upload-pack)\z!x) { - my ($part, $path) = ($2, $3); + my ($epoch, $path) = ($2, $3); return invalid_inbox($ctx, $1) || - serve_git($ctx, $part, $path); + serve_git($ctx, $epoch, $path); } elsif ($path_info =~ m!$INBOX_RE/!o) { return invalid_inbox($ctx, $1) || mbox_results($ctx); } @@ -100,8 +100,8 @@ sub call { invalid_inbox($ctx, $1) || get_new($ctx); } elsif ($path_info =~ m!$INBOX_RE/(?:(?:git/)?([0-9]+)(?:\.git)?/)? ($PublicInbox::GitHTTPBackend::ANY)\z!ox) { - my ($part, $path) = ($2, $3); - invalid_inbox($ctx, $1) || serve_git($ctx, $part, $path); + my ($epoch, $path) = ($2, $3); + invalid_inbox($ctx, $1) || serve_git($ctx, $epoch, $path); } elsif ($path_info =~ m!$INBOX_RE/([a-zA-Z0-9_\-]+).mbox\.gz\z!o) { serve_mbox_range($ctx, $1, $2); } elsif ($path_info =~ m!$INBOX_RE/$MID_RE/$END_RE\z!o) { @@ -437,10 +437,10 @@ sub msg_page { } sub serve_git { - my ($ctx, $part, $path) = @_; + my ($ctx, $epoch, $path) = @_; my $env = $ctx->{env}; my $ibx = $ctx->{-inbox}; - my $git = defined $part ? $ibx->git_part($part) : $ibx->git; + my $git = defined $epoch ? $ibx->git_epoch($epoch) : $ibx->git; $git ? 
PublicInbox::GitHTTPBackend::serve($env, $git, $path) : r404(); } diff --git a/lib/PublicInbox/WwwListing.pm b/lib/PublicInbox/WwwListing.pm index e2724cc4..e052bbff 100644 --- a/lib/PublicInbox/WwwListing.pm +++ b/lib/PublicInbox/WwwListing.pm @@ -190,7 +190,7 @@ sub js ($$) { my $manifest = { -abs2urlpath => {}, -mtime => 0 }; for my $ibx (@$list) { - if (defined(my $max = $ibx->max_git_part)) { + if (defined(my $max = $ibx->max_git_epoch)) { for my $epoch (0..$max) { manifest_add($manifest, $ibx, $epoch); } diff --git a/lib/PublicInbox/WwwStream.pm b/lib/PublicInbox/WwwStream.pm index f6c50496..082e5ec9 100644 --- a/lib/PublicInbox/WwwStream.pm +++ b/lib/PublicInbox/WwwStream.pm @@ -85,11 +85,11 @@ sub _html_end { my (%seen, @urls); my $http = $ibx->base_url($ctx->{env}); chop $http; # no trailing slash for clone - my $part = $ibx->max_git_part; + my $max = $ibx->max_git_epoch; my $dir = (split(m!/!, $http))[-1]; - if (defined($part)) { # v2 + if (defined($max)) { # v2 $seen{$http} = 1; - for my $i (0..$part) { + for my $i (0..$max) { # old parts my be deleted: -d "$ibx->{mainrepo}/git/$i.git" or next; my $url = "$http/$i"; @@ -101,7 +101,7 @@ sub _html_end { push @urls, $http; } - # FIXME: partitioning in can be different in other repositories, + # FIXME: epoch splits can be different in other repositories, # use the "cloneurl" file as-is for now: foreach my $u (@{$ibx->cloneurl}) { next if $seen{$u}; @@ -109,13 +109,13 @@ sub _html_end { push @urls, $u =~ /\Ahttps?:/ ? qq(<a\nhref="$u">$u</a>) : $u; } - if (defined($part) || scalar(@urls) > 1) { + if (defined($max) || scalar(@urls) > 1) { $urls .= "\n" . 
join("\n", map { "\tgit clone --mirror $_" } @urls); } else { $urls .= " git clone --mirror $urls[0]"; } - if (defined $part) { + if (defined $max) { my $addrs = $ibx->{address}; $addrs = join(' ', @$addrs) if ref($addrs) eq 'ARRAY'; $urls .= <<EOF diff --git a/lib/PublicInbox/Xapcmd.pm b/lib/PublicInbox/Xapcmd.pm index e1c6fe3a..819d7829 100644 --- a/lib/PublicInbox/Xapcmd.pm +++ b/lib/PublicInbox/Xapcmd.pm @@ -17,13 +17,13 @@ our @COMPACT_OPT = qw(jobs|j=i quiet|q blocksize|b=s no-full|n fuller|F); sub commit_changes ($$$) { my ($ibx, $tmp, $opt) = @_; - my $new_parts = $opt->{reshard}; + my $reshard = $opt->{reshard}; my $reindex = $opt->{reindex}; my $im = $ibx->importer(0); $im->lock_acquire if !$opt->{-coarse_lock}; $SIG{INT} or die 'BUG: $SIG{INT} not handled'; - my @old_part; + my @old_shard; while (my ($old, $new) = each %$tmp) { my @st = stat($old); @@ -40,8 +40,8 @@ sub commit_changes ($$$) { $over = undef; } - if (!defined($new)) { # culled partition - push @old_part, $old; + if (!defined($new)) { # culled shard + push @old_shard, $old; next; } @@ -58,22 +58,22 @@ sub commit_changes ($$$) { die "failed to remove $prev: $!\n"; } } - remove_tree(@old_part); + remove_tree(@old_shard); $tmp->done; if (!$opt->{-coarse_lock}) { $opt->{-skip_lock} = 1; - if ($im->can('count_partitions')) { + if ($im->can('count_shards')) { my $pr = $opt->{-progress}; - my $n = $im->count_partitions; - if (defined $new_parts && $n != $new_parts) { + my $n = $im->count_shards; + if (defined $reshard && $n != $reshard) { die -"BUG: counted $n partitions after repartioning to $new_parts"; +"BUG: counted $n shards after resharding to $reshard"; } - my $prev = $im->{partitions}; + my $prev = $im->{shards}; if ($pr && $prev != $n) { - $pr->("partition count changed: $prev => $n\n"); - $im->{partitions} = $n; + $pr->("shard count changed: $prev => $n\n"); + $im->{shards} = $n; } } @@ -171,17 +171,17 @@ sub run { my $tmp = PublicInbox::Xtmpdirs->new; my $v = $ibx->{version} ||= 1; my 
@q; - my $new_parts = $opt->{reshard}; - if (defined $new_parts && $new_parts <= 0) { + my $reshard = $opt->{reshard}; + if (defined $reshard && $reshard <= 0) { die "--reshard must be a positive number\n"; } # we want temporary directories to be as deep as possible, - # so v2 partitions can keep "xap$SCHEMA_VERSION" on a separate FS. + # so v2 shards can keep "xap$SCHEMA_VERSION" on a separate FS. if ($v == 1) { - if (defined $new_parts) { + if (defined $reshard) { warn -"--reshard=$new_parts ignored for v1 $ibx->{mainrepo}\n"; +"--reshard=$reshard ignored for v1 $ibx->{mainrepo}\n"; } my $old_parent = dirname($old); same_fs_or_die($old_parent, $old); @@ -191,28 +191,28 @@ sub run { push @q, [ $old, $wip ]; } else { opendir my $dh, $old or die "Failed to opendir $old: $!\n"; - my @old_parts; + my @old_shards; while (defined(my $dn = readdir($dh))) { if ($dn =~ /\A[0-9]+\z/) { - push @old_parts, $dn; + push @old_shards, $dn; } elsif ($dn eq '.' || $dn eq '..') { } elsif ($dn =~ /\Aover\.sqlite3/) { } else { warn "W: skipping unknown dir: $old/$dn\n" } } - die "No Xapian parts found in $old\n" unless @old_parts; + die "No Xapian shards found in $old\n" unless @old_shards; - my ($src, $max_part); - if (!defined($new_parts) || $new_parts == scalar(@old_parts)) { + my ($src, $max_shard); + if (!defined($reshard) || $reshard == scalar(@old_shards)) { # 1:1 copy - $max_part = scalar(@old_parts) - 1; + $max_shard = scalar(@old_shards) - 1; } else { # M:N copy - $max_part = $new_parts - 1; - $src = [ map { "$old/$_" } @old_parts ]; + $max_shard = $reshard - 1; + $src = [ map { "$old/$_" } @old_shards ]; } - foreach my $dn (0..$max_part) { + foreach my $dn (0..$max_shard) { my $tmpl = "$dn-XXXXXXXX"; my $wip = tempdir($tmpl, DIR => $old); same_fs_or_die($old, $wip); @@ -220,7 +220,7 @@ sub run { push @q, [ $src // $cur , $wip ]; $tmp->{$cur} = $wip; } - # mark old parts to be unlinked + # mark old shards to be unlinked if ($src) { $tmp->{$_} ||= undef for @$src; } @@ -305,7 
+305,7 @@ sub compact ($$) { } sub cpdb_loop ($$$;$$) { - my ($src, $dst, $pr_data, $cur_part, $new_parts) = @_; + my ($src, $dst, $pr_data, $cur_shard, $reshard) = @_; my ($pr, $fmt, $nr, $pfx); if ($pr_data) { $pr = $pr_data->{pr}; @@ -326,9 +326,9 @@ sub cpdb_loop ($$$;$$) { eval { for (; $it != $end; $it++) { my $docid = $it->get_docid; - if (defined $new_parts) { - my $dst_part = $docid % $new_parts; - next if $dst_part != $cur_part; + if (defined $reshard) { + my $dst_shard = $docid % $reshard; + next if $dst_shard != $cur_shard; } my $doc = $src->get_document($docid); $dst->replace_document($docid, $doc); @@ -350,16 +350,16 @@ sub cpdb_loop ($$$;$$) { sub cpdb ($$) { my ($args, $opt) = @_; my ($old, $new) = @$args; - my ($src, $cur_part); - my $new_parts; + my ($src, $cur_shard); + my $reshard; if (ref($old) eq 'ARRAY') { - ($cur_part) = ($new =~ m!xap[0-9]+/([0-9]+)\b!); - defined $cur_part or - die "BUG: could not extract partition # from $new"; - $new_parts = $opt->{reshard}; - defined $new_parts or die 'BUG: got array src w/o --partition'; + ($cur_shard) = ($new =~ m!xap[0-9]+/([0-9]+)\b!); + defined $cur_shard or + die "BUG: could not extract shard # from $new"; + $reshard = $opt->{reshard}; + defined $reshard or die 'BUG: got array src w/o --reshard'; - # repartitioning, M:N copy means have full read access + # resharding, M:N copy means have full read access foreach (@$old) { if ($src) { my $sub = Search::Xapian::Database->new($_); @@ -397,7 +397,7 @@ sub cpdb ($$) { my $lc = $src->get_metadata('last_commit'); $dst->set_metadata('last_commit', $lc) if $lc; - # only the first xapian partition (0) gets 'indexlevel' + # only the first xapian shard (0) gets 'indexlevel' if ($new =~ m!(?:xapian[0-9]+|xap[0-9]+/0)\b!) 
{ my $l = $src->get_metadata('indexlevel'); if ($l eq 'medium') { @@ -407,11 +407,11 @@ sub cpdb ($$) { if ($pr_data) { my $tot = $src->get_doccount; - # we can only estimate when repartitioning, + # we can only estimate when resharding, # because removed spam causes slight imbalance my $est = ''; - if (defined $cur_part && $new_parts > 1) { - $tot = int($tot/$new_parts); + if (defined $cur_shard && $reshard > 1) { + $tot = int($tot/$reshard); $est = 'around '; } my $fmt = "$pfx % ".length($tot)."u/$tot\n"; @@ -422,15 +422,15 @@ sub cpdb ($$) { }; } while (cpdb_retryable($src, $pfx)); - if (defined $new_parts) { + if (defined $reshard) { # we rely on document IDs matching NNTP article number, - # so we can't have the combined DB support rewriting + # so we can't have the Xapian sharding DB support rewriting # document IDs. Thus we iterate through each shard # individually. $src = undef; foreach (@$old) { my $old = Search::Xapian::Database->new($_); - cpdb_loop($old, $dst, $pr_data, $cur_part, $new_parts); + cpdb_loop($old, $dst, $pr_data, $cur_shard, $reshard); } } else { cpdb_loop($src, $dst, $pr_data); @@ -459,7 +459,7 @@ sub new { # http://www.tldp.org/LDP/abs/html/exitcodes.html $SIG{INT} = sub { exit(130) }; $SIG{HUP} = $SIG{PIPE} = $SIG{TERM} = sub { exit(1) }; - my $self = bless {}, $_[0]; # old partition => new (tmp) partition + my $self = bless {}, $_[0]; # old shard => new (WIP) shard $owner{"$self"} = $$; $self; } @@ -481,7 +481,7 @@ sub DESTROY { my $owner_pid = delete $owner{"$self"} or return; return if $owner_pid != $$; foreach my $new (values %$self) { - defined $new or next; # may be undef if repartitioning + defined $new or next; # may be undef if resharding remove_tree($new) unless -d "$new/old"; } done($self); |