user/dev discussion of public-inbox itself
 help / color / mirror / code / Atom feed
From: Eric Wong <e@80x24.org>
To: meta@public-inbox.org
Subject: [PATCH 19/52] extsearchidx: initial implementation
Date: Tue, 27 Oct 2020 07:54:20 +0000	[thread overview]
Message-ID: <20201027075453.19163-20-e@80x24.org> (raw)
In-Reply-To: <20201027075453.19163-1-e@80x24.org>

It compiles...
---
 MANIFEST                        |   1 +
 lib/PublicInbox/ExtSearchIdx.pm | 311 ++++++++++++++++++++++++++++++++
 t/extsearch.t                   |   1 +
 3 files changed, 313 insertions(+)
 create mode 100644 lib/PublicInbox/ExtSearchIdx.pm

diff --git a/MANIFEST b/MANIFEST
index 60055d2b..418a2f17 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -122,6 +122,7 @@ lib/PublicInbox/Eml.pm
 lib/PublicInbox/EmlContentFoo.pm
 lib/PublicInbox/ExtMsg.pm
 lib/PublicInbox/ExtSearch.pm
+lib/PublicInbox/ExtSearchIdx.pm
 lib/PublicInbox/FakeInotify.pm
 lib/PublicInbox/Feed.pm
 lib/PublicInbox/Filter/Base.pm
diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm
new file mode 100644
index 00000000..edf17974
--- /dev/null
+++ b/lib/PublicInbox/ExtSearchIdx.pm
@@ -0,0 +1,311 @@
+# Copyright (C) 2020 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# Detached/external index cross inbox search indexing support
+# read-write counterpart to PublicInbox::ExtSearch
+#
+# It's based on the same ideas as public-inbox-v2-format(5) using
+# over.sqlite3 for dedupe and sharded Xapian.  msgmap.sqlite3 is
+# missing, so there is no Message-ID conflict resolution, meaning
+# no NNTP support for now.
+#
+# v2 has a 1:1 mapping of index:inbox or msgmap for NNTP support.
+# This is intended to be an M:N index:inbox mapping, but it'll likely
+# be 1:N in common practice (M==1)
+
+package PublicInbox::ExtSearchIdx;
+use strict;
+use v5.10.1;
+use parent qw(PublicInbox::ExtSearch PublicInbox::Lock);
+use Carp qw(croak carp);
+use PublicInbox::Search;
+use PublicInbox::SearchIdx qw(crlf_adjust);
+use PublicInbox::OverIdx;
+use PublicInbox::V2Writable;
+use PublicInbox::InboxWritable;
+use PublicInbox::Eml;
+use File::Spec;
+
+sub new { # constructor; the class argument is ignored (see bless below)
+	my (undef, $dir, $opt, $shard) = @_; # $shard is not used in this sub
+	my $l = $opt->{indexlevel} // 'full'; # indexlevel defaults to 'full'
+	$l !~ $PublicInbox::SearchIdx::INDEXLEVELS and
+		die "invalid indexlevel=$l\n";
+	$l eq 'basic' and die "E: indexlevel=basic not yet supported\n";
+	my $self = bless {
+		xpfx => "$dir/ei".PublicInbox::Search::SCHEMA_VERSION,
+		topdir => $dir,
+		creat => $opt->{creat},
+		ibx_map => {}, # (newsgroup//inboxdir) => $ibx
+		ibx_list => [], # same inboxes, in the order attach_inbox saw them
+		indexlevel => $l,
+		transact_bytes => 0,
+		total_bytes => 0,
+		current_info => '',
+		parallel => 1,
+		lock_path => "$dir/ei.lock",
+	}, __PACKAGE__; # always this package, whatever class was passed
+	$self->{shards} = $self->count_shards || nproc_shards($opt->{creat});
+	my $oidx = PublicInbox::OverIdx->new("$self->{xpfx}/over.sqlite3");
+	$oidx->{-no_fsync} = 1 if $opt->{-no_fsync}; # pass the option down
+	$self->{oidx} = $oidx;
+	$self
+}
+
+sub attach_inbox { # register $ibx for indexing; warns and skips unusable ones
+	my ($self, $ibx) = @_;
+	my $key = $ibx->eidx_key;
+	if (!$ibx->over || !$ibx->mm) { # both over and mm must be available
+		warn "W: skipping $key (unindexed)\n";
+		return;
+	}
+	if (!defined($ibx->uidvalidity)) {
+		warn "W: skipping $key (no UIDVALIDITY)\n";
+		return;
+	}
+	my $ibxdir = File::Spec->canonpath($ibx->{inboxdir});
+	if ($ibxdir ne $ibx->{inboxdir}) { # note: rewrites the caller's $ibx
+		warn "W: `$ibx->{inboxdir}' canonicalized to `$ibxdir'\n";
+		$ibx->{inboxdir} = $ibxdir;
+	}
+	$ibx = PublicInbox::InboxWritable->new($ibx);
+	$self->{ibx_map}->{$key} //= do { # first attachment of $key wins
+		push @{$self->{ibx_list}}, $ibx;
+		$ibx;
+	}
+}
+
+sub _ibx_attach { # each_inbox callback; note the ($ibx, $self) argument order
+	my ($ibx, $self) = @_;
+	attach_inbox($self, $ibx);
+}
+
+sub attach_config { # hands _ibx_attach to $cfg->each_inbox, with $self as arg
+	my ($self, $cfg) = @_;
+	$cfg->each_inbox(\&_ibx_attach, $self);
+}
+
+sub git_blob_digest ($) { # returns a digest object for the git blob of $$bref
+	my ($bref) = @_;
+	require Digest::SHA; # no `use' line in this file loads it
+	my $dig = Digest::SHA->new(1); # XXX SHA256 later
+	$dig->add('blob '.length($$bref)."\0", $$bref); # git header, then data
+	$dig;
+}
+
+sub is_bad_blob ($$$$) { # true (1) if the object must not be indexed, else 0
+	my ($oid, $type, $size, $expect_oid) = @_;
+	unless ($type eq 'blob') { # warn from the caller's point of view
+		carp "W: $expect_oid is not a blob (type=$type)";
+		return 1;
+	}
+	$oid eq $expect_oid or croak "BUG: $oid != $expect_oid";
+	return $size == 0 ? 1 : 0; # size == 0 means purged
+}
+
+sub do_xpost ($$) { # add or remove the xref3 of $smsg for the current inbox
+	my ($req, $smsg) = @_;
+	my $self = $req->{eidx};
+	my $docid = $smsg->{num};
+	my $idx = $self->idx_shard($docid);
+	my $oid = $req->{oid};
+	my $xibx = $req->{sync}->{ibx}; # the inbox cur_ibx_xnum validated
+	my $eml = $req->{eml};
+	if ($req->{new_smsg}) { # 'm' on cross-posted message
+		my $xnum = $req->{xnum};
+		$idx->shard_add_xref3($docid, $xnum, $oid, $xibx, $eml);
+	} else { # 'd'
+		$idx->shard_remove_xref3($docid, $oid, $xibx, $eml);
+	}
+}
+
+sub index_unseen ($) { # index a message that matched no existing document
+	my ($req) = @_;
+	my $new_smsg = $req->{new_smsg} or die 'BUG: {new_smsg} unset';
+	$new_smsg->populate($req->{eml}, $req);
+	my $self = $req->{eidx};
+	my $docid = $self->{oidx}->adj_counter('eidx_docid', '+');
+	$new_smsg->{num} = $docid; # the counter value becomes {num}
+	my $idx = $self->idx_shard($docid);
+	my $eml = delete $req->{eml}; # $req gives up its reference to the Eml
+	$self->{oidx}->add_overview($eml, $new_smsg);
+	$idx->index_raw(undef, $eml, $new_smsg, delete $new_smsg->{ibx});
+}
+
+sub do_finalize ($) { # called by do_step once {mids} is exhausted
+	my ($req) = @_;
+	if (my $indexed = $req->{indexed}) { # filled in by ck_existing
+		do_xpost($req, $_) for @$indexed;
+	} elsif (exists $req->{new_smsg}) { # totally unseen message
+		index_unseen($req);
+	} else {
+		warn "W: ignoring delete $req->{oid} (not found)\n";
+	}
+}
+
+sub do_step ($) { # main iterator for adding messages to the index
+	my ($req) = @_;
+	my $self = $req->{eidx};
+	while (1) {
+		if (my $next_arg = $req->{next_arg}) { # lookup in progress
+			if (my $smsg = $self->{oidx}->next_by_mid(@$next_arg)) {
+				$req->{cur_smsg} = $smsg;
+				$self->git->cat_async($smsg->{blob},
+							\&ck_existing, $req);
+				return; # ck_existing calls do_step
+			}
+			delete $req->{cur_smsg}; # no more matches for this $mid
+			delete $req->{next_arg};
+		}
+		my $mid = shift(@{$req->{mids}}); # consumes {mids} in place
+		last unless defined $mid;
+		my ($id, $prev); # passed by reference on every next_by_mid call
+		$req->{next_arg} = [ $mid, \$id, \$prev ];
+		# loop again
+	}
+	do_finalize($req);
+}
+
+sub ck_existing { # git->cat_async callback; a bad blob ends the chain here
+	my ($bref, $oid, $type, $size, $req) = @_;
+	my $smsg = $req->{cur_smsg} or die 'BUG: {cur_smsg} missing';
+	return if is_bad_blob($oid, $type, $size, $smsg->{blob});
+	my $cur = PublicInbox::Eml->new($bref);
+	if (content_hash($cur) eq $req->{chash}) { # same function as {chash}
+		push @{$req->{indexed}}, $smsg; # for do_xpost
+	} # else { index_unseen later }
+	do_step($req);
+}
+
+# is the message visible in the inbox currently being indexed?
+# return the number if so (undef if not); also fills {eml}, {chash}, {mids}
+sub cur_ibx_xnum ($$) {
+	my ($req, $bref) = @_;
+	my $ibx = $req->{sync}->{ibx} or die 'BUG: current {ibx} missing';
+
+	# XXX overkill?
+	git_blob_digest($bref)->hexdigest eq $req->{oid} or die
+		"BUG: blob mismatch $req->{oid}";
+
+	$req->{eml} = PublicInbox::Eml->new($bref);
+	$req->{chash} = content_hash($req->{eml});
+	$req->{mids} = mids($req->{eml});
+	my @q = @{$req->{mids}}; # copy, do_step shifts {mids} itself
+	while (defined(my $mid = shift @q)) {
+		my ($id, $prev);
+		while (my $x = $ibx->over->next_by_mid($mid, \$id, \$prev)) {
+			return $x->{num} if $x->{blob} eq $req->{oid};
+		}
+	}
+	undef; # no entry in $ibx->over carries this blob
+}
+
+sub m_start { # git->cat_async callback for 'm'
+	my ($bref, $oid, $type, $size, $req) = @_;
+	return if is_bad_blob($oid, $type, $size, $req->{oid});
+	my $new_smsg = $req->{new_smsg} = bless {
+		blob => $oid,
+		raw_bytes => $size,
+	}, 'PublicInbox::Smsg'; # {new_smsg} is what marks $req as an 'm'
+	$new_smsg->{bytes} = $new_smsg->{raw_bytes} + crlf_adjust($$bref);
+	defined($req->{xnum} = cur_ibx_xnum($req, $bref)) or return;
+	$new_smsg->{ibx} = $req->{sync}->{ibx}; # index_unseen deletes it again
+	do_step($req);
+}
+
+sub d_start { # git->cat_async callback for 'd'
+	my ($bref, $oid, $type, $size, $req) = @_;
+	return if is_bad_blob($oid, $type, $size, $req->{oid});
+	return if defined(cur_ibx_xnum($req, $bref)); # was re-added
+	do_step($req); # {new_smsg} stays unset, so do_xpost takes its 'd' path
+}
+
+sub eidx_last_epoch_commit ($$$) { # eidx_meta "lc-v2" entry for $epoch
+	my ($self, $sync, $epoch) = @_;
+	my $key = $sync->{ibx}->eidx_key;
+	my $uidvalidity = $sync->{ibx}->uidvalidity;
+	$self->{oidx}->eidx_meta("lc-v2:$key//$uidvalidity;$epoch");
+}
+
+sub _sync_inbox ($$$) { # per-inbox sync setup; builds $sync for one $ibx
+	my ($self, $opt, $ibx) = @_;
+	my $sync = {
+		need_checkpoint => \(my $bool = 0),
+		unindex_range => {}, # EPOCH => oid_old..oid_new
+		reindex => $opt->{reindex},
+		-opt => $opt,
+		eidx => $self,
+		ibx => $ibx,
+	};
+	my $key = $ibx->eidx_key;
+	my $u = $ibx->uidvalidity;
+	my $oidx = $self->{oidx};
+	my $v = $ibx->version;
+	if ($v == 2) {
+		my $epoch_max;
+		defined($ibx->git_dir_latest(\$epoch_max)) or return;
+		my $heads = $sync->{ranges} = []; # indexed by epoch number
+		for my $i (0..$epoch_max) {
+			$heads->[$i] = $oidx->eidx_meta("lc-v2:$key//$u;$i");
+		} # NOTE(review): nothing in this sub consumes {ranges} yet
+
+
+	} elsif ($v == 1) { # resume after $lc if set, else take all of HEAD
+		my $lc = $oidx->eidx_meta("lc-v1:$key//$u");
+		prepare_stack($sync, $lc ? "$lc..HEAD" : 'HEAD');
+	} else {
+		warn "E: $key unsupported inbox version (v$v)\n";
+		return;
+	}
+}
+
+sub eidx_sync { # main entry point: set up, then visit each attached inbox
+	my ($self, $opt) = @_;
+	$self->idx_init($opt); # acquire lock via V2Writable::_idx_init
+	$self->{oidx}->rethread_prepare($opt);
+
+	_sync_inbox($self, $opt, $_) for (@{$self->{ibx_list}});
+}
+
+sub idx_init { # similar to V2Writable
+	my ($self, $opt) = @_;
+	return if $self->{idx_shards}; # nothing to do once {idx_shards} is set
+
+	$self->git->cleanup;
+
+	my $ALL = $self->git->{git_dir}; # ALL.git
+	PublicInbox::Import::init_bare($ALL) unless -d $ALL;
+	my $info_dir = "$ALL/objects/info";
+	my $alt = "$info_dir/alternates";
+	my $mode = 0644; # replaced below when $alt already exists
+	my (%old, @old, %new, @new);
+	if (-e $alt) {
+		open(my $fh, '<', $alt) or die "open $alt: $!";
+		$mode = (stat($fh))[2] & 07777; # keep the existing permissions
+		while (<$fh>) {
+			push @old, $_ if !$old{$_}++; # drop repeated lines
+		}
+	}
+	for my $ibx (@{$self->{ibx_list}}) {
+		my $line = $ibx->git->{git_dir} . "/objects\n";
+		# skip lines already on disk or queued by an earlier inbox
+		next if $old{$line} || $new{$line}++;
+		push @new, $line;
+	}
+	push @old, @new; # existing entries first, additions after them
+	PublicInbox::V2Writable::write_alternates($info_dir, $mode, \@old);
+	$self->parallel_init($self->{indexlevel});
+	$self->umask_prepare;
+	$self->with_umask(\&PublicInbox::V2Writable::_idx_init, $self, $opt);
+	$self->{oidx}->begin_lazy;
+	$self->{oidx}->eidx_prep;
+}
+
+no warnings 'once'; # silence "used only once" for the globs below
+*done = \&PublicInbox::V2Writable::done; # borrowed: V2Writable is no parent
+*umask_prepare = \&PublicInbox::InboxWritable::umask_prepare;
+*with_umask = \&PublicInbox::InboxWritable::with_umask;
+*parallel_init = \&PublicInbox::V2Writable::parallel_init;
+*nproc_shards = \&PublicInbox::V2Writable::nproc_shards; # plain function
+
+1;
diff --git a/t/extsearch.t b/t/extsearch.t
index 7687f5f0..54927c50 100644
--- a/t/extsearch.t
+++ b/t/extsearch.t
@@ -7,5 +7,6 @@ use PublicInbox::TestCommon;
 require_git(2.6);
 require_mods(qw(DBD::SQLite Search::Xapian));
 use_ok 'PublicInbox::ExtSearch';
