diff options
Diffstat (limited to 'lib/PublicInbox/CodeSearchIdx.pm')
-rw-r--r-- | lib/PublicInbox/CodeSearchIdx.pm | 1391 |
1 file changed, 1391 insertions, 0 deletions
diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm new file mode 100644 index 00000000..6d777bf6 --- /dev/null +++ b/lib/PublicInbox/CodeSearchIdx.pm @@ -0,0 +1,1391 @@ +# Copyright (C) all contributors <meta@public-inbox.org> +# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> +# +# indexer for git coderepos, just commits and repo paths for now +# this stores normalized absolute paths of indexed GIT_DIR inside +# the DB itself and is designed to handle forks by designating roots +# At minimum, it needs to have the pathnames of all git repos in +# memory at runtime. --join also requires all inbox pathnames to +# be in memory (as it happens when loaded from ~/.public-inbox/config). +# +# Unlike mail search, docid isn't tied to NNTP artnum or IMAP UID, +# there's no serial number dependency at all. The first 32-bits of +# the commit SHA-(1|256) is used to select a shard. +# +# We shard repos using the first 32-bits of sha256($ABS_GIT_DIR) +# +# --join associates root commits of coderepos to inboxes based on prefixes. +# +# Internally, each inbox is assigned a non-negative integer index ($IBX_OFF), +# and each root commit object ID (SHA-1/SHA-256 hex) is also assigned +# a non-negative integer index ($ROOT_COMMIT_OID_ID). +# +# join dumps to 2 intermediate files in $TMPDIR: +# +# * to_root_off - each line is of the format: +# +# $PFX @ROOT_COMMIT_OID_OFFS +# +# * to_ibx_off - each line is of the format: +# +# $PFX @IBX_OFFS +# +# $IBX_OFFS is a comma-delimited list of integers ($IBX_ID) +# The $IBX_OFF here is ephemeral (per-join_data) and NOT related to +# the `ibx_off' column of `over.sqlite3' for extindex. +# @ROOT_COMMIT_OID_OFFS is space-delimited +# In both cases, $PFX is typically the value of the 7-(hex)char dfpost +# XDFPOST but it can be configured to use any combination of patchid, +# dfpre, dfpost or dfblob. 
+# +# WARNING: this is vulnerable to arbitrary memory usage attacks if we +# attempt to index or join against malicious coderepos with +# thousands/millions of root commits. Most coderepos have only one +# root commit, some have several: git.git currently has 7, +# torvalds/linux.git has 4. +# --max-size= is required to keep memory usage reasonable for gigantic +# commits. +# +# See PublicInbox::CodeSearch (read-only API) for more +package PublicInbox::CodeSearchIdx; +use v5.12; +# parent order matters, we want ->DESTROY from IPC, not SearchIdx +use parent qw(PublicInbox::CodeSearch PublicInbox::IPC PublicInbox::SearchIdx); +use PublicInbox::DS qw(awaitpid); +use PublicInbox::PktOp; +use PublicInbox::IPC qw(nproc_shards); +use POSIX qw(WNOHANG SEEK_SET strftime); +use File::Path (); +use File::Spec (); +use List::Util qw(max); +use PublicInbox::SHA qw(sha256_hex sha_all); +use PublicInbox::Search qw(xap_terms); +use PublicInbox::SearchIdx qw(add_val); +use PublicInbox::Config qw(glob2re rel2abs_collapsed); +use PublicInbox::Spawn qw(which spawn popen_rd); +use PublicInbox::OnDestroy; +use PublicInbox::CidxLogP; +use PublicInbox::CidxComm; +use PublicInbox::Git qw(%OFMT2HEXLEN); +use PublicInbox::Compat qw(uniqstr); +use PublicInbox::Aspawn qw(run_await); +use Compress::Zlib qw(compress); +use Carp qw(croak); +use Time::Local qw(timegm); +use autodie qw(close pipe open sysread seek sysseek send); +our $DO_QUIT = 15; # signal number +our ( + $LIVE_JOBS, # integer + $GITS_NR, # number of coderepos + $MY_SIG, # like %SIG + $SIGSET, + $TXN_BYTES, # number of bytes in current shard transaction + $BATCH_BYTES, + @RDONLY_XDB, # Xapian::Database + @IDX_SHARDS, # clones of self + $MAX_SIZE, + $REINDEX, # PublicInbox::SharedKV + @GIT_DIR_GONE, # [ git_dir1, git_dir2 ] + $PRUNE_DONE, # marks off prune completions + $NCHANGE, # current number of changes + $NPROC, + $XHC, # XapClient + $REPO_CTX, # current repo being indexed in shards + $IDXQ, # PublicInbox::Git object arrayref 
+ $SCANQ, # PublicInbox::Git object arrayref + %ALT_FH, # hexlen => tmp IO for TMPDIR git alternates + $TMPDIR, # File::Temp->newdir object for prune + @PRUNEQ, # GIT_DIRs to prepare for pruning + %TODO, @IBXQ, @IBX, + @JOIN, # join(1) command for --join + $CMD_ENV, # env for awk(1), comm(1), sort(1) commands during prune + @AWK, @COMM, @SORT, # awk(1), comm(1), sort(1) commands + %JOIN, # CLI --join= suboptions + @JOIN_PFX, # any combination of XDFID, XDFPRE, XDFPOST + @JOIN_DT, # YYYYmmddHHMMSS for dt: + $QRY_STR, # common query string for both code and inbox associations + $DUMP_IBX_WPIPE, # goes to sort(1) + $ANY_SHARD, # shard round-robin for scan fingerprinting + @OFF2ROOT, + $GIT_VER, + @NO_ABBREV, +); + +# stop walking history if we see >$SEEN_MAX existing commits, this assumes +# branches don't diverge by more than this number of commits... +# git walks commits quickly if it doesn't have to read trees +our $SEEN_MAX = 100000; + +# window for commits/emails to determine a inbox <-> coderepo association +my $JOIN_WINDOW = 50000; + +our @PRUNE_BATCH = qw(cat-file --batch-all-objects --batch-check); + +# TODO: do we care about committer name + email? or tree OID? +my @FMT = qw(H P ct an ae at s b); # (b)ody must be last + +# git log --stdin buffers all commits before emitting, thus --reverse +# doesn't incur extra overhead. We use --reverse to keep Xapian docids +# increasing so we may be able to avoid sorting results in some cases +my @LOG_STDIN = (qw(log --no-decorate --no-color --no-notes -p --stat -M + --reverse --stdin --no-walk=unsorted), '--pretty=format:%n%x00'. + join('%n', map { "%$_" } @FMT)); + +sub new { + my (undef, $dir, $opt) = @_; + my $l = $opt->{indexlevel} // 'full'; + $l !~ $PublicInbox::SearchIdx::INDEXLEVELS and + die "invalid indexlevel=$l\n"; + $l eq 'basic' and die "E: indexlevel=basic not supported\n"; + my $self = bless { + xpfx => "$dir/cidx". 
PublicInbox::CodeSearch::CIDX_SCHEMA_VER, + cidx_dir => $dir, + creat => 1, # TODO: get rid of this, should be implicit + transact_bytes => 0, # for checkpoint + total_bytes => 0, # for lock_release + current_info => '', + parallel => 1, + -opt => $opt, + lock_path => "$dir/cidx.lock", + }, __PACKAGE__; + $self->{nshard} = count_shards($self) || + nproc_shards({nproc => $opt->{jobs}}); + $self->{-no_fsync} = 1 if !$opt->{fsync}; + $self->{-dangerous} = 1 if $opt->{dangerous}; + $self; +} + +# This is similar to uniq(1) on the first column, but combines the +# contents of subsequent columns using $OFS. +our @UNIQ_FOLD = ($^X, $^W ? ('-w') : (), qw(-MList::Util=uniq -ane), <<'EOM'); +BEGIN { $ofs = $ENV{OFS} // ','; $apfx = '' } +if ($F[0] eq $apfx) { + shift @F; + push @ids, @F; +} else { + print $apfx.' '.join($ofs, uniq(@ids))."\n" if @ids; + ($apfx, @ids) = @F; +} +END { print $apfx.' '.join($ofs, uniq(@ids))."\n" if @ids } +EOM + +# TODO: may be used for reshard/compact +sub count_shards { scalar($_[0]->xdb_shards_flat) } + +sub update_commit ($$$) { + my ($self, $cmt, $roots) = @_; # fields from @FMT + my $x = 'Q'.$cmt->{H}; + my ($docid, @extra) = sort { $a <=> $b } $self->docids_by_postlist($x); + @extra and warn "W: $cmt->{H} indexed multiple times, pruning ", + join(', ', map { "#$_" } @extra), "\n"; + $self->{xdb}->delete_document($_) for @extra; + my $doc = $PublicInbox::Search::X{Document}->new; + $doc->add_boolean_term($x); + $doc->add_boolean_term('G'.$_) for @$roots; + $doc->add_boolean_term('XP'.$_) for split(/ /, $cmt->{P}); + $doc->add_boolean_term('T'.'c'); + + # Author-Time is compatible with dt: for mail search schema_version=15 + add_val($doc, PublicInbox::CodeSearch::AT, + POSIX::strftime('%Y%m%d%H%M%S', gmtime($cmt->{at}))); + + # Commit-Time is the fallback used by rt: (TS) for mail search: + add_val($doc, PublicInbox::CodeSearch::CT, $cmt->{ct}); + + $self->term_generator->set_document($doc); + + # email address is always indexed with 
positional data for usability + $self->index_phrase("$cmt->{an} <$cmt->{ae}>", 1, 'A'); + + $x = $cmt->{'s'}; + $self->index_text($x, 1, 'S') if $x =~ /\S/s; + $doc->set_data($x); # subject is the first (and currently only) line + + $x = delete $cmt->{b}; + $self->index_body_text($doc, \$x) if $x =~ /\S/s; + defined($docid) ? $self->{xdb}->replace_document($docid, $doc) : + $self->{xdb}->add_document($doc); +} + +sub progress { + my ($self, @msg) = @_; + my $pr = $self->{-opt}->{-progress} or return; + $pr->($self->{git} ? ("$self->{git}->{git_dir}: ") : (), @msg, "\n"); +} + +sub check_objfmt_status ($$$) { + my ($git, $chld_err, $fmt) = @_; + my ($status, $sig) = ($chld_err >> 8, $chld_err & 127); + if (!$sig && $status == 1) { # unset, default is '' (SHA-1) + $fmt = 'sha1'; + } elsif (!$sig && $status == 0) { + chomp($fmt ||= 'sha1'); + } + $fmt // warn("git --git-dir=$git->{git_dir} config \$?=$chld_err"); + $fmt; +} + +sub store_repo { # wq_io_do, sends docid back + my ($self, $repo) = @_; + my $op_p = delete($self->{0}) // die 'BUG: no {0} op_p'; + my $git = bless $repo, 'PublicInbox::Git'; + my $rd = $git->popen(qw(config extensions.objectFormat)); + $self->begin_txn_lazy; + $self->{xdb}->delete_document($_) for @{$repo->{to_delete}}; + my $doc = $PublicInbox::Search::X{Document}->new; + add_val($doc, PublicInbox::CodeSearch::CT, $repo->{ct}); + $doc->add_boolean_term("P$repo->{git_dir}"); + $doc->add_boolean_term('T'.'r'); + $doc->add_boolean_term('G'.$_) for @{$repo->{roots}}; + $doc->set_data($repo->{fp}); # \n delimited + my $fmt = readline($rd); + $rd->close; + $fmt = check_objfmt_status $git, $?, $fmt; + $OFMT2HEXLEN{$fmt} // warn <<EOM; # store unknown formats anyways +E: unknown extensions.objectFormat=$fmt in $repo->{git_dir} +EOM + $doc->add_boolean_term('H'.$fmt); + my $did = $repo->{docid}; + $did ? 
$self->{xdb}->replace_document($did, $doc) + : ($did = $self->{xdb}->add_document($doc)); + send($op_p, "repo_stored $did", 0); +} + +sub cidx_ckpoint ($;$) { + my ($self, $msg) = @_; + progress($self, $msg) if defined($msg); + $TXN_BYTES = $BATCH_BYTES; # reset + return if $PublicInbox::Search::X{CLOEXEC_UNSET}; + $self->commit_txn_lazy; + $self->begin_txn_lazy; +} + +sub truncate_cmt ($$) { + my ($cmt) = @_; # _[1] is $buf (giant) + my ($orig_len, $len); + $len = $orig_len = length($_[1]); + @$cmt{@FMT} = split(/\n/, $_[1], scalar(@FMT)); + undef $_[1]; + $len -= length($cmt->{b}); + + # try to keep the commit message body. + # n.b. this diffstat split may be unreliable but it's not worth + # perfection for giant commits: + my ($bdy) = split(/^---\n/sm, delete($cmt->{b}), 2); + if (($len + length($bdy)) <= $MAX_SIZE) { + $len += length($bdy); + $cmt->{b} = $bdy; + warn <<EOM; +W: $cmt->{H}: truncated body ($orig_len => $len bytes) +W: to be under --max-size=$MAX_SIZE +EOM + } else { + $cmt->{b} = ''; + warn <<EOM; +W: $cmt->{H}: deleted body ($orig_len => $len bytes) +W: to be under --max-size=$MAX_SIZE +EOM + } + $len; +} + +sub cidx_reap_log { # awaitpid cb + my ($pid, $cmd, $self, $op_p) = @_; + if (!$? || ($DO_QUIT && (($? & 127) == $DO_QUIT || + ($? 
& 127) == POSIX::SIGPIPE))) { + send($op_p, "shard_done $self->{shard}", 0); + } else { + warn "W: @$cmd (\$?=$?)\n"; + $self->{xdb}->cancel_transaction; + } +} + +sub shard_index { # via wq_io_do in IDX_SHARDS + my ($self, $git, $roots) = @_; + + my $in = delete($self->{0}) // die 'BUG: no {0} input'; + my $op_p = delete($self->{1}) // die 'BUG: no {1} op_p'; + sysseek($in, 0, SEEK_SET); + my $cmd = $git->cmd(@NO_ABBREV, @LOG_STDIN); + my $rd = popen_rd($cmd, undef, { 0 => $in }, + \&cidx_reap_log, $cmd, $self, $op_p); + PublicInbox::CidxLogP->new($rd, $self, $git, $roots); + # CidxLogP->event_step will call cidx_read_log_p once there's input +} + +# sharded reader for `git log --pretty=format: --stdin' +sub cidx_read_log_p { + my ($self, $log_p, $rd) = @_; + my $git = delete $log_p->{git} // die 'BUG: no {git}'; + local $self->{current_info} = "$git->{git_dir} [$self->{shard}]"; + my $roots = delete $log_p->{roots} // die 'BUG: no {roots}'; + # local-ized in parent before fork + $TXN_BYTES = $BATCH_BYTES; + local $self->{git} = $git; # for patchid + return if $DO_QUIT; + my $nr = 0; + + # a patch may have \0, see c4201214cbf10636e2c1ab9131573f735b42c8d4 + # in linux.git, so we use $/ = "\n\0" to check end-of-patch + my $FS = "\n\0"; + my $len; + my $cmt = {}; + local $/ = $FS; + my $buf = <$rd> // return; # leading $FS + $buf eq $FS or die "BUG: not LF-NUL: $buf\n"; + $self->begin_txn_lazy; + while (!$DO_QUIT && defined($buf = <$rd>)) { + chomp($buf); + $/ = "\n"; + $len = length($buf); + if (defined($MAX_SIZE) && $len > $MAX_SIZE) { + $len = truncate_cmt($cmt, $buf); + } else { + @$cmt{@FMT} = split(/\n/, $buf, scalar(@FMT)); + } + if (($TXN_BYTES -= $len) <= 0) { + cidx_ckpoint($self, "[$self->{shard}] $nr"); + $TXN_BYTES -= $len; # len may be huge, >TXN_BYTES; + } + update_commit($self, $cmt, $roots); + ++$nr; + cidx_ckpoint($self, "[$self->{shard}] $nr") if $TXN_BYTES <= 0; + $/ = $FS; + } + # return and wait for cidx_reap_log +} + +sub shard_done { # called 
via PktOp on shard_index completion + my ($self, $repo_ctx, $on_destroy, $n) = @_; + $repo_ctx->{shard_ok}->{$n} = 1; +} + +sub repo_stored { + my ($self, $repo_ctx, $drs, $did) = @_; + # check @IDX_SHARDS instead of DO_QUIT to avoid wasting prior work + # because shard_commit is fast + return unless @IDX_SHARDS; + $did > 0 or die "BUG: $repo_ctx->{repo}->{git_dir}: docid=$did"; + my ($c, $p) = PublicInbox::PktOp->pair; + $c->{ops}->{shard_done} = [ $self, $repo_ctx, + on_destroy(\&next_repos, $repo_ctx, $drs)]; + # shard_done fires when all shards are committed + my @active = keys %{$repo_ctx->{active}}; + $IDX_SHARDS[$_]->wq_io_do('shard_commit', [ $p->{op_p} ]) for @active; +} + +sub prune_done { # called via prune_do completion + my ($self, $drs, $n) = @_; + return if $DO_QUIT || !$PRUNE_DONE; + die "BUG: \$PRUNE_DONE->[$n] already defined" if $PRUNE_DONE->[$n]; + $PRUNE_DONE->[$n] = 1; + if (grep(defined, @$PRUNE_DONE) == @IDX_SHARDS) { + progress($self, 'prune done'); + index_next($self); # may kick dump_roots_start + } +} + +sub seen ($$) { + my ($xdb, $q) = @_; # $q = "Q$COMMIT_HASH" + for (1..100) { + my $ret = eval { + $xdb->postlist_begin($q) != $xdb->postlist_end($q); + }; + return $ret unless $@; + if (ref($@) =~ /\bDatabaseModifiedError\b/) { + $xdb->reopen; + } else { + Carp::croak($@); + } + } + Carp::croak('too many Xapian DB modifications in progress'); +} + +# used to select the shard for a GIT_DIR +sub git_dir_hash ($) { hex(substr(sha256_hex($_[0]), 0, 8)) } + +sub _cb { # run_await cb + my ($pid, $cmd, undef, $opt, $cb, $self, $git, @arg) = @_; + return if $DO_QUIT; + return $cb->($opt, $self, $git, @arg) if $opt->{quiet}; + $? ? 
($git->{-cidx_err} = warn("W: @$cmd (\$?=$?)\n")) : + $cb->($opt, $self, $git, @arg); +} + +sub run_git { + my ($cmd, $opt, $cb, $self, $git, @arg) = @_; + run_await($git->cmd(@$cmd), undef, $opt, \&_cb, $cb, $self, $git, @arg) +} + +# this is different from the grokmirror-compatible fingerprint since we +# only care about --heads (branches) and --tags, and not even their names +sub fp_start ($$) { + my ($self, $git) = @_; + return if $DO_QUIT; + open my $refs, '+>', undef; + $git->{-repo}->{refs} = $refs; + my ($c, $p) = PublicInbox::PktOp->pair; + my $next_on_err = on_destroy \&index_next, $self; + $c->{ops}->{fp_done} = [ $self, $git, $next_on_err ]; + $IDX_SHARDS[++$ANY_SHARD % scalar(@IDX_SHARDS)]->wq_io_do('fp_async', + [ $p->{op_p}, $refs ], $git->{git_dir}) +} + +sub fp_async { # via wq_io_do in worker + my ($self, $git_dir) = @_; + my $op_p = delete $self->{0} // die 'BUG: no {0} op_p'; + my $refs = delete $self->{1} // die 'BUG: no {1} refs'; + my $git = PublicInbox::Git->new($git_dir); + run_git([qw(show-ref --heads --tags --hash)], { 1 => $refs }, + \&fp_async_done, $self, $git, $op_p); +} + +sub fp_async_done { # run_git cb from worker + my ($opt, $self, $git, $op_p) = @_; + my $refs = delete $opt->{1} // 'BUG: no {-repo}->{refs}'; + sysseek($refs, 0, SEEK_SET); + send($op_p, 'fp_done '.sha_all(256, $refs)->hexdigest, 0); +} + +sub fp_done { # called parent via PktOp by fp_async_done + my ($self, $git, $next_on_err, $hex) = @_; + $next_on_err->cancel; + return if $DO_QUIT; + $git->{-repo}->{fp} = $hex; + my $n = git_dir_hash($git->{git_dir}) % scalar(@RDONLY_XDB); + my $shard = bless { %$self, shard => $n }, ref($self); + $git->{-repo}->{shard_n} = $n; + delete @$shard{qw(lockfh lock_path)}; + local $shard->{xdb} = $RDONLY_XDB[$n] // die "BUG: shard[$n] undef"; + $shard->retry_reopen(\&check_existing, $self, $git); +} + +sub check_existing { # retry_reopen callback + my ($shard, $self, $git) = @_; + my @docids = 
$shard->docids_of_git_dir($git->{git_dir}); + my $docid = shift(@docids) // return prep_repo($self, $git); # new repo + my $doc = $shard->get_doc($docid) // + die "BUG: no #$docid ($git->{git_dir})"; + my $old_fp = $REINDEX ? "\0invalid" : $doc->get_data; + if ($old_fp eq $git->{-repo}->{fp}) { # no change + delete $git->{-repo}; + return index_next($self); + } + $git->{-repo}->{docid} = $docid; + if (@docids) { + warn "BUG: $git->{git_dir} indexed multiple times, culling\n"; + $git->{-repo}->{to_delete} = \@docids; # XXX needed? + } + prep_repo($self, $git); +} + +sub partition_refs ($$$) { + my ($self, $git, $refs) = @_; # show-ref --heads --tags --hash output + sysseek($refs, 0, SEEK_SET); + my $rfh = $git->popen(qw(rev-list --stdin), undef, { 0 => $refs }); + my $seen = 0; + my @shard_in = map { + $_->reopen; + open my $fh, '+>', undef; + $fh; + } @RDONLY_XDB; + + my $n0 = $NCHANGE; + while (defined(my $cmt = <$rfh>)) { + chomp $cmt; + my $n = hex(substr($cmt, 0, 8)) % scalar(@RDONLY_XDB); + if ($REINDEX && $REINDEX->set_maybe(pack('H*', $cmt), '')) { + say { $shard_in[$n] } $cmt; + ++$NCHANGE; + } elsif (seen($RDONLY_XDB[$n], 'Q'.$cmt)) { + last if ++$seen > $SEEN_MAX; + } else { + say { $shard_in[$n] } $cmt; + ++$NCHANGE; + $seen = 0; + } + if ($DO_QUIT) { + $rfh->close; + return (); + } + } + $rfh->close; + return () if $DO_QUIT; + if (!$? || (($? 
& 127) == POSIX::SIGPIPE && $seen > $SEEN_MAX)) { + my $n = $NCHANGE - $n0; + progress($self, "$git->{git_dir}: $n commits") if $n; + return @shard_in; + } + die "git --git-dir=$git->{git_dir} rev-list: \$?=$?\n"; +} + +sub shard_commit { # via wq_io_do + my ($self) = @_; + my $op_p = delete($self->{0}) // die 'BUG: no {0} op_p'; + $self->commit_txn_lazy; + send($op_p, "shard_done $self->{shard}", 0); +} + +sub dump_roots_start { + my ($self, $do_join) = @_; + return if $DO_QUIT; + $XHC //= PublicInbox::XapClient::start_helper("-j$NPROC"); + $do_join // die 'BUG: no $do_join'; + progress($self, 'dumping IDs from coderepos'); + local $self->{xdb}; + @OFF2ROOT = $self->all_terms('G'); + my $root2id = "$TMPDIR/root2id"; + open my $fh, '>', $root2id; + my $nr = -1; + for (@OFF2ROOT) { print $fh $_, "\0", ++$nr, "\0" } # mmap-friendly + close $fh; + # dump_roots | sort -k1,1 | OFS=' ' uniq_fold >to_root_off + my ($sort_opt, $fold_opt); + pipe(local $sort_opt->{0}, my $sort_w); + pipe(local $fold_opt->{0}, local $sort_opt->{1}); + my @sort = (@SORT, '-k1,1'); + my $dst = "$TMPDIR/to_root_off"; + open $fold_opt->{1}, '>', $dst; + my $fold_env = { %$CMD_ENV, OFS => ' ' }; + run_await(\@sort, $CMD_ENV, $sort_opt, \&cmd_done, $do_join); + run_await(\@UNIQ_FOLD, $fold_env, $fold_opt, \&cmd_done, $do_join); + my $window = $JOIN{window} // $JOIN_WINDOW; + my @m = $window <= 0 ? 
() : ('-m', $window); + my @arg = ((map { ('-A', $_) } @JOIN_PFX), '-c', + @m, $root2id, $QRY_STR); + for my $d ($self->shard_dirs) { + pipe(my $err_r, my $err_w); + $XHC->mkreq([$sort_w, $err_w], qw(dump_roots -d), $d, @arg); + my $desc = "dump_roots $d"; + $self->{PENDING}->{$desc} = $do_join; + PublicInbox::CidxXapHelperAux->new($err_r, $self, $desc); + } + progress($self, 'waiting on dump_roots sort'); +} + +sub dump_ibx { # sends to xap_helper.h + my ($self, $ibx_off) = @_; + my $ibx = $IBX[$ibx_off] // die "BUG: no IBX[$ibx_off]"; + my $ekey = $ibx->eidx_key; + my $srch = $ibx->isrch or return warn <<EOM; +W: $ekey not indexed for search +EOM + # note: we don't send `-m MAX' to dump_ibx since we have to + # post-filter non-patch messages for now... + my @cmd = ('dump_ibx', $srch->xh_args, + (map { ('-A', $_) } @JOIN_PFX), $ibx_off, $QRY_STR); + pipe(my $r, my $w); + $XHC->mkreq([$DUMP_IBX_WPIPE, $w], @cmd); + $self->{PENDING}->{$ekey} = $TODO{do_join}; + PublicInbox::CidxXapHelperAux->new($r, $self, $ekey); +} + +sub dump_ibx_start { + my ($self, $do_join) = @_; + return if $DO_QUIT; + $XHC //= PublicInbox::XapClient::start_helper("-j$NPROC"); + my ($sort_opt, $fold_opt); + pipe(local $sort_opt->{0}, $DUMP_IBX_WPIPE); + pipe(local $fold_opt->{0}, local $sort_opt->{1}); + my @sort = (@SORT, '-k1,1'); # sort only on JOIN_PFX + # pipeline: dump_ibx | sort -k1,1 | uniq_fold >to_ibx_off + open $fold_opt->{1}, '>', "$TMPDIR/to_ibx_off"; + run_await(\@sort, $CMD_ENV, $sort_opt, \&cmd_done, $do_join); + run_await(\@UNIQ_FOLD, $CMD_ENV, $fold_opt, \&cmd_done, $do_join); +} + +sub index_next ($) { + my ($self) = @_; + return if $DO_QUIT; + if ($IDXQ && @$IDXQ) { + index_repo($self, shift @$IDXQ); + } elsif ($SCANQ && @$SCANQ) { + fp_start $self, shift @$SCANQ; + } elsif ($TMPDIR) { + delete $TODO{dump_roots_start}; + delete $TODO{dump_ibx_start}; # runs OnDestroy once + return dump_ibx($self, shift @IBXQ) if @IBXQ; + undef $DUMP_IBX_WPIPE; # done dumping inboxes + 
delete $TODO{do_join}; + } + # else: wait for shards_active (post_loop_do) callback +} + +sub next_repos { # OnDestroy cb + my ($repo_ctx, $drs) = @_; + my ($self, $repo, $active) = @$repo_ctx{qw(self repo active)}; + progress($self, "$repo->{git_dir}: done"); + return if $DO_QUIT || !$REPO_CTX; + my $n = grep { ! $repo_ctx->{shard_ok}->{$_} } keys %$active; + die "E: $repo->{git_dir} $n shards failed" if $n; + $REPO_CTX == $repo_ctx or die "BUG: $REPO_CTX != $repo_ctx"; + $REPO_CTX = undef; + index_next($self); +} + +sub index_done { # OnDestroy cb called when done indexing each code repo + my ($repo_ctx, $drs) = @_; + return if $DO_QUIT; + my ($self, $repo, $active) = @$repo_ctx{qw(self repo active)}; + # $active may be undef here, but it's fine to vivify + my $n = grep { ! $repo_ctx->{shard_ok}->{$_} } keys %$active; + die "E: $repo->{git_dir} $n shards failed" if $n; + $repo_ctx->{shard_ok} = {}; # reset for future shard_done + $n = $repo->{shard_n}; + $repo_ctx->{active}->{$n} = undef; # may vivify $repo_ctx->{active} + my ($c, $p) = PublicInbox::PktOp->pair; + $c->{ops}->{repo_stored} = [ $self, $repo_ctx, $drs ]; + $IDX_SHARDS[$n]->wq_io_do('store_repo', [ $p->{op_p} ], $repo); + # repo_stored will fire once store_repo is done +} + +sub index_repo { + my ($self, $git) = @_; + return if $DO_QUIT; + my $repo = $git->{-repo} // die 'BUG: no {-repo}'; + return index_next($self) if $git->{-cidx_err}; + if (!defined($repo->{ct})) { + warn "W: $git->{git_dir} has no commits, skipping\n"; + return index_next($self); + } + return push(@$IDXQ, $git) if $REPO_CTX; # busy + delete $git->{-repo}; + my $roots_fh = delete $repo->{roots_fh} // die 'BUG: no {roots_fh}'; + seek($roots_fh, 0, SEEK_SET); + chomp(my @roots = PublicInbox::IO::read_all $roots_fh); + if (!@roots) { + warn("E: $git->{git_dir} has no root commits\n"); + return index_next($self); + } + $repo->{roots} = \@roots; + local $self->{current_info} = $git->{git_dir}; + my @shard_in = partition_refs($self, 
$git, delete($repo->{refs})); + $repo->{git_dir} = $git->{git_dir}; + my $repo_ctx = $REPO_CTX = { self => $self, repo => $repo }; + delete $git->{-cidx_gits_fini}; # may fire gits_fini + my $drs = delete $git->{-cidx_dump_roots_start}; + my $index_done = on_destroy \&index_done, $repo_ctx, $drs; + my ($c, $p) = PublicInbox::PktOp->pair; + $c->{ops}->{shard_done} = [ $self, $repo_ctx, $index_done ]; + for my $n (0..$#shard_in) { + $shard_in[$n]->flush or die "flush shard[$n]: $!"; + -s $shard_in[$n] or next; + last if $DO_QUIT; + $IDX_SHARDS[$n]->wq_io_do('shard_index', + [ $shard_in[$n], $p->{op_p} ], + $git, \@roots); + $repo_ctx->{active}->{$n} = undef; + } + # shard_done fires when shard_index is done +} + +sub ct_fini { # run_git cb + my ($opt, $self, $git, $index_repo) = @_; + my ($ct) = split(/\s+/, ${$opt->{1}}); # drop TZ + LF + $git->{-repo}->{ct} = $ct + 0; +} + +# TODO: also index gitweb.owner and the full fingerprint for grokmirror? +sub prep_repo ($$) { + my ($self, $git) = @_; + return if $DO_QUIT; + my $index_repo = on_destroy \&index_repo, $self, $git; + my $refs = $git->{-repo}->{refs} // die 'BUG: no {-repo}->{refs}'; + sysseek($refs, 0, SEEK_SET); + open my $roots_fh, '+>', undef; + $git->{-repo}->{roots_fh} = $roots_fh; + run_git([ qw(rev-list --stdin --max-parents=0) ], + { 0 => $refs, 1 => $roots_fh }, \&PublicInbox::Config::noop, + $self, $git, $index_repo); + run_git([ qw[for-each-ref --sort=-committerdate + --format=%(committerdate:raw) --count=1 + refs/heads/ refs/tags/] ], undef, # capture like qx + \&ct_fini, $self, $git, $index_repo); +} + +# for PublicInbox::SearchIdx `git patch-id' call and with_umask +sub git { $_[0]->{git} } + +sub load_existing ($) { # for -u/--update + my ($self) = @_; + my $dirs = $self->{git_dirs} //= []; + if ($self->{-opt}->{update} || $self->{-opt}->{prune}) { + local $self->{xdb}; + $self->xdb or + die "E: $self->{cidx_dir} non-existent for --update\n"; + my @cur = grep { + if (-e $_) { + 1; + } else { + 
push @GIT_DIR_GONE, $_; + undef; + } + } $self->all_terms('P'); + if (@GIT_DIR_GONE && !$self->{-opt}->{prune}) { + warn "W: the following repos no longer exist:\n", + (map { "W:\t$_\n" } @GIT_DIR_GONE), + "W: use --prune to remove them from ", + $self->{cidx_dir}, "\n"; + } + push @$dirs, @cur; + } + @$dirs = uniqstr @$dirs; +} + +# SIG handlers: +sub shard_quit { $DO_QUIT = POSIX->can("SIG$_[0]")->() } +sub shard_usr1 { $TXN_BYTES = -1 } + +sub cidx_init ($) { + my ($self) = @_; + my $dir = $self->{cidx_dir}; + unless (-d $dir) { + warn "# creating $dir\n" if !$self->{-opt}->{quiet}; + File::Path::mkpath($dir); + } + $self->lock_acquire; + my @shards; + my $l = $self->{indexlevel} //= $self->{-opt}->{indexlevel}; + + for my $n (0..($self->{nshard} - 1)) { + my $shard = bless { %$self, shard => $n }, ref($self); + delete @$shard{qw(lockfh lock_path)}; + my $xdb = $shard->idx_acquire; + if (!$n) { + if (($l // '') eq 'medium') { + $xdb->set_metadata('indexlevel', $l); + } elsif (($l // '') eq 'full') { + $xdb->set_metadata('indexlevel', ''); # unset + } + $l ||= $xdb->get_metadata('indexlevel') || 'full'; + } + $shard->{indexlevel} = $l; + $shard->idx_release; + $shard->wq_workers_start("cidx shard[$n]", 1, $SIGSET, { + siblings => \@shards, # for ipc_atfork_child + }, \&shard_done_wait, $self); + push @shards, $shard; + } + $self->{indexlevel} //= $l; + # this warning needs to happen after idx_acquire + state $once; + warn <<EOM if $PublicInbox::Search::X{CLOEXEC_UNSET} && !$once++; +W: Xapian v1.2.21..v1.2.24 were missing close-on-exec on OFD locks, +W: memory usage may be high for large indexing runs +EOM + @shards; +} + +# called when all git coderepos are done +sub gits_fini { + undef $GITS_NR; + PublicInbox::DS::enqueue_reap(); # kick @post_loop_do +} + +sub scan_git_dirs ($) { + my ($self) = @_; + @$SCANQ = () unless $self->{-opt}->{scan}; + $GITS_NR = @$SCANQ or return; + my $gits_fini = on_destroy \&gits_fini; + $_->{-cidx_gits_fini} = $gits_fini for 
        @$SCANQ;
    if (my $drs = $TODO{dump_roots_start}) {
        $_->{-cidx_dump_roots_start} = $drs for @$SCANQ;
    }
    progress($self, "scanning $GITS_NR code repositories...");
}

