From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.0 (2014-02-07) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.0 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 058631FAE4 for ; Mon, 19 Mar 2018 08:14:59 +0000 (UTC) From: "Eric Wong (Contractor, The Linux Foundation)" To: meta@public-inbox.org Subject: [PATCH 02/27] v2writable: support "barrier" operation to avoid reforking Date: Mon, 19 Mar 2018 08:14:34 +0000 Message-Id: <20180319081459.10645-3-e@80x24.org> In-Reply-To: <20180319081459.10645-1-e@80x24.org> References: <20180319081459.10645-1-e@80x24.org> List-Id: Stopping and starting a bunch of processes to look up duplicates or removals is inefficient. Take advantage of checkpointing in "git fast-import" and transactions in Xapian and SQLite. --- lib/PublicInbox/Import.pm | 10 ++++++++- lib/PublicInbox/SearchIdxPart.pm | 12 ++++++++++ lib/PublicInbox/SearchIdxSkeleton.pm | 43 ++++++++++++++++++++++++++++++++---- lib/PublicInbox/V2Writable.pm | 34 +++++++++++++++++++++++++++- t/v2writable.t | 3 ++- 5 files changed, 95 insertions(+), 7 deletions(-) diff --git a/lib/PublicInbox/Import.pm b/lib/PublicInbox/Import.pm index 664bec6..8406c9e 100644 --- a/lib/PublicInbox/Import.pm +++ b/lib/PublicInbox/Import.pm @@ -133,7 +133,6 @@ sub check_remove_v1 { (undef, $cur); } -# used for v2 (maybe) sub checkpoint { my ($self) = @_; return unless $self->{pid}; @@ -141,6 +140,15 @@ sub checkpoint { undef; } +sub progress { + my ($self, $msg) = @_; + return unless $self->{pid}; + print { $self->{out} } "progress $msg\n" or wfail; + $self->{in}->getline eq "progress $msg\n" or die + "progress $msg not received\n"; + undef; +} + # used for v2 sub get_mark { my ($self, $mark) = @_; diff --git a/lib/PublicInbox/SearchIdxPart.pm b/lib/PublicInbox/SearchIdxPart.pm index 
6d8cb2a..dd7ace6 100644 --- a/lib/PublicInbox/SearchIdxPart.pm +++ b/lib/PublicInbox/SearchIdxPart.pm @@ -49,6 +49,11 @@ sub partition_worker_loop ($$$) { } elsif ($line eq "close\n") { $self->_xdb_release; $xdb = $txn = undef; + } elsif ($line eq "barrier\n") { + $xdb->commit_transaction if $txn; + $txn = undef; + print { $self->{skeleton}->{w} } "barrier $part\n" or + die "write failed to skeleton: $!\n"; } else { chomp $line; my ($len, $artnum, $oid, $mid0) = split(/ /, $line); @@ -81,4 +86,11 @@ sub atfork_child { close $_[0]->{w} or die "failed to close write pipe: $!\n"; } +# called by V2Writable: +sub barrier { + my $w = $_[0]->{w}; + print $w "barrier\n" or die "failed to print: $!"; + $w->flush or die "failed to flush: $!"; +} + 1; diff --git a/lib/PublicInbox/SearchIdxSkeleton.pm b/lib/PublicInbox/SearchIdxSkeleton.pm index 40b28c5..4cb10f5 100644 --- a/lib/PublicInbox/SearchIdxSkeleton.pm +++ b/lib/PublicInbox/SearchIdxSkeleton.pm @@ -15,21 +15,25 @@ sub new { my ($r, $w); pipe($r, $w) or die "pipe failed: $!\n"; - binmode $r, ':raw'; - binmode $w, ':raw'; + my ($barrier_wait, $barrier_note); + pipe($barrier_wait, $barrier_note) or die "pipe failed: $!\n"; + binmode $_, ':raw' foreach ($r, $w, $barrier_wait, $barrier_note); my $pid = fork; defined $pid or die "fork failed: $!\n"; if ($pid == 0) { $v2writable->atfork_child; $v2writable = undef; close $w; - eval { skeleton_worker_loop($self, $r) }; + close $barrier_wait; + eval { skeleton_worker_loop($self, $r, $barrier_note) }; die "skeleton worker died: $@\n" if $@; exit; } $self->{w} = $w; $self->{pid} = $pid; close $r; + close $barrier_note; + $self->{barrier_wait} = $barrier_wait; $w->autoflush(1); @@ -40,11 +44,13 @@ sub new { } sub skeleton_worker_loop { - my ($self, $r) = @_; + my ($self, $r, $barrier_note) = @_; + $barrier_note->autoflush(1); $0 = 'pi-v2-skeleton'; my $xdb = $self->_xdb_acquire; $xdb->begin_transaction; my $txn = 1; + my $barrier = undef; while (my $line = $r->getline) { if ($line 
eq "commit\n") { $xdb->commit_transaction if $txn; @@ -52,6 +58,21 @@ sub skeleton_worker_loop { } elsif ($line eq "close\n") { $self->_xdb_release; $xdb = $txn = undef; + } elsif ($line =~ /\Abarrier_init (\d+)\n\z/) { + my $n = $1 - 1; + die "barrier in-progress\n" if defined $barrier; + $barrier = { map { $_ => 1 } (0..$n) }; + } elsif ($line =~ /\Abarrier (\d+)\n\z/) { + my $part = $1; + die "no barrier in-progress\n" unless defined $barrier; + delete $barrier->{$1} or die "unknown barrier: $part\n"; + if ((scalar keys %$barrier) == 0) { + $barrier = undef; + $xdb->commit_transaction if $txn; + $txn = undef; + print $barrier_note "barrier_done\n" or die + "print failed to barrier note: $!"; + } } else { my $len = int($line); my $n = read($r, my $msg, $len) or die "read: $!\n"; @@ -107,4 +128,18 @@ sub index_skeleton_real ($$) { $self->link_and_save($doc, $mids, \@refs, $num, $xpath); } +# write to the subprocess +sub barrier_init { + my ($self, $nparts) = @_; + my $w = $_[0]->{w}; + print $w "barrier_init $nparts\n" or die "failed to write: $!"; + $w->flush or die "failed to flush: $!"; +} + +sub barrier_wait { + my ($self) = @_; + my $l = $self->{barrier_wait}->getline; + $l eq "barrier_done\n" or die "bad response from barrier_wait: $l\n"; +} + 1; diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm index 7728b91..6e2a8d6 100644 --- a/lib/PublicInbox/V2Writable.pm +++ b/lib/PublicInbox/V2Writable.pm @@ -113,7 +113,7 @@ sub num_for { }; # crap, Message-ID is already known, hope somebody just resent: - $self->done; # write barrier, clears $self->{skel} + $self->barrier; foreach my $m (@$mids) { # read-only lookup now safe to do after above barrier my $existing = $self->lookup_content($mime, $m); @@ -228,6 +228,37 @@ sub checkpoint { $self->searchidx_checkpoint(1); } +# issue a write barrier to ensure all data is visible to other processes +# and read-only ops. 
Order of data importance is: git > SQLite > Xapian +sub barrier { + my ($self) = @_; + + # For safety, we ensure git checkpoint is complete first because + # the data in git is still more important than what is in Xapian. + # Performance may be gained by delaying the ->progress call, but we + # lose safety + if (my $im = $self->{im}) { + $im->checkpoint; + $im->progress('checkpoint'); + } + my $skel = $self->{skel}; + my $parts = $self->{idx_parts}; + if ($parts && $skel) { + my $dbh = $skel->{mm}->{dbh}; + $dbh->commit; # SQLite data is second in importance + + # Now deal with Xapian + $skel->barrier_init(scalar(@$parts)); + # each partition needs to issue a barrier command to skel: + $_->barrier foreach @$parts; + + $skel->barrier_wait; # wait for each Xapian partition + + $dbh->begin_work; + } + $self->{transact_bytes} = 0; +} + sub searchidx_checkpoint { my ($self, $more) = @_; @@ -349,6 +380,7 @@ sub lookup_content { my $ibx = $self->{-inbox}; my $srch = $ibx->search; + $srch->reopen; my $cid = content_id($mime); my $found; $srch->each_smsg_by_mid($mid, sub { diff --git a/t/v2writable.t b/t/v2writable.t index 404c865..7d276da 100644 --- a/t/v2writable.t +++ b/t/v2writable.t @@ -55,7 +55,7 @@ if ('ensure git configs are correct') { { my @warn; local $SIG{__WARN__} = sub { push @warn, @_ }; - is(undef, $im->add($mime), 'obvious duplicate rejected'); + is($im->add($mime), undef, 'obvious duplicate rejected'); like(join(' ', @warn), qr/resent/, 'warned about resent message'); @warn = (); @@ -105,6 +105,7 @@ if ('ensure git configs are correct') { ok($im->add($mime), 'message with multiple Message-ID'); $im->done; my @found; + $ibx->search->reopen; $ibx->search->each_smsg_by_mid('abcde@1', sub { push @found, @_; 1 }); is(scalar(@found), 1, 'message found by first MID'); $ibx->search->each_smsg_by_mid('abcde@2', sub { push @found, @_; 1 }); -- EW