From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.0 (2014-02-07) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.0 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 8A47A1F517 for ; Sat, 7 Apr 2018 03:41:55 +0000 (UTC) From: "Eric Wong (Contractor, The Linux Foundation)" To: meta@public-inbox.org Subject: [PATCH 4/8] v2writable: reduce barriers Date: Sat, 7 Apr 2018 03:41:50 +0000 Message-Id: <20180407034154.2309-5-e@80x24.org> In-Reply-To: <20180407034154.2309-1-e@80x24.org> References: <20180407034154.2309-1-e@80x24.org> List-Id: Since we handle the overview info synchronously, we only need barriers in tests, now. We will use asynchronous checkpoints to sync less-important Xapian data. For data deduplication, this requires us to hoist out the cat-blob support in ::Import for reading uncommitted data in git. --- lib/PublicInbox/Import.pm | 34 ++++++++----- lib/PublicInbox/V2Writable.pm | 111 ++++++++++++++++++++---------------------- t/v2writable.t | 2 +- 3 files changed, 75 insertions(+), 72 deletions(-) diff --git a/lib/PublicInbox/Import.pm b/lib/PublicInbox/Import.pm index 2529798..9e8900f 100644 --- a/lib/PublicInbox/Import.pm +++ b/lib/PublicInbox/Import.pm @@ -95,19 +95,13 @@ sub _check_path ($$$$) { $info =~ /\Amissing / ? undef : $info; } -sub check_remove_v1 { - my ($r, $w, $tip, $path, $mime) = @_; - - my $info = _check_path($r, $w, $tip, $path) or return ('MISSING',undef); - $info =~ m!\A100644 blob ([a-f0-9]{40})\t!s or die "not blob: $info"; - my $blob = $1; - - print $w "cat-blob $blob\n" or wfail; +sub _cat_blob ($$$) { + my ($r, $w, $oid) = @_; + print $w "cat-blob $oid\n" or wfail; local $/ = "\n"; - $info = <$r>; + my $info = <$r>; defined $info or die "EOF from fast-import / cat-blob: $!"; - $info =~ /\A[a-f0-9]{40} blob (\d+)\n\z/ or - die "unexpected cat-blob response: $info"; + $info =~ /\A[a-f0-9]{40} blob (\d+)\n\z/ or return; my $left = $1; my $offset = 0; my $buf = ''; @@ -122,7 +116,23 @@ sub check_remove_v1 { $n = read($r, my $lf, 1); defined($n) or die "read final byte of cat-blob failed: $!"; die "bad read on final byte: <$lf>" if $lf ne "\n"; - my $cur = PublicInbox::MIME->new(\$buf); + \$buf; +} + +sub cat_blob { + my ($self, $oid) = @_; + my ($r, $w) = $self->gfi_start; + _cat_blob($r, $w, $oid); +} + +sub check_remove_v1 { + my ($r, $w, $tip, $path, $mime) = @_; + + my $info = _check_path($r, $w, $tip, $path) or return ('MISSING',undef); + $info =~ m!\A100644 blob ([a-f0-9]{40})\t!s or die "not blob: $info"; + my $oid = $1; + my $msg = _cat_blob($r, $w, $oid) or die "BUG: cat-blob $1 failed"; + my $cur = PublicInbox::MIME->new($msg); my $cur_s = $cur->header('Subject'); $cur_s = '' unless defined $cur_s; my $cur_m = $mime->header('Subject'); diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm index 8361d09..53fdb73 100644 --- a/lib/PublicInbox/V2Writable.pm +++ b/lib/PublicInbox/V2Writable.pm @@ -139,7 +139,6 @@ sub num_for { }; # crap, Message-ID is already known, hope somebody just resent: - $self->barrier; foreach my $m (@$mids) { # read-only lookup now safe to do after above barrier my $existing = $self->lookup_content($mime, $m); @@ -259,10 +258,8 @@ sub purge_oids { sub remove_internal { my ($self, $mime, $cmt_msg, $purge) = @_; - $self->barrier; $self->idx_init; my $im = $self->importer unless $purge; - my $ibx = $self->{-inbox}; my $over = $self->{over}; my $cid = content_id($mime); my $parts = $self->{idx_parts}; @@ -280,7 +277,7 @@ sub remove_internal { my %gone; my ($id, $prev); while (my $smsg = $over->next_by_mid($mid, \$id, \$prev)) { - my $msg = $ibx->msg_by_smsg($smsg); + my $msg = get_blob($self, $smsg); if (!defined($msg)) { warn "broken smsg for $mid\n"; next; # continue @@ -313,7 +310,6 @@ sub remove_internal { $orig = undef; $self->unindex_oid_remote($oid, $mid); } - $self->barrier; } if (defined $mark) { @@ -359,45 +355,6 @@ sub set_last_commits ($) { } } -sub done { - my ($self) = @_; - my $im = delete $self->{im}; - $im->done if $im; # PublicInbox::Import::done - - my $mm = $self->{mm}; - $mm->{dbh}->commit if $mm; - - # order matters, we can only close {over} after all partitions - # are done because the partitions also write to {over} - my $parts = delete $self->{idx_parts}; - if ($parts) { - $_->remote_commit for @$parts; - $_->remote_close for @$parts; - } - - my $over = $self->{over}; - $over->commit_lazy; - $over->disconnect; - - if ($mm) { - $mm->{dbh}->begin_work; - set_last_commits($self); - $mm->{dbh}->commit; - delete $self->{mm}; - } - - delete $self->{bnote}; - $self->{transact_bytes} = 0; - $self->lock_release if $parts; -} - -sub checkpoint { - my ($self) = @_; - my $im = $self->{im}; - $im->checkpoint if $im; # PublicInbox::Import::checkpoint - $self->barrier; -} - sub barrier_init { my ($self, $n) = @_; $self->{bnote} or return; @@ -416,13 +373,15 @@ sub barrier_wait { } } -# issue a write barrier to ensure all data is visible to other processes -# and read-only ops. Order of data importance is: git > SQLite > Xapian -sub barrier { - my ($self) = @_; +sub checkpoint ($;$) { + my ($self, $wait) = @_; if (my $im = $self->{im}) { - $im->barrier; + if ($wait) { + $im->barrier; + } else { + $im->checkpoint; + } } my $parts = $self->{idx_parts}; if ($parts) { @@ -435,11 +394,17 @@ sub barrier { $self->{over}->commit_lazy; # Now deal with Xapian - my $barrier = $self->barrier_init(scalar @$parts); + if ($wait) { + my $barrier = $self->barrier_init(scalar @$parts); + + # each partition needs to issue a barrier command + $_->remote_barrier for @$parts; - # each partition needs to issue a barrier command - $_->remote_barrier for @$parts; - $self->barrier_wait($barrier); # wait for each Xapian partition + # wait for each Xapian partition + $self->barrier_wait($barrier); + } else { + $_->remote_commit for @$parts; + } # last_commit is special, don't commit these until # remote partitions are done: @@ -452,6 +417,27 @@ sub barrier { $self->{transact_bytes} = 0; } +# issue a write barrier to ensure all data is visible to other processes +# and read-only ops. Order of data importance is: git > SQLite > Xapian +sub barrier { checkpoint($_[0], 1) }; + +sub done { + my ($self) = @_; + my $im = delete $self->{im}; + $im->done if $im; # PublicInbox::Import::done + checkpoint($self); + my $mm = delete $self->{mm}; + $mm->{dbh}->commit if $mm; + my $parts = delete $self->{idx_parts}; + if ($parts) { + $_->remote_close for @$parts; + } + $self->{over}->disconnect; + delete $self->{bnote}; + $self->{transact_bytes} = 0; + $self->lock_release if $parts; +} + sub git_init { my ($self, $epoch) = @_; my $pfx = "$self->{-inbox}->{mainrepo}/git"; @@ -512,8 +498,8 @@ sub importer { } else { $self->{im} = undef; $im->done; - $self->barrier; $im = undef; + $self->checkpoint; my $git_dir = $self->git_init(++$self->{epoch_max}); my $git = PublicInbox::Git->new($git_dir); return $self->import_init($git, 0); @@ -569,15 +555,25 @@ sub diff ($$$) { unlink($an, $bn); } +sub get_blob ($$) { + my ($self, $smsg) = @_; + if (my $im = $self->{im}) { + my $msg = $im->cat_blob($smsg->{blob}); + return $msg if $msg; + } + # older message, should be in alternates + my $ibx = $self->{-inbox}; + $ibx->msg_by_smsg($smsg); +} + sub lookup_content { my ($self, $mime, $mid) = @_; - my $ibx = $self->{-inbox}; my $over = $self->{over}; my $cid = content_id($mime); my $found; my ($id, $prev); while (my $smsg = $over->next_by_mid($mid, \$id, \$prev)) { - my $msg = $ibx->msg_by_smsg($smsg); + my $msg = get_blob($self, $smsg); if (!defined($msg)) { warn "broken smsg for $mid\n"; next; @@ -815,7 +811,6 @@ sub unindex_oid { } $self->{unindexed}->{$_}++ foreach keys %gone; $self->unindex_oid_remote($oid, $mid); - $self->barrier; } } @@ -823,7 +818,6 @@ my $x40 = qr/[a-f0-9]{40}/; sub unindex { my ($self, $opts, $git, $unindex_range) = @_; my $un = $self->{unindexed} ||= {}; # num => removal count - $self->barrier; my $before = scalar keys %$un; my @cmd = qw(log --raw -r --no-notes --no-color --no-abbrev --no-renames); @@ -847,7 +841,6 @@ sub unindex { sub index_sync { my ($self, $opts) = @_; $opts ||= {}; - my $ibx = $self->{-inbox}; my $epoch_max; my $latest = git_dir_latest($self, \$epoch_max); return unless defined $latest; diff --git a/t/v2writable.t b/t/v2writable.t index 4a42c01..b543c53 100644 --- a/t/v2writable.t +++ b/t/v2writable.t @@ -213,8 +213,8 @@ EOF $im = PublicInbox::V2Writable->new($ibx, 1); is($im->{partitions}, 1, 'detected single partition from previous'); my $smsg = $im->remove($mime, 'test removal'); - my @after = $git0->qx(qw(log --pretty=oneline)); $im->done; + my @after = $git0->qx(qw(log --pretty=oneline)); my $tip = shift @after; like($tip, qr/\A[a-f0-9]+ test removal\n\z/s, 'commit message propagated to git'); -- EW