From 3f4c9ce3d2c0796c6c3f352b5b586ddd7a85ffb0 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Sun, 31 Jul 2016 00:02:06 +0000 Subject: search: support reindexing existing search indices This should make tweaking the way we search more efficient by allowing us to avoid doubling destroying the index every time we want to change something. We also give priority to incremental indexing via public-inbox-{watch,mda} and have manual invocations of public-inbox-index perform batch updates while releasing ssoma.lock. --- lib/PublicInbox/SearchIdx.pm | 162 +++++++++++++++++++++++++++---------------- 1 file changed, 103 insertions(+), 59 deletions(-) (limited to 'lib/PublicInbox/SearchIdx.pm') diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm index 3f2643c6..6a34ce71 100644 --- a/lib/PublicInbox/SearchIdx.pm +++ b/lib/PublicInbox/SearchIdx.pm @@ -9,6 +9,7 @@ package PublicInbox::SearchIdx; use strict; use warnings; +use Fcntl qw(:flock :DEFAULT); use Email::MIME; use Email::MIME::ContentType; $Email::MIME::ContentType::STRICT_PARAMS = 0; @@ -29,24 +30,60 @@ use constant { sub new { my ($class, $git_dir, $writable) = @_; - my $dir = $class->xdir($git_dir); + my $dir = PublicInbox::Search->xdir($git_dir); require Search::Xapian::WritableDatabase; my $flag = Search::Xapian::DB_OPEN; my $self = bless { git_dir => $git_dir }, $class; my $perm = $self->_git_config_perm; my $umask = _umask_for($perm); $self->{umask} = $umask; + $self->{lock_path} = "$git_dir/ssoma.lock"; $self->{xdb} = $self->with_umask(sub { if ($writable == 1) { require File::Path; File::Path::mkpath($dir); + $self->{batch_size} = 100; $flag = Search::Xapian::DB_CREATE_OR_OPEN; + _lock_acquire($self); } Search::Xapian::WritableDatabase->new($dir, $flag); }); $self; } +sub _xdb_release { + my ($self) = @_; + my $xdb = delete $self->{xdb}; + $xdb->commit_transaction; + $xdb->close; + _lock_release($self); +} + +sub _xdb_acquire { + my ($self) = @_; + _lock_acquire($self); + my $dir = 
PublicInbox::Search->xdir($self->{git_dir}); + my $flag = Search::Xapian::DB_OPEN; + my $xdb = Search::Xapian::WritableDatabase->new($dir, $flag); + $xdb->begin_transaction; + $self->{xdb} = $xdb; +} + +sub _lock_acquire { + my ($self) = @_; + sysopen(my $lockfh, $self->{lock_path}, O_WRONLY|O_CREAT) or + die "failed to open lock $self->{lock_path}: $!\n"; + flock($lockfh, LOCK_EX) or die "lock failed: $!\n"; + $self->{lockfh} = $lockfh; +} + +sub _lock_release { + my ($self) = @_; + my $lockfh = delete $self->{lockfh}; + flock($lockfh, LOCK_UN) or die "unlock failed: $!\n"; + close $lockfh or die "close failed: $!\n"; +} + sub add_val { my ($doc, $col, $num) = @_; $num = Search::Xapian::sortable_serialise($num); @@ -57,40 +94,24 @@ sub add_message { my ($self, $mime, $bytes, $num) = @_; # mime = Email::MIME object my $db = $self->{xdb}; - my $doc_id; + my ($doc_id, $old_tid); my $mid = mid_clean(mid_mime($mime)); - my $was_ghost = 0; my $ct_msg = $mime->header('Content-Type') || 'text/plain'; eval { die 'Message-ID too long' if length($mid) > MAX_MID_SIZE; my $smsg = $self->lookup_message($mid); - my $doc; - if ($smsg) { - $smsg->ensure_metadata; # convert a ghost to a regular message # it will also clobber any existing regular message - $smsg->mime($mime); - $doc = $smsg->{doc}; - - my $type = xpfx('type'); - eval { - $doc->remove_term($type . 'ghost'); - $was_ghost = 1; - }; - - # probably does not exist: - eval { $doc->remove_term($type . 'mail') }; - $doc->add_term($type . 'mail'); - } else { - $smsg = PublicInbox::SearchMsg->new($mime); - $doc = $smsg->{doc}; - $doc->add_term(xpfx('mid') . $mid); + $doc_id = $smsg->doc_id; + $old_tid = $smsg->thread_id; } + $smsg = PublicInbox::SearchMsg->new($mime); + my $doc = $smsg->{doc}; + $doc->add_term(xpfx('mid') . $mid); my $subj = $smsg->subject; - if ($subj ne '') { my $path = $self->subject_path($subj); $doc->add_term(xpfx('path') . 
id_compress($path)); @@ -148,14 +169,11 @@ sub add_message { } }); - if ($was_ghost) { - $doc_id = $smsg->doc_id; - $self->link_message($smsg, $smsg->thread_id); - $doc->set_data($smsg->to_doc_data); + link_message($self, $smsg, $old_tid); + $doc->set_data($smsg->to_doc_data); + if (defined $doc_id) { $db->replace_document($doc_id, $doc); } else { - $self->link_message($smsg); - $doc->set_data($smsg->to_doc_data); $doc_id = $db->add_document($doc); } }; @@ -252,9 +270,7 @@ sub link_message { # the rest of the refs should point to this tid: foreach $ref (@refs) { my $ptid = $self->_resolve_mid_to_tid($ref); - if ($tid ne $ptid) { - $self->merge_threads($tid, $ptid); - } + merge_threads($self, $tid, $ptid); } } else { $tid = $self->next_thread_id; @@ -319,12 +335,12 @@ sub do_cat_mail { } sub index_sync { - my ($self, $head) = @_; - $self->with_umask(sub { $self->_index_sync($head) }); + my ($self, $opts) = @_; + with_umask($self, sub { $self->_index_sync($opts) }); } sub rlog { - my ($self, $range, $add_cb, $del_cb) = @_; + my ($self, $range, $add_cb, $del_cb, $batch_cb) = @_; my $hex = '[a-f0-9]'; my $h40 = $hex .'{40}'; my $addmsg = qr!^:000000 100644 \S+ ($h40) A\t${hex}{2}/${hex}{38}$!; @@ -334,8 +350,10 @@ sub rlog { --raw -r --no-abbrev/, $range); my $latest; my $bytes; + my $max = $self->{batch_size}; # may be undef local $/ = "\n"; - while (defined(my $line = <$log>)) { + my $line; + while (defined($line = <$log>)) { if ($line =~ /$addmsg/o) { my $mime = do_cat_mail($git, $1, \$bytes) or next; $add_cb->($self, $git, $mime, $bytes); @@ -343,54 +361,73 @@ sub rlog { my $mime = do_cat_mail($git, $1) or next; $del_cb->($self, $git, $mime); } elsif ($line =~ /^commit ($h40)/o) { + if (defined $max && --$max <= 0) { + $max = $self->{batch_size}; + $batch_cb->($latest, 1); + } $latest = $1; } } - $latest; + $batch_cb->($latest, 0); } # indexes all unindexed messages sub _index_sync { - my ($self, $head) = @_; - my $db = $self->{xdb}; - $head ||= 'HEAD'; + my 
($self, $opts) = @_; + my $head = 'HEAD'; my $mm = $self->{mm} = eval { require PublicInbox::Msgmap; PublicInbox::Msgmap->new($self->{git_dir}, 1); }; + my $xdb = $self->{xdb}; + $xdb->begin_transaction; + my $reindex = $opts->{reindex}; + my $mkey = $reindex ? undef : 'last_commit'; + my $lx = $reindex ? '' : $xdb->get_metadata('last_commit'); + my $dbh; + my $cb = sub { + my ($commit, $more) = @_; + $xdb->set_metadata($mkey, $commit) if $mkey && $commit; + if ($dbh) { + $mm->last_commit($commit) if $commit; + $dbh->commit; + } + _xdb_release($self); + # let another process do some work... + $dbh->begin_work if $dbh && $more; + $xdb = _xdb_acquire($self); + }; - $db->begin_transaction; - my $lx = $db->get_metadata('last_commit'); my $range = $lx eq '' ? $head : "$lx..$head"; if ($mm) { - $mm->{dbh}->begin_work; + $dbh = $mm->{dbh}; + $dbh->begin_work; my $lm = $mm->last_commit || ''; if ($lm eq $lx) { # Common case is the indexes are synced, # we only need to run git-log once: - $lx = $self->rlog($range, *index_both, *unindex_both); - if (defined $lx) { - $db->set_metadata('last_commit', $lx); - $mm->last_commit($lx); - } - $mm->{dbh}->commit; + rlog($self, $range, *index_both, *unindex_both, $cb); } else { - # dumb case, msgmap and xapian are out-of-sync - # do not care for performance: + # Uncommon case, msgmap and xapian are out-of-sync + # do not care for performance (but git is fast :>) + # This happens if we have to reindex Xapian since + # msgmap is a frozen format and our Xapian format + # is evolving. my $r = $lm eq '' ? 
$head : "$lm..$head"; - $lm = $self->rlog($r, *index_mm, *unindex_mm); - $mm->last_commit($lm) if defined $lm; - $mm->{dbh}->commit; - $lx = $self->rlog($range, *index_mm2, *unindex_mm2); - $db->set_metadata('last_commit', $lx) if defined $lx; + # first, ensure msgmap is up-to-date: + $mkey = undef; # ignore xapian, for now + rlog($self, $r, *index_mm, *unindex_mm, $cb); + + # now deal with Xapian + $mkey = 'last_commit' unless $reindex; + $dbh = undef; + rlog($self, $range, *index_mm2, *unindex_mm2, $cb); } } else { # user didn't install DBD::SQLite and DBI - $lx = $self->rlog($range, *index_blob, *unindex_blob); - $db->set_metadata('last_commit', $lx) if defined $lx; + rlog($self, $range, *index_blob, *unindex_blob, $cb); } - $db->commit_transaction; } # this will create a ghost as necessary @@ -418,6 +455,7 @@ sub create_ghost { sub merge_threads { my ($self, $winner_tid, $loser_tid) = @_; + return if $winner_tid == $loser_tid; my ($head, $tail) = $self->find_doc_ids('thread', $loser_tid); my $thread_pfx = xpfx('thread'); my $db = $self->{xdb}; @@ -487,4 +525,10 @@ sub with_umask { $rv; } +sub DESTROY { + # order matters for unlocking + $_[0]->{xdb} = undef; + $_[0]->{lockfh} = undef; +} + 1; -- cgit v1.2.3-24-ge0c7