diff options
Diffstat (limited to 'lib/PublicInbox/CodeSearchIdx.pm')
-rw-r--r-- | lib/PublicInbox/CodeSearchIdx.pm | 78 |
1 files changed, 34 insertions, 44 deletions
diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm index e795c2b3..b8afecd2 100644 --- a/lib/PublicInbox/CodeSearchIdx.pm +++ b/lib/PublicInbox/CodeSearchIdx.pm @@ -42,6 +42,7 @@ use PublicInbox::IPC qw(nproc_shards); use POSIX qw(WNOHANG SEEK_SET); use File::Path (); use File::Spec (); +use List::Util qw(max); use PublicInbox::SHA qw(sha256_hex); use PublicInbox::Search qw(xap_terms); use PublicInbox::SearchIdx qw(add_val); @@ -69,6 +70,9 @@ our ( @GIT_DIR_GONE, # [ git_dir1, git_dir2 ] $PRUNE_DONE, # marks off prune completions $NCHANGE, # current number of changes + $NPROC, + $XH_PID, # XapHelper PID + $XHC, # XapClient $REPO_CTX, # current repo being indexed in shards $IDX_TODO, # [ $git0, $root0, $git1, $root1, ...] $GIT_TODO, # [ GIT_DIR0, GIT_DIR1, ...] @@ -81,8 +85,8 @@ our ( @AWK, @COMM, @SORT, # awk(1), comm(1), sort(1) commands @ASSOC_PFX, # any combination of XDFID, XDFPRE, XDFPOST $QRY_STR, # common query string for both code and inbox associations - $IBXDIR_FEED, # SOCK_SEQPACKET - @DUMP_SHARD_ROOTS_OK, @RECV_IBX_OK, # for associate + @DUMP_SHARD_ROOTS_OK, # for associate + $DUMP_IBX_WPIPE, # goes to sort(1) @ID2ROOT, ); @@ -529,45 +533,29 @@ sub dump_roots_once { progress($self, 'waiting on dump_shard_roots sort'); } -sub recv_ibx_done { # via PktOp on recv_ibx completion - my ($self, $pid, $n) = @_; - return if $DO_QUIT; - progress($self, "recv_ibx [$n] done"); - $RECV_IBX_OK[$n] = 1; -} - -# causes a worker to become a dumper for inbox/extindex -sub recv_ibx { # wq_io_do - my ($self, $qry_str) = @_; - PublicInbox::CidxRecvIbx->new($self, $qry_str); -} - -sub dump_ibx { # sends to PublicInbox::CidxRecvIbx::event_step - my ($self, $id_dir) = @_; # id_dir: "$IBX_ID=$INBOXDIR" - my $n = length($id_dir); - my $w = send($IBXDIR_FEED, $id_dir, MSG_EOR) // die "send: $!"; - $n == $w or die "send($id_dir) $w != $n"; +sub dump_ibx { # sends to xap_helper.h + my ($self, $ibx_id) = @_; + my $ibx = $IBX[$ibx_id] // die "BUG: no IBX[$ibx_id]"; + my @cmd = ('dump_ibx', $ibx->isrch->xh_args, + (map { ('-A', $_) } @ASSOC_PFX), + $ibx_id, $QRY_STR); + pipe(my ($r, $w)) or die "pipe: $!"; + $XHC->mkreq([$DUMP_IBX_WPIPE, $w], @cmd); + my $ekey = $ibx->eidx_key; + $self->{PENDING}->{$ekey} = undef; + PublicInbox::CidxXapHelperAux->new($r, $self, $ekey, $TODO{associate}); } -# repurpose shard workers to dump inbox patchids with perfect balance sub dump_ibx_start { my ($self, $associate) = @_; - pipe(my ($sort_r, $sort_w)) or die "pipe: $!"; + pipe(my $sort_r, $DUMP_IBX_WPIPE) or die "pipe: $!"; my @sort = (@SORT, '-k1,1'); my $dst = "$TMPDIR/to_ibx_id"; open my $fh, '>', $dst or die "open($dst): $!"; my $sort_pid = spawn(\@sort, $CMD_ENV, { 0 => $sort_r, 1 => $fh }); close $sort_r or die "close: $!"; awaitpid($sort_pid, \&cmd_done, \@sort, $associate); - - my ($r, $w); - socketpair($r, $w, AF_UNIX, SOCK_SEQPACKET, 0) or die "socketpair: $!"; - my ($c, $p) = PublicInbox::PktOp->pair; - $c->{ops}->{recv_ibx_done} = [ $self, $associate ]; - $c->{ops}->{index_next} = [ $self ]; - my $io = [ $p->{op_p}, $r, $sort_w ]; - $_->wq_io_do('recv_ibx', $io, $QRY_STR) for @IDX_SHARDS; - $IBXDIR_FEED = $w; + ($XHC, $XH_PID) = PublicInbox::XapClient::start_helper("-j$NPROC"); } sub index_next ($) { @@ -584,8 +572,8 @@ sub index_next ($) { } elsif ($TMPDIR) { delete $TODO{dump_ibx_start}; # runs OnDestroy once return dump_ibx($self, shift @IBXQ) if @IBXQ; - progress($self, 'done dumping inboxes') if $IBXDIR_FEED; - undef $IBXDIR_FEED; # done dumping inboxes, dump roots + progress($self, 'done dumping inboxes') if $DUMP_IBX_WPIPE; + undef $DUMP_IBX_WPIPE; # done dumping inboxes, dump roots dump_roots_once($self, delete($TODO{associate}) // return); } # else: wait for shards_active (post_loop_do) callback @@ -795,7 +783,7 @@ sub kill_shards { $_->wq_kill(@_) for (@IDX_SHARDS) } sub parent_quit { $DO_QUIT = POSIX->can("SIG$_[0]")->(); - $IBXDIR_FEED = undef; + $XHC = undef; kill_shards(@_); warn "# SIG$_[0] received, quitting...\n"; } @@ -875,8 +863,8 @@ sub associate { @IDX_SHARDS or return warn("# aborting on no shards\n"); grep(defined, @DUMP_SHARD_ROOTS_OK) == @IDX_SHARDS or die "E: shards not dumped properly\n"; - grep(defined, @RECV_IBX_OK) == @IDX_SHARDS or - die "E: inboxes not dumped properly\n"; + my @pending = keys %{$self->{PENDING}}; + die "E: pending=@pending jobs not done\n" if @pending; progress($self, 'associating...'); my @join = ('time', @JOIN, 'to_ibx_id', 'to_root_id'); my $rd = popen_rd(\@join, $CMD_ENV, { -C => "$TMPDIR" }); @@ -1032,8 +1020,9 @@ sub cidx_read_comm { # via PublicInbox::CidxComm::event_step sub init_associate_prefork ($) { my ($self) = @_; return unless $self->{-opt}->{associate}; - require PublicInbox::CidxRecvIbx; require PublicInbox::CidxDumpShardRoots; + require PublicInbox::CidxXapHelperAux; + require PublicInbox::XapClient; $self->{-pi_cfg} = PublicInbox::Config->new; my @unknown; my @pfx = @{$self->{-opt}->{'associate-prefixes'} // [ 'patchid' ]}; @@ -1055,7 +1044,7 @@ EOM sub _prep_ibx { # each_inbox callback my ($ibx, $self, $incl) = @_; ($self->{-opt}->{all} || exists($incl->{$ibx->{inboxdir}})) and - push @{$self->{IBX}}, $ibx; + push @IBX, $ibx; } sub show_roots { # for diagnostics @@ -1102,13 +1091,13 @@ sub cidx_run { # main entry point local $GIT_TODO = []; local ($DO_QUIT, $REINDEX, $TXN_BYTES, @GIT_DIR_GONE, @PRUNE_QUEUE, $REPO_CTX, %ALT_FH, $TMPDIR, @AWK, @COMM, $CMD_ENV, - %TODO, @IBXQ, @IBX, @JOIN, @ASSOC_PFX, $IBXDIR_FEED, @ID2ROOT, - @DUMP_SHARD_ROOTS_OK, @RECV_IBX_OK); + %TODO, @IBXQ, @IBX, @JOIN, @ASSOC_PFX, $DUMP_IBX_WPIPE, + @ID2ROOT, @DUMP_SHARD_ROOTS_OK, $XH_PID, $XHC); local $BATCH_BYTES = $self->{-opt}->{batch_size} // $PublicInbox::SearchIdx::BATCH_BYTES; local @SORT = (undef, '-u'); - local $self->{IBX} = \@IBX; local $self->{ASSOC_PFX} = \@ASSOC_PFX; + local $self->{PENDING} = {}; local $self->{-pi_cfg}; if (grep { $_ } @{$self->{-opt}}{qw(prune associate)}) { require File::Temp; @@ -1165,13 +1154,14 @@ sub cidx_run { # main entry point @GIT_DIR_GONE = uniqstr @GIT_DIR_GONE, @excl; } local $NCHANGE = 0; - local $LIVE_JOBS = $self->{-opt}->{jobs} || - PublicInbox::IPC::detect_nproc() || 2; + local $NPROC = PublicInbox::IPC::detect_nproc(); + local $LIVE_JOBS = $self->{-opt}->{jobs} || $NPROC || 2; local @RDONLY_XDB = $self->xdb_shards_flat; init_prune($self); init_associate_postfork($self); scan_git_dirs($self) if $self->{-opt}->{scan} // 1; - index_next($self) for (1..$LIVE_JOBS); + my $max = $TODO{associate} ? max($LIVE_JOBS, $NPROC) : $LIVE_JOBS; + index_next($self) for (1..$max); # FreeBSD ignores/discards SIGCHLD while signals are blocked and # EVFILT_SIGNAL is inactive, so we pretend we have a SIGCHLD pending |