user/dev discussion of public-inbox itself
 help / color / mirror / code / Atom feed
From: Eric Wong <e@80x24.org>
To: meta@public-inbox.org
Subject: [PATCH 2/3] extindex: --watch for inotify-based updates
Date: Sat, 26 Dec 2020 01:44:37 +0000	[thread overview]
Message-ID: <20201226014438.28402-3-e@80x24.org> (raw)
In-Reply-To: <20201226014438.28402-1-e@80x24.org>

This reuses existing InboxIdle infrastructure to update external
indices based on per-inbox updates.  This is an alternative to
auto-updating external indices via the -index command and also
works with existing uses of -mda and public-inbox-watch.

Using inotify (or EVFILT_VNODE) allows watching thousands of
inboxes without having to scan every single one at every
invocation.

This is especially beneficial in cases where an external index
is not writable by the users writing to per-inbox indices.
---
 lib/PublicInbox/ExtSearchIdx.pm | 126 ++++++++++++++++++++++++++++++--
 lib/PublicInbox/InboxIdle.pm    |   8 +-
 lib/PublicInbox/OverIdx.pm      |   8 +-
 lib/PublicInbox/V2Writable.pm   |   2 +-
 script/public-inbox-extindex    |  19 ++++-
 5 files changed, 146 insertions(+), 17 deletions(-)

diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm
index 64ebf6db..53ff2ca1 100644
--- a/lib/PublicInbox/ExtSearchIdx.pm
+++ b/lib/PublicInbox/ExtSearchIdx.pm
@@ -630,7 +630,7 @@ sub eidxq_process ($$) { # for reindexing
 	my $dbh = $self->{oidx}->dbh;
 	my $tot = $dbh->selectrow_array('SELECT COUNT(*) FROM eidxq') or return;
 	${$sync->{nr}} = 0;
-	$sync->{-regen_fmt} = "%u/$tot\n";
+	local $sync->{-regen_fmt} = "%u/$tot\n";
 	my $pr = $sync->{-opt}->{-progress};
 	if ($pr) {
 		my $min = $dbh->selectrow_array('SELECT MIN(docid) FROM eidxq');
@@ -709,7 +709,8 @@ sub _reindex_check_unseen ($$$) {
 	my $msgs;
 	my $pr = $sync->{-opt}->{-progress};
 	my $ekey = $ibx->eidx_key;
-	$sync->{-regen_fmt} = "$ekey checking unseen %u/".$ibx->over->max."\n";
+	local $sync->{-regen_fmt} =
+			"$ekey checking unseen %u/".$ibx->over->max."\n";
 	${$sync->{nr}} = 0;
 
 	while (scalar(@{$msgs = $ibx->over->query_xover($beg, $end)})) {
@@ -752,7 +753,7 @@ sub _reindex_check_stale ($$$) {
 	my $pr = $sync->{-opt}->{-progress};
 	my $fetching;
 	my $ekey = $ibx->eidx_key;
-	$sync->{-regen_fmt} =
+	local $sync->{-regen_fmt} =
 			"$ekey check stale/missing %u/".$ibx->over->max."\n";
 	${$sync->{nr}} = 0;
 	do {
@@ -838,6 +839,13 @@ sub eidx_reindex {
 	eidxq_process($self, $sync) unless $sync->{quit};
 }
 
+sub sync_inbox { # index one inbox; shared by eidx_sync and on_inbox_unlock
+	my ($self, $sync, $ibx) = @_;
+	my $err = _sync_inbox($self, $sync, $ibx);
+	delete @$ibx{qw(mm over)}; # drop cached mm/over objects once this inbox is done
+	warn $err, "\n" if defined($err); # per-inbox errors warn, they do not abort
+}
+
 sub eidx_sync { # main entry point
 	my ($self, $opt) = @_;
 
@@ -868,22 +876,21 @@ sub eidx_sync { # main entry point
 		$ibx->{-ibx_id} //= $self->{oidx}->ibx_id($ibx->eidx_key);
 	}
 	if (delete($opt->{reindex})) {
-		$sync->{checkpoint_unlocks} = 1;
+		local $sync->{checkpoint_unlocks} = 1;
 		eidx_reindex($self, $sync);
 	}
 
 	# don't use $_ here, it'll get clobbered by reindex_checkpoint
 	for my $ibx (@{$self->{ibx_list}}) {
 		last if $sync->{quit};
-		my $err = _sync_inbox($self, $sync, $ibx);
-		delete @$ibx{qw(mm over)};
-		warn $err, "\n" if defined($err);
+		sync_inbox($self, $sync, $ibx);
 	}
 	$self->{oidx}->rethread_done($opt) unless $sync->{quit};
 	eidxq_process($self, $sync) unless $sync->{quit};
 
 	eidxq_release($self);
-	PublicInbox::V2Writable::done($self);
+	done($self);
+	$sync; # for eidx_watch
 }
 
 sub update_last_commit { # overrides V2Writable
@@ -970,6 +977,109 @@ sub idx_init { # similar to V2Writable
 	$self->{midx}->begin_txn;
 }
 
+sub _watch_commit { # PublicInbox::DS::add_timer callback
+	my ($self) = @_;
+	delete $self->{-commit_timer}; # lets on_inbox_unlock arm a fresh timer
+	eidxq_process($self, $self->{-watch_sync});
+	eidxq_release($self);
+	delete local $self->{-watch_sync}->{-regen_fmt}; # no -regen_fmt => no progress line
+	reindex_checkpoint($self, $self->{-watch_sync});
+
+	# call event_step => done unless commit_timer is armed
+	PublicInbox::DS::requeue($self);
+}
+
+sub on_inbox_unlock { # called by PublicInbox::InboxIdle
+	my ($self, $ibx) = @_;
+	my $opt = $self->{-watch_sync}->{-opt};
+	my $pr = $opt->{-progress};
+	my $ekey = $ibx->eidx_key;
+	local $0 = "sync $ekey"; # show the inbox being synced in the process title
+	$pr->("indexing $ekey\n") if $pr;
+	$self->idx_init($opt);
+	sync_inbox($self, $self->{-watch_sync}, $ibx);
+	$self->{-commit_timer} //= PublicInbox::DS::add_timer( # at most one pending commit
+					$opt->{'commit-interval'} // 10, # --commit-interval (units per add_timer; seconds? confirm)
+					\&_watch_commit, $self);
+}
+
+sub eidx_reload { # -extindex --watch SIGHUP handler
+	my ($self, $idler) = @_;
+	if ($self->{cfg}) { # presumably set by attach_config, i.e. only with --all
+		my $pr = $self->{-watch_sync}->{-opt}->{-progress};
+		$pr->('reloading ...') if $pr;
+		@{$self->{ibx_list}} = (); # forget the old inbox set before re-attaching
+		%{$self->{ibx_map}} = ();
+		delete $self->{-watch_sync}->{id2pos}; # NOTE(review): -resync_queue is not cleared here and may hold pre-reload inboxes
+		my $cfg = PublicInbox::Config->new;
+		attach_config($self, $cfg);
+		$idler->refresh($cfg); # re-run in2_arm for each inbox in the new config
+		$pr->(" done\n") if $pr;
+	} else {
+		warn "reload not supported without --all\n";
+	}
+}
+
+sub eidx_resync_start ($) { # -extindex --watch SIGUSR1 handler
+	my ($self) = @_;
+	$self->{-resync_queue} //= [ @{$self->{ibx_list}} ]; # copy of the list; no-op if a resync is already queued
+	PublicInbox::DS::requeue($self); # trigger our ->event_step
+}
+
+sub event_step { # PublicInbox::DS::requeue callback
+	my ($self) = @_;
+	if (my $resync_queue = $self->{-resync_queue}) { # SIGUSR1 resync in progress
+		if (my $ibx = shift(@$resync_queue)) {
+			on_inbox_unlock($self, $ibx); # one inbox per call, then requeue ourselves
+			PublicInbox::DS::requeue($self);
+		} else {
+			delete $self->{-resync_queue};
+			_watch_commit($self); # queue drained: commit right away
+		}
+	} else {
+		done($self) unless $self->{-commit_timer}; # nothing pending: finish via done()
+	}
+}
+
+sub eidx_watch { # public-inbox-extindex --watch main loop
+	my ($self, $opt) = @_;
+	require PublicInbox::InboxIdle;
+	require PublicInbox::DS;
+	require PublicInbox::Syscall;
+	require PublicInbox::Sigfd;
+	my $idler = PublicInbox::InboxIdle->new($self->{cfg});
+	if (!$self->{cfg}) { # no config (no --all): watch only the attached inboxes
+		$idler->watch_inbox($_) for @{$self->{ibx_list}};
+	}
+	$_->subscribe_unlock(__PACKAGE__, $self) for @{$self->{ibx_list}}; # route unlocks to on_inbox_unlock
+	my $sync = eidx_sync($self, $opt); # initial sync
+	return if $sync->{quit};
+	my $oldset = PublicInbox::Sigfd::block_signals(); # restored below only without signalfd/kqueue
+	local $self->{current_info} = '';
+	my $cb = $SIG{__WARN__} || \&CORE::warn; # chain to any existing __WARN__ handler
+	local $SIG{__WARN__} = sub { $cb->($self->{current_info}, ': ', @_) };
+	my $sig = {
+		HUP => sub { eidx_reload($self, $idler) },
+		USR1 => sub { eidx_resync_start($self) },
+		TSTP => sub { kill('STOP', $$) },
+	};
+	my $quit = PublicInbox::SearchIdx::quit_cb($sync);
+	$sig->{QUIT} = $sig->{INT} = $sig->{TERM} = $quit;
+	my $sigfd = PublicInbox::Sigfd->new($sig,
+					$PublicInbox::Syscall::SFD_NONBLOCK);
+	local %SIG = (%SIG, %$sig) if !$sigfd; # fallback: install the handlers in %SIG
+	local $self->{-watch_sync} = $sync; # for ->on_inbox_unlock
+	if (!$sigfd) {
+		# wake up every second to accept signals if we don't
+		# have signalfd or IO::KQueue:
+		PublicInbox::Sigfd::sig_setmask($oldset);
+		PublicInbox::DS->SetLoopTimeout(1000);
+	}
+	PublicInbox::DS->SetPostLoopCallback(sub { !$sync->{quit} }); # loop until $sync->{quit} is set
+	PublicInbox::DS->EventLoop; # calls InboxIdle->event_step
+	done($self);
+}
+
 no warnings 'once';
 *done = \&PublicInbox::V2Writable::done;
 *with_umask = \&PublicInbox::InboxWritable::with_umask;
diff --git a/lib/PublicInbox/InboxIdle.pm b/lib/PublicInbox/InboxIdle.pm
index f1cbc012..34606186 100644
--- a/lib/PublicInbox/InboxIdle.pm
+++ b/lib/PublicInbox/InboxIdle.pm
@@ -49,6 +49,9 @@ sub refresh {
 	$pi_cfg->each_inbox(\&in2_arm, $self);
 }
 
+# internal API for ease-of-use: $idler->watch_inbox($ibx) runs in2_arm on one inbox, as refresh does per configured inbox
+sub watch_inbox { in2_arm($_[1], $_[0]) };
+
 sub new {
 	my ($class, $pi_cfg) = @_;
 	my $self = bless {}, $class;
@@ -64,7 +67,7 @@ sub new {
 	$self->{inot} = $inot;
 	$self->{pathmap} = {}; # inboxdir => [ ibx, watch1, watch2, watch3...]
 	$self->{on_unlock} = {}; # lock path => ibx
-	refresh($self, $pi_cfg);
+	refresh($self, $pi_cfg) if $pi_cfg;
 	PublicInbox::FakeInotify::poll_once($self) if !$ino_cls;
 	$self;
 }
@@ -75,7 +78,8 @@ sub event_step {
 		my @events = $self->{inot}->read; # Linux::Inotify2::read
 		my $on_unlock = $self->{on_unlock};
 		for my $ev (@events) {
-			if (my $ibx = $on_unlock->{$ev->fullname}) {
+			my $fn = $ev->fullname // next; # cancelled
+			if (my $ibx = $on_unlock->{$fn}) {
 				$ibx->on_unlock;
 			}
 		}
diff --git a/lib/PublicInbox/OverIdx.pm b/lib/PublicInbox/OverIdx.pm
index 4a39bf53..dcc2cff3 100644
--- a/lib/PublicInbox/OverIdx.pm
+++ b/lib/PublicInbox/OverIdx.pm
@@ -473,10 +473,14 @@ sub dbh_close {
 
 sub create {
 	my ($self) = @_;
-	unless (-r $self->{filename}) {
+	my $fn = $self->{filename} // do {
+		Carp::confess('BUG: no {filename}') unless $self->{dbh};
+		return;
+	};
+	unless (-r $fn) {
 		require File::Path;
 		require File::Basename;
-		File::Path::mkpath(File::Basename::dirname($self->{filename}));
+		File::Path::mkpath(File::Basename::dirname($fn));
 	}
 	# create the DB:
 	PublicInbox::Over::dbh($self);
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index f20b5c7f..567582c5 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -879,7 +879,7 @@ sub reindex_checkpoint ($$) {
 		$self->done; # release lock
 	}
 
-	if (my $pr = $sync->{-opt}->{-progress}) {
+	if (my $pr = $sync->{-regen_fmt} ? $sync->{-opt}->{-progress} : undef) {
 		$pr->(sprintf($sync->{-regen_fmt}, ${$sync->{nr}}));
 	}
 
diff --git a/script/public-inbox-extindex b/script/public-inbox-extindex
index 17ad59fa..607baa3e 100644
--- a/script/public-inbox-extindex
+++ b/script/public-inbox-extindex
@@ -11,6 +11,7 @@ usage: public-inbox-extindex [options] EXTINDEX_DIR [INBOX_DIR]
   Create and update external (detached) search indices
 
   --no-fsync          speed up indexing, risk corruption on power outage
+  --watch             run persistently and watch for inbox updates
   -L LEVEL            `medium', or `full' (default: full)
   --all               index all configured inboxes
   --jobs=NUM          set or disable parallelization (NUM=0)
@@ -27,7 +28,7 @@ GetOptions($opt, qw(verbose|v+ reindex rethread compact|c+ jobs|j=i
 		fsync|sync!
 		indexlevel|index-level|L=s max_size|max-size=s
 		batch_size|batch-size=s
-		gc
+		gc commit-interval=i watch
 		all help|h))
 	or die $help;
 if ($opt->{help}) { print $help; exit 0 };
@@ -41,7 +42,8 @@ my $cfg = PublicInbox::Config->new;
 my @ibxs;
 if ($opt->{gc}) {
 	die "E: inbox paths must not be specified with --gc\n" if @ARGV;
-	die "E: --all not compatible --gc\n" if $opt->{all};
+	die "E: --all not compatible with --gc\n" if $opt->{all};
+	die "E: --watch is not compatible with --gc\n" if $opt->{watch};
 } else {
 	@ibxs = PublicInbox::Admin::resolve_inboxes(\@ARGV, $opt, $cfg);
 }
@@ -56,6 +58,15 @@ if ($opt->{gc}) {
 	$eidx->attach_config($cfg);
 	$eidx->eidx_gc($opt);
 } else {
-	$eidx->attach_inbox($_) for @ibxs;
-	$eidx->eidx_sync($opt);
+	if ($opt->{all}) {
+		$eidx->attach_config($cfg);
+	} else {
+		$eidx->attach_inbox($_) for @ibxs;
+	}
+	if ($opt->{watch}) {
+		$cfg = undef; # save memory only after SIGHUP
+		$eidx->eidx_watch($opt);
+	} else {
+		$eidx->eidx_sync($opt);
+	}
 }

  parent reply	other threads:[~2020-12-26  1:44 UTC|newest]

Thread overview: 13+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2020-12-25 10:21 [PATCH 0/7] index + extindex interaction improvements Eric Wong
2020-12-25 10:21 ` [PATCH 1/7] index: disable --fast-noop on --reindex Eric Wong
2020-12-25 10:21 ` [PATCH 2/7] extsearchidx: delay SQLite availability checks Eric Wong
2020-12-25 10:21 ` [PATCH 3/7] extsearchidx: close DB handles after use if FD constrained Eric Wong
2020-12-25 10:21 ` [PATCH 4/7] index: do not attach inbox to extindex unless updated Eric Wong
2020-12-25 10:21 ` [PATCH 5/7] index: fix --no-fsync flag propagation to extindex Eric Wong
2020-12-25 10:21 ` [PATCH 6/7] v2writable: don't verify tip if reindexing Eric Wong
2020-12-25 10:21 ` [PATCH 7/7] index: filter out indexlevel=basic from extindex Eric Wong
2020-12-25 10:39 ` [PATCH 0/7] index + extindex interaction improvements Eric Wong
2020-12-26  1:44   ` [PATCH 0/3] extindex --watch support Eric Wong
2020-12-26  1:44     ` [PATCH 1/3] default to CORE::warn in $SIG{__WARN__} handlers Eric Wong
2020-12-26  1:44     ` Eric Wong [this message]
2020-12-26  1:44     ` [PATCH 3/3] init: use the return value of rel2abs_collapsed Eric Wong

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

  List information: https://public-inbox.org/README

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20201226014438.28402-3-e@80x24.org \
    --to=e@80x24.org \
    --cc=meta@public-inbox.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
Code repositories for project(s) associated with this public inbox

	https://80x24.org/public-inbox.git

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).