# reset per-shard prune counters and start a transaction
sub prune_init { # via wq_io_do in IDX_SHARDS
    my ($self) = @_;
    $self->{nr_prune} = 0;
    $TXN_BYTES = $BATCH_BYTES;
    $self->begin_txn_lazy;
}

# delete all documents matching $term from this shard, checkpointing
# when the (estimated) transaction size budget is exhausted
sub prune_one { # via wq_io_do in IDX_SHARDS
    my ($self, $term) = @_;
    my @docids = $self->docids_by_postlist($term);
    for (@docids) {
        # 42 is an estimated bytes-per-doclength factor -- TODO confirm
        $TXN_BYTES -= $self->{xdb}->get_doclength($_) * 42;
        $self->{xdb}->delete_document($_);
    }
    ++$self->{nr_prune};
    $TXN_BYTES < 0 and
        cidx_ckpoint($self, "prune [$self->{shard}] $self->{nr_prune}");
}

# finish pruning in this shard and notify the parent via PktOp
sub prune_commit { # via wq_io_do in IDX_SHARDS
    my ($self) = @_;
    my $prune_op_p = delete $self->{0} // die 'BUG: no {0} op_p';
    my $nr = delete $self->{nr_prune} // die 'BUG: nr_prune undef';
    cidx_ckpoint($self, "prune [$self->{shard}] $nr done") if $nr;
    send($prune_op_p, "prune_done $self->{shard}", 0);
}

