about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2023-04-05 11:26:53 +0000
committerEric Wong <e@80x24.org>2023-04-05 20:12:12 +0000
commitcd115d4c99aa1134d3713d48101b9355423748e8 (patch)
tree53f1e207db8b7f60832d7744aec5e9326d4646c7
parentb9905a03c5b76f999301f30ab839cb78293d661d (diff)
downloadpublic-inbox-cd115d4c99aa1134d3713d48101b9355423748e8.tar.gz
`git log -p' can several seconds to generate its initial output.
SMP systems can be processing prunes during this delay, so let
DS do a one-shot notification for us while prune is running.  On
Linux, we'll also use the biggest pipe possible so git can do
more CPU-intensive work to generate diffs while our Perl
processes are indexing and likely hitting I/O wait.
-rw-r--r--MANIFEST1
-rw-r--r--lib/PublicInbox/CidxLogP.pm29
-rw-r--r--lib/PublicInbox/CodeSearchIdx.pm51
3 files changed, 63 insertions, 18 deletions
diff --git a/MANIFEST b/MANIFEST
index 3c421645..a0e64c6a 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -160,6 +160,7 @@ lib/PublicInbox/AdminEdit.pm
 lib/PublicInbox/AltId.pm
 lib/PublicInbox/AutoReap.pm
 lib/PublicInbox/Cgit.pm
+lib/PublicInbox/CidxLogP.pm
 lib/PublicInbox/CmdIPC4.pm
 lib/PublicInbox/CodeSearch.pm
 lib/PublicInbox/CodeSearchIdx.pm
diff --git a/lib/PublicInbox/CidxLogP.pm b/lib/PublicInbox/CidxLogP.pm
new file mode 100644
index 00000000..7877d5ac
--- /dev/null
+++ b/lib/PublicInbox/CidxLogP.pm
@@ -0,0 +1,29 @@
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+#
+# Waits for initial `git log -p' output for PublicInbox::CodeSearchIdx.
+# The initial output from `git log -p' can take a while to generate,
+# CodeSearchIdx can process prune work while it's happening.  Once
+# `git log -p' starts generating output, it should be able to keep
+# up with Xapian indexing, so we still rely on blocking reads to simplify
+# cidx_read_log_p
+package PublicInbox::CidxLogP;
+use v5.12;
+use parent qw(PublicInbox::DS);
+use PublicInbox::Syscall qw(EPOLLIN EPOLLONESHOT);
+
+sub new {
+        my ($cls, $rd, $cidx, $git, $roots) = @_;
+        my $self = bless { cidx => $cidx, git => $git, roots => $roots }, $cls;
+        fcntl($rd, 1031, 1048576) if $^O eq 'linux'; # fatter pipes
+        $self->SUPER::new($rd, EPOLLIN|EPOLLONESHOT);
+}
+
+sub event_step {
+        my ($self) = @_;
+        my $rd = $self->{sock} // return warn('BUG?: no {sock}');
+        $self->close; # PublicInbox::DS::close, deferred, so $sock is usable
+        delete($self->{cidx})->cidx_read_log_p($self, $rd);
+}
+
+1;
diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index 035fab3e..215e337f 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -30,6 +30,7 @@ use PublicInbox::SearchIdx qw(add_val);
 use PublicInbox::Config qw(glob2re);
 use PublicInbox::Spawn qw(spawn popen_rd);
 use PublicInbox::OnDestroy;
+use PublicInbox::CidxLogP;
 use Socket qw(MSG_EOR);
 use Carp ();
 our (
@@ -216,20 +217,41 @@ EOM
         $len;
 }
 
