diff options
author | Eric Wong <e@80x24.org> | 2023-03-26 09:35:43 +0000 |
---|---|---|
committer | Eric Wong <e@80x24.org> | 2023-03-26 09:35:43 +0000 |
commit | 511b7ca599f1c151f6c50f5ba848be60d5857e22 (patch) | |
tree | a7aa219b08af891c3d86d7be250c92ed597de261 /lib/PublicInbox | |
parent | 2dd3cec8783700f061a0c9b69e329918a4f5cccd (diff) | |
parent | 2e28cc7edb58b04404a836dffc07d47b1a38ee17 (diff) | |
download | public-inbox-511b7ca599f1c151f6c50f5ba848be60d5857e22.tar.gz |
* cindex: (29 commits)
  cindex: --prune checkpoints to avoid OOM
  cindex: ignore SIGPIPE
  cindex: respect existing permissions
  cindex: squelch incompatible options
  cindex: implement reindex
  cindex: add support for --prune
  cindex: filter out non-existent git directories
  spawn: show failing directory for chdir failures
  cindex: improve granularity of quit checks
  cindex: attempt to give oldest commits lowest docids
  cindex: truncate or drop body for over-sized commits
  cindex: check for checkpoint before giant messages
  cindex: implement --max-size=SIZE
  sigfd: pass signal name rather than number to callback
  cindex: handle graceful shutdown by default
  cindex: drop `unchanged' progress message
  cindex: show shard number in progress message
  cindex: implement --exclude= like -clone
  ds: @post_loop_do replaces SetPostLoopCallback
  cindex: use DS and workqueues for parallelism
  ...
Diffstat (limited to 'lib/PublicInbox')
-rw-r--r-- | lib/PublicInbox/Admin.pm | 18 | ||||
-rw-r--r-- | lib/PublicInbox/CodeSearch.pm | 121 | ||||
-rw-r--r-- | lib/PublicInbox/CodeSearchIdx.pm | 846 | ||||
-rw-r--r-- | lib/PublicInbox/Config.pm | 2 | ||||
-rw-r--r-- | lib/PublicInbox/DS.pm | 30 | ||||
-rw-r--r-- | lib/PublicInbox/Daemon.pm | 4 | ||||
-rw-r--r-- | lib/PublicInbox/ExtSearchIdx.pm | 2 | ||||
-rw-r--r-- | lib/PublicInbox/IPC.pm | 33 | ||||
-rw-r--r-- | lib/PublicInbox/LEI.pm | 4 | ||||
-rw-r--r-- | lib/PublicInbox/LeiSearch.pm | 14 | ||||
-rw-r--r-- | lib/PublicInbox/MiscIdx.pm | 2 | ||||
-rw-r--r-- | lib/PublicInbox/Search.pm | 77 | ||||
-rw-r--r-- | lib/PublicInbox/SearchIdx.pm | 88 | ||||
-rw-r--r-- | lib/PublicInbox/SearchIdxShard.pm | 7 | ||||
-rw-r--r-- | lib/PublicInbox/Sigfd.pm | 10 | ||||
-rw-r--r-- | lib/PublicInbox/Spawn.pm | 6 | ||||
-rw-r--r-- | lib/PublicInbox/SpawnPP.pm | 2 | ||||
-rw-r--r-- | lib/PublicInbox/TestCommon.pm | 47 | ||||
-rw-r--r-- | lib/PublicInbox/V2Writable.pm | 26 | ||||
-rw-r--r-- | lib/PublicInbox/Watch.pm | 2 |
20 files changed, 1182 insertions, 159 deletions
diff --git a/lib/PublicInbox/Admin.pm b/lib/PublicInbox/Admin.pm index 11ea8f83..abfcbb9c 100644 --- a/lib/PublicInbox/Admin.pm +++ b/lib/PublicInbox/Admin.pm @@ -1,10 +1,10 @@ -# Copyright (C) 2019-2021 all contributors <meta@public-inbox.org> +# Copyright (C) all contributors <meta@public-inbox.org> # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> # common stuff for administrative command-line tools # Unstable internal API package PublicInbox::Admin; -use strict; +use v5.12; use parent qw(Exporter); our @EXPORT_OK = qw(setup_signals); use PublicInbox::Config; @@ -69,14 +69,22 @@ sub resolve_inboxdir { die "`$try' is not a directory\n"; } } + my $dir = resolve_git_dir($cd); + $$ver = 1 if $ver; + $dir; +} + +sub resolve_git_dir { + my ($cd) = @_; # try v1 bare git dirs my $cmd = [ qw(git rev-parse --git-dir) ]; my $fh = popen_rd($cmd, undef, {-C => $cd}); my $dir = do { local $/; <$fh> }; - close $fh or die "error in @$cmd (cwd:${\($cd // '.')}): $!\n"; + close $fh or die "error in @$cmd (cwd:${\($cd // '.')}): $?\n"; chomp $dir; - $$ver = 1 if $ver; - rel2abs_collapsed($dir eq '.' ? 
($cd // $dir) : $dir); + # --absolute-git-dir requires git v2.13.0+ + $dir = rel2abs_collapsed($dir, $cd) if $dir !~ m!\A/!; + $dir; } # for unconfigured inboxes diff --git a/lib/PublicInbox/CodeSearch.pm b/lib/PublicInbox/CodeSearch.pm new file mode 100644 index 00000000..1dfc124f --- /dev/null +++ b/lib/PublicInbox/CodeSearch.pm @@ -0,0 +1,121 @@ +# Copyright (C) all contributors <meta@public-inbox.org> +# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> + +# read-only external index for coderepos +# currently, it only indexes commits and repository metadata +# (pathname, root commits); not blob contents +package PublicInbox::CodeSearch; +use v5.12; +use parent qw(PublicInbox::Search); +use PublicInbox::Search qw(retry_reopen int_val xap_terms); +use constant { + AT => 0, # author time YYYYMMDDHHMMSS, dt: for mail) + CT => 1, # commit time (Unix time stamp, like TS/rt: in mail) + CIDX_SCHEMA_VER => 1, # brand new schema for code search + # for repos (`Tr'), CT(col=1) is used for the latest tip commit time + # in refs/{heads,tags}. AT(col=0) may be used to store disk usage + # in the future, but disk usage calculation is espensive w/ alternates +}; + +# note: the non-X term prefix allocations are shared with Xapian omega, +# see xapian-applications/omega/docs/termprefixes.rst +# bool_pfx_internal: +# type => 'T', # 'c' - commit, 'r' - repo GIT_DIR +# tags are not indexed, only normal branches (refs/heads/*), not hidden +# 'P' # (pathname) GIT_DIR # uniq +# 'G' # (group) root commit (may have multiple roots) +my %bool_pfx_external = ( + oid => 'Q', # type:commit - git OID hex (40|64)-byte SHA-(1|256) + # type:repo - rel2abs_collapsed(GIT_DIR) + parent => 'XP', + %PublicInbox::Search::PATCH_BOOL_COMMON, +); + +my %prob_prefix = ( # copied from PublicInbox::Search + # do we care about committer? or partial commit OID via Xapian? 
+ # o => 'XQ', # 'oid:' (bool) is exact, 'o:' (prob) can do partial + %PublicInbox::Search::PATCH_PROB_COMMON, + + # default: + '' => 'S A XQUOT XFN ' . $PublicInbox::Search::NON_QUOTED_BODY +); + +sub new { + my ($cls, $dir) = @_; + bless { xpfx => "$dir/cidx".CIDX_SCHEMA_VER }, $cls; +} + +sub cqparse_new ($) { + my ($self) = @_; + my $qp = $self->qp_init_common; + my $cb = $qp->can('add_valuerangeprocessor') // + $qp->can('add_rangeprocessor'); # Xapian 1.5.0+ + $cb->($qp, $PublicInbox::Search::NVRP->new(AT, 'd:')); # mairix compat + $cb->($qp, $PublicInbox::Search::NVRP->new(AT, 'dt:')); # mail compat + $cb->($qp, $PublicInbox::Search::NVRP->new(CT, 'ct:')); + + while (my ($name, $pfx) = each %bool_pfx_external) { + $qp->add_boolean_prefix($name, $_) for split(/ /, $pfx); + } + while (my ($name, $pfx) = each %prob_prefix) { + $qp->add_prefix($name, $_) for split(/ /, $pfx); + } + $qp; +} + +# returns a Xapian::Query to filter by roots +sub roots_filter { # retry_reopen callback + my ($self, $git_dir) = @_; + my $xdb = $self->xdb; + my $P = 'P'.$git_dir; + my ($cur, $end) = ($xdb->postlist_begin($P), $xdb->postlist_end($P)); + if ($cur == $end) { + warn "W: $git_dir not indexed?\n"; + return; + } + my @roots = xap_terms('G', $xdb, $cur->get_docid); + if (!@roots) { + warn "W: $git_dir has no root commits?\n"; + return; + } + my $q = $PublicInbox::Search::X{Query}->new('G'.shift(@roots)); + for my $r (@roots) { + $q = $PublicInbox::Search::X{Query}->new( + PublicInbox::Search::OP_OR(), + $q, 'G'.$r); + } + $q; +} + +sub mset { + my ($self, $qry_str, $opt) = @_; + my $qp = $self->{qp} //= cqparse_new($self); + my $qry = $qp->parse_query($qry_str, $self->{qp_flags}); + + # limit to commits with shared roots + if (defined(my $git_dir = $opt->{git_dir})) { + my $rf = retry_reopen($self, \&roots_filter, $git_dir) + or return; + + $qry = $PublicInbox::Search::X{Query}->new( + PublicInbox::Search::OP_FILTER(), + $qry, $rf); + } + + # we only want commits: + $qry = 
$PublicInbox::Search::X{Query}->new( + PublicInbox::Search::OP_FILTER(), + $qry, 'T'.'c'); + + my $enq = $PublicInbox::Search::X{Enquire}->new($self->xdb); + $enq->set_query($qry); + if ($opt->{relevance}) { + $enq->set_sort_by_relevance_then_value(CT, !$opt->{asc}); + } else { + $enq->set_sort_by_value_then_relevance(CT, !$opt->{asc}); + } + $self->retry_reopen($self->can('enquire_once'), $enq, + $opt->{offset} || 0, $opt->{limit} || 50); +} + +1; diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm new file mode 100644 index 00000000..e353f452 --- /dev/null +++ b/lib/PublicInbox/CodeSearchIdx.pm @@ -0,0 +1,846 @@ +# Copyright (C) all contributors <meta@public-inbox.org> +# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> +# +# indexer for git coderepos, just commits and repo paths for now +# this stores normalized absolute paths of indexed GIT_DIR inside +# the DB itself and is designed to handle forks by designating roots +# +# Unlike mail search, docid isn't tied to NNTP artnum or IMAP UID, +# there's no serial number dependency at all. The first 32-bits of +# the commit SHA-(1|256) is used to select a shard. 
+# +# We shard repos using the first 32-bits of sha256($ABS_GIT_DIR) +# +# See PublicInbox::CodeSearch (read-only API) for more +package PublicInbox::CodeSearchIdx; +use v5.12; +# parent order matters, we want ->DESTROY from IPC, not SearchIdx +use parent qw(PublicInbox::CodeSearch PublicInbox::IPC PublicInbox::SearchIdx); +use PublicInbox::Eml; +use PublicInbox::DS qw(awaitpid); +use PublicInbox::PktOp; +use PublicInbox::IPC qw(nproc_shards); +use PublicInbox::Admin; +use POSIX qw(WNOHANG SEEK_SET); +use File::Path (); +use File::Spec (); +use PublicInbox::SHA qw(sha256_hex); +use PublicInbox::Search qw(xap_terms); +use PublicInbox::SearchIdx qw(add_val); +use PublicInbox::Config qw(glob2re); +use PublicInbox::Spawn qw(spawn popen_rd); +use PublicInbox::OnDestroy; +use Socket qw(MSG_EOR); +use Carp (); +our ( + $LIVE, # pid => cmd + $DEFER, # [ [ cb, @args ], ... ] + $LIVE_JOBS, # integer + $MY_SIG, # like %SIG + $SIGSET, + $TXN_BYTES, # number of bytes in current shard transaction + $DO_QUIT, # signal number + @RDONLY_SHARDS, # Xapian::Database + @IDX_SHARDS, # clones of self + $MAX_SIZE, + $TMP_GIT, # PublicInbox::Git object for --prune + $REINDEX, # PublicInbox::SharedKV +); + +# stop walking history if we see >$SEEN_MAX existing commits, this assumes +# branches don't diverge by more than this number of commits... +# git walks commits quickly if it doesn't have to read trees +our $SEEN_MAX = 100000; + +# TODO: do we care about committer name + email? or tree OID? +my @FMT = qw(H P ct an ae at s b); # (b)ody must be last + +# git log --stdin buffers all commits before emitting, thus --reverse +# doesn't incur extra overhead. We use --reverse to keep Xapian docids +# increasing so we may be able to avoid sorting results in some cases +my @LOG_STDIN = (qw(log --no-decorate --no-color --no-notes -p --stat -M + --reverse --stdin --no-walk=unsorted), '--pretty=format:%n%x00'. 
+ join('%n', map { "%$_" } @FMT)); + +sub new { + my (undef, $dir, $opt) = @_; + my $l = $opt->{indexlevel} // 'full'; + $l !~ $PublicInbox::SearchIdx::INDEXLEVELS and + die "invalid indexlevel=$l\n"; + $l eq 'basic' and die "E: indexlevel=basic not supported\n"; + my $self = bless { + xpfx => "$dir/cidx". PublicInbox::CodeSearch::CIDX_SCHEMA_VER, + cidx_dir => $dir, + creat => 1, # TODO: get rid of this, should be implicit + indexlevel => $l, + transact_bytes => 0, # for checkpoint + total_bytes => 0, # for lock_release + current_info => '', + parallel => 1, + -opt => $opt, + lock_path => "$dir/cidx.lock", + }, __PACKAGE__; + $self->{nshard} = count_shards($self) || + nproc_shards({nproc => $opt->{jobs}}); + $self->{-no_fsync} = 1 if !$opt->{fsync}; + $self->{-dangerous} = 1 if $opt->{dangerous}; + $self; +} + +# TODO: may be used for reshard/compact +sub count_shards { scalar($_[0]->xdb_shards_flat) } + +sub update_commit ($$) { + my ($self, $cmt) = @_; # fields from @FMT + my $x = 'Q'.$cmt->{H}; + my ($docid, @extra) = sort { $a <=> $b } docids_by_postlist($self, $x); + @extra and warn "W: $cmt->{H} indexed multiple times, pruning ", + join(', ', map { "#$_" } @extra), "\n"; + $self->{xdb}->delete_document($_) for @extra; + my $doc = $PublicInbox::Search::X{Document}->new; + $doc->add_boolean_term($x); + $doc->add_boolean_term('G'.$_) for @{$self->{roots}}; + $doc->add_boolean_term('XP'.$_) for split(/ /, $cmt->{P}); + $doc->add_boolean_term('T'.'c'); + + # Author-Time is compatible with dt: for mail search schema_version=15 + add_val($doc, PublicInbox::CodeSearch::AT, + POSIX::strftime('%Y%m%d%H%M%S', gmtime($cmt->{at}))); + + # Commit-Time is the fallback used by rt: (TS) for mail search: + add_val($doc, PublicInbox::CodeSearch::CT, $cmt->{ct}); + + $self->term_generator->set_document($doc); + + # email address is always indexed with positional data for usability + $self->index_phrase("$cmt->{an} <$cmt->{ae}>", 1, 'A'); + + $x = $cmt->{'s'}; + 
$self->index_text($x, 1, 'S') if $x =~ /\S/s; + $doc->set_data($x); # subject is the first (and currently only) line + + $x = delete $cmt->{b}; + $self->index_body_text($doc, \$x) if $x =~ /\S/s; + defined($docid) ? $self->{xdb}->replace_document($docid, $doc) : + $self->{xdb}->add_document($doc); +} + +sub progress { + my ($self, @msg) = @_; + my $pr = $self->{-opt}->{-progress} or return; + $pr->($self->{git} ? ("$self->{git}->{git_dir}: ") : (), @msg, "\n"); +} + +sub store_repo { # wq_do - returns docid + my ($self, $repo) = @_; + $self->begin_txn_lazy; + my $xdb = $self->{xdb}; + for (@{$repo->{to_delete}}) { $xdb->delete_document($_) } # XXX needed? + if (defined $repo->{docid}) { + my $doc = $xdb->get_document($repo->{docid}) // + die "$repo->{git_dir} doc #$repo->{docid} gone"; + add_val($doc, PublicInbox::CodeSearch::CT, $repo->{ct}); + my %new = map { $_ => undef } @{$repo->{roots}}; + my $old = xap_terms('G', $doc); + delete @new{keys %$old}; + $doc->add_boolean_term('G'.$_) for keys %new; + delete @$old{@{$repo->{roots}}}; + $doc->remove_term('G'.$_) for keys %$old; + $doc->set_data($repo->{fp}); + $xdb->replace_document($repo->{docid}, $doc); + $repo->{docid} + } else { + my $new = $PublicInbox::Search::X{Document}->new; + add_val($new, PublicInbox::CodeSearch::CT, $repo->{ct}); + $new->add_boolean_term("P$repo->{git_dir}"); + $new->add_boolean_term('T'.'r'); + $new->add_boolean_term('G'.$_) for @{$repo->{roots}}; + $new->set_data($repo->{fp}); # \n delimited + $xdb->add_document($new); + } +} + +sub cidx_ckpoint ($$) { + my ($self, $msg) = @_; + progress($self, $msg); + return if $PublicInbox::Search::X{CLOEXEC_UNSET}; + $self->{xdb}->commit_transaction; + $self->{xdb}->begin_transaction; +} + +sub truncate_cmt ($$) { + my ($cmt) = @_; # _[1] is $buf (giant) + my ($orig_len, $len); + $len = $orig_len = length($_[1]); + @$cmt{@FMT} = split(/\n/, $_[1], scalar(@FMT)); + undef $_[1]; + $len -= length($cmt->{b}); + + # try to keep the commit message body. 
+ # n.b. this diffstat split may be unreliable but it's not worth + # perfection for giant commits: + my ($bdy) = split(/^---\n/sm, delete($cmt->{b}), 2); + if (($len + length($bdy)) <= $MAX_SIZE) { + $len += length($bdy); + $cmt->{b} = $bdy; + warn <<EOM; +W: $cmt->{H}: truncated body ($orig_len => $len bytes) +W: to be under --max-size=$MAX_SIZE +EOM + } else { + $cmt->{b} = ''; + warn <<EOM; +W: $cmt->{H}: deleted body ($orig_len => $len bytes) +W: to be under --max-size=$MAX_SIZE +EOM + } + $len; +} + +# sharded reader for `git log --pretty=format: --stdin' +sub shard_index { # via wq_io_do + my ($self, $git, $n, $roots) = @_; + local $self->{current_info} = "$git->{git_dir} [$n]"; + local $self->{roots} = $roots; + my $in = delete($self->{0}) // die 'BUG: no {0} input'; + my $op_p = delete($self->{1}) // die 'BUG: no {1} op_p'; + my $batch_bytes = $self->{-opt}->{batch_size} // + $PublicInbox::SearchIdx::BATCH_BYTES; + local $MAX_SIZE = $self->{-opt}->{max_size}; + # local-ized in parent before fork + $TXN_BYTES = $batch_bytes; + local $self->{git} = $git; # for patchid + return if $DO_QUIT; + my $rd = $git->popen(@LOG_STDIN, undef, { 0 => $in }); + close $in or die "close: $!"; + my $nr = 0; + + # a patch may have \0, see c4201214cbf10636e2c1ab9131573f735b42c8d4 + # in linux.git, so we use $/ = "\n\0" to check end-of-patch + my $FS = "\n\0"; + my $len; + my $cmt = {}; + local $/ = $FS; + my $buf = <$rd> // return close($rd); # leading $FS + $buf eq $FS or die "BUG: not LF-NUL: $buf\n"; + $self->begin_txn_lazy; + while (!$DO_QUIT && defined($buf = <$rd>)) { + chomp($buf); + $/ = "\n"; + $len = length($buf); + if (defined($MAX_SIZE) && $len > $MAX_SIZE) { + $len = truncate_cmt($cmt, $buf); + } else { + @$cmt{@FMT} = split(/\n/, $buf, scalar(@FMT)); + } + $TXN_BYTES -= $len; + if ($TXN_BYTES <= 0) { + cidx_ckpoint($self, "[$n] $nr"); + $TXN_BYTES = $batch_bytes - $len; + } + update_commit($self, $cmt); + ++$nr; + if ($TXN_BYTES <= 0) { + cidx_ckpoint($self, 
"[$n] $nr"); + $TXN_BYTES = $batch_bytes; + } + $/ = $FS; + } + close($rd); + if (!$? || ($DO_QUIT && (($? & 127) == $DO_QUIT || + ($? & 127) == POSIX::SIGPIPE))) { + send($op_p, "shard_done $n", MSG_EOR); + } else { + warn "E: git @LOG_STDIN: \$?=$?\n"; + $self->{xdb}->cancel_transaction; + } +} + +sub shard_done { # called via PktOp on shard_index completion + my ($self, $n) = @_; + $self->{-shard_ok}->{$n} = 1 if defined($self->{-shard_ok}); +} + +sub seen ($$) { + my ($xdb, $q) = @_; # $q = "Q$COMMIT_HASH" + for (1..100) { + my $ret = eval { + $xdb->postlist_begin($q) != $xdb->postlist_end($q); + }; + return $ret unless $@; + if (ref($@) =~ /\bDatabaseModifiedError\b/) { + $xdb->reopen; + } else { + Carp::croak($@); + } + } + Carp::croak('too many Xapian DB modifications in progress'); +} + +# used to select the shard for a GIT_DIR +sub git_dir_hash ($) { hex(substr(sha256_hex($_[0]), 0, 8)) } + +sub docids_by_postlist ($$) { # consider moving to PublicInbox::Search + my ($self, $q) = @_; + my $cur = $self->{xdb}->postlist_begin($q); + my $end = $self->{xdb}->postlist_end($q); + my @ids; + for (; $cur != $end; $cur++) { push(@ids, $cur->get_docid) }; + @ids; +} + +sub run_todo ($) { + my ($self) = @_; + my $n; + while (defined(my $x = shift(@{$self->{todo} // []}))) { + my $cb = shift @$x; + $cb->(@$x); + ++$n; + } + $n; +} + +sub need_reap { # post_loop_do + my (undef, $jobs) = @_; + return if !$LIVE || $DO_QUIT; + scalar(keys(%$LIVE)) > $jobs; +} + +sub cidx_reap ($$) { + my ($self, $jobs) = @_; + while (run_todo($self)) {} + local @PublicInbox::DS::post_loop_do = (\&need_reap, $jobs); + while (need_reap(undef, $jobs)) { + PublicInbox::DS::event_loop($MY_SIG, $SIGSET); + } + while (!$jobs && run_todo($self)) {} +} + +sub cidx_await_cb { # awaitpid cb + my ($pid, $cb, $self, $git, @args) = @_; + return if !$LIVE || $DO_QUIT; + my $cmd = delete $LIVE->{$pid} // die 'BUG: no $cmd'; + PublicInbox::DS::enqueue_reap() if !keys(%$LIVE); # once more for PLC + if ($?) 
{ + $git->{-cidx_err} = 1; + return warn("@$cmd error: \$?=$?\n"); + } + push(@$DEFER, [ $cb, $self, $git, @args ]) if $DEFER; +} + +sub cidx_await ($$$$$@) { + my ($pid, $cmd, $cb, $self, $git, @args) = @_; + $LIVE->{$pid} = $cmd; + awaitpid($pid, \&cidx_await_cb, $cb, $self, $git, @args); +} + +# this is different from the grokmirror-compatible fingerprint since we +# only care about --heads (branches) and --tags, and not even their names +sub fp_start ($$$) { + my ($self, $git, $prep_repo) = @_; + return if !$LIVE || $DO_QUIT; + cidx_reap($self, $LIVE_JOBS); + open my $refs, '+>', undef or die "open: $!"; + my $cmd = ['git', "--git-dir=$git->{git_dir}", + qw(show-ref --heads --tags --hash)]; + my $pid = spawn($cmd, undef, { 1 => $refs }); + $git->{-repo}->{refs} = $refs; + cidx_await($pid, $cmd, \&fp_fini, $self, $git, $prep_repo); +} + +sub fp_fini { # cidx_await cb + my ($self, $git, $prep_repo) = @_; + my $refs = $git->{-repo}->{refs} // die 'BUG: no {-repo}->{refs}'; + seek($refs, 0, SEEK_SET) or die "seek: $!"; + my $buf; + my $dig = PublicInbox::SHA->new(256); + while (read($refs, $buf, 65536)) { $dig->add($buf) } + $git->{-repo}->{fp} = $dig->hexdigest; +} + +sub ct_start ($$$) { + my ($self, $git, $prep_repo) = @_; + return if !$LIVE || $DO_QUIT; + cidx_reap($self, $LIVE_JOBS); + my $cmd = [ 'git', "--git-dir=$git->{git_dir}", + qw[for-each-ref --sort=-committerdate + --format=%(committerdate:raw) --count=1 + refs/heads/ refs/tags/] ]; + my ($rd, $pid) = popen_rd($cmd); + cidx_await($pid, $cmd, \&ct_fini, $self, $git, $rd, $prep_repo); +} + +sub ct_fini { # cidx_await cb + my ($self, $git, $rd, $prep_repo) = @_; + defined(my $ct = <$rd>) or return; + $ct =~ s/\s+.*\z//s; # drop TZ + LF + $git->{-repo}->{ct} = $ct + 0; +} + +# TODO: also index gitweb.owner and the full fingerprint for grokmirror? 
+sub prep_repo ($$) { + my ($self, $git) = @_; + return if !$LIVE || $DO_QUIT || $git->{-cidx_err}; + my $repo = $git->{-repo} // die 'BUG: no {-repo}'; + if (!defined($repo->{ct})) { + warn "W: $git->{git_dir} has no commits, skipping\n"; + delete $git->{-repo}; + return; + } + my $n = git_dir_hash($git->{git_dir}) % $self->{nshard}; + my $shard = bless { %$self, shard => $n }, ref($self); + $repo->{shard_n} = $n; + delete @$shard{qw(lockfh lock_path)}; + local $shard->{xdb} = $RDONLY_SHARDS[$n] // die "BUG: shard[$n] undef"; + $shard->retry_reopen(\&check_existing, $self, $git); +} + +sub check_existing { # retry_reopen callback + my ($shard, $self, $git) = @_; + my @docids = docids_by_postlist($shard, 'P'.$git->{git_dir}); + my $docid = shift(@docids) // return get_roots($self, $git); + my $doc = $shard->{xdb}->get_document($docid) // + die "BUG: no #$docid ($git->{git_dir})"; + my $old_fp = $REINDEX ? "\0invalid" : $doc->get_data; + if ($old_fp eq $git->{-repo}->{fp}) { # no change + delete $git->{-repo}; + return; + } + $git->{-repo}->{docid} = $docid; + if (@docids) { + warn "BUG: $git->{git_dir} indexed multiple times, culling\n"; + $git->{-repo}->{to_delete} = \@docids; # XXX needed? 
+ } + get_roots($self, $git); +} + +sub partition_refs ($$$) { + my ($self, $git, $refs) = @_; # show-ref --heads --tags --hash output + sysseek($refs, 0, SEEK_SET) or die "seek: $!"; # for rev-list --stdin + my $rfh = $git->popen(qw(rev-list --stdin), undef, { 0 => $refs }); + close $refs or die "close: $!"; + my ($seen, $nchange) = (0, 0); + my @shard_in = map { + $_->reopen; + open my $fh, '+>', undef or die "open: $!"; + $fh; + } @RDONLY_SHARDS; + + while (defined(my $cmt = <$rfh>)) { + chomp $cmt; + my $n = hex(substr($cmt, 0, 8)) % scalar(@RDONLY_SHARDS); + if ($REINDEX && $REINDEX->set_maybe(pack('H*', $cmt), '')) { + say { $shard_in[$n] } $cmt or die "say: $!"; + ++$nchange; + } elsif (seen($RDONLY_SHARDS[$n], 'Q'.$cmt)) { + last if ++$seen > $SEEN_MAX; + } else { + say { $shard_in[$n] } $cmt or die "say: $!"; + ++$nchange; + $seen = 0; + } + if ($DO_QUIT) { + close($rfh); + return (); + } + } + close($rfh); + return () if $DO_QUIT; + if (!$? || (($? & 127) == POSIX::SIGPIPE && $seen > $SEEN_MAX)) { + $self->{nchange} += $nchange; + progress($self, "$git->{git_dir}: $nchange commits"); + for my $fh (@shard_in) { + $fh->flush or die "flush: $!"; + sysseek($fh, 0, SEEK_SET) or die "seek: $!"; + } + return @shard_in; + } + die "git --git-dir=$git->{git_dir} rev-list: \$?=$?\n"; +} + +sub shard_commit { # via wq_io_do + my ($self, $n) = @_; + my $op_p = delete($self->{0}) // die 'BUG: no {0} op_p'; + $self->commit_txn_lazy; + send($op_p, "shard_done $n", MSG_EOR); +} + +sub consumers_open { # post_loop_do + my (undef, $consumers) = @_; + return if $DO_QUIT; + scalar(grep { $_->{sock} } values %$consumers); +} + +sub wait_consumers ($$$) { + my ($self, $git, $consumers) = @_; + local @PublicInbox::DS::post_loop_do = (\&consumers_open, $consumers); + PublicInbox::DS::event_loop($MY_SIG, $SIGSET); + my $n = grep { ! 
$self->{-shard_ok}->{$_} } keys %$consumers; + die "E: $git->{git_dir} $n shards failed" if $n && !$DO_QUIT; +} + +sub commit_used_shards ($$$) { + my ($self, $git, $consumers) = @_; + local $self->{-shard_ok} = {}; + for my $n (keys %$consumers) { + my ($c, $p) = PublicInbox::PktOp->pair; + $c->{ops}->{shard_done} = [ $self ]; + $IDX_SHARDS[$n]->wq_io_do('shard_commit', [ $p->{op_p} ], $n); + $consumers->{$n} = $c; + } + wait_consumers($self, $git, $consumers); +} + +sub index_repo { # cidx_await cb + my ($self, $git, $roots) = @_; + return if $git->{-cidx_err} || $DO_QUIT; + my $repo = delete $git->{-repo} or return; + seek($roots, 0, SEEK_SET) or die "seek: $!"; + chomp(my @roots = <$roots>); + close($roots) or die "close: $!"; + @roots or return warn("E: $git->{git_dir} has no root commits\n"); + $repo->{roots} = \@roots; + local $self->{current_info} = $git->{git_dir}; + my @shard_in = partition_refs($self, $git, delete($repo->{refs})); + local $self->{-shard_ok} = {}; # [0..$#shard_in] => 1 + my $consumers = {}; + for my $n (0..$#shard_in) { + -s $shard_in[$n] or next; + last if $DO_QUIT; + my ($c, $p) = PublicInbox::PktOp->pair; + $c->{ops}->{shard_done} = [ $self ]; + $IDX_SHARDS[$n]->wq_io_do('shard_index', + [ $shard_in[$n], $p->{op_p} ], + $git, $n, \@roots); + $consumers->{$n} = $c; + } + @shard_in = (); + wait_consumers($self, $git, $consumers); + if ($DO_QUIT) { + commit_used_shards($self, $git, $consumers); + progress($self, "$git->{git_dir}: done"); + return; + } + $repo->{git_dir} = $git->{git_dir}; + my $id = $IDX_SHARDS[$repo->{shard_n}]->wq_do('store_repo', $repo); + if ($id > 0) { + $consumers->{$repo->{shard_n}} = undef; + commit_used_shards($self, $git, $consumers); + progress($self, "$git->{git_dir}: done"); + return run_todo($self); + } + die "E: store_repo $git->{git_dir}: id=$id"; +} + +sub get_roots ($$) { + my ($self, $git) = @_; + return if !$LIVE || $DO_QUIT; + my $refs = $git->{-repo}->{refs} // die 'BUG: no {-repo}->{refs}'; + 
sysseek($refs, 0, SEEK_SET) or die "seek: $!"; + open my $roots, '+>', undef or die "open: $!"; + my $cmd = [ 'git', "--git-dir=$git->{git_dir}", + qw(rev-list --stdin --max-parents=0) ]; + my $pid = spawn($cmd, undef, { 0 => $refs, 1 => $roots }); + cidx_await($pid, $cmd, \&index_repo, $self, $git, $roots); +} + +# for PublicInbox::SearchIdx::patch_id and with_umask +sub git { $_[0]->{git} } + +sub load_existing ($) { # for -u/--update + my ($self) = @_; + my $dirs = $self->{git_dirs} // []; + if ($self->{-opt}->{update} || $self->{-opt}->{prune}) { + local $self->{xdb}; + $self->xdb or + die "E: $self->{cidx_dir} non-existent for --update\n"; + my @missing; + my @cur = grep { + if (-e $_) { + 1; + } else { + push @missing, $_; + undef; + } + } $self->all_terms('P'); + @missing = () if $self->{-opt}->{prune}; + @missing and warn "W: the following repos no longer exist:\n", + (map { "W:\t$_\n" } @missing), + "W: use --prune to remove them from ", + $self->{cidx_dir}, "\n"; + push @$dirs, @cur; + } + my %uniq; # List::Util::uniq requires Perl 5.26+ + @$dirs = grep { !$uniq{$_}++ } @$dirs; +} + +# SIG handlers: +sub shard_quit { $DO_QUIT = POSIX->can("SIG$_[0]")->() } +sub shard_usr1 { $TXN_BYTES = -1 } + +sub cidx_init ($) { + my ($self) = @_; + my $dir = $self->{cidx_dir}; + unless (-d $dir) { + warn "# creating $dir\n" if !$self->{-opt}->{quiet}; + File::Path::mkpath($dir); + } + $self->lock_acquire; + my @shards; + local $TXN_BYTES; + for my $n (0..($self->{nshard} - 1)) { + my $shard = bless { %$self, shard => $n }, ref($self); + delete @$shard{qw(lockfh lock_path)}; + $shard->idx_acquire; + $shard->idx_release; + $shard->wq_workers_start("shard[$n]", 1, $SIGSET, { + siblings => \@shards, # for ipc_atfork_child + }, \&shard_done_wait, $self); + push @shards, $shard; + } + # this warning needs to happen after idx_acquire + state $once; + warn <<EOM if $PublicInbox::Search::X{CLOEXEC_UNSET} && !$once++; +W: Xapian v1.2.21..v1.2.24 were missing close-on-exec on OFD 
locks, +W: memory usage may be high for large indexing runs +EOM + @shards; +} + +sub scan_git_dirs ($) { + my ($self) = @_; + for (@{$self->{git_dirs}}) { + my $git = PublicInbox::Git->new($_); + my $prep_repo = PublicInbox::OnDestroy->new($$, \&prep_repo, + $self, $git); + fp_start($self, $git, $prep_repo); + ct_start($self, $git, $prep_repo); + last if $DO_QUIT; + } + cidx_reap($self, 0); +} + +sub prune_cb { # git->check_async callback + my ($hex, $type, undef, $self_id) = @_; + return if $type eq 'commit'; + my ($self, $id) = @$self_id; + my $len = $self->{xdb}->get_doclength($id); + progress($self, "$hex $type (doclength=$len)"); + ++$self->{pruned}; + $self->{xdb}->delete_document($id); + + # all math around batch_bytes calculation is pretty fuzzy, + # but need a way to regularly flush output to avoid OOM, + # so assume the average term + position overhead is the + # answer to everything: 42 + return if ($self->{batch_bytes} -= ($len * 42)) > 0; + cidx_ckpoint($self, "[$self->{shard}] $self->{pruned}"); + $self->{batch_bytes} = $self->{-opt}->{batch_size} // + $PublicInbox::SearchIdx::BATCH_BYTES; +} + +sub shard_prune { # via wq_io_do + my ($self, $n, $git_dir) = @_; + my $op_p = delete($self->{0}) // die 'BUG: no {0} op_p'; + my $git = PublicInbox::Git->new($git_dir); # TMP_GIT copy + $self->begin_txn_lazy; + my $xdb = $self->{xdb}; + my $cur = $xdb->postlist_begin('Tc'); + my $end = $xdb->postlist_end('Tc'); + my ($id, @cmt, $oid); + local $self->{batch_bytes} = $self->{-opt}->{batch_size} // + $PublicInbox::SearchIdx::BATCH_BYTES; + local $self->{pruned} = 0; + for (; $cur != $end && !$DO_QUIT; $cur++) { + @cmt = xap_terms('Q', $xdb, $id = $cur->get_docid); + scalar(@cmt) == 1 or + warn "BUG? 
shard[$n] #$id has multiple commits: @cmt"; + for $oid (@cmt) { + $git->check_async($oid, \&prune_cb, [ $self, $id ]); + } + } + $git->async_wait_all; + for my $d ($self->all_terms('P')) { # GIT_DIR paths + last if $DO_QUIT; + next if -d $d; + for $id (docids_by_postlist($self, 'P'.$d)) { + progress($self, "$d gone #$id"); + $xdb->delete_document($id); + } + } + $self->commit_txn_lazy; + $self->{pruned} and + progress($self, "[$n] pruned $self->{pruned} commits"); + send($op_p, "shard_done $n", MSG_EOR); +} + +sub do_prune ($) { + my ($self) = @_; + my $consumers = {}; + my $git_dir = $TMP_GIT->{git_dir}; + my $n = 0; + local $self->{-shard_ok} = {}; + for my $s (@IDX_SHARDS) { + my ($c, $p) = PublicInbox::PktOp->pair; + $c->{ops}->{shard_done} = [ $self ]; + $s->wq_io_do('shard_prune', [ $p->{op_p} ], $n, $git_dir); + $consumers->{$n++} = $c; + } + wait_consumers($self, $TMP_GIT, $consumers); +} + +sub shards_active { # post_loop_do + scalar(grep { $_->{-cidx_quit} } @IDX_SHARDS); +} + +# signal handlers +sub kill_shards { $_->wq_kill(@_) for @IDX_SHARDS } + +sub parent_quit { + $DO_QUIT = POSIX->can("SIG$_[0]")->(); + kill_shards(@_); + warn "# SIG$_[0] received, quitting...\n"; +} + +sub init_tmp_git_dir ($) { + my ($self) = @_; + return unless $self->{-opt}->{prune}; + require File::Temp; + require PublicInbox::Import; + my $tmp = File::Temp->newdir('cidx-all-git-XXXX', TMPDIR => 1); + PublicInbox::Import::init_bare("$tmp", 'cidx-all'); + my $f = "$tmp/objects/info/alternates"; + open my $fh, '>', $f or die "open($f): $!"; + my $o; + for (@{$self->{git_dirs}}) { # TODO: sha256 check? 
+ $o = $_.'/objects'; + say $fh $o if -d $o; + } + close $fh or die "close($f): $!"; + $TMP_GIT = PublicInbox::Git->new("$tmp"); + $TMP_GIT->{-tmp} = $tmp; +} + +sub prep_umask ($) { + my ($self) = @_; + my $um; + my $cur = umask; + if ($self->{-internal}) { # respect core.sharedRepository + @{$self->{git_dirs}} == 1 or die 'BUG: only for GIT_DIR'; + # yuck, FIXME move umask handling out of inbox-specific stuff + require PublicInbox::InboxWritable; + my $git = PublicInbox::Git->new($self->{git_dirs}->[0]); + chomp($um = $git->qx('config', 'core.sharedRepository') // ''); + $um = PublicInbox::InboxWritable::_git_config_perm(undef, $um); + $um = PublicInbox::InboxWritable::_umask_for($um); + umask == $um or progress($self, 'umask from git: ', + sprintf('0%03o', $um)); + } elsif (-d $self->{cidx_dir}) { # respect existing perms + my @st = stat(_); + $um = (~$st[2] & 0777); + umask == $um or progress($self, 'using umask from ', + $self->{cidx_dir}, ': ', + sprintf('0%03o', $um)); + } + defined($um) ? 
+ PublicInbox::OnDestroy->new(\&CORE::umask, umask($um)) : + undef; +} + +sub cidx_run { # main entry point + my ($self) = @_; + my $restore_umask = prep_umask($self); + local $self->{todo} = []; + local $DEFER = $self->{todo}; + local $SIGSET = PublicInbox::DS::block_signals(); + my $restore = PublicInbox::OnDestroy->new($$, + \&PublicInbox::DS::sig_setmask, $SIGSET); + local $LIVE = {}; + local $DO_QUIT; + local $TMP_GIT; + local @IDX_SHARDS = cidx_init($self); + local $self->{current_info} = ''; + local $MY_SIG = { + CHLD => \&PublicInbox::DS::enqueue_reap, + USR1 => \&kill_shards, + }; + $MY_SIG->{$_} = \&parent_quit for qw(TERM QUIT INT); + my $cb = $SIG{__WARN__} || \&CORE::warn; + local $SIG{__WARN__} = sub { + my $m = shift @_; + $self->{current_info} eq '' or + $m =~ s/\A(#?\s*)/$1$self->{current_info}: /; + $cb->($m, @_); + }; + load_existing($self) unless $self->{-internal}; + local $REINDEX; + if ($self->{-opt}->{reindex}) { + require PublicInbox::SharedKV; + $REINDEX = PublicInbox::SharedKV->new; + delete $REINDEX->{lock_path}; + $REINDEX->dbh; + } + my @nc = grep { File::Spec->canonpath($_) ne $_ } @{$self->{git_dirs}}; + if (@nc) { + warn "E: BUG? paths in $self->{cidx_dir} not canonicalized:\n"; + for my $d (@{$self->{git_dirs}}) { + my $c = File::Spec->canonpath($_); + warn "E: $d => $c\n"; + $d = $c; + } + warn "E: canonicalized and attempting to continue\n"; + } + if (defined(my $excl = $self->{-opt}->{exclude})) { + my $re = '(?:'.join('\\z|', map { + glob2re($_) // qr/\A\Q$_\E/ + } @$excl).'\\z)'; + @{$self->{git_dirs}} = grep { + $_ =~ /$re/ ? 
(warn("# excluding $_\n"), 0) : 1; + } @{$self->{git_dirs}}; + } + local $self->{nchange} = 0; + local $LIVE_JOBS = $self->{-opt}->{jobs} || + PublicInbox::IPC::detect_nproc() || 2; + local @RDONLY_SHARDS = $self->xdb_shards_flat; + init_tmp_git_dir($self); + do_prune($self) if $self->{-opt}->{prune}; + scan_git_dirs($self) if $self->{-opt}->{scan} // 1; + + for my $s (@IDX_SHARDS) { + $s->{-cidx_quit} = 1; + $s->wq_close; + } + + local @PublicInbox::DS::post_loop_do = (\&shards_active); + PublicInbox::DS::event_loop($MY_SIG, $SIGSET) if shards_active(); + $self->lock_release(!!$self->{nchange}); +} + +sub ipc_atfork_child { + my ($self) = @_; + $self->SUPER::ipc_atfork_child; + $SIG{USR1} = \&shard_usr1; + $SIG{$_} = \&shard_quit for qw(INT TERM QUIT); + my $x = delete $self->{siblings} // die 'BUG: no {siblings}'; + $_->wq_close for @$x; + undef; +} + +sub shard_done_wait { # awaitpid cb via ipc_worker_reap + my ($pid, $shard, $self) = @_; + my $quit_req = delete($shard->{-cidx_quit}); + return if $DO_QUIT || !$LIVE; + if ($? == 0) { # success + $quit_req // warn 'BUG: {-cidx_quit} unset'; + return; + } + warn "PID:$pid $shard->{shard} exited with \$?=$?\n"; + ++$self->{shard_err} if defined($self->{shard_err}); +} + +sub with_umask { # TODO get rid of this treewide and rely on OnDestroy + my ($self, $cb, @arg) = @_; + $cb->(@arg); +} + +1; diff --git a/lib/PublicInbox/Config.pm b/lib/PublicInbox/Config.pm index 4065b256..e095ecd1 100644 --- a/lib/PublicInbox/Config.pm +++ b/lib/PublicInbox/Config.pm @@ -371,7 +371,7 @@ sub git_bool { # is sufficient and doesn't leave "/.." or "/../" sub rel2abs_collapsed { require File::Spec; - my $p = File::Spec->rel2abs($_[-1]); + my $p = File::Spec->rel2abs(@_); return $p if substr($p, -3, 3) ne '/..' 
&& index($p, '/../') < 0; require Cwd; Cwd::abs_path($p); diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index b6eaf2d7..340086fc 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -44,7 +44,7 @@ our ( $Epoll, # Global epoll fd (or DSKQXS ref) $ep_io, # IO::Handle for Epoll - $PostLoopCallback, # subref to call at the end of each loop, if defined (global) + @post_loop_do, # subref + args to call at the end of each loop $LoopTimeout, # timeout of event loop in milliseconds @Timers, # timers @@ -69,7 +69,7 @@ sub Reset { %DescriptorMap = (); @Timers = (); %UniqTimer = (); - $PostLoopCallback = undef; + @post_loop_do = (); # we may be iterating inside one of these on our stack my @q = delete @Stack{keys %Stack}; @@ -79,7 +79,7 @@ sub Reset { $Epoll = undef; # may call DSKQXS::DESTROY } while (@Timers || keys(%Stack) || $nextq || $AWAIT_PIDS || $ToClose || keys(%DescriptorMap) || - $PostLoopCallback || keys(%UniqTimer)); + @post_loop_do || keys(%UniqTimer)); $reap_armed = undef; $LoopTimeout = -1; # no timeout by default @@ -247,11 +247,13 @@ sub PostEventLoop () { } # by default we keep running, unless a postloop callback cancels it - $PostLoopCallback ? $PostLoopCallback->(\%DescriptorMap) : 1; + @post_loop_do ? $post_loop_do[0]->(\%DescriptorMap, + @post_loop_do[1..$#post_loop_do]) + : 1 } # Start processing IO events. In most daemon programs this never exits. See -# C<PostLoopCallback> for how to exit the loop. +# C<post_loop_do> for how to exit the loop. sub event_loop (;$$) { my ($sig, $oldset) = @_; $Epoll //= _InitPoller(); @@ -287,24 +289,6 @@ sub event_loop (;$$) { } while (PostEventLoop()); } -=head2 C<< CLASS->SetPostLoopCallback( CODEREF ) >> - -Sets post loop callback function. Pass a subref and it will be -called every time the event loop finishes. - -Return 1 (or any true value) from the sub to make the loop continue, 0 or false -and it will exit. 
- -The callback function will be passed two parameters: \%DescriptorMap - -=cut -sub SetPostLoopCallback { - my ($class, $ref) = @_; - - # global callback - $PostLoopCallback = (defined $ref && ref $ref eq 'CODE') ? $ref : undef; -} - ##################################################################### ### PublicInbox::DS-the-object code ##################################################################### diff --git a/lib/PublicInbox/Daemon.pm b/lib/PublicInbox/Daemon.pm index 17e799ca..57435421 100644 --- a/lib/PublicInbox/Daemon.pm +++ b/lib/PublicInbox/Daemon.pm @@ -350,7 +350,7 @@ sub worker_quit { # $_[0] = signal name or number (unused) my $proc_name; my $warn = 0; # drop idle connections and try to quit gracefully - PublicInbox::DS->SetPostLoopCallback(sub { + @PublicInbox::DS::post_loop_do = (sub { my ($dmap, undef) = @_; my $n = 0; my $now = now(); @@ -510,7 +510,7 @@ sub upgrade_aborted ($) { warn $@, "\n" if $@; } -sub reap_children { # $_[0] = 'CHLD' or POSIX::SIGCHLD() +sub reap_children { # $_[0] = 'CHLD' while (1) { my $p = waitpid(-1, WNOHANG) or return; if (defined $reexec_pid && $p == $reexec_pid) { diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm index 401b18d0..5445b156 100644 --- a/lib/PublicInbox/ExtSearchIdx.pm +++ b/lib/PublicInbox/ExtSearchIdx.pm @@ -1388,7 +1388,7 @@ sub eidx_watch { # public-inbox-extindex --watch main loop my $quit = PublicInbox::SearchIdx::quit_cb($sync); $sig->{QUIT} = $sig->{INT} = $sig->{TERM} = $quit; local $self->{-watch_sync} = $sync; # for ->on_inbox_unlock - PublicInbox::DS->SetPostLoopCallback(sub { !$sync->{quit} }); + local @PublicInbox::DS::post_loop_do = (sub { !$sync->{quit} }); $pr->("initial scan complete, entering event loop\n") if $pr; # calls InboxIdle->event_step: PublicInbox::DS::event_loop($sig, $oldset); diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm index 7fa656d0..1f0e87ee 100644 --- a/lib/PublicInbox/IPC.pm +++ b/lib/PublicInbox/IPC.pm @@ -19,7 
+19,7 @@ use PublicInbox::WQWorker; use Socket qw(AF_UNIX MSG_EOR SOCK_STREAM); my $MY_MAX_ARG_STRLEN = 4096 * 33; # extra 4K for serialization my $SEQPACKET = eval { Socket::SOCK_SEQPACKET() }; # portable enough? -our @EXPORT_OK = qw(ipc_freeze ipc_thaw); +our @EXPORT_OK = qw(ipc_freeze ipc_thaw nproc_shards); my ($enc, $dec); # ->imports at BEGIN turns sereal_*_with_object into custom ops on 5.14+ # and eliminate method call overhead @@ -263,11 +263,16 @@ sub recv_and_run { $n; } +sub sock_defined { + my (undef, $wqw) = @_; + defined($wqw->{sock}); +} + sub wq_worker_loop ($$) { my ($self, $bcast2) = @_; my $wqw = PublicInbox::WQWorker->new($self, $self->{-wq_s2}); PublicInbox::WQWorker->new($self, $bcast2) if $bcast2; - PublicInbox::DS->SetPostLoopCallback(sub { $wqw->{sock} }); + local @PublicInbox::DS::post_loop_do = (\&sock_defined, $wqw); PublicInbox::DS::event_loop(); PublicInbox::DS->Reset; } @@ -475,4 +480,28 @@ sub detect_nproc () { undef } +# SATA storage lags behind what CPUs are capable of, so relying on +# nproc(1) can be misleading and having extra Xapian shards is a +# waste of FDs and space. It can also lead to excessive IO latency +# and slow things down. Users on NVME or other fast storage can +# use the NPROC env or switches in our script/public-inbox-* programs +# to increase Xapian shards +our $NPROC_MAX_DEFAULT = 4; + +sub nproc_shards ($) { + my ($creat_opt) = @_; + my $n = $creat_opt->{nproc} if ref($creat_opt) eq 'HASH'; + $n //= $ENV{NPROC}; + if (!$n) { + # assume 2 cores if not detectable or zero + state $NPROC_DETECTED = PublicInbox::IPC::detect_nproc() || 2; + $n = $NPROC_DETECTED; + $n = $NPROC_MAX_DEFAULT if $n > $NPROC_MAX_DEFAULT; + } + + # subtract for the main process and git-fast-import + $n -= 1; + $n < 1 ? 
1 : $n; +} + 1; diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index ff2db1ff..578686e2 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -1320,11 +1320,11 @@ sub lazy_start { }; require PublicInbox::DirIdle; local $dir_idle = PublicInbox::DirIdle->new(sub { - # just rely on wakeup to hit PostLoopCallback set below + # just rely on wakeup to hit post_loop_do dir_idle_handler($_[0]) if $_[0]->fullname ne $path; }); $dir_idle->add_watches([$sock_dir]); - PublicInbox::DS->SetPostLoopCallback(sub { + local @PublicInbox::DS::post_loop_do = (sub { my ($dmap, undef) = @_; if (@st = defined($path) ? stat($path) : ()) { if ($dev_ino_expect ne pack('dd', $st[0], $st[1])) { diff --git a/lib/PublicInbox/LeiSearch.pm b/lib/PublicInbox/LeiSearch.pm index 936c2751..ba4c4309 100644 --- a/lib/PublicInbox/LeiSearch.pm +++ b/lib/PublicInbox/LeiSearch.pm @@ -158,20 +158,6 @@ sub kw_changed { join("\0", @$new_kw_sorted) eq $cur_kw ? 0 : 1; } -sub all_terms { - my ($self, $pfx) = @_; - my $xdb = $self->xdb; - my $cur = $xdb->allterms_begin($pfx); - my $end = $xdb->allterms_end($pfx); - my %ret; - for (; $cur != $end; $cur++) { - my $tn = $cur->get_termname; - index($tn, $pfx) == 0 and - $ret{substr($tn, length($pfx))} = undef; - } - wantarray ? (sort keys %ret) : \%ret; -} - sub qparse_new { my ($self) = @_; my $qp = $self->SUPER::qparse_new; # PublicInbox::Search diff --git a/lib/PublicInbox/MiscIdx.pm b/lib/PublicInbox/MiscIdx.pm index 19200b92..6708527d 100644 --- a/lib/PublicInbox/MiscIdx.pm +++ b/lib/PublicInbox/MiscIdx.pm @@ -5,7 +5,7 @@ # Things indexed include: # * inboxes themselves # * epoch information -# * (maybe) git code repository information +# * (maybe) git code repository information (not commits) # Expect ~100K-1M documents with no parallelism opportunities, # so no sharding, here. 
# diff --git a/lib/PublicInbox/Search.pm b/lib/PublicInbox/Search.pm index e858729a..5133a3b7 100644 --- a/lib/PublicInbox/Search.pm +++ b/lib/PublicInbox/Search.pm @@ -110,43 +110,50 @@ sub load_xapian () { # a prefix common in patch emails our $LANG = 'english'; +our %PATCH_BOOL_COMMON = ( + dfpre => 'XDFPRE', + dfpost => 'XDFPOST', + dfblob => 'XDFPRE XDFPOST', + patchid => 'XDFID', +); + # note: the non-X term prefix allocations are shared with # Xapian omega, see xapian-applications/omega/docs/termprefixes.rst my %bool_pfx_external = ( mid => 'Q', # Message-ID (full/exact), this is mostly uniQue lid => 'G', # newsGroup (or similar entity), just inside <> - dfpre => 'XDFPRE', - dfpost => 'XDFPOST', - dfblob => 'XDFPRE XDFPOST', - patchid => 'XDFID', + %PATCH_BOOL_COMMON ); -my $non_quoted_body = 'XNQ XDFN XDFA XDFB XDFHH XDFCTX XDFPRE XDFPOST XDFID'; -my %prob_prefix = ( - # for mairix compatibility +# for mairix compatibility +our $NON_QUOTED_BODY = 'XNQ XDFN XDFA XDFB XDFHH XDFCTX XDFPRE XDFPOST XDFID'; +our %PATCH_PROB_COMMON = ( s => 'S', - m => 'XM', # 'mid:' (bool) is exact, 'm:' (prob) can do partial - l => 'XL', # 'lid:' (bool) is exact, 'l:' (prob) can do partial f => 'A', - t => 'XTO', - tc => 'XTO XCC', - c => 'XCC', - tcf => 'XTO XCC A', - a => 'XTO XCC A', - b => $non_quoted_body . ' XQUOT', - bs => $non_quoted_body . ' XQUOT S', + b => $NON_QUOTED_BODY . ' XQUOT', + bs => $NON_QUOTED_BODY . ' XQUOT S', n => 'XFN', q => 'XQUOT', - nq => $non_quoted_body, + nq => $NON_QUOTED_BODY, dfn => 'XDFN', dfa => 'XDFA', dfb => 'XDFB', dfhh => 'XDFHH', dfctx => 'XDFCTX', +); +my %prob_prefix = ( + m => 'XM', # 'mid:' (bool) is exact, 'm:' (prob) can do partial + l => 'XL', # 'lid:' (bool) is exact, 'l:' (prob) can do partial + t => 'XTO', + tc => 'XTO XCC', + c => 'XCC', + tcf => 'XTO XCC A', + a => 'XTO XCC A', + %PATCH_PROB_COMMON, # default: - '' => 'XM S A XQUOT XFN ' . $non_quoted_body, + '' => 'XM S A XQUOT XFN ' . 
$NON_QUOTED_BODY, ); # not documenting m: and mid: for now, the using the URLs works w/o Xapian @@ -305,7 +312,7 @@ sub date_parse_prepare { $x = "\0%Y%m%d%H%M%S$#$to_parse\0"; } } - } else { # "rt", let git interpret "YYYY", deal with Y10K later :P + } else { # (rt|ct), let git interpret "YYYY", deal with Y10K later :P for my $x (@r) { next if $x eq '' || $x =~ /\A[0-9]{5,}\z/; push @$to_parse, $x; @@ -454,20 +461,24 @@ sub mset_to_smsg { # read-write sub stemmer { $X{Stem}->new($LANG) } -# read-only -sub qparse_new { +sub qp_init_common { my ($self) = @_; - - my $xdb = xdb($self); my $qp = $X{QueryParser}->new; $qp->set_default_op(OP_AND()); - $qp->set_database($xdb); + $qp->set_database(xdb($self)); $qp->set_stemmer(stemmer($self)); $qp->set_stemming_strategy(STEM_SOME()); my $cb = $qp->can('set_max_wildcard_expansion') // $qp->can('set_max_expansion'); # Xapian 1.5.0+ $cb->($qp, 100); - $cb = $qp->can('add_valuerangeprocessor') // + $qp; +} + +# read-only +sub qparse_new { + my ($self) = @_; + my $qp = qp_init_common($self); + my $cb = $qp->can('add_valuerangeprocessor') // $qp->can('add_rangeprocessor'); # Xapian 1.5.0+ $cb->($qp, $NVRP->new(YYYYMMDD, 'd:')); $cb->($qp, $NVRP->new(DT, 'dt:')); @@ -546,11 +557,25 @@ sub xap_terms ($$;@) { } # get combined docid from over.num: -# (not generic Xapian, only works with our sharding scheme) +# (not generic Xapian, only works with our sharding scheme for mail) sub num2docid ($$) { my ($self, $num) = @_; my $nshard = $self->{nshard}; ($num - 1) * $nshard + $num % $nshard + 1; } +sub all_terms { + my ($self, $pfx) = @_; + my $cur = xdb($self)->allterms_begin($pfx); + my $end = $self->{xdb}->allterms_end($pfx); + my %ret; + for (; $cur != $end; $cur++) { + my $tn = $cur->get_termname; + index($tn, $pfx) == 0 and + $ret{substr($tn, length($pfx))} = undef; + } + wantarray ? 
(sort keys %ret) : \%ret; +} + + 1; diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm index fc464383..3baeaa9c 100644 --- a/lib/PublicInbox/SearchIdx.pm +++ b/lib/PublicInbox/SearchIdx.pm @@ -134,6 +134,7 @@ sub idx_acquire { load_xapian_writable(); $flag = $self->{creat} ? $DB_CREATE_OR_OPEN : $DB_OPEN; } + my $owner = $self->{ibx} // $self->{eidx} // $self; if ($self->{creat}) { require File::Path; $self->lock_acquire; @@ -145,14 +146,13 @@ sub idx_acquire { File::Path::mkpath($dir); require PublicInbox::Syscall; PublicInbox::Syscall::nodatacow_dir($dir); - $self->{-set_has_threadid_once} = 1; - if (($self->{ibx} // $self->{eidx})->{-dangerous}) { - $flag |= $DB_DANGEROUS; - } + # owner == self for CodeSearchIdx + $self->{-set_has_threadid_once} = 1 if $owner != $self; + $flag |= $DB_DANGEROUS if $owner->{-dangerous}; } } return unless defined $flag; - $flag |= $DB_NO_SYNC if ($self->{ibx} // $self->{eidx})->{-no_fsync}; + $flag |= $DB_NO_SYNC if $owner->{-no_fsync}; my $xdb = eval { ($X->{WritableDatabase})->new($dir, $flag) }; croak "Failed opening $dir: $@" if $@; $self->{xdb} = $xdb; @@ -350,43 +350,30 @@ sub index_diff ($$$) { index_text($self, join("\n", @$xnq), 1, 'XNQ'); } -sub index_xapian { # msg_iter callback - my $part = $_[0]->[0]; # ignore $depth and $idx - my ($self, $doc) = @{$_[1]}; - my $ct = $part->content_type || 'text/plain'; - my $fn = $part->filename; - if (defined $fn && $fn ne '') { - index_phrase($self, $fn, 1, 'XFN'); - } - if ($part->{is_submsg}) { - my $mids = mids_for_index($part); - index_ids($self, $doc, $part, $mids); - my $smsg = bless {}, 'PublicInbox::Smsg'; - $smsg->populate($part); - index_headers($self, $smsg); - } - - my ($s, undef) = msg_part_text($part, $ct); - defined $s or return; - $_[0]->[0] = $part = undef; # free memory +sub patch_id { + my ($self) = @_; # $_[1] is the diff (may be huge) + open(my $fh, '+>:utf8', undef) or die "open: $!"; + open(my $eh, '+>', undef) or die "open: $!"; + 
$fh->autoflush(1); + print $fh $_[1] or die "print: $!"; + sysseek($fh, 0, SEEK_SET) or die "sysseek: $!"; + my $id = ($self->{ibx} // $self->{eidx} // $self)->git->qx( + [qw(patch-id --stable)], {}, { 0 => $fh, 2 => $eh }); + seek($eh, 0, SEEK_SET) or die "seek: $!"; + while (<$eh>) { warn $_ } + $id =~ /\A([a-f0-9]{40,})/ ? $1 : undef; +} - if ($s =~ /^(?:diff|---|\+\+\+) /ms) { - open(my $fh, '+>:utf8', undef) or die "open: $!"; - open(my $eh, '+>', undef) or die "open: $!"; - $fh->autoflush(1); - print $fh $s or die "print: $!"; - sysseek($fh, 0, SEEK_SET) or die "sysseek: $!"; - my $id = ($self->{ibx} // $self->{eidx})->git->qx( - [qw(patch-id --stable)], - {}, { 0 => $fh, 2 => $eh }); - $id =~ /\A([a-f0-9]{40,})/ and $doc->add_term('XDFID'.$1); - seek($eh, 0, SEEK_SET) or die "seek: $!"; - while (<$eh>) { warn $_ } +sub index_body_text { + my ($self, $doc, $sref) = @_; + if ($$sref =~ /^(?:diff|---|\+\+\+) /ms) { + my $id = patch_id($self, $$sref); + $doc->add_term('XDFID'.$id) if defined($id); } # split off quoted and unquoted blocks: - my @sections = PublicInbox::MsgIter::split_quotes($s); - undef $s; # free memory + my @sections = PublicInbox::MsgIter::split_quotes($$sref); + undef $$sref; # free memory for my $txt (@sections) { if ($txt =~ /\A>/) { if ($txt =~ /^[>\t ]+GIT binary patch\r?/sm) { @@ -396,8 +383,7 @@ sub index_xapian { # msg_iter callback (?:[>\h]+$BASE85\h*\r?\n)+/$1/gsmx; } index_text($self, $txt, 0, 'XQUOT'); - } else { - # does it look like a diff? + } else { # does it look like a diff? 
if ($txt =~ /^(?:diff|---|\+\+\+) /ms) { index_diff($self, \$txt, $doc); } else { @@ -408,6 +394,28 @@ sub index_xapian { # msg_iter callback } } +sub index_xapian { # msg_iter callback + my $part = $_[0]->[0]; # ignore $depth and $idx + my ($self, $doc) = @{$_[1]}; + my $ct = $part->content_type || 'text/plain'; + my $fn = $part->filename; + if (defined $fn && $fn ne '') { + index_phrase($self, $fn, 1, 'XFN'); + } + if ($part->{is_submsg}) { + my $mids = mids_for_index($part); + index_ids($self, $doc, $part, $mids); + my $smsg = bless {}, 'PublicInbox::Smsg'; + $smsg->populate($part); + index_headers($self, $smsg); + } + + my ($s, undef) = msg_part_text($part, $ct); + defined $s or return; + $_[0]->[0] = $part = undef; # free memory + index_body_text($self, $doc, \$s); +} + sub index_list_id ($$$) { my ($self, $doc, $hdr) = @_; for my $l ($hdr->header_raw('List-Id')) { diff --git a/lib/PublicInbox/SearchIdxShard.pm b/lib/PublicInbox/SearchIdxShard.pm index 000abd94..831be51b 100644 --- a/lib/PublicInbox/SearchIdxShard.pm +++ b/lib/PublicInbox/SearchIdxShard.pm @@ -1,11 +1,10 @@ -# Copyright (C) 2018-2021 all contributors <meta@public-inbox.org> +# Copyright (C) all contributors <meta@public-inbox.org> # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> # Internal interface for a single Xapian shard in V2 inboxes. 
# See L<public-inbox-v2-format(5)> for more info on how we shard Xapian package PublicInbox::SearchIdxShard; -use strict; -use v5.10.1; +use v5.12; use parent qw(PublicInbox::SearchIdx PublicInbox::IPC); use PublicInbox::OnDestroy; @@ -47,7 +46,7 @@ sub ipc_atfork_child { # called automatically before ipc_worker_loop $v2w->atfork_child; # calls ipc_sibling_atfork_child on our siblings $v2w->{current_info} = "[$self->{shard}]"; # for $SIG{__WARN__} $self->begin_txn_lazy; - # caller must capture this: + # caller (ipc_worker_spawn) must capture this: PublicInbox::OnDestroy->new($$, \&_worker_done, $self); } diff --git a/lib/PublicInbox/Sigfd.pm b/lib/PublicInbox/Sigfd.pm index 3d964be3..3c1d3811 100644 --- a/lib/PublicInbox/Sigfd.pm +++ b/lib/PublicInbox/Sigfd.pm @@ -4,7 +4,7 @@ # Wraps a signalfd (or similar) for PublicInbox::DS # fields: (sig: hashref similar to %SIG, but signal numbers as keys) package PublicInbox::Sigfd; -use strict; +use v5.12; use parent qw(PublicInbox::DS); use PublicInbox::Syscall qw(signalfd EPOLLIN EPOLLET %SIGNUM); use POSIX (); @@ -14,8 +14,8 @@ use POSIX (); sub new { my ($class, $sig, $nonblock) = @_; my %signo = map {; - # $num => $cb; - ($SIGNUM{$_} // POSIX->can("SIG$_")->()) => $sig->{$_} + # $num => [ $cb, $signame ]; + ($SIGNUM{$_} // POSIX->can("SIG$_")->()) => [ $sig->{$_}, $_ ] } keys %$sig; my $self = bless { sig => \%signo }, $class; my $io; @@ -45,8 +45,8 @@ sub wait_once ($) { for my $off (0..$nr) { # the first uint32_t of signalfd_siginfo: ssi_signo my $signo = unpack('L', substr($buf, 128 * $off, 4)); - my $cb = $self->{sig}->{$signo}; - $cb->($signo) if $cb ne 'IGNORE'; + my ($cb, $signame) = @{$self->{sig}->{$signo}}; + $cb->($signame) if $cb ne 'IGNORE'; } } $r; diff --git a/lib/PublicInbox/Spawn.pm b/lib/PublicInbox/Spawn.pm index dc11543a..878843a6 100644 --- a/lib/PublicInbox/Spawn.pm +++ b/lib/PublicInbox/Spawn.pm @@ -122,8 +122,10 @@ int pi_fork_exec(SV *redirref, SV *file, SV *cmdref, SV *envref, SV *rlimref, 
exit_err("setpgid", &cerrnum); for (sig = 1; sig < NSIG; sig++) signal(sig, SIG_DFL); /* ignore errors on signals */ - if (*cd && chdir(cd) < 0) - exit_err("chdir", &cerrnum); + if (*cd && chdir(cd) < 0) { + write(2, "cd ", 3); + exit_err(cd, &cerrnum); + } max_rlim = av_len(rlim); for (i = 0; i < max_rlim; i += 3) { diff --git a/lib/PublicInbox/SpawnPP.pm b/lib/PublicInbox/SpawnPP.pm index 5609f74a..d6c863f8 100644 --- a/lib/PublicInbox/SpawnPP.pm +++ b/lib/PublicInbox/SpawnPP.pm @@ -37,7 +37,7 @@ sub pi_fork_exec ($$$$$$$) { } $SIG{$_} = 'DEFAULT' for grep(!/\A__/, keys %SIG); if ($cd ne '') { - chdir $cd or die "chdir $cd: $!"; + chdir $cd or die "cd $cd: $!"; } while (@$rlim) { my ($r, $soft, $hard) = splice(@$rlim, 0, 3); diff --git a/lib/PublicInbox/TestCommon.pm b/lib/PublicInbox/TestCommon.pm index 5807105a..494323c0 100644 --- a/lib/PublicInbox/TestCommon.pm +++ b/lib/PublicInbox/TestCommon.pm @@ -21,6 +21,7 @@ BEGIN { @EXPORT = qw(tmpdir tcp_server tcp_connect require_git require_mods run_script start_script key2sub xsys xsys_e xqx eml_load tick have_xapian_compact json_utf8 setup_public_inboxes create_inbox + create_coderepo tcp_host_port test_lei lei lei_ok $lei_out $lei_err $lei_opt test_httpd xbail require_cmd is_xdeeply tail_f ignore_inline_c_missing); @@ -325,7 +326,7 @@ sub run_script ($;$$) { } } my $tail = @tail_paths ? 
tail_f(@tail_paths) : undef; - if ($key =~ /-(index|convert|extindex|convert|xcpdb)\z/) { + if ($key =~ /-(index|cindex|extindex|convert|xcpdb)\z/) { unshift @argv, '--no-fsync'; } if ($run_mode == 0) { @@ -698,6 +699,44 @@ sub setup_public_inboxes () { @ret; } +our %COMMIT_ENV = ( + GIT_AUTHOR_NAME => 'A U Thor', + GIT_COMMITTER_NAME => 'C O Mitter', + GIT_AUTHOR_EMAIL => 'a@example.com', + GIT_COMMITTER_EMAIL => 'c@example.com', +); + +sub create_coderepo ($$;@) { + my $ident = shift; + my $cb = pop; + my %opt = @_; + require PublicInbox::Lock; + require PublicInbox::Import; + my ($base) = ($0 =~ m!\b([^/]+)\.[^\.]+\z!); + my ($db) = (PublicInbox::Import::default_branch() =~ m!([^/]+)\z!); + my $dir = "t/data-gen/$base.$ident-$db"; + my $new = !-d $dir; + if ($new && !mkdir($dir)) { + my $err = $!; + -d $dir or xbail "mkdir($dir): $err"; + } + my $lk = bless { lock_path => "$dir/creat.lock" }, 'PublicInbox::Lock'; + my $scope = $lk->lock_for_scope; + my $tmpdir = delete $opt{tmpdir}; + if (!-f "$dir/creat.stamp") { + opendir(my $dfh, '.') or xbail "opendir .: $!"; + chdir($dir) or xbail "chdir($dir): $!"; + local %ENV = (%ENV, %COMMIT_ENV); + $cb->($dir); + chdir($dfh) or xbail "cd -: $!"; + open my $s, '>', "$dir/creat.stamp" or + BAIL_OUT "error creating $dir/creat.stamp: $!"; + } + return $dir if !defined($tmpdir); + xsys_e([qw(/bin/cp -Rp), $dir, $tmpdir]); + $tmpdir; +} + sub create_inbox ($$;@) { my $ident = shift; my $cb = pop; @@ -709,9 +748,9 @@ sub create_inbox ($$;@) { my ($db) = (PublicInbox::Import::default_branch() =~ m!([^/]+)\z!); my $dir = "t/data-gen/$base.$ident-$db"; my $new = !-d $dir; - if ($new) { - mkdir $dir; # may race - -d $dir or BAIL_OUT "$dir could not be created: $!"; + if ($new && !mkdir($dir)) { + my $err = $!; + -d $dir or xbail "mkdir($dir): $err"; } my $lk = bless { lock_path => "$dir/creat.lock" }, 'PublicInbox::Lock'; $opt{inboxdir} = File::Spec->rel2abs($dir); diff --git a/lib/PublicInbox/V2Writable.pm 
b/lib/PublicInbox/V2Writable.pm index ed5182ae..d3d13941 100644 --- a/lib/PublicInbox/V2Writable.pm +++ b/lib/PublicInbox/V2Writable.pm @@ -8,7 +8,7 @@ use strict; use v5.10.1; use parent qw(PublicInbox::Lock PublicInbox::IPC); use PublicInbox::SearchIdxShard; -use PublicInbox::IPC; +use PublicInbox::IPC qw(nproc_shards); use PublicInbox::Eml; use PublicInbox::Git; use PublicInbox::Import; @@ -29,30 +29,6 @@ my $OID = qr/[a-f0-9]{40,}/; # an estimate of the post-packed size to the raw uncompressed size our $PACKING_FACTOR = 0.4; -# SATA storage lags behind what CPUs are capable of, so relying on -# nproc(1) can be misleading and having extra Xapian shards is a -# waste of FDs and space. It can also lead to excessive IO latency -# and slow things down. Users on NVME or other fast storage can -# use the NPROC env or switches in our script/public-inbox-* programs -# to increase Xapian shards -our $NPROC_MAX_DEFAULT = 4; - -sub nproc_shards ($) { - my ($creat_opt) = @_; - my $n = $creat_opt->{nproc} if ref($creat_opt) eq 'HASH'; - $n //= $ENV{NPROC}; - if (!$n) { - # assume 2 cores if not detectable or zero - state $NPROC_DETECTED = PublicInbox::IPC::detect_nproc() || 2; - $n = $NPROC_DETECTED; - $n = $NPROC_MAX_DEFAULT if $n > $NPROC_MAX_DEFAULT; - } - - # subtract for the main process and git-fast-import - $n -= 1; - $n < 1 ? 1 : $n; -} - sub count_shards ($) { my ($self) = @_; # always load existing shards in case core count changes: diff --git a/lib/PublicInbox/Watch.pm b/lib/PublicInbox/Watch.pm index d9aadf82..8482100c 100644 --- a/lib/PublicInbox/Watch.pm +++ b/lib/PublicInbox/Watch.pm @@ -536,7 +536,7 @@ sub watch { # main entry point add_timer(0, \&poll_fetch_fork, $self, $intvl, $uris); } watch_fs_init($self) if $self->{mdre}; - PublicInbox::DS->SetPostLoopCallback(sub { !$self->quit_done }); + local @PublicInbox::DS::post_loop_do = (sub { !$self->quit_done }); PublicInbox::DS::event_loop($sig, $oldset); # calls ->event_step _done_for_now($self); } |