# Copyright (C) all contributors
# License: AGPL-3.0+
#
# indexer for git coderepos, just commits and repo paths for now
# this stores normalized absolute paths of indexed GIT_DIR inside
# the DB itself and is designed to handle forks by designating roots
# At minimum, it needs to have the pathnames of all git repos in
# memory at runtime. --join also requires all inbox pathnames to
# be in memory (as it happens when loaded from ~/.public-inbox/config).
#
# Unlike mail search, docid isn't tied to NNTP artnum or IMAP UID,
# there's no serial number dependency at all. The first 32-bits of
# the commit SHA-(1|256) is used to select a shard.
#
# We shard repos using the first 32-bits of sha256($ABS_GIT_DIR)
#
# --join associates root commits of coderepos to inboxes based on prefixes.
#
# Internally, each inbox is assigned a non-negative integer index ($IBX_OFF),
# and each root commit object ID (SHA-1/SHA-256 hex) is also assigned
# a non-negative integer index ($ROOT_COMMIT_OID_ID).
#
# join dumps to 2 intermediate files in $TMPDIR:
#
# * to_root_off - each line is of the format:
#
# $PFX @ROOT_COMMIT_OID_OFFS
#
# * to_ibx_off - each line is of the format:
#
# $PFX @IBX_OFFS
#
# $IBX_OFFS is a comma-delimited list of integers ($IBX_ID)
# The $IBX_OFF here is ephemeral (per-join_data) and NOT related to
# the `ibx_off' column of `over.sqlite3' for extindex.
# @ROOT_COMMIT_OID_OFFS is space-delimited
# In both cases, $PFX is typically the value of the 7-(hex)char dfpost
# XDFPOST but it can be configured to use any combination of patchid,
# dfpre, dfpost or dfblob.
#
# WARNING: this is vulnerable to arbitrary memory usage attacks if we
# attempt to index or join against malicious coderepos with
# thousands/millions of root commits. Most coderepos have only one
# root commit, some have several: git.git currently has 7,
# torvalds/linux.git has 4.
# --max-size= is required to keep memory usage reasonable for gigantic
# commits.
#
# See PublicInbox::CodeSearch (read-only API) for more
package PublicInbox::CodeSearchIdx;
use v5.12;
# parent order matters, we want ->DESTROY from IPC, not SearchIdx
use parent qw(PublicInbox::CodeSearch PublicInbox::IPC PublicInbox::SearchIdx);
use PublicInbox::DS qw(awaitpid);
use PublicInbox::PktOp;
use PublicInbox::IPC qw(nproc_shards);
use POSIX qw(WNOHANG SEEK_SET strftime);
use File::Path ();
use File::Spec ();
use List::Util qw(max);
use PublicInbox::SHA qw(sha256_hex sha_all);
use PublicInbox::Search qw(xap_terms);
use PublicInbox::SearchIdx qw(add_val);
use PublicInbox::Config qw(glob2re rel2abs_collapsed);
use PublicInbox::Spawn qw(which spawn popen_rd);
use PublicInbox::OnDestroy;
use PublicInbox::CidxLogP;
use PublicInbox::CidxComm;
use PublicInbox::Git qw(%OFMT2HEXLEN);
use PublicInbox::Compat qw(uniqstr);
use PublicInbox::Aspawn qw(run_await);
use Compress::Zlib qw(compress);
use Carp qw(croak);
use Time::Local qw(timegm);
use autodie qw(close pipe open sysread seek sysseek send);
our $DO_QUIT = 15; # signal number used to request graceful shutdown (SIGTERM)
our (
	$LIVE_JOBS, # integer
	$GITS_NR, # number of coderepos
	$MY_SIG, # like %SIG
	$SIGSET, # signal set blocked around worker spawning
	$TXN_BYTES, # number of bytes in current shard transaction
	$BATCH_BYTES, # TXN_BYTES budget; checkpoint when exhausted
	@RDONLY_XDB, # Xapian::Database
	@IDX_SHARDS, # clones of self
	$MAX_SIZE, # --max-size=: truncate/drop commit bodies over this
	$REINDEX, # PublicInbox::SharedKV
	@GIT_DIR_GONE, # [ git_dir1, git_dir2 ]
	$PRUNE_DONE, # marks off prune completions
	$NCHANGE, # current number of changes
	$NPROC, # CPU count for xap_helper jobs
	$XHC, # XapClient
	$REPO_CTX, # current repo being indexed in shards
	$IDXQ, # PublicInbox::Git object arrayref
	$SCANQ, # PublicInbox::Git object arrayref
	%ALT_FH, # hexlen => tmp IO for TMPDIR git alternates
	$TMPDIR, # File::Temp->newdir object for prune
	@PRUNEQ, # GIT_DIRs to prepare for pruning
	%TODO, @IBXQ, @IBX, # pending work, inbox dump queue, inboxes
	@JOIN, # join(1) command for --join
	$CMD_ENV, # env for awk(1), comm(1), sort(1) commands during prune
	@AWK, @COMM, @SORT, # awk(1), comm(1), sort(1) commands
	%JOIN, # CLI --join= suboptions
	@JOIN_PFX, # any combination of XDFID, XDFPRE, XDFPOST
	@JOIN_DT, # YYYYmmddHHMMSS for dt:
	$QRY_STR, # common query string for both code and inbox associations
	$DUMP_IBX_WPIPE, # goes to sort(1)
	$ANY_SHARD, # shard round-robin for scan fingerprinting
	@OFF2ROOT, # root offset => root commit OID (for --join)
	$GIT_VER, # git version in use
	@NO_ABBREV, # git switch(es) to disable OID abbreviation
);
# stop walking history if we see >$SEEN_MAX existing commits, this assumes
# branches don't diverge by more than this number of commits...
# git walks commits quickly if it doesn't have to read trees
our $SEEN_MAX = 100000;
# window for commits/emails to determine an inbox <-> coderepo association
my $JOIN_WINDOW = 50000;
our @PRUNE_BATCH = qw(cat-file --batch-all-objects --batch-check);
# TODO: do we care about committer name + email? or tree OID?
my @FMT = qw(H P ct an ae at s b); # (b)ody must be last
# git log --stdin buffers all commits before emitting, thus --reverse
# doesn't incur extra overhead. We use --reverse to keep Xapian docids
# increasing so we may be able to avoid sorting results in some cases
my @LOG_STDIN = (qw(log --no-decorate --no-color --no-notes -p --stat -M
	--reverse --stdin --no-walk=unsorted), '--pretty=format:%n%x00'.
	join('%n', map { "%$_" } @FMT));
