about summary refs log tree commit homepage
path: root/lib/PublicInbox/ExtSearchIdx.pm
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2020-12-31 13:24:36 +0000
committerEric Wong <e@80x24.org>2020-12-31 13:24:36 +0000
commit0c8106d44f317175e122744b43407bf067183175 (patch)
treef59296dc3c63c8ca1dd8445821f43777333ba30e /lib/PublicInbox/ExtSearchIdx.pm
parent9d29ceda4eb8c9973749d74b928416f5c3cc78f8 (diff)
parent0f461dcd3317f44670e2fc50346f87ff41e80127 (diff)
downloadpublic-inbox-0c8106d44f317175e122744b43407bf067183175.tar.gz
* origin/master: (58 commits)
  ds: flatten + reuse @events, epoll_wait style fixes
  ds: simplify EventLoop implementation
  check defined return value for localized slurp errors
  import: check for git->qx errors, clearer return values
  git: qx: avoid extra "local" for scalar context case
  search: remove {mset} option for ->mset method
  search: remove pointless {relevance} setting
  miscsearch: take reopen from Search and use it
  extsearch: unconditionally reopen on access
  extindex: allow using --all without EXTINDEX_DIR
  extindex: add undocumented --no-scan switch
  extindex: enable autoflush on STDOUT/STDERR
  extindex: various --watch signal handling fixes
  extindex: --watch for inotify-based updates
  eml: fix undefined vars on <Perl 5.28
  t/config: test --get-urlmatch for git <2.26
  default to CORE::warn in $SIG{__WARN__} handlers
  inbox: name variable for values loop iterator
  inboxidle: avoid needless syscalls on refresh
  inboxidle: clue users into resolving ENOSPC from inotify
  ...
Diffstat (limited to 'lib/PublicInbox/ExtSearchIdx.pm')
-rw-r--r--lib/PublicInbox/ExtSearchIdx.pm234
1 files changed, 191 insertions, 43 deletions
diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm
index 56896056..a2d70205 100644
--- a/lib/PublicInbox/ExtSearchIdx.pm
+++ b/lib/PublicInbox/ExtSearchIdx.pm
@@ -30,13 +30,11 @@ use PublicInbox::V2Writable;
 use PublicInbox::InboxWritable;
 use PublicInbox::ContentHash qw(content_hash);
 use PublicInbox::Eml;
-use File::Spec;
 use PublicInbox::DS qw(now);
 use DBI qw(:sql_types); # SQL_BLOB
 
 sub new {
         my (undef, $dir, $opt) = @_;
-        $dir = File::Spec->canonpath($dir);
         my $l = $opt->{indexlevel} // 'full';
         $l !~ $PublicInbox::SearchIdx::INDEXLEVELS and
                 die "invalid indexlevel=$l\n";
@@ -56,28 +54,14 @@ sub new {
         }, __PACKAGE__;
         $self->{shards} = $self->count_shards || nproc_shards($opt->{creat});
         my $oidx = PublicInbox::OverIdx->new("$self->{xpfx}/over.sqlite3");
-        $oidx->{-no_fsync} = 1 if $opt->{-no_fsync};
+        $self->{-no_fsync} = $oidx->{-no_fsync} = 1 if !$opt->{fsync};
         $self->{oidx} = $oidx;
         $self
 }
 
 sub attach_inbox {
         my ($self, $ibx) = @_;
-        my $key = $ibx->eidx_key;
-        if (!$ibx->over || !$ibx->mm) {
-                warn "W: skipping $key (unindexed)\n";
-                return;
-        }
-        if (!defined($ibx->uidvalidity)) {
-                warn "W: skipping $key (no UIDVALIDITY)\n";
-                return;
-        }
-        my $ibxdir = File::Spec->canonpath($ibx->{inboxdir});
-        if ($ibxdir ne $ibx->{inboxdir}) {
-                warn "W: `$ibx->{inboxdir}' canonicalized to `$ibxdir'\n";
-                $ibx->{inboxdir} = $ibxdir;
-        }
-        $self->{ibx_map}->{$key} //= do {
+        $self->{ibx_map}->{$ibx->eidx_key} //= do {
                 push @{$self->{ibx_list}}, $ibx;
                 $ibx;
         }
