about summary refs log tree commit homepage
path: root/lib/PublicInbox/CodeSearchIdx.pm
diff options
context:
space:
mode:
Diffstat (limited to 'lib/PublicInbox/CodeSearchIdx.pm')
-rw-r--r--lib/PublicInbox/CodeSearchIdx.pm78
1 files changed, 34 insertions, 44 deletions
diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index e795c2b3..b8afecd2 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -42,6 +42,7 @@ use PublicInbox::IPC qw(nproc_shards);
 use POSIX qw(WNOHANG SEEK_SET);
 use File::Path ();
 use File::Spec ();
+use List::Util qw(max);
 use PublicInbox::SHA qw(sha256_hex);
 use PublicInbox::Search qw(xap_terms);
 use PublicInbox::SearchIdx qw(add_val);
@@ -69,6 +70,9 @@ our (
         @GIT_DIR_GONE, # [ git_dir1, git_dir2 ]
         $PRUNE_DONE, # marks off prune completions
         $NCHANGE, # current number of changes
+        $NPROC,
+        $XH_PID, # XapHelper PID
+        $XHC, # XapClient
         $REPO_CTX, # current repo being indexed in shards
         $IDX_TODO, # [ $git0, $root0, $git1, $root1, ...]
         $GIT_TODO, # [ GIT_DIR0, GIT_DIR1, ...]
@@ -81,8 +85,8 @@ our (
         @AWK, @COMM, @SORT, # awk(1), comm(1), sort(1) commands
         @ASSOC_PFX, # any combination of XDFID, XDFPRE, XDFPOST
         $QRY_STR, # common query string for both code and inbox associations
-        $IBXDIR_FEED, # SOCK_SEQPACKET
-        @DUMP_SHARD_ROOTS_OK, @RECV_IBX_OK, # for associate
+        @DUMP_SHARD_ROOTS_OK, # for associate
+        $DUMP_IBX_WPIPE, # goes to sort(1)
         @ID2ROOT,
 );
 
@@ -529,45 +533,29 @@ sub dump_roots_once {
         progress($self, 'waiting on dump_shard_roots sort');
 }
 
-sub recv_ibx_done { # via PktOp on recv_ibx completion
-        my ($self, $pid, $n) = @_;
-        return if $DO_QUIT;
-        progress($self, "recv_ibx [$n] done");
-        $RECV_IBX_OK[$n] = 1;
-}
-
-# causes a worker to become a dumper for inbox/extindex
-sub recv_ibx { # wq_io_do
-        my ($self, $qry_str) = @_;
-        PublicInbox::CidxRecvIbx->new($self, $qry_str);
-}
-
-sub dump_ibx { # sends to PublicInbox::CidxRecvIbx::event_step
-        my ($self, $id_dir) = @_; # id_dir: "$IBX_ID=$INBOXDIR"
-        my $n = length($id_dir);
-        my $w = send($IBXDIR_FEED, $id_dir, MSG_EOR) // die "send: $!";
-        $n == $w or die "send($id_dir) $w != $n";
+sub dump_ibx { # sends to xap_helper.h
+        my ($self, $ibx_id) = @_;
+        my $ibx = $IBX[$ibx_id] // die "BUG: no IBX[$ibx_id]";
+        my @cmd = ('dump_ibx', $ibx->isrch->xh_args,
+                        (map { ('-A', $_) } @ASSOC_PFX),
+                        $ibx_id, $QRY_STR);
+        pipe(my ($r, $w)) or die "pipe: $!";
+        $XHC->mkreq([$DUMP_IBX_WPIPE, $w], @cmd);
+        my $ekey = $ibx->eidx_key;
+        $self->{PENDING}->{$ekey} = undef;
+        PublicInbox::CidxXapHelperAux->new($r, $self, $ekey, $TODO{associate});
 }
 
-# repurpose shard workers to dump inbox patchids with perfect balance
 sub dump_ibx_start {
         my ($self, $associate) = @_;
-        pipe(my ($sort_r, $sort_w)) or die "pipe: $!";
+        pipe(my $sort_r, $DUMP_IBX_WPIPE) or die "pipe: $!";
         my @sort = (@SORT, '-k1,1');
         my $dst = "$TMPDIR/to_ibx_id";
         open my $fh, '>', $dst or die "open($dst): $!";
         my $sort_pid = spawn(\@sort, $CMD_ENV, { 0 => $sort_r, 1 => $fh });
         close $sort_r or die "close: $!";
         awaitpid($sort_pid, \&cmd_done, \@sort, $associate);
-
-        my ($r, $w);
-        socketpair($r, $w, AF_UNIX, SOCK_SEQPACKET, 0) or die "socketpair: $!";
-        my ($c, $p) = PublicInbox::PktOp->pair;
-        $c->{ops}->{recv_ibx_done} = [ $self, $associate ];
-        $c->{ops}->{index_next} = [ $self ];
-        my $io = [ $p->{op_p}, $r, $sort_w ];
-        $_->wq_io_do('recv_ibx', $io, $QRY_STR) for @IDX_SHARDS;
-        $IBXDIR_FEED = $w;
+        ($XHC, $XH_PID) = PublicInbox::XapClient::start_helper("-j$NPROC");
 }
 
 sub index_next ($) {
@@ -584,8 +572,8 @@ sub index_next ($) {
         } elsif ($TMPDIR) {
                 delete $TODO{dump_ibx_start}; # runs OnDestroy once
                 return dump_ibx($self, shift @IBXQ) if @IBXQ;
-                progress($self, 'done dumping inboxes') if $IBXDIR_FEED;
-                undef $IBXDIR_FEED; # done dumping inboxes, dump roots
+                progress($self, 'done dumping inboxes') if $DUMP_IBX_WPIPE;
+                undef $DUMP_IBX_WPIPE; # done dumping inboxes, dump roots
                 dump_roots_once($self, delete($TODO{associate}) // return);
         }
         # else: wait for shards_active (post_loop_do) callback
@@ -795,7 +783,7 @@ sub kill_shards { $_->wq_kill(@_) for (@IDX_SHARDS) }
 
 sub parent_quit {
         $DO_QUIT = POSIX->can("SIG$_[0]")->();
-        $IBXDIR_FEED = undef;
+        $XHC = undef;
         kill_shards(@_);
         warn "# SIG$_[0] received, quitting...\n";
 }
@@ -875,8 +863,8 @@ sub associate {
         @IDX_SHARDS or return warn("# aborting on no shards\n");
         grep(defined, @DUMP_SHARD_ROOTS_OK) == @IDX_SHARDS or
                 die "E: shards not dumped properly\n";
-        grep(defined, @RECV_IBX_OK) == @IDX_SHARDS or
-                die "E: inboxes not dumped properly\n";
+        my @pending = keys %{$self->{PENDING}};
+        die "E: pending=@pending jobs not done\n" if @pending;
         progress($self, 'associating...');
         my @join = ('time', @JOIN, 'to_ibx_id', 'to_root_id');
         my $rd = popen_rd(\@join, $CMD_ENV, { -C => "$TMPDIR" });
@@ -1032,8 +1020,9 @@ sub cidx_read_comm { # via PublicInbox::CidxComm::event_step
 sub init_associate_prefork ($) {
         my ($self) = @_;
         return unless $self->{-opt}->{associate};
-        require PublicInbox::CidxRecvIbx;
         require PublicInbox::CidxDumpShardRoots;
+        require PublicInbox::CidxXapHelperAux;
+        require PublicInbox::XapClient;
         $self->{-pi_cfg} = PublicInbox::Config->new;
         my @unknown;
         my @pfx = @{$self->{-opt}->{'associate-prefixes'} // [ 'patchid' ]};
@@ -1055,7 +1044,7 @@ EOM
 sub _prep_ibx { # each_inbox callback
         my ($ibx, $self, $incl) = @_;
         ($self->{-opt}->{all} || exists($incl->{$ibx->{inboxdir}})) and
-                push @{$self->{IBX}}, $ibx;
+                push @IBX, $ibx;
 }
 
 sub show_roots { # for diagnostics
@@ -1102,13 +1091,13 @@ sub cidx_run { # main entry point
         local $GIT_TODO = [];
         local ($DO_QUIT, $REINDEX, $TXN_BYTES, @GIT_DIR_GONE, @PRUNE_QUEUE,
                 $REPO_CTX, %ALT_FH, $TMPDIR, @AWK, @COMM, $CMD_ENV,
-                %TODO, @IBXQ, @IBX, @JOIN, @ASSOC_PFX, $IBXDIR_FEED, @ID2ROOT,
-                @DUMP_SHARD_ROOTS_OK, @RECV_IBX_OK);
+                %TODO, @IBXQ, @IBX, @JOIN, @ASSOC_PFX, $DUMP_IBX_WPIPE,
+                @ID2ROOT, @DUMP_SHARD_ROOTS_OK, $XH_PID, $XHC);
         local $BATCH_BYTES = $self->{-opt}->{batch_size} //
                                 $PublicInbox::SearchIdx::BATCH_BYTES;
         local @SORT = (undef, '-u');
-        local $self->{IBX} = \@IBX;
         local $self->{ASSOC_PFX} = \@ASSOC_PFX;
+        local $self->{PENDING} = {};
         local $self->{-pi_cfg};
         if (grep { $_ } @{$self->{-opt}}{qw(prune associate)}) {
                 require File::Temp;
@@ -1165,13 +1154,14 @@ sub cidx_run { # main entry point
                 @GIT_DIR_GONE = uniqstr @GIT_DIR_GONE, @excl;
         }
         local $NCHANGE = 0;
-        local $LIVE_JOBS = $self->{-opt}->{jobs} ||
-                        PublicInbox::IPC::detect_nproc() || 2;
+        local $NPROC = PublicInbox::IPC::detect_nproc();
+        local $LIVE_JOBS = $self->{-opt}->{jobs} || $NPROC || 2;
         local @RDONLY_XDB = $self->xdb_shards_flat;
         init_prune($self);
         init_associate_postfork($self);
         scan_git_dirs($self) if $self->{-opt}->{scan} // 1;
-        index_next($self) for (1..$LIVE_JOBS);
+        my $max = $TODO{associate} ? max($LIVE_JOBS, $NPROC) : $LIVE_JOBS;
+        index_next($self) for (1..$max);
 
         # FreeBSD ignores/discards SIGCHLD while signals are blocked and
         # EVFILT_SIGNAL is inactive, so we pretend we have a SIGCHLD pending