# event loop liveness check: returns true while any work remains;
# initiates graceful worker shutdown once everything is drained
sub shards_active { # post_loop_do
    return if $DO_QUIT;
    return if grep(defined, $PRUNE_DONE, $SCANQ, $IDXQ) != 3;
    return 1 if grep(defined, @$PRUNE_DONE) != @IDX_SHARDS;
    return 1 if $GITS_NR || scalar(@$IDXQ) || $REPO_CTX;
    return 1 if @IBXQ || keys(%TODO);
    for my $s (grep { $_->{-wq_s1} } @IDX_SHARDS) {
        $s->{-cidx_quit} = 1 if defined($s->{-wq_s1});
        $s->wq_close; # may recurse via awaitpid outside of event_loop
    }
    scalar(grep { $_->{-cidx_quit} } @IDX_SHARDS);
}

# signal handlers
sub kill_shards { $_->wq_kill(@_) for (@IDX_SHARDS) }

sub parent_quit {
    $DO_QUIT = POSIX->can("SIG$_[0]")->();
    $XHC = 0; # stops the process
    kill_shards(@_);
    warn "# SIG$_[0] received, quitting...\n";
}

# decide which umask to index under: internal (per-repo) invocations
# respect core.sharedRepository, otherwise inherit perms of an existing
# cidx_dir, else keep the current umask
sub prep_umask ($) {
    my ($self) = @_;
    if ($self->{-cidx_internal}) { # respect core.sharedRepository
        @{$self->{git_dirs}} == 1 or die 'BUG: only for GIT_DIR';
        local $self->{git} =
            PublicInbox::Git->new($self->{git_dirs}->[0]);
        $self->with_umask;
    } elsif (-d
        $self->{cidx_dir}) { # respect existing perms
        my @st = stat(_);
        my $um = (~$st[2] & 0777); # invert dir mode bits into a umask
        $self->{umask} = $um; # for SearchIdx->with_umask
        umask == $um or progress($self, 'using umask from ',
                    $self->{cidx_dir}, ': ',
                    sprintf('0%03o', $um));
        # returned guard restores the previous umask on destruction
        on_destroy \&CORE::umask, umask($um);
    } else {
        $self->{umask} = umask; # for SearchIdx->with_umask
        undef;
    }
}

