diff options
Diffstat (limited to 'lib/PublicInbox/SearchIdx.pm')
-rw-r--r-- | lib/PublicInbox/SearchIdx.pm | 162 |
1 files changed, 103 insertions, 59 deletions
diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm index 3f2643c6..6a34ce71 100644 --- a/lib/PublicInbox/SearchIdx.pm +++ b/lib/PublicInbox/SearchIdx.pm @@ -9,6 +9,7 @@ package PublicInbox::SearchIdx; use strict; use warnings; +use Fcntl qw(:flock :DEFAULT); use Email::MIME; use Email::MIME::ContentType; $Email::MIME::ContentType::STRICT_PARAMS = 0; @@ -29,24 +30,60 @@ use constant { sub new { my ($class, $git_dir, $writable) = @_; - my $dir = $class->xdir($git_dir); + my $dir = PublicInbox::Search->xdir($git_dir); require Search::Xapian::WritableDatabase; my $flag = Search::Xapian::DB_OPEN; my $self = bless { git_dir => $git_dir }, $class; my $perm = $self->_git_config_perm; my $umask = _umask_for($perm); $self->{umask} = $umask; + $self->{lock_path} = "$git_dir/ssoma.lock"; $self->{xdb} = $self->with_umask(sub { if ($writable == 1) { require File::Path; File::Path::mkpath($dir); + $self->{batch_size} = 100; $flag = Search::Xapian::DB_CREATE_OR_OPEN; + _lock_acquire($self); } Search::Xapian::WritableDatabase->new($dir, $flag); }); $self; } +sub _xdb_release { + my ($self) = @_; + my $xdb = delete $self->{xdb}; + $xdb->commit_transaction; + $xdb->close; + _lock_release($self); +} + +sub _xdb_acquire { + my ($self) = @_; + _lock_acquire($self); + my $dir = PublicInbox::Search->xdir($self->{git_dir}); + my $flag = Search::Xapian::DB_OPEN; + my $xdb = Search::Xapian::WritableDatabase->new($dir, $flag); + $xdb->begin_transaction; + $self->{xdb} = $xdb; +} + +sub _lock_acquire { + my ($self) = @_; + sysopen(my $lockfh, $self->{lock_path}, O_WRONLY|O_CREAT) or + die "failed to open lock $self->{lock_path}: $!\n"; + flock($lockfh, LOCK_EX) or die "lock failed: $!\n"; + $self->{lockfh} = $lockfh; +} + +sub _lock_release { + my ($self) = @_; + my $lockfh = delete $self->{lockfh}; + flock($lockfh, LOCK_UN) or die "unlock failed: $!\n"; + close $lockfh or die "close failed: $!\n"; +} + sub add_val { my ($doc, $col, $num) = @_; $num = Search::Xapian::sortable_serialise($num); @@ -57,40 +94,24 @@ sub add_message { my ($self, $mime, $bytes, $num) = @_; # mime = Email::MIME object my $db = $self->{xdb}; - my $doc_id; + my ($doc_id, $old_tid); my $mid = mid_clean(mid_mime($mime)); - my $was_ghost = 0; my $ct_msg = $mime->header('Content-Type') || 'text/plain'; eval { die 'Message-ID too long' if length($mid) > MAX_MID_SIZE; my $smsg = $self->lookup_message($mid); - my $doc; - if ($smsg) { - $smsg->ensure_metadata; # convert a ghost to a regular message # it will also clobber any existing regular message - $smsg->mime($mime); - $doc = $smsg->{doc}; - - my $type = xpfx('type'); - eval { - $doc->remove_term($type . 'ghost'); - $was_ghost = 1; - }; - - # probably does not exist: - eval { $doc->remove_term($type . 'mail') }; - $doc->add_term($type . 'mail'); - } else { - $smsg = PublicInbox::SearchMsg->new($mime); - $doc = $smsg->{doc}; - $doc->add_term(xpfx('mid') . $mid); + $doc_id = $smsg->doc_id; + $old_tid = $smsg->thread_id; } + $smsg = PublicInbox::SearchMsg->new($mime); + my $doc = $smsg->{doc}; + $doc->add_term(xpfx('mid') . $mid); my $subj = $smsg->subject; - if ($subj ne '') { my $path = $self->subject_path($subj); $doc->add_term(xpfx('path') . id_compress($path)); @@ -148,14 +169,11 @@ sub add_message { } }); - if ($was_ghost) { - $doc_id = $smsg->doc_id; - $self->link_message($smsg, $smsg->thread_id); - $doc->set_data($smsg->to_doc_data); + link_message($self, $smsg, $old_tid); + $doc->set_data($smsg->to_doc_data); + if (defined $doc_id) { $db->replace_document($doc_id, $doc); } else { - $self->link_message($smsg); - $doc->set_data($smsg->to_doc_data); $doc_id = $db->add_document($doc); } }; @@ -252,9 +270,7 @@ sub link_message { # the rest of the refs should point to this tid: foreach $ref (@refs) { my $ptid = $self->_resolve_mid_to_tid($ref); - if ($tid ne $ptid) { - $self->merge_threads($tid, $ptid); - } + merge_threads($self, $tid, $ptid); } } else { $tid = $self->next_thread_id; @@ -319,12 +335,12 @@ sub do_cat_mail { } sub index_sync { - my ($self, $head) = @_; - $self->with_umask(sub { $self->_index_sync($head) }); + my ($self, $opts) = @_; + with_umask($self, sub { $self->_index_sync($opts) }); } sub rlog { - my ($self, $range, $add_cb, $del_cb) = @_; + my ($self, $range, $add_cb, $del_cb, $batch_cb) = @_; my $hex = '[a-f0-9]'; my $h40 = $hex .'{40}'; my $addmsg = qr!^:000000 100644 \S+ ($h40) A\t${hex}{2}/${hex}{38}$!; @@ -334,8 +350,10 @@ sub rlog { --raw -r --no-abbrev/, $range); my $latest; my $bytes; + my $max = $self->{batch_size}; # may be undef local $/ = "\n"; - while (defined(my $line = <$log>)) { + my $line; + while (defined($line = <$log>)) { if ($line =~ /$addmsg/o) { my $mime = do_cat_mail($git, $1, \$bytes) or next; $add_cb->($self, $git, $mime, $bytes); @@ -343,54 +361,73 @@ sub rlog { my $mime = do_cat_mail($git, $1) or next; $del_cb->($self, $git, $mime); } elsif ($line =~ /^commit ($h40)/o) { + if (defined $max && --$max <= 0) { + $max = $self->{batch_size}; + $batch_cb->($latest, 1); + } $latest = $1; } } - $latest; + $batch_cb->($latest, 0); } # indexes all unindexed messages sub _index_sync { - my ($self, $head) = @_; - my $db = $self->{xdb}; - $head ||= 'HEAD'; + my ($self, $opts) = @_; + my $head = 'HEAD'; my $mm = $self->{mm} = eval { require PublicInbox::Msgmap; PublicInbox::Msgmap->new($self->{git_dir}, 1); }; + my $xdb = $self->{xdb}; + $xdb->begin_transaction; + my $reindex = $opts->{reindex}; + my $mkey = $reindex ? undef : 'last_commit'; + my $lx = $reindex ? '' : $xdb->get_metadata('last_commit'); + my $dbh; + my $cb = sub { + my ($commit, $more) = @_; + $xdb->set_metadata($mkey, $commit) if $mkey && $commit; + if ($dbh) { + $mm->last_commit($commit) if $commit; + $dbh->commit; + } + _xdb_release($self); + # let another process do some work... + $dbh->begin_work if $dbh && $more; + $xdb = _xdb_acquire($self); + }; - $db->begin_transaction; - my $lx = $db->get_metadata('last_commit'); my $range = $lx eq '' ? $head : "$lx..$head"; if ($mm) { - $mm->{dbh}->begin_work; + $dbh = $mm->{dbh}; + $dbh->begin_work; my $lm = $mm->last_commit || ''; if ($lm eq $lx) { # Common case is the indexes are synced, # we only need to run git-log once: - $lx = $self->rlog($range, *index_both, *unindex_both); - if (defined $lx) { - $db->set_metadata('last_commit', $lx); - $mm->last_commit($lx); - } - $mm->{dbh}->commit; + rlog($self, $range, *index_both, *unindex_both, $cb); } else { - # dumb case, msgmap and xapian are out-of-sync - # do not care for performance: + # Uncommon case, msgmap and xapian are out-of-sync + # do not care for performance (but git is fast :>) + # This happens if we have to reindex Xapian since + # msgmap is a frozen format and our Xapian format + # is evolving. my $r = $lm eq '' ? $head : "$lm..$head"; - $lm = $self->rlog($r, *index_mm, *unindex_mm); - $mm->last_commit($lm) if defined $lm; - $mm->{dbh}->commit; - $lx = $self->rlog($range, *index_mm2, *unindex_mm2); - $db->set_metadata('last_commit', $lx) if defined $lx; + # first, ensure msgmap is up-to-date: + $mkey = undef; # ignore xapian, for now + rlog($self, $r, *index_mm, *unindex_mm, $cb); + + # now deal with Xapian + $mkey = 'last_commit' unless $reindex; + $dbh = undef; + rlog($self, $range, *index_mm2, *unindex_mm2, $cb); } } else { # user didn't install DBD::SQLite and DBI - $lx = $self->rlog($range, *index_blob, *unindex_blob); - $db->set_metadata('last_commit', $lx) if defined $lx; + rlog($self, $range, *index_blob, *unindex_blob, $cb); } - $db->commit_transaction; } # this will create a ghost as necessary @@ -418,6 +455,7 @@ sub create_ghost { sub merge_threads { my ($self, $winner_tid, $loser_tid) = @_; + return if $winner_tid == $loser_tid; my ($head, $tail) = $self->find_doc_ids('thread', $loser_tid); my $thread_pfx = xpfx('thread'); my $db = $self->{xdb}; @@ -487,4 +525,10 @@ sub with_umask { $rv; } +sub DESTROY { + # order matters for unlocking + $_[0]->{xdb} = undef; + $_[0]->{lockfh} = undef; +} + 1; |