about summary refs log tree commit homepage
path: root/lib/PublicInbox/SearchIdxShard.pm
diff options
context:
space:
mode:
author Eric Wong <e@80x24.org> 2019-06-16 01:04:28 +0000
committer Eric Wong <e@80x24.org> 2019-06-16 01:04:28 +0000
commit c477bdd8a80eecc319b680764edfb24bd12cb7b2 (patch)
tree fb36e978b8dbe1f2228527891d47ef9d69b10591 /lib/PublicInbox/SearchIdxShard.pm
parent 044b1d03c76246316d52de4bfd578745a8192398 (diff)
parent 27658d2c8b8e51fa64f523c873587273f4f16c46 (diff)
download public-inbox-c477bdd8a80eecc319b680764edfb24bd12cb7b2.tar.gz
* origin/newspeak:
  comments: replace "partition" with "shard"
  t/xcpdb-reshard: use 'shard' term in local variables
  xapcmd: favor 'shard' over 'part' in local variables
  search: use "shard" for local variable
  v2writable: use "epoch" consistently when referring to git repos
  adminedit: "part" => "shard" for local variables
  v2writable: rename local vars to match Xapian terminology
  v2writable: avoid "part" in internal subs and fields
  search*: rename {partition} => {shard}
  xapcmd: update comments referencing "partitions"
  v2: rename SearchIdxPart => SearchIdxShard
  inboxwritable: s/partitions/shards/ in local var
  tests: change messages to use "shard" instead of partition
  v2writable: rename {partitions} field to {shards}
  v2writable: count_partitions => count_shards
  searchidxpart: start using "shard" in user-visible places
  rename reference to git epochs as "partitions"
  admin|xapcmd: user-facing messages say "shard"
  v2writable: update comments regarding xcpdb --reshard
  doc: rename our Xapian "partitions" to "shards"
