diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/PublicInbox/CodeSearchIdx.pm | 237 | ||||
-rw-r--r-- | lib/PublicInbox/SearchIdx.pm | 12 |
2 files changed, 147 insertions, 102 deletions
diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm index 9e70087e..035fab3e 100644 --- a/lib/PublicInbox/CodeSearchIdx.pm +++ b/lib/PublicInbox/CodeSearchIdx.pm @@ -39,12 +39,22 @@ our ( $MY_SIG, # like %SIG $SIGSET, $TXN_BYTES, # number of bytes in current shard transaction + $BATCH_BYTES, $DO_QUIT, # signal number - @RDONLY_SHARDS, # Xapian::Database + @RDONLY_XDB, # Xapian::Database @IDX_SHARDS, # clones of self $MAX_SIZE, $TMP_GIT, # PublicInbox::Git object for --prune $REINDEX, # PublicInbox::SharedKV + @GIT_DIR_GONE, # [ git_dir1, git_dir2 ] + %TO_PRUNE, # (docid => docid) mapping (hash in case of retry_reopen) + $PRUNE_CUR, # per-shard document ID + $PRUNE_MAX, # per-shard document ID to stop at + $PRUNE_OP_P, # prune_done() notification socket + $PRUNE_NR, # total number pruned + @PRUNE_DONE, # marks off prune completions + $NCHANGE, # current number of changes + %ACTIVE_GIT_DIR, # GIT_DIR => undef mapping for prune ); # stop walking history if we see >$SEEN_MAX existing commits, this assumes @@ -137,7 +147,7 @@ sub store_repo { # wq_do - returns docid my $xdb = $self->{xdb}; for (@{$repo->{to_delete}}) { $xdb->delete_document($_) } # XXX needed? if (defined $repo->{docid}) { - my $doc = $xdb->get_document($repo->{docid}) // + my $doc = $self->get_doc($repo->{docid}) // die "$repo->{git_dir} doc #$repo->{docid} gone"; add_val($doc, PublicInbox::CodeSearch::CT, $repo->{ct}); my %new = map { $_ => undef } @{$repo->{roots}}; @@ -160,12 +170,21 @@ sub store_repo { # wq_do - returns docid } } -sub cidx_ckpoint ($$) { +sub cidx_ckpoint ($;$) { my ($self, $msg) = @_; - progress($self, $msg); + progress($self, $msg) if defined($msg); + $TXN_BYTES = $BATCH_BYTES; # reset + if (my @to_prune = values(%TO_PRUNE)) { + %TO_PRUNE = (); + $PRUNE_NR += scalar(@to_prune); + progress($self, + "prune [$self->{shard}] $PRUNE_NR ($PRUNE_CUR/$PRUNE_MAX)"); + $self->begin_txn_lazy; + $self->{xdb}->delete_document($_) for @to_prune; + } return if $PublicInbox::Search::X{CLOEXEC_UNSET}; - $self->{xdb}->commit_transaction; - $self->{xdb}->begin_transaction; + $self->commit_txn_lazy; + $self->begin_txn_lazy; } sub truncate_cmt ($$) { @@ -198,17 +217,15 @@ EOM } # sharded reader for `git log --pretty=format: --stdin' -sub shard_index { # via wq_io_do +sub shard_index { # via wq_io_do in IDX_SHARDS my ($self, $git, $n, $roots) = @_; local $self->{current_info} = "$git->{git_dir} [$n]"; local $self->{roots} = $roots; my $in = delete($self->{0}) // die 'BUG: no {0} input'; my $op_p = delete($self->{1}) // die 'BUG: no {1} op_p'; - my $batch_bytes = $self->{-opt}->{batch_size} // - $PublicInbox::SearchIdx::BATCH_BYTES; local $MAX_SIZE = $self->{-opt}->{max_size}; # local-ized in parent before fork - $TXN_BYTES = $batch_bytes; + $TXN_BYTES = $BATCH_BYTES; local $self->{git} = $git; # for patchid return if $DO_QUIT; my $rd = $git->popen(@LOG_STDIN, undef, { 0 => $in }); @@ -233,17 +250,13 @@ sub shard_index { # via wq_io_do } else { @$cmt{@FMT} = split(/\n/, $buf, scalar(@FMT)); } - $TXN_BYTES -= $len; - if ($TXN_BYTES <= 0) { + if (($TXN_BYTES -= $len) <= 0) { cidx_ckpoint($self, "[$n] $nr"); - $TXN_BYTES = $batch_bytes - $len; + $TXN_BYTES -= $len; # len may be huge, >TXN_BYTES; } update_commit($self, $cmt); ++$nr; - if ($TXN_BYTES <= 0) { - cidx_ckpoint($self, "[$n] $nr"); - $TXN_BYTES = $batch_bytes; - } + cidx_ckpoint($self, "[$n] $nr") if $TXN_BYTES <= 0; $/ = $FS; } close($rd); @@ -261,6 +274,21 @@ sub shard_done { # called via PktOp on shard_index completion $self->{-shard_ok}->{$n} = 1 if defined($self->{-shard_ok}); } +sub prune_done { # called via PktOp->event_step completion + my ($shard) = @_; + $PRUNE_DONE[$shard->{shard}] = 1; +} + +sub prune_busy { + return if $DO_QUIT; + grep(defined, @PRUNE_DONE) != @IDX_SHARDS; +} + +sub await_prune () { + local @PublicInbox::DS::post_loop_do = (\&prune_busy); + PublicInbox::DS::event_loop($MY_SIG, $SIGSET) if prune_busy(); +} + sub seen ($$) { my ($xdb, $q) = @_; # $q = "Q$COMMIT_HASH" for (1..100) { @@ -390,7 +418,7 @@ sub prep_repo ($$) { my $shard = bless { %$self, shard => $n }, ref($self); $repo->{shard_n} = $n; delete @$shard{qw(lockfh lock_path)}; - local $shard->{xdb} = $RDONLY_SHARDS[$n] // die "BUG: shard[$n] undef"; + local $shard->{xdb} = $RDONLY_XDB[$n] // die "BUG: shard[$n] undef"; $shard->retry_reopen(\&check_existing, $self, $git); } @@ -398,7 +426,7 @@ sub check_existing { # retry_reopen callback my ($shard, $self, $git) = @_; my @docids = docids_by_postlist($shard, 'P'.$git->{git_dir}); my $docid = shift(@docids) // return get_roots($self, $git); - my $doc = $shard->{xdb}->get_document($docid) // + my $doc = $shard->get_doc($docid) // die "BUG: no #$docid ($git->{git_dir})"; my $old_fp = $REINDEX ? "\0invalid" : $doc->get_data; if ($old_fp eq $git->{-repo}->{fp}) { # no change @@ -418,24 +446,24 @@ sub partition_refs ($$$) { sysseek($refs, 0, SEEK_SET) or die "seek: $!"; # for rev-list --stdin my $rfh = $git->popen(qw(rev-list --stdin), undef, { 0 => $refs }); close $refs or die "close: $!"; - my ($seen, $nchange) = (0, 0); + my $seen = 0; my @shard_in = map { $_->reopen; open my $fh, '+>', undef or die "open: $!"; $fh; - } @RDONLY_SHARDS; + } @RDONLY_XDB; while (defined(my $cmt = <$rfh>)) { chomp $cmt; - my $n = hex(substr($cmt, 0, 8)) % scalar(@RDONLY_SHARDS); + my $n = hex(substr($cmt, 0, 8)) % scalar(@RDONLY_XDB); if ($REINDEX && $REINDEX->set_maybe(pack('H*', $cmt), '')) { say { $shard_in[$n] } $cmt or die "say: $!"; - ++$nchange; - } elsif (seen($RDONLY_SHARDS[$n], 'Q'.$cmt)) { + ++$NCHANGE; + } elsif (seen($RDONLY_XDB[$n], 'Q'.$cmt)) { last if ++$seen > $SEEN_MAX; } else { say { $shard_in[$n] } $cmt or die "say: $!"; - ++$nchange; + ++$NCHANGE; $seen = 0; } if ($DO_QUIT) { @@ -446,8 +474,7 @@ sub partition_refs ($$$) { close($rfh); return () if $DO_QUIT; if (!$? || (($? & 127) == POSIX::SIGPIPE && $seen > $SEEN_MAX)) { - $self->{nchange} += $nchange; - progress($self, "$git->{git_dir}: $nchange commits"); + progress($self, "$git->{git_dir}: $NCHANGE commits"); for my $fh (@shard_in) { $fh->flush or die "flush: $!"; sysseek($fh, 0, SEEK_SET) or die "seek: $!"; @@ -548,25 +575,25 @@ sub git { $_[0]->{git} } sub load_existing ($) { # for -u/--update my ($self) = @_; - my $dirs = $self->{git_dirs} // []; + my $dirs = $self->{git_dirs} //= []; if ($self->{-opt}->{update} || $self->{-opt}->{prune}) { local $self->{xdb}; $self->xdb or die "E: $self->{cidx_dir} non-existent for --update\n"; - my @missing; my @cur = grep { if (-e $_) { 1; } else { - push @missing, $_; + push @GIT_DIR_GONE, $_; undef; } } $self->all_terms('P'); - @missing = () if $self->{-opt}->{prune}; - @missing and warn "W: the following repos no longer exist:\n", - (map { "W:\t$_\n" } @missing), + if (@GIT_DIR_GONE && !$self->{-opt}->{prune}) { + warn "W: the following repos no longer exist:\n", + (map { "W:\t$_\n" } @GIT_DIR_GONE), "W: use --prune to remove them from ", $self->{cidx_dir}, "\n"; + } push @$dirs, @cur; } my %uniq; # List::Util::uniq requires Perl 5.26+ @@ -586,13 +613,12 @@ sub cidx_init ($) { } $self->lock_acquire; my @shards; - local $TXN_BYTES; for my $n (0..($self->{nshard} - 1)) { my $shard = bless { %$self, shard => $n }, ref($self); delete @$shard{qw(lockfh lock_path)}; $shard->idx_acquire; $shard->idx_release; - $shard->wq_workers_start("shard[$n]", 1, $SIGSET, { + $shard->wq_workers_start("cidx shard[$n]", 1, $SIGSET, { siblings => \@shards, # for ipc_atfork_child }, \&shard_done_wait, $self); push @shards, $shard; @@ -621,80 +647,79 @@ sub scan_git_dirs ($) { sub prune_cb { # git->check_async callback my ($hex, $type, undef, $self_id) = @_; - return if $type eq 'commit'; my ($self, $id) = @$self_id; + return if $type eq 'commit'; + progress($self, "$hex $type #$id") if ($self->{-opt}->{verbose}//0) > 1; my $len = $self->{xdb}->get_doclength($id); - progress($self, "$hex $type (doclength=$len)"); - ++$self->{pruned}; - $self->{xdb}->delete_document($id); + $TO_PRUNE{$id} = $id; - # all math around batch_bytes calculation is pretty fuzzy, + # all math around TXN_BYTES calculation is pretty fuzzy, # but need a way to regularly flush output to avoid OOM, # so assume the average term + position overhead is the # answer to everything: 42 - return if ($self->{batch_bytes} -= ($len * 42)) > 0; - cidx_ckpoint($self, "[$self->{shard}] $self->{pruned}"); - $self->{batch_bytes} = $self->{-opt}->{batch_size} // - $PublicInbox::SearchIdx::BATCH_BYTES; -} - -sub shard_prune { # via wq_io_do - my ($self, $n, $git_dir) = @_; - my $op_p = delete($self->{0}) // die 'BUG: no {0} op_p'; - my $git = PublicInbox::Git->new($git_dir); # TMP_GIT copy - $self->begin_txn_lazy; - my $xdb = $self->{xdb}; - my $cur = $xdb->postlist_begin('Tc'); - my $end = $xdb->postlist_end('Tc'); - my ($id, @cmt, $oid); - local $self->{batch_bytes} = $self->{-opt}->{batch_size} // - $PublicInbox::SearchIdx::BATCH_BYTES; - local $self->{pruned} = 0; - for (; $cur != $end && !$DO_QUIT; $cur++) { - @cmt = xap_terms('Q', $xdb, $id = $cur->get_docid); - scalar(@cmt) == 1 or - warn "BUG? shard[$n] #$id has multiple commits: @cmt"; - for $oid (@cmt) { - $git->check_async($oid, \&prune_cb, [ $self, $id ]); - } + cidx_ckpoint($self) if ($TXN_BYTES -= ($len * 42)) <= 0; +} + +sub prune_git_dir ($$$) { + my ($self, $id, $doc) = @_; + my @P = xap_terms('P', $doc); + scalar(@P) == 1 or warn +"BUG? shard[$self->{shard}] #$id has zero or multiple paths: @P"; + for my $P (@P) { + next if exists($ACTIVE_GIT_DIR{$P}) && -d $P; + $TO_PRUNE{$id} = $id; + progress($self, "$P gone #$id"); + my $len = $self->{xdb}->get_doclength($id); + cidx_ckpoint($self) if ($TXN_BYTES -= ($len * 42)) <= 0; } - $git->async_wait_all; - for my $d ($self->all_terms('P')) { # GIT_DIR paths - last if $DO_QUIT; - next if -d $d; - for $id (docids_by_postlist($self, 'P'.$d)) { - progress($self, "$d gone #$id"); - $xdb->delete_document($id); - } - } - $self->commit_txn_lazy; - $self->{pruned} and - progress($self, "[$n] pruned $self->{pruned} commits"); - send($op_p, "shard_done $n", MSG_EOR); } -sub do_prune ($) { +sub event_step { # may be requeued via DS my ($self) = @_; - my $consumers = {}; - my $git_dir = $TMP_GIT->{git_dir}; - my $n = 0; - local $self->{-shard_ok} = {}; - for my $s (@IDX_SHARDS) { - my ($c, $p) = PublicInbox::PktOp->pair; - $c->{ops}->{shard_done} = [ $self ]; - $s->wq_io_do('shard_prune', [ $p->{op_p} ], $n, $git_dir); - $consumers->{$n++} = $c; + my $PRUNE_BATCH = 1000; + $TXN_BYTES = $BATCH_BYTES; + for (; --$PRUNE_BATCH && !$DO_QUIT && $PRUNE_CUR <= $PRUNE_MAX; + $PRUNE_CUR++) { + my $doc = $self->get_doc($PRUNE_CUR) // next; + my @cmt = xap_terms('Q', $doc); + if (scalar(@cmt) == 0) { + prune_git_dir($self, $PRUNE_CUR, $doc); + } else { + scalar(@cmt) == 1 or warn +"BUG? shard[$self->{shard}] #$PRUNE_CUR has multiple commits: @cmt"; + for my $o (@cmt) { + $TMP_GIT->check_async($o, \&prune_cb, + [$self, $PRUNE_CUR]) + } + } } - wait_consumers($self, $TMP_GIT, $consumers); + $TMP_GIT->async_wait_all; + cidx_ckpoint($self); + return PublicInbox::DS::requeue($self) if $PRUNE_CUR <= $PRUNE_MAX; + send($PRUNE_OP_P, 'prune_done', MSG_EOR); + $TMP_GIT->cleanup; + $TMP_GIT = $PRUNE_OP_P = $PRUNE_CUR = $PRUNE_MAX = undef; + %ACTIVE_GIT_DIR = (); +} + +sub prune_start { # via wq_io_do in IDX_SHARDS + my ($self, $git_dir, @active_git_dir) = @_; + $PRUNE_CUR = 1; + $PRUNE_OP_P = delete $self->{0} // die 'BUG: no {0} op_p'; + %ACTIVE_GIT_DIR = map { $_ => undef } @active_git_dir; + $TMP_GIT = PublicInbox::Git->new($git_dir); # TMP_GIT copy + $self->begin_txn_lazy; + $PRUNE_MAX = $self->{xdb}->get_lastdocid // 1; + event_step($self); } sub shards_active { # post_loop_do return if $DO_QUIT; - scalar(grep { $_->{-cidx_quit} } @IDX_SHARDS); + scalar(grep { $_->{-cidx_quit} } (@IDX_SHARDS)); } # signal handlers -sub kill_shards { $_->wq_kill(@_) for @IDX_SHARDS } +sub kill_shards { $_->wq_kill(@_) for (@IDX_SHARDS) } sub parent_quit { $DO_QUIT = POSIX->can("SIG$_[0]")->(); @@ -704,7 +729,6 @@ sub parent_quit { sub init_tmp_git_dir ($) { my ($self) = @_; - return unless $self->{-opt}->{prune}; require File::Temp; require PublicInbox::Import; my $tmp = File::Temp->newdir('cidx-all-git-XXXX', TMPDIR => 1); @@ -747,6 +771,18 @@ sub prep_umask ($) { undef; } +sub start_prune ($) { + my ($self) = @_; + init_tmp_git_dir($self); + my @active_git_dir = (@{$self->{git_dirs}}, @GIT_DIR_GONE); + for my $s (@IDX_SHARDS) { + my ($c, $p) = PublicInbox::PktOp->pair; + $c->{ops}->{prune_done} = [ $s ]; + $s->wq_io_do('prune_start', [ $p->{op_p} ], + $TMP_GIT->{git_dir}, @active_git_dir) + } +} + sub cidx_run { # main entry point my ($self) = @_; my $restore_umask = prep_umask($self); @@ -756,7 +792,10 @@ sub cidx_run { # main entry point my $restore = PublicInbox::OnDestroy->new($$, \&PublicInbox::DS::sig_setmask, $SIGSET); local $LIVE = {}; - local ($DO_QUIT, $TMP_GIT, $REINDEX); + local ($DO_QUIT, $TMP_GIT, $REINDEX, $TXN_BYTES, @GIT_DIR_GONE, + @PRUNE_DONE); + local $BATCH_BYTES = $self->{-opt}->{batch_size} // + $PublicInbox::SearchIdx::BATCH_BYTES; local @IDX_SHARDS = cidx_init($self); local $self->{current_info} = ''; local $MY_SIG = { @@ -796,13 +835,13 @@ sub cidx_run { # main entry point $_ =~ /$re/ ? (warn("# excluding $_\n"), 0) : 1; } @{$self->{git_dirs}}; } - local $self->{nchange} = 0; + local $NCHANGE = 0; local $LIVE_JOBS = $self->{-opt}->{jobs} || PublicInbox::IPC::detect_nproc() || 2; - local @RDONLY_SHARDS = $self->xdb_shards_flat; - init_tmp_git_dir($self); - do_prune($self) if $self->{-opt}->{prune}; + local @RDONLY_XDB = $self->xdb_shards_flat; + start_prune($self) if $self->{-opt}->{prune}; scan_git_dirs($self) if $self->{-opt}->{scan} // 1; + await_prune if $self->{-opt}->{prune}; for my $s (@IDX_SHARDS) { $s->{-cidx_quit} = 1; @@ -811,10 +850,10 @@ sub cidx_run { # main entry point local @PublicInbox::DS::post_loop_do = (\&shards_active); PublicInbox::DS::event_loop($MY_SIG, $SIGSET) if shards_active(); - $self->lock_release(!!$self->{nchange}); + $self->lock_release(!!$NCHANGE); } -sub ipc_atfork_child { +sub ipc_atfork_child { # @IDX_SHARDS my ($self) = @_; $self->SUPER::ipc_atfork_child; $SIG{USR1} = \&shard_usr1; diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm index 3baeaa9c..b907772e 100644 --- a/lib/PublicInbox/SearchIdx.pm +++ b/lib/PublicInbox/SearchIdx.pm @@ -552,11 +552,17 @@ sub add_message { $smsg->{num}; } +sub get_doc ($$) { + my ($self, $docid) = @_; + eval { $self->{xdb}->get_document($docid) } // do { + die $@ if $@ && ref($@) !~ /\bDocNotFoundError\b/; + undef; + } +} + sub _get_doc ($$) { my ($self, $docid) = @_; - my $doc = eval { $self->{xdb}->get_document($docid) }; - $doc // do { - warn "E: $@\n" if $@; + get_doc($self, $docid) // do { warn "E: #$docid missing in Xapian\n"; undef; } |