From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,AWL,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 6C3BA1F5AE for ; Sat, 25 Jul 2020 21:12:21 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH] v2writable: support async git blob retrievals Date: Sat, 25 Jul 2020 21:12:21 +0000 Message-Id: <20200725211221.7614-1-e@yhbt.net> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: This seems to speed up --reindex on smallish v2 inboxes by about 30% on both HDD and SSD. lore/git (~1GB) on an SSD even gives a 30% improvement with 3 shards. I'm only seeing a ~4% speedup on LKML with a SATA SSD (which is difficult to repeat because it takes around 4 hours). Testing LKML on an HDD will take much more time... --- lib/PublicInbox/SearchIdx.pm | 22 +++----- lib/PublicInbox/V2Writable.pm | 98 +++++++++++++++++++++-------------- 2 files changed, 65 insertions(+), 55 deletions(-) diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm index fe089c8e8..1fc574106 100644 --- a/lib/PublicInbox/SearchIdx.pm +++ b/lib/PublicInbox/SearchIdx.pm @@ -21,7 +21,7 @@ use PublicInbox::OverIdx; use PublicInbox::Spawn qw(spawn); use PublicInbox::Git qw(git_unquote); use PublicInbox::MsgTime qw(msg_timestamp msg_datestamp); -our @EXPORT_OK = qw(too_big crlf_adjust log2stack is_ancestor); +our @EXPORT_OK = qw(crlf_adjust log2stack is_ancestor check_size); my $X = \%PublicInbox::Search::X; my ($DB_CREATE_OR_OPEN, $DB_OPEN); our $DB_NO_SYNC = 0; @@ -553,21 +553,11 @@ sub index_sync { } } -sub too_big ($$) { - my ($self, $oid) = @_; - my $max_size = $self->{index_max_size} or return; - my (undef, undef, $size) = $self->{ibx}->git->check($oid); - die "E: bad $oid in $self->{ibx}->{inboxdir}\n" if !defined($size); - return if $size <= $max_size; - warn "W: skipping $oid ($size > $max_size)\n"; - 1; -} - -sub ck_size { # check_async cb for -index --max-size=... +sub check_size { # check_async cb for -index --max-size=... my ($oid, $type, $size, $arg, $git) = @_; (($type // '') eq 'blob') or die "E: bad $oid in $git->{git_dir}"; if ($size <= $arg->{index_max_size}) { - $git->cat_async($oid, \&index_both, $arg); + $git->cat_async($oid, $arg->{index_oid}, $arg); } else { warn "W: skipping $oid ($size > $arg->{index_max_size})\n"; } @@ -630,12 +620,14 @@ sub process_stack { $git->cat_async($oid, \&unindex_both, $self); } } - $sync->{index_max_size} = $self->{ibx}->{index_max_size}; + if ($sync->{index_max_size} = $self->{ibx}->{index_max_size}) { + $sync->{index_oid} = \&index_both; + } while (my ($f, $at, $ct, $oid) = $stk->pop_rec) { if ($f eq 'm') { my $arg = { %$sync, autime => $at, cotime => $ct }; if ($sync->{index_max_size}) { - $git->check_async($oid, \&ck_size, $arg); + $git->check_async($oid, \&check_size, $arg); } else { $git->cat_async($oid, \&index_both, $arg); } diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm index f159d39c2..6908bd2ee 100644 --- a/lib/PublicInbox/V2Writable.pm +++ b/lib/PublicInbox/V2Writable.pm @@ -18,7 +18,7 @@ use PublicInbox::InboxWritable; use PublicInbox::OverIdx; use PublicInbox::Msgmap; use PublicInbox::Spawn qw(spawn popen_rd); -use PublicInbox::SearchIdx qw(too_big log2stack crlf_adjust is_ancestor); +use PublicInbox::SearchIdx qw(log2stack crlf_adjust is_ancestor check_size); use IO::Handle; # ->autoflush use File::Temp qw(tempfile); @@ -124,7 +124,6 @@ sub new { }; $self->{over}->{-no_sync} = 1 if $v2ibx->{-no_sync}; $self->{shards} = count_shards($self) || nproc_shards($creat); - $self->{index_max_size} = $v2ibx->{index_max_size}; bless $self, $class; } @@ -307,8 +306,8 @@ sub idx_init { # do not leak read-only FDs to child processes, we only have these # FDs for duplicate detection so they should not be # frequently activated. - # delete @$ibx{qw(git mm search)}; - delete $ibx->{$_} foreach (qw(git mm search)); + delete @$ibx{qw(mm search)}; + $ibx->git->cleanup; $self->{parallel} = 0 if ($ibx->{indexlevel}//'') eq 'basic'; if ($self->{parallel}) { @@ -863,11 +862,13 @@ sub atfork_child { sub reindex_checkpoint ($$) { my ($self, $sync) = @_; + $self->{ibx}->git->cleanup; # *async_wait + ${$sync->{need_checkpoint}} = 0; $sync->{mm_tmp}->atfork_prepare; $self->done; # release lock if (my $pr = $sync->{-opt}->{-progress}) { - $pr->(sprintf($sync->{-regen_fmt}, $sync->{nr})); + $pr->(sprintf($sync->{-regen_fmt}, ${$sync->{nr}})); } # allow -watch or -mda to write... @@ -875,15 +876,14 @@ sub reindex_checkpoint ($$) { $sync->{mm_tmp}->atfork_parent; } -sub reindex_oid ($$$) { - my ($self, $sync, $oid) = @_; - return if too_big($self, $oid); - my ($num, $mid0, $len); - my $msgref = $self->{ibx}->git->cat_file($oid, \$len); - return if $len == 0; # purged - my $mime = PublicInbox::Eml->new($$msgref); +sub index_oid { # cat_async callback + my ($bref, $oid, $type, $size, $arg) = @_; + return if $size == 0; # purged + my ($num, $mid0); + my $mime = PublicInbox::Eml->new($$bref); my $mids = mids($mime->header_obj); my $chash = content_hash($mime); + my $self = $arg->{v2w}; if (scalar(@$mids) == 0) { warn "E: $oid has no Message-ID, skipping\n"; @@ -891,12 +891,12 @@ sub reindex_oid ($$$) { } # {unindexed} is unlikely - if ((my $unindexed = $sync->{unindexed}) && scalar(@$mids) == 1) { + if ((my $unindexed = $arg->{unindexed}) && scalar(@$mids) == 1) { $num = delete($unindexed->{$mids->[0]}); if (defined $num) { $mid0 = $mids->[0]; $self->{mm}->mid_set($num, $mid0); - delete($sync->{unindexed}) if !keys(%$unindexed); + delete($arg->{unindexed}) if !keys(%$unindexed); } } if (!defined($num)) { # reuse if reindexing (or duplicates) @@ -907,12 +907,12 @@ sub reindex_oid ($$$) { } } $mid0 //= do { # is this a number we got before? - $num = $sync->{mm_tmp}->num_for($mids->[0]); + $num = $arg->{mm_tmp}->num_for($mids->[0]); defined($num) ? $mids->[0] : undef; }; if (!defined($num)) { for (my $i = $#$mids; $i >= 1; $i--) { - $num = $sync->{mm_tmp}->num_for($mids->[$i]); + $num = $arg->{mm_tmp}->num_for($mids->[$i]); if (defined($num)) { $mid0 = $mids->[$i]; last; @@ -920,7 +920,7 @@ sub reindex_oid ($$$) { } } if (defined($num)) { - $sync->{mm_tmp}->num_delete($num); + $arg->{mm_tmp}->num_delete($num); } else { # never seen $num = $self->{mm}->mid_insert($mids->[0]); if (defined($num)) { @@ -939,16 +939,16 @@ sub reindex_oid ($$$) { warn "E: $oid <", join('> <', @$mids), "> is a duplicate\n"; return; } - $sync->{nr}++; + ++${$arg->{nr}}; my $smsg = bless { - raw_bytes => $len, + raw_bytes => $size, num => $num, blob => $oid, mid => $mid0, }, 'PublicInbox::Smsg'; - $smsg->populate($mime, $sync); - if (do_idx($self, $msgref, $mime, $smsg)) { - reindex_checkpoint($self, $sync); + $smsg->populate($mime, $arg); + if (do_idx($self, $bref, $mime, $smsg)) { + ${$arg->{need_checkpoint}} = 1; } } @@ -1065,11 +1065,14 @@ sub sync_prepare ($$$) { # our code and blindly injects "d" file history into git repos if (my @leftovers = keys %{delete($sync->{D}) // {}}) { warn('W: unindexing '.scalar(@leftovers)." leftovers\n"); + my $arg = { v2w => $self }; + my $all = $self->{ibx}->git; for my $oid (@leftovers) { $oid = unpack('H*', $oid); $self->{current_info} = "leftover $oid"; - unindex_oid($self, $oid); + $all->cat_async($oid, \&unindex_oid, $arg); } + $all->cat_async_wait; } return 0 if (!$regen_max && !keys(%{$self->{unindex_range}})); @@ -1077,7 +1080,7 @@ sub sync_prepare ($$$) { # it's a problem and we need to notice it via die() my $pad = length($regen_max) + 1; $sync->{-regen_fmt} = "% ${pad}u/$regen_max\n"; - $sync->{nr} = 0; + $sync->{nr} = \(my $nr = 0); return -1 if $sync->{reindex}; $regen_max + $self->{mm}->num_highwater() || 0; } @@ -1091,13 +1094,13 @@ sub unindex_oid_remote ($$$) { } } -sub unindex_oid ($$;$) { - my ($self, $oid, $unindexed) = @_; +sub unindex_oid ($$;$) { # git->cat_async callback + my ($bref, $oid, $type, $size, $sync) = @_; + my $self = $sync->{v2w}; + my $unindexed = $sync->{in_unindex} ? $sync->{unindexed} : undef; my $mm = $self->{mm}; - my $msgref = $self->{ibx}->git->cat_file($oid); - my $mime = PublicInbox::Eml->new($msgref); - my $mids = mids($mime->header_obj); - $mime = $msgref = undef; + my $mids = mids(PublicInbox::Eml->new($bref)->header_obj); + undef $$bref; my $over = $self->{over}; foreach my $mid (@$mids) { my %gone; @@ -1125,17 +1128,20 @@ sub unindex_oid ($$;$) { # a mirror because the source used -purge or -edit sub unindex ($$$$) { my ($self, $sync, $git, $unindex_range) = @_; - my $unindexed = $sync->{unindexed} ||= {}; # $mid0 => $num + my $unindexed = $sync->{unindexed} //= {}; # $mid0 => $num my $before = scalar keys %$unindexed; # order does not matter, here: my @cmd = qw(log --raw -r --no-notes --no-color --no-abbrev --no-renames); my $fh = $git->popen(@cmd, $unindex_range); + my $all = $self->{ibx}->git; + local $sync->{in_unindex} = 1; while (<$fh>) { /\A:\d{6} 100644 $OID ($OID) [AM]\tm$/o or next; - unindex_oid($self, $1, $unindexed); + $all->cat_async($1, \&unindex_oid, $sync); } close $fh or die "git log failed: \$?=$?"; + $all->cat_async_wait; return unless $sync->{-opt}->{prune}; my $after = scalar keys %$unindexed; @@ -1171,17 +1177,25 @@ sub index_epoch ($$$) { } defined(my $stk = $sync->{stacks}->[$i]) or return; $sync->{stacks}->[$i] = undef; + my $all = $self->{ibx}->git; while (my ($f, $at, $ct, $oid) = $stk->pop_rec) { $self->{current_info} = "$i.git $oid"; if ($f eq 'm') { - $sync->{autime} = $at; - $sync->{cotime} = $ct; - reindex_oid($self, $sync, $oid); + my $arg = { %$sync, autime => $at, cotime => $ct }; + if ($sync->{index_max_size}) { + $all->check_async($oid, \&check_size, $arg); + } else { + $all->cat_async($oid, \&index_oid, $arg); + } } elsif ($f eq 'd') { - unindex_oid($self, $oid); + $all->cat_async($oid, \&unindex_oid, $sync); + } + if (${$sync->{need_checkpoint}}) { + reindex_checkpoint($self, $sync); } } - delete @$sync{qw(autime cotime)}; + $all->check_async_wait; + $all->cat_async_wait; update_last_commit($self, $git, $i, $stk->{latest_cmt}); } @@ -1197,9 +1211,11 @@ sub index_sync { fill_alternates($self, $epoch_max); $self->{over}->rethread_prepare($opt); my $sync = { + need_checkpoint => \(my $bool = 0), unindex_range => {}, # EPOCH => oid_old..oid_new reindex => $opt->{reindex}, - -opt => $opt + -opt => $opt, + v2w => $self, }; $sync->{ranges} = sync_ranges($self, $sync, $epoch_max); if (sync_prepare($self, $sync, $epoch_max)) { @@ -1211,14 +1227,16 @@ sub index_sync { $self->{mm}->{dbh}->begin_work; $sync->{mm_tmp} = $self->{mm}->tmp_clone; } - + if ($sync->{index_max_size} = $self->{ibx}->{index_max_size}) { + $sync->{index_oid} = \&index_oid; + } # work forwards through history index_epoch($self, $sync, $_) for (0..$epoch_max); $self->done; if (my $nr = $sync->{nr}) { my $pr = $sync->{-opt}->{-progress}; - $pr->('all.git '.sprintf($sync->{-regen_fmt}, $nr)) if $pr; + $pr->('all.git '.sprintf($sync->{-regen_fmt}, $$nr)) if $pr; } $self->{over}->rethread_done($opt);