about summary refs log tree commit homepage
path: root/lib/PublicInbox/SearchIdxPart.pm
diff options
context:
space:
mode:
Diffstat (limited to 'lib/PublicInbox/SearchIdxPart.pm')
-rw-r--r--lib/PublicInbox/SearchIdxPart.pm62
1 files changed, 33 insertions, 29 deletions
diff --git a/lib/PublicInbox/SearchIdxPart.pm b/lib/PublicInbox/SearchIdxPart.pm
index d8c8c8bb..82f5c1bc 100644
--- a/lib/PublicInbox/SearchIdxPart.pm
+++ b/lib/PublicInbox/SearchIdxPart.pm
@@ -9,6 +9,15 @@ sub new {
         my ($class, $v2writable, $part, $skel) = @_;
         my $self = $class->SUPER::new($v2writable->{-inbox}, 1, $part);
         $self->{skeleton} = $skel;
+        # create the DB:
+        $self->_xdb_acquire;
+        $self->_xdb_release;
+        $self->spawn_worker($v2writable, $part) if $v2writable->{parallel};
+        $self;
+}
+
+sub spawn_worker {
+        my ($self, $v2writable, $part) = @_;
         my ($r, $w);
         pipe($r, $w) or die "pipe failed: $!\n";
         binmode $r, ':raw';
@@ -32,44 +41,30 @@ sub new {
         $self->{pid} = $pid;
         $self->{w} = $w;
         close $r;
-        $self;
 }
 
 sub partition_worker_loop ($$$) {
         my ($self, $r, $part) = @_;
         $0 = "pi-v2-partition[$part]";
-        my $xdb = $self->_xdb_acquire;
-        $xdb->begin_transaction;
-        my $txn = 1;
+        $self->begin_txn_lazy;
         while (my $line = $r->getline) {
                 if ($line eq "commit\n") {
-                        $xdb->commit_transaction if $txn;
-                        $txn = undef;
+                        $self->commit_txn_lazy;
                         $self->{skeleton}->remote_commit;
                 } elsif ($line eq "close\n") {
                         $self->_xdb_release;
-                        $xdb = $txn = undef;
                 } elsif ($line eq "barrier\n") {
-                        $xdb->commit_transaction if $txn;
-                        $txn = undef;
+                        $self->commit_txn_lazy;
                         print { $self->{skeleton}->{w} } "barrier $part\n" or
                                         die "write failed to skeleton: $!\n";
                 } elsif ($line =~ /\AD ([a-f0-9]{40,}) (.+)\n\z/s) {
                         my ($oid, $mid) = ($1, $2);
-                        $xdb ||= $self->_xdb_acquire;
-                        if (!$txn) {
-                                $xdb->begin_transaction;
-                                $txn = 1;
-                        }
+                        $self->begin_txn_lazy;
                         $self->remove_by_oid($oid, $mid);
                 } else {
                         chomp $line;
                         my ($len, $artnum, $oid, $mid0) = split(/ /, $line);
-                        $xdb ||= $self->_xdb_acquire;
-                        if (!$txn) {
-                                $xdb->begin_transaction;
-                                $txn = 1;
-                        }
+                        $self->begin_txn_lazy;
                         my $n = read($r, my $msg, $len) or die "read: $!\n";
                         $n == $len or die "short read: $n != $len\n";
                         my $mime = PublicInbox::MIME->new(\$msg);
@@ -77,17 +72,21 @@ sub partition_worker_loop ($$$) {
                         $self->add_message($mime, $n, $artnum, $oid, $mid0);
                 }
         }
-        warn "$$ still in transaction\n" if $txn;
-        warn "$$ xdb active\n" if $xdb;
+        $self->worker_done;
 }
 
 # called by V2Writable
 sub index_raw {
-        my ($self, $len, $msgref, $artnum, $object_id, $mid0) = @_;
-        my $w = $self->{w};
-        print $w "$len $artnum $object_id $mid0\n", $$msgref or die
-                "failed to write partition $!\n";
-        $w->flush or die "failed to flush: $!\n";
+        my ($self, $bytes, $msgref, $artnum, $oid, $mid0, $mime) = @_;
+        if (my $w = $self->{w}) {
+                print $w "$bytes $artnum $oid $mid0\n", $$msgref or die
+                        "failed to write partition $!\n";
+                $w->flush or die "failed to flush: $!\n";
+        } else {
+                $$msgref = undef;
+                $self->begin_txn_lazy;
+                $self->add_message($mime, $bytes, $artnum, $oid, $mid0);
+        }
 }
 
 sub atfork_child {
@@ -96,9 +95,14 @@ sub atfork_child {
 
 # called by V2Writable:
 sub remote_barrier {
-        my $w = $_[0]->{w};
-        print $w "barrier\n" or die "failed to print: $!";
-        $w->flush or die "failed to flush: $!";
+        my ($self) = @_;
+        if (my $w = $self->{w}) {
+                print $w "barrier\n" or die "failed to print: $!";
+                $w->flush or die "failed to flush: $!";
+        } else {
+                $self->commit_txn_lazy;
+                $self->{skeleton}->remote_commit;
+        }
 }
 
 1;