From 0586a456f97fd41879a266e14a98dcbc148679ea Mon Sep 17 00:00:00 2001 From: "Eric Wong (Contractor, The Linux Foundation)" Date: Thu, 22 Feb 2018 17:55:35 +0000 Subject: v2writable: round-robin to partitions based on article number Instead of relying on the git object_id hash to partition, round-robin to these partitions based on the NNTP article number. This reduces the partition pipes as a source of contention when two (or more) sequential messages end up going to the same partition. --- lib/PublicInbox/V2Writable.pm | 34 +++++++++++++++++++++------------- 1 file changed, 21 insertions(+), 13 deletions(-) (limited to 'lib') diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm index cb74ab1a..cf19c761 100644 --- a/lib/PublicInbox/V2Writable.pm +++ b/lib/PublicInbox/V2Writable.pm @@ -17,6 +17,9 @@ $Email::MIME::ContentType::STRICT_PARAMS = 0; # an estimate of the post-packed size to the raw uncompressed size my $PACKING_FACTOR = 0.4; +# assume 2 cores if GNU nproc(1) is not available +my $NPROC = int($ENV{NPROC} || `nproc 2>/dev/null` || 2); + sub new { my ($class, $v2ibx, $creat) = @_; my $dir = $v2ibx->{mainrepo} or die "no mainrepo in inbox\n"; @@ -33,7 +36,7 @@ sub new { im => undef, # PublicInbox::Import xap_rw => undef, # PublicInbox::V2SearchIdx xap_ro => undef, - partitions => 4, + partitions => $NPROC, transact_bytes => 0, # limit each repo to 1GB or so rotate_bytes => int((1024 * 1024 * 1024) / $PACKING_FACTOR), @@ -59,11 +62,11 @@ sub add { my $oid = $im->{last_object_id}; my ($len, $msgref) = @{$im->{last_object}}; + $self->idx_init; + my $num = $self->{all}->index_mm($mime); my $nparts = $self->{partitions}; - my $part = hex(substr($oid, 0, 8)) % $nparts; + my $part = $num % $nparts; my $idx = $self->idx_part($part); - my $all = $self->{all}; - my $num = $all->index_mm($mime); $idx->index_raw($len, $msgref, $num, $oid); my $n = $self->{transact_bytes} += $len; if ($n > (PublicInbox::SearchIdx::BATCH_BYTES * $nparts)) { @@ -75,21 +78,23 @@ sub add { sub idx_part { my ($self, $part) = @_; - my $idx = $self->{idx_parts}; - return $idx->[$part] if $idx; # fast path + $self->{idx_parts}->[$part]; +} +sub idx_init { + my ($self) = @_; + return if $self->{idx_parts}; # first time initialization: - my $all = $self->{all} = + my $all = $self->{all} = PublicInbox::SearchIdxThread->new($self->{-inbox}); # need to create all parts before initializing msgmap FD my $max = $self->{partitions} - 1; - $idx = $self->{idx_parts} = []; + my $idx = $self->{idx_parts} = []; for my $i (0..$max) { push @$idx, PublicInbox::SearchIdxPart->new($self, $i, $all); } $all->_msgmap_init->{dbh}->begin_work; - $idx->[$part]; } sub remove { @@ -127,10 +132,12 @@ sub searchidx_checkpoint { # order matters, we can only close {all} after all partitions # are done because the partitions also write to {all} - my $parts = $self->{idx_parts}; - foreach my $idx (@$parts) { - $idx->remote_commit; - $idx->remote_close unless $more; + if (my $parts = $self->{idx_parts}) { + foreach my $idx (@$parts) { + $idx->remote_commit; + $idx->remote_close unless $more; + } + delete $self->{idx_parts} unless $more; } if (my $all = $self->{all}) { @@ -140,6 +147,7 @@ sub searchidx_checkpoint { } $all->remote_commit; $all->remote_close unless $more; + delete $self->{all} unless $more; } $self->{transact_bytes} = 0; } -- cgit v1.2.3-24-ge0c7