From 9ecbfc09928dada28094fd3fc79e91a5472b27ea Mon Sep 17 00:00:00 2001 From: "Eric Wong (Contractor, The Linux Foundation)" Date: Thu, 22 Feb 2018 01:49:08 +0000 Subject: v2: parallelize Xapian indexing The parallelization requires splitting Msgmap, text+term indexing, and thread-linking out into separate processes. git-fast-import is fast, so we don't bother parallelizing it. Msgmap (SQLite) and thread-linking (Xapian) must be serialized because they rely on monotonically increasing numbers (NNTP article number and internal thread_id, respectively). We handle msgmap in the main process which drives fast-import. When the article number is retrieved/generated, we write the entire message to per-partition subprocesses via pipes for expensive text+term indexing. When these per-partition subprocesses are done with the expensive text+term indexing, they write SearchMsg (small data) to a shared pipe (inherited from the main V2Writable process) back to the threader, which runs its own subprocess. The number of text+term Xapian partitions is chosen at import and can be made equal to the number of cores in a machine. V2Writable --> Import -> git-fast-import \-> SearchIdxThread -> Msgmap (synchronous) \-> SearchIdxPart[n] -> SearchIdx[*] \-> SearchIdxThread -> SearchIdx ("threader", a subprocess) [* ] each subprocess writes to threader --- MANIFEST | 2 + lib/PublicInbox/Import.pm | 4 +- lib/PublicInbox/Search.pm | 5 +- lib/PublicInbox/SearchIdx.pm | 80 ++++++++++++++++++-------- lib/PublicInbox/SearchIdxPart.pm | 70 +++++++++++++++++++++++ lib/PublicInbox/SearchIdxThread.pm | 111 +++++++++++++++++++++++++++++++++++++ lib/PublicInbox/SearchMsg.pm | 33 +++++------ lib/PublicInbox/V2Writable.pm | 80 +++++++++++++++++--------- 8 files changed, 314 insertions(+), 71 deletions(-) create mode 100644 lib/PublicInbox/SearchIdxPart.pm create mode 100644 lib/PublicInbox/SearchIdxThread.pm diff --git a/MANIFEST b/MANIFEST index 4b51b543..2a6f6f07 100644 --- a/MANIFEST +++ b/MANIFEST @@ -84,6 +84,8 @@ lib/PublicInbox/Reply.pm lib/PublicInbox/SaPlugin/ListMirror.pm lib/PublicInbox/Search.pm lib/PublicInbox/SearchIdx.pm +lib/PublicInbox/SearchIdxPart.pm +lib/PublicInbox/SearchIdxThread.pm lib/PublicInbox/SearchMsg.pm lib/PublicInbox/SearchThread.pm lib/PublicInbox/SearchView.pm diff --git a/lib/PublicInbox/Import.pm b/lib/PublicInbox/Import.pm index 1a2698a7..b650e4ef 100644 --- a/lib/PublicInbox/Import.pm +++ b/lib/PublicInbox/Import.pm @@ -280,14 +280,12 @@ sub add { $self->{bytes_added} += $n; print $w "blob\nmark :$blob\ndata ", $n, "\n" or wfail; print $w $str, "\n" or wfail; - $str = undef; # v2: we need this for Xapian if ($self->{want_object_id}) { chomp($self->{last_object_id} = $self->get_mark(":$blob")); - $self->{last_object_size} = $n; + $self->{last_object} = [ $n, \$str ]; } - my $ref = $self->{ref}; my $commit = $self->{mark}++; my $parent = $tip =~ /\A:/ ? $tip : undef; diff --git a/lib/PublicInbox/Search.pm b/lib/PublicInbox/Search.pm index eac11bd4..3b280598 100644 --- a/lib/PublicInbox/Search.pm +++ b/lib/PublicInbox/Search.pm @@ -124,7 +124,10 @@ sub xdir { if ($self->{version} == 1) { "$self->{mainrepo}/public-inbox/xapian" . SCHEMA_VERSION; } else { - "$self->{mainrepo}/xap" . SCHEMA_VERSION; + my $dir = "$self->{mainrepo}/xap" . SCHEMA_VERSION; + my $part = $self->{partition}; + defined $part or die "partition not given"; + $dir .= "/$part"; } } diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm index c6c5bd25..cc7e7ec9 100644 --- a/lib/PublicInbox/SearchIdx.pm +++ b/lib/PublicInbox/SearchIdx.pm @@ -51,7 +51,7 @@ sub git_unquote ($) { } sub new { - my ($class, $ibx, $creat) = @_; + my ($class, $ibx, $creat, $part) = @_; my $mainrepo = $ibx; # for "public-inbox-index" w/o entry in config my $git_dir = $mainrepo; my ($altid, $git); @@ -83,7 +83,10 @@ sub new { if ($version == 1) { $self->{lock_path} = "$mainrepo/ssoma.lock"; } elsif ($version == 2) { - $self->{lock_path} = "$mainrepo/inbox.lock"; + defined $part or die "partition is required for v2\n"; + # partition is a number or "all" + $self->{partition} = $part; + $self->{lock_path} = undef; $self->{msgmap_path} = "$mainrepo/msgmap.sqlite3"; } else { die "unsupported inbox version=$version\n"; @@ -119,14 +122,16 @@ sub _xdb_acquire { sub _lock_acquire { my ($self) = @_; croak 'already locked' if $self->{lockfh}; - sysopen(my $lockfh, $self->{lock_path}, O_WRONLY|O_CREAT) or - die "failed to open lock $self->{lock_path}: $!\n"; + my $lock_path = $self->{lock_path} or return; + sysopen(my $lockfh, $lock_path, O_WRONLY|O_CREAT) or + die "failed to open lock $lock_path: $!\n"; flock($lockfh, LOCK_EX) or die "lock failed: $!\n"; $self->{lockfh} = $lockfh; } sub _lock_release { my ($self) = @_; + return unless $self->{lock_path}; my $lockfh = delete $self->{lockfh} or croak 'not locked'; flock($lockfh, LOCK_UN) or die "unlock failed: $!\n"; close $lockfh or die "close failed: $!\n"; @@ -138,8 +143,8 @@ sub add_val ($$$) { $doc->add_value($col, $num); } -sub add_values ($$$) { - my ($smsg, $bytes, $num) = @_; +sub add_values ($$$$) { + my ($smsg, $bytes, $num, $lines) = @_; my $ts = $smsg->ts; my $doc = $smsg->{doc}; @@ -149,8 +154,7 @@ sub add_values ($$$) { defined($bytes) and add_val($doc, &PublicInbox::Search::BYTES, $bytes); - add_val($doc, &PublicInbox::Search::LINES, - $smsg->{mime}->body_raw =~ tr!\n!\n!); + add_val($doc, &PublicInbox::Search::LINES, $lines); my $yyyymmdd = strftime('%Y%m%d', gmtime($ts)); add_val($doc, PublicInbox::Search::YYYYMMDD, $yyyymmdd); @@ -281,6 +285,7 @@ sub add_message { my ($doc_id, $old_tid); my $mid = mid_clean(mid_mime($mime)); + my $threader = $self->{threader}; eval { die 'Message-ID too long' if length($mid) > MAX_MID_SIZE; @@ -289,19 +294,22 @@ sub add_message { # convert a ghost to a regular message # it will also clobber any existing regular message $doc_id = $smsg->{doc_id}; - $old_tid = $smsg->thread_id; + $old_tid = $smsg->thread_id unless $threader; } $smsg = PublicInbox::SearchMsg->new($mime); my $doc = $smsg->{doc}; $doc->add_term('XMID' . $mid); my $subj = $smsg->subject; + my $xpath; if ($subj ne '') { - my $path = $self->subject_path($subj); - $doc->add_term('XPATH' . id_compress($path)); + $xpath = $self->subject_path($subj); + $xpath = id_compress($xpath); + $doc->add_term('XPATH' . $xpath); } - add_values($smsg, $bytes, $num); + my $lines = $mime->body_raw =~ tr!\n!\n!; + add_values($smsg, $bytes, $num, $lines); my $tg = $self->term_generator; @@ -350,9 +358,16 @@ sub add_message { index_body($tg, \@orig, $doc) if @orig; }); - link_message($self, $smsg, $old_tid); + # populates smsg->references for smsg->to_doc_data + my $refs = parse_references($smsg); + my $data = $smsg->to_doc_data($blob); + if ($threader) { + $threader->thread_msg($mid, $smsg->ts, $xpath, $data); + } else { + link_message($self, $smsg, $refs, $old_tid); + } $tg->index_text($mid, 1, 'XM'); - $doc->set_data($smsg->to_doc_data($blob)); + $doc->set_data($data); if (my $altid = $self->{-altid}) { foreach my $alt (@$altid) { @@ -424,8 +439,8 @@ sub next_thread_id { $last_thread_id; } -sub link_message { - my ($self, $smsg, $old_tid) = @_; +sub parse_references ($) { + my ($smsg) = @_; my $doc = $smsg->{doc}; my $mid = $smsg->mid; my $mime = $smsg->{mime}; @@ -436,7 +451,6 @@ sub link_message { my @refs = (($hdr->header_raw('References') || '') =~ /<([^>]+)>/g); push(@refs, (($hdr->header_raw('In-Reply-To') || '') =~ /<([^>]+)>/g)); - my $tid; if (@refs) { my %uniq = ($mid => 1); my @orig_refs = @refs; @@ -452,25 +466,31 @@ sub link_message { push @refs, $ref; } } + $smsg->{references} = '<'.join('> <', @refs).'>' if @refs; + \@refs +} - if (@refs) { - $smsg->{references} = '<'.join('> <', @refs).'>'; +sub link_message { + my ($self, $smsg, $refs, $old_tid) = @_; + my $tid; + + if (@$refs) { # first ref *should* be the thread root, # but we can never trust clients to do the right thing - my $ref = shift @refs; + my $ref = shift @$refs; $tid = $self->_resolve_mid_to_tid($ref); $self->merge_threads($tid, $old_tid) if defined $old_tid; # the rest of the refs should point to this tid: - foreach $ref (@refs) { + foreach $ref (@$refs) { my $ptid = $self->_resolve_mid_to_tid($ref); merge_threads($self, $tid, $ptid); } } else { $tid = defined $old_tid ? $old_tid : $self->next_thread_id; } - $doc->add_term('G' . $tid); + $smsg->{doc}->add_term('G' . $tid); } sub index_blob { @@ -798,4 +818,20 @@ sub DESTROY { $_[0]->{lockfh} = undef; } +# remote_* subs are only used by SearchIdxPart and SearchIdxThread: +sub remote_commit { + my ($self) = @_; + print { $self->{w} } "commit\n" or die "failed to write commit: $!"; +} + +sub remote_close { + my ($self) = @_; + my $pid = delete $self->{pid} or die "no process to wait on\n"; + my $w = delete $self->{w} or die "no pipe to write to\n"; + print $w "close\n" or die "failed to write to pid:$pid: $!\n"; + close $w or die "failed to close pipe for pid:$pid: $!\n"; + waitpid($pid, 0) == $pid or die "remote process did not finish"; + $? == 0 or die ref($self)." exited with: $?"; +} + 1; diff --git a/lib/PublicInbox/SearchIdxPart.pm b/lib/PublicInbox/SearchIdxPart.pm new file mode 100644 index 00000000..d5a3fd17 --- /dev/null +++ b/lib/PublicInbox/SearchIdxPart.pm @@ -0,0 +1,70 @@ +# Copyright (C) 2018 all contributors +# License: AGPL-3.0+ +package PublicInbox::SearchIdxPart; +use strict; +use warnings; +use base qw(PublicInbox::SearchIdx); + +sub new { + my ($class, $v2writable, $part, $threader) = @_; + my $self = $class->SUPER::new($v2writable->{-inbox}, 1, $part); + $self->{threader} = $threader; + my ($r, $w); + pipe($r, $w) or die "pipe failed: $!\n"; + my $pid = fork; + defined $pid or die "fork failed: $!\n"; + if ($pid == 0) { + foreach my $other (@{$v2writable->{idx_parts}}) { + my $other_w = $other->{w} or next; + close $other_w or die "close other failed: $!\n"; + } + $v2writable = undef; + close $w; + eval { partition_worker_loop($self, $r) }; + die "worker $part died: $@\n" if $@; + die "unexpected MM $self->{mm}" if $self->{mm}; + exit; + } + $self->{pid} = $pid; + $self->{w} = $w; + close $r; + $self; +} + +sub partition_worker_loop ($$) { + my ($self, $r) = @_; + my $xdb = $self->_xdb_acquire; + $xdb->begin_transaction; + my $txn = 1; + while (my $line = $r->getline) { + if ($line eq "commit\n") { + $xdb->commit_transaction if $txn; + $txn = undef; + } elsif ($line eq "close\n") { + $self->_xdb_release; + $xdb = $txn = undef; + } else { + my ($len, $artnum, $object_id) = split(/ /, $line); + $xdb ||= $self->_xdb_acquire; + if (!$txn) { + $xdb->begin_transaction; + $txn = 1; + } + my $n = read($r, my $msg, $len) or die "read: $!\n"; + $n == $len or die "short read: $n != $len\n"; + my $mime = PublicInbox::MIME->new(\$msg); + $self->index_blob($mime, $len, $artnum, $object_id); + } + } + warn "$$ still in transaction\n" if $txn; + warn "$$ xdb active\n" if $xdb; +} + +# called by V2Writable +sub index_raw { + my ($self, $len, $msgref, $artnum, $object_id) = @_; + print { $self->{w} } "$len $artnum $object_id\n", $$msgref or die + "failed to write partition $!\n"; +} + +1; diff --git a/lib/PublicInbox/SearchIdxThread.pm b/lib/PublicInbox/SearchIdxThread.pm new file mode 100644 index 00000000..6471309e --- /dev/null +++ b/lib/PublicInbox/SearchIdxThread.pm @@ -0,0 +1,111 @@ +# Copyright (C) 2018 all contributors +# License: AGPL-3.0+ +package PublicInbox::SearchIdxThread; +use strict; +use warnings; +use base qw(PublicInbox::SearchIdx); +use Storable qw(freeze thaw); + +sub new { + my ($class, $v2ibx) = @_; + my $self = $class->SUPER::new($v2ibx, 1, 'all'); + # create the DB: + $self->_xdb_acquire; + $self->_xdb_release; + + my ($r, $w); + pipe($r, $w) or die "pipe failed: $!\n"; + binmode $r, ':raw'; + binmode $w, ':raw'; + my $pid = fork; + defined $pid or die "fork failed: $!\n"; + if ($pid == 0) { + close $w; + eval { thread_worker_loop($self, $r) }; + die "thread worker died: $@\n" if $@; + exit; + } + $self->{w} = $w; + $self->{pid} = $pid; + close $r; + + $w->autoflush(1); + + # lock on only exists in parent, not in worker + my $l = $self->{lock_path} = $self->xdir . '/thread.lock'; + open my $fh, '>>', $l or die "failed to create $l: $!\n"; + $self; +} + +sub thread_worker_loop { + my ($self, $r) = @_; + my $msg; + my $xdb = $self->_xdb_acquire; + $xdb->begin_transaction; + my $txn = 1; + while (my $line = $r->getline) { + if ($line eq "commit\n") { + $xdb->commit_transaction if $txn; + $txn = undef; + } elsif ($line eq "close\n") { + $self->_xdb_release; + $xdb = $txn = undef; + } else { + read($r, $msg, $line) or die "read failed: $!\n"; + $msg = thaw($msg); # should raise on error + defined $msg or die "failed to thaw buffer\n"; + if (!$txn) { + $xdb->begin_transaction; + $txn = 1; + } + eval { $self->thread_msg_real(@$msg) }; + warn "failed to index message <$msg->[0]>: $@\n" if $@; + } + } +} + +# called by a partition worker +sub thread_msg { + my ($self, $mid, $ts, $xpath, $doc_data) = @_; + my $w = $self->{w}; + my $err; + my $str = freeze([ $mid, $ts, $xpath, $doc_data ]); + my $len = length($str) . "\n"; + + # multiple processes write to the same pipe, so use flock + $self->_lock_acquire; + print $w $len, $str or $err = $!; + $self->_lock_release; + + die "print failed: $err\n" if $err; +} + +sub thread_msg_real { + my ($self, $mid, $ts, $xpath, $doc_data) = @_; + my $smsg = $self->lookup_message($mid); + my ($old_tid, $doc_id); + if ($smsg) { + # convert a ghost to a regular message + # it will also clobber any existing regular message + $doc_id = $smsg->{doc_id}; + $old_tid = $smsg->thread_id; + } else { + $smsg = PublicInbox::SearchMsg->new(undef); + $smsg->{mid} = $mid; + } + my $doc = $smsg->{doc}; + $doc->add_term('XPATH' . $xpath) if defined $xpath; + $doc->add_term('XMID' . $mid); + $doc->set_data($doc_data); + $smsg->{ts} = $ts; + my @refs = ($smsg->references =~ /<([^>]+)>/g); + $self->link_message($smsg, \@refs, $old_tid); + my $db = $self->{xdb}; + if (defined $doc_id) { + $db->replace_document($doc_id, $doc); + } else { + $doc_id = $db->add_document($doc); + } +} + +1; diff --git a/lib/PublicInbox/SearchMsg.pm b/lib/PublicInbox/SearchMsg.pm index 25c1abb8..941bfd24 100644 --- a/lib/PublicInbox/SearchMsg.pm +++ b/lib/PublicInbox/SearchMsg.pm @@ -29,19 +29,24 @@ sub get_val ($$) { Search::Xapian::sortable_unserialise($doc->get_value($col)); } -sub load_expand { - my ($self) = @_; - my $doc = $self->{doc}; - my $data = $doc->get_data or return; - $self->{ts} = get_val($doc, &PublicInbox::Search::TS); - utf8::decode($data); - my ($subj, $from, $refs, $to, $cc, $blob) = split(/\n/, $data); +sub load_from_data ($$) { + my ($self) = $_[0]; # data = $_[1] + my ($subj, $from, $refs, $to, $cc, $blob) = split(/\n/, $_[1]); $self->{subject} = $subj; $self->{from} = $from; $self->{references} = $refs; $self->{to} = $to; $self->{cc} = $cc; $self->{blob} = $blob; +} + +sub load_expand { + my ($self) = @_; + my $doc = $self->{doc}; + my $data = $doc->get_data or return; + $self->{ts} = get_val($doc, &PublicInbox::Search::TS); + utf8::decode($data); + load_from_data($self, $data); $self; } @@ -50,17 +55,9 @@ sub load_doc { my $data = $doc->get_data or return; my $ts = get_val($doc, &PublicInbox::Search::TS); utf8::decode($data); - my ($subj, $from, $refs, $to, $cc, $blob) = split(/\n/, $data); - bless { - doc => $doc, - subject => $subj, - ts => $ts, - from => $from, - references => $refs, - to => $to, - cc => $cc, - blob => $blob, - }, $class; + my $self = bless { doc => $doc, ts => $ts }, $class; + load_from_data($self, $data); + $self } # :bytes and :lines metadata in RFC 3977 diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm index 41bfb8d1..cb74ab1a 100644 --- a/lib/PublicInbox/V2Writable.pm +++ b/lib/PublicInbox/V2Writable.pm @@ -6,7 +6,8 @@ package PublicInbox::V2Writable; use strict; use warnings; use Fcntl qw(:flock :DEFAULT); -use PublicInbox::SearchIdx; +use PublicInbox::SearchIdxPart; +use PublicInbox::SearchIdxThread; use PublicInbox::MIME; use PublicInbox::Git; use PublicInbox::Import; @@ -32,7 +33,8 @@ sub new { im => undef, # PublicInbox::Import xap_rw => undef, # PublicInbox::V2SearchIdx xap_ro => undef, - + partitions => 4, + transact_bytes => 0, # limit each repo to 1GB or so rotate_bytes => int((1024 * 1024 * 1024) / $PACKING_FACTOR), }; @@ -55,29 +57,39 @@ sub add { my $cmt = $im->add($mime, $check_cb) or return; $cmt = $im->get_mark($cmt); my $oid = $im->{last_object_id}; - my $size = $im->{last_object_size}; - - my $idx = $self->search_idx; - $idx->index_both($mime, $size, $oid); - $idx->{xdb}->set_metadata('last_commit', $cmt); - my $n = $self->{transact_bytes} += $size; - if ($n > PublicInbox::SearchIdx::BATCH_BYTES) { + my ($len, $msgref) = @{$im->{last_object}}; + + my $nparts = $self->{partitions}; + my $part = hex(substr($oid, 0, 8)) % $nparts; + my $idx = $self->idx_part($part); + my $all = $self->{all}; + my $num = $all->index_mm($mime); + $idx->index_raw($len, $msgref, $num, $oid); + my $n = $self->{transact_bytes} += $len; + if ($n > (PublicInbox::SearchIdx::BATCH_BYTES * $nparts)) { $self->checkpoint; } $mime; } -sub search_idx { - my ($self) = @_; - $self->{idx} ||= eval { - my $idx = PublicInbox::SearchIdx->new($self->{-inbox}, 1); - my $mm = $idx->_msgmap_init; - $idx->_xdb_acquire->begin_transaction; - $self->{transact_bytes} = 0; - $mm->{dbh}->begin_work; - $idx - }; +sub idx_part { + my ($self, $part) = @_; + my $idx = $self->{idx_parts}; + return $idx->[$part] if $idx; # fast path + + # first time initialization: + my $all = $self->{all} = + PublicInbox::SearchIdxThread->new($self->{-inbox}); + + # need to create all parts before initializing msgmap FD + my $max = $self->{partitions} - 1; + $idx = $self->{idx_parts} = []; + for my $i (0..$max) { + push @$idx, PublicInbox::SearchIdxPart->new($self, $i, $all); + } + $all->_msgmap_init->{dbh}->begin_work; + $idx->[$part]; } sub remove { @@ -99,23 +111,37 @@ sub done { my ($self) = @_; my $im = $self->{im}; $im->done if $im; # PublicInbox::Import::done - $self->searchidx_checkpoint; + $self->searchidx_checkpoint(0); } sub checkpoint { my ($self) = @_; my $im = $self->{im}; $im->checkpoint if $im; # PublicInbox::Import::checkpoint - $self->searchidx_checkpoint; + $self->searchidx_checkpoint(1); } sub searchidx_checkpoint { - my ($self) = @_; - my $idx = delete $self->{idx} or return; + my ($self, $more) = @_; + + # order matters, we can only close {all} after all partitions + # are done because the partitions also write to {all} + + my $parts = $self->{idx_parts}; + foreach my $idx (@$parts) { + $idx->remote_commit; + $idx->remote_close unless $more; + } - $idx->{mm}->{dbh}->commit; - $idx->{xdb}->commit_transaction; - $idx->_xdb_release; + if (my $all = $self->{all}) { + $all->{mm}->{dbh}->commit; + if ($more) { + $all->{mm}->{dbh}->begin_work; + } + $all->remote_commit; + $all->remote_close unless $more; + } + $self->{transact_bytes} = 0; } sub git_init { @@ -158,7 +184,7 @@ sub importer { } else { $self->{im} = undef; $im->done; - $self->searchidx_checkpoint; + $self->searchidx_checkpoint(1); $im = undef; my $git_dir = $self->git_init(++$self->{max_git}); my $git = PublicInbox::Git->new($git_dir); -- cgit v1.2.3-24-ge0c7