about summary refs log tree commit homepage
path: root/lib/PublicInbox/CodeSearchIdx.pm
diff options
context:
space:
mode:
Diffstat (limited to 'lib/PublicInbox/CodeSearchIdx.pm')
-rw-r--r--lib/PublicInbox/CodeSearchIdx.pm846
1 files changed, 846 insertions, 0 deletions
diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
new file mode 100644
index 00000000..e353f452
--- /dev/null
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -0,0 +1,846 @@
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+#
+# indexer for git coderepos, just commits and repo paths for now
+# this stores normalized absolute paths of indexed GIT_DIR inside
+# the DB itself and is designed to handle forks by designating roots
+#
+# Unlike mail search, docid isn't tied to NNTP artnum or IMAP UID,
+# there's no serial number dependency at all.  The first 32-bits of
+# the commit SHA-(1|256) is used to select a shard.
+#
+# We shard repos using the first 32-bits of sha256($ABS_GIT_DIR)
+#
+# See PublicInbox::CodeSearch (read-only API) for more
+package PublicInbox::CodeSearchIdx;
+use v5.12;
+# parent order matters, we want ->DESTROY from IPC, not SearchIdx
+use parent qw(PublicInbox::CodeSearch PublicInbox::IPC PublicInbox::SearchIdx);
+use PublicInbox::Eml;
+use PublicInbox::DS qw(awaitpid);
+use PublicInbox::PktOp;
+use PublicInbox::IPC qw(nproc_shards);
+use PublicInbox::Admin;
+use POSIX qw(WNOHANG SEEK_SET);
+use File::Path ();
+use File::Spec ();
+use PublicInbox::SHA qw(sha256_hex);
+use PublicInbox::Search qw(xap_terms);
+use PublicInbox::SearchIdx qw(add_val);
+use PublicInbox::Config qw(glob2re);
+use PublicInbox::Spawn qw(spawn popen_rd);
+use PublicInbox::OnDestroy;
+use Socket qw(MSG_EOR);
+use Carp ();
+our (
+        $LIVE, # pid => cmd
+        $DEFER, # [ [ cb, @args ], ... ]
+        $LIVE_JOBS, # integer
+        $MY_SIG, # like %SIG
+        $SIGSET,
+        $TXN_BYTES, # number of bytes in current shard transaction
+        $DO_QUIT, # signal number
+        @RDONLY_SHARDS, # Xapian::Database
+        @IDX_SHARDS, # clones of self
+        $MAX_SIZE,
+        $TMP_GIT, # PublicInbox::Git object for --prune
+        $REINDEX, # PublicInbox::SharedKV
+);
+
+# stop walking history if we see >$SEEN_MAX existing commits, this assumes
+# branches don't diverge by more than this number of commits...
+# git walks commits quickly if it doesn't have to read trees
+our $SEEN_MAX = 100000;
+
+# TODO: do we care about committer name + email? or tree OID?
+my @FMT = qw(H P ct an ae at s b); # (b)ody must be last
+
+# git log --stdin buffers all commits before emitting, thus --reverse
+# doesn't incur extra overhead.  We use --reverse to keep Xapian docids
+# increasing so we may be able to avoid sorting results in some cases
+my @LOG_STDIN = (qw(log --no-decorate --no-color --no-notes -p --stat -M
+        --reverse --stdin --no-walk=unsorted), '--pretty=format:%n%x00'.
+        join('%n', map { "%$_" } @FMT));
+
+sub new {
+        my (undef, $dir, $opt) = @_;
+        my $l = $opt->{indexlevel} // 'full';
+        $l !~ $PublicInbox::SearchIdx::INDEXLEVELS and
+                die "invalid indexlevel=$l\n";
+        $l eq 'basic' and die "E: indexlevel=basic not supported\n";
+        my $self = bless {
+                xpfx => "$dir/cidx".  PublicInbox::CodeSearch::CIDX_SCHEMA_VER,
+                cidx_dir => $dir,
+                creat => 1, # TODO: get rid of this, should be implicit
+                indexlevel => $l,
+                transact_bytes => 0, # for checkpoint
+                total_bytes => 0, # for lock_release
+                current_info => '',
+                parallel => 1,
+                -opt => $opt,
+                lock_path => "$dir/cidx.lock",
+        }, __PACKAGE__;
+        $self->{nshard} = count_shards($self) ||
+                nproc_shards({nproc => $opt->{jobs}});
+        $self->{-no_fsync} = 1 if !$opt->{fsync};
+        $self->{-dangerous} = 1 if $opt->{dangerous};
+        $self;
+}
+
+# TODO: may be used for reshard/compact
+sub count_shards { scalar($_[0]->xdb_shards_flat) }
+
+sub update_commit ($$) {
+        my ($self, $cmt) = @_; # fields from @FMT
+        my $x = 'Q'.$cmt->{H};
+        my ($docid, @extra) = sort { $a <=> $b } docids_by_postlist($self, $x);
+        @extra and warn "W: $cmt->{H} indexed multiple times, pruning ",
+                        join(', ', map { "#$_" } @extra), "\n";
+        $self->{xdb}->delete_document($_) for @extra;
+        my $doc = $PublicInbox::Search::X{Document}->new;
+        $doc->add_boolean_term($x);
+        $doc->add_boolean_term('G'.$_) for @{$self->{roots}};
+        $doc->add_boolean_term('XP'.$_) for split(/ /, $cmt->{P});
+        $doc->add_boolean_term('T'.'c');
+
+        # Author-Time is compatible with dt: for mail search schema_version=15
+        add_val($doc, PublicInbox::CodeSearch::AT,
+                POSIX::strftime('%Y%m%d%H%M%S', gmtime($cmt->{at})));
+
+        # Commit-Time is the fallback used by rt: (TS) for mail search:
+        add_val($doc, PublicInbox::CodeSearch::CT, $cmt->{ct});
+
+        $self->term_generator->set_document($doc);
+
+        # email address is always indexed with positional data for usability
+        $self->index_phrase("$cmt->{an} <$cmt->{ae}>", 1, 'A');
+
+        $x = $cmt->{'s'};
+        $self->index_text($x, 1, 'S') if $x =~ /\S/s;
+        $doc->set_data($x); # subject is the first (and currently only) line
+
+        $x = delete $cmt->{b};
+        $self->index_body_text($doc, \$x) if $x =~ /\S/s;
+        defined($docid) ? $self->{xdb}->replace_document($docid, $doc) :
+                        $self->{xdb}->add_document($doc);
+}
+
+sub progress {
+        my ($self, @msg) = @_;
+        my $pr = $self->{-opt}->{-progress} or return;
+        $pr->($self->{git} ? ("$self->{git}->{git_dir}: ") : (), @msg, "\n");
+}
+
+sub store_repo { # wq_do - returns docid
+        my ($self, $repo) = @_;
+        $self->begin_txn_lazy;
+        my $xdb = $self->{xdb};
+        for (@{$repo->{to_delete}}) { $xdb->delete_document($_) } # XXX needed?
+        if (defined $repo->{docid}) {
+                my $doc = $xdb->get_document($repo->{docid}) //
+                        die "$repo->{git_dir} doc #$repo->{docid} gone";
+                add_val($doc, PublicInbox::CodeSearch::CT, $repo->{ct});
+                my %new = map { $_ => undef } @{$repo->{roots}};
+                my $old = xap_terms('G', $doc);
+                delete @new{keys %$old};
+                $doc->add_boolean_term('G'.$_) for keys %new;
+                delete @$old{@{$repo->{roots}}};
+                $doc->remove_term('G'.$_) for keys %$old;
+                $doc->set_data($repo->{fp});
+                $xdb->replace_document($repo->{docid}, $doc);
+                $repo->{docid}
+        } else {
+                my $new = $PublicInbox::Search::X{Document}->new;
+                add_val($new, PublicInbox::CodeSearch::CT, $repo->{ct});
+                $new->add_boolean_term("P$repo->{git_dir}");
+                $new->add_boolean_term('T'.'r');
+                $new->add_boolean_term('G'.$_) for @{$repo->{roots}};
+                $new->set_data($repo->{fp}); # \n delimited
+                $xdb->add_document($new);
+        }
+}
+
+sub cidx_ckpoint ($$) {
+        my ($self, $msg) = @_;
+        progress($self, $msg);
+        return if $PublicInbox::Search::X{CLOEXEC_UNSET};
+        $self->{xdb}->commit_transaction;
+        $self->{xdb}->begin_transaction;
+}
+
+sub truncate_cmt ($$) {
+        my ($cmt) = @_; # _[1] is $buf (giant)
+        my ($orig_len, $len);
+        $len = $orig_len = length($_[1]);
+        @$cmt{@FMT} = split(/\n/, $_[1], scalar(@FMT));
+        undef $_[1];
+        $len -= length($cmt->{b});
+
+        # try to keep the commit message body.
+        # n.b. this diffstat split may be unreliable but it's not worth
+        # perfection for giant commits:
+        my ($bdy) = split(/^---\n/sm, delete($cmt->{b}), 2);
+        if (($len + length($bdy)) <= $MAX_SIZE) {
+                $len += length($bdy);
+                $cmt->{b} = $bdy;
+                warn <<EOM;
+W: $cmt->{H}: truncated body ($orig_len => $len bytes)
+W: to be under --max-size=$MAX_SIZE
+EOM
+        } else {
+                $cmt->{b} = '';
+                warn <<EOM;
+W: $cmt->{H}: deleted body ($orig_len => $len bytes)
+W: to be under --max-size=$MAX_SIZE
+EOM
+        }
+        $len;
+}
+
+# sharded reader for `git log --pretty=format: --stdin'
+sub shard_index { # via wq_io_do
+        my ($self, $git, $n, $roots) = @_;
+        local $self->{current_info} = "$git->{git_dir} [$n]";
+        local $self->{roots} = $roots;
+        my $in = delete($self->{0}) // die 'BUG: no {0} input';
+        my $op_p = delete($self->{1}) // die 'BUG: no {1} op_p';
+        my $batch_bytes = $self->{-opt}->{batch_size} //
+                                $PublicInbox::SearchIdx::BATCH_BYTES;
+        local $MAX_SIZE = $self->{-opt}->{max_size};
+        # local-ized in parent before fork
+        $TXN_BYTES = $batch_bytes;
+        local $self->{git} = $git; # for patchid
+        return if $DO_QUIT;
+        my $rd = $git->popen(@LOG_STDIN, undef, { 0 => $in });
+        close $in or die "close: $!";
+        my $nr = 0;
+
+        # a patch may have \0, see c4201214cbf10636e2c1ab9131573f735b42c8d4
+        # in linux.git, so we use $/ = "\n\0" to check end-of-patch
+        my $FS = "\n\0";
+        my $len;
+        my $cmt = {};
+        local $/ = $FS;
+        my $buf = <$rd> // return close($rd); # leading $FS
+        $buf eq $FS or die "BUG: not LF-NUL: $buf\n";
+        $self->begin_txn_lazy;
+        while (!$DO_QUIT && defined($buf = <$rd>)) {
+                chomp($buf);
+                $/ = "\n";
+                $len = length($buf);
+                if (defined($MAX_SIZE) && $len > $MAX_SIZE) {
+                        $len = truncate_cmt($cmt, $buf);
+                } else {
+                        @$cmt{@FMT} = split(/\n/, $buf, scalar(@FMT));
+                }
+                $TXN_BYTES -= $len;
+                if ($TXN_BYTES <= 0) {
+                        cidx_ckpoint($self, "[$n] $nr");
+                        $TXN_BYTES = $batch_bytes - $len;
+                }
+                update_commit($self, $cmt);
+                ++$nr;
+                if ($TXN_BYTES <= 0) {
+                        cidx_ckpoint($self, "[$n] $nr");
+                        $TXN_BYTES = $batch_bytes;
+                }
+                $/ = $FS;
+        }
+        close($rd);
+        if (!$? || ($DO_QUIT && (($? & 127) == $DO_QUIT ||
+                                ($? & 127) == POSIX::SIGPIPE))) {
+                send($op_p, "shard_done $n", MSG_EOR);
+        } else {
+                warn "E: git @LOG_STDIN: \$?=$?\n";
+                $self->{xdb}->cancel_transaction;
+        }
+}
+
+sub shard_done { # called via PktOp on shard_index completion
+        my ($self, $n) = @_;
+        $self->{-shard_ok}->{$n} = 1 if defined($self->{-shard_ok});
+}
+
+sub seen ($$) {
+        my ($xdb, $q) = @_; # $q = "Q$COMMIT_HASH"
+        for (1..100) {
+                my $ret = eval {
+                        $xdb->postlist_begin($q) != $xdb->postlist_end($q);
+                };
+                return $ret unless $@;
+                if (ref($@) =~ /\bDatabaseModifiedError\b/) {
+                        $xdb->reopen;
+                } else {
+                        Carp::croak($@);
+                }
+        }
+        Carp::croak('too many Xapian DB modifications in progress');
+}
+
+# used to select the shard for a GIT_DIR
+sub git_dir_hash ($) { hex(substr(sha256_hex($_[0]), 0, 8)) }
+
+sub docids_by_postlist ($$) { # consider moving to PublicInbox::Search
+        my ($self, $q) = @_;
+        my $cur = $self->{xdb}->postlist_begin($q);
+        my $end = $self->{xdb}->postlist_end($q);
+        my @ids;
+        for (; $cur != $end; $cur++) { push(@ids, $cur->get_docid) };
+        @ids;
+}
+
+sub run_todo ($) {
+        my ($self) = @_;
+        my $n;
+        while (defined(my $x = shift(@{$self->{todo} // []}))) {
+                my $cb = shift @$x;
+                $cb->(@$x);
+                ++$n;
+        }
+        $n;
+}
+
+sub need_reap { # post_loop_do
+        my (undef, $jobs) = @_;
+        return if !$LIVE || $DO_QUIT;
+        scalar(keys(%$LIVE)) > $jobs;
+}
+
+sub cidx_reap ($$) {
+        my ($self, $jobs) = @_;
+        while (run_todo($self)) {}
+        local @PublicInbox::DS::post_loop_do = (\&need_reap, $jobs);
+        while (need_reap(undef, $jobs)) {
+                PublicInbox::DS::event_loop($MY_SIG, $SIGSET);
+        }
+        while (!$jobs && run_todo($self)) {}
+}
+
+sub cidx_await_cb { # awaitpid cb
+        my ($pid, $cb, $self, $git, @args) = @_;
+        return if !$LIVE || $DO_QUIT;
+        my $cmd = delete $LIVE->{$pid} // die 'BUG: no $cmd';
+        PublicInbox::DS::enqueue_reap() if !keys(%$LIVE); # once more for PLC
+        if ($?) {
+                $git->{-cidx_err} = 1;
+                return warn("@$cmd error: \$?=$?\n");
+        }
+        push(@$DEFER, [ $cb, $self, $git, @args ]) if $DEFER;
+}
+
+sub cidx_await ($$$$$@) {
+        my ($pid, $cmd, $cb, $self, $git, @args) = @_;
+        $LIVE->{$pid} = $cmd;
+        awaitpid($pid, \&cidx_await_cb, $cb, $self, $git, @args);
+}
+
+# this is different from the grokmirror-compatible fingerprint since we
+# only care about --heads (branches) and --tags, and not even their names
+sub fp_start ($$$) {
+        my ($self, $git, $prep_repo) = @_;
+        return if !$LIVE || $DO_QUIT;
+        cidx_reap($self, $LIVE_JOBS);
+        open my $refs, '+>', undef or die "open: $!";
+        my $cmd = ['git', "--git-dir=$git->{git_dir}",
+                qw(show-ref --heads --tags --hash)];
+        my $pid = spawn($cmd, undef, { 1 => $refs });
+        $git->{-repo}->{refs} = $refs;
+        cidx_await($pid, $cmd, \&fp_fini, $self, $git, $prep_repo);
+}
+
+sub fp_fini { # cidx_await cb
+        my ($self, $git, $prep_repo) = @_;
+        my $refs = $git->{-repo}->{refs} // die 'BUG: no {-repo}->{refs}';
+        seek($refs, 0, SEEK_SET) or die "seek: $!";
+        my $buf;
+        my $dig = PublicInbox::SHA->new(256);
+        while (read($refs, $buf, 65536)) { $dig->add($buf) }
+        $git->{-repo}->{fp} = $dig->hexdigest;
+}
+
+sub ct_start ($$$) {
+        my ($self, $git, $prep_repo) = @_;
+        return if !$LIVE || $DO_QUIT;
+        cidx_reap($self, $LIVE_JOBS);
+        my $cmd = [ 'git', "--git-dir=$git->{git_dir}",
+                qw[for-each-ref --sort=-committerdate
+                --format=%(committerdate:raw) --count=1
+                refs/heads/ refs/tags/] ];
+        my ($rd, $pid) = popen_rd($cmd);
+        cidx_await($pid, $cmd, \&ct_fini, $self, $git, $rd, $prep_repo);
+}
+
+sub ct_fini { # cidx_await cb
+        my ($self, $git, $rd, $prep_repo) = @_;
+        defined(my $ct = <$rd>) or return;
+        $ct =~ s/\s+.*\z//s; # drop TZ + LF
+        $git->{-repo}->{ct} = $ct + 0;
+}
+
+# TODO: also index gitweb.owner and the full fingerprint for grokmirror?
+sub prep_repo ($$) {
+        my ($self, $git) = @_;
+        return if !$LIVE || $DO_QUIT || $git->{-cidx_err};
+        my $repo = $git->{-repo} // die 'BUG: no {-repo}';
+        if (!defined($repo->{ct})) {
+                warn "W: $git->{git_dir} has no commits, skipping\n";
+                delete $git->{-repo};
+                return;
+        }
+        my $n = git_dir_hash($git->{git_dir}) % $self->{nshard};
+        my $shard = bless { %$self, shard => $n }, ref($self);
+        $repo->{shard_n} = $n;
+        delete @$shard{qw(lockfh lock_path)};
+        local $shard->{xdb} = $RDONLY_SHARDS[$n] // die "BUG: shard[$n] undef";
+        $shard->retry_reopen(\&check_existing, $self, $git);
+}
+
+sub check_existing { # retry_reopen callback
+        my ($shard, $self, $git) = @_;
+        my @docids = docids_by_postlist($shard, 'P'.$git->{git_dir});
+        my $docid = shift(@docids) // return get_roots($self, $git);
+        my $doc = $shard->{xdb}->get_document($docid) //
+                        die "BUG: no #$docid ($git->{git_dir})";
+        my $old_fp = $REINDEX ? "\0invalid" : $doc->get_data;
+        if ($old_fp eq $git->{-repo}->{fp}) { # no change
+                delete $git->{-repo};
+                return;
+        }
+        $git->{-repo}->{docid} = $docid;
+        if (@docids) {
+                warn "BUG: $git->{git_dir} indexed multiple times, culling\n";
+                $git->{-repo}->{to_delete} = \@docids; # XXX needed?
+        }
+        get_roots($self, $git);
+}
+
+sub partition_refs ($$$) {
+        my ($self, $git, $refs) = @_; # show-ref --heads --tags --hash output
+        sysseek($refs, 0, SEEK_SET) or die "seek: $!"; # for rev-list --stdin
+        my $rfh = $git->popen(qw(rev-list --stdin), undef, { 0 => $refs });
+        close $refs or die "close: $!";
+        my ($seen, $nchange) = (0, 0);
+        my @shard_in = map {
+                $_->reopen;
+                open my $fh, '+>', undef or die "open: $!";
+                $fh;
+        } @RDONLY_SHARDS;
+
+        while (defined(my $cmt = <$rfh>)) {
+                chomp $cmt;
+                my $n = hex(substr($cmt, 0, 8)) % scalar(@RDONLY_SHARDS);
+                if ($REINDEX && $REINDEX->set_maybe(pack('H*', $cmt), '')) {
+                        say { $shard_in[$n] } $cmt or die "say: $!";
+                        ++$nchange;
+                } elsif (seen($RDONLY_SHARDS[$n], 'Q'.$cmt)) {
+                        last if ++$seen > $SEEN_MAX;
+                } else {
+                        say { $shard_in[$n] } $cmt or die "say: $!";
+                        ++$nchange;
+                        $seen = 0;
+                }
+                if ($DO_QUIT) {
+                        close($rfh);
+                        return ();
+                }
+        }
+        close($rfh);
+        return () if $DO_QUIT;
+        if (!$? || (($? & 127) == POSIX::SIGPIPE && $seen > $SEEN_MAX)) {
+                $self->{nchange} += $nchange;
+                progress($self, "$git->{git_dir}: $nchange commits");
+                for my $fh (@shard_in) {
+                        $fh->flush or die "flush: $!";
+                        sysseek($fh, 0, SEEK_SET) or die "seek: $!";
+                }
+                return @shard_in;
+        }
+        die "git --git-dir=$git->{git_dir} rev-list: \$?=$?\n";
+}
+
+sub shard_commit { # via wq_io_do
+        my ($self, $n) = @_;
+        my $op_p = delete($self->{0}) // die 'BUG: no {0} op_p';
+        $self->commit_txn_lazy;
+        send($op_p, "shard_done $n", MSG_EOR);
+}
+
+sub consumers_open { # post_loop_do
+        my (undef, $consumers) = @_;
+        return if $DO_QUIT;
+        scalar(grep { $_->{sock} } values %$consumers);
+}
+
+sub wait_consumers ($$$) {
+        my ($self, $git, $consumers) = @_;
+        local @PublicInbox::DS::post_loop_do = (\&consumers_open, $consumers);
+        PublicInbox::DS::event_loop($MY_SIG, $SIGSET);
+        my $n = grep { ! $self->{-shard_ok}->{$_} } keys %$consumers;
+        die "E: $git->{git_dir} $n shards failed" if $n && !$DO_QUIT;
+}
+
+sub commit_used_shards ($$$) {
+        my ($self, $git, $consumers) = @_;
+        local $self->{-shard_ok} = {};
+        for my $n (keys %$consumers) {
+                my ($c, $p) = PublicInbox::PktOp->pair;
+                $c->{ops}->{shard_done} = [ $self ];
+                $IDX_SHARDS[$n]->wq_io_do('shard_commit', [ $p->{op_p} ], $n);
+                $consumers->{$n} = $c;
+        }
+        wait_consumers($self, $git, $consumers);
+}
+
+sub index_repo { # cidx_await cb
+        my ($self, $git, $roots) = @_;
+        return if $git->{-cidx_err} || $DO_QUIT;
+        my $repo = delete $git->{-repo} or return;
+        seek($roots, 0, SEEK_SET) or die "seek: $!";
+        chomp(my @roots = <$roots>);
+        close($roots) or die "close: $!";
+        @roots or return warn("E: $git->{git_dir} has no root commits\n");
+        $repo->{roots} = \@roots;
+        local $self->{current_info} = $git->{git_dir};
+        my @shard_in = partition_refs($self, $git, delete($repo->{refs}));
+        local $self->{-shard_ok} = {}; # [0..$#shard_in] => 1
+        my $consumers = {};
+        for my $n (0..$#shard_in) {
+                -s $shard_in[$n] or next;
+                last if $DO_QUIT;
+                my ($c, $p) = PublicInbox::PktOp->pair;
+                $c->{ops}->{shard_done} = [ $self ];
+                $IDX_SHARDS[$n]->wq_io_do('shard_index',
+                                        [ $shard_in[$n], $p->{op_p} ],
+                                        $git, $n, \@roots);
+                $consumers->{$n} = $c;
+        }
+        @shard_in = ();
+        wait_consumers($self, $git, $consumers);
+        if ($DO_QUIT) {
+                commit_used_shards($self, $git, $consumers);
+                progress($self, "$git->{git_dir}: done");
+                return;
+        }
+        $repo->{git_dir} = $git->{git_dir};
+        my $id = $IDX_SHARDS[$repo->{shard_n}]->wq_do('store_repo', $repo);
+        if ($id > 0) {
+                $consumers->{$repo->{shard_n}} = undef;
+                commit_used_shards($self, $git, $consumers);
+                progress($self, "$git->{git_dir}: done");
+                return run_todo($self);
+        }
+        die "E: store_repo $git->{git_dir}: id=$id";
+}
+
+sub get_roots ($$) {
+        my ($self, $git) = @_;
+        return if !$LIVE || $DO_QUIT;
+        my $refs = $git->{-repo}->{refs} // die 'BUG: no {-repo}->{refs}';
+        sysseek($refs, 0, SEEK_SET) or die "seek: $!";
+        open my $roots, '+>', undef or die "open: $!";
+        my $cmd = [ 'git', "--git-dir=$git->{git_dir}",
+                        qw(rev-list --stdin --max-parents=0) ];
+        my $pid = spawn($cmd, undef, { 0 => $refs, 1 => $roots });
+        cidx_await($pid, $cmd, \&index_repo, $self, $git, $roots);
+}
+
+# for PublicInbox::SearchIdx::patch_id and with_umask
+sub git { $_[0]->{git} }
+
+sub load_existing ($) { # for -u/--update
+        my ($self) = @_;
+        my $dirs = $self->{git_dirs} // [];
+        if ($self->{-opt}->{update} || $self->{-opt}->{prune}) {
+                local $self->{xdb};
+                $self->xdb or
+                        die "E: $self->{cidx_dir} non-existent for --update\n";
+                my @missing;
+                my @cur = grep {
+                        if (-e $_) {
+                                1;
+                        } else {
+                                push @missing, $_;
+                                undef;
+                        }
+                } $self->all_terms('P');
+                @missing = () if $self->{-opt}->{prune};
+                @missing and warn "W: the following repos no longer exist:\n",
+                                (map { "W:\t$_\n" } @missing),
+                                "W: use --prune to remove them from ",
+                                $self->{cidx_dir}, "\n";
+                push @$dirs, @cur;
+        }
+        my %uniq; # List::Util::uniq requires Perl 5.26+
+        @$dirs = grep { !$uniq{$_}++ } @$dirs;
+}
+
+# SIG handlers:
+sub shard_quit { $DO_QUIT = POSIX->can("SIG$_[0]")->() }
+sub shard_usr1 { $TXN_BYTES = -1 }
+
+sub cidx_init ($) {
+        my ($self) = @_;
+        my $dir = $self->{cidx_dir};
+        unless (-d $dir) {
+                warn "# creating $dir\n" if !$self->{-opt}->{quiet};
+                File::Path::mkpath($dir);
+        }
+        $self->lock_acquire;
+        my @shards;
+        local $TXN_BYTES;
+        for my $n (0..($self->{nshard} - 1)) {
+                my $shard = bless { %$self, shard => $n }, ref($self);
+                delete @$shard{qw(lockfh lock_path)};
+                $shard->idx_acquire;
+                $shard->idx_release;
+                $shard->wq_workers_start("shard[$n]", 1, $SIGSET, {
+                        siblings => \@shards, # for ipc_atfork_child
+                }, \&shard_done_wait, $self);
+                push @shards, $shard;
+        }
+        # this warning needs to happen after idx_acquire
+        state $once;
+        warn <<EOM if $PublicInbox::Search::X{CLOEXEC_UNSET} && !$once++;
+W: Xapian v1.2.21..v1.2.24 were missing close-on-exec on OFD locks,
+W: memory usage may be high for large indexing runs
+EOM
+        @shards;
+}
+
+sub scan_git_dirs ($) {
+        my ($self) = @_;
+        for (@{$self->{git_dirs}}) {
+                my $git = PublicInbox::Git->new($_);
+                my $prep_repo = PublicInbox::OnDestroy->new($$, \&prep_repo,
+                                                        $self, $git);
+                fp_start($self, $git, $prep_repo);
+                ct_start($self, $git, $prep_repo);
+                last if $DO_QUIT;
+        }
+        cidx_reap($self, 0);
+}
+
+sub prune_cb { # git->check_async callback
+        my ($hex, $type, undef, $self_id) = @_;
+        return if $type eq 'commit';
+        my ($self, $id) = @$self_id;
+        my $len = $self->{xdb}->get_doclength($id);
+        progress($self, "$hex $type (doclength=$len)");
+        ++$self->{pruned};
+        $self->{xdb}->delete_document($id);
+
+        # all math around batch_bytes calculation is pretty fuzzy,
+        # but need a way to regularly flush output to avoid OOM,
+        # so assume the average term + position overhead is the
+        # answer to everything: 42
+        return if ($self->{batch_bytes} -= ($len * 42)) > 0;
+        cidx_ckpoint($self, "[$self->{shard}] $self->{pruned}");
+        $self->{batch_bytes} = $self->{-opt}->{batch_size} //
+                        $PublicInbox::SearchIdx::BATCH_BYTES;
+}
+
+sub shard_prune { # via wq_io_do
+        my ($self, $n, $git_dir) = @_;
+        my $op_p = delete($self->{0}) // die 'BUG: no {0} op_p';
+        my $git = PublicInbox::Git->new($git_dir); # TMP_GIT copy
+        $self->begin_txn_lazy;
+        my $xdb = $self->{xdb};
+        my $cur = $xdb->postlist_begin('Tc');
+        my $end = $xdb->postlist_end('Tc');
+        my ($id, @cmt, $oid);
+        local $self->{batch_bytes} = $self->{-opt}->{batch_size} //
+                                $PublicInbox::SearchIdx::BATCH_BYTES;
+        local $self->{pruned} = 0;
+        for (; $cur != $end && !$DO_QUIT; $cur++) {
+                @cmt = xap_terms('Q', $xdb, $id = $cur->get_docid);
+                scalar(@cmt) == 1 or
+                        warn "BUG? shard[$n] #$id has multiple commits: @cmt";
+                for $oid (@cmt) {
+                        $git->check_async($oid, \&prune_cb, [ $self, $id ]);
+                }
+        }
+        $git->async_wait_all;
+        for my $d ($self->all_terms('P')) { # GIT_DIR paths
+                last if $DO_QUIT;
+                next if -d $d;
+                for $id (docids_by_postlist($self, 'P'.$d)) {
+                        progress($self, "$d gone #$id");
+                        $xdb->delete_document($id);
+                }
+        }
+        $self->commit_txn_lazy;
+        $self->{pruned} and
+                progress($self, "[$n] pruned $self->{pruned} commits");
+        send($op_p, "shard_done $n", MSG_EOR);
+}
+
+sub do_prune ($) {
+        my ($self) = @_;
+        my $consumers = {};
+        my $git_dir = $TMP_GIT->{git_dir};
+        my $n = 0;
+        local $self->{-shard_ok} = {};
+        for my $s (@IDX_SHARDS) {
+                my ($c, $p) = PublicInbox::PktOp->pair;
+                $c->{ops}->{shard_done} = [ $self ];
+                $s->wq_io_do('shard_prune', [ $p->{op_p} ], $n, $git_dir);
+                $consumers->{$n++} = $c;
+        }
+        wait_consumers($self, $TMP_GIT, $consumers);
+}
+
+sub shards_active { # post_loop_do
+        scalar(grep { $_->{-cidx_quit} } @IDX_SHARDS);
+}
+
+# signal handlers
+sub kill_shards { $_->wq_kill(@_) for @IDX_SHARDS }
+
+sub parent_quit {
+        $DO_QUIT = POSIX->can("SIG$_[0]")->();
+        kill_shards(@_);
+        warn "# SIG$_[0] received, quitting...\n";
+}
+
+sub init_tmp_git_dir ($) {
+        my ($self) = @_;
+        return unless $self->{-opt}->{prune};
+        require File::Temp;
+        require PublicInbox::Import;
+        my $tmp = File::Temp->newdir('cidx-all-git-XXXX', TMPDIR => 1);
+        PublicInbox::Import::init_bare("$tmp", 'cidx-all');
+        my $f = "$tmp/objects/info/alternates";
+        open my $fh, '>', $f or die "open($f): $!";
+        my $o;
+        for (@{$self->{git_dirs}}) { # TODO: sha256 check?
+                $o = $_.'/objects';
+                say $fh $o if -d $o;
+        }
+        close $fh or die "close($f): $!";
+        $TMP_GIT = PublicInbox::Git->new("$tmp");
+        $TMP_GIT->{-tmp} = $tmp;
+}
+
+sub prep_umask ($) {
+        my ($self) = @_;
+        my $um;
+        my $cur = umask;
+        if ($self->{-internal}) { # respect core.sharedRepository
+                @{$self->{git_dirs}} == 1 or die 'BUG: only for GIT_DIR';
+                # yuck, FIXME move umask handling out of inbox-specific stuff
+                require PublicInbox::InboxWritable;
+                my $git = PublicInbox::Git->new($self->{git_dirs}->[0]);
+                chomp($um = $git->qx('config', 'core.sharedRepository') // '');
+                $um = PublicInbox::InboxWritable::_git_config_perm(undef, $um);
+                $um = PublicInbox::InboxWritable::_umask_for($um);
+                umask == $um or progress($self, 'umask from git: ',
+                                                sprintf('0%03o', $um));
+        } elsif (-d $self->{cidx_dir}) { # respect existing perms
+                my @st = stat(_);
+                $um = (~$st[2] & 0777);
+                umask == $um or progress($self, 'using umask from ',
+                                                $self->{cidx_dir}, ': ',
+                                                sprintf('0%03o', $um));
+        }
+        defined($um) ?
+                PublicInbox::OnDestroy->new(\&CORE::umask, umask($um)) :
+                undef;
+}
+
+sub cidx_run { # main entry point
+        my ($self) = @_;
+        my $restore_umask = prep_umask($self);
+        local $self->{todo} = [];
+        local $DEFER = $self->{todo};
+        local $SIGSET = PublicInbox::DS::block_signals();
+        my $restore = PublicInbox::OnDestroy->new($$,
+                \&PublicInbox::DS::sig_setmask, $SIGSET);
+        local $LIVE = {};
+        local $DO_QUIT;
+        local $TMP_GIT;
+        local @IDX_SHARDS = cidx_init($self);
+        local $self->{current_info} = '';
+        local $MY_SIG = {
+                CHLD => \&PublicInbox::DS::enqueue_reap,
+                USR1 => \&kill_shards,
+        };
+        $MY_SIG->{$_} = \&parent_quit for qw(TERM QUIT INT);
+        my $cb = $SIG{__WARN__} || \&CORE::warn;
+        local $SIG{__WARN__} = sub {
+                my $m = shift @_;
+                $self->{current_info} eq '' or
+                        $m =~ s/\A(#?\s*)/$1$self->{current_info}: /;
+                $cb->($m, @_);
+        };
+        load_existing($self) unless $self->{-internal};
+        local $REINDEX;
+        if ($self->{-opt}->{reindex}) {
+                require PublicInbox::SharedKV;
+                $REINDEX = PublicInbox::SharedKV->new;
+                delete $REINDEX->{lock_path};
+                $REINDEX->dbh;
+        }
+        my @nc = grep { File::Spec->canonpath($_) ne $_ } @{$self->{git_dirs}};
+        if (@nc) {
+                warn "E: BUG? paths in $self->{cidx_dir} not canonicalized:\n";
+                for my $d (@{$self->{git_dirs}}) {
+                        my $c = File::Spec->canonpath($_);
+                        warn "E: $d => $c\n";
+                        $d = $c;
+                }
+                warn "E: canonicalized and attempting to continue\n";
+        }
+        if (defined(my $excl = $self->{-opt}->{exclude})) {
+                my $re = '(?:'.join('\\z|', map {
+                                glob2re($_) // qr/\A\Q$_\E/
+                        } @$excl).'\\z)';
+                @{$self->{git_dirs}} = grep {
+                        $_ =~ /$re/ ? (warn("# excluding $_\n"), 0) : 1;
+                } @{$self->{git_dirs}};
+        }
+        local $self->{nchange} = 0;
+        local $LIVE_JOBS = $self->{-opt}->{jobs} ||
+                        PublicInbox::IPC::detect_nproc() || 2;
+        local @RDONLY_SHARDS = $self->xdb_shards_flat;
+        init_tmp_git_dir($self);
+        do_prune($self) if $self->{-opt}->{prune};
+        scan_git_dirs($self) if $self->{-opt}->{scan} // 1;
+
+        for my $s (@IDX_SHARDS) {
+                $s->{-cidx_quit} = 1;
+                $s->wq_close;
+        }
+
+        local @PublicInbox::DS::post_loop_do = (\&shards_active);
+        PublicInbox::DS::event_loop($MY_SIG, $SIGSET) if shards_active();
+        $self->lock_release(!!$self->{nchange});
+}
+
+sub ipc_atfork_child {
+        my ($self) = @_;
+        $self->SUPER::ipc_atfork_child;
+        $SIG{USR1} = \&shard_usr1;
+        $SIG{$_} = \&shard_quit for qw(INT TERM QUIT);
+        my $x = delete $self->{siblings} // die 'BUG: no {siblings}';
+        $_->wq_close for @$x;
+        undef;
+}
+
+sub shard_done_wait { # awaitpid cb via ipc_worker_reap
+        my ($pid, $shard, $self) = @_;
+        my $quit_req = delete($shard->{-cidx_quit});
+        return if $DO_QUIT || !$LIVE;
+        if ($? == 0) { # success
+                $quit_req // warn 'BUG: {-cidx_quit} unset';
+                return;
+        }
+        warn "PID:$pid $shard->{shard} exited with \$?=$?\n";
+        ++$self->{shard_err} if defined($self->{shard_err});
+}
+
+sub with_umask { # TODO get rid of this treewide and rely on OnDestroy
+        my ($self, $cb, @arg) = @_;
+        $cb->(@arg);
+}
+
+1;