diff options
Diffstat (limited to 'lib/PublicInbox/ExtSearchIdx.pm')
-rw-r--r-- | lib/PublicInbox/ExtSearchIdx.pm | 80 |
1 files changed, 48 insertions, 32 deletions
diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm index 7c44a1a4..763a124c 100644 --- a/lib/PublicInbox/ExtSearchIdx.pm +++ b/lib/PublicInbox/ExtSearchIdx.pm @@ -16,13 +16,13 @@ package PublicInbox::ExtSearchIdx; use strict; use v5.10.1; -use parent qw(PublicInbox::ExtSearch PublicInbox::Lock); +use parent qw(PublicInbox::ExtSearch PublicInbox::Umask PublicInbox::Lock); use Carp qw(croak carp); use Scalar::Util qw(blessed); use Sys::Hostname qw(hostname); -use POSIX qw(strftime); use File::Glob qw(bsd_glob GLOB_NOSORT); use PublicInbox::MultiGit; +use PublicInbox::Spawn (); use PublicInbox::Search; use PublicInbox::SearchIdx qw(prepare_stack is_ancestor is_bad_blob); use PublicInbox::OverIdx; @@ -34,6 +34,7 @@ use PublicInbox::ContentHash qw(content_hash); use PublicInbox::Eml; use PublicInbox::DS qw(now add_timer); use DBI qw(:sql_types); # SQL_BLOB +use PublicInbox::Admin qw(fmt_localtime); sub new { my (undef, $dir, $opt) = @_; @@ -113,11 +114,30 @@ sub check_batch_limit ($) { ${$req->{need_checkpoint}} = 1 if $n >= $self->{batch_bytes}; } +sub bad_ibx_id ($$;$) { + my ($self, $ibx_id, $cb) = @_; + my $msg = "E: bad/stale ibx_id=#$ibx_id encountered"; + my $ekey = $self->{oidx}->dbh->selectrow_array(<<EOM, undef, $ibx_id); +SELECT eidx_key FROM inboxes WHERE ibx_id = ? LIMIT 1 +EOM + $msg .= " (formerly `$ekey')" if defined $ekey; + $cb //= \&carp; + $cb->($msg, "\nE: running $0 --gc may be required"); +} + +sub check_xr3 ($$$) { + my ($self, $id2pos, $xr3) = @_; + @$xr3 = grep { + defined($id2pos->{$_->[0]}) ? 1 : bad_ibx_id($self, $_->[0]) + } @$xr3; +} + sub apply_boost ($$) { my ($req, $smsg) = @_; my $id2pos = $req->{id2pos}; # index in ibx_sorted my $xr3 = $req->{self}->{oidx}->get_xref3($smsg->{num}, 1); - @$xr3 = sort { + check_xr3($req->{self}, $id2pos, $xr3); + @$xr3 = sort { # sort ascending $id2pos->{$a->[0]} <=> $id2pos->{$b->[0]} || $a->[1] <=> $b->[1] # break ties with {xnum} @@ -406,14 +426,14 @@ EOM while (my ($ibx_id, $eidx_key) = $ibx_ck->fetchrow_array) { next if $self->{ibx_map}->{$eidx_key}; $self->{midx}->remove_eidx_key($eidx_key); - warn "I: deleting messages for $eidx_key...\n"; + warn "# deleting messages for $eidx_key...\n"; $x3_doc->execute($ibx_id); my $ibx = { -ibx_id => $ibx_id, -gc_eidx_key => $eidx_key }; while (my ($docid, $xnum, $oid) = $x3_doc->fetchrow_array) { my $r = _unref_doc($sync, $docid, $ibx, $xnum, $oid); $oid = unpack('H*', $oid); $r = $r ? 'unref' : 'remove'; - warn "I: $r #$docid $eidx_key $oid\n"; + warn "# $r #$docid $eidx_key $oid\n"; if (checkpoint_due($sync)) { $x3_doc = $ibx_ck = undef; reindex_checkpoint($self, $sync); @@ -433,12 +453,12 @@ SELECT key FROM eidx_meta WHERE key LIKE ? ESCAPE ? $lc_i->execute("lc-%:$pat//%", '\\'); while (my ($key) = $lc_i->fetchrow_array) { next if $key !~ m!\Alc-v[1-9]+:\Q$eidx_key\E//!; - warn "I: removing $key\n"; + warn "# removing $key\n"; $self->{oidx}->dbh->do(<<'', undef, $key); DELETE FROM eidx_meta WHERE key = ? } - warn "I: $eidx_key removed\n"; + warn "# $eidx_key removed\n"; } } @@ -447,20 +467,20 @@ sub eidx_gc_scan_shards ($$) { # TODO: use for lei/store my $nr = $self->{oidx}->dbh->do(<<''); DELETE FROM xref3 WHERE docid NOT IN (SELECT num FROM over) - warn "I: eliminated $nr stale xref3 entries\n" if $nr != 0; + warn "# eliminated $nr stale xref3 entries\n" if $nr != 0; reindex_checkpoint($self, $sync) if checkpoint_due($sync); # fixup from old bugs: $nr = $self->{oidx}->dbh->do(<<''); DELETE FROM over WHERE num > 0 AND num NOT IN (SELECT docid FROM xref3) - warn "I: eliminated $nr stale over entries\n" if $nr != 0; + warn "# eliminated $nr stale over entries\n" if $nr != 0; reindex_checkpoint($self, $sync) if checkpoint_due($sync); $nr = $self->{oidx}->dbh->do(<<''); DELETE FROM eidxq WHERE docid NOT IN (SELECT num FROM over) - warn "I: eliminated $nr stale reindex queue entries\n" if $nr != 0; + warn "# eliminated $nr stale reindex queue entries\n" if $nr != 0; reindex_checkpoint($self, $sync) if checkpoint_due($sync); my ($cur) = $self->{oidx}->dbh->selectrow_array(<<EOM); @@ -490,7 +510,7 @@ SELECT num FROM over WHERE num >= ? ORDER BY num ASC LIMIT 10000 reindex_checkpoint($self, $sync); } } - warn "I: eliminated $nr stale Xapian documents\n" if $nr != 0; + warn "# eliminated $nr stale Xapian documents\n" if $nr != 0; } sub eidx_gc { @@ -513,8 +533,9 @@ sub eidx_gc { sub _ibx_for ($$$) { my ($self, $sync, $smsg) = @_; - my $ibx_id = delete($smsg->{ibx_id}) // die '{ibx_id} unset'; - my $pos = $sync->{id2pos}->{$ibx_id} // die "$ibx_id no pos"; + my $ibx_id = delete($smsg->{ibx_id}) // die 'BUG: {ibx_id} unset'; + my $pos = $sync->{id2pos}->{$ibx_id} // + bad_ibx_id($self, $ibx_id, \&croak); $self->{-ibx_ary_known}->[$pos] // die "BUG: ibx for $smsg->{blob} not mapped" } @@ -657,7 +678,8 @@ BUG? #$docid $smsg->{blob} is not referenced by inboxes during reindex # hit the common case in _reindex_finalize without rereading # from git (or holding multiple messages in memory). my $id2pos = $sync->{id2pos}; # index in ibx_sorted - @$xr3 = sort { + check_xr3($self, $id2pos, $xr3); + @$xr3 = sort { # sort descending $id2pos->{$b->[0]} <=> $id2pos->{$a->[0]} || $b->[1] <=> $a->[1] # break ties with {xnum} @@ -728,16 +750,14 @@ sub eidxq_lock_acquire ($) { return $locked if $locked eq $cur; } my ($pid, $time, $euid, $ident) = split(/-/, $cur, 4); - my $t = strftime('%Y-%m-%d %k:%M %z', localtime($time)); + my $t = fmt_localtime($time); local $self->{current_info} = 'eidxq'; if ($euid == $> && $ident eq host_ident) { - if (kill(0, $pid)) { - warn <<EOM; return; -I: PID:$pid (re)indexing since $t, it will continue our work + kill(0, $pid) and warn <<EOM and return; +# PID:$pid (re)indexing since $t, it will continue our work EOM - } if ($!{ESRCH}) { - warn "I: eidxq_lock is stale ($cur), clobbering\n"; + warn "# eidxq_lock is stale ($cur), clobbering\n"; return _eidxq_take($self); } warn "E: kill(0, $pid) failed: $!\n"; # fall-through: @@ -837,7 +857,7 @@ sub reindex_unseen ($$$$) { xnum => $xsmsg->{num}, # {mids} and {chash} will be filled in at _reindex_unseen }; - warn "I: reindex_unseen ${\$ibx->eidx_key}:$req->{xnum}:$req->{oid}\n"; + warn "# reindex_unseen ${\$ibx->eidx_key}:$req->{xnum}:$req->{oid}\n"; $self->git->cat_async($xsmsg->{blob}, \&_reindex_unseen, $req); } @@ -1181,12 +1201,6 @@ sub update_last_commit { # overrides V2Writable $self->{oidx}->eidx_meta($meta_key, $latest_cmt); } -sub _idx_init { # with_umask callback - my ($self, $opt) = @_; - PublicInbox::V2Writable::_idx_init($self, $opt); # acquires ei.lock - $self->{midx} = PublicInbox::MiscIdx->new($self); -} - sub symlink_packs ($$) { my ($ibx, $pd) = @_; my $ret = 0; @@ -1272,15 +1286,17 @@ sub idx_init { # similar to V2Writable } ($has_new || $prune_nr || $new ne '') and $self->{mg}->write_alternates($mode, $alt, $new); - $git_midx and $self->with_umask(sub { + my $restore = $self->with_umask; + if ($git_midx) { my @cmd = ('multi-pack-index'); push @cmd, '--no-progress' if ($opt->{quiet}//0) > 1; my $lk = $self->lock_for_scope; system('git', "--git-dir=$ALL", @cmd, 'write'); # ignore errors, fairly new command, may not exist - }); + } $self->parallel_init($self->{indexlevel}); - $self->with_umask(\&_idx_init, $self, $opt); + PublicInbox::V2Writable::_idx_init($self, $opt); # acquires ei.lock + $self->{midx} = PublicInbox::MiscIdx->new($self); $self->{oidx}->begin_lazy; $self->{oidx}->eidx_prep; $self->{midx}->create_xdb if $new ne ''; @@ -1390,7 +1406,7 @@ sub eidx_watch { # public-inbox-extindex --watch main loop my $quit = PublicInbox::SearchIdx::quit_cb($sync); $sig->{QUIT} = $sig->{INT} = $sig->{TERM} = $quit; local $self->{-watch_sync} = $sync; # for ->on_inbox_unlock - PublicInbox::DS->SetPostLoopCallback(sub { !$sync->{quit} }); + local @PublicInbox::DS::post_loop_do = (sub { !$sync->{quit} }); $pr->("initial scan complete, entering event loop\n") if $pr; # calls InboxIdle->event_step: PublicInbox::DS::event_loop($sig, $oldset); @@ -1399,7 +1415,6 @@ sub eidx_watch { # public-inbox-extindex --watch main loop no warnings 'once'; *done = \&PublicInbox::V2Writable::done; -*with_umask = \&PublicInbox::InboxWritable::with_umask; *parallel_init = \&PublicInbox::V2Writable::parallel_init; *nproc_shards = \&PublicInbox::V2Writable::nproc_shards; *sync_prepare = \&PublicInbox::V2Writable::sync_prepare; @@ -1409,5 +1424,6 @@ no warnings 'once'; *idx_shard = \&PublicInbox::V2Writable::idx_shard; *reindex_checkpoint = \&PublicInbox::V2Writable::reindex_checkpoint; *checkpoint = \&PublicInbox::V2Writable::checkpoint; +*barrier = \&PublicInbox::V2Writable::barrier; 1; |