about summary refs log tree commit homepage
path: root/lib/PublicInbox
diff options
context:
space:
mode:
author Eric Wong <e@80x24.org> 2020-12-26 01:44:37 +0000
committer Eric Wong <e@80x24.org> 2020-12-27 09:30:33 +0000
commit 1d96509a3f59c38394d2f3ac4323dc54c74dc202 (patch)
tree ca024650d5f29d9a13e95d93d5bebae4166da18b /lib/PublicInbox
parent 46bd595f57cc3d425754b0d0770c125616e74448 (diff)
download public-inbox-1d96509a3f59c38394d2f3ac4323dc54c74dc202.tar.gz
This reuses existing InboxIdle infrastructure to update external
indices based on per-inbox updates.  This is an alternative to
auto-updating external indices via the -index command and also
works with existing uses of -mda and public-inbox-watch.

Using inotify (or EVFILT_VNODE) allows watching thousands of
inboxes without having to scan every single one at every
invocation.

This is especially beneficial in cases where an external index
is not writable to the users writing to per-inbox indices.
Diffstat (limited to 'lib/PublicInbox')
-rw-r--r-- lib/PublicInbox/ExtSearchIdx.pm 126
-rw-r--r-- lib/PublicInbox/InboxIdle.pm 8
-rw-r--r-- lib/PublicInbox/OverIdx.pm 8
-rw-r--r-- lib/PublicInbox/V2Writable.pm 2
4 files changed, 131 insertions, 13 deletions
diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm
index 64ebf6db..53ff2ca1 100644
--- a/lib/PublicInbox/ExtSearchIdx.pm
+++ b/lib/PublicInbox/ExtSearchIdx.pm
@@ -630,7 +630,7 @@ sub eidxq_process ($$) { # for reindexing
         my $dbh = $self->{oidx}->dbh;
         my $tot = $dbh->selectrow_array('SELECT COUNT(*) FROM eidxq') or return;
         ${$sync->{nr}} = 0;
-        $sync->{-regen_fmt} = "%u/$tot\n";
+        local $sync->{-regen_fmt} = "%u/$tot\n";
         my $pr = $sync->{-opt}->{-progress};
         if ($pr) {
                 my $min = $dbh->selectrow_array('SELECT MIN(docid) FROM eidxq');
@@ -709,7 +709,8 @@ sub _reindex_check_unseen ($$$) {
         my $msgs;
         my $pr = $sync->{-opt}->{-progress};
         my $ekey = $ibx->eidx_key;
-        $sync->{-regen_fmt} = "$ekey checking unseen %u/".$ibx->over->max."\n";
+        local $sync->{-regen_fmt} =
+                        "$ekey checking unseen %u/".$ibx->over->max."\n";
         ${$sync->{nr}} = 0;
 
         while (scalar(@{$msgs = $ibx->over->query_xover($beg, $end)})) {
@@ -752,7 +753,7 @@ sub _reindex_check_stale ($$$) {
         my $pr = $sync->{-opt}->{-progress};
         my $fetching;
         my $ekey = $ibx->eidx_key;
-        $sync->{-regen_fmt} =
+        local $sync->{-regen_fmt} =
                         "$ekey check stale/missing %u/".$ibx->over->max."\n";
         ${$sync->{nr}} = 0;
         do {
@@ -838,6 +839,13 @@ sub eidx_reindex {
         eidxq_process($self, $sync) unless $sync->{quit};
 }
 
+sub sync_inbox { # sync a single $ibx via _sync_inbox and report any returned error
+        my ($self, $sync, $ibx) = @_;
+        my $err = _sync_inbox($self, $sync, $ibx);
+        delete @$ibx{qw(mm over)}; # drop the {mm} and {over} slots of $ibx once the sync attempt is over
+        warn $err, "\n" if defined($err); # a defined return value is emitted as a warning, not thrown
+}
+
 sub eidx_sync { # main entry point
         my ($self, $opt) = @_;
 
@@ -868,22 +876,21 @@ sub eidx_sync { # main entry point
                 $ibx->{-ibx_id} //= $self->{oidx}->ibx_id($ibx->eidx_key);
         }
         if (delete($opt->{reindex})) {
-                $sync->{checkpoint_unlocks} = 1;
+                local $sync->{checkpoint_unlocks} = 1;
                 eidx_reindex($self, $sync);
         }
 
         # don't use $_ here, it'll get clobbered by reindex_checkpoint
         for my $ibx (@{$self->{ibx_list}}) {
                 last if $sync->{quit};
-                my $err = _sync_inbox($self, $sync, $ibx);
-                delete @$ibx{qw(mm over)};
-                warn $err, "\n" if defined($err);
+                sync_inbox($self, $sync, $ibx);
         }
         $self->{oidx}->rethread_done($opt) unless $sync->{quit};
         eidxq_process($self, $sync) unless $sync->{quit};
 
         eidxq_release($self);
-        PublicInbox::V2Writable::done($self);
+        done($self);
+        $sync; # for eidx_watch
 }
 
 sub update_last_commit { # overrides V2Writable
@@ -970,6 +977,109 @@ sub idx_init { # similar to V2Writable
         $self->{midx}->begin_txn;
 }
 
+sub _watch_commit { # PublicInbox::DS::add_timer callback; event_step also calls it directly
+        my ($self) = @_;
+        delete $self->{-commit_timer}; # empty the slot so on_inbox_unlock can arm a new timer
+        eidxq_process($self, $self->{-watch_sync});
+        eidxq_release($self);
+        delete local $self->{-watch_sync}->{-regen_fmt}; # -regen_fmt is absent until this sub returns
+        reindex_checkpoint($self, $self->{-watch_sync}); # presumably V2Writable's, which prints progress only if -regen_fmt is set
+
+        # call event_step => done unless commit_timer is armed
+        PublicInbox::DS::requeue($self);
+}
+
+sub on_inbox_unlock { # called by PublicInbox::InboxIdle; event_step also calls it during a resync
+        my ($self, $ibx) = @_;
+        my $opt = $self->{-watch_sync}->{-opt};
+        my $pr = $opt->{-progress}; # optional progress callback
+        my $ekey = $ibx->eidx_key;
+        local $0 = "sync $ekey"; # process title names the inbox being synced; restored on return
+        $pr->("indexing $ekey\n") if $pr;
+        $self->idx_init($opt);
+        sync_inbox($self, $self->{-watch_sync}, $ibx);
+        $self->{-commit_timer} //= PublicInbox::DS::add_timer( # //= keeps an already-armed timer
+                                        $opt->{'commit-interval'} // 10, # default 10 (presumably seconds; confirm in DS)
+                                        \&_watch_commit, $self);
+}
+
+sub eidx_reload { # -extindex --watch SIGHUP handler
+        my ($self, $idler) = @_;
+        if ($self->{cfg}) { # only reloadable when a config is attached
+                my $pr = $self->{-watch_sync}->{-opt}->{-progress};
+                $pr->('reloading ...') if $pr;
+                @{$self->{ibx_list}} = (); # emptied in place, so the array ref itself is unchanged
+                %{$self->{ibx_map}} = (); # likewise emptied in place
+                delete $self->{-watch_sync}->{id2pos};
+                my $cfg = PublicInbox::Config->new; # fresh config object
+                attach_config($self, $cfg);
+                $idler->refresh($cfg); # hand the new config to the InboxIdle instance
+                $pr->(" done\n") if $pr;
+        } else {
+                warn "reload not supported without --all\n";
+        }
+}
+
+sub eidx_resync_start ($) { # -extindex --watch SIGUSR1 handler
+        my ($self) = @_;
+        $self->{-resync_queue} //= [ @{$self->{ibx_list}} ]; # shallow copy; an existing queue is kept as-is
+        PublicInbox::DS::requeue($self); # trigger our ->event_step
+}
+
+sub event_step { # PublicInbox::DS::requeue callback
+        my ($self) = @_;
+        if (my $resync_queue = $self->{-resync_queue}) { # queue is created by eidx_resync_start
+                if (my $ibx = shift(@$resync_queue)) { # handle one inbox per call
+                        on_inbox_unlock($self, $ibx);
+                        PublicInbox::DS::requeue($self); # come back for the next queued inbox
+                } else { # queue is drained
+                        delete $self->{-resync_queue};
+                        _watch_commit($self);
+                }
+        } else {
+                done($self) unless $self->{-commit_timer}; # skipped while a commit timer is still armed
+        }
+}
+
+sub eidx_watch { # public-inbox-extindex --watch main loop
+        my ($self, $opt) = @_;
+        require PublicInbox::InboxIdle;
+        require PublicInbox::DS;
+        require PublicInbox::Syscall;
+        require PublicInbox::Sigfd;
+        my $idler = PublicInbox::InboxIdle->new($self->{cfg});
+        if (!$self->{cfg}) { # no config was given to InboxIdle->new, so register each inbox explicitly
+                $idler->watch_inbox($_) for @{$self->{ibx_list}};
+        }
+        $_->subscribe_unlock(__PACKAGE__, $self) for @{$self->{ibx_list}};
+        my $sync = eidx_sync($self, $opt); # initial sync
+        return if $sync->{quit}; # quit was flagged during the initial sync
+        my $oldset = PublicInbox::Sigfd::block_signals();
+        local $self->{current_info} = '';
+        my $cb = $SIG{__WARN__} || \&CORE::warn; # chain to an existing warn handler if there is one
+        local $SIG{__WARN__} = sub { $cb->($self->{current_info}, ': ', @_) }; # prefix warnings with {current_info}
+        my $sig = {
+                HUP => sub { eidx_reload($self, $idler) },
+                USR1 => sub { eidx_resync_start($self) },
+                TSTP => sub { kill('STOP', $$) }, # stop this process itself
+        };
+        my $quit = PublicInbox::SearchIdx::quit_cb($sync);
+        $sig->{QUIT} = $sig->{INT} = $sig->{TERM} = $quit; # all three signals share the quit callback
+        my $sigfd = PublicInbox::Sigfd->new($sig,
+                                        $PublicInbox::Syscall::SFD_NONBLOCK);
+        local %SIG = (%SIG, %$sig) if !$sigfd; # no sigfd object: install the handlers in %SIG instead
+        local $self->{-watch_sync} = $sync; # for ->on_inbox_unlock
+        if (!$sigfd) {
+                # wake up every second to accept signals if we don't
+                # have signalfd or IO::KQueue:
+                PublicInbox::Sigfd::sig_setmask($oldset);
+                PublicInbox::DS->SetLoopTimeout(1000);
+        }
+        PublicInbox::DS->SetPostLoopCallback(sub { !$sync->{quit} }); # false once quit is set (presumably ends the loop)
+        PublicInbox::DS->EventLoop; # calls InboxIdle->event_step
+        done($self);
+}
+
 no warnings 'once';
 *done = \&PublicInbox::V2Writable::done;
 *with_umask = \&PublicInbox::InboxWritable::with_umask;
diff --git a/lib/PublicInbox/InboxIdle.pm b/lib/PublicInbox/InboxIdle.pm
index 508007d7..35aed696 100644
--- a/lib/PublicInbox/InboxIdle.pm
+++ b/lib/PublicInbox/InboxIdle.pm
@@ -63,6 +63,9 @@ sub refresh {
         $pi_cfg->each_inbox(\&in2_arm, $self);
 }
 
+# internal API for ease-of-use: method form ($self, $ibx) of in2_arm, which takes ($ibx, $self)
+sub watch_inbox { in2_arm($_[1], $_[0]) };
+
 sub new {
         my ($class, $pi_cfg) = @_;
         my $self = bless {}, $class;
@@ -78,7 +81,7 @@ sub new {
         $self->{inot} = $inot;
         $self->{pathmap} = {}; # inboxdir => [ ibx, watch1, watch2, watch3...]
         $self->{on_unlock} = {}; # lock path => ibx
-        refresh($self, $pi_cfg);
+        refresh($self, $pi_cfg) if $pi_cfg;
         PublicInbox::FakeInotify::poll_once($self) if !$ino_cls;
         $self;
 }
@@ -89,7 +92,8 @@ sub event_step {
                 my @events = $self->{inot}->read; # Linux::Inotify2::read
                 my $on_unlock = $self->{on_unlock};
                 for my $ev (@events) {
-                        if (my $ibx = $on_unlock->{$ev->fullname}) {
+                        my $fn = $ev->fullname // next; # cancelled
+                        if (my $ibx = $on_unlock->{$fn}) {
                                 $ibx->on_unlock;
                         }
                 }
diff --git a/lib/PublicInbox/OverIdx.pm b/lib/PublicInbox/OverIdx.pm
index 4a39bf53..dcc2cff3 100644
--- a/lib/PublicInbox/OverIdx.pm
+++ b/lib/PublicInbox/OverIdx.pm
@@ -473,10 +473,14 @@ sub dbh_close {
 
 sub create {
         my ($self) = @_;
-        unless (-r $self->{filename}) {
+        my $fn = $self->{filename} // do {
+                Carp::confess('BUG: no {filename}') unless $self->{dbh};
+                return;
+        };
+        unless (-r $fn) {
                 require File::Path;
                 require File::Basename;
-                File::Path::mkpath(File::Basename::dirname($self->{filename}));
+                File::Path::mkpath(File::Basename::dirname($fn));
         }
         # create the DB:
         PublicInbox::Over::dbh($self);
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index f20b5c7f..567582c5 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -879,7 +879,7 @@ sub reindex_checkpoint ($$) {
                 $self->done; # release lock
         }
 
-        if (my $pr = $sync->{-opt}->{-progress}) {
+        if (my $pr = $sync->{-regen_fmt} ? $sync->{-opt}->{-progress} : undef) {
                 $pr->(sprintf($sync->{-regen_fmt}, ${$sync->{nr}}));
         }