about summary refs log tree commit homepage
path: root/lib
diff options
context:
space:
mode:
author: Eric Wong (Contractor, The Linux Foundation) <e@80x24.org>, 2018-02-22 17:55:35 +0000
committer: Eric Wong (Contractor, The Linux Foundation) <e@80x24.org>, 2018-02-22 18:38:39 +0000
commit: 0586a456f97fd41879a266e14a98dcbc148679ea (patch)
tree: 5ed262d27e5d44742953389a1a6f180c6c0de6ad /lib
parent: 9ecbfc09928dada28094fd3fc79e91a5472b27ea (diff)
download: public-inbox-0586a456f97fd41879a266e14a98dcbc148679ea.tar.gz
Instead of relying on the git object_id hash to partition,
round-robin to these partitions based on the NNTP article
number.  This reduces the partition pipes as a source of
contention when two (or more) sequential messages end up
going to the same partition.
Diffstat (limited to 'lib')
-rw-r--r-- lib/PublicInbox/V2Writable.pm | 34
1 files changed, 21 insertions, 13 deletions
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index cb74ab1a..cf19c761 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -17,6 +17,9 @@ $Email::MIME::ContentType::STRICT_PARAMS = 0;
 # an estimate of the post-packed size to the raw uncompressed size
 my $PACKING_FACTOR = 0.4;
 
+# assume 2 cores if GNU nproc(1) is not available
+my $NPROC = int($ENV{NPROC} || `nproc 2>/dev/null` || 2);
+
 sub new {
         my ($class, $v2ibx, $creat) = @_;
         my $dir = $v2ibx->{mainrepo} or die "no mainrepo in inbox\n";
@@ -33,7 +36,7 @@ sub new {
                 im => undef, #  PublicInbox::Import
                 xap_rw => undef, # PublicInbox::V2SearchIdx
                 xap_ro => undef,
-                partitions => 4,
+                partitions => $NPROC,
                 transact_bytes => 0,
                 # limit each repo to 1GB or so
                 rotate_bytes => int((1024 * 1024 * 1024) / $PACKING_FACTOR),
@@ -59,11 +62,11 @@ sub add {
         my $oid = $im->{last_object_id};
         my ($len, $msgref) = @{$im->{last_object}};
 
+        $self->idx_init;
+        my $num = $self->{all}->index_mm($mime);
         my $nparts = $self->{partitions};
-        my $part = hex(substr($oid, 0, 8)) % $nparts;
+        my $part = $num % $nparts;
         my $idx = $self->idx_part($part);
-        my $all = $self->{all};
-        my $num = $all->index_mm($mime);
         $idx->index_raw($len, $msgref, $num, $oid);
         my $n = $self->{transact_bytes} += $len;
         if ($n > (PublicInbox::SearchIdx::BATCH_BYTES * $nparts)) {
@@ -75,21 +78,23 @@ sub add {
 
 sub idx_part {
         my ($self, $part) = @_;
-        my $idx = $self->{idx_parts};
-        return $idx->[$part] if $idx; # fast path
+        $self->{idx_parts}->[$part];
+}
 
+sub idx_init {
+        my ($self) = @_;
+        return if $self->{idx_parts};
         # first time initialization:
-        my $all = $self->{all} =
+        my $all = $self->{all} =
                 PublicInbox::SearchIdxThread->new($self->{-inbox});
 
         # need to create all parts before initializing msgmap FD
         my $max = $self->{partitions} - 1;
-        $idx = $self->{idx_parts} = [];
+        my $idx = $self->{idx_parts} = [];
         for my $i (0..$max) {
                 push @$idx, PublicInbox::SearchIdxPart->new($self, $i, $all);
         }
         $all->_msgmap_init->{dbh}->begin_work;
-        $idx->[$part];
 }
 
 sub remove {
@@ -127,10 +132,12 @@ sub searchidx_checkpoint {
         # order matters, we can only close {all} after all partitions
         # are done because the partitions also write to {all}
 
-        my $parts = $self->{idx_parts};
-        foreach my $idx (@$parts) {
-                $idx->remote_commit;
-                $idx->remote_close unless $more;
+        if (my $parts = $self->{idx_parts}) {
+                foreach my $idx (@$parts) {
+                        $idx->remote_commit;
+                        $idx->remote_close unless $more;
+                }
+                delete $self->{idx_parts} unless $more;
         }
 
         if (my $all = $self->{all}) {
@@ -140,6 +147,7 @@ sub searchidx_checkpoint {
                 }
                 $all->remote_commit;
                 $all->remote_close unless $more;
+                delete $self->{all} unless $more;
         }
         $self->{transact_bytes} = 0;
 }