user/dev discussion of public-inbox itself
 help / color / mirror / code / Atom feed
From: Eric Wong <e@yhbt.net>
To: meta@public-inbox.org
Subject: [PATCH 15/20] searchidx: make v1 indexing closer to v2
Date: Fri, 24 Jul 2020 05:56:01 +0000	[thread overview]
Message-ID: <20200724055606.27332-16-e@yhbt.net> (raw)
In-Reply-To: <20200724055606.27332-1-e@yhbt.net>

We'll switch to using IdxStack here to ensure we get repeatable
results and ascending THREADIDs according to git chronology.
This means we'll need a two-pass reindex to index existing
messages before indexing new messages.

Since we no longer have a long-lived git-log process, we don't
have to worry about old Xapian referencing the git-log pipe
without FD_CLOEXEC, either.
---
 lib/PublicInbox/SearchIdx.pm | 253 ++++++++++++++++-------------------
 t/v1reindex.t                |   2 +-
 2 files changed, 113 insertions(+), 142 deletions(-)

diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm
index 89c716793..c57a7e164 100644
--- a/lib/PublicInbox/SearchIdx.pm
+++ b/lib/PublicInbox/SearchIdx.pm
@@ -14,6 +14,7 @@ use PublicInbox::Eml;
 use PublicInbox::InboxWritable;
 use PublicInbox::MID qw(mid_mime mids_for_index mids);
 use PublicInbox::MsgIter;
+use PublicInbox::IdxStack;
 use Carp qw(croak);
 use POSIX qw(strftime);
 use PublicInbox::OverIdx;
@@ -27,6 +28,10 @@ our $BATCH_BYTES = defined($ENV{XAPIAN_FLUSH_THRESHOLD}) ?
 use constant DEBUG => !!$ENV{DEBUG};
 
 my $xapianlevels = qr/\A(?:full|medium)\z/;