+use_ok 'PublicInbox::ExtSearchIdx';
 
 done_testing;

  parent reply	other threads:[~2020-10-27  7:54 UTC|newest]

Thread overview: 57+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2020-10-27  7:54 [PATCH 00/52] detached external index: mostly Eric Wong
2020-10-27  7:54 ` [PATCH 01/52] doc/standards: add RFCs for URL schemes Eric Wong
2020-11-05  7:54   ` [PATCH v2] " Eric Wong
2020-10-27  7:54 ` [PATCH 02/52] search: hoist out _xdb_sharded for v2 inboxes Eric Wong
2020-10-27  7:54 ` [PATCH 03/52] extsearch: start mocking out Eric Wong
2020-10-27  7:54 ` [PATCH 04/52] searchidx: expose INDEXLEVELS as `our' Eric Wong
2020-10-27  7:54 ` [PATCH 05/52] v2writable: add git method Eric Wong
2020-10-27  7:54 ` [PATCH 06/52] v2writable: make OO calls to last_commit-related methods Eric Wong
2020-10-27  7:54 ` [PATCH 07/52] search: xdb_sharded: make this a public method for ExtSearch Eric Wong
2020-10-27  7:54 ` [PATCH 08/52] searchidx: introduce "xref3" concept Eric Wong
2020-10-27  7:54 ` [PATCH 09/52] v2writable: prepare initialization for external indices Eric Wong
2020-10-27  7:54 ` [PATCH 10/52] v2writable: hoist out write_alternates Eric Wong
2020-10-27  7:54 ` [PATCH 11/52] searchidxshard: allow msgref to be undef Eric Wong
2020-10-27  7:54 ` [PATCH 12/52] v2writable: idx_shard: simplify callers Eric Wong
2020-10-27  7:54 ` [PATCH 13/52] v2writable: count_shards: allow working without {ibx} Eric Wong
2020-10-27  7:54 ` [PATCH 14/52] overidx: introduce changes for external index Eric Wong
2020-10-27  7:54 ` [PATCH 15/52] v2: some changes for ExtSearchIdx compatibility Eric Wong
2020-10-27  7:54 ` [PATCH 16/52] inboxwritable: eidx_key for external index Eric Wong
2020-10-27  7:54 ` [PATCH 17/52] v2writable: rename remaining "remote" terminology Eric Wong
2020-10-27  7:54 ` [PATCH 18/52] v2writable: checkpoint: account for lack of {mm} Eric Wong
2020-10-27  7:54 ` Eric Wong [this message]
2020-10-27  7:54 ` [PATCH 20/52] searchidx: index eidx_key as a boolean term Eric Wong
2020-10-27  7:54 ` [PATCH 21/52] searchidx: xref3 delete support Eric Wong
2020-10-27  7:54 ` [PATCH 22/52] searchidxshard: special init for eidx Eric Wong
2020-10-27  7:54 ` [PATCH 23/52] searchidx: put {ibx} into $sync state Eric Wong
2020-10-27  7:54 ` [PATCH 24/52] searchidx: log2stack: simplify callers Eric Wong
2020-10-27  7:54 ` [PATCH 25/52] v2writable: more generic sync setup code Eric Wong
2020-10-27  7:54 ` [PATCH 26/52] v2writable: allow OO method references Eric Wong
2020-10-27  7:54 ` [PATCH 27/52] v2writable: rename {v2w} field to {self} Eric Wong
2020-10-27  7:54 ` [PATCH 28/52] v2writable: make *last_commits and sync_prepare OO methods Eric Wong
2020-10-27  7:54 ` [PATCH 29/52] v2writable: move size check init to sync_prepare Eric Wong
2020-10-27  7:54 ` [PATCH 30/52] extsearchidx: more compatibility with V2Writable callers Eric Wong
2020-10-27  7:54 ` [PATCH 31/52] v2writable: reduce scope of epoch-aware code Eric Wong
2020-10-27  7:54 ` [PATCH 32/52] extsearchidx: remove {unindex_range} field Eric Wong
2020-10-27  7:54 ` [PATCH 33/52] v2writable: pass oid to uindex_oid Eric Wong
2020-10-27  7:54 ` [PATCH 34/52] extsearchidx: sync unit updates Eric Wong
2020-10-27  7:54 ` [PATCH 35/52] searchidx: export prepare_stack Eric Wong
2020-10-27  7:54 ` [PATCH 36/52] extsearchidx: sync updates Eric Wong
2020-10-27  7:54 ` [PATCH 37/52] searchidx: reduce inbox-dependency, wrap ->with_umask Eric Wong
2020-10-27  7:54 ` [PATCH 38/52] searchidx: favor $sync->{ibx} (over $self->{ibx}) Eric Wong
2020-10-27  7:54 ` [PATCH 39/52] Makefile.PL: do not build manpage if POD is missing Eric Wong
2020-10-27  7:54 ` [PATCH 40/52] script: add preliminary eindex implementation Eric Wong
2020-10-27  7:54 ` [PATCH 41/52] index: eindex wiring Eric Wong
2020-10-27  7:54 ` [PATCH 42/52] over: store xref3 data in over.sqlite3 Eric Wong
2020-10-27  7:54 ` [PATCH 43/52] searchidx: remove xref3 support for Xapian Eric Wong
2020-10-27  7:54 ` [PATCH 44/52] t/extsearch.t: verify results and xref3 ordering Eric Wong
2020-10-27  7:54 ` [PATCH 45/52] t/v2writable: remove pointless ->barrier call Eric Wong
2020-10-27  7:54 ` [PATCH 46/52] extsearch: wire up smsg_eml Eric Wong
2020-10-27  7:54 ` [PATCH 47/52] extsearchidx: handle edits Eric Wong
2020-10-27  7:54 ` [PATCH 48/52] extsearch: wire up remaining Inbox-like methods for WWW Eric Wong
2020-10-27  7:54 ` [PATCH 49/52] searchidx: ignore exceptions from ->remove_term Eric Wong
2020-10-27  7:54 ` [PATCH 50/52] extsearchidx: set current_info in warning callbacks Eric Wong
2020-10-27  7:54 ` [PATCH 51/52] extsearchidx: support --batch-size checkpoints Eric Wong
2020-11-03  9:08   ` Eric Wong
2020-10-27  7:54 ` [PATCH 52/52] searchidxshard: make warnings with eidx_key less confusing Eric Wong
2020-10-27 12:08 ` [PATCH 00/52] detached external index: mostly Konstantin Ryabitsev
2020-11-10 18:53 ` detached external index: performance note Eric Wong

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

  List information: https://public-inbox.org/README

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20201027075453.19163-20-e@80x24.org \
    --to=e@80x24.org \
    --cc=meta@public-inbox.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
Code repositories for project(s) associated with this public inbox

	https://80x24.org/public-inbox.git

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).