# register $objdir in the per-hexlen "all" repo's alternates file,
# creating the temporary bare repo on first use for this object format
sub prep_alternate_end ($$) {
    my ($objdir, $fmt) = @_;
    my $hexlen = $OFMT2HEXLEN{$fmt} // return warn <<EOM;
E: ignoring objdir=$objdir, unknown extensions.objectFormat=$fmt
EOM
    unless ($ALT_FH{$hexlen}) {
        require PublicInbox::Import;
        my $git_dir = "$TMPDIR/hexlen$hexlen.git";
        PublicInbox::Import::init_bare($git_dir, 'cidx-all', $fmt);
        open $ALT_FH{$hexlen}, '>', "$git_dir/objects/info/alternates";
    }
    say { $ALT_FH{$hexlen} } $objdir;
}

# backfill the `H' (objectFormat) boolean term onto an existing repo
# document which predates `H' terms being written at index time
sub store_objfmt { # via wq_do - make early cidx users happy
    my ($self, $docid, $git_dir, $fmt) = @_;
    $self->begin_txn_lazy;
    my $doc = $self->get_doc($docid) // return
        warn "BUG? #$docid for $git_dir missing";
    my @p = xap_terms('P', $doc) or return
        warn "BUG? #$docid for $git_dir has no P(ath)";
    @p == 1 or return warn "BUG? #$docid $git_dir multi: @p";
    $p[0] eq $git_dir or return warn "BUG? #$docid $git_dir != @p";
    $doc->add_boolean_term('H'.$fmt);
    $self->{xdb}->replace_document($docid, $doc);
    # wait for prune_commit to commit...
+} + +# TODO: remove prep_alternate_read and store_objfmt 1-2 years after 2.0 is out +# they are for compatibility with pre-release indices +sub prep_alternate_read { # run_git cb for config extensions.objectFormat + my ($opt, $self, $git, $objdir, $docid, $shard_n, $run_prune) = @_; + return if $DO_QUIT; + my $chld_err = $?; + prep_alternate_start($self, shift(@PRUNEQ), $run_prune) if @PRUNEQ; + my $fmt = check_objfmt_status $git, $chld_err, ${$opt->{1}}; + $IDX_SHARDS[$shard_n]->wq_do('store_objfmt', # async + $docid, $git->{git_dir}, $fmt); + prep_alternate_end $objdir, $fmt; +} + +sub prep_alternate_start { + my ($self, $git, $run_prune) = @_; + local $self->{xdb}; + my ($o, $n, @ids, @fmt); +start: + $o = $git->git_path('objects'); + while (!-d $o) { + $git = shift(@PRUNEQ) // return; + $o = $git->git_path('objects'); + } + $n = git_dir_hash($git->{git_dir}) % scalar(@RDONLY_XDB); + $self->{xdb} = $RDONLY_XDB[$n] // croak("BUG: no shard[$n]"); + @ids = $self->docids_by_postlist('P'.$git->{git_dir}); + @fmt = @ids ? xap_terms('H', $self->{xdb}, $ids[0]) : (); + @fmt > 1 and warn "BUG? multi `H' for shard[$n] #$ids[0]: @fmt"; + + if (@fmt) { # cache hit + prep_alternate_end $o, $fmt[0]; + $git = shift(@PRUNEQ) and goto start; + } else { # compatibility w/ early cidx format + run_git([qw(config extensions.objectFormat)], { quiet => 1 }, + \&prep_alternate_read, $self, $git, $o, $ids[0], $n, + $run_prune); + } +} + +sub cmd_done { # run_await cb for sort, xapian-delve, sed failures + my ($pid, $cmd, undef, undef, $run_on_destroy) = @_; + $? and die "fatal: @$cmd (\$?=$?)\n"; + # $run_on_destroy calls do_join() or run_prune() +} + +sub current_join_data ($) { + my ($self) = @_; + local $self->{xdb} = $RDONLY_XDB[0] // die 'BUG: shard[0] undef'; + # we support multiple PI_CONFIG files for a cindex: + $self->join_data; +} + +# combined previously stored stats with new +sub score_old_join_data ($$$) { + my ($self, $score, $ekeys_new) = @_; + my $old = ($JOIN{reset} ? 
        undef : current_join_data($self)) or return;
    progress($self, 'merging old join data...');
    my ($ekeys_old, $roots_old, $ibx2root_old) =
        @$old{qw(ekeys roots ibx2root)};
    # score: "ibx_off root_off" => nr
    # remap old per-join offsets to the offsets of the current run,
    # warning (and skipping) anything which no longer exists
    my $i = -1;
    my %root2id_new = map { $_ => ++$i } @OFF2ROOT;
    $i = -1;
    my %ekey2id_new = map { $_ => ++$i } @$ekeys_new;
    for my $ibx_off_old (0..$#$ibx2root_old) {
        my $root_offs_old = $ibx2root_old->[$ibx_off_old];
        my $ekey = $ekeys_old->[$ibx_off_old] // do {
            warn "W: no ibx #$ibx_off_old in old join data\n";
            next;
        };
        my $ibx_off_new = $ekey2id_new{$ekey} // do {
            warn "W: `$ekey' no longer exists\n";
            next;
        };
        for (@$root_offs_old) {
            my ($nr, $rid_old) = @$_;
            my $root_old = $roots_old->[$rid_old] // do {
                warn "W: no root #$rid_old in old data\n";
                next;
            };
            my $rid_new = $root2id_new{$root_old} // do {
                warn "W: root `$root_old' no longer exists\n";
                next;
            };
            $score->{"$ibx_off_new $rid_new"} += $nr;
        }
    }
}