Diffstat (limited to 'lib/PublicInbox/SearchIdxShard.pm')
-rw-r--r-- lib/PublicInbox/SearchIdxShard.pm 116
1 files changed, 116 insertions, 0 deletions
diff --git a/lib/PublicInbox/SearchIdxShard.pm b/lib/PublicInbox/SearchIdxShard.pm
new file mode 100644
index 00000000..15ec6578
--- /dev/null
+++ b/lib/PublicInbox/SearchIdxShard.pm
@@ -0,0 +1,116 @@
+# Copyright (C) 2018 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# used to interface with a single Xapian shard in V2 repos.
+# See L<public-inbox-v2-format(5)> for more info on how we shard Xapian
+package PublicInbox::SearchIdxShard;
+use strict;
+use warnings;
+use base qw(PublicInbox::SearchIdx);
+
+# Constructor: build the indexer object for one Xapian shard.
+#   $class - class name
+#   $v2w   - the V2Writable object; its {-inbox} and {parallel} fields
+#            are read here
+#   $shard - shard identifier, handed to the parent constructor
+# Returns the new object.  When $v2w->{parallel} is true, a worker
+# process for this shard is started before returning.
+sub new {
+        my ($class, $v2w, $shard) = @_;
+        # NOTE(review): the meaning of the literal 1 is defined by
+        # PublicInbox::SearchIdx::new, which is not visible here
+        my $self = $class->SUPER::new($v2w->{-inbox}, 1, $shard);
+
+        # acquire + release once so the DB is created before forking
+        $self->_xdb_acquire;
+        $self->_xdb_release;
+
+        if ($v2w->{parallel}) {
+                $self->spawn_worker($v2w, $shard);
+        }
+        return $self;
+}
+
+# Fork a worker process for this shard, connected to us by a pipe.
+#   $v2writable - the V2Writable object; its atfork_child is run in
+#                 the child and its return value is passed on to the
+#                 worker loop as the barrier-note handle
+#   $shard      - shard identifier, used in messages and process title
+# In the parent, the child's PID is stored in $self->{pid} and the
+# write end of the pipe in $self->{w}.  The child never returns from
+# this sub: it runs shard_worker_loop and then exits (or dies).
+sub spawn_worker {
+        my ($self, $v2writable, $shard) = @_;
+        pipe(my $rd, my $wr) or die "pipe failed: $!\n";
+        binmode $_, ':raw' for ($rd, $wr);
+
+        my $pid = fork;
+        die "fork failed: $!\n" unless defined $pid;
+
+        if ($pid == 0) { # child
+                my $bnote = $v2writable->atfork_child;
+                undef $v2writable;
+                close $wr or die "failed to close: $!";
+
+                # F_SETPIPE_SZ = 1031 on Linux; increasing the pipe size here
+                # speeds V2Writable batch imports across 8 cores by nearly 20%
+                if ($^O eq 'linux') {
+                        fcntl($rd, 1031, 1048576);
+                }
+
+                eval { shard_worker_loop($self, $rd, $shard, $bnote) };
+                if ($@) {
+                        die "worker $shard died: $@\n";
+                }
+                if ($self->{mm}) {
+                        die "unexpected MM $self->{mm}";
+                }
+                exit;
+        }
+
+        # parent: remember the child and keep only the write end
+        @$self{qw(pid w)} = ($pid, $wr);
+        close $rd or die "failed to close: $!";
+}
+
+# Main loop of a shard worker process: read commands from the pipe $r
+# until EOF and apply them to this shard.
+#   $self  - the shard indexer object
+#   $r     - read end of the pipe written to by the parent
+#   $shard - shard identifier (used in the process title and warnings)
+#   $bnote - handle on which "barrier $shard\n" is written once a
+#            barrier has been committed
+#
+# Each command is one line:
+#   "commit\n"           commit the current transaction
+#   "close\n"            release the Xapian DB
+#   "barrier\n"          commit, then report on $bnote
+#   "D <oid> <mid>\n"    remove a message by object ID
+#   "<len> <artnum> <oid> <mid0>\n" followed by exactly <len> bytes of
+#                        raw message: index that message
+# Dies on a failed or short read and on a failed barrier write.
+sub shard_worker_loop ($$$$) {
+        my ($self, $r, $shard, $bnote) = @_;
+        $0 = "pi-v2-shard[$shard]";
+
+        # prefix every warning with the shard and the command being handled
+        my $current_info = '';
+        my $warn_cb = $SIG{__WARN__} || sub { print STDERR @_ };
+        local $SIG{__WARN__} = sub {
+                chomp $current_info;
+                $warn_cb->("[$shard] $current_info: ", @_);
+        };
+        $self->begin_txn_lazy;
+        while (my $line = $r->getline) {
+                $current_info = $line;
+                if ($line eq "commit\n") {
+                        $self->commit_txn_lazy;
+                } elsif ($line eq "close\n") {
+                        $self->_xdb_release;
+                } elsif ($line eq "barrier\n") {
+                        $self->commit_txn_lazy;
+                        # no need to lock: a write of < 512 bytes is
+                        # atomic under POSIX
+                        print $bnote "barrier $shard\n" or
+                                        die "write failed for barrier $!\n";
+                } elsif ($line =~ /\AD ([a-f0-9]{40,}) (.+)\n\z/s) {
+                        my ($oid, $mid) = ($1, $2);
+                        $self->begin_txn_lazy;
+                        $self->remove_by_oid($oid, $mid);
+                } else {
+                        chomp $line;
+                        # LIMIT of 4 keeps a Message-ID containing spaces
+                        # intact, matching the (.+) of the "D" command
+                        my ($len, $artnum, $oid, $mid0) =
+                                        split(/ /, $line, 4);
+                        $self->begin_txn_lazy;
+                        # read() returns undef on error but 0 at EOF; only
+                        # the former sets $!, so check them separately
+                        my $n = read($r, my $msg, $len);
+                        defined $n or die "read: $!\n";
+                        $n == $len or die "short read: $n != $len\n";
+                        my $mime = PublicInbox::MIME->new(\$msg);
+                        $artnum = int($artnum);
+                        $self->add_message($mime, $n, $artnum, $oid, $mid0);
+                }
+        }
+        $self->worker_done;
+}
+
+# called by V2Writable
+# Index one message in this shard.
+#   $bytes  - size of the raw message
+#   $msgref - reference to the raw message string
+#   $artnum, $oid, $mid0 - passed through to add_message / the worker
+#   $mime   - parsed message, used only when indexing in-process
+# With a worker ($self->{w} set), a header line followed by the raw
+# message is written to the worker's pipe and flushed.  Without one,
+# $$msgref is set to undef and the message is added in this process.
+sub index_raw {
+        my ($self, $bytes, $msgref, $artnum, $oid, $mid0, $mime) = @_;
+        my $w = $self->{w};
+        if (!$w) { # no worker process: index right here
+                $$msgref = undef;
+                $self->begin_txn_lazy;
+                return $self->add_message($mime, $bytes, $artnum,
+                                        $oid, $mid0);
+        }
+        my $hdr = "$bytes $artnum $oid $mid0\n";
+        print {$w} $hdr, $$msgref or die "failed to write shard $!\n";
+        $w->flush or die "failed to flush: $!\n";
+}
+
+# Close this shard's write pipe ($self->{w}); dies if the close fails.
+# NOTE(review): presumably run in a freshly forked child so it does not
+# hold other workers' pipes open -- confirm against the caller.
+sub atfork_child {
+        my ($self) = @_;
+        close $self->{w} or die "failed to close write pipe: $!\n";
+}
+
+# called by V2Writable:
+# Request a barrier for this shard.  With a worker ($self->{w} set),
+# "barrier\n" is written to its pipe and flushed; without one, the
+# current transaction is committed in this process.
+sub remote_barrier {
+        my ($self) = @_;
+        my $w = $self->{w} or return $self->commit_txn_lazy;
+        print {$w} "barrier\n" or die "failed to print: $!";
+        $w->flush or die "failed to flush: $!";
+}
+
+1;