diff options
-rw-r--r-- | lib/PublicInbox/ExtSearchIdx.pm | 126 | ||||
-rw-r--r-- | lib/PublicInbox/InboxIdle.pm | 8 | ||||
-rw-r--r-- | lib/PublicInbox/OverIdx.pm | 8 | ||||
-rw-r--r-- | lib/PublicInbox/V2Writable.pm | 2 | ||||
-rw-r--r-- | script/public-inbox-extindex | 19 |
5 files changed, 146 insertions, 17 deletions
diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm index 64ebf6db..53ff2ca1 100644 --- a/lib/PublicInbox/ExtSearchIdx.pm +++ b/lib/PublicInbox/ExtSearchIdx.pm @@ -630,7 +630,7 @@ sub eidxq_process ($$) { # for reindexing my $dbh = $self->{oidx}->dbh; my $tot = $dbh->selectrow_array('SELECT COUNT(*) FROM eidxq') or return; ${$sync->{nr}} = 0; - $sync->{-regen_fmt} = "%u/$tot\n"; + local $sync->{-regen_fmt} = "%u/$tot\n"; my $pr = $sync->{-opt}->{-progress}; if ($pr) { my $min = $dbh->selectrow_array('SELECT MIN(docid) FROM eidxq'); @@ -709,7 +709,8 @@ sub _reindex_check_unseen ($$$) { my $msgs; my $pr = $sync->{-opt}->{-progress}; my $ekey = $ibx->eidx_key; - $sync->{-regen_fmt} = "$ekey checking unseen %u/".$ibx->over->max."\n"; + local $sync->{-regen_fmt} = + "$ekey checking unseen %u/".$ibx->over->max."\n"; ${$sync->{nr}} = 0; while (scalar(@{$msgs = $ibx->over->query_xover($beg, $end)})) { @@ -752,7 +753,7 @@ sub _reindex_check_stale ($$$) { my $pr = $sync->{-opt}->{-progress}; my $fetching; my $ekey = $ibx->eidx_key; - $sync->{-regen_fmt} = + local $sync->{-regen_fmt} = "$ekey check stale/missing %u/".$ibx->over->max."\n"; ${$sync->{nr}} = 0; do { @@ -838,6 +839,13 @@ sub eidx_reindex { eidxq_process($self, $sync) unless $sync->{quit}; } +sub sync_inbox { + my ($self, $sync, $ibx) = @_; + my $err = _sync_inbox($self, $sync, $ibx); + delete @$ibx{qw(mm over)}; + warn $err, "\n" if defined($err); +} + sub eidx_sync { # main entry point my ($self, $opt) = @_; @@ -868,22 +876,21 @@ sub eidx_sync { # main entry point $ibx->{-ibx_id} //= $self->{oidx}->ibx_id($ibx->eidx_key); } if (delete($opt->{reindex})) { - $sync->{checkpoint_unlocks} = 1; + local $sync->{checkpoint_unlocks} = 1; eidx_reindex($self, $sync); } # don't use $_ here, it'll get clobbered by reindex_checkpoint for my $ibx (@{$self->{ibx_list}}) { last if $sync->{quit}; - my $err = _sync_inbox($self, $sync, $ibx); - delete @$ibx{qw(mm over)}; - warn $err, "\n" if defined($err); + sync_inbox($self, $sync, $ibx); } $self->{oidx}->rethread_done($opt) unless $sync->{quit}; eidxq_process($self, $sync) unless $sync->{quit}; eidxq_release($self); - PublicInbox::V2Writable::done($self); + done($self); + $sync; # for eidx_watch } sub update_last_commit { # overrides V2Writable @@ -970,6 +977,109 @@ sub idx_init { # similar to V2Writable $self->{midx}->begin_txn; } +sub _watch_commit { # PublicInbox::DS::add_timer callback + my ($self) = @_; + delete $self->{-commit_timer}; + eidxq_process($self, $self->{-watch_sync}); + eidxq_release($self); + delete local $self->{-watch_sync}->{-regen_fmt}; + reindex_checkpoint($self, $self->{-watch_sync}); + + # call event_step => done unless commit_timer is armed + PublicInbox::DS::requeue($self); +} + +sub on_inbox_unlock { # called by PublicInbox::InboxIdle + my ($self, $ibx) = @_; + my $opt = $self->{-watch_sync}->{-opt}; + my $pr = $opt->{-progress}; + my $ekey = $ibx->eidx_key; + local $0 = "sync $ekey"; + $pr->("indexing $ekey\n") if $pr; + $self->idx_init($opt); + sync_inbox($self, $self->{-watch_sync}, $ibx); + $self->{-commit_timer} //= PublicInbox::DS::add_timer( + $opt->{'commit-interval'} // 10, + \&_watch_commit, $self); +} + +sub eidx_reload { # -extindex --watch SIGHUP handler + my ($self, $idler) = @_; + if ($self->{cfg}) { + my $pr = $self->{-watch_sync}->{-opt}->{-progress}; + $pr->('reloading ...') if $pr; + @{$self->{ibx_list}} = (); + %{$self->{ibx_map}} = (); + delete $self->{-watch_sync}->{id2pos}; + my $cfg = PublicInbox::Config->new; + attach_config($self, $cfg); + $idler->refresh($cfg); + $pr->(" done\n") if $pr; + } else { + warn "reload not supported without --all\n"; + } +} + +sub eidx_resync_start ($) { # -extindex --watch SIGUSR1 handler + my ($self) = @_; + $self->{-resync_queue} //= [ @{$self->{ibx_list}} ]; + PublicInbox::DS::requeue($self); # trigger our ->event_step +} + +sub event_step { # PublicInbox::DS::requeue callback + my ($self) = @_; + if (my $resync_queue = $self->{-resync_queue}) { + if (my $ibx = shift(@$resync_queue)) { + on_inbox_unlock($self, $ibx); + PublicInbox::DS::requeue($self); + } else { + delete $self->{-resync_queue}; + _watch_commit($self); + } + } else { + done($self) unless $self->{-commit_timer}; + } +} + +sub eidx_watch { # public-inbox-extindex --watch main loop + my ($self, $opt) = @_; + require PublicInbox::InboxIdle; + require PublicInbox::DS; + require PublicInbox::Syscall; + require PublicInbox::Sigfd; + my $idler = PublicInbox::InboxIdle->new($self->{cfg}); + if (!$self->{cfg}) { + $idler->watch_inbox($_) for @{$self->{ibx_list}}; + } + $_->subscribe_unlock(__PACKAGE__, $self) for @{$self->{ibx_list}}; + my $sync = eidx_sync($self, $opt); # initial sync + return if $sync->{quit}; + my $oldset = PublicInbox::Sigfd::block_signals(); + local $self->{current_info} = ''; + my $cb = $SIG{__WARN__} || \&CORE::warn; + local $SIG{__WARN__} = sub { $cb->($self->{current_info}, ': ', @_) }; + my $sig = { + HUP => sub { eidx_reload($self, $idler) }, + USR1 => sub { eidx_resync_start($self) }, + TSTP => sub { kill('STOP', $$) }, + }; + my $quit = PublicInbox::SearchIdx::quit_cb($sync); + $sig->{QUIT} = $sig->{INT} = $sig->{TERM} = $quit; + my $sigfd = PublicInbox::Sigfd->new($sig, + $PublicInbox::Syscall::SFD_NONBLOCK); + local %SIG = (%SIG, %$sig) if !$sigfd; + local $self->{-watch_sync} = $sync; # for ->on_inbox_unlock + if (!$sigfd) { + # wake up every second to accept signals if we don't + # have signalfd or IO::KQueue: + PublicInbox::Sigfd::sig_setmask($oldset); + PublicInbox::DS->SetLoopTimeout(1000); + } + PublicInbox::DS->SetPostLoopCallback(sub { !$sync->{quit} }); + PublicInbox::DS->EventLoop; # calls InboxIdle->event_step + done($self); +} + no warnings 'once'; *done = \&PublicInbox::V2Writable::done; *with_umask = \&PublicInbox::InboxWritable::with_umask; diff --git a/lib/PublicInbox/InboxIdle.pm b/lib/PublicInbox/InboxIdle.pm index 508007d7..35aed696 100644 --- a/lib/PublicInbox/InboxIdle.pm +++ b/lib/PublicInbox/InboxIdle.pm @@ -63,6 +63,9 @@ sub refresh { $pi_cfg->each_inbox(\&in2_arm, $self); } +# internal API for ease-of-use +sub watch_inbox { in2_arm($_[1], $_[0]) }; + sub new { my ($class, $pi_cfg) = @_; my $self = bless {}, $class; @@ -78,7 +81,7 @@ sub new { $self->{inot} = $inot; $self->{pathmap} = {}; # inboxdir => [ ibx, watch1, watch2, watch3...] $self->{on_unlock} = {}; # lock path => ibx - refresh($self, $pi_cfg); + refresh($self, $pi_cfg) if $pi_cfg; PublicInbox::FakeInotify::poll_once($self) if !$ino_cls; $self; } @@ -89,7 +92,8 @@ sub event_step { my @events = $self->{inot}->read; # Linux::Inotify2::read my $on_unlock = $self->{on_unlock}; for my $ev (@events) { - if (my $ibx = $on_unlock->{$ev->fullname}) { + my $fn = $ev->fullname // next; # cancelled + if (my $ibx = $on_unlock->{$fn}) { $ibx->on_unlock; } } diff --git a/lib/PublicInbox/OverIdx.pm b/lib/PublicInbox/OverIdx.pm index 4a39bf53..dcc2cff3 100644 --- a/lib/PublicInbox/OverIdx.pm +++ b/lib/PublicInbox/OverIdx.pm @@ -473,10 +473,14 @@ sub dbh_close { sub create { my ($self) = @_; - unless (-r $self->{filename}) { + my $fn = $self->{filename} // do { + Carp::confess('BUG: no {filename}') unless $self->{dbh}; + return; + }; + unless (-r $fn) { require File::Path; require File::Basename; - File::Path::mkpath(File::Basename::dirname($self->{filename})); + File::Path::mkpath(File::Basename::dirname($fn)); } # create the DB: PublicInbox::Over::dbh($self); diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm index f20b5c7f..567582c5 100644 --- a/lib/PublicInbox/V2Writable.pm +++ b/lib/PublicInbox/V2Writable.pm @@ -879,7 +879,7 @@ sub reindex_checkpoint ($$) { $self->done; # release lock } - if (my $pr = $sync->{-opt}->{-progress}) { + if (my $pr = $sync->{-regen_fmt} ? $sync->{-opt}->{-progress} : undef) { $pr->(sprintf($sync->{-regen_fmt}, ${$sync->{nr}})); } diff --git a/script/public-inbox-extindex b/script/public-inbox-extindex index 17ad59fa..607baa3e 100644 --- a/script/public-inbox-extindex +++ b/script/public-inbox-extindex @@ -11,6 +11,7 @@ usage: public-inbox-extindex [options] EXTINDEX_DIR [INBOX_DIR] Create and update external (detached) search indices --no-fsync speed up indexing, risk corruption on power outage + --watch run persistently and watch for inbox updates -L LEVEL `medium', or `full' (default: full) --all index all configured inboxes --jobs=NUM set or disable parallelization (NUM=0) @@ -27,7 +28,7 @@ GetOptions($opt, qw(verbose|v+ reindex rethread compact|c+ jobs|j=i fsync|sync! indexlevel|index-level|L=s max_size|max-size=s batch_size|batch-size=s - gc + gc commit-interval=i watch all help|h)) or die $help; if ($opt->{help}) { print $help; exit 0 }; @@ -41,7 +42,8 @@ my $cfg = PublicInbox::Config->new; my @ibxs; if ($opt->{gc}) { die "E: inbox paths must not be specified with --gc\n" if @ARGV; - die "E: --all not compatible --gc\n" if $opt->{all}; + die "E: --all not compatible with --gc\n" if $opt->{all}; + die "E: --watch is not compatible with --gc\n" if $opt->{watch}; } else { @ibxs = PublicInbox::Admin::resolve_inboxes(\@ARGV, $opt, $cfg); } @@ -56,6 +58,15 @@ if ($opt->{gc}) { $eidx->attach_config($cfg); $eidx->eidx_gc($opt); } else { - $eidx->attach_inbox($_) for @ibxs; - $eidx->eidx_sync($opt); + if ($opt->{all}) { + $eidx->attach_config($cfg); + } else { + $eidx->attach_inbox($_) for @ibxs; + } + if ($opt->{watch}) { + $cfg = undef; # save memory only after SIGHUP + $eidx->eidx_watch($opt); + } else { + $eidx->eidx_sync($opt); + } } |