about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <e@yhbt.net>2020-07-24 05:56:01 +0000
committerEric Wong <e@yhbt.net>2020-07-25 20:48:18 +0000
commitde8e1586d732ae6c09a92588a8e4d442aedbff37 (patch)
treea0c441e9b3ca32aef414242f22d86207178dd82b
parent5ce7e26b1d7efc2fb1bb5310a756e7c873145246 (diff)
downloadpublic-inbox-de8e1586d732ae6c09a92588a8e4d442aedbff37.tar.gz
We'll switch to using IdxStack here to ensure we get repeatable
results and ascending THREADIDs according to git chronology.
This means we'll need a two-pass reindex to index existing
messages before indexing new messages.

Since we no longer have a long-lived git-log process, we don't
have to worry about old Xapian referencing the git-log pipe
w/o FD_CLOEXEC, either.
-rw-r--r--lib/PublicInbox/SearchIdx.pm253
-rw-r--r--t/v1reindex.t2
2 files changed, 113 insertions, 142 deletions
diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm
index 89c71679..c57a7e16 100644
--- a/lib/PublicInbox/SearchIdx.pm
+++ b/lib/PublicInbox/SearchIdx.pm
@@ -14,6 +14,7 @@ use PublicInbox::Eml;
 use PublicInbox::InboxWritable;
 use PublicInbox::MID qw(mid_mime mids_for_index mids);
 use PublicInbox::MsgIter;
+use PublicInbox::IdxStack;
 use Carp qw(croak);
 use POSIX qw(strftime);
 use PublicInbox::OverIdx;
@@ -27,6 +28,10 @@ our $BATCH_BYTES = defined($ENV{XAPIAN_FLUSH_THRESHOLD}) ?
 use constant DEBUG => !!$ENV{DEBUG};
 
 my $xapianlevels = qr/\A(?:full|medium)\z/;
