about summary refs log tree commit homepage
path: root/lib/PublicInbox/ExtSearchIdx.pm
diff options
context:
space:
mode:
Diffstat (limited to 'lib/PublicInbox/ExtSearchIdx.pm')
-rw-r--r--lib/PublicInbox/ExtSearchIdx.pm80
1 files changed, 48 insertions, 32 deletions
diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm
index 7c44a1a4..763a124c 100644
--- a/lib/PublicInbox/ExtSearchIdx.pm
+++ b/lib/PublicInbox/ExtSearchIdx.pm
@@ -16,13 +16,13 @@
 package PublicInbox::ExtSearchIdx;
 use strict;
 use v5.10.1;
-use parent qw(PublicInbox::ExtSearch PublicInbox::Lock);
+use parent qw(PublicInbox::ExtSearch PublicInbox::Umask PublicInbox::Lock);
 use Carp qw(croak carp);
 use Scalar::Util qw(blessed);
 use Sys::Hostname qw(hostname);
-use POSIX qw(strftime);
 use File::Glob qw(bsd_glob GLOB_NOSORT);
 use PublicInbox::MultiGit;
+use PublicInbox::Spawn ();
 use PublicInbox::Search;
 use PublicInbox::SearchIdx qw(prepare_stack is_ancestor is_bad_blob);
 use PublicInbox::OverIdx;
@@ -34,6 +34,7 @@ use PublicInbox::ContentHash qw(content_hash);
 use PublicInbox::Eml;
 use PublicInbox::DS qw(now add_timer);
 use DBI qw(:sql_types); # SQL_BLOB
