diff options
Diffstat (limited to 'lib/PublicInbox/LeiStore.pm')
-rw-r--r-- | lib/PublicInbox/LeiStore.pm | 49 |
1 files changed, 29 insertions, 20 deletions
diff --git a/lib/PublicInbox/LeiStore.pm b/lib/PublicInbox/LeiStore.pm index a752174d..b2da2bc3 100644 --- a/lib/PublicInbox/LeiStore.pm +++ b/lib/PublicInbox/LeiStore.pm @@ -28,6 +28,7 @@ use PublicInbox::Spawn qw(spawn); use PublicInbox::MdirReader; use PublicInbox::LeiToMail; use PublicInbox::Compat qw(uniqstr); +use PublicInbox::OnDestroy; use File::Temp qw(tmpnam); use POSIX (); use IO::Handle (); # ->autoflush @@ -80,7 +81,7 @@ sub importer { delete $self->{im}; $im->done; undef $im; - $self->checkpoint; + $self->barrier; $max = $self->{priv_eidx}->{mg}->git_epochs + 1; } my (undef, $tl) = eidx_init($self); # acquire lock @@ -117,7 +118,7 @@ sub cat_blob { sub schedule_commit { my ($self, $sec) = @_; - add_uniq_timer($self->{priv_eidx}->{topdir}, $sec, \&done, $self); + add_uniq_timer($self->{priv_eidx}->{topdir}, $sec, \&barrier, $self); } # follows the stderr file @@ -135,7 +136,7 @@ sub eidx_init { my ($self) = @_; my $eidx = $self->{priv_eidx}; my $tl = wantarray && $self->{-err_wr} ? - PublicInbox::OnDestroy->new($$, \&_tail_err, $self) : + on_destroy(\&_tail_err, $self) : undef; $eidx->idx_init({-private => 1}); # acquires lock wantarray ? ($eidx, $tl) : $eidx; @@ -390,7 +391,7 @@ sub reindex_done { my ($self) = @_; my ($eidx, $tl) = eidx_init($self); $eidx->git->async_wait_all; - # ->done to be called via sto_done_request + # ->done to be called via sto_barrier_request } sub add_eml { @@ -570,13 +571,11 @@ sub set_xvmd { sto_export_kw($self, $smsg->{num}, $vmd); } -sub checkpoint { - my ($self, $wait) = @_; - if (my $im = $self->{im}) { - $wait ? $im->barrier : $im->checkpoint; - } - delete $self->{lms}; - $self->{priv_eidx}->checkpoint($wait); +sub check_done { + my ($self) = @_; + $self->git->_active ? + add_uniq_timer("$self-check_done", 5, \&check_done, $self) : + done($self); } sub xchg_stderr { @@ -593,23 +592,33 @@ sub xchg_stderr { undef; } -sub done { - my ($self) = @_; - my ($errfh, $lei_sock) = @$self{0, 1}; # via sto_done_request +sub _commit ($$) { + my ($self, $cmd) = @_; # cmd is 'done' or 'barrier' + my ($errfh, $lei_sock) = @$self{0, 1}; # via sto_barrier_request my @err; - if (my $im = delete($self->{im})) { - eval { $im->done }; - push(@err, "E: import done: $@\n") if $@; + if ($self->{im}) { + eval { $self->{im}->$cmd }; + push(@err, "E: import $cmd: $@\n") if $@; } delete $self->{lms}; - eval { $self->{priv_eidx}->done }; # V2Writable::done - push(@err, "E: priv_eidx done: $@\n") if $@; - print { $errfh // *STDERR{GLOB} } @err; + eval { $self->{priv_eidx}->$cmd }; + push(@err, "E: priv_eidx $cmd: $@\n") if $@; + print { $errfh // \*STDERR } @err; send($lei_sock, 'child_error 256', 0) if @err && $lei_sock; xchg_stderr($self); die @err if @err; + # $lei_sock goes out-of-scope and script/lei can terminate } +sub barrier { + my ($self) = @_; + _commit $self, 'barrier'; + add_uniq_timer("$self-check_done", 5, \&check_done, $self); + undef; +} + +sub done { _commit $_[0], 'done' } + sub ipc_atfork_child { my ($self) = @_; my $lei = $self->{lei}; |