author     Eric Wong <e@80x24.org>  2023-03-29 20:32:59 +0000
committer  Eric Wong <e@80x24.org>  2023-03-29 23:06:45 +0000
commit     9d3b0372bdacd961928f3eabf21616849c188b00 (patch)
tree       2040e962474290c74996626b816749d97531c87d /lib/PublicInbox
parent     c421c638a77a785903cc829b49131fae3e15a25c (diff)
We need to ensure we don't block indexing for too long while
pruning, since pruning coderepos seems to be needed more
frequently than pruning inbox repos due to the prevalence of
force pushes with branches like `seen' (formerly `pu') in git.git.

Implement this via the ->event_step and requeue mechanisms of DS
so we periodically flush our work and let indexing resume.

I originally wanted to implement this as a dedicated group
of workers, but the workaround needed for uncaught C++
exceptions from the XS Search::Xapian bug[1] was expensive and
complex compared to the evented mechanism.

[1] https://lists.xapian.org/pipermail/xapian-discuss/2023-March/009967.html
   <20230327114604.M803690@dcvr>
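
For illustration only (not part of the patch): a minimal sketch of the
evented pattern described above, where event_step() does a bounded
amount of work, flushes, and requeues itself so the DS event loop can
interleave indexing between steps.  process_one(), checkpoint() and
done() are hypothetical placeholders, not CodeSearchIdx internals:

        package PruneSketch; # hypothetical example class
        use v5.12;
        use PublicInbox::DS;

        sub event_step { # invoked by the event loop; may requeue itself
                my ($self) = @_;
                my $budget = 1000; # bounded work per step so indexing can resume
                while ($budget-- > 0 && $self->{cur} <= $self->{max}) {
                        $self->process_one($self->{cur}++); # hypothetical work unit
                }
                $self->checkpoint; # flush the current Xapian transaction
                return PublicInbox::DS::requeue($self) if $self->{cur} <= $self->{max};
                $self->done; # e.g. notify the parent via PktOp once finished
        }
        1;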
Diffstat (limited to 'lib/PublicInbox')
 -rw-r--r--  lib/PublicInbox/CodeSearchIdx.pm  237
 -rw-r--r--  lib/PublicInbox/SearchIdx.pm       12
 2 files changed, 147 insertions(+), 102 deletions(-)
diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index 9e70087e..035fab3e 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -39,12 +39,22 @@ our (
         $MY_SIG, # like %SIG
         $SIGSET,
         $TXN_BYTES, # number of bytes in current shard transaction
+        $BATCH_BYTES,
         $DO_QUIT, # signal number
-        @RDONLY_SHARDS, # Xapian::Database
+        @RDONLY_XDB, # Xapian::Database
         @IDX_SHARDS, # clones of self
         $MAX_SIZE,
         $TMP_GIT, # PublicInbox::Git object for --prune
         $REINDEX, # PublicInbox::SharedKV
+        @GIT_DIR_GONE, # [ git_dir1, git_dir2 ]
+        %TO_PRUNE, # (docid => docid) mapping (hash in case of retry_reopen)
+        $PRUNE_CUR, # per-shard document ID
+        $PRUNE_MAX, # per-shard document ID to stop at
+        $PRUNE_OP_P, # prune_done() notification socket
+        $PRUNE_NR, # total number pruned
+        @PRUNE_DONE, # marks off prune completions
+        $NCHANGE, # current number of changes
+        %ACTIVE_GIT_DIR, # GIT_DIR => undef mapping for prune
 );
 
 # stop walking history if we see >$SEEN_MAX existing commits, this assumes
@@ -137,7 +147,7 @@ sub store_repo { # wq_do - returns docid
         my $xdb = $self->{xdb};
         for (@{$repo->{to_delete}}) { $xdb->delete_document($_) } # XXX needed?
         if (defined $repo->{docid}) {
-                my $doc = $xdb->get_document($repo->{docid}) //
+                my $doc = $self->get_doc($repo->{docid}) //
                         die "$repo->{git_dir} doc #$repo->{docid} gone";
                 add_val($doc, PublicInbox::CodeSearch::CT, $repo->{ct});
                 my %new = map { $_ => undef } @{$repo->{roots}};
@@ -160,12 +170,21 @@ sub store_repo { # wq_do - returns docid
         }
 }
 
-sub cidx_ckpoint ($$) {
+sub cidx_ckpoint ($;$) {
         my ($self, $msg) = @_;
-        progress($self, $msg);
+        progress($self, $msg) if defined($msg);
+        $TXN_BYTES = $BATCH_BYTES; # reset
+        if (my @to_prune = values(%TO_PRUNE)) {
+                %TO_PRUNE = ();
+                $PRUNE_NR += scalar(@to_prune);
+                progress($self,
+                  "prune [$self->{shard}] $PRUNE_NR ($PRUNE_CUR/$PRUNE_MAX)");
+                $self->begin_txn_lazy;
+                $self->{xdb}->delete_document($_) for @to_prune;
+        }
         return if $PublicInbox::Search::X{CLOEXEC_UNSET};
-        $self->{xdb}->commit_transaction;
-        $self->{xdb}->begin_transaction;
+        $self->commit_txn_lazy;
+        $self->begin_txn_lazy;
 }
 
 sub truncate_cmt ($$) {
@@ -198,17 +217,15 @@ EOM
 }
 
 # sharded reader for `git log --pretty=format: --stdin'
-sub shard_index { # via wq_io_do
+sub shard_index { # via wq_io_do in IDX_SHARDS
         my ($self, $git, $n, $roots) = @_;
         local $self->{current_info} = "$git->{git_dir} [$n]";
         local $self->{roots} = $roots;
         my $in = delete($self->{0}) // die 'BUG: no {0} input';
         my $op_p = delete($self->{1}) // die 'BUG: no {1} op_p';
-        my $batch_bytes = $self->{-opt}->{batch_size} //
-                                $PublicInbox::SearchIdx::BATCH_BYTES;
         local $MAX_SIZE = $self->{-opt}->{max_size};
         # local-ized in parent before fork
-        $TXN_BYTES = $batch_bytes;
+        $TXN_BYTES = $BATCH_BYTES;
         local $self->{git} = $git; # for patchid
         return if $DO_QUIT;
         my $rd = $git->popen(@LOG_STDIN, undef, { 0 => $in });
@@ -233,17 +250,13 @@ sub shard_index { # via wq_io_do
                 } else {
                         @$cmt{@FMT} = split(/\n/, $buf, scalar(@FMT));
                 }
-                $TXN_BYTES -= $len;
-                if ($TXN_BYTES <= 0) {
+                if (($TXN_BYTES -= $len) <= 0) {
                         cidx_ckpoint($self, "[$n] $nr");
-                        $TXN_BYTES = $batch_bytes - $len;
+                        $TXN_BYTES -= $len; # len may be huge, >TXN_BYTES;
                 }
                 update_commit($self, $cmt);
                 ++$nr;
-                if ($TXN_BYTES <= 0) {
-                        cidx_ckpoint($self, "[$n] $nr");
-                        $TXN_BYTES = $batch_bytes;
-                }
+                cidx_ckpoint($self, "[$n] $nr") if $TXN_BYTES <= 0;
                 $/ = $FS;
         }
         close($rd);
@@ -261,6 +274,21 @@ sub shard_done { # called via PktOp on shard_index completion
         $self->{-shard_ok}->{$n} = 1 if defined($self->{-shard_ok});
 }
 
+sub prune_done { # called via PktOp->event_step completion
+        my ($shard) = @_;
+        $PRUNE_DONE[$shard->{shard}] = 1;
+}
+
+sub prune_busy {
+        return if $DO_QUIT;
+        grep(defined, @PRUNE_DONE) != @IDX_SHARDS;
+}
+
+sub await_prune () {
+        local @PublicInbox::DS::post_loop_do = (\&prune_busy);
+        PublicInbox::DS::event_loop($MY_SIG, $SIGSET) if prune_busy();
+}
+
 sub seen ($$) {
         my ($xdb, $q) = @_; # $q = "Q$COMMIT_HASH"
         for (1..100) {
@@ -390,7 +418,7 @@ sub prep_repo ($$) {
         my $shard = bless { %$self, shard => $n }, ref($self);
         $repo->{shard_n} = $n;
         delete @$shard{qw(lockfh lock_path)};
-        local $shard->{xdb} = $RDONLY_SHARDS[$n] // die "BUG: shard[$n] undef";
+        local $shard->{xdb} = $RDONLY_XDB[$n] // die "BUG: shard[$n] undef";
         $shard->retry_reopen(\&check_existing, $self, $git);
 }
 
@@ -398,7 +426,7 @@ sub check_existing { # retry_reopen callback
         my ($shard, $self, $git) = @_;
         my @docids = docids_by_postlist($shard, 'P'.$git->{git_dir});
         my $docid = shift(@docids) // return get_roots($self, $git);
-        my $doc = $shard->{xdb}->get_document($docid) //
+        my $doc = $shard->get_doc($docid) //
                         die "BUG: no #$docid ($git->{git_dir})";
         my $old_fp = $REINDEX ? "\0invalid" : $doc->get_data;
         if ($old_fp eq $git->{-repo}->{fp}) { # no change
@@ -418,24 +446,24 @@ sub partition_refs ($$$) {
         sysseek($refs, 0, SEEK_SET) or die "seek: $!"; # for rev-list --stdin
         my $rfh = $git->popen(qw(rev-list --stdin), undef, { 0 => $refs });
         close $refs or die "close: $!";
-        my ($seen, $nchange) = (0, 0);
+        my $seen = 0;
         my @shard_in = map {
                 $_->reopen;
                 open my $fh, '+>', undef or die "open: $!";
                 $fh;
-        } @RDONLY_SHARDS;
+        } @RDONLY_XDB;
 
         while (defined(my $cmt = <$rfh>)) {
                 chomp $cmt;
-                my $n = hex(substr($cmt, 0, 8)) % scalar(@RDONLY_SHARDS);
+                my $n = hex(substr($cmt, 0, 8)) % scalar(@RDONLY_XDB);
                 if ($REINDEX && $REINDEX->set_maybe(pack('H*', $cmt), '')) {
                         say { $shard_in[$n] } $cmt or die "say: $!";
-                        ++$nchange;
-                } elsif (seen($RDONLY_SHARDS[$n], 'Q'.$cmt)) {
+                        ++$NCHANGE;
+                } elsif (seen($RDONLY_XDB[$n], 'Q'.$cmt)) {
                         last if ++$seen > $SEEN_MAX;
                 } else {
                         say { $shard_in[$n] } $cmt or die "say: $!";
-                        ++$nchange;
+                        ++$NCHANGE;
                         $seen = 0;
                 }
                 if ($DO_QUIT) {
@@ -446,8 +474,7 @@ sub partition_refs ($$$) {
         close($rfh);
         return () if $DO_QUIT;
         if (!$? || (($? & 127) == POSIX::SIGPIPE && $seen > $SEEN_MAX)) {
-                $self->{nchange} += $nchange;
-                progress($self, "$git->{git_dir}: $nchange commits");
+                progress($self, "$git->{git_dir}: $NCHANGE commits");
                 for my $fh (@shard_in) {
                         $fh->flush or die "flush: $!";
                         sysseek($fh, 0, SEEK_SET) or die "seek: $!";
@@ -548,25 +575,25 @@ sub git { $_[0]->{git} }
 
 sub load_existing ($) { # for -u/--update
         my ($self) = @_;
-        my $dirs = $self->{git_dirs} // [];
+        my $dirs = $self->{git_dirs} //= [];
         if ($self->{-opt}->{update} || $self->{-opt}->{prune}) {
                 local $self->{xdb};
                 $self->xdb or
                         die "E: $self->{cidx_dir} non-existent for --update\n";
-                my @missing;
                 my @cur = grep {
                         if (-e $_) {
                                 1;
                         } else {
-                                push @missing, $_;
+                                push @GIT_DIR_GONE, $_;
                                 undef;
                         }
                 } $self->all_terms('P');
-                @missing = () if $self->{-opt}->{prune};
-                @missing and warn "W: the following repos no longer exist:\n",
-                                (map { "W:\t$_\n" } @missing),
+                if (@GIT_DIR_GONE && !$self->{-opt}->{prune}) {
+                        warn "W: the following repos no longer exist:\n",
+                                (map { "W:\t$_\n" } @GIT_DIR_GONE),
                                 "W: use --prune to remove them from ",
                                 $self->{cidx_dir}, "\n";
+                }
                 push @$dirs, @cur;
         }
         my %uniq; # List::Util::uniq requires Perl 5.26+
@@ -586,13 +613,12 @@ sub cidx_init ($) {
         }
         $self->lock_acquire;
         my @shards;
-        local $TXN_BYTES;
         for my $n (0..($self->{nshard} - 1)) {
                 my $shard = bless { %$self, shard => $n }, ref($self);
                 delete @$shard{qw(lockfh lock_path)};
                 $shard->idx_acquire;
                 $shard->idx_release;
-                $shard->wq_workers_start("shard[$n]", 1, $SIGSET, {
+                $shard->wq_workers_start("cidx shard[$n]", 1, $SIGSET, {
                         siblings => \@shards, # for ipc_atfork_child
                 }, \&shard_done_wait, $self);
                 push @shards, $shard;
@@ -621,80 +647,79 @@ sub scan_git_dirs ($) {
 
 sub prune_cb { # git->check_async callback
         my ($hex, $type, undef, $self_id) = @_;
-        return if $type eq 'commit';
         my ($self, $id) = @$self_id;
+        return if $type eq 'commit';
+        progress($self, "$hex $type #$id") if ($self->{-opt}->{verbose}//0) > 1;
         my $len = $self->{xdb}->get_doclength($id);
-        progress($self, "$hex $type (doclength=$len)");
-        ++$self->{pruned};
-        $self->{xdb}->delete_document($id);
+        $TO_PRUNE{$id} = $id;
 
-        # all math around batch_bytes calculation is pretty fuzzy,
+        # all math around TXN_BYTES calculation is pretty fuzzy,
         # but need a way to regularly flush output to avoid OOM,
         # so assume the average term + position overhead is the
         # answer to everything: 42
-        return if ($self->{batch_bytes} -= ($len * 42)) > 0;
-        cidx_ckpoint($self, "[$self->{shard}] $self->{pruned}");
-        $self->{batch_bytes} = $self->{-opt}->{batch_size} //
-                        $PublicInbox::SearchIdx::BATCH_BYTES;
-}
-
-sub shard_prune { # via wq_io_do
-        my ($self, $n, $git_dir) = @_;
-        my $op_p = delete($self->{0}) // die 'BUG: no {0} op_p';
-        my $git = PublicInbox::Git->new($git_dir); # TMP_GIT copy
-        $self->begin_txn_lazy;
-        my $xdb = $self->{xdb};
-        my $cur = $xdb->postlist_begin('Tc');
-        my $end = $xdb->postlist_end('Tc');
-        my ($id, @cmt, $oid);
-        local $self->{batch_bytes} = $self->{-opt}->{batch_size} //
-                                $PublicInbox::SearchIdx::BATCH_BYTES;
-        local $self->{pruned} = 0;
-        for (; $cur != $end && !$DO_QUIT; $cur++) {
-                @cmt = xap_terms('Q', $xdb, $id = $cur->get_docid);
-                scalar(@cmt) == 1 or
-                        warn "BUG? shard[$n] #$id has multiple commits: @cmt";
-                for $oid (@cmt) {
-                        $git->check_async($oid, \&prune_cb, [ $self, $id ]);
-                }
+        cidx_ckpoint($self) if ($TXN_BYTES -= ($len * 42)) <= 0;
+}
+
+sub prune_git_dir ($$$) {
+        my ($self, $id, $doc) = @_;
+        my @P = xap_terms('P', $doc);
+        scalar(@P) == 1 or warn
+"BUG? shard[$self->{shard}] #$id has zero or multiple paths: @P";
+        for my $P (@P) {
+                next if exists($ACTIVE_GIT_DIR{$P}) && -d $P;
+                $TO_PRUNE{$id} = $id;
+                progress($self, "$P gone #$id");
+                my $len = $self->{xdb}->get_doclength($id);
+                cidx_ckpoint($self) if ($TXN_BYTES -= ($len * 42)) <= 0;
         }
-        $git->async_wait_all;
-        for my $d ($self->all_terms('P')) { # GIT_DIR paths
-                last if $DO_QUIT;
-                next if -d $d;
-                for $id (docids_by_postlist($self, 'P'.$d)) {
-                        progress($self, "$d gone #$id");
-                        $xdb->delete_document($id);
-                }
-        }
-        $self->commit_txn_lazy;
-        $self->{pruned} and
-                progress($self, "[$n] pruned $self->{pruned} commits");
-        send($op_p, "shard_done $n", MSG_EOR);
 }
 
-sub do_prune ($) {
+sub event_step { # may be requeued via DS
         my ($self) = @_;
-        my $consumers = {};
-        my $git_dir = $TMP_GIT->{git_dir};
-        my $n = 0;
-        local $self->{-shard_ok} = {};
-        for my $s (@IDX_SHARDS) {
-                my ($c, $p) = PublicInbox::PktOp->pair;
-                $c->{ops}->{shard_done} = [ $self ];
-                $s->wq_io_do('shard_prune', [ $p->{op_p} ], $n, $git_dir);
-                $consumers->{$n++} = $c;
+        my $PRUNE_BATCH = 1000;
+        $TXN_BYTES = $BATCH_BYTES;
+        for (; --$PRUNE_BATCH && !$DO_QUIT && $PRUNE_CUR <= $PRUNE_MAX;
+                        $PRUNE_CUR++) {
+                my $doc = $self->get_doc($PRUNE_CUR) // next;
+                my @cmt = xap_terms('Q', $doc);
+                if (scalar(@cmt) == 0) {
+                        prune_git_dir($self, $PRUNE_CUR, $doc);
+                } else {
+                        scalar(@cmt) == 1 or warn
+"BUG? shard[$self->{shard}] #$PRUNE_CUR has multiple commits: @cmt";
+                        for my $o (@cmt) {
+                                $TMP_GIT->check_async($o, \&prune_cb,
+                                                        [$self, $PRUNE_CUR])
+                        }
+                }
         }
-        wait_consumers($self, $TMP_GIT, $consumers);
+        $TMP_GIT->async_wait_all;
+        cidx_ckpoint($self);
+        return PublicInbox::DS::requeue($self) if $PRUNE_CUR <= $PRUNE_MAX;
+        send($PRUNE_OP_P, 'prune_done', MSG_EOR);
+        $TMP_GIT->cleanup;
+        $TMP_GIT = $PRUNE_OP_P = $PRUNE_CUR = $PRUNE_MAX = undef;
+        %ACTIVE_GIT_DIR = ();
+}
+
+sub prune_start { # via wq_io_do in IDX_SHARDS
+        my ($self, $git_dir, @active_git_dir) = @_;
+        $PRUNE_CUR = 1;
+        $PRUNE_OP_P = delete $self->{0} // die 'BUG: no {0} op_p';
+        %ACTIVE_GIT_DIR = map { $_ => undef } @active_git_dir;
+        $TMP_GIT = PublicInbox::Git->new($git_dir); # TMP_GIT copy
+        $self->begin_txn_lazy;
+        $PRUNE_MAX = $self->{xdb}->get_lastdocid // 1;
+        event_step($self);
 }
 
 sub shards_active { # post_loop_do
         return if $DO_QUIT;
-        scalar(grep { $_->{-cidx_quit} } @IDX_SHARDS);
+        scalar(grep { $_->{-cidx_quit} } (@IDX_SHARDS));
 }
 
 # signal handlers
-sub kill_shards { $_->wq_kill(@_) for @IDX_SHARDS }
+sub kill_shards { $_->wq_kill(@_) for (@IDX_SHARDS) }
 
 sub parent_quit {
         $DO_QUIT = POSIX->can("SIG$_[0]")->();
@@ -704,7 +729,6 @@ sub parent_quit {
 
 sub init_tmp_git_dir ($) {
         my ($self) = @_;
-        return unless $self->{-opt}->{prune};
         require File::Temp;
         require PublicInbox::Import;
         my $tmp = File::Temp->newdir('cidx-all-git-XXXX', TMPDIR => 1);
@@ -747,6 +771,18 @@ sub prep_umask ($) {
                 undef;
 }
 
+sub start_prune ($) {
+        my ($self) = @_;
+        init_tmp_git_dir($self);
+        my @active_git_dir = (@{$self->{git_dirs}}, @GIT_DIR_GONE);
+        for my $s (@IDX_SHARDS) {
+                my ($c, $p) = PublicInbox::PktOp->pair;
+                $c->{ops}->{prune_done} = [ $s ];
+                $s->wq_io_do('prune_start', [ $p->{op_p} ],
+                                $TMP_GIT->{git_dir}, @active_git_dir)
+        }
+}
+
 sub cidx_run { # main entry point
         my ($self) = @_;
         my $restore_umask = prep_umask($self);
@@ -756,7 +792,10 @@ sub cidx_run { # main entry point
         my $restore = PublicInbox::OnDestroy->new($$,
                 \&PublicInbox::DS::sig_setmask, $SIGSET);
         local $LIVE = {};
-        local ($DO_QUIT, $TMP_GIT, $REINDEX);
+        local ($DO_QUIT, $TMP_GIT, $REINDEX, $TXN_BYTES, @GIT_DIR_GONE,
+                @PRUNE_DONE);
+        local $BATCH_BYTES = $self->{-opt}->{batch_size} //
+                                $PublicInbox::SearchIdx::BATCH_BYTES;
         local @IDX_SHARDS = cidx_init($self);
         local $self->{current_info} = '';
         local $MY_SIG = {
@@ -796,13 +835,13 @@ sub cidx_run { # main entry point
                         $_ =~ /$re/ ? (warn("# excluding $_\n"), 0) : 1;
                 } @{$self->{git_dirs}};
         }
-        local $self->{nchange} = 0;
+        local $NCHANGE = 0;
         local $LIVE_JOBS = $self->{-opt}->{jobs} ||
                         PublicInbox::IPC::detect_nproc() || 2;
-        local @RDONLY_SHARDS = $self->xdb_shards_flat;
-        init_tmp_git_dir($self);
-        do_prune($self) if $self->{-opt}->{prune};
+        local @RDONLY_XDB = $self->xdb_shards_flat;
+        start_prune($self) if $self->{-opt}->{prune};
         scan_git_dirs($self) if $self->{-opt}->{scan} // 1;
+        await_prune if $self->{-opt}->{prune};
 
         for my $s (@IDX_SHARDS) {
                 $s->{-cidx_quit} = 1;
@@ -811,10 +850,10 @@ sub cidx_run { # main entry point
 
         local @PublicInbox::DS::post_loop_do = (\&shards_active);
         PublicInbox::DS::event_loop($MY_SIG, $SIGSET) if shards_active();
-        $self->lock_release(!!$self->{nchange});
+        $self->lock_release(!!$NCHANGE);
 }
 
-sub ipc_atfork_child {
+sub ipc_atfork_child { # @IDX_SHARDS
         my ($self) = @_;
         $self->SUPER::ipc_atfork_child;
         $SIG{USR1} = \&shard_usr1;
diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm
index 3baeaa9c..b907772e 100644
--- a/lib/PublicInbox/SearchIdx.pm
+++ b/lib/PublicInbox/SearchIdx.pm
@@ -552,11 +552,17 @@ sub add_message {
         $smsg->{num};
 }
 
+sub get_doc ($$) {
+        my ($self, $docid) = @_;
+        eval { $self->{xdb}->get_document($docid) } // do {
+                die $@ if $@ && ref($@) !~ /\bDocNotFoundError\b/;
+                undef;
+        }
+}
+
 sub _get_doc ($$) {
         my ($self, $docid) = @_;
-        my $doc = eval { $self->{xdb}->get_document($docid) };
-        $doc // do {
-                warn "E: $@\n" if $@;
+        get_doc($self, $docid) // do {
                 warn "E: #$docid missing in Xapian\n";
                 undef;
         }
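
Usage note (hypothetical caller, not part of the patch): get_doc()
only swallows Xapian DocNotFoundError; any other failure from
get_document() is re-thrown, so callers can treat undef strictly as
"document gone":

        my $doc = $self->get_doc($docid); # $self: a SearchIdx-derived object
        if (defined $doc) {
                # document exists; safe to use
        } else {
                # #$docid was deleted or never indexed; any other Xapian
                # error would have propagated as an exception
        }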