# Copyright (C) all contributors # License: AGPL-3.0+ # # indexer for git coderepos, just commits and repo paths for now # this stores normalized absolute paths of indexed GIT_DIR inside # the DB itself and is designed to handle forks by designating roots # At minimum, it needs to have the pathnames of all git repos in # memory at runtime. --join also requires all inbox pathnames to # be in memory (as it happens when loaded from ~/.public-inbox/config). # # Unlike mail search, docid isn't tied to NNTP artnum or IMAP UID, # there's no serial number dependency at all. The first 32-bits of # the commit SHA-(1|256) is used to select a shard. # # We shard repos using the first 32-bits of sha256($ABS_GIT_DIR) # # --join associates root commits of coderepos to inboxes based on prefixes. # # Internally, each inbox is assigned a non-negative integer index ($IBX_OFF), # and each root commit object ID (SHA-1/SHA-256 hex) is also assigned # a non-negative integer index ($ROOT_COMMIT_OID_ID). # # join dumps to 2 intermediate files in $TMPDIR: # # * to_root_off - each line is of the format: # # $PFX @ROOT_COMMIT_OID_OFFS # # * to_ibx_off - each line is of the format: # # $PFX @IBX_OFFS # # $IBX_OFFS is a comma-delimited list of integers ($IBX_ID) # The $IBX_OFF here is ephemeral (per-join_data) and NOT related to # the `ibx_off' column of `over.sqlite3' for extindex. # @ROOT_COMMIT_OID_OFFS is space-delimited # In both cases, $PFX is typically the value of the 7-(hex)char dfpost # XDFPOST but it can be configured to use any combination of patchid, # dfpre, dfpost or dfblob. # # WARNING: this is vulnerable to arbitrary memory usage attacks if we # attempt to index or join against malicious coderepos with # thousands/millions of root commits. Most coderepos have only one # root commit, some have several: git.git currently has 7, # torvalds/linux.git has 4. # --max-size= is required to keep memory usage reasonable for gigantic # commits. # # See PublicInbox::CodeSearch (read-only API) for more package PublicInbox::CodeSearchIdx; use v5.12; # parent order matters, we want ->DESTROY from IPC, not SearchIdx use parent qw(PublicInbox::CodeSearch PublicInbox::IPC PublicInbox::SearchIdx); use PublicInbox::DS qw(awaitpid); use PublicInbox::PktOp; use PublicInbox::IPC qw(nproc_shards); use POSIX qw(WNOHANG SEEK_SET strftime); use File::Path (); use File::Spec (); use List::Util qw(max); use PublicInbox::SHA qw(sha256_hex sha_all); use PublicInbox::Search qw(xap_terms); use PublicInbox::SearchIdx qw(add_val); use PublicInbox::Config qw(glob2re rel2abs_collapsed); use PublicInbox::Spawn qw(which spawn popen_rd); use PublicInbox::OnDestroy; use PublicInbox::CidxLogP; use PublicInbox::CidxComm; use PublicInbox::Git qw(%OFMT2HEXLEN); use PublicInbox::Compat qw(uniqstr); use PublicInbox::Aspawn qw(run_await); use Compress::Zlib qw(compress); use Carp qw(croak); use Time::Local qw(timegm); use autodie qw(close pipe open sysread seek sysseek send); our $DO_QUIT = 15; # signal number our ( $LIVE_JOBS, # integer $GITS_NR, # number of coderepos $MY_SIG, # like %SIG $SIGSET, $TXN_BYTES, # number of bytes in current shard transaction $BATCH_BYTES, @RDONLY_XDB, # Xapian::Database @IDX_SHARDS, # clones of self $MAX_SIZE, $REINDEX, # PublicInbox::SharedKV @GIT_DIR_GONE, # [ git_dir1, git_dir2 ] $PRUNE_DONE, # marks off prune completions $NCHANGE, # current number of changes $NPROC, $XHC, # XapClient $REPO_CTX, # current repo being indexed in shards $IDXQ, # PublicInbox::Git object arrayref $SCANQ, # PublicInbox::Git object arrayref %ALT_FH, # hexlen => tmp IO for TMPDIR git alternates $TMPDIR, # File::Temp->newdir object for prune @PRUNEQ, # GIT_DIRs to prepare for pruning %TODO, @IBXQ, @IBX, @JOIN, # join(1) command for --join $CMD_ENV, # env for awk(1), comm(1), sort(1) commands during prune @AWK, @COMM, @SORT, # awk(1), comm(1), sort(1) commands %JOIN, # CLI --join= suboptions @JOIN_PFX, # any combination of XDFID, XDFPRE, XDFPOST @JOIN_DT, # YYYYmmddHHMMSS for dt: $QRY_STR, # common query string for both code and inbox associations $DUMP_IBX_WPIPE, # goes to sort(1) $ANY_SHARD, # shard round-robin for scan fingerprinting @OFF2ROOT, $GIT_VER, @NO_ABBREV, ); # stop walking history if we see >$SEEN_MAX existing commits, this assumes # branches don't diverge by more than this number of commits... # git walks commits quickly if it doesn't have to read trees our $SEEN_MAX = 100000; # window for commits/emails to determine a inbox <-> coderepo association my $JOIN_WINDOW = 50000; our @PRUNE_BATCH = qw(cat-file --batch-all-objects --batch-check); # TODO: do we care about committer name + email? or tree OID? my @FMT = qw(H P ct an ae at s b); # (b)ody must be last # git log --stdin buffers all commits before emitting, thus --reverse # doesn't incur extra overhead. We use --reverse to keep Xapian docids # increasing so we may be able to avoid sorting results in some cases my @LOG_STDIN = (qw(log --no-decorate --no-color --no-notes -p --stat -M --reverse --stdin --no-walk=unsorted), '--pretty=format:%n%x00'. join('%n', map { "%$_" } @FMT)); sub new { my (undef, $dir, $opt) = @_; my $l = $opt->{indexlevel} // 'full'; $l !~ $PublicInbox::SearchIdx::INDEXLEVELS and die "invalid indexlevel=$l\n"; $l eq 'basic' and die "E: indexlevel=basic not supported\n"; my $self = bless { xpfx => "$dir/cidx". PublicInbox::CodeSearch::CIDX_SCHEMA_VER, cidx_dir => $dir, creat => 1, # TODO: get rid of this, should be implicit transact_bytes => 0, # for checkpoint total_bytes => 0, # for lock_release current_info => '', parallel => 1, -opt => $opt, lock_path => "$dir/cidx.lock", }, __PACKAGE__; $self->{nshard} = count_shards($self) || nproc_shards({nproc => $opt->{jobs}}); $self->{-no_fsync} = 1 if !$opt->{fsync}; $self->{-dangerous} = 1 if $opt->{dangerous}; $self; } # This is similar to uniq(1) on the first column, but combines the # contents of subsequent columns using $OFS. our @UNIQ_FOLD = ($^X, $^W ? ('-w') : (), qw(-MList::Util=uniq -ane), <<'EOM'); BEGIN { $ofs = $ENV{OFS} // ','; $apfx = '' } if ($F[0] eq $apfx) { shift @F; push @ids, @F; } else { print $apfx.' '.join($ofs, uniq(@ids))."\n" if @ids; ($apfx, @ids) = @F; } END { print $apfx.' '.join($ofs, uniq(@ids))."\n" if @ids } EOM # TODO: may be used for reshard/compact sub count_shards { scalar($_[0]->xdb_shards_flat) } sub update_commit ($$$) { my ($self, $cmt, $roots) = @_; # fields from @FMT my $x = 'Q'.$cmt->{H}; my ($docid, @extra) = sort { $a <=> $b } $self->docids_by_postlist($x); @extra and warn "W: $cmt->{H} indexed multiple times, pruning ", join(', ', map { "#$_" } @extra), "\n"; $self->{xdb}->delete_document($_) for @extra; my $doc = $PublicInbox::Search::X{Document}->new; $doc->add_boolean_term($x); $doc->add_boolean_term('G'.$_) for @$roots; $doc->add_boolean_term('XP'.$_) for split(/ /, $cmt->{P}); $doc->add_boolean_term('T'.'c'); # Author-Time is compatible with dt: for mail search schema_version=15 add_val($doc, PublicInbox::CodeSearch::AT, POSIX::strftime('%Y%m%d%H%M%S', gmtime($cmt->{at}))); # Commit-Time is the fallback used by rt: (TS) for mail search: add_val($doc, PublicInbox::CodeSearch::CT, $cmt->{ct}); $self->term_generator->set_document($doc); # email address is always indexed with positional data for usability $self->index_phrase("$cmt->{an} <$cmt->{ae}>", 1, 'A'); $x = $cmt->{'s'}; $self->index_text($x, 1, 'S') if $x =~ /\S/s; $doc->set_data($x); # subject is the first (and currently only) line $x = delete $cmt->{b}; $self->index_body_text($doc, \$x) if $x =~ /\S/s; defined($docid) ? $self->{xdb}->replace_document($docid, $doc) : $self->{xdb}->add_document($doc); } sub progress { my ($self, @msg) = @_; my $pr = $self->{-opt}->{-progress} or return; $pr->($self->{git} ? ("$self->{git}->{git_dir}: ") : (), @msg, "\n"); } sub check_objfmt_status ($$$) { my ($git, $chld_err, $fmt) = @_; my ($status, $sig) = ($chld_err >> 8, $chld_err & 127); if (!$sig && $status == 1) { # unset, default is '' (SHA-1) $fmt = 'sha1'; } elsif (!$sig && $status == 0) { chomp($fmt ||= 'sha1'); } $fmt // warn("git --git-dir=$git->{git_dir} config \$?=$chld_err"); $fmt; } sub store_repo { # wq_io_do, sends docid back my ($self, $repo) = @_; my $op_p = delete($self->{0}) // die 'BUG: no {0} op_p'; my $git = bless $repo, 'PublicInbox::Git'; my $rd = $git->popen(qw(config extensions.objectFormat)); $self->begin_txn_lazy; $self->{xdb}->delete_document($_) for @{$repo->{to_delete}}; my $doc = $PublicInbox::Search::X{Document}->new; add_val($doc, PublicInbox::CodeSearch::CT, $repo->{ct}); $doc->add_boolean_term("P$repo->{git_dir}"); $doc->add_boolean_term('T'.'r'); $doc->add_boolean_term('G'.$_) for @{$repo->{roots}}; $doc->set_data($repo->{fp}); # \n delimited my $fmt = readline($rd); $rd->close; $fmt = check_objfmt_status $git, $?, $fmt; $OFMT2HEXLEN{$fmt} // warn <{git_dir} EOM $doc->add_boolean_term('H'.$fmt); my $did = $repo->{docid}; $did ? $self->{xdb}->replace_document($did, $doc) : ($did = $self->{xdb}->add_document($doc)); send($op_p, "repo_stored $did", 0); } sub cidx_ckpoint ($;$) { my ($self, $msg) = @_; progress($self, $msg) if defined($msg); $TXN_BYTES = $BATCH_BYTES; # reset return if $PublicInbox::Search::X{CLOEXEC_UNSET}; $self->commit_txn_lazy; $self->begin_txn_lazy; } sub truncate_cmt ($$) { my ($cmt) = @_; # _[1] is $buf (giant) my ($orig_len, $len); $len = $orig_len = length($_[1]); @$cmt{@FMT} = split(/\n/, $_[1], scalar(@FMT)); undef $_[1]; $len -= length($cmt->{b}); # try to keep the commit message body. # n.b. this diffstat split may be unreliable but it's not worth # perfection for giant commits: my ($bdy) = split(/^---\n/sm, delete($cmt->{b}), 2); if (($len + length($bdy)) <= $MAX_SIZE) { $len += length($bdy); $cmt->{b} = $bdy; warn <{H}: truncated body ($orig_len => $len bytes) W: to be under --max-size=$MAX_SIZE EOM } else { $cmt->{b} = ''; warn <{H}: deleted body ($orig_len => $len bytes) W: to be under --max-size=$MAX_SIZE EOM } $len; } sub cidx_reap_log { # awaitpid cb my ($pid, $cmd, $self, $op_p) = @_; if (!$? || ($DO_QUIT && (($? & 127) == $DO_QUIT || ($? & 127) == POSIX::SIGPIPE))) { send($op_p, "shard_done $self->{shard}", 0); } else { warn "W: @$cmd (\$?=$?)\n"; $self->{xdb}->cancel_transaction; } } sub shard_index { # via wq_io_do in IDX_SHARDS my ($self, $git, $roots) = @_; my $in = delete($self->{0}) // die 'BUG: no {0} input'; my $op_p = delete($self->{1}) // die 'BUG: no {1} op_p'; sysseek($in, 0, SEEK_SET); my $cmd = $git->cmd(@NO_ABBREV, @LOG_STDIN); my $rd = popen_rd($cmd, undef, { 0 => $in }, \&cidx_reap_log, $cmd, $self, $op_p); PublicInbox::CidxLogP->new($rd, $self, $git, $roots); # CidxLogP->event_step will call cidx_read_log_p once there's input } # sharded reader for `git log --pretty=format: --stdin' sub cidx_read_log_p { my ($self, $log_p, $rd) = @_; my $git = delete $log_p->{git} // die 'BUG: no {git}'; local $self->{current_info} = "$git->{git_dir} [$self->{shard}]"; my $roots = delete $log_p->{roots} // die 'BUG: no {roots}'; # local-ized in parent before fork $TXN_BYTES = $BATCH_BYTES; local $self->{git} = $git; # for patchid return if $DO_QUIT; my $nr = 0; # a patch may have \0, see c4201214cbf10636e2c1ab9131573f735b42c8d4 # in linux.git, so we use $/ = "\n\0" to check end-of-patch my $FS = "\n\0"; my $len; my $cmt = {}; local $/ = $FS; my $buf = <$rd> // return; # leading $FS $buf eq $FS or die "BUG: not LF-NUL: $buf\n"; $self->begin_txn_lazy; while (!$DO_QUIT && defined($buf = <$rd>)) { chomp($buf); $/ = "\n"; $len = length($buf); if (defined($MAX_SIZE) && $len > $MAX_SIZE) { $len = truncate_cmt($cmt, $buf); } else { @$cmt{@FMT} = split(/\n/, $buf, scalar(@FMT)); } if (($TXN_BYTES -= $len) <= 0) { cidx_ckpoint($self, "[$self->{shard}] $nr"); $TXN_BYTES -= $len; # len may be huge, >TXN_BYTES; } update_commit($self, $cmt, $roots); ++$nr; cidx_ckpoint($self, "[$self->{shard}] $nr") if $TXN_BYTES <= 0; $/ = $FS; } # return and wait for cidx_reap_log } sub shard_done { # called via PktOp on shard_index completion my ($self, $repo_ctx, $on_destroy, $n) = @_; $repo_ctx->{shard_ok}->{$n} = 1; } sub repo_stored { my ($self, $repo_ctx, $drs, $did) = @_; # check @IDX_SHARDS instead of DO_QUIT to avoid wasting prior work # because shard_commit is fast return unless @IDX_SHARDS; $did > 0 or die "BUG: $repo_ctx->{repo}->{git_dir}: docid=$did"; my ($c, $p) = PublicInbox::PktOp->pair; $c->{ops}->{shard_done} = [ $self, $repo_ctx, on_destroy(\&next_repos, $repo_ctx, $drs)]; # shard_done fires when all shards are committed my @active = keys %{$repo_ctx->{active}}; $IDX_SHARDS[$_]->wq_io_do('shard_commit', [ $p->{op_p} ]) for @active; } sub prune_done { # called via prune_do completion my ($self, $drs, $n) = @_; return if $DO_QUIT || !$PRUNE_DONE; die "BUG: \$PRUNE_DONE->[$n] already defined" if $PRUNE_DONE->[$n]; $PRUNE_DONE->[$n] = 1; if (grep(defined, @$PRUNE_DONE) == @IDX_SHARDS) { progress($self, 'prune done'); index_next($self); # may kick dump_roots_start } } sub seen ($$) { my ($xdb, $q) = @_; # $q = "Q$COMMIT_HASH" for (1..100) { my $ret = eval { $xdb->postlist_begin($q) != $xdb->postlist_end($q); }; return $ret unless $@; if (ref($@) =~ /\bDatabaseModifiedError\b/) { $xdb->reopen; } else { Carp::croak($@); } } Carp::croak('too many Xapian DB modifications in progress'); } # used to select the shard for a GIT_DIR sub git_dir_hash ($) { hex(substr(sha256_hex($_[0]), 0, 8)) } sub _cb { # run_await cb my ($pid, $cmd, undef, $opt, $cb, $self, $git, @arg) = @_; return if $DO_QUIT; return $cb->($opt, $self, $git, @arg) if $opt->{quiet}; $? ? ($git->{-cidx_err} = warn("W: @$cmd (\$?=$?)\n")) : $cb->($opt, $self, $git, @arg); } sub run_git { my ($cmd, $opt, $cb, $self, $git, @arg) = @_; run_await($git->cmd(@$cmd), undef, $opt, \&_cb, $cb, $self, $git, @arg) } # this is different from the grokmirror-compatible fingerprint since we # only care about --heads (branches) and --tags, and not even their names sub fp_start ($$) { my ($self, $git) = @_; return if $DO_QUIT; open my $refs, '+>', undef; $git->{-repo}->{refs} = $refs; my ($c, $p) = PublicInbox::PktOp->pair; my $next_on_err = on_destroy \&index_next, $self; $c->{ops}->{fp_done} = [ $self, $git, $next_on_err ]; $IDX_SHARDS[++$ANY_SHARD % scalar(@IDX_SHARDS)]->wq_io_do('fp_async', [ $p->{op_p}, $refs ], $git->{git_dir}) } sub fp_async { # via wq_io_do in worker my ($self, $git_dir) = @_; my $op_p = delete $self->{0} // die 'BUG: no {0} op_p'; my $refs = delete $self->{1} // die 'BUG: no {1} refs'; my $git = PublicInbox::Git->new($git_dir); run_git([qw(show-ref --heads --tags --hash)], { 1 => $refs }, \&fp_async_done, $self, $git, $op_p); } sub fp_async_done { # run_git cb from worker my ($opt, $self, $git, $op_p) = @_; my $refs = delete $opt->{1} // 'BUG: no {-repo}->{refs}'; sysseek($refs, 0, SEEK_SET); send($op_p, 'fp_done '.sha_all(256, $refs)->hexdigest, 0); } sub fp_done { # called parent via PktOp by fp_async_done my ($self, $git, $next_on_err, $hex) = @_; $next_on_err->cancel; return if $DO_QUIT; $git->{-repo}->{fp} = $hex; my $n = git_dir_hash($git->{git_dir}) % scalar(@RDONLY_XDB); my $shard = bless { %$self, shard => $n }, ref($self); $git->{-repo}->{shard_n} = $n; delete @$shard{qw(lockfh lock_path)}; local $shard->{xdb} = $RDONLY_XDB[$n] // die "BUG: shard[$n] undef"; $shard->retry_reopen(\&check_existing, $self, $git); } sub check_existing { # retry_reopen callback my ($shard, $self, $git) = @_; my @docids = $shard->docids_of_git_dir($git->{git_dir}); my $docid = shift(@docids) // return prep_repo($self, $git); # new repo my $doc = $shard->get_doc($docid) // die "BUG: no #$docid ($git->{git_dir})"; my $old_fp = $REINDEX ? "\0invalid" : $doc->get_data; if ($old_fp eq $git->{-repo}->{fp}) { # no change delete $git->{-repo}; return index_next($self); } $git->{-repo}->{docid} = $docid; if (@docids) { warn "BUG: $git->{git_dir} indexed multiple times, culling\n"; $git->{-repo}->{to_delete} = \@docids; # XXX needed? } prep_repo($self, $git); } sub partition_refs ($$$) { my ($self, $git, $refs) = @_; # show-ref --heads --tags --hash output sysseek($refs, 0, SEEK_SET); my $rfh = $git->popen(qw(rev-list --stdin), undef, { 0 => $refs }); my $seen = 0; my @shard_in = map { $_->reopen; open my $fh, '+>', undef; $fh; } @RDONLY_XDB; my $n0 = $NCHANGE; while (defined(my $cmt = <$rfh>)) { chomp $cmt; my $n = hex(substr($cmt, 0, 8)) % scalar(@RDONLY_XDB); if ($REINDEX && $REINDEX->set_maybe(pack('H*', $cmt), '')) { say { $shard_in[$n] } $cmt; ++$NCHANGE; } elsif (seen($RDONLY_XDB[$n], 'Q'.$cmt)) { last if ++$seen > $SEEN_MAX; } else { say { $shard_in[$n] } $cmt; ++$NCHANGE; $seen = 0; } if ($DO_QUIT) { $rfh->close; return (); } } $rfh->close; return () if $DO_QUIT; if (!$? || (($? & 127) == POSIX::SIGPIPE && $seen > $SEEN_MAX)) { my $n = $NCHANGE - $n0; progress($self, "$git->{git_dir}: $n commits") if $n; return @shard_in; } die "git --git-dir=$git->{git_dir} rev-list: \$?=$?\n"; } sub shard_commit { # via wq_io_do my ($self) = @_; my $op_p = delete($self->{0}) // die 'BUG: no {0} op_p'; $self->commit_txn_lazy; send($op_p, "shard_done $self->{shard}", 0); } sub dump_roots_start { my ($self, $do_join) = @_; return if $DO_QUIT; $XHC //= PublicInbox::XapClient::start_helper("-j$NPROC"); $do_join // die 'BUG: no $do_join'; progress($self, 'dumping IDs from coderepos'); local $self->{xdb}; @OFF2ROOT = $self->all_terms('G'); my $root2id = "$TMPDIR/root2id"; open my $fh, '>', $root2id; my $nr = -1; for (@OFF2ROOT) { print $fh $_, "\0", ++$nr, "\0" } # mmap-friendly close $fh; # dump_roots | sort -k1,1 | OFS=' ' uniq_fold >to_root_off my ($sort_opt, $fold_opt); pipe(local $sort_opt->{0}, my $sort_w); pipe(local $fold_opt->{0}, local $sort_opt->{1}); my @sort = (@SORT, '-k1,1'); my $dst = "$TMPDIR/to_root_off"; open $fold_opt->{1}, '>', $dst; my $fold_env = { %$CMD_ENV, OFS => ' ' }; run_await(\@sort, $CMD_ENV, $sort_opt, \&cmd_done, $do_join); run_await(\@UNIQ_FOLD, $fold_env, $fold_opt, \&cmd_done, $do_join); my $window = $JOIN{window} // $JOIN_WINDOW; my @m = $window <= 0 ? () : ('-m', $window); my @arg = ((map { ('-A', $_) } @JOIN_PFX), '-c', @m, $root2id, $QRY_STR); for my $d ($self->shard_dirs) { pipe(my $err_r, my $err_w); $XHC->mkreq([$sort_w, $err_w], qw(dump_roots -d), $d, @arg); my $desc = "dump_roots $d"; $self->{PENDING}->{$desc} = $do_join; PublicInbox::CidxXapHelperAux->new($err_r, $self, $desc); } progress($self, 'waiting on dump_roots sort'); } sub dump_ibx { # sends to xap_helper.h my ($self, $ibx_off) = @_; my $ibx = $IBX[$ibx_off] // die "BUG: no IBX[$ibx_off]"; my $ekey = $ibx->eidx_key; my $srch = $ibx->isrch or return warn <xh_args, (map { ('-A', $_) } @JOIN_PFX), $ibx_off, $QRY_STR); pipe(my $r, my $w); $XHC->mkreq([$DUMP_IBX_WPIPE, $w], @cmd); $self->{PENDING}->{$ekey} = $TODO{do_join}; PublicInbox::CidxXapHelperAux->new($r, $self, $ekey); } sub dump_ibx_start { my ($self, $do_join) = @_; return if $DO_QUIT; $XHC //= PublicInbox::XapClient::start_helper("-j$NPROC"); my ($sort_opt, $fold_opt); pipe(local $sort_opt->{0}, $DUMP_IBX_WPIPE); pipe(local $fold_opt->{0}, local $sort_opt->{1}); my @sort = (@SORT, '-k1,1'); # sort only on JOIN_PFX # pipeline: dump_ibx | sort -k1,1 | uniq_fold >to_ibx_off open $fold_opt->{1}, '>', "$TMPDIR/to_ibx_off"; run_await(\@sort, $CMD_ENV, $sort_opt, \&cmd_done, $do_join); run_await(\@UNIQ_FOLD, $CMD_ENV, $fold_opt, \&cmd_done, $do_join); } sub index_next ($) { my ($self) = @_; return if $DO_QUIT; if ($IDXQ && @$IDXQ) { index_repo($self, shift @$IDXQ); } elsif ($SCANQ && @$SCANQ) { fp_start $self, shift @$SCANQ; } elsif ($TMPDIR) { delete $TODO{dump_roots_start}; delete $TODO{dump_ibx_start}; # runs OnDestroy once return dump_ibx($self, shift @IBXQ) if @IBXQ; undef $DUMP_IBX_WPIPE; # done dumping inboxes delete $TODO{do_join}; } # else: wait for shards_active (post_loop_do) callback } sub next_repos { # OnDestroy cb my ($repo_ctx, $drs) = @_; my ($self, $repo, $active) = @$repo_ctx{qw(self repo active)}; progress($self, "$repo->{git_dir}: done"); return if $DO_QUIT || !$REPO_CTX; my $n = grep { ! $repo_ctx->{shard_ok}->{$_} } keys %$active; die "E: $repo->{git_dir} $n shards failed" if $n; $REPO_CTX == $repo_ctx or die "BUG: $REPO_CTX != $repo_ctx"; $REPO_CTX = undef; index_next($self); } sub index_done { # OnDestroy cb called when done indexing each code repo my ($repo_ctx, $drs) = @_; return if $DO_QUIT; my ($self, $repo, $active) = @$repo_ctx{qw(self repo active)}; # $active may be undef here, but it's fine to vivify my $n = grep { ! $repo_ctx->{shard_ok}->{$_} } keys %$active; die "E: $repo->{git_dir} $n shards failed" if $n; $repo_ctx->{shard_ok} = {}; # reset for future shard_done $n = $repo->{shard_n}; $repo_ctx->{active}->{$n} = undef; # may vivify $repo_ctx->{active} my ($c, $p) = PublicInbox::PktOp->pair; $c->{ops}->{repo_stored} = [ $self, $repo_ctx, $drs ]; $IDX_SHARDS[$n]->wq_io_do('store_repo', [ $p->{op_p} ], $repo); # repo_stored will fire once store_repo is done } sub index_repo { my ($self, $git) = @_; return if $DO_QUIT; my $repo = $git->{-repo} // die 'BUG: no {-repo}'; return index_next($self) if $git->{-cidx_err}; if (!defined($repo->{ct})) { warn "W: $git->{git_dir} has no commits, skipping\n"; return index_next($self); } return push(@$IDXQ, $git) if $REPO_CTX; # busy delete $git->{-repo}; my $roots_fh = delete $repo->{roots_fh} // die 'BUG: no {roots_fh}'; seek($roots_fh, 0, SEEK_SET); chomp(my @roots = PublicInbox::IO::read_all $roots_fh); if (!@roots) { warn("E: $git->{git_dir} has no root commits\n"); return index_next($self); } $repo->{roots} = \@roots; local $self->{current_info} = $git->{git_dir}; my @shard_in = partition_refs($self, $git, delete($repo->{refs})); $repo->{git_dir} = $git->{git_dir}; my $repo_ctx = $REPO_CTX = { self => $self, repo => $repo }; delete $git->{-cidx_gits_fini}; # may fire gits_fini my $drs = delete $git->{-cidx_dump_roots_start}; my $index_done = on_destroy \&index_done, $repo_ctx, $drs; my ($c, $p) = PublicInbox::PktOp->pair; $c->{ops}->{shard_done} = [ $self, $repo_ctx, $index_done ]; for my $n (0..$#shard_in) { $shard_in[$n]->flush or die "flush shard[$n]: $!"; -s $shard_in[$n] or next; last if $DO_QUIT; $IDX_SHARDS[$n]->wq_io_do('shard_index', [ $shard_in[$n], $p->{op_p} ], $git, \@roots); $repo_ctx->{active}->{$n} = undef; } # shard_done fires when shard_index is done } sub ct_fini { # run_git cb my ($opt, $self, $git, $index_repo) = @_; my ($ct) = split(/\s+/, ${$opt->{1}}); # drop TZ + LF $git->{-repo}->{ct} = $ct + 0; } # TODO: also index gitweb.owner and the full fingerprint for grokmirror? sub prep_repo ($$) { my ($self, $git) = @_; return if $DO_QUIT; my $index_repo = on_destroy \&index_repo, $self, $git; my $refs = $git->{-repo}->{refs} // die 'BUG: no {-repo}->{refs}'; sysseek($refs, 0, SEEK_SET); open my $roots_fh, '+>', undef; $git->{-repo}->{roots_fh} = $roots_fh; run_git([ qw(rev-list --stdin --max-parents=0) ], { 0 => $refs, 1 => $roots_fh }, \&PublicInbox::Config::noop, $self, $git, $index_repo); run_git([ qw[for-each-ref --sort=-committerdate --format=%(committerdate:raw) --count=1 refs/heads/ refs/tags/] ], undef, # capture like qx \&ct_fini, $self, $git, $index_repo); } # for PublicInbox::SearchIdx `git patch-id' call and with_umask sub git { $_[0]->{git} } sub load_existing ($) { # for -u/--update my ($self) = @_; my $dirs = $self->{git_dirs} //= []; if ($self->{-opt}->{update} || $self->{-opt}->{prune}) { local $self->{xdb}; $self->xdb or die "E: $self->{cidx_dir} non-existent for --update\n"; my @cur = grep { if (-e $_) { 1; } else { push @GIT_DIR_GONE, $_; undef; } } $self->all_terms('P'); if (@GIT_DIR_GONE && !$self->{-opt}->{prune}) { warn "W: the following repos no longer exist:\n", (map { "W:\t$_\n" } @GIT_DIR_GONE), "W: use --prune to remove them from ", $self->{cidx_dir}, "\n"; } push @$dirs, @cur; } @$dirs = uniqstr @$dirs; } # SIG handlers: sub shard_quit { $DO_QUIT = POSIX->can("SIG$_[0]")->() } sub shard_usr1 { $TXN_BYTES = -1 } sub cidx_init ($) { my ($self) = @_; my $dir = $self->{cidx_dir}; unless (-d $dir) { warn "# creating $dir\n" if !$self->{-opt}->{quiet}; File::Path::mkpath($dir); } $self->lock_acquire; my @shards; my $l = $self->{indexlevel} //= $self->{-opt}->{indexlevel}; for my $n (0..($self->{nshard} - 1)) { my $shard = bless { %$self, shard => $n }, ref($self); delete @$shard{qw(lockfh lock_path)}; my $xdb = $shard->idx_acquire; if (!$n) { if (($l // '') eq 'medium') { $xdb->set_metadata('indexlevel', $l); } elsif (($l // '') eq 'full') { $xdb->set_metadata('indexlevel', ''); # unset } $l ||= $xdb->get_metadata('indexlevel') || 'full'; } $shard->{indexlevel} = $l; $shard->idx_release; $shard->wq_workers_start("cidx shard[$n]", 1, $SIGSET, { siblings => \@shards, # for ipc_atfork_child }, \&shard_done_wait, $self); push @shards, $shard; } $self->{indexlevel} //= $l; # this warning needs to happen after idx_acquire state $once; warn <{-opt}->{scan}; $GITS_NR = @$SCANQ or return; my $gits_fini = on_destroy \&gits_fini; $_->{-cidx_gits_fini} = $gits_fini for @$SCANQ; if (my $drs = $TODO{dump_roots_start}) { $_->{-cidx_dump_roots_start} = $drs for @$SCANQ; } progress($self, "scanning $GITS_NR code repositories..."); } sub prune_init { # via wq_io_do in IDX_SHARDS my ($self) = @_; $self->{nr_prune} = 0; $TXN_BYTES = $BATCH_BYTES; $self->begin_txn_lazy; } sub prune_one { # via wq_io_do in IDX_SHARDS my ($self, $term) = @_; my @docids = $self->docids_by_postlist($term); for (@docids) { $TXN_BYTES -= $self->{xdb}->get_doclength($_) * 42; $self->{xdb}->delete_document($_); } ++$self->{nr_prune}; $TXN_BYTES < 0 and cidx_ckpoint($self, "prune [$self->{shard}] $self->{nr_prune}"); } sub prune_commit { # via wq_io_do in IDX_SHARDS my ($self) = @_; my $prune_op_p = delete $self->{0} // die 'BUG: no {0} op_p'; my $nr = delete $self->{nr_prune} // die 'BUG: nr_prune undef'; cidx_ckpoint($self, "prune [$self->{shard}] $nr done") if $nr; send($prune_op_p, "prune_done $self->{shard}", 0); } sub shards_active { # post_loop_do return if $DO_QUIT; return if grep(defined, $PRUNE_DONE, $SCANQ, $IDXQ) != 3; return 1 if grep(defined, @$PRUNE_DONE) != @IDX_SHARDS; return 1 if $GITS_NR || scalar(@$IDXQ) || $REPO_CTX; return 1 if @IBXQ || keys(%TODO); for my $s (grep { $_->{-wq_s1} } @IDX_SHARDS) { $s->{-cidx_quit} = 1 if defined($s->{-wq_s1}); $s->wq_close; # may recurse via awaitpid outside of event_loop } scalar(grep { $_->{-cidx_quit} } @IDX_SHARDS); } # signal handlers sub kill_shards { $_->wq_kill(@_) for (@IDX_SHARDS) } sub parent_quit { $DO_QUIT = POSIX->can("SIG$_[0]")->(); $XHC = 0; # stops the process kill_shards(@_); warn "# SIG$_[0] received, quitting...\n"; } sub prep_umask ($) { my ($self) = @_; if ($self->{-cidx_internal}) { # respect core.sharedRepository @{$self->{git_dirs}} == 1 or die 'BUG: only for GIT_DIR'; local $self->{git} = PublicInbox::Git->new($self->{git_dirs}->[0]); $self->with_umask; } elsif (-d $self->{cidx_dir}) { # respect existing perms my @st = stat(_); my $um = (~$st[2] & 0777); $self->{umask} = $um; # for SearchIdx->with_umask umask == $um or progress($self, 'using umask from ', $self->{cidx_dir}, ': ', sprintf('0%03o', $um)); on_destroy \&CORE::umask, umask($um); } else { $self->{umask} = umask; # for SearchIdx->with_umask undef; } } sub prep_alternate_end ($$) { my ($objdir, $fmt) = @_; my $hexlen = $OFMT2HEXLEN{$fmt} // return warn <', "$git_dir/objects/info/alternates"; } say { $ALT_FH{$hexlen} } $objdir; } sub store_objfmt { # via wq_do - make early cidx users happy my ($self, $docid, $git_dir, $fmt) = @_; $self->begin_txn_lazy; my $doc = $self->get_doc($docid) // return warn "BUG? #$docid for $git_dir missing"; my @p = xap_terms('P', $doc) or return warn "BUG? #$docid for $git_dir has no P(ath)"; @p == 1 or return warn "BUG? #$docid $git_dir multi: @p"; $p[0] eq $git_dir or return warn "BUG? #$docid $git_dir != @p"; $doc->add_boolean_term('H'.$fmt); $self->{xdb}->replace_document($docid, $doc); # wait for prune_commit to commit... } # TODO: remove prep_alternate_read and store_objfmt 1-2 years after 2.0 is out # they are for compatibility with pre-release indices sub prep_alternate_read { # run_git cb for config extensions.objectFormat my ($opt, $self, $git, $objdir, $docid, $shard_n, $run_prune) = @_; return if $DO_QUIT; my $chld_err = $?; prep_alternate_start($self, shift(@PRUNEQ), $run_prune) if @PRUNEQ; my $fmt = check_objfmt_status $git, $chld_err, ${$opt->{1}}; $IDX_SHARDS[$shard_n]->wq_do('store_objfmt', # async $docid, $git->{git_dir}, $fmt); prep_alternate_end $objdir, $fmt; } sub prep_alternate_start { my ($self, $git, $run_prune) = @_; local $self->{xdb}; my ($o, $n, @ids, @fmt); start: $o = $git->git_path('objects'); while (!-d $o) { $git = shift(@PRUNEQ) // return; $o = $git->git_path('objects'); } $n = git_dir_hash($git->{git_dir}) % scalar(@RDONLY_XDB); $self->{xdb} = $RDONLY_XDB[$n] // croak("BUG: no shard[$n]"); @ids = $self->docids_by_postlist('P'.$git->{git_dir}); @fmt = @ids ? xap_terms('H', $self->{xdb}, $ids[0]) : (); @fmt > 1 and warn "BUG? multi `H' for shard[$n] #$ids[0]: @fmt"; if (@fmt) { # cache hit prep_alternate_end $o, $fmt[0]; $git = shift(@PRUNEQ) and goto start; } else { # compatibility w/ early cidx format run_git([qw(config extensions.objectFormat)], { quiet => 1 }, \&prep_alternate_read, $self, $git, $o, $ids[0], $n, $run_prune); } } sub cmd_done { # run_await cb for sort, xapian-delve, sed failures my ($pid, $cmd, undef, undef, $run_on_destroy) = @_; $? and die "fatal: @$cmd (\$?=$?)\n"; # $run_on_destroy calls do_join() or run_prune() } sub current_join_data ($) { my ($self) = @_; local $self->{xdb} = $RDONLY_XDB[0] // die 'BUG: shard[0] undef'; # we support multiple PI_CONFIG files for a cindex: $self->join_data; } # combined previously stored stats with new sub score_old_join_data ($$$) { my ($self, $score, $ekeys_new) = @_; my $old = ($JOIN{reset} ? undef : current_join_data($self)) or return; progress($self, 'merging old join data...'); my ($ekeys_old, $roots_old, $ibx2root_old) = @$old{qw(ekeys roots ibx2root)}; # score: "ibx_off root_off" => nr my $i = -1; my %root2id_new = map { $_ => ++$i } @OFF2ROOT; $i = -1; my %ekey2id_new = map { $_ => ++$i } @$ekeys_new; for my $ibx_off_old (0..$#$ibx2root_old) { my $root_offs_old = $ibx2root_old->[$ibx_off_old]; my $ekey = $ekeys_old->[$ibx_off_old] // do { warn "W: no ibx #$ibx_off_old in old join data\n"; next; }; my $ibx_off_new = $ekey2id_new{$ekey} // do { warn "W: `$ekey' no longer exists\n"; next; }; for (@$root_offs_old) { my ($nr, $rid_old) = @$_; my $root_old = $roots_old->[$rid_old] // do { warn "W: no root #$rid_old in old data\n"; next; }; my $rid_new = $root2id_new{$root_old} // do { warn "W: root `$root_old' no longer exists\n"; next; }; $score->{"$ibx_off_new $rid_new"} += $nr; } } } sub metadata_set { # via wq_do my ($self, $key, $val, $commit) = @_; $self->begin_txn_lazy; $self->{xdb}->set_metadata($key, $val); $self->commit_txn_lazy if $commit || defined(wantarray); } # runs once all inboxes and shards are dumped via OnDestroy sub do_join { my ($self) = @_; return if $DO_QUIT; $XHC = 0; # should not be recreated again @IDX_SHARDS or return warn("# aborting on no shards\n"); unlink("$TMPDIR/root2id"); my @pending = keys %{$self->{PENDING}}; die "BUG: pending=@pending jobs not done\n" if @pending; progress($self, 'joining...'); my @join = (@JOIN, 'to_ibx_off', 'to_root_off'); if (my $time = which('time')) { unshift @join, $time }; my $rd = popen_rd(\@join, $CMD_ENV, { -C => "$TMPDIR" }); my %score; while (<$rd>) { # PFX ibx_offs root_off chop eq "\n" or die "no newline from @join: <$_>"; my (undef, $ibx_offs, @root_offs) = split / /, $_; for my $ibx_off (split(/,/, $ibx_offs)) { ++$score{"$ibx_off $_"} for @root_offs; } } $rd->close or die "fatal: @join failed: \$?=$?"; my $nr = scalar(keys %score) or do { delete $TODO{joining}; return progress($self, 'no potential new pairings'); }; progress($self, "$nr potential new pairings..."); my @ekeys = map { $_->eidx_key } @IBX; score_old_join_data($self, \%score, \@ekeys); my $new; while (my ($k, $nr) = each %score) { my ($ibx_off, $root_off) = split(/ /, $k); my ($ekey, $root) = ($ekeys[$ibx_off], $OFF2ROOT[$root_off]); progress($self, "$ekey => $root has $nr matches"); push @{$new->{ibx2root}->[$ibx_off]}, [ $nr, $root_off ]; } for my $ary (values %$new) { # sort by nr (largest first) for (@$ary) { @$_ = sort { $b->[0] <=> $a->[0] } @$_ } } $new->{ekeys} = \@ekeys; $new->{roots} = \@OFF2ROOT; $new->{dt} = \@JOIN_DT; $new = compress(PublicInbox::Config::json()->encode($new)); my $key = $self->join_data_key; my $wait = $IDX_SHARDS[0]->wq_do('metadata_set', $key, $new); delete $TODO{joining}; } sub require_progs { my $op = shift; while (my ($x, $argv) = splice(@_, 0, 2)) { my $e = $x; $e =~ tr/a-z-/A-Z_/; my $c = $ENV{$e} // $x; $argv->[0] //= which($c) // die "E: `$x' required for --$op\n"; } } sub init_join_postfork ($) { my ($self) = @_; return unless $self->{-opt}->{join}; require_progs('join', join => \@JOIN); my $d2 = '([0-9]{2})'; my $dt_re = qr!([0-9]{4})$d2$d2$d2$d2$d2!; if (my $cur = $JOIN{reset} ? undef : current_join_data($self)) { if (($cur->{dt}->[1] // '') =~ m!\A$dt_re\z!o) { my ($Y, $m, $d, $H, $M, $S) = ($1, $2, $3, $4, $5, $6); my $t = timegm($S, $M, $H, $d, $m - 1, $Y); $t = strftime('%Y%m%d%H%M%S', gmtime($t + 1)); $JOIN{dt} //= "$t.."; } else { warn <= 0 or die "E: dt:$QRY_STR is not a range\n"; # Account for send->apply delay (torvalds/linux.git mean is ~20 days # from Author to CommitDate in cases where CommitDate > AuthorDate $QRY_STR .= '1.month.ago' if $QRY_STR =~ /\.\.\z/; @{$self->{git_dirs} // []} or die "E: no coderepos to join\n"; @IBX or die "E: no inboxes to join\n"; my $approx_git = PublicInbox::Git->new($self->{git_dirs}->[0]); # ugh substr($QRY_STR, 0, 0) = 'dt:'; $self->query_approxidate($approx_git, $QRY_STR); # in-place ($JOIN_DT[1]) = ($QRY_STR =~ /\.\.([0-9]{14})\z/); # YYYYmmddHHMMSS ($JOIN_DT[0]) = ($QRY_STR =~ /\Adt:([0-9]{14})/); # YYYYmmddHHMMSS $JOIN_DT[0] //= '19700101'.'000000'; # git uses unsigned times $TODO{do_join} = on_destroy \&do_join, $self; $TODO{joining} = 1; # keep shards_active() happy $TODO{dump_ibx_start} = on_destroy \&dump_ibx_start, $self, $TODO{do_join}; $TODO{dump_roots_start} = on_destroy \&dump_roots_start, $self, $TODO{do_join}; progress($self, "will join in $QRY_STR date range..."); my $id = -1; @IBXQ = map { ++$id } @IBX; } sub init_prune ($) { my ($self) = @_; return (@$PRUNE_DONE = map { 1 } @IDX_SHARDS) if !$self->{-opt}->{prune}; # Dealing with millions of commits here at once, so use faster tools. # xapian-delve is nearly an order-of-magnitude faster than Xapian Perl # bindings. sed/awk are faster than Perl for simple stream ops, and # sort+comm are more memory-efficient with gigantic lists. # pipeline: delve | sed | sort >indexed_commits my @delve = (undef, qw(-A Q -1)); my @sed = (undef, '-ne', 's/^Q//p'); @COMM = (undef, qw(-2 -3 indexed_commits -)); @AWK = (undef, '$2 == "commit" { print $1 }'); # --batch-check output require_progs('prune', 'xapian-delve' => \@delve, sed => \@sed, comm => \@COMM, awk => \@AWK); for (0..$#IDX_SHARDS) { push @delve, "$self->{xpfx}/$_" } my $run_prune = on_destroy \&run_prune, $self, $TODO{dump_roots_start}; my ($sort_opt, $sed_opt, $delve_opt); pipe(local $sed_opt->{0}, local $delve_opt->{1}); pipe(local $sort_opt->{0}, local $sed_opt->{1}); open($sort_opt->{1}, '+>', "$TMPDIR/indexed_commits"); run_await([@SORT, '-u'], $CMD_ENV, $sort_opt, \&cmd_done, $run_prune); run_await(\@sed, $CMD_ENV, $sed_opt, \&cmd_done, $run_prune); run_await(\@delve, undef, $delve_opt, \&cmd_done, $run_prune); @PRUNEQ = @$SCANQ; for (1..$LIVE_JOBS) { prep_alternate_start($self, shift(@PRUNEQ) // last, $run_prune); } } sub dump_git_commits { # run_await cb my ($pid, $cmd, undef, $batch_opt, $self) = @_; (defined($pid) && $?) and die "E: @$cmd \$?=$?"; return if $DO_QUIT; my ($hexlen) = keys(%ALT_FH) or return; # done, DESTROY batch_opt->{1} close(delete $ALT_FH{$hexlen}); # flushes `say' buffer progress($self, "preparing $hexlen-byte hex OID commits for prune..."); my $g = PublicInbox::Git->new("$TMPDIR/hexlen$hexlen.git"); run_await($g->cmd(@PRUNE_BATCH), undef, $batch_opt, \&dump_git_commits, $self); } sub run_prune { # OnDestroy when `git config extensions.objectFormat' are done my ($self, $drs) = @_; return if $DO_QUIT; # setup the following pipeline: ( # git --git-dir=hexlen40.git cat-file \ # --batch-all-objects --batch-check && # git --git-dir=hexlen64.git cat-file \ # --batch-all-objects --batch-check # ) | awk | sort | comm | cidx_read_comm() my ($awk_opt, $sort_opt, $batch_opt); my $comm_opt = { -C => "$TMPDIR" }; pipe(local $awk_opt->{0}, $batch_opt->{1}); pipe(local $sort_opt->{0}, local $awk_opt->{1}); pipe(local $comm_opt->{0}, local $sort_opt->{1}); run_await(\@AWK, $CMD_ENV, $awk_opt, \&cmd_done); run_await([@SORT, '-u'], $CMD_ENV, $sort_opt, \&cmd_done); my $comm_rd = popen_rd(\@COMM, $CMD_ENV, $comm_opt, \&cmd_done, \@COMM); PublicInbox::CidxComm->new($comm_rd, $self, $drs); # ->cidx_read_comm push @PRUNE_BATCH, '--buffer' if $GIT_VER ge v2.6; # Yes, we pipe --unordered git output to sort(1) because sorting # inside git leads to orders-of-magnitude slowdowns on rotational # storage. GNU sort(1) also works well on larger-than-memory # datasets, and it's not worth eliding sort(1) for old git. push @PRUNE_BATCH, '--unordered' if $GIT_VER ge v2.19; warn(sprintf(<wq_do('prune_init') for @IDX_SHARDS; while (defined(my $cmt = <$comm_rd>)) { chop($cmt) eq "\n" or die "BUG: no LF in comm output ($cmt)"; my $n = hex(substr($cmt, 0, 8)) % scalar(@IDX_SHARDS); $IDX_SHARDS[$n]->wq_do('prune_one', 'Q'.$cmt); last if $DO_QUIT; } for my $git_dir (@GIT_DIR_GONE) { my $n = git_dir_hash($git_dir) % scalar(@IDX_SHARDS); $IDX_SHARDS[$n]->wq_do('prune_one', 'P'.$git_dir); last if $DO_QUIT; } my ($c, $p) = PublicInbox::PktOp->pair; $c->{ops}->{prune_done} = [ $self, $drs ]; $_->wq_io_do('prune_commit', [ $p->{op_p} ]) for @IDX_SHARDS; } sub init_join_prefork ($) { my ($self) = @_; my $subopt = $self->{-opt}->{join} // return; %JOIN = map { my ($k, $v) = split /:/, $_, 2; $k => $v // 1; } split(/,/, join(',', @$subopt)); require PublicInbox::CidxXapHelperAux; require PublicInbox::XapClient; my @unknown; my $pfx = $JOIN{prefixes} // 'dfpost7'; for my $p (split /\+/, $pfx) { my $n = ''; $p =~ s/([0-9]+)\z// and $n = $1; my $v = $PublicInbox::Search::PATCH_BOOL_COMMON{$p} // push(@unknown, $p); push(@JOIN_PFX, map { $_.$n } split(/ /, $v)); } @unknown and die < undef; } else { warn "W: `$_' is not a public inbox, skipping\n"; (); } } (@{$self->{-opt}->{include} // []}); my $all = $self->{-opt}->{all}; if (my $only = $self->{-opt}->{only}) { die <<'' if $all; E: --all is incompatible with --only $incl{rel2abs_collapsed($_)} = undef for @$only; } unless (keys(%incl)) { $all = 1; warn <{opt}->{quiet}; # --all implied since no inboxes were specified with --only or --include EOM } $self->{-opt}->{-pi_cfg}->each_inbox(\&_prep_ibx, $self, \%incl, $all); my $nr = scalar(@IBX) or die "E: no inboxes to join with\n"; progress($self, "will join with $nr inboxes in ", $self->{-opt}->{-pi_cfg}->{-f}, " using: $pfx"); } sub _prep_ibx { # each_inbox callback my ($ibx, $self, $incl, $all) = @_; ($all || exists($incl->{$ibx->{inboxdir}})) and push @IBX, $ibx; } sub show_json { # for diagnostics (unstable output) my ($self) = @_; my $s = $self->{-opt}->{show} or return; # for diagnostics local $self->{xdb}; my %ret; my @todo = @$s; while (defined(my $f = shift @todo)) { if ($f =~ /\A(?:roots2paths|paths2roots|join_data)\z/) { $ret{$f} = $self->$f; } elsif ($f eq '') { # default --show (no args) push @todo, qw(roots2paths join_data); } else { warn "E: cannot show `$f'\n"; } } my $json = ref(PublicInbox::Config::json())->new; $json->utf8->canonical->pretty; # n.b. FS pathnames may not be UTF-8... say $json->encode(\%ret); } sub do_inits { # called via PublicInbox::DS::add_timer my ($self) = @_; grep !!$_, @{$self->{-opt}}{qw(scan prune)} and @$SCANQ = map PublicInbox::Git->new($_), @{$self->{git_dirs}}; init_join_postfork $self; init_prune $self; scan_git_dirs $self; my $max = $TODO{do_join} ? max($LIVE_JOBS, $NPROC) : $LIVE_JOBS; index_next($self) for (1..$max); } sub cidx_run { # main entry point my ($self) = @_; my $restore_umask = prep_umask($self); local $SIGSET = PublicInbox::DS::block_signals( POSIX::SIGTSTP, POSIX::SIGCONT); my $restore = on_destroy \&PublicInbox::DS::sig_setmask, $SIGSET; local $PRUNE_DONE = []; local $IDXQ = []; local $SCANQ = []; local ($DO_QUIT, $REINDEX, $TXN_BYTES, @GIT_DIR_GONE, @PRUNEQ, $REPO_CTX, %ALT_FH, $TMPDIR, @AWK, @COMM, $CMD_ENV, %TODO, @IBXQ, @IBX, @JOIN, %JOIN, @JOIN_PFX, @NO_ABBREV, @JOIN_DT, $DUMP_IBX_WPIPE, @OFF2ROOT, $XHC, @SORT, $GITS_NR); local $BATCH_BYTES = $self->{-opt}->{batch_size} // $PublicInbox::SearchIdx::BATCH_BYTES; local $MAX_SIZE = $self->{-opt}->{max_size}; local $self->{PENDING} = {}; # used by PublicInbox::CidxXapHelperAux my $cfg = $self->{-opt}->{-pi_cfg} // die 'BUG: -pi_cfg unset'; $self->{-cfg_f} = $cfg->{-f} = rel2abs_collapsed($cfg->{-f}); local $GIT_VER = PublicInbox::Git::git_version(); @NO_ABBREV = ('-c', 'core.abbrev='.($GIT_VER lt v2.31.0 ? 40 : 'no')); if (grep { $_ } @{$self->{-opt}}{qw(prune join)}) { require File::Temp; $TMPDIR = File::Temp->newdir('cidx-all-git-XXXX', TMPDIR => 1); $CMD_ENV = { TMPDIR => "$TMPDIR", LC_ALL => 'C', LANG => 'C' }; require_progs('(prune|join)', sort => \@SORT); for (qw(parallel compress-program buffer-size)) { # GNU sort my $v = $self->{-opt}->{"sort-$_"}; push @SORT, "--$_=$v" if defined $v; } ($self->{-opt}->{prune} && $GIT_VER le v2.6) and die "W: --prune requires git v2.6+\n"; init_join_prefork($self) } local @IDX_SHARDS = cidx_init($self); # forks workers local $ANY_SHARD = -1; local $self->{current_info} = ''; local $MY_SIG = { CHLD => \&PublicInbox::DS::enqueue_reap, USR1 => \&kill_shards, }; local @PRUNE_BATCH = @PRUNE_BATCH; $MY_SIG->{$_} = \&parent_quit for qw(TERM QUIT INT); my $cb = $SIG{__WARN__} || \&CORE::warn; local $SIG{__WARN__} = sub { my $m = shift @_; $self->{current_info} eq '' or $m =~ s/\A(#?\s*)/$1$self->{current_info}: /; $cb->($m, @_); }; load_existing($self) unless $self->{-cidx_internal}; if ($self->{-opt}->{reindex}) { require PublicInbox::SharedKV; $REINDEX = PublicInbox::SharedKV->new; delete $REINDEX->{lock_path}; $REINDEX->dbh; } my @nc = grep { File::Spec->canonpath($_) ne $_ } @{$self->{git_dirs}}; if (@nc) { warn "E: BUG? paths in $self->{cidx_dir} not canonicalized:\n"; for my $d (@{$self->{git_dirs}}) { my $c = File::Spec->canonpath($_); warn "E: $d => $c\n"; $d = $c; } warn "E: canonicalized and attempting to continue\n"; } if (defined(my $excl = $self->{-opt}->{exclude})) { my $re = '(?:'.join('\\z|', map { glob2re($_) // qr/\A\Q$_\E/ } @$excl).'\\z)'; my @excl; @{$self->{git_dirs}} = grep { $_ =~ /$re/ ? (push(@excl, $_), 0) : 1; } @{$self->{git_dirs}}; warn("# excluding $_\n") for @excl; @GIT_DIR_GONE = uniqstr @GIT_DIR_GONE, @excl; } local $NCHANGE = 0; local $NPROC = PublicInbox::IPC::detect_nproc(); local $LIVE_JOBS = $self->{-opt}->{jobs} || $NPROC || 2; local @RDONLY_XDB = $self->xdb_shards_flat; PublicInbox::DS::add_timer(0, \&do_inits, $self); # FreeBSD ignores/discards SIGCHLD while signals are blocked and # EVFILT_SIGNAL is inactive, so we pretend we have a SIGCHLD pending PublicInbox::DS::enqueue_reap(); local @PublicInbox::DS::post_loop_do = (\&shards_active); PublicInbox::DS::event_loop($MY_SIG, $SIGSET); $self->lock_release(!!$NCHANGE); show_json($self); } sub ipc_atfork_child { # @IDX_SHARDS my ($self) = @_; $self->SUPER::ipc_atfork_child; $SIG{USR1} = \&shard_usr1; $SIG{$_} = \&shard_quit for qw(INT TERM QUIT); my $x = delete $self->{siblings} // die 'BUG: no {siblings}'; $_->wq_close for @$x; undef; } sub shard_done_wait { # awaitpid cb via ipc_worker_reap my ($pid, $shard, $self) = @_; my $quit_req = delete($shard->{-cidx_quit}); return if $DO_QUIT; if ($? == 0) { # success $quit_req // warn 'BUG: {-cidx_quit} unset'; } else { warn "PID:$pid $shard->{shard} exited with \$?=$?\n"; ++$self->{shard_err} if defined($self->{shard_err}); } } 1;