about summary refs log tree commit homepage
diff options
context:
space:
mode:
-rw-r--r--lib/PublicInbox/ExtSearchIdx.pm126
-rw-r--r--lib/PublicInbox/InboxIdle.pm8
-rw-r--r--lib/PublicInbox/OverIdx.pm8
-rw-r--r--lib/PublicInbox/V2Writable.pm2
-rw-r--r--script/public-inbox-extindex19
5 files changed, 146 insertions, 17 deletions
diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm
index 64ebf6db..53ff2ca1 100644
--- a/lib/PublicInbox/ExtSearchIdx.pm
+++ b/lib/PublicInbox/ExtSearchIdx.pm
@@ -630,7 +630,7 @@ sub eidxq_process ($$) { # for reindexing
         my $dbh = $self->{oidx}->dbh;
         my $tot = $dbh->selectrow_array('SELECT COUNT(*) FROM eidxq') or return;
         ${$sync->{nr}} = 0;
-        $sync->{-regen_fmt} = "%u/$tot\n";
+        local $sync->{-regen_fmt} = "%u/$tot\n";
         my $pr = $sync->{-opt}->{-progress};
         if ($pr) {
                 my $min = $dbh->selectrow_array('SELECT MIN(docid) FROM eidxq');
@@ -709,7 +709,8 @@ sub _reindex_check_unseen ($$$) {
         my $msgs;
         my $pr = $sync->{-opt}->{-progress};
         my $ekey = $ibx->eidx_key;
-        $sync->{-regen_fmt} = "$ekey checking unseen %u/".$ibx->over->max."\n";
+        local $sync->{-regen_fmt} =
+                        "$ekey checking unseen %u/".$ibx->over->max."\n";
         ${$sync->{nr}} = 0;
 
         while (scalar(@{$msgs = $ibx->over->query_xover($beg, $end)})) {
@@ -752,7 +753,7 @@ sub _reindex_check_stale ($$$) {
         my $pr = $sync->{-opt}->{-progress};
         my $fetching;
         my $ekey = $ibx->eidx_key;
-        $sync->{-regen_fmt} =
+        local $sync->{-regen_fmt} =
                         "$ekey check stale/missing %u/".$ibx->over->max."\n";
         ${$sync->{nr}} = 0;
         do {
@@ -838,6 +839,13 @@ sub eidx_reindex {
         eidxq_process($self, $sync) unless $sync->{quit};
 }
 
+sub sync_inbox {
+        my ($self, $sync, $ibx) = @_;
+        my $err = _sync_inbox($self, $sync, $ibx);
+        delete @$ibx{qw(mm over)};
+        warn $err, "\n" if defined($err);
+}
+
 sub eidx_sync { # main entry point
         my ($self, $opt) = @_;
 
@@ -868,22 +876,21 @@ sub eidx_sync { # main entry point
                 $ibx->{-ibx_id} //= $self->{oidx}->ibx_id($ibx->eidx_key);
         }
         if (delete($opt->{reindex})) {
-                $sync->{checkpoint_unlocks} = 1;
+                local $sync->{checkpoint_unlocks} = 1;
                 eidx_reindex($self, $sync);
         }
 
         # don't use $_ here, it'll get clobbered by reindex_checkpoint
         for my $ibx (@{$self->{ibx_list}}) {
                 last if $sync->{quit};
-                my $err = _sync_inbox($self, $sync, $ibx);
-                delete @$ibx{qw(mm over)};
-                warn $err, "\n" if defined($err);
+                sync_inbox($self, $sync, $ibx);
         }
         $self->{oidx}->rethread_done($opt) unless $sync->{quit};
         eidxq_process($self, $sync) unless $sync->{quit};
 
         eidxq_release($self);
-        PublicInbox::V2Writable::done($self);
+        done($self);
+        $sync; # for eidx_watch
 }
 
 sub update_last_commit { # overrides V2Writable
@@ -970,6 +977,109 @@ sub idx_init { # similar to V2Writable
         $self->{midx}->begin_txn;
 }
 
+sub _watch_commit { # PublicInbox::DS::add_timer callback
+        my ($self) = @_;
+        delete $self->{-commit_timer};
+        eidxq_process($self, $self->{-watch_sync});
+        eidxq_release($self);
+        delete local $self->{-watch_sync}->{-regen_fmt};
+        reindex_checkpoint($self, $self->{-watch_sync});
+
+        # call event_step => done unless commit_timer is armed
+        PublicInbox::DS::requeue($self);
+}
+
+sub on_inbox_unlock { # called by PublicInbox::InboxIdle
+        my ($self, $ibx) = @_;
+        my $opt = $self->{-watch_sync}->{-opt};
+        my $pr = $opt->{-progress};
+        my $ekey = $ibx->eidx_key;
+        local $0 = "sync $ekey";
+        $pr->("indexing $ekey\n") if $pr;
+        $self->idx_init($opt);
+        sync_inbox($self, $self->{-watch_sync}, $ibx);
+        $self->{-commit_timer} //= PublicInbox::DS::add_timer(
+                                        $opt->{'commit-interval'} // 10,
+                                        \&_watch_commit, $self);
+}
+
+sub eidx_reload { # -extindex --watch SIGHUP handler
+        my ($self, $idler) = @_;
+        if ($self->{cfg}) {
+                my $pr = $self->{-watch_sync}->{-opt}->{-progress};
+                $pr->('reloading ...') if $pr;
+                @{$self->{ibx_list}} = ();
+                %{$self->{ibx_map}} = ();
+                delete $self->{-watch_sync}->{id2pos};
+                my $cfg = PublicInbox::Config->new;
+                attach_config($self, $cfg);
+                $idler->refresh($cfg);
+                $pr->(" done\n") if $pr;
+        } else {
+                warn "reload not supported without --all\n";
+        }
+}
+
+sub eidx_resync_start ($) { # -extindex --watch SIGUSR1 handler
+        my ($self) = @_;
+        $self->{-resync_queue} //= [ @{$self->{ibx_list}} ];
+        PublicInbox::DS::requeue($self); # trigger our ->event_step
+}
+
+sub event_step { # PublicInbox::DS::requeue callback
+        my ($self) = @_;
+        if (my $resync_queue = $self->{-resync_queue}) {
+                if (my $ibx = shift(@$resync_queue)) {
+                        on_inbox_unlock($self, $ibx);
+                        PublicInbox::DS::requeue($self);
+                } else {
+                        delete $self->{-resync_queue};
+                        _watch_commit($self);
+                }
+        } else {
+                done($self) unless $self->{-commit_timer};
+        }
+}
+
+sub eidx_watch { # public-inbox-extindex --watch main loop
+        my ($self, $opt) = @_;
+        require PublicInbox::InboxIdle;
+        require PublicInbox::DS;
+        require PublicInbox::Syscall;
+        require PublicInbox::Sigfd;
+        my $idler = PublicInbox::InboxIdle->new($self->{cfg});
+        if (!$self->{cfg}) {
+                $idler->watch_inbox($_) for @{$self->{ibx_list}};
+        }
+        $_->subscribe_unlock(__PACKAGE__, $self) for @{$self->{ibx_list}};
+        my $sync = eidx_sync($self, $opt); # initial sync
+        return if $sync->{quit};
+        my $oldset = PublicInbox::Sigfd::block_signals();
+        local $self->{current_info} = '';
+        my $cb = $SIG{__WARN__} || \&CORE::warn;
+        local $SIG{__WARN__} = sub { $cb->($self->{current_info}, ': ', @_) };
+        my $sig = {
+                HUP => sub { eidx_reload($self, $idler) },
+                USR1 => sub { eidx_resync_start($self) },
+                TSTP => sub { kill('STOP', $$) },
+        };
+        my $quit = PublicInbox::SearchIdx::quit_cb($sync);
+        $sig->{QUIT} = $sig->{INT} = $sig->{TERM} = $quit;
+        my $sigfd = PublicInbox::Sigfd->new($sig,
+                                        $PublicInbox::Syscall::SFD_NONBLOCK);
+        local %SIG = (%SIG, %$sig) if !$sigfd;
+        local $self->{-watch_sync} = $sync; # for ->on_inbox_unlock
+        if (!$sigfd) {
+                # wake up every second to accept signals if we don't
+                # have signalfd or IO::KQueue:
+                PublicInbox::Sigfd::sig_setmask($oldset);
+                PublicInbox::DS->SetLoopTimeout(1000);
+        }
+        PublicInbox::DS->SetPostLoopCallback(sub { !$sync->{quit} });
+        PublicInbox::DS->EventLoop; # calls InboxIdle->event_step
+        done($self);
+}
+
 no warnings 'once';
 *done = \&PublicInbox::V2Writable::done;
 *with_umask = \&PublicInbox::InboxWritable::with_umask;
diff --git a/lib/PublicInbox/InboxIdle.pm b/lib/PublicInbox/InboxIdle.pm
index 508007d7..35aed696 100644
--- a/lib/PublicInbox/InboxIdle.pm
+++ b/lib/PublicInbox/InboxIdle.pm
@@ -63,6 +63,9 @@ sub refresh {
         $pi_cfg->each_inbox(\&in2_arm, $self);
 }
 
+# internal API for ease-of-use
+sub watch_inbox { in2_arm($_[1], $_[0]) };
+
 sub new {
         my ($class, $pi_cfg) = @_;
         my $self = bless {}, $class;
@@ -78,7 +81,7 @@ sub new {
         $self->{inot} = $inot;
         $self->{pathmap} = {}; # inboxdir => [ ibx, watch1, watch2, watch3...]
         $self->{on_unlock} = {}; # lock path => ibx
-        refresh($self, $pi_cfg);
+        refresh($self, $pi_cfg) if $pi_cfg;
         PublicInbox::FakeInotify::poll_once($self) if !$ino_cls;
         $self;
 }
@@ -89,7 +92,8 @@ sub event_step {
                 my @events = $self->{inot}->read; # Linux::Inotify2::read
                 my $on_unlock = $self->{on_unlock};
                 for my $ev (@events) {
-                        if (my $ibx = $on_unlock->{$ev->fullname}) {
+                        my $fn = $ev->fullname // next; # cancelled
+                        if (my $ibx = $on_unlock->{$fn}) {
                                 $ibx->on_unlock;
                         }
                 }
diff --git a/lib/PublicInbox/OverIdx.pm b/lib/PublicInbox/OverIdx.pm
index 4a39bf53..dcc2cff3 100644
--- a/lib/PublicInbox/OverIdx.pm
+++ b/lib/PublicInbox/OverIdx.pm
@@ -473,10 +473,14 @@ sub dbh_close {
 
 sub create {
         my ($self) = @_;
-        unless (-r $self->{filename}) {
+        my $fn = $self->{filename} // do {
+                Carp::confess('BUG: no {filename}') unless $self->{dbh};
+                return;
+        };
+        unless (-r $fn) {
                 require File::Path;
                 require File::Basename;
-                File::Path::mkpath(File::Basename::dirname($self->{filename}));
+                File::Path::mkpath(File::Basename::dirname($fn));
         }
         # create the DB:
         PublicInbox::Over::dbh($self);
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index f20b5c7f..567582c5 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -879,7 +879,7 @@ sub reindex_checkpoint ($$) {
                 $self->done; # release lock
         }
 
-        if (my $pr = $sync->{-opt}->{-progress}) {
+        if (my $pr = $sync->{-regen_fmt} ? $sync->{-opt}->{-progress} : undef) {
                 $pr->(sprintf($sync->{-regen_fmt}, ${$sync->{nr}}));
         }
 
diff --git a/script/public-inbox-extindex b/script/public-inbox-extindex
index 17ad59fa..607baa3e 100644
--- a/script/public-inbox-extindex
+++ b/script/public-inbox-extindex
@@ -11,6 +11,7 @@ usage: public-inbox-extindex [options] EXTINDEX_DIR [INBOX_DIR]
   Create and update external (detached) search indices
 
   --no-fsync          speed up indexing, risk corruption on power outage
+  --watch             run persistently and watch for inbox updates
   -L LEVEL            `medium', or `full' (default: full)
   --all               index all configured inboxes
   --jobs=NUM          set or disable parallelization (NUM=0)
@@ -27,7 +28,7 @@ GetOptions($opt, qw(verbose|v+ reindex rethread compact|c+ jobs|j=i
                 fsync|sync!
                 indexlevel|index-level|L=s max_size|max-size=s
                 batch_size|batch-size=s
-                gc
+                gc commit-interval=i watch
                 all help|h))
         or die $help;
 if ($opt->{help}) { print $help; exit 0 };
@@ -41,7 +42,8 @@ my $cfg = PublicInbox::Config->new;
 my @ibxs;
 if ($opt->{gc}) {
         die "E: inbox paths must not be specified with --gc\n" if @ARGV;
-        die "E: --all not compatible --gc\n" if $opt->{all};
+        die "E: --all not compatible with --gc\n" if $opt->{all};
+        die "E: --watch is not compatible with --gc\n" if $opt->{watch};
 } else {
         @ibxs = PublicInbox::Admin::resolve_inboxes(\@ARGV, $opt, $cfg);
 }
@@ -56,6 +58,15 @@ if ($opt->{gc}) {
         $eidx->attach_config($cfg);
         $eidx->eidx_gc($opt);
 } else {
-        $eidx->attach_inbox($_) for @ibxs;
-        $eidx->eidx_sync($opt);
+        if ($opt->{all}) {
+                $eidx->attach_config($cfg);
+        } else {
+                $eidx->attach_inbox($_) for @ibxs;
+        }
+        if ($opt->{watch}) {
+                $cfg = undef; # save memory only after SIGHUP
+                $eidx->eidx_watch($opt);
+        } else {
+                $eidx->eidx_sync($opt);
+        }
 }