+my $hex = '[a-f0-9]';
+my $OID = $hex .'{40,}';
+my $addmsg = qr!^:000000 100644 \S+ ($OID) A\t${hex}{2}/${hex}{38}$!;
+my $delmsg = qr!^:100644 000000 ($OID) \S+ D\t${hex}{2}/${hex}{38}$!;
 
 sub new {
 	my ($class, $ibx, $creat, $shard) = @_;
@@ -385,7 +390,7 @@ sub add_message {
 	$smsg->{mid} //= $mids->[0]; # v1 compatibility
 	$smsg->{num} //= do { # v1
 		_msgmap_init($self);
-		index_mm($self, $mime);
+		index_mm($self, $mime, $smsg->{blob}, $sync);
 	};
 
 	# v1 and tests only:
@@ -477,34 +482,20 @@ sub unindex_eml {
 }
 
 sub index_mm {
-	my ($self, $mime) = @_;
-	my $mid = mid_mime($mime);
+	my ($self, $mime, $oid, $sync) = @_;
+	my $mids = mids($mime);
 	my $mm = $self->{mm};
-	my $num;
-
-	if (defined $self->{regen_down}) {
-		$num = $mm->num_for($mid) and return $num;
-
-		while (($num = $self->{regen_down}--) > 0) {
-			if ($mm->mid_set($num, $mid) != 0) {
-				return $num;
-			}
-		}
-	} elsif (defined $self->{regen_up}) {
-		$num = $mm->num_for($mid) and return $num;
-
-		# this is to fixup old bugs due to add-remove-add
-		while (($num = ++$self->{regen_up})) {
-			if ($mm->mid_set($num, $mid) != 0) {
-				return $num;
-			}
+	if ($sync->{reindex}) {
+		my $over = $self->{over};
+		for my $mid (@$mids) {
+			my ($num, undef) = $over->num_mid0_for_oid($oid, $mid);
+			return $num if defined $num;
 		}
+		$mm->num_for($mids->[0]) // $mm->mid_insert($mids->[0]);
+	} else {
+		# fallback to num_for since filters like RubyLang set the number
+		$mm->mid_insert($mids->[0]) // $mm->num_for($mids->[0]);
 	}
-
-	$num = $mm->mid_insert($mid) and return $num;
-
-	# fallback to num_for since filters like RubyLang set the number
-	$mm->num_for($mid);
 }
 
 sub unindex_mm {
@@ -532,8 +523,8 @@ sub index_both { # git->cat_async callback
 	my $smsg = bless { bytes => $size, blob => $oid }, 'PublicInbox::Smsg';
 	my $self = $sync->{sidx};
 	my $eml = PublicInbox::Eml->new($bref);
-	my $num = index_mm($self, $eml);
-	$smsg->{num} = $num;
+	$smsg->{num} = index_mm($self, $eml, $oid, $sync) or
+		die "E: could not generate NNTP article number for $oid";
 	add_message($self, $eml, $smsg, $sync);
 }
 
@@ -549,6 +540,11 @@ sub index_sync {
 	my ($self, $opts) = @_;
 	delete $self->{lock_path} if $opts->{-skip_lock};
 	$self->{ibx}->with_umask(\&_index_sync, $self, $opts);
+	if ($opts->{reindex}) {
+		my %again = %$opts;
+		delete @again{qw(rethread reindex)};
+		index_sync($self, \%again);
+	}
 }
 
 sub too_big ($$) {
@@ -562,110 +558,87 @@ sub too_big ($$) {
 }
 
 # only for v1
-sub read_log {
-	my ($self, $log, $batch_cb) = @_;
-	my $hex = '[a-f0-9]';
-	my $h40 = $hex .'{40}';
-	my $addmsg = qr!^:000000 100644 \S+ ($h40) A\t${hex}{2}/${hex}{38}$!;
-	my $delmsg = qr!^:100644 000000 ($h40) \S+ D\t${hex}{2}/${hex}{38}$!;
+sub process_stack {
+	my ($self, $stk, $sync, $batch_cb) = @_;
 	my $git = $self->{ibx}->git;
-	my $latest;
 	my $max = $BATCH_BYTES;
-	local $/ = "\n";
-	my %D;
-	my $line;
-	my $newest;
 	my $nr = 0;
-	my $sync = { sidx => $self, nr => \$nr, max => \$max };
-	while (defined($line = <$log>)) {
-		if ($line =~ /$addmsg/o) {
-			my $blob = $1;
-			if (delete $D{$blob}) {
-				# make sure pending index writes are done
-				# before writing to ->mm
-				$git->cat_async_wait;
-
-				if (defined $self->{regen_down}) {
-					my $num = $self->{regen_down}--;
-					$self->{mm}->num_highwater($num);
-				}
-				next;
-			}
-			next if too_big($self, $blob);
-			$git->cat_async($blob, \&index_both, { %$sync });
+	$sync->{nr} = \$nr;
+	$sync->{max} = \$max;
+	$sync->{sidx} = $self;
+
+	if (my @leftovers = keys %{delete($sync->{D}) // {}}) {
+		warn('W: unindexing '.scalar(@leftovers)." leftovers\n");
+		for my $oid (@leftovers) {
+			$oid = unpack('H*', $oid);
+			$git->cat_async($oid, \&unindex_both, $self);
+		}
+	}
+	while (my ($f, $at, $ct, $oid) = $stk->pop_rec) {
+		if ($f eq 'm') {
+			$sync->{autime} = $at;
+			$sync->{cotime} = $ct;
+			next if too_big($self, $oid);
+			$git->cat_async($oid, \&index_both, { %$sync });
 			if ($max <= 0) {
 				$git->cat_async_wait;
 				$max = $BATCH_BYTES;
-				$batch_cb->($nr, $latest);
+				$batch_cb->($nr);
 			}
-		} elsif ($line =~ /$delmsg/o) {
-			my $blob = $1;
-			$D{$blob} = 1 unless too_big($self, $blob);
-		} elsif ($line =~ /^commit ($h40)/o) {
-			$latest = $1;
-			$newest ||= $latest;
-		} elsif ($line =~ /^author .*? ([0-9]+) [\-\+][0-9]+$/) {
-			$sync->{autime} = $1;
-		} elsif ($line =~ /^committer .*? ([0-9]+) [\-\+][0-9]+$/) {
-			$sync->{cotime} = $1;
+		} elsif ($f eq 'd') {
+			$git->cat_async($oid, \&unindex_both, $self);
 		}
 	}
-	close($log) or die "git log failed: \$?=$?";
-	# get the leftovers
-	foreach my $blob (keys %D) {
-		$git->cat_async($blob, \&unindex_both, $self);
-	}
 	$git->cat_async_wait;
-	$batch_cb->($nr, $latest, $newest);
+	$batch_cb->($nr, $stk);
 }
 
-sub _git_log {
-	my ($self, $opts, $range) = @_;
+sub prepare_stack ($$$) {
+	my ($self, $sync, $range) = @_;
 	my $git = $self->{ibx}->git;
 
 	if (index($range, '..') < 0) {
 		# don't show annoying git errors to users who run -index
 		# on empty inboxes
 		$git->qx(qw(rev-parse -q --verify), "$range^0");
-		if ($?) {
-			open my $fh, '<', '/dev/null' or
-				die "failed to open /dev/null: $!\n";
-			return $fh;
-		}
+		return PublicInbox::IdxStack->new->read_prepare if $?;
 	}
+	my $D = $sync->{D} = $sync->{reindex} ? {} : undef; # OID_BIN => NR
 
 	# Count the new files so they can be added newest to oldest
 	# and still have numbers increasing from oldest to newest
-	my $fcount = 0;
-	my $pr = $opts->{-progress};
-	$pr->("counting changes\n\t$range ... ") if $pr;
-	# can't use 'rev-list --count' if we use --diff-filter
-	my $fh = $git->popen(qw(log --pretty=tformat:%h
-			     --no-notes --no-color --no-renames
-			     --diff-filter=AM), $range);
-	++$fcount while <$fh>;
-	close $fh or die "git log failed: \$?=$?";
-	my $high = $self->{mm}->num_highwater;
-	$pr->("$fcount\n") if $pr; # continue previous line
-	$self->{ntodo} = $fcount;
-
-	if (index($range, '..') < 0) {
-		if ($high && $high == $fcount) {
-			# fix up old bugs in full indexes which caused messages to
-			# not appear in Msgmap
-			$self->{regen_up} = $high;
-		} else {
-			# normal regen is for for fresh data
-			$self->{regen_down} = $fcount;
-			$self->{regen_down} += $high unless $opts->{reindex};
+	my $fh = $git->popen(qw(log --raw -r --pretty=tformat:%at-%ct-%H
+				--no-notes --no-color --no-renames --no-abbrev),
+				$range);
+	my ($at, $ct, $stk);
+	while (<$fh>) {
+		if (/\A([0-9]+)-([0-9]+)-($OID)$/o) {
+			($at, $ct) = ($1 + 0, $2 + 0);
+			$stk //= PublicInbox::IdxStack->new($3);
+		} elsif (/$delmsg/) {
+			my $oid = $1;
+			if ($D) { # reindex case
+				$D->{pack('H*', $oid)}++;
+			} else { # non-reindex case:
+				$stk->push_rec('d', $at, $ct, $oid);
+			}
+		} elsif (/$addmsg/) {
+			my $oid = $1;
+			if ($D) {
+				my $oid_bin = pack('H*', $oid);
+				my $nr = --$D->{$oid_bin};
+				delete($D->{$oid_bin}) if $nr <= 0;
+
+				# nr < 0 (-1) means it never existed
+				$stk->push_rec('m', $at, $ct, $oid) if $nr < 0;
+			} else {
+				$stk->push_rec('m', $at, $ct, $oid);
+			}
 		}
-	} else {
-		# Give oldest messages the smallest numbers
-		$self->{regen_down} = $high + $fcount;
 	}
-
-	$git->popen(qw/log --pretty=raw --no-notes --no-color --no-renames
-				--raw -r --no-abbrev/, $range);
+	close $fh or die "git log failed: \$?=$?";
+	$stk //= PublicInbox::IdxStack->new;
+	$stk->read_prepare;
 }
 
 # --is-ancestor requires git 1.8.0+
@@ -717,45 +690,43 @@ sub reindex_from ($$) {
 sub _index_sync {
 	my ($self, $opts) = @_;
 	my $tip = $opts->{ref} || 'HEAD';
-	my ($last_commit, $lx, $xlog);
 	my $git = $self->{ibx}->git;
 	$git->batch_prepare;
 	my $pr = $opts->{-progress};
-
+	my $sync = { reindex => $opts->{reindex} };
 	my $xdb = $self->begin_txn_lazy;
 	$self->{over}->rethread_prepare($opts);
 	my $mm = _msgmap_init($self);
-	do {
-		$xlog = undef; # stop previous git-log via SIGPIPE
-		$last_commit = _last_x_commit($self, $mm);
-		$lx = reindex_from($opts->{reindex}, $last_commit);
-
-		$self->{over}->rollback_lazy;
-		$self->{over}->disconnect;
-		$git->cleanup;
-		delete $self->{txn};
-		$xdb->cancel_transaction if $xdb;
-		$xdb = idx_release($self);
-
-		# ensure we leak no FDs to "git log" with Xapian <= 1.2
-		my $range = $lx eq '' ? $tip : "$lx..$tip";
-		$xlog = _git_log($self, $opts, $range);
-
-		$xdb = $self->begin_txn_lazy;
-	} while (_last_x_commit($self, $mm) ne $last_commit);
+	if ($sync->{reindex}) {
+		my $last = $mm->last_commit;
+		if ($last) {
+			$tip = $last;
+		} else {
+			# somebody just blindly added --reindex when indexing
+			# for the first time, allow it:
+			undef $sync->{reindex};
+		}
+	}
+	my $last_commit = _last_x_commit($self, $mm);
+	my $lx = reindex_from($sync->{reindex}, $last_commit);
+	my $range = $lx eq '' ? $tip : "$lx..$tip";
+	$pr->("counting changes\n\t$range ... ") if $pr;
+	my $stk = prepare_stack($self, $sync, $range);
+	$sync->{ntodo} = $stk ? $stk->num_records : 0;
+	$pr->("$sync->{ntodo}\n") if $pr; # continue previous line
 
-	my $dbh = $mm->{dbh} if $mm;
+	my $dbh = $mm->{dbh};
 	my $batch_cb = sub {
-		my ($nr, $commit, $newest) = @_;
-		if ($dbh) {
-			if ($newest) {
-				my $cur = $mm->last_commit || '';
-				if (need_update($self, $cur, $newest)) {
-					$mm->last_commit($newest);
-				}
+		my ($nr, $stk) = @_;
+		# latest_cmt may be undef
+		my $newest = $stk ? $stk->{latest_cmt} : undef;
+		if ($newest) {
+			my $cur = $mm->last_commit || '';
+			if (need_update($self, $cur, $newest)) {
+				$mm->last_commit($newest);
 			}
-			$dbh->commit;
 		}
+		$dbh->commit;
 		if ($newest && need_xapian($self)) {
 			my $cur = $xdb->get_metadata('last_commit');
 			if (need_update($self, $cur, $newest)) {
@@ -768,15 +739,15 @@ sub _index_sync {
 		$git->cleanup;
 		$xdb = idx_release($self, $nr);
 		# let another process do some work...
-		$pr->("indexed $nr/$self->{ntodo}\n") if $pr && $nr;
-		if (!$newest) { # more to come
+		$pr->("indexed $nr/$sync->{ntodo}\n") if $pr && $nr;
+		if (!$stk) { # more to come
 			$xdb = $self->begin_txn_lazy;
-			$dbh->begin_work if $dbh;
+			$dbh->begin_work;
 		}
 	};
 
 	$dbh->begin_work;
-	read_log($self, $xlog, $batch_cb);
+	process_stack($self, $stk, $sync, $batch_cb);
 }
 
 sub DESTROY {
diff --git a/t/v1reindex.t b/t/v1reindex.t
index 8cb751881..d70ed4b93 100644
--- a/t/v1reindex.t
+++ b/t/v1reindex.t
@@ -221,7 +221,7 @@ ok(!-d $xap, 'Xapian directories removed again');
 	$config{indexlevel} = 'medium';
 	my $ibx = PublicInbox::Inbox->new(\%config);
 	my $rw = PublicInbox::SearchIdx->new($ibx, 1);
-	eval { $rw->index_sync };
+	eval { $rw->index_sync({reindex => 1}) };
 	is($@, '', 'no error from indexing');
 	is_deeply(\@warn, [], 'no warnings');
 	my $mset = $ibx->search->reopen->query('hello world', {mset=>1});

  parent reply	other threads:[~2020-07-24  5:56 UTC|newest]

Thread overview: 21+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2020-07-24  5:55 [PATCH 00/20] indexing changes and new features Eric Wong
2020-07-24  5:55 ` [PATCH 01/20] index: support --rethread switch to fix old indices Eric Wong
2020-07-24  5:55 ` [PATCH 02/20] v2: index forwards (via `git log --reverse') Eric Wong
2020-07-24  5:55 ` [PATCH 03/20] v2writable: introduce idx_stack Eric Wong
2020-07-24  5:55 ` [PATCH 04/20] v2writable: index_sync: reduce fill_alternates calls Eric Wong
2020-07-24  5:55 ` [PATCH 05/20] v2writable: move {autime} and {cotime} into $sync state Eric Wong
2020-07-24  5:55 ` [PATCH 06/20] v2writable: allow >= 40 byte git object IDs Eric Wong
2020-07-24  5:55 ` [PATCH 07/20] v2writable: drop "EPOCH.git indexing $RANGE" progress Eric Wong
2020-07-24  5:55 ` [PATCH 08/20] use consistent {ibx} field for writable code paths Eric Wong
2020-07-24  5:55 ` [PATCH 09/20] search: avoid copying {inboxdir} Eric Wong
2020-07-24  5:55 ` [PATCH 10/20] v2writable: use read-only PublicInbox::Git for cat_file Eric Wong
2020-07-24  5:55 ` [PATCH 11/20] v2writable: get rid of {reindex_pipe} field Eric Wong
2020-07-24  5:55 ` [PATCH 12/20] v2writable: clarify "epoch" comment Eric Wong
2020-07-24  5:55 ` [PATCH 13/20] xapcmd: set {from} properly for v1 inboxes Eric Wong
2020-07-24  5:56 ` [PATCH 14/20] searchidx: rename _xdb_{acquire,release} => idx_ Eric Wong
2020-07-24  5:56 ` Eric Wong [this message]
2020-07-24  5:56 ` [PATCH 16/20] index+xcpdb: support --no-sync flag Eric Wong
2020-07-24  5:56 ` [PATCH 17/20] v2writable: share log2stack code with v1 Eric Wong
2020-07-24  5:56 ` [PATCH 18/20] searchidx: support async git check Eric Wong
2020-07-24  5:56 ` [PATCH 19/20] searchidx: $batch_cb => v1_checkpoint Eric Wong
2020-07-24  5:56 ` [PATCH 20/20] v2writable: {unindexed} belongs in $sync state Eric Wong

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

  List information: http://public-inbox.org/README

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20200724055606.27332-16-e@yhbt.net \
    --to=e@yhbt.net \
    --cc=meta@public-inbox.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link.

  Be sure your reply has a Subject: header at the top and a blank line
  before the message body.

Code repositories for project(s) associated with this public inbox:

	https://80x24.org/public-inbox.git

This is a public inbox; see the mirroring instructions
for how to clone and mirror all data and code used for this inbox,
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).