-# sharded reader for `git log --pretty=format: --stdin'
+sub cidx_reap_log { # awaitpid cb
+        my ($pid, $self, $op_p) = @_;
+        if (!$? || ($DO_QUIT && (($? & 127) == $DO_QUIT ||
+                                ($? & 127) == POSIX::SIGPIPE))) {
+                send($op_p, "shard_done $self->{shard}", MSG_EOR);
+        } else {
+                warn "E: git @LOG_STDIN: \$?=$?\n";
+                $self->{xdb}->cancel_transaction;
+        }
+}
+
 sub shard_index { # via wq_io_do in IDX_SHARDS
-        my ($self, $git, $n, $roots) = @_;
-        local $self->{current_info} = "$git->{git_dir} [$n]";
-        local $self->{roots} = $roots;
+        my ($self, $git, $roots) = @_;
+
         my $in = delete($self->{0}) // die 'BUG: no {0} input';
         my $op_p = delete($self->{1}) // die 'BUG: no {1} op_p';
+        my ($rd, $pid) = $git->popen(@LOG_STDIN, undef, { 0 => $in });
+        close $in or die "close: $!";
+        awaitpid($pid, \&cidx_reap_log, $self, $op_p);
+        PublicInbox::CidxLogP->new($rd, $self, $git, $roots);
+        # CidxLogP->event_step will call cidx_read_log_p once there's input
+}
+
+# sharded reader for `git log --pretty=format: --stdin'
+sub cidx_read_log_p {
+        my ($self, $log_p, $rd) = @_;
+        my $git = delete $log_p->{git} // die 'BUG: no {git}';
+        local $self->{current_info} = "$git->{git_dir} [$self->{shard}]";
+        local $self->{roots} = delete $log_p->{roots} // die 'BUG: no {roots}';
+
         local $MAX_SIZE = $self->{-opt}->{max_size};
         # local-ized in parent before fork
         $TXN_BYTES = $BATCH_BYTES;
         local $self->{git} = $git; # for patchid
         return if $DO_QUIT;
-        my $rd = $git->popen(@LOG_STDIN, undef, { 0 => $in });
-        close $in or die "close: $!";
         my $nr = 0;
 
         # a patch may have \0, see c4201214cbf10636e2c1ab9131573f735b42c8d4
@@ -238,7 +260,7 @@ sub shard_index { # via wq_io_do in IDX_SHARDS
         my $len;
         my $cmt = {};
         local $/ = $FS;
-        my $buf = <$rd> // return close($rd); # leading $FS
+        my $buf = <$rd> // return; # leading $FS
         $buf eq $FS or die "BUG: not LF-NUL: $buf\n";
         $self->begin_txn_lazy;
         while (!$DO_QUIT && defined($buf = <$rd>)) {
@@ -251,22 +273,15 @@ sub shard_index { # via wq_io_do in IDX_SHARDS
                         @$cmt{@FMT} = split(/\n/, $buf, scalar(@FMT));
                 }
                 if (($TXN_BYTES -= $len) <= 0) {
-                        cidx_ckpoint($self, "[$n] $nr");
+                        cidx_ckpoint($self, "[$self->{shard}] $nr");
                         $TXN_BYTES -= $len; # len may be huge, >TXN_BYTES;
                 }
                 update_commit($self, $cmt);
                 ++$nr;
-                cidx_ckpoint($self, "[$n] $nr") if $TXN_BYTES <= 0;
+                cidx_ckpoint($self, "[$self->{shard}] $nr") if $TXN_BYTES <= 0;
                 $/ = $FS;
         }
-        close($rd);
-        if (!$? || ($DO_QUIT && (($? & 127) == $DO_QUIT ||
-                                ($? & 127) == POSIX::SIGPIPE))) {
-                send($op_p, "shard_done $n", MSG_EOR);
-        } else {
-                warn "E: git @LOG_STDIN: \$?=$?\n";
-                $self->{xdb}->cancel_transaction;
-        }
+        # return and wait for cidx_reap_log
 }
 
 sub shard_done { # called via PktOp on shard_index completion
@@ -537,7 +552,7 @@ sub index_repo { # cidx_await cb
                 $c->{ops}->{shard_done} = [ $self ];
                 $IDX_SHARDS[$n]->wq_io_do('shard_index',
                                         [ $shard_in[$n], $p->{op_p} ],
-                                        $git, $n, \@roots);
+                                        $git, \@roots);
                 $consumers->{$n} = $c;
         }
         @shard_in = ();