about summary refs log tree commit homepage
path: root/lib/PublicInbox/SearchIdxShard.pm
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2019-06-14 17:35:04 +0000
committerEric Wong <e@80x24.org>2019-06-14 21:56:40 +0000
commit0ffd4a9833da64006d558ef241badfef3c096d1b (patch)
tree124f1df1d9a3c1f35c71af85acd4c2a28b19c088 /lib/PublicInbox/SearchIdxShard.pm
parentd963eec26dea82026789834573c2d5c4df91b3cf (diff)
downloadpublic-inbox-0ffd4a9833da64006d558ef241badfef3c096d1b.tar.gz
v2: rename SearchIdxPart => SearchIdxShard
Another step towards keeping our file and package names
consistent with Xapian terminology.
Diffstat (limited to 'lib/PublicInbox/SearchIdxShard.pm')
-rw-r--r--lib/PublicInbox/SearchIdxShard.pm116
1 files changed, 116 insertions, 0 deletions
diff --git a/lib/PublicInbox/SearchIdxShard.pm b/lib/PublicInbox/SearchIdxShard.pm
new file mode 100644
index 00000000..15ec6578
--- /dev/null
+++ b/lib/PublicInbox/SearchIdxShard.pm
@@ -0,0 +1,116 @@
+# Copyright (C) 2018 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# used to interface with a single Xapian shard in V2 repos.
+# See L<public-inbox-v2-format(5)> for more info on how we shard Xapian
+package PublicInbox::SearchIdxShard;
+use strict;
+use warnings;
+use base qw(PublicInbox::SearchIdx);
+
+sub new {
+        my ($class, $v2writable, $shard) = @_;
+        my $self = $class->SUPER::new($v2writable->{-inbox}, 1, $shard);
+        # create the DB before forking:
+        $self->_xdb_acquire;
+        $self->_xdb_release;
+        $self->spawn_worker($v2writable, $shard) if $v2writable->{parallel};
+        $self;
+}
+
+sub spawn_worker {
+        my ($self, $v2writable, $shard) = @_;
+        my ($r, $w);
+        pipe($r, $w) or die "pipe failed: $!\n";
+        binmode $r, ':raw';
+        binmode $w, ':raw';
+        my $pid = fork;
+        defined $pid or die "fork failed: $!\n";
+        if ($pid == 0) {
+                my $bnote = $v2writable->atfork_child;
+                $v2writable = undef;
+                close $w or die "failed to close: $!";
+
+                # F_SETPIPE_SZ = 1031 on Linux; increasing the pipe size here
+                # speeds V2Writable batch imports across 8 cores by nearly 20%
+                fcntl($r, 1031, 1048576) if $^O eq 'linux';
+
+                eval { shard_worker_loop($self, $r, $shard, $bnote) };
+                die "worker $shard died: $@\n" if $@;
+                die "unexpected MM $self->{mm}" if $self->{mm};
+                exit;
+        }
+        $self->{pid} = $pid;
+        $self->{w} = $w;
+        close $r or die "failed to close: $!";
+}
+
+sub shard_worker_loop ($$$$) {
+        my ($self, $r, $shard, $bnote) = @_;
+        $0 = "pi-v2-shard[$shard]";
+        my $current_info = '';
+        my $warn_cb = $SIG{__WARN__} || sub { print STDERR @_ };
+        local $SIG{__WARN__} = sub {
+                chomp $current_info;
+                $warn_cb->("[$shard] $current_info: ", @_);
+        };
+        $self->begin_txn_lazy;
+        while (my $line = $r->getline) {
+                $current_info = $line;
+                if ($line eq "commit\n") {
+                        $self->commit_txn_lazy;
+                } elsif ($line eq "close\n") {
+                        $self->_xdb_release;
+                } elsif ($line eq "barrier\n") {
+                        $self->commit_txn_lazy;
+                        # no need to lock < 512 bytes is atomic under POSIX
+                        print $bnote "barrier $shard\n" or
+                                        die "write failed for barrier $!\n";
+                } elsif ($line =~ /\AD ([a-f0-9]{40,}) (.+)\n\z/s) {
+                        my ($oid, $mid) = ($1, $2);
+                        $self->begin_txn_lazy;
+                        $self->remove_by_oid($oid, $mid);
+                } else {
+                        chomp $line;
+                        my ($len, $artnum, $oid, $mid0) = split(/ /, $line);
+                        $self->begin_txn_lazy;
+                        my $n = read($r, my $msg, $len) or die "read: $!\n";
+                        $n == $len or die "short read: $n != $len\n";
+                        my $mime = PublicInbox::MIME->new(\$msg);
+                        $artnum = int($artnum);
+                        $self->add_message($mime, $n, $artnum, $oid, $mid0);
+                }
+        }
+        $self->worker_done;
+}
+
+# called by V2Writable
+sub index_raw {
+        my ($self, $bytes, $msgref, $artnum, $oid, $mid0, $mime) = @_;
+        if (my $w = $self->{w}) {
+                print $w "$bytes $artnum $oid $mid0\n", $$msgref or die
+                        "failed to write shard $!\n";
+                $w->flush or die "failed to flush: $!\n";
+        } else {
+                $$msgref = undef;
+                $self->begin_txn_lazy;
+                $self->add_message($mime, $bytes, $artnum, $oid, $mid0);
+        }
+}
+
+sub atfork_child {
+        close $_[0]->{w} or die "failed to close write pipe: $!\n";
+}
+
+# called by V2Writable:
+sub remote_barrier {
+        my ($self) = @_;
+        if (my $w = $self->{w}) {
+                print $w "barrier\n" or die "failed to print: $!";
+                $w->flush or die "failed to flush: $!";
+        } else {
+                $self->commit_txn_lazy;
+        }
+}
+
+1;