From d1915096bbd5fdda4818336b64131799966e9d6e Mon Sep 17 00:00:00 2001 From: "Eric Wong (Contractor, The Linux Foundation)" Date: Tue, 27 Feb 2018 00:41:21 +0000 Subject: rename SearchIdxThread to SearchIdxSkeleton Interchangeably using "all", "skel", "threader", etc. was confusing. Standardize on the "skeleton" term to describe this class since it's also used for retrieval of basic headers. --- MANIFEST | 2 +- lib/PublicInbox/Search.pm | 2 +- lib/PublicInbox/SearchIdx.pm | 10 +-- lib/PublicInbox/SearchIdxPart.pm | 4 +- lib/PublicInbox/SearchIdxSkeleton.pm | 121 +++++++++++++++++++++++++++++++++++ lib/PublicInbox/SearchIdxThread.pm | 120 ---------------------------------- lib/PublicInbox/V2Writable.pm | 30 +++++---- 7 files changed, 146 insertions(+), 143 deletions(-) create mode 100644 lib/PublicInbox/SearchIdxSkeleton.pm delete mode 100644 lib/PublicInbox/SearchIdxThread.pm diff --git a/MANIFEST b/MANIFEST index 2a6f6f07..1aaf8fff 100644 --- a/MANIFEST +++ b/MANIFEST @@ -85,7 +85,7 @@ lib/PublicInbox/SaPlugin/ListMirror.pm lib/PublicInbox/Search.pm lib/PublicInbox/SearchIdx.pm lib/PublicInbox/SearchIdxPart.pm -lib/PublicInbox/SearchIdxThread.pm +lib/PublicInbox/SearchIdxSkeleton.pm lib/PublicInbox/SearchMsg.pm lib/PublicInbox/SearchThread.pm lib/PublicInbox/SearchView.pm diff --git a/lib/PublicInbox/Search.pm b/lib/PublicInbox/Search.pm index 0f102dac..6b14942a 100644 --- a/lib/PublicInbox/Search.pm +++ b/lib/PublicInbox/Search.pm @@ -160,7 +160,7 @@ sub new { } warn "v2 repo with $parts found in $dir\n"; $self->{xdb} = $xdb; - $self->{skel} = Search::Xapian::Database->new("$dir/all"); + $self->{skel} = Search::Xapian::Database->new("$dir/skel"); } else { $self->{xdb} = Search::Xapian::Database->new($self->xdir); } diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm index b5d43d12..32594137 100644 --- a/lib/PublicInbox/SearchIdx.pm +++ b/lib/PublicInbox/SearchIdx.pm @@ -285,7 +285,7 @@ sub add_message { my ($doc_id, $old_tid); my $mid = 
mid_clean(mid_mime($mime)); - my $threader = $self->{threader}; + my $skel = $self->{skeleton}; eval { die 'Message-ID too long' if length($mid) > MAX_MID_SIZE; @@ -294,7 +294,7 @@ sub add_message { # convert a ghost to a regular message # it will also clobber any existing regular message $doc_id = $smsg->{doc_id}; - $old_tid = $smsg->thread_id unless $threader; + $old_tid = $smsg->thread_id unless $skel; } $smsg = PublicInbox::SearchMsg->new($mime); my $doc = $smsg->{doc}; @@ -362,9 +362,9 @@ sub add_message { # populates smsg->references for smsg->to_doc_data my $refs = parse_references($smsg); my $data = $smsg->to_doc_data($blob); - if ($threader) { + if ($skel) { push @values, $mid, $xpath, $data; - $threader->thread_msg(\@values); + $skel->index_skeleton(\@values); } else { link_message($self, $smsg, $refs, $old_tid); } @@ -817,7 +817,7 @@ sub DESTROY { $_[0]->{lockfh} = undef; } -# remote_* subs are only used by SearchIdxPart and SearchIdxThread: +# remote_* subs are only used by SearchIdxPart and SearchIdxSkeleton sub remote_commit { my ($self) = @_; print { $self->{w} } "commit\n" or die "failed to write commit: $!"; diff --git a/lib/PublicInbox/SearchIdxPart.pm b/lib/PublicInbox/SearchIdxPart.pm index 477a4f97..6025fc40 100644 --- a/lib/PublicInbox/SearchIdxPart.pm +++ b/lib/PublicInbox/SearchIdxPart.pm @@ -6,9 +6,9 @@ use warnings; use base qw(PublicInbox::SearchIdx); sub new { - my ($class, $v2writable, $part, $threader) = @_; + my ($class, $v2writable, $part, $skel) = @_; my $self = $class->SUPER::new($v2writable->{-inbox}, 1, $part); - $self->{threader} = $threader; + $self->{skeleton} = $skel; my ($r, $w); pipe($r, $w) or die "pipe failed: $!\n"; binmode $r, ':raw'; diff --git a/lib/PublicInbox/SearchIdxSkeleton.pm b/lib/PublicInbox/SearchIdxSkeleton.pm new file mode 100644 index 00000000..0016f89a --- /dev/null +++ b/lib/PublicInbox/SearchIdxSkeleton.pm @@ -0,0 +1,121 @@ +# Copyright (C) 2018 all contributors +# License: AGPL-3.0+ +package 
PublicInbox::SearchIdxSkeleton; +use strict; +use warnings; +use base qw(PublicInbox::SearchIdx); +use Storable qw(freeze thaw); + +sub new { + my ($class, $v2writable) = @_; + my $self = $class->SUPER::new($v2writable->{-inbox}, 1, 'skel'); + # create the DB: + $self->_xdb_acquire; + $self->_xdb_release; + + my ($r, $w); + pipe($r, $w) or die "pipe failed: $!\n"; + binmode $r, ':raw'; + binmode $w, ':raw'; + my $pid = fork; + defined $pid or die "fork failed: $!\n"; + if ($pid == 0) { + $v2writable->atfork_child; + $v2writable = undef; + close $w; + eval { skeleton_worker_loop($self, $r) }; + die "skeleton worker died: $@\n" if $@; + exit; + } + $self->{w} = $w; + $self->{pid} = $pid; + close $r; + + $w->autoflush(1); + + # lock on only exists in parent, not in worker + my $l = $self->{lock_path} = $self->xdir . '/pi-v2-skeleton.lock'; + open my $fh, '>>', $l or die "failed to create $l: $!\n"; + $self; +} + +sub skeleton_worker_loop { + my ($self, $r) = @_; + $0 = 'pi-v2-skeleton'; + my $msg; + my $xdb = $self->_xdb_acquire; + $xdb->begin_transaction; + my $txn = 1; + while (my $line = $r->getline) { + if ($line eq "commit\n") { + $xdb->commit_transaction if $txn; + $txn = undef; + } elsif ($line eq "close\n") { + $self->_xdb_release; + $xdb = $txn = undef; + } else { + read($r, $msg, $line) or die "read failed: $!\n"; + $msg = thaw($msg); # should raise on error + defined $msg or die "failed to thaw buffer\n"; + if (!$txn) { + $xdb->begin_transaction; + $txn = 1; + } + eval { index_skeleton_real($self, $msg) }; + warn "failed to index message <$msg->[-1]>: $@\n" if $@; + } + } +} + +# called by a partition worker +sub index_skeleton { + my ($self, $values) = @_; + my $w = $self->{w}; + my $err; + my $str = freeze($values); + $str = length($str) . "\n" . 
$str; + + # multiple processes write to the same pipe, so use flock + $self->_lock_acquire; + print $w $str or $err = $!; + $self->_lock_release; + + die "print failed: $err\n" if $err; +} + +# values: [ TS, NUM, BYTES, LINES, MID, XPATH, doc_data ] +sub index_skeleton_real ($$) { + my ($self, $values) = @_; + my $doc_data = pop @$values; + my $xpath = pop @$values; + my $mid = pop @$values; + my $ts = $values->[PublicInbox::Search::TS]; + my $smsg = $self->lookup_message($mid); + my ($old_tid, $doc_id); + if ($smsg) { + # convert a ghost to a regular message + # it will also clobber any existing regular message + $doc_id = $smsg->{doc_id}; + $old_tid = $smsg->thread_id; + } else { + $smsg = PublicInbox::SearchMsg->new(undef); + $smsg->{mid} = $mid; + } + my $doc = $smsg->{doc}; + $doc->add_term('XPATH' . $xpath) if defined $xpath; + $doc->add_term('XMID' . $mid); + PublicInbox::SearchIdx::add_values($doc, $values); + $doc->set_data($doc_data); + $smsg->{ts} = $ts; + $smsg->load_from_data($doc_data); + my @refs = ($smsg->references =~ /<([^>]+)>/g); + $self->link_message($smsg, \@refs, $old_tid); + my $db = $self->{xdb}; + if (defined $doc_id) { + $db->replace_document($doc_id, $doc); + } else { + $doc_id = $db->add_document($doc); + } +} + +1; diff --git a/lib/PublicInbox/SearchIdxThread.pm b/lib/PublicInbox/SearchIdxThread.pm deleted file mode 100644 index 6b50eb00..00000000 --- a/lib/PublicInbox/SearchIdxThread.pm +++ /dev/null @@ -1,120 +0,0 @@ -# Copyright (C) 2018 all contributors -# License: AGPL-3.0+ -package PublicInbox::SearchIdxThread; -use strict; -use warnings; -use base qw(PublicInbox::SearchIdx); -use Storable qw(freeze thaw); - -sub new { - my ($class, $v2writable) = @_; - my $self = $class->SUPER::new($v2writable->{-inbox}, 1, 'all'); - # create the DB: - $self->_xdb_acquire; - $self->_xdb_release; - - my ($r, $w); - pipe($r, $w) or die "pipe failed: $!\n"; - binmode $r, ':raw'; - binmode $w, ':raw'; - my $pid = fork; - defined $pid or die "fork 
failed: $!\n"; - if ($pid == 0) { - $v2writable->atfork_child; - $v2writable = undef; - close $w; - eval { thread_worker_loop($self, $r) }; - die "thread worker died: $@\n" if $@; - exit; - } - $self->{w} = $w; - $self->{pid} = $pid; - close $r; - - $w->autoflush(1); - - # lock on only exists in parent, not in worker - my $l = $self->{lock_path} = $self->xdir . '/thread.lock'; - open my $fh, '>>', $l or die "failed to create $l: $!\n"; - $self; -} - -sub thread_worker_loop { - my ($self, $r) = @_; - $0 = 'pi-v2-threader'; - my $msg; - my $xdb = $self->_xdb_acquire; - $xdb->begin_transaction; - my $txn = 1; - while (my $line = $r->getline) { - if ($line eq "commit\n") { - $xdb->commit_transaction if $txn; - $txn = undef; - } elsif ($line eq "close\n") { - $self->_xdb_release; - $xdb = $txn = undef; - } else { - read($r, $msg, $line) or die "read failed: $!\n"; - $msg = thaw($msg); # should raise on error - defined $msg or die "failed to thaw buffer\n"; - if (!$txn) { - $xdb->begin_transaction; - $txn = 1; - } - eval { $self->thread_msg_real($msg) }; - warn "failed to index message <$msg->[-1]>: $@\n" if $@; - } - } -} - -# called by a partition worker -sub thread_msg { - my ($self, $values) = @_; - my $w = $self->{w}; - my $err; - my $str = freeze($values); - $str = length($str) . "\n" . 
$str; - - # multiple processes write to the same pipe, so use flock - $self->_lock_acquire; - print $w $str or $err = $!; - $self->_lock_release; - - die "print failed: $err\n" if $err; -} - -sub thread_msg_real { - my ($self, $values) = @_; - my $doc_data = pop @$values; - my $xpath = pop @$values; - my $mid = pop @$values; - my $ts = $values->[PublicInbox::Search::TS]; - my $smsg = $self->lookup_message($mid); - my ($old_tid, $doc_id); - if ($smsg) { - # convert a ghost to a regular message - # it will also clobber any existing regular message - $doc_id = $smsg->{doc_id}; - $old_tid = $smsg->thread_id; - } else { - $smsg = PublicInbox::SearchMsg->new(undef); - $smsg->{mid} = $mid; - } - my $doc = $smsg->{doc}; - $doc->add_term('XPATH' . $xpath) if defined $xpath; - $doc->add_term('XMID' . $mid); - PublicInbox::SearchIdx::add_values($doc, $values); - $doc->set_data($doc_data); - $smsg->{ts} = $ts; - $smsg->load_from_data($doc_data); - my @refs = ($smsg->references =~ /<([^>]+)>/g); - $self->link_message($smsg, \@refs, $old_tid); - my $db = $self->{xdb}; - if (defined $doc_id) { - $db->replace_document($doc_id, $doc); - } else { - $doc_id = $db->add_document($doc); - } -} - -1; diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm index 5e819da4..ff3b6573 100644 --- a/lib/PublicInbox/V2Writable.pm +++ b/lib/PublicInbox/V2Writable.pm @@ -7,7 +7,7 @@ use strict; use warnings; use Fcntl qw(:flock :DEFAULT); use PublicInbox::SearchIdxPart; -use PublicInbox::SearchIdxThread; +use PublicInbox::SearchIdxSkeleton; use PublicInbox::MIME; use PublicInbox::Git; use PublicInbox::Import; @@ -61,7 +61,7 @@ sub add { my ($len, $msgref) = @{$im->{last_object}}; $self->idx_init; - my $num = $self->{all}->index_mm($mime, 1); + my $num = $self->{skel}->index_mm($mime, 1); my $nparts = $self->{partitions}; my $part = $num % $nparts; my $idx = $self->idx_part($part); @@ -83,18 +83,18 @@ sub idx_init { my ($self) = @_; return if $self->{idx_parts}; - # first time 
initialization, first we create the threader pipe: - my $all = $self->{all} = PublicInbox::SearchIdxThread->new($self); + # first time initialization, first we create the skeleton pipe: + my $skel = $self->{skel} = PublicInbox::SearchIdxSkeleton->new($self); # need to create all parts before initializing msgmap FD my $max = $self->{partitions} - 1; my $idx = $self->{idx_parts} = []; for my $i (0..$max) { - push @$idx, PublicInbox::SearchIdxPart->new($self, $i, $all); + push @$idx, PublicInbox::SearchIdxPart->new($self, $i, $skel); } # Now that all subprocesses are up, we can open the FD for SQLite: - $all->_msgmap_init->{dbh}->begin_work; + $skel->_msgmap_init->{dbh}->begin_work; } sub remove { @@ -129,8 +129,8 @@ sub checkpoint { sub searchidx_checkpoint { my ($self, $more) = @_; - # order matters, we can only close {all} after all partitions - # are done because the partitions also write to {all} + # order matters, we can only close {skel} after all partitions + # are done because the partitions also write to {skel} if (my $parts = $self->{idx_parts}) { foreach my $idx (@$parts) { @@ -140,14 +140,16 @@ sub searchidx_checkpoint { delete $self->{idx_parts} unless $more; } - if (my $all = $self->{all}) { - $all->{mm}->{dbh}->commit; + if (my $skel = $self->{skel}) { + $skel->{mm}->{dbh}->commit; if ($more) { - $all->{mm}->{dbh}->begin_work; + $skel->{mm}->{dbh}->begin_work; + } + $skel->remote_commit; + unless ($more) { + $skel->remote_close; + delete $self->{skel}; } - $all->remote_commit; - $all->remote_close unless $more; - delete $self->{all} unless $more; } $self->{transact_bytes} = 0; } -- cgit v1.2.3-24-ge0c7