From 9ecbfc09928dada28094fd3fc79e91a5472b27ea Mon Sep 17 00:00:00 2001 From: "Eric Wong (Contractor, The Linux Foundation)" Date: Thu, 22 Feb 2018 01:49:08 +0000 Subject: v2: parallelize Xapian indexing The parallelization requires splitting Msgmap, text+term indexing, and thread-linking out into separate processes. git-fast-import is fast, so we don't bother parallelizing it. Msgmap (SQLite) and thread-linking (Xapian) must be serialized because they rely on monotonically increasing numbers (NNTP article number and internal thread_id, respectively). We handle msgmap in the main process which drives fast-import. When the article number is retrieved/generated, we write the entire message to per-partition subprocesses via pipes for expensive text+term indexing. When these per-partition subprocesses are done with the expensive text+term indexing, they write SearchMsg (small data) to a shared pipe (inherited from the main V2Writable process) back to the threader, which runs its own subprocess. The number of text+term Xapian partitions is chosen at import and can be made equal to the number of cores in a machine. V2Writable --> Import -> git-fast-import \-> SearchIdxThread -> Msgmap (synchronous) \-> SearchIdxPart[n] -> SearchIdx[*] \-> SearchIdxThread -> SearchIdx ("threader", a subprocess) [* ] each subprocess writes to threader --- lib/PublicInbox/SearchIdx.pm | 80 ++++++++++++++++++++++++++++++++------------ 1 file changed, 58 insertions(+), 22 deletions(-) (limited to 'lib/PublicInbox/SearchIdx.pm') diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm index c6c5bd25..cc7e7ec9 100644 --- a/lib/PublicInbox/SearchIdx.pm +++ b/lib/PublicInbox/SearchIdx.pm @@ -51,7 +51,7 @@ sub git_unquote ($) { } sub new { - my ($class, $ibx, $creat) = @_; + my ($class, $ibx, $creat, $part) = @_; my $mainrepo = $ibx; # for "public-inbox-index" w/o entry in config my $git_dir = $mainrepo; my ($altid, $git); @@ -83,7 +83,10 @@ sub new { if ($version == 1) { $self->{lock_path} = "$mainrepo/ssoma.lock"; } elsif ($version == 2) { - $self->{lock_path} = "$mainrepo/inbox.lock"; + defined $part or die "partition is required for v2\n"; + # partition is a number or "all" + $self->{partition} = $part; + $self->{lock_path} = undef; $self->{msgmap_path} = "$mainrepo/msgmap.sqlite3"; } else { die "unsupported inbox version=$version\n"; @@ -119,14 +122,16 @@ sub _xdb_acquire { sub _lock_acquire { my ($self) = @_; croak 'already locked' if $self->{lockfh}; - sysopen(my $lockfh, $self->{lock_path}, O_WRONLY|O_CREAT) or - die "failed to open lock $self->{lock_path}: $!\n"; + my $lock_path = $self->{lock_path} or return; + sysopen(my $lockfh, $lock_path, O_WRONLY|O_CREAT) or + die "failed to open lock $lock_path: $!\n"; flock($lockfh, LOCK_EX) or die "lock failed: $!\n"; $self->{lockfh} = $lockfh; } sub _lock_release { my ($self) = @_; + return unless $self->{lock_path}; my $lockfh = delete $self->{lockfh} or croak 'not locked'; flock($lockfh, LOCK_UN) or die "unlock failed: $!\n"; close $lockfh or die "close failed: $!\n"; @@ -138,8 +143,8 @@ sub add_val ($$$) { $doc->add_value($col, $num); } -sub add_values ($$$) { - my ($smsg, $bytes, $num) = @_; +sub add_values ($$$$) { + my ($smsg, $bytes, $num, $lines) = @_; my $ts = $smsg->ts; my $doc = $smsg->{doc}; @@ -149,8 +154,7 @@ sub add_values ($$$) { defined($bytes) and add_val($doc, &PublicInbox::Search::BYTES, $bytes); - add_val($doc, &PublicInbox::Search::LINES, - $smsg->{mime}->body_raw =~ tr!\n!\n!); + add_val($doc, &PublicInbox::Search::LINES, $lines); my $yyyymmdd = strftime('%Y%m%d', gmtime($ts)); add_val($doc, PublicInbox::Search::YYYYMMDD, $yyyymmdd); @@ -281,6 +285,7 @@ sub add_message { my ($doc_id, $old_tid); my $mid = mid_clean(mid_mime($mime)); + my $threader = $self->{threader}; eval { die 'Message-ID too long' if length($mid) > MAX_MID_SIZE; @@ -289,19 +294,22 @@ sub add_message { # convert a ghost to a regular message # it will also clobber any existing regular message $doc_id = $smsg->{doc_id}; - $old_tid = $smsg->thread_id; + $old_tid = $smsg->thread_id unless $threader; } $smsg = PublicInbox::SearchMsg->new($mime); my $doc = $smsg->{doc}; $doc->add_term('XMID' . $mid); my $subj = $smsg->subject; + my $xpath; if ($subj ne '') { - my $path = $self->subject_path($subj); - $doc->add_term('XPATH' . id_compress($path)); + $xpath = $self->subject_path($subj); + $xpath = id_compress($xpath); + $doc->add_term('XPATH' . $xpath); } - add_values($smsg, $bytes, $num); + my $lines = $mime->body_raw =~ tr!\n!\n!; + add_values($smsg, $bytes, $num, $lines); my $tg = $self->term_generator; @@ -350,9 +358,16 @@ sub add_message { index_body($tg, \@orig, $doc) if @orig; }); - link_message($self, $smsg, $old_tid); + # populates smsg->references for smsg->to_doc_data + my $refs = parse_references($smsg); + my $data = $smsg->to_doc_data($blob); + if ($threader) { + $threader->thread_msg($mid, $smsg->ts, $xpath, $data); + } else { + link_message($self, $smsg, $refs, $old_tid); + } $tg->index_text($mid, 1, 'XM'); - $doc->set_data($smsg->to_doc_data($blob)); + $doc->set_data($data); if (my $altid = $self->{-altid}) { foreach my $alt (@$altid) { @@ -424,8 +439,8 @@ sub next_thread_id { $last_thread_id; } -sub link_message { - my ($self, $smsg, $old_tid) = @_; +sub parse_references ($) { + my ($smsg) = @_; my $doc = $smsg->{doc}; my $mid = $smsg->mid; my $mime = $smsg->{mime}; @@ -436,7 +451,6 @@ sub link_message { my @refs = (($hdr->header_raw('References') || '') =~ /<([^>]+)>/g); push(@refs, (($hdr->header_raw('In-Reply-To') || '') =~ /<([^>]+)>/g)); - my $tid; if (@refs) { my %uniq = ($mid => 1); my @orig_refs = @refs; @@ -452,25 +466,31 @@ sub link_message { push @refs, $ref; } } + $smsg->{references} = '<'.join('> <', @refs).'>' if @refs; + \@refs +} - if (@refs) { - $smsg->{references} = '<'.join('> <', @refs).'>'; +sub link_message { + my ($self, $smsg, $refs, $old_tid) = @_; + my $tid; + + if (@$refs) { # first ref *should* be the thread root, # but we can never trust clients to do the right thing - my $ref = shift @refs; + my $ref = shift @$refs; $tid = $self->_resolve_mid_to_tid($ref); $self->merge_threads($tid, $old_tid) if defined $old_tid; # the rest of the refs should point to this tid: - foreach $ref (@refs) { + foreach $ref (@$refs) { my $ptid = $self->_resolve_mid_to_tid($ref); merge_threads($self, $tid, $ptid); } } else { $tid = defined $old_tid ? $old_tid : $self->next_thread_id; } - $doc->add_term('G' . $tid); + $smsg->{doc}->add_term('G' . $tid); } sub index_blob { @@ -798,4 +818,20 @@ sub DESTROY { $_[0]->{lockfh} = undef; } +# remote_* subs are only used by SearchIdxPart and SearchIdxThread: +sub remote_commit { + my ($self) = @_; + print { $self->{w} } "commit\n" or die "failed to write commit: $!"; +} + +sub remote_close { + my ($self) = @_; + my $pid = delete $self->{pid} or die "no process to wait on\n"; + my $w = delete $self->{w} or die "no pipe to write to\n"; + print $w "close\n" or die "failed to write to pid:$pid: $!\n"; + close $w or die "failed to close pipe for pid:$pid: $!\n"; + waitpid($pid, 0) == $pid or die "remote process did not finish"; + $? == 0 or die ref($self)." exited with: $?"; +} + 1; -- cgit v1.2.3-24-ge0c7