# store a key/value pair in this shard's Xapian metadata
sub metadata_set { # via wq_do
    my ($self, $key, $val, $commit) = @_;
    $self->begin_txn_lazy;
    $self->{xdb}->set_metadata($key, $val);
    $self->commit_txn_lazy if $commit || defined(wantarray);
}

# runs once all inboxes and shards are dumped via OnDestroy
sub do_join {
    my ($self) = @_;
    return if $DO_QUIT;
    $XHC = 0; # should not be recreated again
    @IDX_SHARDS or return warn("# aborting on no shards\n");
    unlink("$TMPDIR/root2id");
    my @pending = keys %{$self->{PENDING}};
    die "BUG: pending=@pending jobs not done\n" if @pending;
    progress($self, 'joining...');
    # join(1) the two dump files on their shared $PFX column
    my @join = (@JOIN, 'to_ibx_off', 'to_root_off');
    if (my $time = which('time')) { unshift @join, $time };
    my $rd = popen_rd(\@join, $CMD_ENV, { -C => "$TMPDIR" });
    my %score;
    while (<$rd>) { # PFX ibx_offs root_off
        chop eq "\n" or die "no newline from @join: <$_>";
        my (undef, $ibx_offs, @root_offs) = split / /, $_;
        for my $ibx_off (split(/,/, $ibx_offs)) {
            ++$score{"$ibx_off $_"} for @root_offs;
        }
    }
    $rd->close
        or die "fatal: @join failed: \$?=$?";
    my $nr = scalar(keys %score) or do {
        delete $TODO{joining};
        return progress($self, 'no potential new pairings');
    };
    progress($self, "$nr potential new pairings...");
    my @ekeys = map { $_->eidx_key } @IBX;
    score_old_join_data($self, \%score, \@ekeys);
    my $new;
    while (my ($k, $nr) = each %score) {
        my ($ibx_off, $root_off) = split(/ /, $k);
        my ($ekey, $root) = ($ekeys[$ibx_off], $OFF2ROOT[$root_off]);
        progress($self, "$ekey => $root has $nr matches");
        push @{$new->{ibx2root}->[$ibx_off]}, [ $nr, $root_off ];
    }
    for my $ary (values %$new) { # sort by nr (largest first)
        for (@$ary) { @$_ = sort { $b->[0] <=> $a->[0] } @$_ }
    }
    $new->{ekeys} = \@ekeys;
    $new->{roots} = \@OFF2ROOT;
    $new->{dt} = \@JOIN_DT;
    # persist the join result (compressed JSON) in shard[0] metadata
    $new = compress(PublicInbox::Config::json()->encode($new));
    my $key = $self->join_data_key;
    my $wait = $IDX_SHARDS[0]->wq_do('metadata_set', $key, $new);
    delete $TODO{joining};
}

