From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,AWL,BAYES_00, LOTS_OF_MONEY shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 1B1521F919 for ; Fri, 24 Jul 2020 05:56:10 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 15/20] searchidx: make v1 indexing closer to v2 Date: Fri, 24 Jul 2020 05:56:01 +0000 Message-Id: <20200724055606.27332-16-e@yhbt.net> In-Reply-To: <20200724055606.27332-1-e@yhbt.net> References: <20200724055606.27332-1-e@yhbt.net> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: We'll switch to using IdxStack here to ensure we get repeatable results and ascending THREADIDs according to git chronology. This means we'll need a two-pass reindex to index existing messages before indexing new messages. Since we no longer have a long-lived git-log process, we don't have to worry about old Xapian referencing the git-log pipe w/o FD_CLOEXEC, either. --- lib/PublicInbox/SearchIdx.pm | 253 ++++++++++++++++------------------- t/v1reindex.t | 2 +- 2 files changed, 113 insertions(+), 142 deletions(-) diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm index 89c716793..c57a7e164 100644 --- a/lib/PublicInbox/SearchIdx.pm +++ b/lib/PublicInbox/SearchIdx.pm @@ -14,6 +14,7 @@ use PublicInbox::Eml; use PublicInbox::InboxWritable; use PublicInbox::MID qw(mid_mime mids_for_index mids); use PublicInbox::MsgIter; +use PublicInbox::IdxStack; use Carp qw(croak); use POSIX qw(strftime); use PublicInbox::OverIdx; @@ -27,6 +28,10 @@ our $BATCH_BYTES = defined($ENV{XAPIAN_FLUSH_THRESHOLD}) ? use constant DEBUG => !!$ENV{DEBUG}; my $xapianlevels = qr/\A(?:full|medium)\z/; +my $hex = '[a-f0-9]'; +my $OID = $hex .'{40,}'; +my $addmsg = qr!^:000000 100644 \S+ ($OID) A\t${hex}{2}/${hex}{38}$!; +my $delmsg = qr!^:100644 000000 ($OID) \S+ D\t${hex}{2}/${hex}{38}$!; sub new { my ($class, $ibx, $creat, $shard) = @_; @@ -385,7 +390,7 @@ sub add_message { $smsg->{mid} //= $mids->[0]; # v1 compatibility $smsg->{num} //= do { # v1 _msgmap_init($self); - index_mm($self, $mime); + index_mm($self, $mime, $smsg->{blob}, $sync); }; # v1 and tests only: @@ -477,34 +482,20 @@ sub unindex_eml { } sub index_mm { - my ($self, $mime) = @_; - my $mid = mid_mime($mime); + my ($self, $mime, $oid, $sync) = @_; + my $mids = mids($mime); my $mm = $self->{mm}; - my $num; - - if (defined $self->{regen_down}) { - $num = $mm->num_for($mid) and return $num; - - while (($num = $self->{regen_down}--) > 0) { - if ($mm->mid_set($num, $mid) != 0) { - return $num; - } - } - } elsif (defined $self->{regen_up}) { - $num = $mm->num_for($mid) and return $num; - - # this is to fixup old bugs due to add-remove-add - while (($num = ++$self->{regen_up})) { - if ($mm->mid_set($num, $mid) != 0) { - return $num; - } + if ($sync->{reindex}) { + my $over = $self->{over}; + for my $mid (@$mids) { + my ($num, undef) = $over->num_mid0_for_oid($oid, $mid); + return $num if defined $num; } + $mm->num_for($mids->[0]) // $mm->mid_insert($mids->[0]); + } else { + # fallback to num_for since filters like RubyLang set the number + $mm->mid_insert($mids->[0]) // $mm->num_for($mids->[0]); } - - $num = $mm->mid_insert($mid) and return $num; - - # fallback to num_for since filters like RubyLang set the number - $mm->num_for($mid); } sub unindex_mm { @@ -532,8 +523,8 @@ sub index_both { # git->cat_async callback my $smsg = bless { bytes => $size, blob => $oid }, 'PublicInbox::Smsg'; my $self = $sync->{sidx}; my $eml = PublicInbox::Eml->new($bref); - my $num = index_mm($self, $eml); - $smsg->{num} = $num; + $smsg->{num} = index_mm($self, $eml, $oid, $sync) or + die "E: could not generate NNTP article number for $oid"; add_message($self, $eml, $smsg, $sync); } @@ -549,6 +540,11 @@ sub index_sync { my ($self, $opts) = @_; delete $self->{lock_path} if $opts->{-skip_lock}; $self->{ibx}->with_umask(\&_index_sync, $self, $opts); + if ($opts->{reindex}) { + my %again = %$opts; + delete @again{qw(rethread reindex)}; + index_sync($self, \%again); + } } sub too_big ($$) { @@ -562,110 +558,87 @@ sub too_big ($$) { } # only for v1 -sub read_log { - my ($self, $log, $batch_cb) = @_; - my $hex = '[a-f0-9]'; - my $h40 = $hex .'{40}'; - my $addmsg = qr!^:000000 100644 \S+ ($h40) A\t${hex}{2}/${hex}{38}$!; - my $delmsg = qr!^:100644 000000 ($h40) \S+ D\t${hex}{2}/${hex}{38}$!; +sub process_stack { + my ($self, $stk, $sync, $batch_cb) = @_; my $git = $self->{ibx}->git; - my $latest; my $max = $BATCH_BYTES; - local $/ = "\n"; - my %D; - my $line; - my $newest; my $nr = 0; - my $sync = { sidx => $self, nr => \$nr, max => \$max }; - while (defined($line = <$log>)) { - if ($line =~ /$addmsg/o) { - my $blob = $1; - if (delete $D{$blob}) { - # make sure pending index writes are done - # before writing to ->mm - $git->cat_async_wait; - - if (defined $self->{regen_down}) { - my $num = $self->{regen_down}--; - $self->{mm}->num_highwater($num); - } - next; - } - next if too_big($self, $blob); - $git->cat_async($blob, \&index_both, { %$sync }); + $sync->{nr} = \$nr; + $sync->{max} = \$max; + $sync->{sidx} = $self; + + if (my @leftovers = keys %{delete($sync->{D}) // {}}) { + warn('W: unindexing '.scalar(@leftovers)." leftovers\n"); + for my $oid (@leftovers) { + $oid = unpack('H*', $oid); + $git->cat_async($oid, \&unindex_both, $self); + } + } + while (my ($f, $at, $ct, $oid) = $stk->pop_rec) { + if ($f eq 'm') { + $sync->{autime} = $at; + $sync->{cotime} = $ct; + next if too_big($self, $oid); + $git->cat_async($oid, \&index_both, { %$sync }); if ($max <= 0) { $git->cat_async_wait; $max = $BATCH_BYTES; - $batch_cb->($nr, $latest); + $batch_cb->($nr); } - } elsif ($line =~ /$delmsg/o) { - my $blob = $1; - $D{$blob} = 1 unless too_big($self, $blob); - } elsif ($line =~ /^commit ($h40)/o) { - $latest = $1; - $newest ||= $latest; - } elsif ($line =~ /^author .*? ([0-9]+) [\-\+][0-9]+$/) { - $sync->{autime} = $1; - } elsif ($line =~ /^committer .*? ([0-9]+) [\-\+][0-9]+$/) { - $sync->{cotime} = $1; + } elsif ($f eq 'd') { + $git->cat_async($oid, \&unindex_both, $self); } } - close($log) or die "git log failed: \$?=$?"; - # get the leftovers - foreach my $blob (keys %D) { - $git->cat_async($blob, \&unindex_both, $self); - } $git->cat_async_wait; - $batch_cb->($nr, $latest, $newest); + $batch_cb->($nr, $stk); } -sub _git_log { - my ($self, $opts, $range) = @_; +sub prepare_stack ($$$) { + my ($self, $sync, $range) = @_; my $git = $self->{ibx}->git; if (index($range, '..') < 0) { # don't show annoying git errors to users who run -index # on empty inboxes $git->qx(qw(rev-parse -q --verify), "$range^0"); - if ($?) { - open my $fh, '<', '/dev/null' or - die "failed to open /dev/null: $!\n"; - return $fh; - } + return PublicInbox::IdxStack->new->read_prepare if $?; } + my $D = $sync->{D} = $sync->{reindex} ? {} : undef; # OID_BIN => NR # Count the new files so they can be added newest to oldest # and still have numbers increasing from oldest to newest - my $fcount = 0; - my $pr = $opts->{-progress}; - $pr->("counting changes\n\t$range ... ") if $pr; - # can't use 'rev-list --count' if we use --diff-filter - my $fh = $git->popen(qw(log --pretty=tformat:%h - --no-notes --no-color --no-renames - --diff-filter=AM), $range); - ++$fcount while <$fh>; - close $fh or die "git log failed: \$?=$?"; - my $high = $self->{mm}->num_highwater; - $pr->("$fcount\n") if $pr; # continue previous line - $self->{ntodo} = $fcount; - - if (index($range, '..') < 0) { - if ($high && $high == $fcount) { - # fix up old bugs in full indexes which caused messages to - # not appear in Msgmap - $self->{regen_up} = $high; - } else { - # normal regen is for for fresh data - $self->{regen_down} = $fcount; - $self->{regen_down} += $high unless $opts->{reindex}; + my $fh = $git->popen(qw(log --raw -r --pretty=tformat:%at-%ct-%H + --no-notes --no-color --no-renames --no-abbrev), + $range); + my ($at, $ct, $stk); + while (<$fh>) { + if (/\A([0-9]+)-([0-9]+)-($OID)$/o) { + ($at, $ct) = ($1 + 0, $2 + 0); + $stk //= PublicInbox::IdxStack->new($3); + } elsif (/$delmsg/) { + my $oid = $1; + if ($D) { # reindex case + $D->{pack('H*', $oid)}++; + } else { # non-reindex case: + $stk->push_rec('d', $at, $ct, $oid); + } + } elsif (/$addmsg/) { + my $oid = $1; + if ($D) { + my $oid_bin = pack('H*', $oid); + my $nr = --$D->{$oid_bin}; + delete($D->{$oid_bin}) if $nr <= 0; + + # nr < 0 (-1) means it never existed + $stk->push_rec('m', $at, $ct, $oid) if $nr < 0; + } else { + $stk->push_rec('m', $at, $ct, $oid); + } } - } else { - # Give oldest messages the smallest numbers - $self->{regen_down} = $high + $fcount; } - - $git->popen(qw/log --pretty=raw --no-notes --no-color --no-renames - --raw -r --no-abbrev/, $range); + close $fh or die "git log failed: \$?=$?"; + $stk //= PublicInbox::IdxStack->new; + $stk->read_prepare; } # --is-ancestor requires git 1.8.0+ @@ -717,45 +690,43 @@ sub reindex_from ($$) { sub _index_sync { my ($self, $opts) = @_; my $tip = $opts->{ref} || 'HEAD'; - my ($last_commit, $lx, $xlog); my $git = $self->{ibx}->git; $git->batch_prepare; my $pr = $opts->{-progress}; - + my $sync = { reindex => $opts->{reindex} }; my $xdb = $self->begin_txn_lazy; $self->{over}->rethread_prepare($opts); my $mm = _msgmap_init($self); - do { - $xlog = undef; # stop previous git-log via SIGPIPE - $last_commit = _last_x_commit($self, $mm); - $lx = reindex_from($opts->{reindex}, $last_commit); - - $self->{over}->rollback_lazy; - $self->{over}->disconnect; - $git->cleanup; - delete $self->{txn}; - $xdb->cancel_transaction if $xdb; - $xdb = idx_release($self); - - # ensure we leak no FDs to "git log" with Xapian <= 1.2 - my $range = $lx eq '' ? $tip : "$lx..$tip"; - $xlog = _git_log($self, $opts, $range); - - $xdb = $self->begin_txn_lazy; - } while (_last_x_commit($self, $mm) ne $last_commit); + if ($sync->{reindex}) { + my $last = $mm->last_commit; + if ($last) { + $tip = $last; + } else { + # somebody just blindly added --reindex when indexing + # for the first time, allow it: + undef $sync->{reindex}; + } + } + my $last_commit = _last_x_commit($self, $mm); + my $lx = reindex_from($sync->{reindex}, $last_commit); + my $range = $lx eq '' ? $tip : "$lx..$tip"; + $pr->("counting changes\n\t$range ... ") if $pr; + my $stk = prepare_stack($self, $sync, $range); + $sync->{ntodo} = $stk ? $stk->num_records : 0; + $pr->("$sync->{ntodo}\n") if $pr; # continue previous line - my $dbh = $mm->{dbh} if $mm; + my $dbh = $mm->{dbh}; my $batch_cb = sub { - my ($nr, $commit, $newest) = @_; - if ($dbh) { - if ($newest) { - my $cur = $mm->last_commit || ''; - if (need_update($self, $cur, $newest)) { - $mm->last_commit($newest); - } + my ($nr, $stk) = @_; + # latest_cmt may be undef + my $newest = $stk ? $stk->{latest_cmt} : undef; + if ($newest) { + my $cur = $mm->last_commit || ''; + if (need_update($self, $cur, $newest)) { + $mm->last_commit($newest); } - $dbh->commit; } + $dbh->commit; if ($newest && need_xapian($self)) { my $cur = $xdb->get_metadata('last_commit'); if (need_update($self, $cur, $newest)) { @@ -768,15 +739,15 @@ sub _index_sync { $git->cleanup; $xdb = idx_release($self, $nr); # let another process do some work... - $pr->("indexed $nr/$self->{ntodo}\n") if $pr && $nr; - if (!$newest) { # more to come + $pr->("indexed $nr/$sync->{ntodo}\n") if $pr && $nr; + if (!$stk) { # more to come $xdb = $self->begin_txn_lazy; - $dbh->begin_work if $dbh; + $dbh->begin_work; } }; $dbh->begin_work; - read_log($self, $xlog, $batch_cb); + process_stack($self, $stk, $sync, $batch_cb); } sub DESTROY { diff --git a/t/v1reindex.t b/t/v1reindex.t index 8cb751881..d70ed4b93 100644 --- a/t/v1reindex.t +++ b/t/v1reindex.t @@ -221,7 +221,7 @@ ok(!-d $xap, 'Xapian directories removed again'); $config{indexlevel} = 'medium'; my $ibx = PublicInbox::Inbox->new(\%config); my $rw = PublicInbox::SearchIdx->new($ibx, 1); - eval { $rw->index_sync }; + eval { $rw->index_sync({reindex => 1}) }; is($@, '', 'no error from indexing'); is_deeply(\@warn, [], 'no warnings'); my $mset = $ibx->search->reopen->query('hello world', {mset=>1});