From 678fb3c2ba03a4a284620c039717c0d94dd6106a Mon Sep 17 00:00:00 2001 From: "Eric Wong (Contractor, The Linux Foundation)" Date: Wed, 4 Apr 2018 21:24:59 +0000 Subject: v2: support incremental indexing + purge This is important for people running mirrors via "git fetch", as they need to be kept up-to-date. Purging is also now supported in mirrors. The short-lived "--regenerate" option is gone and is now implicitly enabled as a result. It's still cheap when article number regeneration is unnecessary, as we track the range for each git repository. --- lib/PublicInbox/Import.pm | 3 +- lib/PublicInbox/Msgmap.pm | 5 + lib/PublicInbox/V2Writable.pm | 275 ++++++++++++++++++++++++++++++++---------- 3 files changed, 220 insertions(+), 63 deletions(-) (limited to 'lib/PublicInbox') diff --git a/lib/PublicInbox/Import.pm b/lib/PublicInbox/Import.pm index 73290eed..2529798f 100644 --- a/lib/PublicInbox/Import.pm +++ b/lib/PublicInbox/Import.pm @@ -192,6 +192,7 @@ sub get_mark { my ($r, $w) = $self->gfi_start; print $w "get-mark $mark\n" or wfail; defined(my $oid = <$r>) or die "get-mark failed, need git 2.6.0+\n"; + chomp($oid); $oid; } @@ -379,7 +380,7 @@ sub add { # v2: we need this for Xapian if ($self->{want_object_info}) { - chomp(my $oid = $self->get_mark(":$blob")); + my $oid = $self->get_mark(":$blob"); $self->{last_object} = [ $oid, $n, \$str ]; } my $ref = $self->{ref}; diff --git a/lib/PublicInbox/Msgmap.pm b/lib/PublicInbox/Msgmap.pm index c6a73155..5c37e169 100644 --- a/lib/PublicInbox/Msgmap.pm +++ b/lib/PublicInbox/Msgmap.pm @@ -92,6 +92,11 @@ sub last_commit { $self->meta_accessor('last_commit', $commit); } +sub last_commit_n { + my ($self, $i, $commit) = @_; + $self->meta_accessor('last_commit'.$i, $commit); +} + sub created_at { my ($self, $second) = @_; $self->meta_accessor('created_at', $second); diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm index b6532ac5..5b4d9c0d 100644 --- a/lib/PublicInbox/V2Writable.pm +++ b/lib/PublicInbox/V2Writable.pm @@ -15,6 +15,7 @@ use PublicInbox::ContentId qw(content_id content_digest); use PublicInbox::Inbox; use PublicInbox::OverIdxFork; use PublicInbox::Msgmap; +use PublicInbox::Spawn; use IO::Handle; # an estimate of the post-packed size to the raw uncompressed size @@ -63,6 +64,7 @@ sub new { lock_path => "$dir/inbox.lock", # limit each repo to 1GB or so rotate_bytes => int((1024 * 1024 * 1024) / $PACKING_FACTOR), + last_commit => [], }; bless $self, $class; } @@ -105,6 +107,7 @@ sub add { my $nparts = $self->{partitions}; my $part = $num % $nparts; + $self->{last_commit}->[$self->{max_git}] = $cmt; my $idx = $self->idx_part($part); $idx->index_raw($len, $msgref, $num, $oid, $mid0, $mime); my $n = $self->{transact_bytes} += $len; @@ -112,7 +115,7 @@ sub add { $self->checkpoint; } - $mime; + $cmt; } sub num_for { @@ -227,7 +230,7 @@ sub purge_oids { my $purges = []; foreach my $i (0..$self->{max_git}) { my $git = PublicInbox::Git->new("$pfx/$i.git"); - my $im = $self->import_init($git, 0); + my $im = $self->import_init($git, 0, 1); $purges->[$i] = $im->purge_oids($purge); } $purges; @@ -250,6 +253,7 @@ sub remove_internal { # can be slightly different, so we do not need the user-supplied # message now that we have the mids and content_id $mime = undef; + my $mark; foreach my $mid (@$mids) { $srch->reopen->each_smsg_by_mid($mid, sub { @@ -272,7 +276,8 @@ sub remove_internal { if ($purge) { $purge->{$oid} = 1; } else { - $im->remove(\$orig, $cmt_msg); + ($mark, undef) = + $im->remove(\$orig, $cmt_msg); } $orig = undef; $removed->num; # memoize this for callers @@ -286,6 +291,11 @@ sub remove_internal { }); $self->barrier; } + + if (defined $mark) { + my $cmt = $im->get_mark($mark); + $self->{last_commit}->[$self->{max_git}] = $cmt; + } if ($purge && scalar keys %$purge) { return purge_oids($self, $purge); } @@ -299,18 +309,34 @@ sub remove { sub purge { my ($self, $mime) = @_; - remove_internal($self, $mime, undef, {}); + my $purges = remove_internal($self, $mime, undef, {}); + $self->idx_init if @$purges; # ->done is called on purges + for my $i (0..$#$purges) { + defined(my $cmt = $purges->[$i]) or next; + $self->{last_commit}->[$i] = $cmt; + } + $purges; } +sub set_last_commits ($) { + my ($self) = @_; + defined(my $max_git = $self->{max_git}) or return; + my $mm = $self->{mm}; + my $last_commit = $self->{last_commit}; + foreach my $i (0..$max_git) { + defined(my $cmt = $last_commit->[$i]) or next; + $last_commit->[$i] = undef; + $mm->last_commit_n($i, $cmt); + } +} sub done { my ($self) = @_; my $im = delete $self->{im}; $im->done if $im; # PublicInbox::Import::done - if (my $mm = delete $self->{mm}) { - $mm->{dbh}->commit; - } + my $mm = $self->{mm}; + $mm->{dbh}->commit if $mm; # order matters, we can only close {over} after all partitions # are done because the partitions also write to {over} @@ -323,6 +349,14 @@ sub done { my $over = $self->{over}; $over->remote_commit; $over->remote_close; + + if ($mm) { + $mm->{dbh}->begin_work; + set_last_commits($self); + $mm->{dbh}->commit; + delete $self->{mm}; + } + $self->{transact_bytes} = 0; $self->lock_release if $parts; } @@ -358,6 +392,12 @@ sub barrier { $over->barrier_wait; # wait for each Xapian partition $over->commit_fsync if $fsync; + # last_commit is special, don't commit these until + # remote partitions are done: + $dbh->begin_work; + set_last_commits($self); + $dbh->commit; + $dbh->begin_work; } $self->{transact_bytes} = 0; @@ -449,13 +489,14 @@ sub importer { } sub import_init { - my ($self, $git, $packed_bytes) = @_; + my ($self, $git, $packed_bytes, $tmp) = @_; my $im = PublicInbox::Import->new($git, undef, undef, $self->{-inbox}); $im->{bytes_added} = int($packed_bytes / $PACKING_FACTOR); $im->{want_object_info} = 1; $im->{lock_path} = undef; $im->{path_type} = 'v2'; - $self->{im} = $im; + $self->{im} = $im unless $tmp; + $im; } # XXX experimental @@ -608,63 +649,181 @@ sub reindex_oid { } } -sub reindex { - my ($self, $regen) = @_; +# only update last_commit for $i on reindex iff newer than current +sub update_last_commit { + my ($self, $git, $i, $cmt) = @_; + my $last = $self->{mm}->last_commit_n($i); + if (defined $last && is_ancestor($git, $last, $cmt)) { + my @cmd = (qw(rev-list --count), "$last..$cmt"); + chomp(my $n = $git->qx(@cmd)); + return if $n ne '' && $n == 0; + } + $self->{mm}->last_commit_n($i, $cmt); +} + +sub git_dir_n ($$) { "$_[0]->{-inbox}->{mainrepo}/git/$_[1].git" } + +sub last_commits { + my ($self, $max_git) = @_; + my $heads = []; + for (my $i = $max_git; $i >= 0; $i--) { + $heads->[$i] = $self->{mm}->last_commit_n($i); + } + $heads; +} + +sub is_ancestor ($$$) { + my ($git, $cur, $tip) = @_; + return 0 unless $git->check($cur); + my $cmd = [ 'git', "--git-dir=$git->{git_dir}", + qw(merge-base --is-ancestor), $cur, $tip ]; + my $pid = spawn($cmd); + defined $pid or die "spawning ".join(' ', @$cmd)." failed: $!"; + waitpid($pid, 0) == $pid or die join(' ', @$cmd) .' did not finish'; + $? == 0; +} + +sub index_prepare { + my ($self, $opts, $max_git, $ranges) = @_; + my $regen_max = 0; + my $head = $self->{-inbox}->{ref_head} || 'refs/heads/master'; + for (my $i = $max_git; $i >= 0; $i--) { + die "already indexing!\n" if $self->{index_pipe}; + my $git_dir = git_dir_n($self, $i); + -d $git_dir or next; # missing parts are fine + my $git = PublicInbox::Git->new($git_dir); + chomp(my $tip = $git->qx('rev-parse', $head)); + my $range; + if (defined(my $cur = $ranges->[$i])) { + $range = "$cur..$tip"; + if (is_ancestor($git, $cur, $tip)) { # common case + my $n = $git->qx(qw(rev-list --count), $range); + chomp($n); + if ($n == 0) { + $ranges->[$i] = undef; + next; + } + } else { + warn <<""; +discontiguous range: $range +Rewritten history? (in $git_dir) + + my $base = $git->qx('merge-base', $tip, $cur); + chomp $base; + if ($base) { + $range = "$base..$tip"; + warn "found merge-base: $base\n" + } else { + $range = $tip; + warn <<""; +discarding history at $cur + + } + warn <<""; +reindexing $git_dir starting at +$range + + $self->{"unindex-range.$i"} = "$base..$cur"; + } + } else { + $range = $tip; # all of it + } + $ranges->[$i] = $range; + + # can't use 'rev-list --count' if we use --diff-filter + my $fh = $git->popen(qw(log --pretty=tformat:%h + --no-notes --no-color --no-renames + --diff-filter=AM), $range, '--', 'm'); + ++$regen_max while <$fh>; + } + \$regen_max; +} + +sub unindex_oid { + my ($self, $git, $oid) = @_; + my $msgref = $git->cat_file($oid); + my $mime = PublicInbox::MIME->new($msgref); + my $mids = mids($mime->header_obj); + $mime = $msgref = undef; + + foreach my $mid (@$mids) { + my %gone; + $self->{-inbox}->search->reopen->each_smsg_by_mid($mid, sub { + my ($smsg) = @_; + $smsg->load_expand; + $gone{$smsg->num} = 1 if $oid eq $smsg->{blob}; + 1; # continue + }); + my $n = scalar keys %gone; + next unless $n; + if ($n > 1) { + warn "BUG: multiple articles linked to $oid\n", + join(',',sort keys %gone), "\n"; + } + $self->{unindexed}->{$_}++ foreach keys %gone; + $_->remote_remove($oid, $mid) foreach @{$self->{idx_parts}}; + $self->{over}->remove_oid($oid, $mid); + $self->barrier; + } +} + +my $x40 = qr/[a-f0-9]{40}/; +sub unindex { + my ($self, $opts, $git, $unindex_range) = @_; + my $un = $self->{unindexed} ||= {}; # num => removal count + $self->barrier; + my $before = scalar keys %$un; + my @cmd = qw(log --raw -r + --no-notes --no-color --no-abbrev --no-renames); + my $fh = $self->{reindex_pipe} = $git->popen(@cmd, $unindex_range); + while (<$fh>) { + /\A:\d{6} 100644 $x40 ($x40) [AM]\tm$/o or next; + $self->unindex_oid($git, $1); + } + delete $self->{reindex_pipe}; + $fh = undef; + + return unless $opts->{prune}; + my $after = scalar keys %$un; + return if $before == $after; + + # ensure any blob can not longer be accessed via dumb HTTP + PublicInbox::Import::run_die(['git', "--git-dir=$git->{git_dir}", + qw(-c gc.reflogExpire=now gc --prune=all)]); +} + +sub index_sync { + my ($self, $opts) = @_; + $opts ||= {}; my $ibx = $self->{-inbox}; - my $pfx = "$ibx->{mainrepo}/git"; my $max_git; my $latest = git_dir_latest($self, \$max_git); return unless defined $latest; - my $head = $ibx->{ref_head} || 'refs/heads/master'; $self->idx_init; # acquire lock - my $x40 = qr/[a-f0-9]{40}/; my $mm_tmp = $self->{mm}->tmp_clone; - if (!$regen) { - my (undef, $max) = $mm_tmp->minmax; - unless (defined $max) { - $regen = 1; - warn -"empty msgmap.sqlite3, regenerating article numbers\n"; - } - } - my $tip; # latest commit out of all git repos - if ($regen) { - my $regen_max = 0; - for (my $cur = $max_git; $cur >= 0; $cur--) { - die "already reindexing!\n" if $self->{reindex_pipe}; - my $git = PublicInbox::Git->new("$pfx/$cur.git"); - -d $git->{git_dir} or next; # missing parts are fine - chomp($tip = $git->qx('rev-parse', $head)) unless $tip; - my $h = $cur == $max_git ? $tip : $head; - - # can't use 'rev-list --count' if we use --diff-filter - my $fh = $git->popen(qw(log --pretty=tformat:%h - --no-notes --no-color --no-renames - --diff-filter=AM), $h, '--', 'm'); - ++$regen_max while <$fh>; - } - die "No messages found in $pfx/*.git, bug?\n" unless $regen_max; - $regen = \$regen_max; - } + my $ranges = $opts->{reindex} ? [] : $self->last_commits($max_git); + + my ($min, $max) = $mm_tmp->minmax; + my $regen = $self->index_prepare($opts, $max_git, $ranges); + $$regen += $max if $max; my $D = {}; my @cmd = qw(log --raw -r --pretty=tformat:%h --no-notes --no-color --no-abbrev --no-renames); - # if we are regenerating, we must not use a newer tip commit than what - # the regeneration counter used: - $tip ||= $head; - # work backwards through history - for (my $cur = $max_git; $cur >= 0; $cur--) { + my $last_commit = []; + for (my $i = $max_git; $i >= 0; $i--) { + my $git_dir = git_dir_n($self, $i); die "already reindexing!\n" if delete $self->{reindex_pipe}; - my $cmt; - my $git_dir = "$pfx/$cur.git"; -d $git_dir or next; # missing parts are fine my $git = PublicInbox::Git->new($git_dir); - my $h = $cur == $max_git ? $tip : $head; - my $fh = $self->{reindex_pipe} = $git->popen(@cmd, $h); + my $unindex = delete $self->{"unindex-range.$i"}; + $self->unindex($opts, $git, $unindex) if $unindex; + defined(my $range = $ranges->[$i]) or next; + my $fh = $self->{reindex_pipe} = $git->popen(@cmd, $range); + my $cmt; while (<$fh>) { - if (/\A$x40$/o) { + if (/\A$x40$/o && !defined($cmt)) { chomp($cmt = $_); } elsif (/\A:\d{6} 100644 $x40 ($x40) [AM]\tm$/o) { $self->reindex_oid($mm_tmp, $D, $git, $1, @@ -673,19 +832,10 @@ sub reindex { $self->mark_deleted($D, $git, $1); } } + $fh = undef; delete $self->{reindex_pipe}; + $self->update_last_commit($git, $i, $cmt) if defined $cmt; } - my $gaps; - if ($regen && $$regen != 0) { - warn "W: leftover article number ($$regen)\n"; - $gaps = 1; - } - my ($min, $max) = $mm_tmp->minmax; - if (defined $max) { - warn "W: leftover article numbers at $min..$max\n"; - $gaps = 1; - } - warn "W: were old git partitions deleted?\n" if $gaps; my @d = sort keys %$D; if (@d) { warn "BUG: ", scalar(@d)," unseen deleted messages marked\n"; @@ -694,6 +844,7 @@ sub reindex { warn "<$mid>\n"; } } + $self->done; } 1; -- cgit v1.2.3-24-ge0c7