# constructor: prepare (but do not yet create) a code index rooted at $dir.
# Dies on an invalid indexlevel or the unsupported `basic' level.
sub new {
	my (undef, $dir, $opt) = @_;
	my $lvl = $opt->{indexlevel} // 'full';
	die "invalid indexlevel=$lvl\n"
		if $lvl !~ $PublicInbox::SearchIdx::INDEXLEVELS;
	die "E: indexlevel=basic not supported\n" if $lvl eq 'basic';
	my $self = bless {
		xpfx => "$dir/cidx". PublicInbox::CodeSearch::CIDX_SCHEMA_VER,
		cidx_dir => $dir,
		creat => 1, # TODO: get rid of this, should be implicit
		transact_bytes => 0, # for checkpoint
		total_bytes => 0, # for lock_release
		current_info => '',
		parallel => 1,
		-opt => $opt,
		lock_path => "$dir/cidx.lock",
	}, __PACKAGE__;
	# reuse the existing shard count, or pick one based on CPU count:
	$self->{nshard} = count_shards($self) ||
		nproc_shards({ nproc => $opt->{jobs} });
	$self->{-no_fsync} = 1 unless $opt->{fsync};
	$self->{-dangerous} = 1 if $opt->{dangerous};
	$self;
}
# This is similar to uniq(1) on the first column, but combines the
# contents of subsequent columns using $OFS.
our @UNIQ_FOLD = ($^X, $^W ? ('-w') : (), qw(-MList::Util=uniq -ane), <<'EOM');
BEGIN { $ofs = $ENV{OFS} // ','; $apfx = '' }
if ($F[0] eq $apfx) {
shift @F;
push @ids, @F;
} else {
print $apfx.' '.join($ofs, uniq(@ids))."\n" if @ids;
($apfx, @ids) = @F;
}
END { print $apfx.' '.join($ofs, uniq(@ids))."\n" if @ids }
EOM
# TODO: may be used for reshard/compact
# number of existing index shards on disk (0 if none exist yet)
sub count_shards {
	my ($self) = @_;
	scalar($self->xdb_shards_flat);
}
# index one commit ($cmt holds the fields named in @FMT) into the current
# shard, tagging it with the repo's root commit OIDs (@$roots).
# Replaces an existing doc for the same commit and prunes any duplicates.
sub update_commit ($$$) {
	my ($self, $cmt, $roots) = @_; # fields from @FMT
	my $x = 'Q'.$cmt->{H}; # unique term: commit (SHA-1|SHA-256) hex
	my ($docid, @extra) = sort { $a <=> $b } $self->docids_by_postlist($x);
	@extra and warn "W: $cmt->{H} indexed multiple times, pruning ",
		join(', ', map { "#$_" } @extra), "\n";
	$self->{xdb}->delete_document($_) for @extra;
	my $doc = $PublicInbox::Search::X{Document}->new;
	$doc->add_boolean_term($x);
	$doc->add_boolean_term('G'.$_) for @$roots; # root commit(s)
	$doc->add_boolean_term('XP'.$_) for split(/ /, $cmt->{P}); # parents
	$doc->add_boolean_term('T'.'c'); # type: commit
	# Author-Time is compatible with dt: for mail search schema_version=15
	add_val($doc, PublicInbox::CodeSearch::AT,
		POSIX::strftime('%Y%m%d%H%M%S', gmtime($cmt->{at})));
	# Commit-Time is the fallback used by rt: (TS) for mail search:
	add_val($doc, PublicInbox::CodeSearch::CT, $cmt->{ct});
	$self->term_generator->set_document($doc);
	# email address is always indexed with positional data for usability
	$self->index_phrase("$cmt->{an} <$cmt->{ae}>", 1, 'A');
	$x = $cmt->{'s'};
	$self->index_text($x, 1, 'S') if $x =~ /\S/s;
	$doc->set_data($x); # subject is the first (and currently only) line
	$x = delete $cmt->{b}; # body may be huge; free it ASAP below
	$self->index_body_text($doc, \$x) if $x =~ /\S/s;
	defined($docid) ? $self->{xdb}->replace_document($docid, $doc) :
		$self->{xdb}->add_document($doc);
}
# emit a progress message through the user-supplied -progress callback
# (no-op when none is configured); the current git_dir is prefixed
# while a coderepo is being processed
sub progress {
	my ($self, @msg) = @_;
	my $cb = $self->{-opt}->{-progress} or return;
	my @pfx = $self->{git} ? ("$self->{git}->{git_dir}: ") : ();
	$cb->(@pfx, @msg, "\n");
}
# interpret the exit status ($chld_err, i.e. $?) of
# `git config extensions.objectFormat' along with its output ($fmt).
# Exit code 1 means the key is unset (git defaults to SHA-1);
# exit code 0 means $fmt holds the configured value (chomped).
# Returns the format name, or undef (with a warning) on abnormal exit.
sub check_objfmt_status ($$$) {
	my ($git, $chld_err, $fmt) = @_;
	my $sig = $chld_err & 127;
	my $status = $chld_err >> 8;
	unless ($sig) {
		if ($status == 1) { # unset, default is '' (SHA-1)
			$fmt = 'sha1';
		} elsif ($status == 0) {
			$fmt ||= 'sha1';
			chomp $fmt;
		}
	}
	warn("git --git-dir=$git->{git_dir} config \$?=$chld_err")
		unless defined $fmt;
	$fmt;
}
# wq_io_do in a shard worker: create/update the repo-level document
# (path, fingerprint, roots, objectFormat, latest commit time) and send
# the docid back to the parent via the {0} pkt socket (=> repo_stored)
sub store_repo { # wq_io_do, sends docid back
	my ($self, $repo) = @_;
	my $op_p = delete($self->{0}) // die 'BUG: no {0} op_p';
	my $git = bless $repo, 'PublicInbox::Git';
	my $rd = $git->popen(qw(config extensions.objectFormat));
	$self->begin_txn_lazy;
	$self->{xdb}->delete_document($_) for @{$repo->{to_delete}};
	my $doc = $PublicInbox::Search::X{Document}->new;
	add_val($doc, PublicInbox::CodeSearch::CT, $repo->{ct});
	$doc->add_boolean_term("P$repo->{git_dir}");
	$doc->add_boolean_term('T'.'r'); # type: repo
	$doc->add_boolean_term('G'.$_) for @{$repo->{roots}};
	$doc->set_data($repo->{fp}); # \n delimited
	my $fmt = readline($rd);
	$rd->close;
	$fmt = check_objfmt_status $git, $?, $fmt;
	# FIX(review): this heredoc was corrupted in the file; the warning
	# text is reconstructed approximately — verify against VCS history:
	$OFMT2HEXLEN{$fmt} // warn <<EOM;
W: ignoring unknown objectFormat=$fmt in $git->{git_dir}
EOM
	$doc->add_boolean_term('H'.$fmt); # objectFormat (sha1|sha256)
	my $did = $repo->{docid};
	$did ? $self->{xdb}->replace_document($did, $doc)
		: ($did = $self->{xdb}->add_document($doc));
	send($op_p, "repo_stored $did", 0);
}
# checkpoint: commit the current shard transaction and begin a new one,
# resetting the per-transaction byte budget
sub cidx_ckpoint ($;$) {
	my ($self, $msg) = @_;
	progress($self, $msg) if defined($msg);
	$TXN_BYTES = $BATCH_BYTES; # reset
	# NOTE(review): presumably committing mid-run is avoided when Xapian
	# can't set close-on-exec (CLOEXEC_UNSET) — confirm rationale
	return if $PublicInbox::Search::X{CLOEXEC_UNSET};
	$self->commit_txn_lazy;
	$self->begin_txn_lazy;
}
# shrink an over---max-size commit: $_[1] is the giant raw buffer, which
# is split into @$cmt fields and freed. The diff is always dropped; the
# commit message body is kept if doing so stays under $MAX_SIZE.
# Returns the resulting (indexed) length in bytes.
sub truncate_cmt ($$) {
	my ($cmt) = @_; # _[1] is $buf (giant)
	my ($orig_len, $len);
	$len = $orig_len = length($_[1]);
	@$cmt{@FMT} = split(/\n/, $_[1], scalar(@FMT));
	undef $_[1]; # free the giant buffer ASAP
	$len -= length($cmt->{b});
	# try to keep the commit message body.
	# n.b. this diffstat split may be unreliable but it's not worth
	# perfection for giant commits:
	my ($bdy) = split(/^---\n/sm, delete($cmt->{b}), 2);
	if (($len + length($bdy)) <= $MAX_SIZE) {
		$len += length($bdy);
		$cmt->{b} = $bdy;
		# FIX(review): heredoc was corrupted in the file; restored:
		warn <<EOM;
W: $cmt->{H}: truncated body ($orig_len => $len bytes)
W: to be under --max-size=$MAX_SIZE
EOM
	} else {
		$cmt->{b} = '';
		# FIX(review): heredoc was corrupted in the file; restored:
		warn <<EOM;
W: $cmt->{H}: deleted body ($orig_len => $len bytes)
W: to be under --max-size=$MAX_SIZE
EOM
	}
	$len;
}
sub cidx_reap_log { # awaitpid cb
	my ($pid, $cmd, $self, $op_p) = @_;
	# success, or git was killed by our own quit signal (or SIGPIPE,
	# which happens when we stop reading early): report shard done
	if (!$? || ($DO_QUIT && (($? & 127) == $DO_QUIT ||
			($? & 127) == POSIX::SIGPIPE))) {
		send($op_p, "shard_done $self->{shard}", 0);
	} else { # abnormal git exit: throw away the partial transaction
		warn "W: @$cmd (\$?=$?)\n";
		$self->{xdb}->cancel_transaction;
	}
}
# shard worker: spawn `git log' on this shard's slice of commit OIDs
# ({0} is a tmpfile of OIDs, {1} the pkt socket for completion reports)
sub shard_index { # via wq_io_do in IDX_SHARDS
	my ($self, $git, $roots) = @_;
	my $in = delete($self->{0}) // die 'BUG: no {0} input';
	my $op_p = delete($self->{1}) // die 'BUG: no {1} op_p';
	sysseek($in, 0, SEEK_SET); # rewind the OID list for git --stdin
	my $cmd = $git->cmd(@NO_ABBREV, @LOG_STDIN);
	my $rd = popen_rd($cmd, undef, { 0 => $in },
		\&cidx_reap_log, $cmd, $self, $op_p);
	PublicInbox::CidxLogP->new($rd, $self, $git, $roots);
	# CidxLogP->event_step will call cidx_read_log_p once there's input
}
# sharded reader for `git log --pretty=format: --stdin'
# parses NUL-LF-delimited commit records and feeds them to update_commit,
# checkpointing whenever the transaction byte budget is exhausted
sub cidx_read_log_p {
	my ($self, $log_p, $rd) = @_;
	my $git = delete $log_p->{git} // die 'BUG: no {git}';
	local $self->{current_info} = "$git->{git_dir} [$self->{shard}]";
	my $roots = delete $log_p->{roots} // die 'BUG: no {roots}';
	# local-ized in parent before fork
	$TXN_BYTES = $BATCH_BYTES;
	local $self->{git} = $git; # for patchid
	return if $DO_QUIT;
	my $nr = 0; # commits indexed (for progress reporting)
	# a patch may have \0, see c4201214cbf10636e2c1ab9131573f735b42c8d4
	# in linux.git, so we use $/ = "\n\0" to check end-of-patch
	my $FS = "\n\0";
	my $len;
	my $cmt = {};
	local $/ = $FS;
	my $buf = <$rd> // return; # leading $FS
	$buf eq $FS or die "BUG: not LF-NUL: $buf\n";
	$self->begin_txn_lazy;
	while (!$DO_QUIT && defined($buf = <$rd>)) {
		chomp($buf); # strips the trailing "\n\0" record separator
		# NOTE(review): $/ reset to "\n" for line-oriented reads
		# during indexing (e.g. patch-id) — confirm
		$/ = "\n";
		$len = length($buf);
		if (defined($MAX_SIZE) && $len > $MAX_SIZE) {
			$len = truncate_cmt($cmt, $buf);
		} else {
			@$cmt{@FMT} = split(/\n/, $buf, scalar(@FMT));
		}
		# checkpoint BEFORE indexing if the budget is already blown:
		if (($TXN_BYTES -= $len) <= 0) {
			cidx_ckpoint($self, "[$self->{shard}] $nr");
			$TXN_BYTES -= $len; # len may be huge, >TXN_BYTES;
		}
		update_commit($self, $cmt, $roots);
		++$nr;
		cidx_ckpoint($self, "[$self->{shard}] $nr") if $TXN_BYTES <= 0;
		$/ = $FS; # restore record separator for the next read
	}
	# return and wait for cidx_reap_log
}
# PktOp callback fired as each shard finishes shard_index/shard_commit;
# $on_destroy pins the completion OnDestroy object until all fire
sub shard_done { # called via PktOp on shard_index completion
	my (undef, $repo_ctx, undef, $n) = @_; # (self, ctx, on_destroy, shard#)
	$repo_ctx->{shard_ok}{$n} = 1;
}
# PktOp callback after store_repo assigns/updates the repo document:
# ask every shard that indexed commits for this repo to commit;
# next_repos fires via OnDestroy once all shards ACK
sub repo_stored {
	my ($self, $repo_ctx, $drs, $did) = @_;
	# check @IDX_SHARDS instead of DO_QUIT to avoid wasting prior work
	# because shard_commit is fast
	return unless @IDX_SHARDS;
	$did > 0 or die "BUG: $repo_ctx->{repo}->{git_dir}: docid=$did";
	my ($c, $p) = PublicInbox::PktOp->pair;
	$c->{ops}->{shard_done} = [ $self, $repo_ctx,
		on_destroy(\&next_repos, $repo_ctx, $drs)];
	# shard_done fires when all shards are committed
	my @active = keys %{$repo_ctx->{active}};
	$IDX_SHARDS[$_]->wq_io_do('shard_commit', [ $p->{op_p} ]) for @active;
}
sub prune_done { # called via prune_do completion
	my ($self, $drs, $n) = @_;
	return if $DO_QUIT || !$PRUNE_DONE;
	die "BUG: \$PRUNE_DONE->[$n] already defined" if $PRUNE_DONE->[$n];
	$PRUNE_DONE->[$n] = 1;
	# once every shard has reported in, resume the work pipeline:
	if (grep(defined, @$PRUNE_DONE) == @IDX_SHARDS) {
		progress($self, 'prune done');
		index_next($self); # may kick dump_roots_start
	}
}
# check whether term $q (e.g. "Q$COMMIT_HASH") exists in $xdb.
# Retries (with ->reopen) up to 100 times when concurrent writers
# invalidate our snapshot (Xapian DatabaseModifiedError).
sub seen ($$) {
	my ($xdb, $q) = @_;
	my $tries = 100;
	while ($tries--) {
		my $hit = eval {
			$xdb->postlist_begin($q) != $xdb->postlist_end($q)
		};
		return $hit unless $@;
		Carp::croak($@) if ref($@) !~ /\bDatabaseModifiedError\b/;
		$xdb->reopen; # a writer committed; retry on a fresh revision
	}
	Carp::croak('too many Xapian DB modifications in progress');
}
# used to select the shard for a GIT_DIR
# (integer value of the first 32 bits of sha256($ABS_GIT_DIR))
sub git_dir_hash ($) { hex(substr(sha256_hex($_[0]), 0, 8)) }
# generic run_await callback: checks $? before chaining to $cb.
# {quiet} callers handle $? themselves (see prep_alternate_read);
# otherwise a failed command warns and flags the repo with -cidx_err
sub _cb { # run_await cb
	my ($pid, $cmd, undef, $opt, $cb, $self, $git, @arg) = @_;
	return if $DO_QUIT;
	return $cb->($opt, $self, $git, @arg) if $opt->{quiet};
	$? ? ($git->{-cidx_err} = warn("W: @$cmd (\$?=$?)\n")) :
		$cb->($opt, $self, $git, @arg);
}
# run a git command (@$cmd) asynchronously for $git;
# $cb->($opt, $self, $git, @arg) is invoked via _cb on completion
sub run_git {
	my ($cmd, $opt, $cb, $self, $git, @arg) = @_;
	run_await($git->cmd(@$cmd), undef, $opt, \&_cb, $cb, $self, $git, @arg)
}
# this is different from the grokmirror-compatible fingerprint since we
# only care about --heads (branches) and --tags, and not even their names
sub fp_start ($$) {
	my ($self, $git) = @_;
	return if $DO_QUIT;
	open my $refs, '+>', undef; # anon tmpfile for show-ref output
	$git->{-repo}->{refs} = $refs;
	my ($c, $p) = PublicInbox::PktOp->pair;
	# keep draining the scan queue even if fp_async fails:
	my $next_on_err = on_destroy \&index_next, $self;
	$c->{ops}->{fp_done} = [ $self, $git, $next_on_err ];
	# round-robin across shard workers; any shard can fingerprint:
	$IDX_SHARDS[++$ANY_SHARD % scalar(@IDX_SHARDS)]->wq_io_do('fp_async',
		[ $p->{op_p}, $refs ], $git->{git_dir})
}
# shard worker: dump ref hashes for $git_dir into the {1} tmpfile;
# fp_async_done hashes the output when git exits
sub fp_async { # via wq_io_do in worker
	my ($self, $git_dir) = @_;
	my $op_p = delete $self->{0} // die 'BUG: no {0} op_p';
	my $refs = delete $self->{1} // die 'BUG: no {1} refs';
	my $git = PublicInbox::Git->new($git_dir);
	run_git([qw(show-ref --heads --tags --hash)], { 1 => $refs },
		\&fp_async_done, $self, $git, $op_p);
}
# run_git cb in the worker: hash the show-ref output and send the
# SHA-256 fingerprint back to the parent (=> fp_done)
sub fp_async_done { # run_git cb from worker
	my ($opt, $self, $git, $op_p) = @_;
	# FIX: `die' was missing after `//', so a missing {1} redirect
	# would've assigned the message string to $refs and caused a
	# confusing failure in sysseek below:
	my $refs = delete $opt->{1} // die 'BUG: no {-repo}->{refs}';
	sysseek($refs, 0, SEEK_SET); # rewind to hash the whole output
	send($op_p, 'fp_done '.sha_all(256, $refs)->hexdigest, 0);
}
# called in the parent via PktOp once fp_async_done sends the fingerprint;
# selects the repo's shard and checks whether anything changed
sub fp_done { # called parent via PktOp by fp_async_done
	my ($self, $git, $next_on_err, $hex) = @_;
	$next_on_err->cancel; # normal path: check_existing continues the scan
	return if $DO_QUIT;
	$git->{-repo}->{fp} = $hex;
	my $n = git_dir_hash($git->{git_dir}) % scalar(@RDONLY_XDB);
	my $shard = bless { %$self, shard => $n }, ref($self);
	$git->{-repo}->{shard_n} = $n;
	delete @$shard{qw(lockfh lock_path)}; # clone must not own the lock
	local $shard->{xdb} = $RDONLY_XDB[$n] // die "BUG: shard[$n] undef";
	$shard->retry_reopen(\&check_existing, $self, $git);
}
# compare the freshly-computed ref fingerprint against the stored repo
# doc: skip unchanged repos, otherwise queue (re)indexing via prep_repo
sub check_existing { # retry_reopen callback
	my ($shard, $self, $git) = @_;
	my @docids = $shard->docids_of_git_dir($git->{git_dir});
	my $docid = shift(@docids) // return prep_repo($self, $git); # new repo
	my $doc = $shard->get_doc($docid) //
		die "BUG: no #$docid ($git->{git_dir})";
	# --reindex forces a fingerprint mismatch to redo everything:
	my $old_fp = $REINDEX ? "\0invalid" : $doc->get_data;
	if ($old_fp eq $git->{-repo}->{fp}) { # no change
		delete $git->{-repo};
		return index_next($self);
	}
	$git->{-repo}->{docid} = $docid;
	if (@docids) {
		warn "BUG: $git->{git_dir} indexed multiple times, culling\n";
		$git->{-repo}->{to_delete} = \@docids; # XXX needed?
	}
	prep_repo($self, $git);
}
# walk rev-list from all heads/tags and split unseen commit OIDs into
# one tmpfile per shard (by the first 32 bits of the OID).  Walking
# stops early once >$SEEN_MAX consecutive already-indexed commits are
# hit.  Returns the per-shard tmpfiles, or () on quit.
sub partition_refs ($$$) {
	my ($self, $git, $refs) = @_; # show-ref --heads --tags --hash output
	sysseek($refs, 0, SEEK_SET);
	my $rfh = $git->popen(qw(rev-list --stdin), undef, { 0 => $refs });
	my $seen = 0; # consecutive already-indexed commits
	my @shard_in = map {
		$_->reopen; # refresh RDONLY_XDB snapshots for seen() checks
		open my $fh, '+>', undef;
		$fh;
	} @RDONLY_XDB;
	my $n0 = $NCHANGE;
	while (defined(my $cmt = <$rfh>)) {
		chomp $cmt;
		my $n = hex(substr($cmt, 0, 8)) % scalar(@RDONLY_XDB);
		if ($REINDEX && $REINDEX->set_maybe(pack('H*', $cmt), '')) {
			# --reindex: force every commit through, once
			say { $shard_in[$n] } $cmt;
			++$NCHANGE;
		} elsif (seen($RDONLY_XDB[$n], 'Q'.$cmt)) {
			last if ++$seen > $SEEN_MAX;
		} else {
			say { $shard_in[$n] } $cmt;
			++$NCHANGE;
			$seen = 0;
		}
		if ($DO_QUIT) {
			$rfh->close;
			return ();
		}
	}
	$rfh->close;
	return () if $DO_QUIT;
	# SIGPIPE is expected when we stop reading rev-list early:
	if (!$? || (($? & 127) == POSIX::SIGPIPE && $seen > $SEEN_MAX)) {
		my $n = $NCHANGE - $n0;
		progress($self, "$git->{git_dir}: $n commits") if $n;
		return @shard_in;
	}
	die "git --git-dir=$git->{git_dir} rev-list: \$?=$?\n";
}
# shard worker: commit the current transaction, then notify the parent
# via the {0} pkt socket (=> shard_done in repo_stored's PktOp pair)
sub shard_commit { # via wq_io_do
	my ($self) = @_;
	my $op_p = delete($self->{0}) // die 'BUG: no {0} op_p';
	$self->commit_txn_lazy;
	send($op_p, "shard_done $self->{shard}", 0);
}
# start dumping prefix => root-commit-offset pairs from all coderepo
# shards via the xap_helper, through the pipeline:
# dump_roots | sort -k1,1 | OFS=' ' uniq_fold >to_root_off
sub dump_roots_start {
	my ($self, $do_join) = @_;
	return if $DO_QUIT;
	$XHC //= PublicInbox::XapClient::start_helper("-j$NPROC");
	$do_join // die 'BUG: no $do_join';
	progress($self, 'dumping IDs from coderepos');
	local $self->{xdb};
	@OFF2ROOT = $self->all_terms('G'); # all root commit OIDs
	my $root2id = "$TMPDIR/root2id";
	open my $fh, '>', $root2id;
	my $nr = -1;
	# NUL-delimited OID/offset pairs for the helper to mmap:
	for (@OFF2ROOT) { print $fh $_, "\0", ++$nr, "\0" } # mmap-friendly
	close $fh;
	# dump_roots | sort -k1,1 | OFS=' ' uniq_fold >to_root_off
	my ($sort_opt, $fold_opt);
	pipe(local $sort_opt->{0}, my $sort_w);
	pipe(local $fold_opt->{0}, local $sort_opt->{1});
	my @sort = (@SORT, '-k1,1');
	my $dst = "$TMPDIR/to_root_off";
	open $fold_opt->{1}, '>', $dst;
	my $fold_env = { %$CMD_ENV, OFS => ' ' }; # space-delimited root offs
	run_await(\@sort, $CMD_ENV, $sort_opt, \&cmd_done, $do_join);
	run_await(\@UNIQ_FOLD, $fold_env, $fold_opt, \&cmd_done, $do_join);
	my $window = $JOIN{window} // $JOIN_WINDOW;
	my @m = $window <= 0 ? () : ('-m', $window);
	my @arg = ((map { ('-A', $_) } @JOIN_PFX), '-c',
		@m, $root2id, $QRY_STR);
	# one dump_roots request per shard; Aux objects track stderr/status:
	for my $d ($self->shard_dirs) {
		pipe(my $err_r, my $err_w);
		$XHC->mkreq([$sort_w, $err_w], qw(dump_roots -d), $d, @arg);
		my $desc = "dump_roots $d";
		$self->{PENDING}->{$desc} = $do_join;
		PublicInbox::CidxXapHelperAux->new($err_r, $self, $desc);
	}
	progress($self, 'waiting on dump_roots sort');
}
# ask xap_helper to dump prefix => inbox-offset pairs for one inbox
# into the shared sort(1) pipe set up by dump_ibx_start
sub dump_ibx { # sends to xap_helper.h
	my ($self, $ibx_off) = @_;
	my $ibx = $IBX[$ibx_off] // die "BUG: no IBX[$ibx_off]";
	my $ekey = $ibx->eidx_key;
	# FIX(review): this heredoc was corrupted in the file; the warning
	# text is reconstructed approximately — verify against VCS history:
	my $srch = $ibx->isrch or return warn <<EOM;
W: $ekey not indexed for search, skipping
EOM
	my @cmd = ('dump_ibx', $srch->xh_args,
		(map { ('-A', $_) } @JOIN_PFX), $ibx_off, $QRY_STR);
	pipe(my $r, my $w);
	$XHC->mkreq([$DUMP_IBX_WPIPE, $w], @cmd);
	$self->{PENDING}->{$ekey} = $TODO{do_join};
	PublicInbox::CidxXapHelperAux->new($r, $self, $ekey);
}
# set up the inbox-dump pipeline; individual dump_ibx requests write
# into $DUMP_IBX_WPIPE as each inbox is processed
sub dump_ibx_start {
	my ($self, $do_join) = @_;
	return if $DO_QUIT;
	$XHC //= PublicInbox::XapClient::start_helper("-j$NPROC");
	my ($sort_opt, $fold_opt);
	pipe(local $sort_opt->{0}, $DUMP_IBX_WPIPE);
	pipe(local $fold_opt->{0}, local $sort_opt->{1});
	my @sort = (@SORT, '-k1,1'); # sort only on JOIN_PFX
	# pipeline: dump_ibx | sort -k1,1 | uniq_fold >to_ibx_off
	open $fold_opt->{1}, '>', "$TMPDIR/to_ibx_off";
	run_await(\@sort, $CMD_ENV, $sort_opt, \&cmd_done, $do_join);
	run_await(\@UNIQ_FOLD, $CMD_ENV, $fold_opt, \&cmd_done, $do_join);
}
# central work dispatcher: drain the index queue first, then the scan
# queue, then (for --join/--prune) inbox dumps; when everything is empty
# the event loop exits via the shards_active post_loop_do callback
sub index_next ($) {
	my ($self) = @_;
	return if $DO_QUIT;
	if ($IDXQ && @$IDXQ) {
		index_repo($self, shift @$IDXQ);
	} elsif ($SCANQ && @$SCANQ) {
		fp_start $self, shift @$SCANQ;
	} elsif ($TMPDIR) { # --join/--prune work may remain
		delete $TODO{dump_roots_start};
		delete $TODO{dump_ibx_start}; # runs OnDestroy once
		return dump_ibx($self, shift @IBXQ) if @IBXQ;
		undef $DUMP_IBX_WPIPE; # done dumping inboxes
		delete $TODO{do_join};
	}
	# else: wait for shards_active (post_loop_do) callback
}
# OnDestroy cb fired once every shard ACKed shard_commit for a repo:
# verify all shards succeeded, release $REPO_CTX, and move on
sub next_repos { # OnDestroy cb
	my ($repo_ctx, $drs) = @_;
	my ($self, $repo, $active) = @$repo_ctx{qw(self repo active)};
	progress($self, "$repo->{git_dir}: done");
	return if $DO_QUIT || !$REPO_CTX;
	my $n = grep { ! $repo_ctx->{shard_ok}->{$_} } keys %$active;
	die "E: $repo->{git_dir} $n shards failed" if $n;
	$REPO_CTX == $repo_ctx or die "BUG: $REPO_CTX != $repo_ctx";
	$REPO_CTX = undef; # unblock index_repo for the next repo
	index_next($self);
}
sub index_done { # OnDestroy cb called when done indexing each code repo
	my ($repo_ctx, $drs) = @_;
	return if $DO_QUIT;
	my ($self, $repo, $active) = @$repo_ctx{qw(self repo active)};
	# $active may be undef here, but it's fine to vivify
	my $n = grep { ! $repo_ctx->{shard_ok}->{$_} } keys %$active;
	die "E: $repo->{git_dir} $n shards failed" if $n;
	$repo_ctx->{shard_ok} = {}; # reset for future shard_done
	$n = $repo->{shard_n}; # the shard which owns the repo-level doc
	$repo_ctx->{active}->{$n} = undef; # may vivify $repo_ctx->{active}
	my ($c, $p) = PublicInbox::PktOp->pair;
	$c->{ops}->{repo_stored} = [ $self, $repo_ctx, $drs ];
	$IDX_SHARDS[$n]->wq_io_do('store_repo', [ $p->{op_p} ], $repo);
	# repo_stored will fire once store_repo is done
}
# start indexing one repo: read its root commits, partition unseen
# commits across shards, and dispatch shard_index jobs.  Only one repo
# indexes at a time ($REPO_CTX); others queue on @$IDXQ.
sub index_repo {
	my ($self, $git) = @_;
	return if $DO_QUIT;
	my $repo = $git->{-repo} // die 'BUG: no {-repo}';
	return index_next($self) if $git->{-cidx_err};
	if (!defined($repo->{ct})) {
		warn "W: $git->{git_dir} has no commits, skipping\n";
		return index_next($self);
	}
	return push(@$IDXQ, $git) if $REPO_CTX; # busy
	delete $git->{-repo};
	my $roots_fh = delete $repo->{roots_fh} // die 'BUG: no {roots_fh}';
	seek($roots_fh, 0, SEEK_SET); # rewind rev-list --max-parents=0 output
	chomp(my @roots = PublicInbox::IO::read_all $roots_fh);
	if (!@roots) {
		warn("E: $git->{git_dir} has no root commits\n");
		return index_next($self);
	}
	$repo->{roots} = \@roots;
	local $self->{current_info} = $git->{git_dir};
	my @shard_in = partition_refs($self, $git, delete($repo->{refs}));
	$repo->{git_dir} = $git->{git_dir};
	my $repo_ctx = $REPO_CTX = { self => $self, repo => $repo };
	delete $git->{-cidx_gits_fini}; # may fire gits_fini
	my $drs = delete $git->{-cidx_dump_roots_start};
	my $index_done = on_destroy \&index_done, $repo_ctx, $drs;
	my ($c, $p) = PublicInbox::PktOp->pair;
	$c->{ops}->{shard_done} = [ $self, $repo_ctx, $index_done ];
	for my $n (0..$#shard_in) {
		$shard_in[$n]->flush or die "flush shard[$n]: $!";
		-s $shard_in[$n] or next; # nothing for this shard
		last if $DO_QUIT;
		$IDX_SHARDS[$n]->wq_io_do('shard_index',
			[ $shard_in[$n], $p->{op_p} ],
			$git, \@roots);
		$repo_ctx->{active}->{$n} = undef;
	}
	# shard_done fires when shard_index is done
}
# run_git cb: parse the newest committerdate (epoch seconds) from
# for-each-ref output; $index_repo (on_destroy) fires once released
sub ct_fini { # run_git cb
	my ($opt, $self, $git, $index_repo) = @_;
	my ($ct) = split(/\s+/, ${$opt->{1}}); # drop TZ + LF
	$git->{-repo}->{ct} = $ct + 0; # force numeric
}
# TODO: also index gitweb.owner and the full fingerprint for grokmirror?
# kick off root-commit discovery and latest-committerdate lookup for a
# new/changed repo; index_repo fires via on_destroy after both complete
sub prep_repo ($$) {
	my ($self, $git) = @_;
	return if $DO_QUIT;
	my $index_repo = on_destroy \&index_repo, $self, $git;
	my $refs = $git->{-repo}->{refs} // die 'BUG: no {-repo}->{refs}';
	sysseek($refs, 0, SEEK_SET); # rewind show-ref output for rev-list
	open my $roots_fh, '+>', undef;
	$git->{-repo}->{roots_fh} = $roots_fh;
	# root commits (--max-parents=0) reachable from all heads/tags:
	run_git([ qw(rev-list --stdin --max-parents=0) ],
		{ 0 => $refs, 1 => $roots_fh }, \&PublicInbox::Config::noop,
		$self, $git, $index_repo);
	# newest committerdate for the repo-level CT value:
	run_git([ qw[for-each-ref --sort=-committerdate
		--format=%(committerdate:raw) --count=1
		refs/heads/ refs/tags/] ], undef, # capture like qx
		\&ct_fini, $self, $git, $index_repo);
}
# for PublicInbox::SearchIdx `git patch-id' call and with_umask
# accessor for the current PublicInbox::Git object (if any)
sub git {
	my ($self) = @_;
	$self->{git};
}
# for -u/--update: load previously-indexed GIT_DIRs (`P' terms) from the
# existing index; repos which vanished are collected in @GIT_DIR_GONE
# for --prune (or a warning without it)
sub load_existing ($) { # for -u/--update
	my ($self) = @_;
	my $dirs = $self->{git_dirs} //= [];
	if ($self->{-opt}->{update} || $self->{-opt}->{prune}) {
		local $self->{xdb};
		$self->xdb or
			die "E: $self->{cidx_dir} non-existent for --update\n";
		my @cur = grep {
			if (-e $_) {
				1;
			} else { # GIT_DIR no longer exists
				push @GIT_DIR_GONE, $_;
				undef;
			}
		} $self->all_terms('P');
		if (@GIT_DIR_GONE && !$self->{-opt}->{prune}) {
			warn "W: the following repos no longer exist:\n",
				(map { "W:\t$_\n" } @GIT_DIR_GONE),
				"W: use --prune to remove them from ",
				$self->{cidx_dir}, "\n";
		}
		push @$dirs, @cur;
	}
	@$dirs = uniqstr @$dirs;
}
# SIG handlers:
# worker: record the received signal number to trigger a graceful stop
sub shard_quit { $DO_QUIT = POSIX->can("SIG$_[0]")->() }
# worker SIGUSR1: force a checkpoint at the next TXN_BYTES check
sub shard_usr1 { $TXN_BYTES = -1 }
# create (if needed) the index directory, acquire the lock, set up all
# shards and start one worker process per shard
sub cidx_init ($) {
	my ($self) = @_;
	my $dir = $self->{cidx_dir};
	unless (-d $dir) {
		warn "# creating $dir\n" if !$self->{-opt}->{quiet};
		File::Path::mkpath($dir);
	}
	$self->lock_acquire;
	my @shards;
	my $l = $self->{indexlevel} //= $self->{-opt}->{indexlevel};
	for my $n (0..($self->{nshard} - 1)) {
		my $shard = bless { %$self, shard => $n }, ref($self);
		delete @$shard{qw(lockfh lock_path)};
		my $xdb = $shard->idx_acquire;
		if (!$n) { # shard 0 owns the indexlevel metadata
			if (($l // '') eq 'medium') {
				$xdb->set_metadata('indexlevel', $l);
			} elsif (($l // '') eq 'full') {
				$xdb->set_metadata('indexlevel', ''); # unset
			}
			$l ||= $xdb->get_metadata('indexlevel') || 'full';
		}
		$shard->{indexlevel} = $l;
		$shard->idx_release;
		$shard->wq_workers_start("cidx shard[$n]", 1, $SIGSET, {
			siblings => \@shards, # for ipc_atfork_child
		}, \&shard_done_wait, $self);
		push @shards, $shard;
	}
	$self->{indexlevel} //= $l;
	# this warning needs to happen after idx_acquire
	state $once;
	# XXX(review): this heredoc and the following lines up to the
	# `return if !...{scan}' check were lost to file corruption; the
	# warning text, end-of-sub, and the next sub's header are
	# reconstructed approximately — verify against VCS history:
	warn <<EOM if $PublicInbox::Search::X{CLOEXEC_UNSET} && !$once++;
W: Xapian lacks close-on-exec support, performance may suffer
EOM
	@IDX_SHARDS = @shards;
}

# XXX(review): sub name + header reconstructed (lost to corruption);
# kicks off scanning of queued coderepos when --scan is in effect
sub scan_git_dirs ($) {
	my ($self) = @_;
	return if !$self->{-opt}->{scan};
	$GITS_NR = @$SCANQ or return;
	my $gits_fini = on_destroy \&gits_fini;
	$_->{-cidx_gits_fini} = $gits_fini for @$SCANQ;
	if (my $drs = $TODO{dump_roots_start}) {
		$_->{-cidx_dump_roots_start} = $drs for @$SCANQ;
	}
	progress($self, "scanning $GITS_NR code repositories...");
}
# shard worker: reset per-shard prune state and open a transaction
sub prune_init { # via wq_io_do in IDX_SHARDS
	my ($self) = @_;
	$self->{nr_prune} = 0; # documents pruned by this shard so far
	$TXN_BYTES = $BATCH_BYTES;
	$self->begin_txn_lazy;
}
# shard worker: delete all documents matching $term (e.g. a stale
# "P$GIT_DIR" or "Q$OID"), checkpointing when the byte budget is spent
sub prune_one { # via wq_io_do in IDX_SHARDS
	my ($self, $term) = @_;
	my @docids = $self->docids_by_postlist($term);
	for (@docids) {
		# NOTE(review): 42 appears to be a bytes-per-doclength fudge
		# factor for transaction accounting — confirm
		$TXN_BYTES -= $self->{xdb}->get_doclength($_) * 42;
		$self->{xdb}->delete_document($_);
	}
	++$self->{nr_prune};
	$TXN_BYTES < 0 and
		cidx_ckpoint($self, "prune [$self->{shard}] $self->{nr_prune}");
}
# shard worker: final checkpoint after pruning, then notify the parent
# (prune_done) via the {0} pkt socket
sub prune_commit { # via wq_io_do in IDX_SHARDS
	my ($self) = @_;
	my $prune_op_p = delete $self->{0} // die 'BUG: no {0} op_p';
	my $nr = delete $self->{nr_prune} // die 'BUG: nr_prune undef';
	cidx_ckpoint($self, "prune [$self->{shard}] $nr done") if $nr;
	send($prune_op_p, "prune_done $self->{shard}", 0);
}
# post_loop_do callback: truthy return keeps the event loop running.
# Once prune/scan/index/join queues are all drained, shard workers are
# asked to quit and we linger until they're gone.
sub shards_active { # post_loop_do
	return if $DO_QUIT;
	# queues not even initialized yet? let the loop exit:
	return if grep(defined, $PRUNE_DONE, $SCANQ, $IDXQ) != 3;
	return 1 if grep(defined, @$PRUNE_DONE) != @IDX_SHARDS;
	return 1 if $GITS_NR || scalar(@$IDXQ) || $REPO_CTX;
	return 1 if @IBXQ || keys(%TODO);
	# all work done: shut down remaining live shard workers
	for my $s (grep { $_->{-wq_s1} } @IDX_SHARDS) {
		$s->{-cidx_quit} = 1 if defined($s->{-wq_s1});
		$s->wq_close; # may recurse via awaitpid outside of event_loop
	}
	# stay in the loop while any shard is still quitting:
	scalar(grep { $_->{-cidx_quit} } @IDX_SHARDS);
}
# signal handlers
# relay the received signal to every shard worker process
sub kill_shards { $_->wq_kill(@_) for (@IDX_SHARDS) }
# parent signal handler (e.g. TERM/INT): flag shutdown, stop the
# xap_helper client, and forward the signal to all workers
sub parent_quit {
	$DO_QUIT = POSIX->can("SIG$_[0]")->();
	$XHC = 0; # stops the process
	kill_shards(@_);
	warn "# SIG$_[0] received, quitting...\n";
}
# choose a umask for index files: honor core.sharedRepository when
# indexing inside a single GIT_DIR, otherwise derive the umask from the
# existing cidx_dir permissions so new files match the old ones
sub prep_umask ($) {
	my ($self) = @_;
	if ($self->{-cidx_internal}) { # respect core.sharedRepository
		@{$self->{git_dirs}} == 1 or die 'BUG: only for GIT_DIR';
		local $self->{git} =
			PublicInbox::Git->new($self->{git_dirs}->[0]);
		$self->with_umask;
	} elsif (-d $self->{cidx_dir}) { # respect existing perms
		my @st = stat(_);
		my $um = (~$st[2] & 0777); # invert dir mode bits => umask
		$self->{umask} = $um; # for SearchIdx->with_umask
		umask == $um or progress($self, 'using umask from ',
					$self->{cidx_dir}, ': ',
					sprintf('0%03o', $um));
		# returned guard restores the original umask when released:
		on_destroy \&CORE::umask, umask($um);
	} else {
		$self->{umask} = umask; # for SearchIdx->with_umask
		undef;
	}
}
# record $objdir in the tmpdir alternates file matching its hash length
# so prune can `cat-file --batch-check' objects from all repos at once
sub prep_alternate_end ($$) {
	my ($objdir, $fmt) = @_;
	# XXX(review): the heredoc body and the $ALT_FH initialization
	# below were lost to file corruption; reconstructed approximately —
	# verify against VCS history:
	my $hexlen = $OFMT2HEXLEN{$fmt} // return warn <<EOM;
W: ignoring objdir=$objdir, unsupported objectFormat=$fmt
EOM
	if (!$ALT_FH{$hexlen}) {
		my $git_dir = "$TMPDIR/hexlen$hexlen.git";
		# TODO(review): the original code creating $git_dir was lost;
		# it presumably initialized a bare repo with the matching
		# object format — restore from VCS history:
		File::Path::mkpath("$git_dir/objects/info");
		open $ALT_FH{$hexlen}, '>', "$git_dir/objects/info/alternates";
	}
	say { $ALT_FH{$hexlen} } $objdir;
}
# backfill the `H' (objectFormat) boolean term on a repo doc; early
# (pre-release) indices lacked it
sub store_objfmt { # via wq_do - make early cidx users happy
	my ($self, $docid, $git_dir, $fmt) = @_;
	$self->begin_txn_lazy;
	my $doc = $self->get_doc($docid) // return
		warn "BUG? #$docid for $git_dir missing";
	# sanity-check the doc really belongs to $git_dir before touching it:
	my @p = xap_terms('P', $doc) or return
		warn "BUG? #$docid for $git_dir has no P(ath)";
	@p == 1 or return warn "BUG? #$docid $git_dir multi: @p";
	$p[0] eq $git_dir or return warn "BUG? #$docid $git_dir != @p";
	$doc->add_boolean_term('H'.$fmt);
	$self->{xdb}->replace_document($docid, $doc);
	# wait for prune_commit to commit...
}
# TODO: remove prep_alternate_read and store_objfmt 1-2 years after 2.0 is out
# they are for compatibility with pre-release indices
sub prep_alternate_read { # run_git cb for config extensions.objectFormat
	my ($opt, $self, $git, $objdir, $docid, $shard_n, $run_prune) = @_;
	return if $DO_QUIT;
	my $chld_err = $?; # save before prep_alternate_start clobbers $?
	prep_alternate_start($self, shift(@PRUNEQ), $run_prune) if @PRUNEQ;
	my $fmt = check_objfmt_status $git, $chld_err, ${$opt->{1}};
	# cache the format on the repo doc so future runs skip `git config':
	$IDX_SHARDS[$shard_n]->wq_do('store_objfmt', # async
		$docid, $git->{git_dir}, $fmt);
	prep_alternate_end $objdir, $fmt;
}
# iteratively (via goto) register coderepo objdirs as alternates for
# pruning, preferring the cached `H' (objectFormat) term and falling
# back to `git config' (prep_alternate_read) for pre-release indices
sub prep_alternate_start {
	my ($self, $git, $run_prune) = @_;
	local $self->{xdb};
	my ($o, $n, @ids, @fmt);
start:
	$o = $git->git_path('objects');
	while (!-d $o) { # skip repos that vanished from the filesystem
		$git = shift(@PRUNEQ) // return;
		$o = $git->git_path('objects');
	}
	$n = git_dir_hash($git->{git_dir}) % scalar(@RDONLY_XDB);
	$self->{xdb} = $RDONLY_XDB[$n] // croak("BUG: no shard[$n]");
	@ids = $self->docids_by_postlist('P'.$git->{git_dir});
	@fmt = @ids ? xap_terms('H', $self->{xdb}, $ids[0]) : ();
	@fmt > 1 and warn "BUG? multi `H' for shard[$n] #$ids[0]: @fmt";
	if (@fmt) { # cache hit
		prep_alternate_end $o, $fmt[0];
		$git = shift(@PRUNEQ) and goto start;
	} else { # compatibility w/ early cidx format
		run_git([qw(config extensions.objectFormat)], { quiet => 1 },
			\&prep_alternate_read, $self, $git, $o, $ids[0], $n,
			$run_prune);
	}
}
# run_await cb for sort(1), xapian-delve, sed, etc.: any nonzero exit
# status is fatal.  $run_on_destroy pins the OnDestroy object which
# calls do_join() or run_prune() once all pipeline commands finish.
sub cmd_done {
	my ($pid, $cmd, undef, undef, $run_on_destroy) = @_;
	die "fatal: @$cmd (\$?=$?)\n" if $?;
}
# read previously-stored --join results; join data lives in shard[0]
sub current_join_data ($) {
	my ($self) = @_;
	local $self->{xdb} = $RDONLY_XDB[0] // die 'BUG: shard[0] undef';
	# we support multiple PI_CONFIG files for a cindex:
	$self->join_data;
}
# combined previously stored stats with new
# (merges old join scores into %$score, remapping old inbox/root offsets
# to the current run's offsets by eidx_key and root OID, respectively)
sub score_old_join_data ($$$) {
	my ($self, $score, $ekeys_new) = @_;
	my $old = ($JOIN{reset} ? undef : current_join_data($self)) or return;
	progress($self, 'merging old join data...');
	my ($ekeys_old, $roots_old, $ibx2root_old) =
		@$old{qw(ekeys roots ibx2root)};
	# score: "ibx_off root_off" => nr
	my $i = -1;
	my %root2id_new = map { $_ => ++$i } @OFF2ROOT;
	$i = -1;
	my %ekey2id_new = map { $_ => ++$i } @$ekeys_new;
	for my $ibx_off_old (0..$#$ibx2root_old) {
		my $root_offs_old = $ibx2root_old->[$ibx_off_old];
		my $ekey = $ekeys_old->[$ibx_off_old] // do {
			warn "W: no ibx #$ibx_off_old in old join data\n";
			next;
		};
		# remap old inbox offset => new offset via eidx_key:
		my $ibx_off_new = $ekey2id_new{$ekey} // do {
			warn "W: `$ekey' no longer exists\n";
			next;
		};
		for (@$root_offs_old) {
			my ($nr, $rid_old) = @$_;
			my $root_old = $roots_old->[$rid_old] // do {
				warn "W: no root #$rid_old in old data\n";
				next;
			};
			# remap old root offset => new offset via root OID:
			my $rid_new = $root2id_new{$root_old} // do {
				warn "W: root `$root_old' no longer exists\n";
				next;
			};
			$score->{"$ibx_off_new $rid_new"} += $nr;
		}
	}
}
# shard worker: store a metadata key/value pair in this shard's Xapian DB
sub metadata_set { # via wq_do
	my ($self, $key, $val, $commit) = @_;
	$self->begin_txn_lazy;
	$self->{xdb}->set_metadata($key, $val);
	# commit when requested, or when the caller waits on a return value:
	$self->commit_txn_lazy if $commit || defined(wantarray);
}
# runs once all inboxes and shards are dumped via OnDestroy
# join(1)s to_ibx_off + to_root_off on their common $PFX column, scores
# every (inbox, root) co-occurrence, merges old scores, and stores the
# result as compressed JSON in shard[0] metadata
sub do_join {
	my ($self) = @_;
	return if $DO_QUIT;
	$XHC = 0; # should not be recreated again
	@IDX_SHARDS or return warn("# aborting on no shards\n");
	unlink("$TMPDIR/root2id");
	my @pending = keys %{$self->{PENDING}};
	die "BUG: pending=@pending jobs not done\n" if @pending;
	progress($self, 'joining...');
	my @join = (@JOIN, 'to_ibx_off', 'to_root_off');
	if (my $time = which('time')) { unshift @join, $time };
	my $rd = popen_rd(\@join, $CMD_ENV, { -C => "$TMPDIR" });
	my %score;
	while (<$rd>) { # PFX ibx_offs root_off
		chop eq "\n" or die "no newline from @join: <$_>";
		my (undef, $ibx_offs, @root_offs) = split / /, $_;
		# every (inbox, root) pair sharing this prefix scores a hit:
		for my $ibx_off (split(/,/, $ibx_offs)) {
			++$score{"$ibx_off $_"} for @root_offs;
		}
	}
	$rd->close or die "fatal: @join failed: \$?=$?";
	my $nr = scalar(keys %score) or do {
		delete $TODO{joining};
		return progress($self, 'no potential new pairings');
	};
	progress($self, "$nr potential new pairings...");
	my @ekeys = map { $_->eidx_key } @IBX;
	score_old_join_data($self, \%score, \@ekeys);
	my $new;
	while (my ($k, $nr) = each %score) {
		my ($ibx_off, $root_off) = split(/ /, $k);
		my ($ekey, $root) = ($ekeys[$ibx_off], $OFF2ROOT[$root_off]);
		progress($self, "$ekey => $root has $nr matches");
		push @{$new->{ibx2root}->[$ibx_off]}, [ $nr, $root_off ];
	}
	for my $ary (values %$new) { # sort by nr (largest first)
		for (@$ary) { @$_ = sort { $b->[0] <=> $a->[0] } @$_ }
	}
	$new->{ekeys} = \@ekeys;
	$new->{roots} = \@OFF2ROOT;
	$new->{dt} = \@JOIN_DT;
	# store zlib-compressed JSON as shard[0] metadata:
	$new = compress(PublicInbox::Config::json()->encode($new));
	my $key = $self->join_data_key;
	my $wait = $IDX_SHARDS[0]->wq_do('metadata_set', $key, $new);
	delete $TODO{joining};
}
# Resolve required external programs for --$op, dying if any is
# missing.  Arguments after $op are (program-name => \@argv) pairs;
# $argv->[0] is filled in with the resolved executable path unless the
# caller already set it.  An all-caps environment variable override is
# honored (e.g. XAPIAN_DELVE overrides `xapian-delve').
sub require_progs {
	my $op = shift;
	while (my ($prog, $argv) = splice(@_, 0, 2)) {
		next if defined $argv->[0]; # already resolved
		my $env_name = $prog;
		$env_name =~ tr/a-z-/A-Z_/; # xapian-delve => XAPIAN_DELVE
		my $cmd = $ENV{$env_name} // $prog;
		$argv->[0] = which($cmd) // die "E: `$prog' required for --$op\n";
	}
}
# post-fork --join setup: determine the dt: (datetime) range to scan.
# When usable data from a previous --join run exists, resume one second
# past its stored end time; otherwise a default range is used.
sub init_join_postfork ($) {
my ($self) = @_;
return unless $self->{-opt}->{join};
require_progs('join', join => \@JOIN);
# stored dt values are 14-digit YYYYmmddHHMMSS strings:
my $d2 = '([0-9]{2})';
my $dt_re = qr!([0-9]{4})$d2$d2$d2$d2$d2!;
if (my $cur = $JOIN{reset} ? undef : current_join_data($self)) {
if (($cur->{dt}->[1] // '') =~ m!\A$dt_re\z!o) {
my ($Y, $m, $d, $H, $M, $S) = ($1, $2, $3, $4, $5, $6);
my $t = timegm($S, $M, $H, $d, $m - 1, $Y);
# resume 1s past the previous run's end time:
$t = strftime('%Y%m%d%H%M%S', gmtime($t + 1));
$JOIN{dt} //= "$t..";
} else {
# NOTE(review): the next line is not valid Perl as-is and appears
# garbled: a heredoc warning, the closing braces of the if/else
# blocks above, and the code declaring and validating $QRY_STR
# (used below) seem to have been lost -- compare with upstream
# before relying on this block
warn <= 0 or die "E: dt:$QRY_STR is not a range\n";
# Account for send->apply delay (torvalds/linux.git mean is ~20 days
# from Author to CommitDate in cases where CommitDate > AuthorDate
$QRY_STR .= '1.month.ago' if $QRY_STR =~ /\.\.\z/;
@{$self->{git_dirs} // []} or die "E: no coderepos to join\n";
@IBX or die "E: no inboxes to join\n";
my $approx_git = PublicInbox::Git->new($self->{git_dirs}->[0]); # ugh
substr($QRY_STR, 0, 0) = 'dt:';
# resolve approxidate (e.g. `1.year.ago') to absolute timestamps:
$self->query_approxidate($approx_git, $QRY_STR); # in-place
($JOIN_DT[1]) = ($QRY_STR =~ /\.\.([0-9]{14})\z/); # YYYYmmddHHMMSS
($JOIN_DT[0]) = ($QRY_STR =~ /\Adt:([0-9]{14})/); # YYYYmmddHHMMSS
$JOIN_DT[0] //= '19700101'.'000000'; # git uses unsigned times
# on_destroy: do_join only runs after both dump_* guards release it
$TODO{do_join} = on_destroy \&do_join, $self;
$TODO{joining} = 1; # keep shards_active() happy
$TODO{dump_ibx_start} = on_destroy \&dump_ibx_start,
$self, $TODO{do_join};
$TODO{dump_roots_start} = on_destroy \&dump_roots_start,
$self, $TODO{do_join};
progress($self, "will join in $QRY_STR date range...");
# assign each inbox its ephemeral integer offset ($IBX_OFF):
my $id = -1;
@IBXQ = map { ++$id } @IBX;
}
# set up the prune pipeline, or mark every shard prune-done when
# --prune was not requested.
sub init_prune ($) {
my ($self) = @_;
return (@$PRUNE_DONE = map { 1 } @IDX_SHARDS) if !$self->{-opt}->{prune};
# Dealing with millions of commits here at once, so use faster tools.
# xapian-delve is nearly an order-of-magnitude faster than Xapian Perl
# bindings. sed/awk are faster than Perl for simple stream ops, and
# sort+comm are more memory-efficient with gigantic lists.
# pipeline: delve | sed | sort >indexed_commits
my @delve = (undef, qw(-A Q -1)); # argv[0] filled by require_progs
my @sed = (undef, '-ne', 's/^Q//p'); # strip Q term prefix
@COMM = (undef, qw(-2 -3 indexed_commits -));
@AWK = (undef, '$2 == "commit" { print $1 }'); # --batch-check output
require_progs('prune', 'xapian-delve' => \@delve, sed => \@sed,
comm => \@COMM, awk => \@AWK);
# delve across all shard DBs in one invocation:
for (0..$#IDX_SHARDS) { push @delve, "$self->{xpfx}/$_" }
# run_prune fires once every callback holding this guard completes:
my $run_prune = on_destroy \&run_prune, $self, $TODO{dump_roots_start};
my ($sort_opt, $sed_opt, $delve_opt);
# wire up delve|sed|sort; the hash-element FDs are local-ized so the
# parent's copies are dropped when this sub returns -- presumably
# run_await dups them into the children (TODO confirm)
pipe(local $sed_opt->{0}, local $delve_opt->{1});
pipe(local $sort_opt->{0}, local $sed_opt->{1});
# NOTE(review): pipe/open return values appear unchecked here;
# presumably autodie is in effect at file scope -- verify
open($sort_opt->{1}, '+>', "$TMPDIR/indexed_commits");
run_await([@SORT, '-u'], $CMD_ENV, $sort_opt, \&cmd_done, $run_prune);
run_await(\@sed, $CMD_ENV, $sed_opt, \&cmd_done, $run_prune);
run_await(\@delve, undef, $delve_opt, \&cmd_done, $run_prune);
# queue every known git dir for `extensions.objectFormat' detection,
# $LIVE_JOBS at a time:
@PRUNEQ = @$SCANQ;
for (1..$LIVE_JOBS) {
prep_alternate_start($self, shift(@PRUNEQ) // last, $run_prune);
}
}
# run_await callback: once one `git cat-file' dump exits, start dumping
# commits for the next remaining hexlen (40 = SHA-1, 64 = SHA-256)
# tracked in %ALT_FH.  Returns once %ALT_FH is drained so the
# $batch_opt->{1} pipe end can be destroyed, ending the pipeline.
sub dump_git_commits {
	my ($pid, $cmd, undef, $batch_opt, $self) = @_;
	die "E: @$cmd \$?=$?" if defined($pid) && $?;
	return if $DO_QUIT;
	my ($hexlen) = keys(%ALT_FH);
	defined($hexlen) or return; # done, DESTROY batch_opt->{1}
	close(delete $ALT_FH{$hexlen}); # flushes `say' buffer
	progress($self, "preparing $hexlen-byte hex OID commits for prune...");
	my $tmp_git = PublicInbox::Git->new("$TMPDIR/hexlen$hexlen.git");
	run_await($tmp_git->cmd(@PRUNE_BATCH), undef, $batch_opt,
		\&dump_git_commits, $self);
}
sub run_prune { # OnDestroy when `git config extensions.objectFormat' are done
my ($self, $drs) = @_;
return if $DO_QUIT;
# setup the following pipeline: (
# git --git-dir=hexlen40.git cat-file \
# --batch-all-objects --batch-check &&
# git --git-dir=hexlen64.git cat-file \
# --batch-all-objects --batch-check
# ) | awk | sort | comm | cidx_read_comm()
my ($awk_opt, $sort_opt, $batch_opt);
my $comm_opt = { -C => "$TMPDIR" };
# NOTE(review): unlike the two pipe() calls below, $batch_opt->{1} is
# not local-ized; presumably it must outlive this sub for the
# dump_git_commits chain -- confirm against upstream
pipe(local $awk_opt->{0}, $batch_opt->{1});
pipe(local $sort_opt->{0}, local $awk_opt->{1});
pipe(local $comm_opt->{0}, local $sort_opt->{1});
run_await(\@AWK, $CMD_ENV, $awk_opt, \&cmd_done);
run_await([@SORT, '-u'], $CMD_ENV, $sort_opt, \&cmd_done);
# comm -2 -3 emits only commits in the index but no longer in any repo:
my $comm_rd = popen_rd(\@COMM, $CMD_ENV, $comm_opt, \&cmd_done, \@COMM);
PublicInbox::CidxComm->new($comm_rd, $self, $drs); # ->cidx_read_comm
push @PRUNE_BATCH, '--buffer' if $GIT_VER ge v2.6;
# Yes, we pipe --unordered git output to sort(1) because sorting
# inside git leads to orders-of-magnitude slowdowns on rotational
# storage. GNU sort(1) also works well on larger-than-memory
# datasets, and it's not worth eliding sort(1) for old git.
push @PRUNE_BATCH, '--unordered' if $GIT_VER ge v2.19;
# NOTE(review): the next line is not valid Perl as-is and appears
# garbled: a warn(sprintf(<<EOM...)) heredoc and the
# `$_->wq_do('prune_init')' shard loop seem to have been fused; code
# kicking off dump_git_commits with $batch_opt may also be missing --
# compare with upstream before relying on this block
warn(sprintf(<wq_do('prune_init') for @IDX_SHARDS;
while (defined(my $cmt = <$comm_rd>)) {
chop($cmt) eq "\n" or die "BUG: no LF in comm output ($cmt)";
# shard selection uses the first 32 bits (8 hex chars) of the OID:
my $n = hex(substr($cmt, 0, 8)) % scalar(@IDX_SHARDS);
$IDX_SHARDS[$n]->wq_do('prune_one', 'Q'.$cmt);
last if $DO_QUIT;
}
# also prune documents for git directories which no longer exist:
for my $git_dir (@GIT_DIR_GONE) {
my $n = git_dir_hash($git_dir) % scalar(@IDX_SHARDS);
$IDX_SHARDS[$n]->wq_do('prune_one', 'P'.$git_dir);
last if $DO_QUIT;
}
# shards report prune_done once their prune transactions commit:
my ($c, $p) = PublicInbox::PktOp->pair;
$c->{ops}->{prune_done} = [ $self, $drs ];
$_->wq_io_do('prune_commit', [ $p->{op_p} ]) for @IDX_SHARDS;
}
# pre-fork --join setup: parse --join sub-options into %JOIN, expand
# the configured search prefixes, and select which inboxes to join.
sub init_join_prefork ($) {
my ($self) = @_;
my $subopt = $self->{-opt}->{join} // return;
# e.g. --join=reset,dt:...,prefixes:dfpost7 => (reset => 1, ...)
%JOIN = map {
my ($k, $v) = split /:/, $_, 2;
$k => $v // 1;
} split(/,/, join(',', @$subopt));
require PublicInbox::CidxXapHelperAux;
require PublicInbox::XapClient;
my @unknown;
my $pfx = $JOIN{prefixes} // 'dfpost7';
# expand prefix names (dfpost, dfpre, dfblob, patchid) into Xapian
# term prefixes, preserving any minimum-length suffix (e.g. dfpost7)
for my $p (split /\+/, $pfx) {
my $n = '';
$p =~ s/([0-9]+)\z// and $n = $1;
# NOTE(review): on unknown $p, `// push(...)' leaves $v set to
# push's return value; upstream likely `next's here -- verify
my $v = $PublicInbox::Search::PATCH_BOOL_COMMON{$p} //
push(@unknown, $p);
push(@JOIN_PFX, map { $_.$n } split(/ /, $v));
}
# NOTE(review): the next line is not valid Perl as-is and appears
# garbled: the die-on-unknown-prefixes heredoc and the start of the
# `my %incl = map { ...' block handling --include (which declares the
# %incl used below) appear to have been lost -- compare with upstream
# before relying on this block
@unknown and die < undef;
} else {
warn "W: `$_' is not a public inbox, skipping\n";
();
}
} (@{$self->{-opt}->{include} // []});
my $all = $self->{-opt}->{all};
if (my $only = $self->{-opt}->{only}) {
# heredoc w/ empty terminator ends at the first blank line (the
# blank line may have been dropped in this copy of the file)
die <<'' if $all;
E: --all is incompatible with --only
$incl{rel2abs_collapsed($_)} = undef for @$only;
}
unless (keys(%incl)) {
$all = 1;
# NOTE(review): the next line appears garbled; presumably
# `warn <<EOM if !$self->{-opt}->{quiet};' upstream
warn <{opt}->{quiet};
# --all implied since no inboxes were specified with --only or --include
EOM
}
$self->{-opt}->{-pi_cfg}->each_inbox(\&_prep_ibx, $self, \%incl, $all);
my $nr = scalar(@IBX) or die "E: no inboxes to join with\n";
progress($self, "will join with $nr inboxes in ",
$self->{-opt}->{-pi_cfg}->{-f}, " using: $pfx");
}
# each_inbox callback: collect inboxes eligible for --join into @IBX.
# An inbox qualifies when --all is in effect or its inboxdir is a key
# of %$incl (populated from --only/--include).
sub _prep_ibx {
	my ($ibx, $self, $incl, $all) = @_;
	return unless $all || exists $incl->{$ibx->{inboxdir}};
	push @IBX, $ibx;
}
# Dump the fields selected by --show as pretty-printed JSON on stdout.
# Output format is unstable and intended for diagnostics only.
sub show_json {
	my ($self) = @_;
	my $show = $self->{-opt}->{show} or return; # for diagnostics
	local $self->{xdb};
	my %out;
	my @queue = @$show;
	while (defined(my $field = shift @queue)) {
		if ($field eq '') { # default --show (no args)
			push @queue, qw(roots2paths join_data);
		} elsif ($field =~ /\A(?:roots2paths|paths2roots|join_data)\z/) {
			$out{$field} = $self->$field; # dynamic method call
		} else {
			warn "E: cannot show `$field'\n";
		}
	}
	my $json = ref(PublicInbox::Config::json())->new;
	$json->utf8->canonical->pretty; # n.b. FS pathnames may not be UTF-8...
	say $json->encode(\%out);
}
# PublicInbox::DS::add_timer callback: kick off scanning, pruning and
# joining once the event loop is running.
sub do_inits {
	my ($self) = @_;
	# populate the scan queue when --scan or --prune was requested
	if (grep { $_ } @{$self->{-opt}}{qw(scan prune)}) {
		@$SCANQ = map {
			PublicInbox::Git->new($_)
		} @{$self->{git_dirs}};
	}
	init_join_postfork $self;
	init_prune $self;
	scan_git_dirs $self;
	# --join needs extra workers for the inbox/root dump phase:
	my $njobs = $TODO{do_join} ? max($LIVE_JOBS, $NPROC) : $LIVE_JOBS;
	index_next($self) for (1..$njobs);
}
# main entry point: initialize per-run global state, fork shard
# workers, schedule do_inits via the event loop, and run until all
# shards finish (shards_active goes false).
sub cidx_run {
	my ($self) = @_;
	my $restore_umask = prep_umask($self);
	local $SIGSET = PublicInbox::DS::block_signals(
		POSIX::SIGTSTP, POSIX::SIGCONT);
	my $restore = on_destroy \&PublicInbox::DS::sig_setmask, $SIGSET;
	local $PRUNE_DONE = [];
	local $IDXQ = [];
	local $SCANQ = [];
	# reset all per-run package globals so cidx_run is re-entrant:
	local ($DO_QUIT, $REINDEX, $TXN_BYTES, @GIT_DIR_GONE, @PRUNEQ,
		$REPO_CTX, %ALT_FH, $TMPDIR, @AWK, @COMM, $CMD_ENV,
		%TODO, @IBXQ, @IBX, @JOIN, %JOIN, @JOIN_PFX, @NO_ABBREV,
		@JOIN_DT, $DUMP_IBX_WPIPE, @OFF2ROOT, $XHC, @SORT, $GITS_NR);
	local $BATCH_BYTES = $self->{-opt}->{batch_size} //
		$PublicInbox::SearchIdx::BATCH_BYTES;
	local $MAX_SIZE = $self->{-opt}->{max_size};
	local $self->{PENDING} = {}; # used by PublicInbox::CidxXapHelperAux
	my $cfg = $self->{-opt}->{-pi_cfg} // die 'BUG: -pi_cfg unset';
	$self->{-cfg_f} = $cfg->{-f} = rel2abs_collapsed($cfg->{-f});
	local $GIT_VER = PublicInbox::Git::git_version();
	@NO_ABBREV = ('-c', 'core.abbrev='.($GIT_VER lt v2.31.0 ? 40 : 'no'));
	if (grep { $_ } @{$self->{-opt}}{qw(prune join)}) {
		require File::Temp;
		$TMPDIR = File::Temp->newdir('cidx-all-git-XXXX', TMPDIR => 1);
		$CMD_ENV = { TMPDIR => "$TMPDIR", LC_ALL => 'C', LANG => 'C' };
		require_progs('(prune|join)', sort => \@SORT);
		for (qw(parallel compress-program buffer-size)) { # GNU sort
			my $v = $self->{-opt}->{"sort-$_"};
			push @SORT, "--$_=$v" if defined $v;
		}
		# `lt' (not `le'): v2.6 itself is supported, matching the
		# `ge v2.6' --buffer check in run_prune and the message below
		($self->{-opt}->{prune} && $GIT_VER lt v2.6) and
			die "W: --prune requires git v2.6+\n";
		init_join_prefork($self)
	}
	local @IDX_SHARDS = cidx_init($self); # forks workers
	local $ANY_SHARD = -1;
	local $self->{current_info} = '';
	local $MY_SIG = {
		CHLD => \&PublicInbox::DS::enqueue_reap,
		USR1 => \&kill_shards,
	};
	local @PRUNE_BATCH = @PRUNE_BATCH;
	$MY_SIG->{$_} = \&parent_quit for qw(TERM QUIT INT);
	# prepend {current_info} (e.g. the repo being indexed) to warnings:
	my $cb = $SIG{__WARN__} || \&CORE::warn;
	local $SIG{__WARN__} = sub {
		my $m = shift @_;
		$self->{current_info} eq '' or
			$m =~ s/\A(#?\s*)/$1$self->{current_info}: /;
		$cb->($m, @_);
	};
	load_existing($self) unless $self->{-cidx_internal};
	if ($self->{-opt}->{reindex}) {
		require PublicInbox::SharedKV;
		$REINDEX = PublicInbox::SharedKV->new;
		delete $REINDEX->{lock_path};
		$REINDEX->dbh;
	}
	# repair non-canonical paths in-place (shouldn't happen, but
	# attempt to continue if it does):
	my @nc = grep { File::Spec->canonpath($_) ne $_ } @{$self->{git_dirs}};
	if (@nc) {
		warn "E: BUG? paths in $self->{cidx_dir} not canonicalized:\n";
		for my $d (@{$self->{git_dirs}}) {
			# use $d: `for my' does not set $_, the previous
			# code here canonicalized a stale/undef $_
			my $c = File::Spec->canonpath($d);
			warn "E: $d => $c\n";
			$d = $c; # $d aliases the {git_dirs} element
		}
		warn "E: canonicalized and attempting to continue\n";
	}
	if (defined(my $excl = $self->{-opt}->{exclude})) {
		# each --exclude is a glob, or a literal-prefix fallback:
		my $re = '(?:'.join('\\z|', map {
			glob2re($_) // qr/\A\Q$_\E/
		} @$excl).'\\z)';
		my @excl;
		@{$self->{git_dirs}} = grep {
			$_ =~ /$re/ ? (push(@excl, $_), 0) : 1;
		} @{$self->{git_dirs}};
		warn("# excluding $_\n") for @excl;
		# excluded dirs are pruned from the index like removed ones:
		@GIT_DIR_GONE = uniqstr @GIT_DIR_GONE, @excl;
	}
	local $NCHANGE = 0;
	local $NPROC = PublicInbox::IPC::detect_nproc();
	local $LIVE_JOBS = $self->{-opt}->{jobs} || $NPROC || 2;
	local @RDONLY_XDB = $self->xdb_shards_flat;
	PublicInbox::DS::add_timer(0, \&do_inits, $self);
	# FreeBSD ignores/discards SIGCHLD while signals are blocked and
	# EVFILT_SIGNAL is inactive, so we pretend we have a SIGCHLD pending
	PublicInbox::DS::enqueue_reap();
	local @PublicInbox::DS::post_loop_do = (\&shards_active);
	PublicInbox::DS::event_loop($MY_SIG, $SIGSET);
	$self->lock_release(!!$NCHANGE);
	show_json($self);
}
# per-shard child-process setup after fork (PublicInbox::IPC hook):
# install shard signal handlers and drop workqueue pipes to sibling
# shards (only the parent talks to this worker).
sub ipc_atfork_child {
	my ($self) = @_;
	$self->SUPER::ipc_atfork_child;
	$SIG{USR1} = \&shard_usr1;
	$SIG{$_} = \&shard_quit for qw(INT TERM QUIT);
	my $siblings = delete $self->{siblings} // die 'BUG: no {siblings}';
	$_->wq_close for @$siblings;
	undef;
}
# awaitpid callback (via ipc_worker_reap): record how a shard worker
# exited.  A clean exit should only happen after we requested it via
# {-cidx_quit}; abnormal exits bump {shard_err} when error tracking
# is enabled.
sub shard_done_wait {
	my ($pid, $shard, $self) = @_;
	my $quit_req = delete($shard->{-cidx_quit});
	return if $DO_QUIT;
	if ($?) { # abnormal exit
		warn "PID:$pid $shard->{shard} exited with \$?=$?\n";
		++$self->{shard_err} if defined($self->{shard_err});
	} else { # success: should've been requested beforehand
		$quit_req // warn 'BUG: {-cidx_quit} unset';
	}
}
1;