diff options
Diffstat (limited to 'lib/PublicInbox/LeiStore.pm')
-rw-r--r-- | lib/PublicInbox/LeiStore.pm | 194 |
1 files changed, 141 insertions, 53 deletions
diff --git a/lib/PublicInbox/LeiStore.pm b/lib/PublicInbox/LeiStore.pm index b4f40912..b2da2bc3 100644 --- a/lib/PublicInbox/LeiStore.pm +++ b/lib/PublicInbox/LeiStore.pm @@ -1,4 +1,4 @@ -# Copyright (C) 2020-2021 all contributors <meta@public-inbox.org> +# Copyright (C) all contributors <meta@public-inbox.org> # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> # # Local storage (cache/memo) for lei(1), suitable for personal/private @@ -27,11 +27,16 @@ use PublicInbox::MDA; use PublicInbox::Spawn qw(spawn); use PublicInbox::MdirReader; use PublicInbox::LeiToMail; -use File::Temp (); +use PublicInbox::Compat qw(uniqstr); +use PublicInbox::OnDestroy; +use File::Temp qw(tmpnam); use POSIX (); use IO::Handle (); # ->autoflush use Sys::Syslog qw(syslog openlog); use Errno qw(EEXIST ENOENT); +use PublicInbox::Syscall qw(rename_noreplace); +use PublicInbox::LeiStoreErr; +use PublicInbox::DS qw(add_uniq_timer); sub new { my (undef, $dir, $opt) = @_; @@ -76,7 +81,7 @@ sub importer { delete $self->{im}; $im->done; undef $im; - $self->checkpoint; + $self->barrier; $max = $self->{priv_eidx}->{mg}->git_epochs + 1; } my (undef, $tl) = eidx_init($self); # acquire lock @@ -106,17 +111,32 @@ sub search { PublicInbox::LeiSearch->new($_[0]->{priv_eidx}->{topdir}); } +sub cat_blob { + my ($self, $oid) = @_; + $self->{im} ? $self->{im}->cat_blob($oid) : undef; +} + +sub schedule_commit { + my ($self, $sec) = @_; + add_uniq_timer($self->{priv_eidx}->{topdir}, $sec, \&barrier, $self); +} + # follows the stderr file sub _tail_err { my ($self) = @_; - print { $self->{-err_wr} } readline($self->{-tmp_err}); + my $err = $self->{-tmp_err} // return; + $err->clearerr; # clear EOF marker + my @msg = readline($err); + PublicInbox::LeiStoreErr::emit($self->{-err_wr}, @msg) and return; + # syslog is the last resort if lei-daemon broke + syslog('warning', '%s', $_) for @msg; } sub eidx_init { my ($self) = @_; my $eidx = $self->{priv_eidx}; my $tl = wantarray && $self->{-err_wr} ? - PublicInbox::OnDestroy->new($$, \&_tail_err, $self) : + on_destroy(\&_tail_err, $self) : undef; $eidx->idx_init({-private => 1}); # acquires lock wantarray ? ($eidx, $tl) : $eidx; @@ -185,10 +205,7 @@ sub export1_kw_md ($$$$$) { my $dst = "$mdir/cur/$bn"; for my $d (@try) { my $src = "$mdir/$d/$orig"; - if (link($src, $dst)) { - if (!unlink($src) and $! != ENOENT) { - syslog('warning', "unlink($src): $!"); - } + if (rename_noreplace($src, $dst)) { # TODO: verify oidbin? $self->{lms}->mv_src("maildir:$mdir", $oidbin, \$orig, $bn); @@ -196,7 +213,7 @@ sub export1_kw_md ($$$$$) { } elsif ($! == EEXIST) { # lost race with "lei export-kw"? return; } elsif ($! != ENOENT) { - syslog('warning', "link($src -> $dst): $!"); + syslog('warning', "rename_noreplace($src -> $dst): $!"); } } for (@try) { return if -e "$mdir/$_/$orig" }; @@ -257,21 +274,16 @@ sub remove_eml_vmd { # remove just the VMD sub _lms_rw ($) { # it is important to have eidx processes open before lms my ($self) = @_; - my ($eidx, $tl) = eidx_init($self); - $self->{lms} //= do { + $self->{lms} // do { require PublicInbox::LeiMailSync; + my ($eidx, $tl) = eidx_init($self); my $f = "$self->{priv_eidx}->{topdir}/mail_sync.sqlite3"; my $lms = PublicInbox::LeiMailSync->new($f); $lms->lms_write_prepare; - $lms; + $self->{lms} = $lms; }; } -sub set_sync_info { - my ($self, $oidhex, $folder, $id) = @_; - _lms_rw($self)->set_src(pack('H*', $oidhex), $folder, $id); -} - sub _remove_if_local { # git->cat_async arg my ($bref, $oidhex, $type, $size, $self) = @_; $self->{im}->remove($bref) if $bref; @@ -281,8 +293,7 @@ sub remove_docids ($;@) { my ($self, @docids) = @_; my $eidx = eidx_init($self); for my $docid (@docids) { - $eidx->idx_shard($docid)->ipc_do('xdb_remove', $docid); - $eidx->{oidx}->delete_by_num($docid); + $eidx->remove_doc($docid); $eidx->{oidx}->{dbh}->do(<<EOF, undef, $docid); DELETE FROM xref3 WHERE docid = ? EOF @@ -305,7 +316,7 @@ sub remove_eml { $git->cat_async($oidhex, \&_remove_if_local, $self); } } - $git->cat_async_wait; + $git->async_wait_all; remove_docids($self, @docids); \@docids; } @@ -329,6 +340,60 @@ sub _add_vmd ($$$$) { sto_export_kw($self, $docid, $vmd); } +sub _docids_and_maybe_kw ($$) { + my ($self, $docids) = @_; + return $docids unless wantarray; + my (@kw, $idx, @tmp); + for my $num (@$docids) { # likely only 1, unless ContentHash changes + # can't use ->search->msg_keywords on uncommitted docs + $idx = $self->{priv_eidx}->idx_shard($num); + @tmp = eval { $idx->ipc_do('get_terms', 'K', $num) }; + $@ ? warn("#$num get_terms: $@") : push(@kw, @tmp); + } + @kw = sort(uniqstr(@kw)) if @$docids > 1; + ($docids, \@kw); +} + +sub _reindex_1 { # git->cat_async callback + my ($bref, $hex, $type, $size, $smsg) = @_; + my $self = delete $smsg->{-sto}; + my ($eidx, $tl) = eidx_init($self); + $bref //= _lms_rw($self)->local_blob($hex, 1); + if ($bref) { + my $eml = PublicInbox::Eml->new($bref); + $smsg->{-merge_vmd} = 1; # preserve existing keywords + $eidx->idx_shard($smsg->{num})->index_eml($eml, $smsg); + } elsif ($type eq 'missing') { + # pre-release/buggy lei may've indexed external-only msgs, + # try to correct that, here + warn("E: missing $hex, culling (ancient lei artifact?)\n"); + $smsg->{to} = $smsg->{cc} = $smsg->{from} = ''; + $smsg->{bytes} = 0; + $eidx->{oidx}->update_blob($smsg, ''); + my $eml = PublicInbox::Eml->new("\r\n\r\n"); + $eidx->idx_shard($smsg->{num})->index_eml($eml, $smsg); + } else { + warn("E: $type $hex\n"); + } +} + +sub reindex_art { + my ($self, $art) = @_; + my ($eidx, $tl) = eidx_init($self); + my $smsg = $eidx->{oidx}->get_art($art) // return; + return if $smsg->{bytes} == 0; # external-only message + $smsg->{-sto} = $self; + $eidx->git->cat_async($smsg->{blob} // die("no blob (#$art)"), + \&_reindex_1, $smsg); +} + +sub reindex_done { + my ($self) = @_; + my ($eidx, $tl) = eidx_init($self); + $eidx->git->async_wait_all; + # ->done to be called via sto_barrier_request +} + sub add_eml { my ($self, $eml, $vmd, $xoids) = @_; my $im = $self->{-fake_im} // $self->importer; # may create new epoch @@ -338,9 +403,19 @@ sub add_eml { $smsg->{-eidx_git} = $eidx->git if !$self->{-fake_im}; my $im_mark = $im->add($eml, undef, $smsg); if ($vmd && $vmd->{sync_info}) { - set_sync_info($self, $smsg->{blob}, @{$vmd->{sync_info}}); + _lms_rw($self)->set_src($smsg->oidbin, @{$vmd->{sync_info}}); + } + unless ($im_mark) { # duplicate blob returns undef + return unless wantarray || $vmd; + my @docids = $oidx->blob_exists($smsg->{blob}); + if ($vmd) { + for my $docid (@docids) { + my $idx = $eidx->idx_shard($docid); + _add_vmd($self, $idx, $docid, $vmd); + } + } + return _docids_and_maybe_kw $self, \@docids; } - $im_mark or return; # duplicate blob returns undef local $self->{current_info} = $smsg->{blob}; my $vivify_xvmd = delete($smsg->{-vivify_xvmd}) // []; # exact matches @@ -374,7 +449,7 @@ sub add_eml { } _add_vmd($self, $idx, $docid, $vmd) if $vmd; } - $vivify_xvmd; + _docids_and_maybe_kw $self, $vivify_xvmd; } elsif (my @docids = _docids_for($self, $eml)) { # fuzzy match from within lei/store for my $docid (@docids) { @@ -384,8 +459,8 @@ sub add_eml { $idx->ipc_do('add_eidx_info', $docid, '.', $eml); _add_vmd($self, $idx, $docid, $vmd) if $vmd; } - \@docids; - } else { # totally new message + _docids_and_maybe_kw $self, \@docids; + } else { # totally new message, no keywords delete $smsg->{-oidx}; # for IPC-friendliness $smsg->{num} = $oidx->adj_counter('eidx_docid', '+'); $oidx->add_overview($eml, $smsg); @@ -393,7 +468,7 @@ sub add_eml { my $idx = $eidx->idx_shard($smsg->{num}); $idx->index_eml($eml, $smsg); _add_vmd($self, $idx, $smsg->{num}, $vmd) if $vmd; - $smsg; + wantarray ? ($smsg, []) : $smsg; } } @@ -496,13 +571,11 @@ sub set_xvmd { sto_export_kw($self, $smsg->{num}, $vmd); } -sub checkpoint { - my ($self, $wait) = @_; - if (my $im = $self->{im}) { - $wait ? $im->barrier : $im->checkpoint; - } - delete $self->{lms}; - $self->{priv_eidx}->checkpoint($wait); +sub check_done { + my ($self) = @_; + $self->git->_active ? + add_uniq_timer("$self-check_done", 5, \&check_done, $self) : + done($self); } sub xchg_stderr { @@ -510,32 +583,42 @@ sub xchg_stderr { _tail_err($self) if $self->{-err_wr}; my $dir = $self->{priv_eidx}->{topdir}; return unless -e $dir; - my $old = delete $self->{-tmp_err}; - my $pfx = POSIX::strftime('%Y%m%d%H%M%S', gmtime(time)); - my $err = File::Temp->new(TEMPLATE => "$pfx.$$.lei_storeXXXX", - SUFFIX => '.err', DIR => $dir); - open STDERR, '>>', $err->filename or die "dup2: $!"; + delete $self->{-tmp_err}; + my ($err, $name) = tmpnam(); + open STDERR, '>>', $name or die "dup2: $!"; + unlink($name); STDERR->autoflush(1); # shared with shard subprocesses $self->{-tmp_err} = $err; # separate file description for RO access undef; } -sub done { - my ($self, $sock_ref) = @_; - my $err = ''; - if (my $im = delete($self->{im})) { - eval { $im->done }; - if ($@) { - $err .= "import done: $@\n"; - warn $err; - } +sub _commit ($$) { + my ($self, $cmd) = @_; # cmd is 'done' or 'barrier' + my ($errfh, $lei_sock) = @$self{0, 1}; # via sto_barrier_request + my @err; + if ($self->{im}) { + eval { $self->{im}->$cmd }; + push(@err, "E: import $cmd: $@\n") if $@; } delete $self->{lms}; - $self->{priv_eidx}->done; # V2Writable::done + eval { $self->{priv_eidx}->$cmd }; + push(@err, "E: priv_eidx $cmd: $@\n") if $@; + print { $errfh // \*STDERR } @err; + send($lei_sock, 'child_error 256', 0) if @err && $lei_sock; xchg_stderr($self); - die $err if $err; + die @err if @err; + # $lei_sock goes out-of-scope and script/lei can terminate } +sub barrier { + my ($self) = @_; + _commit $self, 'barrier'; + add_uniq_timer("$self-check_done", 5, \&check_done, $self); + undef; +} + +sub done { _commit $_[0], 'done' } + sub ipc_atfork_child { my ($self) = @_; my $lei = $self->{lei}; @@ -554,10 +637,15 @@ sub recv_and_run { $self->SUPER::recv_and_run(@args); } +sub _sto_atexit { # awaitpid cb + my ($pid) = @_; + warn "lei/store PID:$pid died \$?=$?\n" if $?; +} + sub write_prepare { my ($self, $lei) = @_; $lei // die 'BUG: $lei not passed'; - unless ($self->{-ipc_req}) { + unless ($self->{-wq_s1}) { my $dir = $lei->store_path; substr($dir, -length('/lei/store'), 10, ''); pipe(my ($r, $w)) or die "pipe: $!"; @@ -565,12 +653,12 @@ sub write_prepare { # Mail we import into lei are private, so headers filtered out # by -mda for public mail are not appropriate local @PublicInbox::MDA::BAD_HEADERS = (); + local $SIG{ALRM} = 'IGNORE'; $self->wq_workers_start("lei/store $dir", 1, $lei->oldset, { lei => $lei, -err_wr => $w, to_close => [ $r ], - }); - require PublicInbox::LeiStoreErr; + }, \&_sto_atexit); PublicInbox::LeiStoreErr->new($r, $lei); } $lei->{sto} = $self; |