about summary refs log tree commit homepage
path: root/lib/PublicInbox/V2Writable.pm
diff options
context:
space:
mode:
Diffstat (limited to 'lib/PublicInbox/V2Writable.pm')
-rw-r--r--lib/PublicInbox/V2Writable.pm63
1 files changed, 20 insertions, 43 deletions
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index 724fa804..cad559c5 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -287,16 +287,7 @@ sub _idx_init { # with_umask callback
 
 sub parallel_init ($$) {
         my ($self, $indexlevel) = @_;
-        if (($indexlevel // 'full') eq 'basic') {
-                $self->{parallel} = 0;
-        } else {
-                pipe(my ($r, $w)) or die "pipe failed: $!";
-                # pipe for barrier notifications doesn't need to be big,
-                # 1031: F_SETPIPE_SZ
-                fcntl($w, 1031, 4096) if $^O eq 'linux';
-                $self->{bnote} = [ $r, $w ];
-                $w->autoflush(1);
-        }
+        $self->{parallel} = 0 if ($indexlevel // 'full') eq 'basic';
 }
 
 # idempotent
@@ -574,24 +565,6 @@ sub set_last_commits ($) { # this is NOT for ExtSearchIdx
         }
 }
 
-sub barrier_init {
-        my ($self, $n) = @_;
-        $self->{bnote} or return;
-        --$n;
-        my $barrier = { map { $_ => 1 } (0..$n) };
-}
-
-sub barrier_wait {
-        my ($self, $barrier) = @_;
-        my $bnote = $self->{bnote} or return;
-        my $r = $bnote->[0];
-        while (scalar keys %$barrier) {
-                defined(my $l = readline($r)) or die "EOF on barrier_wait: $!";
-                $l =~ /\Abarrier (\d+)/ or die "bad line on barrier_wait: $l";
-                delete $barrier->{$1} or die "bad shard[$1] on barrier wait";
-        }
-}
-
 # public
 sub checkpoint ($;$) {
         my ($self, $wait) = @_;
@@ -615,16 +588,23 @@ sub checkpoint ($;$) {
                 $self->{oidx}->commit_lazy;
 
                 # Now deal with Xapian
-                if ($wait) {
-                        my $barrier = barrier_init($self, scalar @$shards);
-
-                        # each shard needs to issue a barrier command
-                        $_->shard_barrier for @$shards;
 
-                        # wait for each Xapian shard
-                        barrier_wait($self, $barrier);
-                } else {
-                        $_->shard_commit for @$shards;
+                # start commit_txn_lazy asynchronously on all parallel shards
+                # (non-parallel waits here)
+                $_->ipc_do('commit_txn_lazy') for @$shards;
+
+                # transactions started on parallel shards,
+                # wait for them by issuing an echo command (echo can only
+                # run after commit_txn_lazy is done)
+                if ($wait && $self->{parallel}) {
+                        my $i = 0;
+                        for my $shard (@$shards) {
+                                my $echo = $shard->ipc_do('echo', $i);
+                                $echo == $i or die <<"";
+shard[$i] bad echo:$echo != $i waiting for txn commit
+
+                                ++$i;
+                        }
                 }
 
                 my $midx = $self->{midx}; # misc index
@@ -679,7 +659,6 @@ sub done {
         eval { $self->{oidx}->dbh_close };
         $err .= "over close: $@\n" if $@;
         delete $self->{midx};
-        delete $self->{bnote};
         my $nbytes = $self->{total_bytes};
         $self->{total_bytes} = 0;
         $self->lock_release(!!$nbytes) if $shards;
@@ -846,15 +825,13 @@ sub content_exists ($$$) {
 
 sub atfork_child {
         my ($self) = @_;
-        if (my $shards = $self->{idx_shards}) {
-                $_->atfork_child foreach @$shards;
+        if (my $older_siblings = $self->{idx_shards}) {
+                $_->shard_atfork_child for @$older_siblings;
         }
         if (my $im = $self->{im}) {
                 $im->atfork_child;
         }
-        die "unexpected mm" if $self->{mm};
-        close $self->{bnote}->[0] or die "close bnote[0]: $!\n";
-        $self->{bnote}->[1];
+        die "BUG: unexpected mm" if $self->{mm};
 }
 
 sub reindex_checkpoint ($$) {