diff options
Diffstat (limited to 'lib/PublicInbox/SearchIdxSkeleton.pm')
-rw-r--r-- | lib/PublicInbox/SearchIdxSkeleton.pm | 63 |
1 files changed, 29 insertions, 34 deletions
diff --git a/lib/PublicInbox/SearchIdxSkeleton.pm b/lib/PublicInbox/SearchIdxSkeleton.pm index 54a59ab0..ba439696 100644 --- a/lib/PublicInbox/SearchIdxSkeleton.pm +++ b/lib/PublicInbox/SearchIdxSkeleton.pm @@ -12,7 +12,12 @@ sub new { # create the DB: $self->_xdb_acquire; $self->_xdb_release; + $self->spawn_worker($v2writable) if $v2writable->{parallel}; + $self +} +sub spawn_worker { + my ($self, $v2writable) = @_; my ($r, $w); pipe($r, $w) or die "pipe failed: $!\n"; my ($barrier_wait, $barrier_note); @@ -39,24 +44,19 @@ sub new { # lock on only exists in parent, not in worker $self->{lock_path} = $self->xdir . '/pi-v2-skeleton.lock'; - $self; } sub skeleton_worker_loop { my ($self, $r, $barrier_note) = @_; $barrier_note->autoflush(1); $0 = 'pi-v2-skeleton'; - my $xdb = $self->_xdb_acquire; - $xdb->begin_transaction; - my $txn = 1; + $self->begin_txn_lazy; my $barrier = undef; while (my $line = $r->getline) { if ($line eq "commit\n") { - $xdb->commit_transaction if $txn; - $txn = undef; + $self->commit_txn_lazy; } elsif ($line eq "close\n") { $self->_xdb_release; - $xdb = $txn = undef; } elsif ($line =~ /\Abarrier_init (\d+)\n\z/) { my $n = $1 - 1; die "barrier in-progress\n" if defined $barrier; @@ -67,18 +67,13 @@ sub skeleton_worker_loop { delete $barrier->{$1} or die "unknown barrier: $part\n"; if ((scalar keys %$barrier) == 0) { $barrier = undef; - $xdb->commit_transaction if $txn; - $txn = undef; + $self->commit_txn_lazy; print $barrier_note "barrier_done\n" or die "print failed to barrier note: $!"; } } elsif ($line =~ /\AD ([a-f0-9]{40,}) (.*)\n\z/s) { my ($oid, $mid) = ($1, $2); - $xdb ||= $self->_xdb_acquire; - if (!$txn) { - $xdb->begin_transaction; - $txn = 1; - } + $self->begin_txn_lazy; $self->remove_by_oid($oid, $mid); } else { my $len = int($line); @@ -86,35 +81,34 @@ sub skeleton_worker_loop { $n == $len or die "short read: $n != $len\n"; $msg = thaw($msg); # should raise on error defined $msg or die "failed to thaw buffer\n"; - $xdb ||= $self->_xdb_acquire; - if (!$txn) { - $xdb->begin_transaction; - $txn = 1; - } + $self->begin_txn_lazy; eval { index_skeleton_real($self, $msg) }; warn "failed to index message <$msg->[-1]>: $@\n" if $@; } } - die "xdb not released\n" if $xdb; - die "in transaction\n" if $txn; + $self->worker_done; } # called by a partition worker sub index_skeleton { my ($self, $values) = @_; - my $w = $self->{w}; - my $err; - my $str = freeze($values); - $str = length($str) . "\n" . $str; + if (my $w = $self->{w}) { + my $err; + my $str = freeze($values); + $str = length($str) . "\n" . $str; - # multiple processes write to the same pipe, so use flock - # We can't avoid this lock for <=PIPE_BUF writes, either, - # because those atomic writes can break up >PIPE_BUF ones - $self->lock_acquire; - print $w $str or $err = $!; - $self->lock_release; + # multiple processes write to the same pipe, so use flock + # We can't avoid this lock for <=PIPE_BUF writes, either, + # because those atomic writes can break up >PIPE_BUF ones + $self->lock_acquire; + print $w $str or $err = $!; + $self->lock_release; - die "print failed: $err\n" if $err; + die "print failed: $err\n" if $err; + } else { + $self->begin_txn_lazy; + index_skeleton_real($self, $values); + } } sub remote_remove { @@ -148,7 +142,7 @@ sub index_skeleton_real ($$) { # write to the subprocess sub barrier_init { my ($self, $nparts) = @_; - my $w = $self->{w}; + my $w = $self->{w} or return; my $err; $self->lock_acquire; print $w "barrier_init $nparts\n" or $err = "failed to write: $!\n"; @@ -158,7 +152,8 @@ sub barrier_init { sub barrier_wait { my ($self) = @_; - my $l = $self->{barrier_wait}->getline; + my $bw = $self->{barrier_wait} or return; + my $l = $bw->getline; $l eq "barrier_done\n" or die "bad response from barrier_wait: $l\n"; } |