+my $hex = '[a-f0-9]';
+my $OID = $hex .'{40,}';
+my $addmsg = qr!^:000000 100644 \S+ ($OID) A\t${hex}{2}/${hex}{38}$!;
+my $delmsg = qr!^:100644 000000 ($OID) \S+ D\t${hex}{2}/${hex}{38}$!;
 
 sub new {
         my ($class, $ibx, $creat, $shard) = @_;
@@ -385,7 +390,7 @@ sub add_message {
         $smsg->{mid} //= $mids->[0]; # v1 compatibility
         $smsg->{num} //= do { # v1
                 _msgmap_init($self);
-                index_mm($self, $mime);
+                index_mm($self, $mime, $smsg->{blob}, $sync);
         };
 
         # v1 and tests only:
@@ -477,34 +482,20 @@ sub unindex_eml {
 }
 
 sub index_mm {
-        my ($self, $mime) = @_;
-        my $mid = mid_mime($mime);
+        my ($self, $mime, $oid, $sync) = @_;
+        my $mids = mids($mime);
         my $mm = $self->{mm};
-        my $num;
-
-        if (defined $self->{regen_down}) {
-                $num = $mm->num_for($mid) and return $num;
-
-                while (($num = $self->{regen_down}--) > 0) {
-                        if ($mm->mid_set($num, $mid) != 0) {
-                                return $num;
-                        }
-                }
-        } elsif (defined $self->{regen_up}) {
-                $num = $mm->num_for($mid) and return $num;
-
-                # this is to fixup old bugs due to add-remove-add
-                while (($num = ++$self->{regen_up})) {
-                        if ($mm->mid_set($num, $mid) != 0) {
-                                return $num;
-                        }
+        if ($sync->{reindex}) {
+                my $over = $self->{over};
+                for my $mid (@$mids) {
+                        my ($num, undef) = $over->num_mid0_for_oid($oid, $mid);
+                        return $num if defined $num;
                 }
+                $mm->num_for($mids->[0]) // $mm->mid_insert($mids->[0]);
+        } else {
+                # fallback to num_for since filters like RubyLang set the number
+                $mm->mid_insert($mids->[0]) // $mm->num_for($mids->[0]);
         }
-
-        $num = $mm->mid_insert($mid) and return $num;
-
-        # fallback to num_for since filters like RubyLang set the number
-        $mm->num_for($mid);
 }
 
 sub unindex_mm {
@@ -532,8 +523,8 @@ sub index_both { # git->cat_async callback
         my $smsg = bless { bytes => $size, blob => $oid }, 'PublicInbox::Smsg';
         my $self = $sync->{sidx};
         my $eml = PublicInbox::Eml->new($bref);
-        my $num = index_mm($self, $eml);
-        $smsg->{num} = $num;
+        $smsg->{num} = index_mm($self, $eml, $oid, $sync) or
+                die "E: could not generate NNTP article number for $oid";
         add_message($self, $eml, $smsg, $sync);
 }
 
@@ -549,6 +540,11 @@ sub index_sync {
         my ($self, $opts) = @_;
         delete $self->{lock_path} if $opts->{-skip_lock};
         $self->{ibx}->with_umask(\&_index_sync, $self, $opts);
+        if ($opts->{reindex}) {
+                my %again = %$opts;
+                delete @again{qw(rethread reindex)};
+                index_sync($self, \%again);
+        }
 }
 
 sub too_big ($$) {
@@ -562,110 +558,87 @@ sub too_big ($$) {
 }
 
 # only for v1
-sub read_log {
-        my ($self, $log, $batch_cb) = @_;
-        my $hex = '[a-f0-9]';
-        my $h40 = $hex .'{40}';
-        my $addmsg = qr!^:000000 100644 \S+ ($h40) A\t${hex}{2}/${hex}{38}$!;
-        my $delmsg = qr!^:100644 000000 ($h40) \S+ D\t${hex}{2}/${hex}{38}$!;
+sub process_stack {
+        my ($self, $stk, $sync, $batch_cb) = @_;
         my $git = $self->{ibx}->git;
-        my $latest;
         my $max = $BATCH_BYTES;
-        local $/ = "\n";
-        my %D;
-        my $line;
-        my $newest;
         my $nr = 0;
-        my $sync = { sidx => $self, nr => \$nr, max => \$max };
-        while (defined($line = <$log>)) {
-                if ($line =~ /$addmsg/o) {
-                        my $blob = $1;
-                        if (delete $D{$blob}) {
-                                # make sure pending index writes are done
-                                # before writing to ->mm
-                                $git->cat_async_wait;
-
-                                if (defined $self->{regen_down}) {
-                                        my $num = $self->{regen_down}--;
-                                        $self->{mm}->num_highwater($num);
-                                }
-                                next;
-                        }
-                        next if too_big($self, $blob);
-                        $git->cat_async($blob, \&index_both, { %$sync });
+        $sync->{nr} = \$nr;
+        $sync->{max} = \$max;
+        $sync->{sidx} = $self;
+
+        if (my @leftovers = keys %{delete($sync->{D}) // {}}) {
+                warn('W: unindexing '.scalar(@leftovers)." leftovers\n");
+                for my $oid (@leftovers) {
+                        $oid = unpack('H*', $oid);
+                        $git->cat_async($oid, \&unindex_both, $self);
+                }
+        }
+        while (my ($f, $at, $ct, $oid) = $stk->pop_rec) {
+                if ($f eq 'm') {
+                        $sync->{autime} = $at;
+                        $sync->{cotime} = $ct;
+                        next if too_big($self, $oid);
+                        $git->cat_async($oid, \&index_both, { %$sync });
                         if ($max <= 0) {
                                 $git->cat_async_wait;
                                 $max = $BATCH_BYTES;
-                                $batch_cb->($nr, $latest);
+                                $batch_cb->($nr);
                         }
-                } elsif ($line =~ /$delmsg/o) {
-                        my $blob = $1;
-                        $D{$blob} = 1 unless too_big($self, $blob);
-                } elsif ($line =~ /^commit ($h40)/o) {
-                        $latest = $1;
-                        $newest ||= $latest;
-                } elsif ($line =~ /^author .*? ([0-9]+) [\-\+][0-9]+$/) {
-                        $sync->{autime} = $1;
-                } elsif ($line =~ /^committer .*? ([0-9]+) [\-\+][0-9]+$/) {
-                        $sync->{cotime} = $1;
+                } elsif ($f eq 'd') {
+                        $git->cat_async($oid, \&unindex_both, $self);
                 }
         }
-        close($log) or die "git log failed: \$?=$?";
-        # get the leftovers
-        foreach my $blob (keys %D) {
-                $git->cat_async($blob, \&unindex_both, $self);
-        }
         $git->cat_async_wait;
-        $batch_cb->($nr, $latest, $newest);
+        $batch_cb->($nr, $stk);
 }
 
-sub _git_log {
-        my ($self, $opts, $range) = @_;
+sub prepare_stack ($$$) {
+        my ($self, $sync, $range) = @_;
         my $git = $self->{ibx}->git;
 
         if (index($range, '..') < 0) {
                 # don't show annoying git errors to users who run -index
                 # on empty inboxes
                 $git->qx(qw(rev-parse -q --verify), "$range^0");
-                if ($?) {
-                        open my $fh, '<', '/dev/null' or
-                                die "failed to open /dev/null: $!\n";
-                        return $fh;
-                }
+                return PublicInbox::IdxStack->new->read_prepare if $?;
         }
+        my $D = $sync->{D} = $sync->{reindex} ? {} : undef; # OID_BIN => NR
 
         # Count the new files so they can be added newest to oldest
         # and still have numbers increasing from oldest to newest
-        my $fcount = 0;
-        my $pr = $opts->{-progress};
-        $pr->("counting changes\n\t$range ... ") if $pr;
-        # can't use 'rev-list --count' if we use --diff-filter
-        my $fh = $git->popen(qw(log --pretty=tformat:%h
-                             --no-notes --no-color --no-renames
-                             --diff-filter=AM), $range);
-        ++$fcount while <$fh>;
-        close $fh or die "git log failed: \$?=$?";
-        my $high = $self->{mm}->num_highwater;
-        $pr->("$fcount\n") if $pr; # continue previous line
-        $self->{ntodo} = $fcount;
-
-        if (index($range, '..') < 0) {
-                if ($high && $high == $fcount) {
-                        # fix up old bugs in full indexes which caused messages to
-                        # not appear in Msgmap
-                        $self->{regen_up} = $high;
-                } else {
-                        # normal regen is for for fresh data
-                        $self->{regen_down} = $fcount;
-                        $self->{regen_down} += $high unless $opts->{reindex};
+        my $fh = $git->popen(qw(log --raw -r --pretty=tformat:%at-%ct-%H
+                                --no-notes --no-color --no-renames --no-abbrev),
+                                $range);
+        my ($at, $ct, $stk);
+        while (<$fh>) {
+                if (/\A([0-9]+)-([0-9]+)-($OID)$/o) {
+                        ($at, $ct) = ($1 + 0, $2 + 0);
+                        $stk //= PublicInbox::IdxStack->new($3);
+                } elsif (/$delmsg/) {
+                        my $oid = $1;
+                        if ($D) { # reindex case
+                                $D->{pack('H*', $oid)}++;
+                        } else { # non-reindex case:
+                                $stk->push_rec('d', $at, $ct, $oid);
+                        }
+                } elsif (/$addmsg/) {
+                        my $oid = $1;
+                        if ($D) {
+                                my $oid_bin = pack('H*', $oid);
+                                my $nr = --$D->{$oid_bin};
+                                delete($D->{$oid_bin}) if $nr <= 0;
+
+                                # nr < 0 (-1) means it never existed
+                                $stk->push_rec('m', $at, $ct, $oid) if $nr < 0;
+                        } else {
+                                $stk->push_rec('m', $at, $ct, $oid);
+                        }
                 }
-        } else {
-                # Give oldest messages the smallest numbers
-                $self->{regen_down} = $high + $fcount;
         }
-
-        $git->popen(qw/log --pretty=raw --no-notes --no-color --no-renames
-                                --raw -r --no-abbrev/, $range);
+        close $fh or die "git log failed: \$?=$?";
+        $stk //= PublicInbox::IdxStack->new;
+        $stk->read_prepare;
 }
 
 # --is-ancestor requires git 1.8.0+
@@ -717,45 +690,43 @@ sub reindex_from ($$) {
 sub _index_sync {
         my ($self, $opts) = @_;
         my $tip = $opts->{ref} || 'HEAD';
-        my ($last_commit, $lx, $xlog);
         my $git = $self->{ibx}->git;
         $git->batch_prepare;
         my $pr = $opts->{-progress};
-
+        my $sync = { reindex => $opts->{reindex} };
         my $xdb = $self->begin_txn_lazy;
         $self->{over}->rethread_prepare($opts);
         my $mm = _msgmap_init($self);
-        do {
-                $xlog = undef; # stop previous git-log via SIGPIPE
-                $last_commit = _last_x_commit($self, $mm);
-                $lx = reindex_from($opts->{reindex}, $last_commit);
-
-                $self->{over}->rollback_lazy;
-                $self->{over}->disconnect;
-                $git->cleanup;
-                delete $self->{txn};
-                $xdb->cancel_transaction if $xdb;
-                $xdb = idx_release($self);
-
-                # ensure we leak no FDs to "git log" with Xapian <= 1.2
-                my $range = $lx eq '' ? $tip : "$lx..$tip";
-                $xlog = _git_log($self, $opts, $range);
-
-                $xdb = $self->begin_txn_lazy;
-        } while (_last_x_commit($self, $mm) ne $last_commit);
+        if ($sync->{reindex}) {
+                my $last = $mm->last_commit;
+                if ($last) {
+                        $tip = $last;
+                } else {
+                        # somebody just blindly added --reindex when indexing
+                        # for the first time, allow it:
+                        undef $sync->{reindex};
+                }
+        }
+        my $last_commit = _last_x_commit($self, $mm);
+        my $lx = reindex_from($sync->{reindex}, $last_commit);
+        my $range = $lx eq '' ? $tip : "$lx..$tip";
+        $pr->("counting changes\n\t$range ... ") if $pr;
+        my $stk = prepare_stack($self, $sync, $range);
+        $sync->{ntodo} = $stk ? $stk->num_records : 0;
+        $pr->("$sync->{ntodo}\n") if $pr; # continue previous line
 
-        my $dbh = $mm->{dbh} if $mm;
+        my $dbh = $mm->{dbh};
         my $batch_cb = sub {
-                my ($nr, $commit, $newest) = @_;
-                if ($dbh) {
-                        if ($newest) {
-                                my $cur = $mm->last_commit || '';
-                                if (need_update($self, $cur, $newest)) {
-                                        $mm->last_commit($newest);
-                                }
+                my ($nr, $stk) = @_;
+                # latest_cmt may be undef
+                my $newest = $stk ? $stk->{latest_cmt} : undef;
+                if ($newest) {
+                        my $cur = $mm->last_commit || '';
+                        if (need_update($self, $cur, $newest)) {
+                                $mm->last_commit($newest);
                         }
-                        $dbh->commit;
                 }
+                $dbh->commit;
                 if ($newest && need_xapian($self)) {
                         my $cur = $xdb->get_metadata('last_commit');
                         if (need_update($self, $cur, $newest)) {
@@ -768,15 +739,15 @@ sub _index_sync {
                 $git->cleanup;
                 $xdb = idx_release($self, $nr);
                 # let another process do some work...
-                $pr->("indexed $nr/$self->{ntodo}\n") if $pr && $nr;
-                if (!$newest) { # more to come
+                $pr->("indexed $nr/$sync->{ntodo}\n") if $pr && $nr;
+                if (!$stk) { # more to come
                         $xdb = $self->begin_txn_lazy;
-                        $dbh->begin_work if $dbh;
+                        $dbh->begin_work;
                 }
         };
 
         $dbh->begin_work;
-        read_log($self, $xlog, $batch_cb);
+        process_stack($self, $stk, $sync, $batch_cb);
 }
 
 sub DESTROY {
diff --git a/t/v1reindex.t b/t/v1reindex.t
index 8cb75188..d70ed4b9 100644
--- a/t/v1reindex.t
+++ b/t/v1reindex.t
@@ -221,7 +221,7 @@ ok(!-d $xap, 'Xapian directories removed again');
         $config{indexlevel} = 'medium';
         my $ibx = PublicInbox::Inbox->new(\%config);
         my $rw = PublicInbox::SearchIdx->new($ibx, 1);
-        eval { $rw->index_sync };
+        eval { $rw->index_sync({reindex => 1}) };
         is($@, '', 'no error from indexing');
         is_deeply(\@warn, [], 'no warnings');
         my $mset = $ibx->search->reopen->query('hello world', {mset=>1});