+use PublicInbox::Admin qw(fmt_localtime);
 
 sub new {
         my (undef, $dir, $opt) = @_;
@@ -113,11 +114,30 @@ sub check_batch_limit ($) {
         ${$req->{need_checkpoint}} = 1 if $n >= $self->{batch_bytes};
 }
 
+sub bad_ibx_id ($$;$) {
+        my ($self, $ibx_id, $cb) = @_;
+        my $msg = "E: bad/stale ibx_id=#$ibx_id encountered";
+        my $ekey = $self->{oidx}->dbh->selectrow_array(<<EOM, undef, $ibx_id);
+SELECT eidx_key FROM inboxes WHERE ibx_id = ? LIMIT 1
+EOM
+        $msg .= " (formerly `$ekey')" if defined $ekey;
+        $cb //= \&carp;
+        $cb->($msg, "\nE: running $0 --gc may be required");
+}
+
+sub check_xr3 ($$$) {
+        my ($self, $id2pos, $xr3) = @_;
+        @$xr3 = grep {
+                defined($id2pos->{$_->[0]}) ? 1 : bad_ibx_id($self, $_->[0])
+        } @$xr3;
+}
+
 sub apply_boost ($$) {
         my ($req, $smsg) = @_;
         my $id2pos = $req->{id2pos}; # index in ibx_sorted
         my $xr3 = $req->{self}->{oidx}->get_xref3($smsg->{num}, 1);
-        @$xr3 = sort {
+        check_xr3($req->{self}, $id2pos, $xr3);
+        @$xr3 = sort { # sort ascending
                 $id2pos->{$a->[0]} <=> $id2pos->{$b->[0]}
                                 ||
                 $a->[1] <=> $b->[1] # break ties with {xnum}
@@ -406,14 +426,14 @@ EOM
         while (my ($ibx_id, $eidx_key) = $ibx_ck->fetchrow_array) {
                 next if $self->{ibx_map}->{$eidx_key};
                 $self->{midx}->remove_eidx_key($eidx_key);
-                warn "I: deleting messages for $eidx_key...\n";
+                warn "# deleting messages for $eidx_key...\n";
                 $x3_doc->execute($ibx_id);
                 my $ibx = { -ibx_id => $ibx_id, -gc_eidx_key => $eidx_key };
                 while (my ($docid, $xnum, $oid) = $x3_doc->fetchrow_array) {
                         my $r = _unref_doc($sync, $docid, $ibx, $xnum, $oid);
                         $oid = unpack('H*', $oid);
                         $r = $r ? 'unref' : 'remove';
-                        warn "I: $r #$docid $eidx_key $oid\n";
+                        warn "# $r #$docid $eidx_key $oid\n";
                         if (checkpoint_due($sync)) {
                                 $x3_doc = $ibx_ck = undef;
                                 reindex_checkpoint($self, $sync);
@@ -433,12 +453,12 @@ SELECT key FROM eidx_meta WHERE key LIKE ? ESCAPE ?
                 $lc_i->execute("lc-%:$pat//%", '\\');
                 while (my ($key) = $lc_i->fetchrow_array) {
                         next if $key !~ m!\Alc-v[1-9]+:\Q$eidx_key\E//!;
-                        warn "I: removing $key\n";
+                        warn "# removing $key\n";
                         $self->{oidx}->dbh->do(<<'', undef, $key);
 DELETE FROM eidx_meta WHERE key = ?
 
                 }
-                warn "I: $eidx_key removed\n";
+                warn "# $eidx_key removed\n";
         }
 }
 
@@ -447,20 +467,20 @@ sub eidx_gc_scan_shards ($$) { # TODO: use for lei/store
         my $nr = $self->{oidx}->dbh->do(<<'');
 DELETE FROM xref3 WHERE docid NOT IN (SELECT num FROM over)
 
-        warn "I: eliminated $nr stale xref3 entries\n" if $nr != 0;
+        warn "# eliminated $nr stale xref3 entries\n" if $nr != 0;
         reindex_checkpoint($self, $sync) if checkpoint_due($sync);
 
         # fixup from old bugs:
         $nr = $self->{oidx}->dbh->do(<<'');
 DELETE FROM over WHERE num > 0 AND num NOT IN (SELECT docid FROM xref3)
 
-        warn "I: eliminated $nr stale over entries\n" if $nr != 0;
+        warn "# eliminated $nr stale over entries\n" if $nr != 0;
         reindex_checkpoint($self, $sync) if checkpoint_due($sync);
 
         $nr = $self->{oidx}->dbh->do(<<'');
 DELETE FROM eidxq WHERE docid NOT IN (SELECT num FROM over)
 
-        warn "I: eliminated $nr stale reindex queue entries\n" if $nr != 0;
+        warn "# eliminated $nr stale reindex queue entries\n" if $nr != 0;
         reindex_checkpoint($self, $sync) if checkpoint_due($sync);
 
         my ($cur) = $self->{oidx}->dbh->selectrow_array(<<EOM);
@@ -490,7 +510,7 @@ SELECT num FROM over WHERE num >= ? ORDER BY num ASC LIMIT 10000
                         reindex_checkpoint($self, $sync);
                 }
         }
-        warn "I: eliminated $nr stale Xapian documents\n" if $nr != 0;
+        warn "# eliminated $nr stale Xapian documents\n" if $nr != 0;
 }
 
 sub eidx_gc {
@@ -513,8 +533,9 @@ sub eidx_gc {
 
 sub _ibx_for ($$$) {
         my ($self, $sync, $smsg) = @_;
-        my $ibx_id = delete($smsg->{ibx_id}) // die '{ibx_id} unset';
-        my $pos = $sync->{id2pos}->{$ibx_id} // die "$ibx_id no pos";
+        my $ibx_id = delete($smsg->{ibx_id}) // die 'BUG: {ibx_id} unset';
+        my $pos = $sync->{id2pos}->{$ibx_id} //
+                bad_ibx_id($self, $ibx_id, \&croak);
         $self->{-ibx_ary_known}->[$pos] //
                 die "BUG: ibx for $smsg->{blob} not mapped"
 }
@@ -657,7 +678,8 @@ BUG? #$docid $smsg->{blob} is not referenced by inboxes during reindex
         # hit the common case in _reindex_finalize without rereading
         # from git (or holding multiple messages in memory).
         my $id2pos = $sync->{id2pos}; # index in ibx_sorted
-        @$xr3 = sort {
+        check_xr3($self, $id2pos, $xr3);
+        @$xr3 = sort { # sort descending
                 $id2pos->{$b->[0]} <=> $id2pos->{$a->[0]}
                                 ||
                 $b->[1] <=> $a->[1] # break ties with {xnum}
@@ -728,16 +750,14 @@ sub eidxq_lock_acquire ($) {
                 return $locked if $locked eq $cur;
         }
         my ($pid, $time, $euid, $ident) = split(/-/, $cur, 4);
-        my $t = strftime('%Y-%m-%d %k:%M %z', localtime($time));
+        my $t = fmt_localtime($time);
         local $self->{current_info} = 'eidxq';
         if ($euid == $> && $ident eq host_ident) {
-                if (kill(0, $pid)) {
-                        warn <<EOM; return;
-I: PID:$pid (re)indexing since $t, it will continue our work
+                kill(0, $pid) and warn <<EOM and return;
+# PID:$pid (re)indexing since $t, it will continue our work
 EOM
-                }
                 if ($!{ESRCH}) {
-                        warn "I: eidxq_lock is stale ($cur), clobbering\n";
+                        warn "# eidxq_lock is stale ($cur), clobbering\n";
                         return _eidxq_take($self);
                 }
                 warn "E: kill(0, $pid) failed: $!\n"; # fall-through:
@@ -837,7 +857,7 @@ sub reindex_unseen ($$$$) {
                 xnum => $xsmsg->{num},
                 # {mids} and {chash} will be filled in at _reindex_unseen
         };
-        warn "I: reindex_unseen ${\$ibx->eidx_key}:$req->{xnum}:$req->{oid}\n";
+        warn "# reindex_unseen ${\$ibx->eidx_key}:$req->{xnum}:$req->{oid}\n";
         $self->git->cat_async($xsmsg->{blob}, \&_reindex_unseen, $req);
 }
 
@@ -1181,12 +1201,6 @@ sub update_last_commit { # overrides V2Writable
         $self->{oidx}->eidx_meta($meta_key, $latest_cmt);
 }
 
-sub _idx_init { # with_umask callback
-        my ($self, $opt) = @_;
-        PublicInbox::V2Writable::_idx_init($self, $opt); # acquires ei.lock
-        $self->{midx} = PublicInbox::MiscIdx->new($self);
-}
-
 sub symlink_packs ($$) {
         my ($ibx, $pd) = @_;
         my $ret = 0;
@@ -1272,15 +1286,17 @@ sub idx_init { # similar to V2Writable
         }
         ($has_new || $prune_nr || $new ne '') and
                 $self->{mg}->write_alternates($mode, $alt, $new);
-        $git_midx and $self->with_umask(sub {
+        my $restore = $self->with_umask;
+        if ($git_midx) {
                 my @cmd = ('multi-pack-index');
                 push @cmd, '--no-progress' if ($opt->{quiet}//0) > 1;
                 my $lk = $self->lock_for_scope;
                 system('git', "--git-dir=$ALL", @cmd, 'write');
                 # ignore errors, fairly new command, may not exist
-        });
+        }
         $self->parallel_init($self->{indexlevel});
-        $self->with_umask(\&_idx_init, $self, $opt);
+        PublicInbox::V2Writable::_idx_init($self, $opt); # acquires ei.lock
+        $self->{midx} = PublicInbox::MiscIdx->new($self);
         $self->{oidx}->begin_lazy;
         $self->{oidx}->eidx_prep;
         $self->{midx}->create_xdb if $new ne '';
@@ -1390,7 +1406,7 @@ sub eidx_watch { # public-inbox-extindex --watch main loop
         my $quit = PublicInbox::SearchIdx::quit_cb($sync);
         $sig->{QUIT} = $sig->{INT} = $sig->{TERM} = $quit;
         local $self->{-watch_sync} = $sync; # for ->on_inbox_unlock
-        PublicInbox::DS->SetPostLoopCallback(sub { !$sync->{quit} });
+        local @PublicInbox::DS::post_loop_do = (sub { !$sync->{quit} });
         $pr->("initial scan complete, entering event loop\n") if $pr;
         # calls InboxIdle->event_step:
         PublicInbox::DS::event_loop($sig, $oldset);
@@ -1399,7 +1415,6 @@ sub eidx_watch { # public-inbox-extindex --watch main loop
 
 no warnings 'once';
 *done = \&PublicInbox::V2Writable::done;
-*with_umask = \&PublicInbox::InboxWritable::with_umask;
 *parallel_init = \&PublicInbox::V2Writable::parallel_init;
 *nproc_shards = \&PublicInbox::V2Writable::nproc_shards;
 *sync_prepare = \&PublicInbox::V2Writable::sync_prepare;
@@ -1409,5 +1424,6 @@ no warnings 'once';
 *idx_shard = \&PublicInbox::V2Writable::idx_shard;
 *reindex_checkpoint = \&PublicInbox::V2Writable::reindex_checkpoint;
 *checkpoint = \&PublicInbox::V2Writable::checkpoint;
+*barrier = \&PublicInbox::V2Writable::barrier;
 
 1;