# locate each required external program (overridable via environment,
# e.g. $SED overrides `sed') and store it in $argv->[0]
sub require_progs {
    my $op = shift;
    while (my ($x, $argv) = splice(@_, 0, 2)) {
        my $e = $x;
        $e =~ tr/a-z-/A-Z_/; # e.g. xapian-delve => XAPIAN_DELVE
        my $c = $ENV{$e} // $x;
        $argv->[0] //= which($c) // die "E: `$x' required for --$op\n";
    }
}

# post-fork --join setup: derive the dt: date range from any previous
# join, resolve it via git approxidate, and queue all inboxes for dumps
sub init_join_postfork ($) {
    my ($self) = @_;
    return unless $self->{-opt}->{join};
    require_progs('join', join => \@JOIN);
    my $d2 = '([0-9]{2})';
    my $dt_re = qr!([0-9]{4})$d2$d2$d2$d2$d2!;
    if (my $cur = $JOIN{reset} ?
        undef : current_join_data($self)) {
        if (($cur->{dt}->[1] // '') =~ m!\A$dt_re\z!o) {
            my ($Y, $m, $d, $H, $M, $S) = ($1, $2, $3, $4, $5, $6);
            my $t = timegm($S, $M, $H, $d, $m - 1, $Y);
            # resume 1s after the previous join's end timestamp
            $t = strftime('%Y%m%d%H%M%S', gmtime($t + 1));
            $JOIN{dt} //= "$t..";
        } else {
            warn <<EOM;
BUG?: previous --join invocation did not store usable `dt' key
EOM
        }
    }
    if ($JOIN{aggressive}) {
        $JOIN{window} //= -1;
        $JOIN{dt} //= '..1.month.ago';
    }
    $QRY_STR = $JOIN{dt} // '1.year.ago..';
    index($QRY_STR, '..') >= 0 or die "E: dt:$QRY_STR is not a range\n";
    # Account for send->apply delay (torvalds/linux.git mean is ~20 days
    # from Author to CommitDate in cases where CommitDate > AuthorDate
    $QRY_STR .= '1.month.ago' if $QRY_STR =~ /\.\.\z/;
    @{$self->{git_dirs} // []} or die "E: no coderepos to join\n";
    @IBX or die "E: no inboxes to join\n";
    my $approx_git = PublicInbox::Git->new($self->{git_dirs}->[0]); # ugh
    substr($QRY_STR, 0, 0) = 'dt:';
    $self->query_approxidate($approx_git, $QRY_STR); # in-place
    ($JOIN_DT[1]) = ($QRY_STR =~ /\.\.([0-9]{14})\z/); # YYYYmmddHHMMSS
    ($JOIN_DT[0]) = ($QRY_STR =~ /\Adt:([0-9]{14})/); # YYYYmmddHHMMSS
    $JOIN_DT[0] //= '19700101'.'000000'; # git uses unsigned times
    # do_join fires once both dump_ibx_start and dump_roots_start
    # (and all their consumers) have released their references
    $TODO{do_join} = on_destroy \&do_join, $self;
    $TODO{joining} = 1; # keep shards_active() happy
    $TODO{dump_ibx_start} = on_destroy \&dump_ibx_start,
        $self, $TODO{do_join};
    $TODO{dump_roots_start} = on_destroy \&dump_roots_start,
        $self, $TODO{do_join};
    progress($self, "will join in $QRY_STR date range...");
    my $id = -1;
    @IBXQ = map { ++$id } @IBX;
}

sub init_prune ($) {
    my ($self) = @_;
    # mark all shards prune-complete when --prune was not requested
    return (@$PRUNE_DONE = map { 1 } @IDX_SHARDS) if !$self->{-opt}->{prune};

    # Dealing with millions of commits here at once, so use faster tools.
    # xapian-delve is nearly an order-of-magnitude faster than Xapian Perl
    # bindings.
    # sed/awk are faster than Perl for simple stream ops, and
    # sort+comm are more memory-efficient with gigantic lists.
    # pipeline: delve | sed | sort >indexed_commits
    my @delve = (undef, qw(-A Q -1)); # dump all `Q' (commit) terms
    my @sed = (undef, '-ne', 's/^Q//p'); # strip the `Q' prefix
    @COMM = (undef, qw(-2 -3 indexed_commits -));
    @AWK = (undef, '$2 == "commit" { print $1 }'); # --batch-check output
    require_progs('prune', 'xapian-delve' => \@delve, sed => \@sed,
        comm => \@COMM, awk => \@AWK);
    for (0..$#IDX_SHARDS) { push @delve, "$self->{xpfx}/$_" }
    # run_prune fires via on_destroy once all objectFormat checks finish
    my $run_prune = on_destroy \&run_prune, $self, $TODO{dump_roots_start};
    my ($sort_opt, $sed_opt, $delve_opt);
    pipe(local $sed_opt->{0}, local $delve_opt->{1});
    pipe(local $sort_opt->{0}, local $sed_opt->{1});
    open($sort_opt->{1}, '+>', "$TMPDIR/indexed_commits");
    run_await([@SORT, '-u'], $CMD_ENV, $sort_opt, \&cmd_done, $run_prune);
    run_await(\@sed, $CMD_ENV, $sed_opt, \&cmd_done, $run_prune);
    run_await(\@delve, undef, $delve_opt, \&cmd_done, $run_prune);
    @PRUNEQ = @$SCANQ;
    for (1..$LIVE_JOBS) { # bounded parallelism across repos
        prep_alternate_start($self, shift(@PRUNEQ) // last, $run_prune);
    }
}

# run `git cat-file --batch-all-objects --batch-check' on each
# per-hexlen "all" repo in turn; re-invokes itself via run_await
# until %ALT_FH is drained
sub dump_git_commits { # run_await cb
    my ($pid, $cmd, undef, $batch_opt, $self) = @_;
    (defined($pid) && $?)
        and die "E: @$cmd \$?=$?";
    return if $DO_QUIT;
    my ($hexlen) = keys(%ALT_FH) or return; # done, DESTROY batch_opt->{1}
    close(delete $ALT_FH{$hexlen}); # flushes `say' buffer
    progress($self, "preparing $hexlen-byte hex OID commits for prune...");
    my $g = PublicInbox::Git->new("$TMPDIR/hexlen$hexlen.git");
    run_await($g->cmd(@PRUNE_BATCH), undef, $batch_opt,
        \&dump_git_commits, $self);
}

sub run_prune { # OnDestroy when `git config extensions.objectFormat' are done
    my ($self, $drs) = @_;
    return if $DO_QUIT;
    # setup the following pipeline: (
    # git --git-dir=hexlen40.git cat-file \
    #	--batch-all-objects --batch-check &&
    # git --git-dir=hexlen64.git cat-file \
    #	--batch-all-objects --batch-check
    # ) | awk | sort | comm | cidx_read_comm()
    my ($awk_opt, $sort_opt, $batch_opt);
    my $comm_opt = { -C => "$TMPDIR" };
    # $batch_opt->{1} is intentionally NOT `local'-ized: it must
    # outlive this sub for dump_git_commits to inherit it
    pipe(local $awk_opt->{0}, $batch_opt->{1});
    pipe(local $sort_opt->{0}, local $awk_opt->{1});
    pipe(local $comm_opt->{0}, local $sort_opt->{1});
    run_await(\@AWK, $CMD_ENV, $awk_opt, \&cmd_done);
    run_await([@SORT, '-u'], $CMD_ENV, $sort_opt, \&cmd_done);
    my $comm_rd = popen_rd(\@COMM, $CMD_ENV, $comm_opt, \&cmd_done, \@COMM);
    PublicInbox::CidxComm->new($comm_rd, $self, $drs); # ->cidx_read_comm
    push @PRUNE_BATCH, '--buffer' if $GIT_VER ge v2.6;

    # Yes, we pipe --unordered git output to sort(1) because sorting
    # inside git leads to orders-of-magnitude slowdowns on rotational
    # storage. GNU sort(1) also works well on larger-than-memory
    # datasets, and it's not worth eliding sort(1) for old git.
    push @PRUNE_BATCH, '--unordered' if $GIT_VER ge v2.19;
    warn(sprintf(<<EOM, $GIT_VER)) if $GIT_VER lt v2.19;
W: git v2.19+ recommended for high-latency storage (have git v%vd)
EOM
    dump_git_commits(undef, undef, undef, $batch_opt, $self);
}

# consume comm(1) output: each line is an indexed commit OID which no
# longer exists in any repo; fan deletions out to shards by OID prefix
sub cidx_read_comm { # via PublicInbox::CidxComm::event_step
    my ($self, $comm_rd, $drs) = @_;
    return if $DO_QUIT;
    progress($self, 'starting prune...');
    $_->wq_do('prune_init') for @IDX_SHARDS;
    while (defined(my $cmt = <$comm_rd>)) {
        chop($cmt) eq "\n" or die "BUG: no LF in comm output ($cmt)";
        # first 32 bits of the OID select the shard
        my $n = hex(substr($cmt, 0, 8)) % scalar(@IDX_SHARDS);
        $IDX_SHARDS[$n]->wq_do('prune_one', 'Q'.$cmt);
        last if $DO_QUIT;
    }
    for my $git_dir (@GIT_DIR_GONE) {
        my $n = git_dir_hash($git_dir) % scalar(@IDX_SHARDS);
        $IDX_SHARDS[$n]->wq_do('prune_one', 'P'.$git_dir);
        last if $DO_QUIT;
    }
    my ($c, $p) = PublicInbox::PktOp->pair;
    $c->{ops}->{prune_done} = [ $self, $drs ];
    $_->wq_io_do('prune_commit', [ $p->{op_p} ]) for @IDX_SHARDS;
}

# pre-fork --join setup: parse --join sub-options, expand the
# configured prefixes (default: dfpost7) and decide which inboxes
# participate (--all/--only/--include)
sub init_join_prefork ($) {
    my ($self) = @_;
    my $subopt = $self->{-opt}->{join} // return;
    %JOIN = map { # e.g. --join=prefixes:dfpost7,reset => (k => v) pairs
        my ($k, $v) = split /:/, $_, 2;
        $k => $v // 1;
    } split(/,/, join(',', @$subopt));
    require PublicInbox::CidxXapHelperAux;
    require PublicInbox::XapClient;
    my @unknown;
    my $pfx = $JOIN{prefixes} // 'dfpost7';
    for my $p (split /\+/, $pfx) {
        my $n = '';
        $p =~ s/([0-9]+)\z// and $n = $1; # trailing digits = hex chars
        my $v = $PublicInbox::Search::PATCH_BOOL_COMMON{$p} //
            push(@unknown, $p);
        push(@JOIN_PFX, map { $_.$n } split(/ /, $v));
    }
    @unknown and die <<EOM;
E: --join=prefixes= contains unsupported prefixes: @unknown
EOM
    @JOIN_PFX = uniqstr @JOIN_PFX;
    my %incl = map {
        if (-f "$_/inbox.lock" || -d "$_/public-inbox") {
            rel2abs_collapsed($_) => undef;
        } else {
            warn "W: `$_' is not a public inbox, skipping\n";
            ();
        }
    } (@{$self->{-opt}->{include} // []});
    my $all = $self->{-opt}->{all};
    if (my $only = $self->{-opt}->{only}) {
        die <<'' if $all;
E: --all is incompatible with --only

        $incl{rel2abs_collapsed($_)} = undef for @$only;
    }
    unless (keys(%incl)) {
        $all = 1;
        # FIX: was `$self->{opt}->{quiet}' -- options live under the
        # `-opt' key everywhere else in this file, so the typo'd `opt'
        # key is always undef and --quiet could never suppress this
        warn <<EOM unless $self->{-opt}->{quiet};
# --all implied since no inboxes were specified with --only or --include
EOM
    }
    $self->{-opt}->{-pi_cfg}->each_inbox(\&_prep_ibx, $self, \%incl, $all);
    my $nr = scalar(@IBX) or die "E: no inboxes to join with\n";
    progress($self, "will join with $nr inboxes in ",
        $self->{-opt}->{-pi_cfg}->{-f}, " using: $pfx");
}

# each_inbox callback: collect inboxes selected for --join into @IBX
sub _prep_ibx { # each_inbox callback
    my ($ibx, $self, $incl, $all) = @_;
    ($all || exists($incl->{$ibx->{inboxdir}})) and push @IBX, $ibx;
}

# dump requested datasets (--show) as pretty-printed JSON to stdout
sub show_json { # for diagnostics (unstable output)
    my ($self) = @_;
    my $s = $self->{-opt}->{show} or return; # for diagnostics
    local $self->{xdb};
    my %ret;
    my @todo = @$s;
    while (defined(my $f = shift @todo)) {
        if ($f =~ /\A(?:roots2paths|paths2roots|join_data)\z/) {
            $ret{$f} = $self->$f;
        } elsif ($f eq '') { # default --show (no args)
            push @todo, qw(roots2paths join_data);
        } else {
            warn "E: cannot show `$f'\n";
        }
    }
    my $json = ref(PublicInbox::Config::json())->new;
    $json->utf8->canonical->pretty; # n.b. FS pathnames may not be UTF-8...
    say $json->encode(\%ret);
}

sub do_inits { # called via PublicInbox::DS::add_timer
    my ($self) = @_;
    grep !!$_, @{$self->{-opt}}{qw(scan prune)} and
        @$SCANQ = map PublicInbox::Git->new($_), @{$self->{git_dirs}};
    init_join_postfork $self;
    init_prune $self;
    scan_git_dirs $self;
    my $max = $TODO{do_join} ?
        max($LIVE_JOBS, $NPROC) : $LIVE_JOBS;
    index_next($self) for (1..$max);
}

sub cidx_run { # main entry point
    my ($self) = @_;
    my $restore_umask = prep_umask($self);
    # block job-control signals until workers install their handlers
    local $SIGSET = PublicInbox::DS::block_signals(
        POSIX::SIGTSTP, POSIX::SIGCONT);
    my $restore = on_destroy \&PublicInbox::DS::sig_setmask, $SIGSET;
    # `local'-ize all run-wide globals so state cannot leak between runs
    local $PRUNE_DONE = [];
    local $IDXQ = [];
    local $SCANQ = [];
    local ($DO_QUIT, $REINDEX, $TXN_BYTES, @GIT_DIR_GONE, @PRUNEQ,
        $REPO_CTX, %ALT_FH, $TMPDIR, @AWK, @COMM, $CMD_ENV,
        %TODO, @IBXQ, @IBX, @JOIN, %JOIN, @JOIN_PFX, @NO_ABBREV,
        @JOIN_DT, $DUMP_IBX_WPIPE, @OFF2ROOT, $XHC, @SORT, $GITS_NR);
    local $BATCH_BYTES = $self->{-opt}->{batch_size} //
        $PublicInbox::SearchIdx::BATCH_BYTES;
    local $MAX_SIZE = $self->{-opt}->{max_size};
    local $self->{PENDING} = {}; # used by PublicInbox::CidxXapHelperAux
    my $cfg = $self->{-opt}->{-pi_cfg} // die 'BUG: -pi_cfg unset';
    $self->{-cfg_f} = $cfg->{-f} = rel2abs_collapsed($cfg->{-f});
    local $GIT_VER = PublicInbox::Git::git_version();
    @NO_ABBREV = ('-c', 'core.abbrev='.($GIT_VER lt v2.31.0 ?
40 : 'no')); + if (grep { $_ } @{$self->{-opt}}{qw(prune join)}) { + require File::Temp; + $TMPDIR = File::Temp->newdir('cidx-all-git-XXXX', TMPDIR => 1); + $CMD_ENV = { TMPDIR => "$TMPDIR", LC_ALL => 'C', LANG => 'C' }; + require_progs('(prune|join)', sort => \@SORT); + for (qw(parallel compress-program buffer-size)) { # GNU sort + my $v = $self->{-opt}->{"sort-$_"}; + push @SORT, "--$_=$v" if defined $v; + } + ($self->{-opt}->{prune} && $GIT_VER le v2.6) and + die "W: --prune requires git v2.6+\n"; + init_join_prefork($self) + } + local @IDX_SHARDS = cidx_init($self); # forks workers + local $ANY_SHARD = -1; + local $self->{current_info} = ''; + local $MY_SIG = { + CHLD => \&PublicInbox::DS::enqueue_reap, + USR1 => \&kill_shards, + }; + local @PRUNE_BATCH = @PRUNE_BATCH; + $MY_SIG->{$_} = \&parent_quit for qw(TERM QUIT INT); + my $cb = $SIG{__WARN__} || \&CORE::warn; + local $SIG{__WARN__} = sub { + my $m = shift @_; + $self->{current_info} eq '' or + $m =~ s/\A(#?\s*)/$1$self->{current_info}: /; + $cb->($m, @_); + }; + load_existing($self) unless $self->{-cidx_internal}; + if ($self->{-opt}->{reindex}) { + require PublicInbox::SharedKV; + $REINDEX = PublicInbox::SharedKV->new; + delete $REINDEX->{lock_path}; + $REINDEX->dbh; + } + my @nc = grep { File::Spec->canonpath($_) ne $_ } @{$self->{git_dirs}}; + if (@nc) { + warn "E: BUG? paths in $self->{cidx_dir} not canonicalized:\n"; + for my $d (@{$self->{git_dirs}}) { + my $c = File::Spec->canonpath($_); + warn "E: $d => $c\n"; + $d = $c; + } + warn "E: canonicalized and attempting to continue\n"; + } + if (defined(my $excl = $self->{-opt}->{exclude})) { + my $re = '(?:'.join('\\z|', map { + glob2re($_) // qr/\A\Q$_\E/ + } @$excl).'\\z)'; + my @excl; + @{$self->{git_dirs}} = grep { + $_ =~ /$re/ ? 
            (push(@excl, $_), 0) : 1;
        } @{$self->{git_dirs}};
        warn("# excluding $_\n") for @excl;
        @GIT_DIR_GONE = uniqstr @GIT_DIR_GONE, @excl;
    }
    local $NCHANGE = 0;
    local $NPROC = PublicInbox::IPC::detect_nproc();
    local $LIVE_JOBS = $self->{-opt}->{jobs} || $NPROC || 2;
    local @RDONLY_XDB = $self->xdb_shards_flat;
    PublicInbox::DS::add_timer(0, \&do_inits, $self);

    # FreeBSD ignores/discards SIGCHLD while signals are blocked and
    # EVFILT_SIGNAL is inactive, so we pretend we have a SIGCHLD pending
    PublicInbox::DS::enqueue_reap();

    local @PublicInbox::DS::post_loop_do = (\&shards_active);
    PublicInbox::DS::event_loop($MY_SIG, $SIGSET);
    $self->lock_release(!!$NCHANGE); # commit iff something changed
    show_json($self);
}

# runs in each shard worker right after fork
sub ipc_atfork_child { # @IDX_SHARDS
    my ($self) = @_;
    $self->SUPER::ipc_atfork_child;
    $SIG{USR1} = \&shard_usr1;
    $SIG{$_} = \&shard_quit for qw(INT TERM QUIT);
    # close inherited sockets of sibling workers
    my $x = delete $self->{siblings} // die 'BUG: no {siblings}';
    $_->wq_close for @$x;
    undef;
}

sub shard_done_wait { # awaitpid cb via ipc_worker_reap
    my ($pid, $shard, $self) = @_;
    my $quit_req = delete($shard->{-cidx_quit});
    return if $DO_QUIT; # no BUG check on abnormal termination
    if ($? == 0) { # success
        $quit_req // warn 'BUG: {-cidx_quit} unset';
    } else {
        warn "PID:$pid $shard->{shard} exited with \$?=$?\n";
        ++$self->{shard_err} if defined($self->{shard_err});
    }
}

1;