about summary refs log tree commit homepage
path: root/lib/PublicInbox/V2Writable.pm
diff options
context:
space:
mode:
Diffstat (limited to 'lib/PublicInbox/V2Writable.pm')
-rw-r--r-- lib/PublicInbox/V2Writable.pm | 80
1 file changed, 53 insertions, 27 deletions
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index 41bfb8d1..cb74ab1a 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -6,7 +6,8 @@ package PublicInbox::V2Writable;
 use strict;
 use warnings;
 use Fcntl qw(:flock :DEFAULT);
-use PublicInbox::SearchIdx;
+use PublicInbox::SearchIdxPart;
+use PublicInbox::SearchIdxThread;
 use PublicInbox::MIME;
 use PublicInbox::Git;
 use PublicInbox::Import;
@@ -32,7 +33,8 @@ sub new {
                 im => undef, #  PublicInbox::Import
                 xap_rw => undef, # PublicInbox::V2SearchIdx
                 xap_ro => undef,
-
+                partitions => 4,
+                transact_bytes => 0,
                 # limit each repo to 1GB or so
                 rotate_bytes => int((1024 * 1024 * 1024) / $PACKING_FACTOR),
         };
@@ -55,29 +57,39 @@ sub add {
         my $cmt = $im->add($mime, $check_cb) or return;
         $cmt = $im->get_mark($cmt);
         my $oid = $im->{last_object_id};
-        my $size = $im->{last_object_size};
-
-        my $idx = $self->search_idx;
-        $idx->index_both($mime, $size, $oid);
-        $idx->{xdb}->set_metadata('last_commit', $cmt);
-        my $n = $self->{transact_bytes} += $size;
-        if ($n > PublicInbox::SearchIdx::BATCH_BYTES) {
+        my ($len, $msgref) = @{$im->{last_object}};
+
+        my $nparts = $self->{partitions};
+        my $part = hex(substr($oid, 0, 8)) % $nparts;
+        my $idx = $self->idx_part($part);
+        my $all = $self->{all};
+        my $num = $all->index_mm($mime);
+        $idx->index_raw($len, $msgref, $num, $oid);
+        my $n = $self->{transact_bytes} += $len;
+        if ($n > (PublicInbox::SearchIdx::BATCH_BYTES * $nparts)) {
                 $self->checkpoint;
         }
 
         $mime;
 }
 
-sub search_idx {
-        my ($self) = @_;
-        $self->{idx} ||= eval {
-                my $idx = PublicInbox::SearchIdx->new($self->{-inbox}, 1);
-                my $mm = $idx->_msgmap_init;
-                $idx->_xdb_acquire->begin_transaction;
-                $self->{transact_bytes} = 0;
-                $mm->{dbh}->begin_work;
-                $idx
-        };
+sub idx_part {
+        my ($self, $part) = @_;
+        my $idx = $self->{idx_parts};
+        return $idx->[$part] if $idx; # fast path
+
+        # first time initialization:
+        my $all = $self->{all} =
+                PublicInbox::SearchIdxThread->new($self->{-inbox});
+
+        # need to create all parts before initializing msgmap FD
+        my $max = $self->{partitions} - 1;
+        $idx = $self->{idx_parts} = [];
+        for my $i (0..$max) {
+                push @$idx, PublicInbox::SearchIdxPart->new($self, $i, $all);
+        }
+        $all->_msgmap_init->{dbh}->begin_work;
+        $idx->[$part];
 }
 
 sub remove {
@@ -99,23 +111,37 @@ sub done {
         my ($self) = @_;
         my $im = $self->{im};
         $im->done if $im; # PublicInbox::Import::done
-        $self->searchidx_checkpoint;
+        $self->searchidx_checkpoint(0);
 }
 
 sub checkpoint {
         my ($self) = @_;
         my $im = $self->{im};
         $im->checkpoint if $im; # PublicInbox::Import::checkpoint
-        $self->searchidx_checkpoint;
+        $self->searchidx_checkpoint(1);
 }
 
 sub searchidx_checkpoint {
-        my ($self) = @_;
-        my $idx = delete $self->{idx} or return;
+        my ($self, $more) = @_;
+
+        # order matters, we can only close {all} after all partitions
+        # are done because the partitions also write to {all}
+
+        my $parts = $self->{idx_parts};
+        foreach my $idx (@$parts) {
+                $idx->remote_commit;
+                $idx->remote_close unless $more;
+        }
 
-        $idx->{mm}->{dbh}->commit;
-        $idx->{xdb}->commit_transaction;
-        $idx->_xdb_release;
+        if (my $all = $self->{all}) {
+                $all->{mm}->{dbh}->commit;
+                if ($more) {
+                        $all->{mm}->{dbh}->begin_work;
+                }
+                $all->remote_commit;
+                $all->remote_close unless $more;
+        }
+        $self->{transact_bytes} = 0;
 }
 
 sub git_init {
@@ -158,7 +184,7 @@ sub importer {
                 } else {
                         $self->{im} = undef;
                         $im->done;
-                        $self->searchidx_checkpoint;
+                        $self->searchidx_checkpoint(1);
                         $im = undef;
                         my $git_dir = $self->git_init(++$self->{max_git});
                         my $git = PublicInbox::Git->new($git_dir);