@@ -281,29 +265,36 @@ sub last_commits {
         $heads;
 }
 
+sub _ibx_index_reject ($) {
+        my ($ibx) = @_;
+        $ibx->mm // return 'unindexed, no msgmap.sqlite3';
+        $ibx->uidvalidity // return 'no UIDVALIDITY';
+        $ibx->over // return 'unindexed, no over.sqlite3';
+        undef;
+}
+
 sub _sync_inbox ($$$) {
         my ($self, $sync, $ibx) = @_;
+        my $ekey = $ibx->eidx_key;
+        if (defined(my $err = _ibx_index_reject($ibx))) {
+                return "W: skipping $ekey ($err)";
+        }
         $sync->{ibx} = $ibx;
         $sync->{nr} = \(my $nr = 0);
         my $v = $ibx->version;
-        my $ekey = $ibx->eidx_key;
         if ($v == 2) {
                 $sync->{epoch_max} = $ibx->max_git_epoch // return;
                 sync_prepare($self, $sync); # or return # TODO: once MiscIdx is stable
         } elsif ($v == 1) {
                 my $uv = $ibx->uidvalidity;
                 my $lc = $self->{oidx}->eidx_meta("lc-v1:$ekey//$uv");
-                my $head = $ibx->mm->last_commit;
-                unless (defined $head) {
-                        warn "E: $ibx->{inboxdir} is not indexed\n";
-                        return;
-                }
+                my $head = $ibx->mm->last_commit //
+                        return "E: $ibx->{inboxdir} is not indexed";
                 my $stk = prepare_stack($sync, $lc ? "$lc..$head" : $head);
                 my $unit = { stack => $stk, git => $ibx->git };
                 push @{$sync->{todo}}, $unit;
         } else {
-                warn "E: $ekey unsupported inbox version (v$v)\n";
-                return;
+                return "E: $ekey unsupported inbox version (v$v)";
         }
         for my $unit (@{delete($sync->{todo}) // []}) {
                 last if $sync->{quit};
@@ -311,6 +302,7 @@ sub _sync_inbox ($$$) {
         }
         $self->{midx}->index_ibx($ibx) unless $sync->{quit};
         $ibx->git->cleanup; # done with this inbox, now
+        undef;
 }
 
 sub gc_unref_doc ($$$$) {
@@ -401,6 +393,32 @@ sub _ibx_for ($$$) {
         $self->{ibx_list}->[$pos] // die "BUG: ibx for $smsg->{blob} not mapped"
 }
 
+sub _fd_constrained ($) {
+        my ($self) = @_;
+        $self->{-fd_constrained} //= do {
+                my $soft;
+                if (eval { require BSD::Resource; 1 }) {
+                        my $NOFILE = BSD::Resource::RLIMIT_NOFILE();
+                        ($soft, undef) = BSD::Resource::getrlimit($NOFILE);
+                } else {
+                        chomp($soft = `sh -c 'ulimit -n'`);
+                }
+                if (defined($soft)) {
+                        my $want = scalar(@{$self->{ibx_list}}) + 64; # estimate
+                        my $ret = $want > $soft;
+                        if ($ret) {
+                                warn <<EOF;
+RLIMIT_NOFILE=$soft insufficient (want: $want), will close DB handles early
+EOF
+                        }
+                        $ret;
+                } else {
+                        warn "Unable to determine RLIMIT_NOFILE: $@\n";
+                        1;
+                }
+        };
+}
+
 sub _reindex_finalize ($$$) {
         my ($req, $smsg, $eml) = @_;
         my $sync = $req->{sync};
@@ -437,11 +455,16 @@ sub _reindex_finalize ($$$) {
                 my $x = pop(@$ary) // die "BUG: #$docid {by_chash} empty";
                 $x->{num} = delete($x->{xnum}) // die '{xnum} unset';
                 $ibx = _ibx_for($self, $sync, $x);
-                my $e = $ibx->over->get_art($x->{num});
-                $e->{blob} eq $x->{blob} or die <<EOF;
+                if (my $over = $ibx->over) {
+                        my $e = $over->get_art($x->{num});
+                        $e->{blob} eq $x->{blob} or die <<EOF;
 $x->{blob} != $e->{blob} (${\$ibx->eidx_key}:$e->{num});
 EOF
-                push @todo, $ibx, $e;
+                        push @todo, $ibx, $e;
+                        $over->dbh_close if _fd_constrained($self);
+                } else {
+                        die "$ibx->{inboxdir}: over.sqlite3 unusable: $!\n";
+                }
         }
         undef $by_chash;
         while (my ($ibx, $e) = splice(@todo, 0, 2)) {
@@ -607,7 +630,7 @@ sub eidxq_process ($$) { # for reindexing
         my $dbh = $self->{oidx}->dbh;
         my $tot = $dbh->selectrow_array('SELECT COUNT(*) FROM eidxq') or return;
         ${$sync->{nr}} = 0;
-        $sync->{-regen_fmt} = "%u/$tot\n";
+        local $sync->{-regen_fmt} = "%u/$tot\n";
         my $pr = $sync->{-opt}->{-progress};
         if ($pr) {
                 my $min = $dbh->selectrow_array('SELECT MIN(docid) FROM eidxq');
@@ -686,7 +709,8 @@ sub _reindex_check_unseen ($$$) {
         my $msgs;
         my $pr = $sync->{-opt}->{-progress};
         my $ekey = $ibx->eidx_key;
-        $sync->{-regen_fmt} = "$ekey checking unseen %u/".$ibx->over->max."\n";
+        local $sync->{-regen_fmt} =
+                        "$ekey checking unseen %u/".$ibx->over->max."\n";
         ${$sync->{nr}} = 0;
 
         while (scalar(@{$msgs = $ibx->over->query_xover($beg, $end)})) {
@@ -729,7 +753,7 @@ sub _reindex_check_stale ($$$) {
         my $pr = $sync->{-opt}->{-progress};
         my $fetching;
         my $ekey = $ibx->eidx_key;
-        $sync->{-regen_fmt} =
+        local $sync->{-regen_fmt} =
                         "$ekey check stale/missing %u/".$ibx->over->max."\n";
         ${$sync->{nr}} = 0;
         do {
@@ -787,9 +811,14 @@ DELETE FROM xref3 WHERE ibx_id = ? AND xnum = ? AND oidbin = ?
 
 sub _reindex_inbox ($$$) {
         my ($self, $sync, $ibx) = @_;
-        local $self->{current_info} = $ibx->eidx_key;
-        _reindex_check_unseen($self, $sync, $ibx);
-        _reindex_check_stale($self, $sync, $ibx) unless $sync->{quit};
+        my $ekey = $ibx->eidx_key;
+        local $self->{current_info} = $ekey;
+        if (defined(my $err = _ibx_index_reject($ibx))) {
+                warn "W: cannot reindex $ekey ($err)\n";
+        } else {
+                _reindex_check_unseen($self, $sync, $ibx);
+                _reindex_check_stale($self, $sync, $ibx) unless $sync->{quit};
+        }
         delete @$ibx{qw(over mm search git)}; # won't need these for a bit
 }
 
@@ -810,10 +839,17 @@ sub eidx_reindex {
         eidxq_process($self, $sync) unless $sync->{quit};
 }
 
+sub sync_inbox {
+        my ($self, $sync, $ibx) = @_;
+        my $err = _sync_inbox($self, $sync, $ibx);
+        delete @$ibx{qw(mm over)};
+        warn $err, "\n" if defined($err);
+}
+
 sub eidx_sync { # main entry point
         my ($self, $opt) = @_;
 
-        my $warn_cb = $SIG{__WARN__} || sub { print STDERR @_ };
+        my $warn_cb = $SIG{__WARN__} || \&CORE::warn;
         local $self->{current_info} = '';
         local $SIG{__WARN__} = sub {
                 $warn_cb->($self->{current_info}, ': ', @_);
@@ -840,20 +876,23 @@ sub eidx_sync { # main entry point
                 $ibx->{-ibx_id} //= $self->{oidx}->ibx_id($ibx->eidx_key);
         }
         if (delete($opt->{reindex})) {
-                $sync->{checkpoint_unlocks} = 1;
+                local $sync->{checkpoint_unlocks} = 1;
                 eidx_reindex($self, $sync);
         }
 
         # don't use $_ here, it'll get clobbered by reindex_checkpoint
-        for my $ibx (@{$self->{ibx_list}}) {
-                last if $sync->{quit};
-                _sync_inbox($self, $sync, $ibx);
+        if ($opt->{scan} // 1) {
+                for my $ibx (@{$self->{ibx_list}}) {
+                        last if $sync->{quit};
+                        sync_inbox($self, $sync, $ibx);
+                }
         }
         $self->{oidx}->rethread_done($opt) unless $sync->{quit};
         eidxq_process($self, $sync) unless $sync->{quit};
 
         eidxq_release($self);
-        PublicInbox::V2Writable::done($self);
+        done($self);
+        $sync; # for eidx_watch
 }
 
 sub update_last_commit { # overrides V2Writable
@@ -963,16 +1002,125 @@ sub idx_init { # similar to V2Writable
                 PublicInbox::V2Writable::write_alternates($info_dir, $mode, $o);
         }
         $self->parallel_init($self->{indexlevel});
-        $self->umask_prepare;
         $self->with_umask(\&_idx_init, $self, $opt);
         $self->{oidx}->begin_lazy;
         $self->{oidx}->eidx_prep;
         $self->{midx}->begin_txn;
 }
 
+sub _watch_commit { # PublicInbox::DS::add_timer callback
+        my ($self) = @_;
+        delete $self->{-commit_timer};
+        eidxq_process($self, $self->{-watch_sync});
+        eidxq_release($self);
+        delete local $self->{-watch_sync}->{-regen_fmt};
+        reindex_checkpoint($self, $self->{-watch_sync});
+
+        # call event_step => done unless commit_timer is armed
+        PublicInbox::DS::requeue($self);
+}
+
+sub on_inbox_unlock { # called by PublicInbox::InboxIdle
+        my ($self, $ibx) = @_;
+        my $opt = $self->{-watch_sync}->{-opt};
+        my $pr = $opt->{-progress};
+        my $ekey = $ibx->eidx_key;
+        local $0 = "sync $ekey";
+        $pr->("indexing $ekey\n") if $pr;
+        $self->idx_init($opt);
+        sync_inbox($self, $self->{-watch_sync}, $ibx);
+        $self->{-commit_timer} //= PublicInbox::DS::add_timer(
+                                        $opt->{'commit-interval'} // 10,
+                                        \&_watch_commit, $self);
+}
+
+sub eidx_reload { # -extindex --watch SIGHUP handler
+        my ($self, $idler) = @_;
+        if ($self->{cfg}) {
+                my $pr = $self->{-watch_sync}->{-opt}->{-progress};
+                $pr->('reloading ...') if $pr;
+                delete $self->{-resync_queue};
+                @{$self->{ibx_list}} = ();
+                %{$self->{ibx_map}} = ();
+                delete $self->{-watch_sync}->{id2pos};
+                my $cfg = PublicInbox::Config->new;
+                attach_config($self, $cfg);
+                $idler->refresh($cfg);
+                $pr->(" done\n") if $pr;
+        } else {
+                warn "reload not supported without --all\n";
+        }
+}
+
+sub eidx_resync_start ($) { # -extindex --watch SIGUSR1 handler
+        my ($self) = @_;
+        $self->{-resync_queue} //= [ @{$self->{ibx_list}} ];
+        PublicInbox::DS::requeue($self); # trigger our ->event_step
+}
+
+sub event_step { # PublicInbox::DS::requeue callback
+        my ($self) = @_;
+        if (my $resync_queue = $self->{-resync_queue}) {
+                if (my $ibx = shift(@$resync_queue)) {
+                        on_inbox_unlock($self, $ibx);
+                        PublicInbox::DS::requeue($self);
+                } else {
+                        delete $self->{-resync_queue};
+                        _watch_commit($self);
+                }
+        } else {
+                done($self) unless $self->{-commit_timer};
+        }
+}
+
+sub eidx_watch { # public-inbox-extindex --watch main loop
+        my ($self, $opt) = @_;
+        local %SIG = %SIG;
+        for my $sig (qw(HUP USR1 TSTP QUIT INT TERM)) {
+                $SIG{$sig} = sub { warn "SIG$sig ignored while scanning\n" };
+        }
+        require PublicInbox::InboxIdle;
+        require PublicInbox::DS;
+        require PublicInbox::Syscall;
+        require PublicInbox::Sigfd;
+        my $idler = PublicInbox::InboxIdle->new($self->{cfg});
+        if (!$self->{cfg}) {
+                $idler->watch_inbox($_) for @{$self->{ibx_list}};
+        }
+        $_->subscribe_unlock(__PACKAGE__, $self) for @{$self->{ibx_list}};
+        my $pr = $opt->{-progress};
+        $pr->("performing initial scan ...\n") if $pr;
+        my $sync = eidx_sync($self, $opt); # initial sync
+        return if $sync->{quit};
+        my $oldset = PublicInbox::Sigfd::block_signals();
+        local $self->{current_info} = '';
+        my $cb = $SIG{__WARN__} || \&CORE::warn;
+        local $SIG{__WARN__} = sub { $cb->($self->{current_info}, ': ', @_) };
+        my $sig = {
+                HUP => sub { eidx_reload($self, $idler) },
+                USR1 => sub { eidx_resync_start($self) },
+                TSTP => sub { kill('STOP', $$) },
+        };
+        my $quit = PublicInbox::SearchIdx::quit_cb($sync);
+        $sig->{QUIT} = $sig->{INT} = $sig->{TERM} = $quit;
+        my $sigfd = PublicInbox::Sigfd->new($sig,
+                                        $PublicInbox::Syscall::SFD_NONBLOCK);
+        %SIG = (%SIG, %$sig) if !$sigfd;
+        local $self->{-watch_sync} = $sync; # for ->on_inbox_unlock
+        if (!$sigfd) {
+                # wake up every second to accept signals if we don't
+                # have signalfd or IO::KQueue:
+                PublicInbox::Sigfd::sig_setmask($oldset);
+                PublicInbox::DS->SetLoopTimeout(1000);
+        }
+        PublicInbox::DS->SetPostLoopCallback(sub { !$sync->{quit} });
+        $pr->("initial scan complete, entering event loop\n") if $pr;
+        PublicInbox::DS->EventLoop; # calls InboxIdle->event_step
+        done($self);
+}
+
 no warnings 'once';
 *done = \&PublicInbox::V2Writable::done;
-*umask_prepare = \&PublicInbox::InboxWritable::umask_prepare;
 *with_umask = \&PublicInbox::InboxWritable::with_umask;
 *parallel_init = \&PublicInbox::V2Writable::parallel_init;
 *nproc_shards = \&PublicInbox::V2Writable::nproc_shards;