about summary refs log tree commit homepage
path: root/lib
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2023-03-21 23:07:25 +0000
committerEric Wong <e@80x24.org>2023-03-25 09:37:47 +0000
commit87a8527cb3e09b88f224b8ba0aad28cb8ec8eba8 (patch)
treeaaac373df1c52db860f3cc16d8420d8b848e71c7 /lib
parentea9a1099745820977e0ad2bf2e406c068c0a5f44 (diff)
downloadpublic-inbox-87a8527cb3e09b88f224b8ba0aad28cb8ec8eba8.tar.gz
This avoids forking new shard processes for each repo we scan,
but we can't avoid many excessive commits since we need to
ensure the `seen()' sub can avoid excessive work.
Diffstat (limited to 'lib')
-rw-r--r--lib/PublicInbox/CodeSearchIdx.pm374
1 files changed, 240 insertions, 134 deletions
diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index 02c9ed84..13fe1c28 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -14,9 +14,11 @@
 # See PublicInbox::CodeSearch (read-only API) for more
 package PublicInbox::CodeSearchIdx;
 use v5.12;
-use parent qw(PublicInbox::Lock PublicInbox::CodeSearch PublicInbox::SearchIdx);
+# parent order matters, we want ->DESTROY from IPC, not SearchIdx
+use parent qw(PublicInbox::CodeSearch PublicInbox::IPC PublicInbox::SearchIdx);
 use PublicInbox::Eml;
-use PublicInbox::DS ();
+use PublicInbox::DS qw(awaitpid);
+use PublicInbox::PktOp;
 use PublicInbox::IPC qw(nproc_shards);
 use PublicInbox::Admin;
 use POSIX qw(WNOHANG SEEK_SET);
@@ -26,11 +28,19 @@ use PublicInbox::SHA qw(sha256_hex);
 use PublicInbox::Search qw(xap_terms);
 use PublicInbox::SearchIdx qw(add_val);
 use PublicInbox::Config;
-use PublicInbox::Spawn qw(spawn);
+use PublicInbox::Spawn qw(spawn popen_rd);
 use PublicInbox::OnDestroy;
-our $LIVE; # pid => callback
-our $LIVE_JOBS;
-our @XDB_SHARDS_FLAT;
+use Socket qw(MSG_EOR);
+use Carp ();
+our (
+        $LIVE, # pid => cmd
+        $DEFER, # [ [ cb, @args ], ... ]
+        $LIVE_JOBS, # integer
+        $MY_SIG, # like %SIG
+        $SIGSET,
+        @RDONLY_SHARDS, # Xapian::Database
+        @IDX_SHARDS # clones of self
+);
 
 # stop walking history if we see >$SEEN_MAX existing commits, this assumes
 # branches don't diverge by more than this number of commits...
@@ -110,14 +120,14 @@ sub progress {
         $pr->($self->{git} ? ("$self->{git}->{git_dir}: ") : (), @msg, "\n");
 }
 
-sub store_repo ($$$) {
-        my ($self, $git, $repo) = @_;
-        my $xdb = delete($repo->{shard})->idx_acquire;
-        $xdb->begin_transaction;
+sub store_repo { # wq_do - returns docid
+        my ($self, $repo) = @_;
+        $self->begin_txn_lazy;
+        my $xdb = $self->{xdb};
         for (@{$repo->{to_delete}}) { $xdb->delete_document($_) } # XXX needed?
-        if (defined $repo->{id}) {
-                my $doc = $xdb->get_document($repo->{id}) //
-                        die "$git->{git_dir} doc #$repo->{id} gone";
+        if (defined $repo->{docid}) {
+                my $doc = $xdb->get_document($repo->{docid}) //
+                        die "$repo->{git_dir} doc #$repo->{docid} gone";
                 add_val($doc, PublicInbox::CodeSearch::CT, $repo->{ct});
                 my %new = map { $_ => undef } @{$repo->{roots}};
                 my $old = xap_terms('G', $doc);
@@ -126,34 +136,38 @@ sub store_repo ($$$) {
                 delete @$old{@{$repo->{roots}}};
                 $doc->remove_term('G'.$_) for keys %$old;
                 $doc->set_data($repo->{fp});
-                $xdb->replace_document($repo->{id}, $doc);
+                $xdb->replace_document($repo->{docid}, $doc);
+                $repo->{docid}
         } else {
                 my $new = $PublicInbox::Search::X{Document}->new;
                 add_val($new, PublicInbox::CodeSearch::CT, $repo->{ct});
-                $new->add_boolean_term("P$git->{git_dir}");
+                $new->add_boolean_term("P$repo->{git_dir}");
                 $new->add_boolean_term('T'.'r');
                 $new->add_boolean_term('G'.$_) for @{$repo->{roots}};
                 $new->set_data($repo->{fp}); # \n delimited
                 $xdb->add_document($new);
         }
-        $xdb->commit_transaction;
 }
 
 # sharded reader for `git log --pretty=format: --stdin'
