From 4f4264f7ac384b02442bdf7c5cc6e838feb86c93 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Sat, 22 Aug 2015 11:41:19 +0000 Subject: search: split search indexing to a separate file This makes organization easier and reduces the amount of code loaded for a PSGI, mod_perl or CGI instance. --- lib/PublicInbox/SearchIdx.pm | 363 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 363 insertions(+) create mode 100644 lib/PublicInbox/SearchIdx.pm (limited to 'lib/PublicInbox/SearchIdx.pm') diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm new file mode 100644 index 00000000..408b21fd --- /dev/null +++ b/lib/PublicInbox/SearchIdx.pm @@ -0,0 +1,363 @@ +# Copyright (C) 2015, all contributors +# License: AGPLv3 or later (https://www.gnu.org/licenses/agpl-3.0.txt) +# based on notmuch, but with no concept of folders, files or flags +package PublicInbox::SearchIdx; +use strict; +use warnings; +use base qw(PublicInbox::Search); +use PublicInbox::MID qw/mid_clean mid_compressed/; +*xpfx = *PublicInbox::Search::xpfx; + +sub new { + my ($class, $git_dir, $writable) = @_; + my $dir = $class->xdir($git_dir); + require Search::Xapian::WritableDatabase; + my $flag = Search::Xapian::DB_OPEN; + if ($writable == 1) { + require File::Path; + File::Path::mkpath($dir); + $flag = Search::Xapian::DB_CREATE_OR_OPEN; + } + my $db = Search::Xapian::WritableDatabase->new($dir, $flag); + bless { xdb => $db, git_dir => $git_dir }, $class; +} + +sub add_message { + my ($self, $mime) = @_; # mime = Email::MIME object + my $db = $self->{xdb}; + + my $doc_id; + my $mid_orig = mid_clean($mime->header_obj->header('Message-ID')); + my $mid = mid_compressed($mid_orig); + my $was_ghost = 0; + my $ct_msg = $mime->header('Content-Type') || 'text/plain'; + + eval { + my $smsg = $self->lookup_message($mid); + my $doc; + + if ($smsg) { + $smsg->ensure_metadata; + # convert a ghost to a regular message + # it will also clobber any existing regular message + $smsg->mime($mime); + $doc = $smsg->{doc}; + + my $type = xpfx('type'); + eval { + $doc->remove_term($type . 'ghost'); + $was_ghost = 1; + }; + + # probably does not exist: + eval { $doc->remove_term($type . 'mail') }; + $doc->add_term($type . 'mail'); + } else { + $smsg = PublicInbox::SearchMsg->new($mime); + $doc = $smsg->{doc}; + $doc->add_term(xpfx('mid') . $mid); + } + + my $subj = $smsg->subject; + + if (length $subj) { + $doc->add_term(xpfx('subject') . $subj); + + my $path = $self->subject_path($subj); + $doc->add_term(xpfx('path') . mid_compressed($path)); + } + + my $from = $smsg->from_name; + my $date = $smsg->date; + my $ts = Search::Xapian::sortable_serialise($smsg->ts); + $doc->add_value(PublicInbox::Search::TS, $ts); + + my $tg = $self->term_generator; + + $tg->set_document($doc); + $tg->index_text($subj, 1, 'S') if $subj; + $tg->increase_termpos; + $tg->index_text($subj) if $subj; + $tg->increase_termpos; + + $tg->index_text($smsg->from->format); + $tg->increase_termpos; + + $mime->walk_parts(sub { + my ($part) = @_; + return if $part->subparts; # walk_parts already recurses + my $ct = $part->content_type || $ct_msg; + + # account for filter bugs... + $ct =~ m!\btext/plain\b!i or return; + + my (@orig, @quot); + my $body = $part->body; + $part->body_set(''); + my @lines = split(/\n/, $body); + while (defined(my $l = shift @lines)) { + if ($l =~ /^\s*>/) { + push @quot, $l; + } else { + push @orig, $l; + } + } + if (@quot) { + $tg->index_text(join("\n", @quot), 0); + @quot = (); + $tg->increase_termpos; + } + if (@orig) { + $tg->index_text(join("\n", @orig)); + @orig = (); + $tg->increase_termpos; + } + }); + + if ($was_ghost) { + $doc_id = $smsg->doc_id; + $self->link_message($smsg, 0); + $doc->set_data($smsg->to_doc_data); + $db->replace_document($doc_id, $doc); + } else { + $self->link_message($smsg, 0); + $doc->set_data($smsg->to_doc_data); + $doc_id = $db->add_document($doc); + } + }; + + if ($@) { + warn "failed to index message <$mid_orig>: $@\n"; + return undef; + } + $doc_id; +} + +# returns deleted doc_id on success, undef on missing +sub remove_message { + my ($self, $mid_orig) = @_; + my $db = $self->{xdb}; + my $doc_id; + $mid_orig = mid_clean($mid_orig); + my $mid = mid_compressed($mid_orig); + + eval { + $doc_id = $self->find_unique_doc_id('mid', $mid); + $db->delete_document($doc_id) if defined $doc_id; + }; + + if ($@) { + warn "failed to remove message <$mid_orig>: $@\n"; + return undef; + } + $doc_id; +} + +sub term_generator { # write-only + my ($self) = @_; + + my $tg = $self->{term_generator}; + return $tg if $tg; + + $tg = Search::Xapian::TermGenerator->new; + $tg->set_stemmer($self->stemmer); + + $self->{term_generator} = $tg; +} + +sub next_doc_id { $_[0]->{xdb}->get_lastdocid + 1 } + +# increments last_thread_id counter +# returns a 64-bit integer represented as a hex string +sub next_thread_id { + my ($self) = @_; + my $db = $self->{xdb}; + my $last_thread_id = int($db->get_metadata('last_thread_id') || 0); + + $db->set_metadata('last_thread_id', ++$last_thread_id); + + $last_thread_id; +} + +sub link_message { + my ($self, $smsg, $is_ghost) = @_; + + if ($is_ghost) { + $smsg->ensure_metadata; + } else { + $self->link_message_to_parents($smsg); + } +} + +sub link_message_to_parents { + my ($self, $smsg) = @_; + my $doc = $smsg->{doc}; + my $mid = mid_compressed($smsg->mid); + my $mime = $smsg->mime; + my $refs = $mime->header_obj->header('References'); + my @refs = $refs ? ($refs =~ /<([^>]+)>/g) : (); + my $irt = $mime->header_obj->header('In-Reply-To'); + if ($irt) { + $irt = mid_compressed(mid_clean($irt)); + + # maybe some crazies will try to make a circular reference: + if ($irt eq $mid) { + $irt = undef; + } else { + # last References should be $irt + # we will de-dupe later + push @refs, $irt; + } + } + + my $tid; + if (@refs) { + my @crefs = map { mid_compressed($_) } @refs; + my %uniq = ($mid => 1); + + # prevent circular references via References: here: + @refs = (); + foreach my $ref (@crefs) { + next if $uniq{$ref}; + $uniq{$ref} = 1; + push @refs, $ref; + } + } + if (@refs) { + $doc->add_term(xpfx('inreplyto') . $irt) if defined $irt; + $smsg->{references_sorted} = '<'.join('><', @refs).'>'; + + my $ref_pfx = xpfx('references'); + + # first ref *should* be the thread root, + # but we can never trust clients to do the right thing + my $ref = shift @refs; + $doc->add_term($ref_pfx . $ref); + $tid = $self->_resolve_mid_to_tid($ref); + + # the rest of the refs should point to this tid: + foreach $ref (@refs) { + $doc->add_term($ref_pfx . $ref); + my $ptid = $self->_resolve_mid_to_tid($ref); + if ($tid ne $ptid) { + $self->merge_threads($tid, $ptid); + } + } + } else { + $tid = $self->next_thread_id; + } + $doc->add_term(xpfx('thread') . $tid); +} + +sub index_blob { + my ($self, $git, $blob) = @_; + my $mime = do_cat_mail($git, $blob) or return; + eval { $self->add_message($mime) }; + warn "W: index_blob $blob: $@\n" if $@; +} + +sub unindex_blob { + my ($self, $git, $blob) = @_; + my $mime = do_cat_mail($git, $blob) or return; + my $mid = $mime->header_obj->header('Message-ID'); + eval { $self->remove_message($mid) } if defined $mid; + warn "W: unindex_blob $blob: $@\n" if $@; +} + +sub do_cat_mail { + my ($git, $blob) = @_; + my $mime = eval { + my $str = $git->cat_file($blob); + Email::MIME->new($str); + }; + $@ ? undef : $mime; +} + +# indexes all unindexed messages +sub index_sync { + my ($self, $head) = @_; + require PublicInbox::GitCatFile; + my $db = $self->{xdb}; + my $hex = '[a-f0-9]'; + my $h40 = $hex .'{40}'; + my $addmsg = qr!^:000000 100644 \S+ ($h40) A\t${hex}{2}/${hex}{38}$!; + my $delmsg = qr!^:100644 000000 ($h40) \S+ D\t${hex}{2}/${hex}{38}$!; + $head ||= 'HEAD'; + + $db->begin_transaction; + eval { + my $git = PublicInbox::GitCatFile->new($self->{git_dir}); + + my $latest = $db->get_metadata('last_commit'); + my $range = length $latest ? "$latest..$head" : $head; + $latest = undef; + + # get indexed messages + my @cmd = ('git', "--git-dir=$self->{git_dir}", "log", + qw/--reverse --no-notes --no-color --raw -r + --no-abbrev/, $range); + my $pid = open(my $log, '-|', @cmd) or + die('open` '.join(' ', @cmd) . " pipe failed: $!\n"); + + while (my $line = <$log>) { + if ($line =~ /$addmsg/o) { + $self->index_blob($git, $1); + } elsif ($line =~ /$delmsg/o) { + $self->unindex_blob($git, $1); + } elsif ($line =~ /^commit ($h40)/o) { + $latest = $1; + } + } + close $log; + $db->set_metadata('last_commit', $latest) if defined $latest; + }; + if ($@) { + warn "indexing failed: $@\n"; + $db->cancel_transaction; + } else { + $db->commit_transaction; + } +} + +# this will create a ghost as necessary +sub _resolve_mid_to_tid { + my ($self, $mid) = @_; + + my $smsg = $self->lookup_message($mid) || $self->create_ghost($mid); + $smsg->thread_id; +} + +sub create_ghost { + my ($self, $mid, $tid) = @_; + + $mid = mid_compressed($mid); + $tid = $self->next_thread_id unless defined $tid; + + my $doc = Search::Xapian::Document->new; + $doc->add_term(xpfx('mid') . $mid); + $doc->add_term(xpfx('thread') . $tid); + $doc->add_term(xpfx('type') . 'ghost'); + + my $smsg = PublicInbox::SearchMsg->wrap($doc, $mid); + $self->link_message($smsg, 1); + $self->{xdb}->add_document($doc); + + $smsg; +} + +sub merge_threads { + my ($self, $winner_tid, $loser_tid) = @_; + my ($head, $tail) = $self->find_doc_ids('thread', $loser_tid); + my $thread_pfx = xpfx('thread'); + my $db = $self->{xdb}; + + for (; $head != $tail; $head->inc) { + my $docid = $head->get_docid; + my $doc = $db->get_document($docid); + $doc->remove_term($thread_pfx . $loser_tid); + $doc->add_term($thread_pfx . $winner_tid); + $db->replace_document($docid, $doc); + } +} + +1; -- cgit v1.2.3-24-ge0c7