-sub shard_worker ($$$) {
-        my ($self, $r, $sigset) = @_;
+sub shard_index { # via wq_io_do
+        my ($self, $git, $n, $roots) = @_;
+        local $self->{current_info} = "$git->{git_dir} [$n]";
         my ($quit, $cmt);
+        local $self->{roots} = $roots;
+        my $in = delete($self->{0}) // die 'BUG: no {0} input';
+        my $op_p = delete($self->{1}) // die 'BUG: no {1} op_p';
         my $batch_bytes = $self->{-opt}->{batch_size} //
                                 $PublicInbox::SearchIdx::BATCH_BYTES;
         my $max = $batch_bytes;
-        $SIG{USR1} = sub { $max = -1 }; # similar to `git fast-import'
-        $SIG{QUIT} = $SIG{TERM} = $SIG{INT} = sub { $quit = shift };
-        PublicInbox::DS::sig_setmask($sigset);
-
-        # the parent process of this shard process writes directly to
-        # the stdin of `git log', we consume git log's stdout:
-        my $rd = $self->{git}->popen(@LOG_STDIN, undef, { 0 => $r });
-        close $r or die "close: $!";
+        my $set_quit = sub { $quit = shift };
+        local $SIG{USR1} = sub { $max = -1 }; # similar to `git fast-import'
+        local $SIG{QUIT} = $set_quit;
+        local $SIG{TERM} = $set_quit;
+        local $SIG{INT} = $set_quit;
+        local $self->{git} = $git; # for patchid
+        my $rd = $git->popen(@LOG_STDIN, undef, { 0 => $in });
+        close $in or die "close: $!";
         my $nr = 0;
 
         # a patch may have \0, see c4201214cbf10636e2c1ab9131573f735b42c8d4
@@ -162,8 +176,7 @@ sub shard_worker ($$$) {
         local $/ = $FS;
         my $buf = <$rd> // return; # leading $FS
         $buf eq $FS or die "BUG: not LF-NUL: $buf\n";
-        my $xdb = $self->idx_acquire;
-        $xdb->begin_transaction;
+        $self->begin_txn_lazy;
         while (defined($buf = <$rd>)) {
                 chomp($buf);
                 $max -= length($buf);
@@ -174,24 +187,40 @@ sub shard_worker ($$$) {
                 ++$nr;
                 if ($max <= 0 && !$PublicInbox::Search::X{CLOEXEC_UNSET}) {
                         progress($self, $nr);
-                        $xdb->commit_transaction;
+                        $self->{xdb}->commit_transaction;
                         $max = $batch_bytes;
-                        $xdb->begin_transaction;
+                        $self->{xdb}->begin_transaction;
                 }
                 $/ = $FS;
         }
         close($rd);
         if (!$? || ($quit && ($? & 127) == POSIX::SIGPIPE)) {
-                $xdb->commit_transaction;
+                send($op_p, "shard_done $n", MSG_EOR);
         } else {
                 warn "E: git @LOG_STDIN: \$?=$?\n";
-                $xdb->cancel_transaction;
+                $self->{xdb}->cancel_transaction;
         }
 }
 
+sub shard_done { # called via PktOp on shard_index completion
+        my ($self, $n) = @_;
+        $self->{-shard_ok}->{$n} = 1 if defined($self->{-shard_ok});
+}
+
 sub seen ($$) {
         my ($xdb, $q) = @_; # $q = "Q$COMMIT_HASH"
-        $xdb->postlist_begin($q) != $xdb->postlist_end($q)
+        for (1..100) {
+                my $ret = eval {
+                        $xdb->postlist_begin($q) != $xdb->postlist_end($q);
+                };
+                return $ret unless $@;
+                if (ref($@) =~ /\bDatabaseModifiedError\b/) {
+                        $xdb->reopen;
+                } else {
+                        Carp::croak($@);
+                }
+        }
+        Carp::croak('too many Xapian DB modifications in progress');
 }
 
 # used to select the shard for a GIT_DIR
@@ -206,18 +235,42 @@ sub docids_by_postlist ($$) { # consider moving to PublicInbox::Search
         @ids;
 }
 
+sub run_todo ($) {
+        my ($self) = @_;
+        my $n;
+        while (defined(my $x = shift(@{$self->{todo} // []}))) {
+                my $cb = shift @$x;
+                $cb->(@$x);
+                ++$n;
+        }
+        $n;
+}
+
 sub cidx_reap ($$) {
         my ($self, $jobs) = @_;
-        while (keys(%$LIVE) >= $jobs) {
-                my $pid = waitpid(-1, 0) // die "waitpid(-1): $!";
-                last if $pid < 0;
-                if (my $x = delete $LIVE->{$pid}) {
-                        my $cb = shift @$x;
-                        $cb->(@$x) if $cb;
-                } else {
-                        warn "reaped unknown PID=$pid ($?)\n";
-                }
+        while (run_todo($self)) {}
+        my $cb = sub { keys(%$LIVE) > $jobs };
+        PublicInbox::DS->SetPostLoopCallback($cb);
+        PublicInbox::DS::event_loop($MY_SIG, $SIGSET) while $cb->();
+        while (!$jobs && run_todo($self)) {}
+}
+
+sub cidx_await_cb { # awaitpid cb
+        my ($pid, $cb, $self, $git, @args) = @_;
+        return if !$LIVE; # premature shutdown
+        my $cmd = delete $LIVE->{$pid} // die 'BUG: no $cmd';
+        PublicInbox::DS::enqueue_reap() if !keys(%$LIVE); # once more for PLC
+        if ($?) {
+                $git->{-cidx_err} = 1;
+                return warn("@$cmd error: \$?=$?\n");
         }
+        push(@$DEFER, [ $cb, $self, $git, @args ]) if $DEFER;
+}
+
+sub cidx_await ($$$$$@) {
+        my ($pid, $cmd, $cb, $self, $git, @args) = @_;
+        $LIVE->{$pid} = $cmd;
+        awaitpid($pid, \&cidx_await_cb, $cb, $self, $git, @args);
 }
 
 # this is different from the grokmirror-compatible fingerprint since we
@@ -227,13 +280,14 @@ sub fp_start ($$$) {
         return if !$LIVE; # premature exit
         cidx_reap($self, $LIVE_JOBS);
         open my $refs, '+>', undef or die "open: $!";
-        my $pid = spawn(['git', "--git-dir=$git->{git_dir}",
-                qw(show-ref --heads --tags --hash)], undef, { 1 => $refs });
+        my $cmd = ['git', "--git-dir=$git->{git_dir}",
+                qw(show-ref --heads --tags --hash)];
+        my $pid = spawn($cmd, undef, { 1 => $refs });
         $git->{-repo}->{refs} = $refs;
-        $LIVE->{$pid} = [ \&fp_fini, $self, $git, $prep_repo ];
+        cidx_await($pid, $cmd, \&fp_fini, $self, $git, $prep_repo);
 }
 
-sub fp_fini {
+sub fp_fini { # cidx_await cb
         my ($self, $git, $prep_repo) = @_;
         my $refs = $git->{-repo}->{refs} // die 'BUG: no {-repo}->{refs}';
         seek($refs, 0, SEEK_SET) or die "seek: $!";
@@ -247,13 +301,15 @@ sub ct_start ($$$) {
         my ($self, $git, $prep_repo) = @_;
         return if !$LIVE; # premature exit
         cidx_reap($self, $LIVE_JOBS);
-        my ($rd, $pid) = $git->popen([qw[for-each-ref --sort=-committerdate
+        my $cmd = [ 'git', "--git-dir=$git->{git_dir}",
+                qw[for-each-ref --sort=-committerdate
                 --format=%(committerdate:raw) --count=1
-                refs/heads/ refs/tags/]]);
-        $LIVE->{$pid} = [ \&ct_fini, $self, $git, $rd, $prep_repo ];
+                refs/heads/ refs/tags/] ];
+        my ($rd, $pid) = popen_rd($cmd);
+        cidx_await($pid, $cmd, \&ct_fini, $self, $git, $rd, $prep_repo);
 }
 
-sub ct_fini {
+sub ct_fini { # cidx_await cb
         my ($self, $git, $rd, $prep_repo) = @_;
         defined(my $ct = <$rd>) or return;
         $ct =~ s/\s+.*\z//s; # drop TZ + LF
@@ -263,34 +319,38 @@ sub ct_fini {
 # TODO: also index gitweb.owner and the full fingerprint for grokmirror?
 sub prep_repo ($$) {
         my ($self, $git) = @_;
-        return if !$LIVE; # premature exit
+        return if !$LIVE || $git->{-cidx_err}; # premature exit
         my $repo = $git->{-repo} // die 'BUG: no {-repo}';
-        my $git_dir = $git->{git_dir};
         if (!defined($repo->{ct})) {
-                warn "W: $git_dir has no commits, skipping\n";
+                warn "W: $git->{git_dir} has no commits, skipping\n";
                 delete $git->{-repo};
                 return;
         }
-        my $n = git_dir_hash($git_dir) % $self->{nshard};
-        my $shard = $repo->{shard} = bless { %$self, shard => $n }, ref($self);
+        my $n = git_dir_hash($git->{git_dir}) % $self->{nshard};
+        my $shard = bless { %$self, shard => $n }, ref($self);
+        $repo->{shard_n} = $n;
         delete @$shard{qw(lockfh lock_path)};
-        my $xdb = $XDB_SHARDS_FLAT[$n] // die "BUG: shard[$n] undef";
-        $xdb->reopen;
-        my @docids = docids_by_postlist({ xdb => $xdb }, 'P'.$git_dir);
+        local $shard->{xdb} = $RDONLY_SHARDS[$n] // die "BUG: shard[$n] undef";
+        $shard->retry_reopen(\&check_existing, $self, $git);
+}
+
+sub check_existing { # retry_reopen callback
+        my ($shard, $self, $git) = @_;
+        my @docids = docids_by_postlist($shard, 'P'.$git->{git_dir});
         my $docid = shift(@docids) // return get_roots($self, $git);
-        if (@docids) {
-                warn "BUG: $git_dir indexed multiple times, culling\n";
-                $repo->{to_delete} = \@docids; # XXX needed?
-        }
-        my $doc = $xdb->get_document($docid) //
-                die "BUG: no #$docid ($git_dir)";
+        my $doc = $shard->{xdb}->get_document($docid) //
+                        die "BUG: no #$docid ($git->{git_dir})";
         my $old_fp = $doc->get_data;
-        if ($old_fp eq $repo->{fp}) { # no change
-                progress($self, "$git_dir unchanged");
+        if ($old_fp eq $git->{-repo}->{fp}) { # no change
+                progress($self, "$git->{git_dir} unchanged");
                 delete $git->{-repo};
                 return;
         }
-        $repo->{id} = $docid;
+        $git->{-repo}->{docid} = $docid;
+        if (@docids) {
+                warn "BUG: $git->{git_dir} indexed multiple times, culling\n";
+                $git->{-repo}->{to_delete} = \@docids; # XXX needed?
+        }
         get_roots($self, $git);
 }
 
@@ -304,12 +364,12 @@ sub partition_refs ($$$) {
                 $_->reopen;
                 open my $fh, '+>', undef or die "open: $!";
                 $fh;
-        } @XDB_SHARDS_FLAT;
+        } @RDONLY_SHARDS;
 
         while (defined(my $cmt = <$fh>)) {
                 chomp $cmt;
-                my $n = hex(substr($cmt, 0, 8)) % scalar(@XDB_SHARDS_FLAT);
-                if (seen($XDB_SHARDS_FLAT[$n], 'Q'.$cmt)) {
+                my $n = hex(substr($cmt, 0, 8)) % scalar(@RDONLY_SHARDS);
+                if (seen($RDONLY_SHARDS[$n], 'Q'.$cmt)) {
                         last if ++$seen > $SEEN_MAX;
                 } else {
                         say { $shard_in[$n] } $cmt or die "say: $!";
@@ -330,9 +390,33 @@ sub partition_refs ($$$) {
         die "git --git-dir=$git->{git_dir} rev-list: \$?=$?\n";
 }
 
-sub index_repo {
+sub shard_commit { # via wq_io_do
+        my ($self, $n) = @_;
+        my $op_p = delete($self->{0}) // die 'BUG: no {0} op_p';
+        $self->commit_txn_lazy;
+        send($op_p, "shard_done $n", MSG_EOR);
+}
+
+sub commit_used_shards ($$$) {
+        my ($self, $git, $consumers) = @_;
+        local $self->{-shard_ok} = {};
+        for my $n (keys %$consumers) {
+                my ($c, $p) = PublicInbox::PktOp->pair;
+                $c->{ops}->{shard_done} = [ $self ];
+                $IDX_SHARDS[$n]->wq_io_do('shard_commit', [ $p->{op_p} ], $n);
+                $consumers->{$n} = $c;
+        }
+        PublicInbox::DS->SetPostLoopCallback(sub {
+                scalar(grep { $_->{sock} } values %$consumers);
+        });
+        PublicInbox::DS::event_loop($MY_SIG, $SIGSET);
+        my $n = grep { ! $self->{-shard_ok}->{$_} } keys %$consumers;
+        die "E: $git->{git_dir} $n shards failed" if $n;
+}
+
+sub index_repo { # cidx_await cb
         my ($self, $git, $roots) = @_;
-        return if !$LIVE; # premature exit
+        return if $git->{-cidx_err};
         my $repo = delete $git->{-repo} or return;
         seek($roots, 0, SEEK_SET) or die "seek: $!";
         chomp(my @roots = <$roots>);
@@ -341,73 +425,45 @@ sub index_repo {
         $repo->{roots} = \@roots;
         local $self->{current_info} = $git->{git_dir};
         my @shard_in = partition_refs($self, $git, delete($repo->{refs}));
-        my %pids;
-        my $fwd_kill = sub {
-                my ($sig) = @_;
-                kill($sig, $_) for keys %pids;
-        };
-        local $SIG{USR1} = $fwd_kill;
-        local $SIG{QUIT} = $fwd_kill;
-        local $SIG{INT} = $fwd_kill;
-        local $SIG{TERM} = $fwd_kill;
-        my $sigset = PublicInbox::DS::block_signals();
-        for (my $n = 0; $n <= $#shard_in; $n++) {
+        local $self->{-shard_ok} = {}; # [0..$#shard_in] => 1
+        my %CONSUMERS;
+        for my $n (0..$#shard_in) {
                 -s $shard_in[$n] or next;
-                my $pid = fork // die "fork: $!";
-                if ($pid == 0) { # no RNG use, here
-                        $0 = "code index [$n]";
-                        $self->{git} = $git;
-                        $self->{shard} = $n;
-                        $self->{current_info} = "$self->{current_info} [$n]";
-                        delete @$self{qw(lockfh lock_path)};
-                        my $in = $shard_in[$n];
-                        @shard_in = ();
-                        $self->{roots} = \@roots;
-                        undef $repo;
-                        eval { shard_worker($self, $in, $sigset) };
-                        warn "E: $@" if $@;
-                        POSIX::_exit($@ ? 1 : 0);
-                } else {
-                        $pids{$pid} = "code index [$n]";
-                }
+                my ($c, $p) = PublicInbox::PktOp->pair;
+                $c->{ops}->{shard_done} = [ $self ];
+                $IDX_SHARDS[$n]->wq_io_do('shard_index',
+                                        [ $shard_in[$n], $p->{op_p} ],
+                                        $git, $n, \@roots);
+                $CONSUMERS{$n} = $c;
         }
-        PublicInbox::DS::sig_setmask($sigset);
         @shard_in = ();
-        my ($err, @todo);
-        while (keys %pids) {
-                my $pid = waitpid(-1, 0) // die "waitpid: $!";
-                if (my $j = delete $pids{$pid}) {
-                        next if $? == 0;
-                        warn "PID:$pid $j exited with \$?=$?\n";
-                        $err = 1;
-                } elsif (my $todo = delete $LIVE->{$pid}) {
-                        warn "PID:$pid exited with \$?=$?\n" if $?;
-                        push @todo, $todo;
-                } else {
-                        warn "reaped unknown PID=$pid ($?)\n";
-                }
-        }
-        die "subprocess(es) failed\n" if $err;
-        store_repo($self, $git, $repo);
-        progress($self, "$git->{git_dir}: done");
-        # TODO: check fp afterwards?
-        while (my $x = shift @todo) {
-                my $cb = shift @$x;
-                $cb->(@$x) if $cb;
+        PublicInbox::DS->SetPostLoopCallback(sub {
+                scalar(grep { $_->{sock} } values %CONSUMERS);
+        });
+        PublicInbox::DS::event_loop($MY_SIG, $SIGSET);
+        my $n = grep { ! $self->{-shard_ok}->{$_} } keys %CONSUMERS;
+        die "E: $git->{git_dir} $n shards failed" if $n;
+        $repo->{git_dir} = $git->{git_dir};
+        my $id = $IDX_SHARDS[$repo->{shard_n}]->wq_do('store_repo', $repo);
+        if ($id > 0) {
+                $CONSUMERS{$repo->{shard_n}} = undef;
+                commit_used_shards($self, $git, \%CONSUMERS);
+                progress($self, "$git->{git_dir}: done");
+                return run_todo($self);
         }
+        die "E: store_repo $git->{git_dir}: id=$id";
 }
 
 sub get_roots ($$) {
         my ($self, $git) = @_;
         return if !$LIVE; # premature exit
-        cidx_reap($self, $LIVE_JOBS);
         my $refs = $git->{-repo}->{refs} // die 'BUG: no {-repo}->{refs}';
         sysseek($refs, 0, SEEK_SET) or die "seek: $!";
         open my $roots, '+>', undef or die "open: $!";
-        my $pid = spawn(['git', "--git-dir=$git->{git_dir}",
-                        qw(rev-list --stdin --max-parents=0)],
-                        undef, { 0 => $refs, 1 => $roots });
-        $LIVE->{$pid} = [ \&index_repo, $self, $git, $roots ];
+        my $cmd = [ 'git', "--git-dir=$git->{git_dir}",
+                        qw(rev-list --stdin --max-parents=0) ];
+        my $pid = spawn($cmd, undef, { 0 => $refs, 1 => $roots });
+        cidx_await($pid, $cmd, \&index_repo, $self, $git, $roots);
 }
 
 # for PublicInbox::SearchIdx::patch_id and with_umask
@@ -434,9 +490,17 @@ sub cidx_init ($) {
                 warn "# creating $dir\n" if !$self->{-opt}->{quiet};
                 File::Path::mkpath($dir);
         }
+        $self->lock_acquire;
+        my @shards;
         for my $n (0..($self->{nshard} - 1)) {
                 my $shard = bless { %$self, shard => $n }, ref($self);
+                delete @$shard{qw(lockfh lock_path)};
                 $shard->idx_acquire;
+                $shard->idx_release;
+                $shard->wq_workers_start("shard[$n]", 1, undef, {
+                        siblings => \@shards, # for ipc_atfork_child
+                }, \&shard_done_wait, $self);
+                push @shards, $shard;
         }
         # this warning needs to happen after idx_acquire
         state $once;
@@ -444,14 +508,11 @@ sub cidx_init ($) {
 W: Xapian v1.2.21..v1.2.24 were missing close-on-exec on OFD locks,
 W: memory usage may be high for large indexing runs
 EOM
+        @shards;
 }
 
 sub scan_git_dirs ($) {
         my ($self) = @_;
-        local $LIVE_JOBS = $self->{-opt}->{jobs} //
-                        PublicInbox::IPC::detect_nproc() // 2;
-        local $LIVE = {};
-        local @XDB_SHARDS_FLAT = $self->xdb_shards_flat;
         for (@{$self->{git_dirs}}) {
                 my $git = PublicInbox::Git->new($_);
                 my $prep_repo = PublicInbox::OnDestroy->new($$, \&prep_repo,
@@ -462,18 +523,31 @@ sub scan_git_dirs ($) {
         cidx_reap($self, 0);
 }
 
-sub cidx_run {
+sub shards_active { # PostLoopCallback
+        scalar(grep { $_->{-cidx_quit} } @IDX_SHARDS);
+}
+
+sub cidx_run { # main entry point
         my ($self) = @_;
-        cidx_init($self);
+        local $self->{todo} = [];
+        local $DEFER = $self->{todo};
+        local $SIGSET = PublicInbox::DS::block_signals();
+        my $restore = PublicInbox::OnDestroy->new($$,
+                \&PublicInbox::DS::sig_setmask, $SIGSET);
+        local $LIVE = {};
+        local @IDX_SHARDS = cidx_init($self);
         local $self->{current_info} = '';
         my $cb = $SIG{__WARN__} || \&CORE::warn;
+        local $MY_SIG = {
+                CHLD => \&PublicInbox::DS::enqueue_reap,
+                INT => sub { exit },
+        };
         local $SIG{__WARN__} = sub {
                 my $m = shift @_;
                 $self->{current_info} eq '' or
                         $m =~ s/\A(#?\s*)/$1$self->{current_info}: /;
                 $cb->($m, @_);
         };
-        $self->lock_acquire;
         load_existing($self);
         my @nc = grep { File::Spec->canonpath($_) ne $_ } @{$self->{git_dirs}};
         if (@nc) {
@@ -486,9 +560,41 @@ sub cidx_run {
                 warn "E: canonicalized and attempting to continue\n";
         }
         local $self->{nchange} = 0;
+        local $LIVE_JOBS = $self->{-opt}->{jobs} ||
+                        PublicInbox::IPC::detect_nproc() || 2;
+        local @RDONLY_SHARDS = $self->xdb_shards_flat;
+
         # do_prune($self) if $self->{-opt}->{prune}; TODO
         scan_git_dirs($self) if $self->{-opt}->{scan} // 1;
+
+        for my $s (@IDX_SHARDS) {
+                $s->{-cidx_quit} = 1;
+                $s->wq_close;
+        }
+
+        PublicInbox::DS->SetPostLoopCallback(\&shards_active);
+        PublicInbox::DS::event_loop($MY_SIG, $SIGSET) if shards_active();
         $self->lock_release(!!$self->{nchange});
 }
 
+sub ipc_atfork_child {
+        my ($self) = @_;
+        $self->SUPER::ipc_atfork_child;
+        my $x = delete $self->{siblings} // die 'BUG: no {siblings}';
+        $_->wq_close for @$x;
+}
+
+sub shard_done_wait { # awaitpid cb via ipc_worker_reap
+        my ($pid, $shard, $self) = @_;
+        delete($shard->{-cidx_quit}) // warn 'BUG: {-cidx_quit} unset';
+        return unless $?;
+        warn "PID:$pid $shard->{shard} exited with \$?=$?\n";
+        ++$self->{shard_err} if defined($self->{shard_err});
+}
+
+sub with_umask { # TODO
+        my ($self, $cb, @arg) = @_;
+        $cb->(@arg);
+}
+
 1;