This fixes a performance regression in multi-process v2 indexing
due to the switch to PublicInbox::IPC. While a Unix socketpair
means fewer FDs to manage, pipes allow unprivileged processes to
use larger buffers (up to 1M) on out-of-the-box Linux instances.
The larger buffers that pipes afford via F_SETPIPE_SZ proved
valuable during v2 development in 2018 and remain valuable
when there is significant one-way traffic from the producer
parent to the worker children.
Compression may be an option for systems without F_SETPIPE_SZ,
but it increases CPU usage and offers no memory bandwidth savings
on hosts where larger buffers are available.
---
lib/PublicInbox/IPC.pm | 115 +++++++++++++++++-------------
lib/PublicInbox/SearchIdxShard.pm | 23 +++---
lib/PublicInbox/V2Writable.pm | 2 +-
3 files changed, 78 insertions(+), 62 deletions(-)
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 6b7b3c7a..c1f6f920 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -6,7 +6,6 @@
package PublicInbox::IPC;
use strict;
use v5.10.1;
-use Socket qw(AF_UNIX SOCK_STREAM);
use Carp qw(confess croak);
use PublicInbox::Sigfd;
my ($enc, $dec);
@@ -34,39 +33,42 @@ if ($enc && $dec) { # should be custom ops
}
sub _get_rec ($) {
- my ($sock) = @_;
- local $/ = "\n";
- defined(my $len = <$sock>) or return;
+ my ($r) = @_;
+ defined(my $len = <$r>) or return; # caller must localize $/ = "\n"
chop($len) eq "\n" or croak "no LF byte in $len";
- defined(my $r = read($sock, my $buf, $len)) or croak "read error: $!";
- $r == $len or croak "short read: $r != $len";
+ defined(my $n = read($r, my $buf, $len)) or croak "read error: $!";
+ $n == $len or croak "short read: $n != $len";
thaw($buf);
}
sub _send_rec ($$) {
- my ($sock, $ref) = @_;
+ my ($w, $ref) = @_;
my $buf = freeze($ref);
- print $sock length($buf), "\n", $buf or croak "print: $!";
+ print $w length($buf), "\n", $buf or croak "print: $!"; # "$len\n" then payload
}
sub ipc_return ($$$) {
- my ($s2, $ret, $exc) = @_;
- _send_rec($s2, $exc ? bless(\$exc, 'PublicInbox::IPC::Die') : $ret);
+ my ($w, $ret, $exc) = @_;
+ _send_rec($w, $exc ? bless(\$exc, 'PublicInbox::IPC::Die') : $ret);
}
-sub ipc_worker_loop ($$) {
- my ($self, $s2) = @_;
- while (my $rec = _get_rec($s2)) {
- my ($wantarray, $sub, @args) = @$rec;
- if (!defined($wantarray)) { # no waiting if client doesn't care
+sub ipc_worker_loop ($$$) {
+ my ($self, $r_req, $w_res) = @_;
+ my ($rec, $wantarray, $sub, @args);
+ local $/ = "\n"; # _get_rec needs this; set once, not per record
+ while ($rec = _get_rec($r_req)) {
+ ($wantarray, $sub, @args) = @$rec;
+ # no waiting if client doesn't care,
+ # this is the overwhelmingly likely case
+ if (!defined($wantarray)) {
eval { $self->$sub(@args) };
- eval { warn "die: $@ (from nowait $sub)\n" } if $@;
+ eval { warn "$$ die: $@ (from nowait $sub)\n" } if $@;
} elsif ($wantarray) {
my @ret = eval { $self->$sub(@args) };
- ipc_return($s2, \@ret, $@);
- } else {
+ ipc_return($w_res, \@ret, $@);
+ } else { # '' => wantscalar
my $ret = eval { $self->$sub(@args) };
- ipc_return($s2, \$ret, $@);
+ ipc_return($w_res, \$ret, $@);
}
}
}
@@ -75,33 +77,34 @@ sub ipc_worker_loop ($$) {
sub ipc_worker_spawn {
my ($self, $ident, $oldset) = @_;
return unless $enc; # no Sereal or Storable
- my $pid = $self->{-ipc_worker_pid};
- confess "BUG: already spawned PID:$pid" if $pid;
- confess "BUG: already have worker socket" if $self->{-ipc_sock};
- my ($s1, $s2);
- socketpair($s1, $s2, AF_UNIX, SOCK_STREAM, 0) or die "socketpair: $!";
+ return if ($self->{-ipc_ppid} // -1) == $$; # idempotent
+ delete(@$self{qw(-ipc_req -ipc_res -ipc_ppid -ipc_pid)}); # not ours if ppid != $$
+ pipe(my ($r_req, $w_req)) or die "pipe: $!"; # requests: parent -> worker
+ pipe(my ($r_res, $w_res)) or die "pipe: $!"; # responses: worker -> parent
my $sigset = $oldset // PublicInbox::Sigfd::block_signals();
my $parent = $$;
$self->ipc_atfork_parent;
- defined($pid = fork) or die "fork: $!";
+ defined(my $pid = fork) or die "fork: $!";
if ($pid == 0) {
eval { PublicInbox::DS->Reset };
$self->{-ipc_parent_pid} = $parent;
- close $s1 or die "close(\$s1): $!";
- $s2->autoflush(1);
+ $w_req = $r_res = undef; # close the parent's ends in the worker
+ $w_res->autoflush(1);
$SIG{$_} = 'IGNORE' for (qw(TERM INT QUIT));
local $0 = $ident;
PublicInbox::Sigfd::sig_setmask($oldset);
my $on_destroy = $self->ipc_atfork_child;
- eval { ipc_worker_loop($self, $s2) };
+ eval { ipc_worker_loop($self, $r_req, $w_res) };
die "worker $ident PID:$$ died: $@\n" if $@;
exit;
}
PublicInbox::Sigfd::sig_setmask($sigset) unless $oldset;
- close $s2 or die "close(\$s2): $!";
- $s1->autoflush(1);
- $self->{-ipc_sock} = $s1;
- $self->{-ipc_worker_pid} = $pid;
+ $r_req = $w_res = undef; # close the worker's ends in the parent
+ $w_req->autoflush(1);
+ $self->{-ipc_req} = $w_req;
+ $self->{-ipc_res} = $r_res;
+ $self->{-ipc_ppid} = $$;
+ $self->{-ipc_pid} = $pid;
}
sub ipc_worker_reap { # dwaitpid callback
@@ -122,15 +125,18 @@ sub ipc_worker_exit {
# idempotent, can be called regardless of whether worker is active or not
sub ipc_worker_stop {
my ($self) = @_;
- my $pid;
- my $s1 = delete $self->{-ipc_sock} or do {
- $pid = delete $self->{-ipc_worker_pid} and
- die "unexpected PID:$pid without ipc_sock";
- return;
- };
- $pid = delete $self->{-ipc_worker_pid} or die "no PID?";
- _send_rec($s1, [ undef, 'ipc_worker_exit', 0 ]);
- shutdown($s1, 2) or die "shutdown(\$s1) for PID:$pid";
+ my ($pid, $ppid) = delete(@$self{qw(-ipc_pid -ipc_ppid)});
+ my ($w_req, $r_res) = delete(@$self{qw(-ipc_req -ipc_res)});
+ if (!$w_req && !$r_res) {
+ die "unexpected PID:$pid without IPC pipes" if $pid;
+ return; # idempotent
+ }
+ die 'no PID with IPC pipes' unless $pid;
+ _send_rec($w_req, [ undef, 'ipc_worker_exit', 0 ]);
+ $w_req = $r_res = undef;
+
+ # allow any sibling to send ipc_worker_exit, but siblings can't wait
+ return if $$ != $ppid;
eval {
my $reap = $self->can('ipc_worker_reap');
PublicInbox::DS::dwaitpid($pid, $reap, $self);
@@ -153,17 +159,30 @@ sub ipc_lock_init {
# call $self->$sub(@args), on a worker if ipc_worker_spawn was used
sub ipc_do {
my ($self, $sub, @args) = @_;
- if (my $s1 = $self->{-ipc_sock}) {
+ if (my $w_req = $self->{-ipc_req}) { # run in worker
my $ipc_lock = $self->{-ipc_lock};
my $lock = $ipc_lock ? $ipc_lock->lock_for_scope : undef;
- _send_rec($s1, [ wantarray, $sub, @args ]);
- return unless defined(wantarray);
- my $ret = _get_rec($s1) // die "no response on $sub";
- die $$ret if ref($ret) eq 'PublicInbox::IPC::Die';
- wantarray ? @$ret : $$ret;
- } else {
+ if (defined(wantarray)) {
+ my $r_res = $self->{-ipc_res} or die 'no ipc_res';
+ local $/ = "\n"; # _get_rec no longer localizes $/ itself
+ _send_rec($w_req, [ wantarray, $sub, @args ]);
+ my $ret = _get_rec($r_res) // die "no response on $sub";
+ die $$ret if ref($ret) eq 'PublicInbox::IPC::Die';
+ wantarray ? @$ret : $$ret;
+ } else { # likely, fire-and-forget into pipe
+ _send_rec($w_req, [ undef , $sub, @args ]);
+ }
+ } else { # run locally
$self->$sub(@args);
}
}
+# needed when there are multiple IPC workers and the parent forking
+# causes newer siblings to inherit older siblings' pipes
+sub ipc_sibling_atfork_child {
+ my ($self) = @_;
+ my ($pid) = delete(@$self{qw(-ipc_pid -ipc_ppid -ipc_req -ipc_res)});
+ ($pid // 0) == $$ and die "BUG: $$ ipc_sibling_atfork_child called on itself";
+}
+
1;
diff --git a/lib/PublicInbox/SearchIdxShard.pm b/lib/PublicInbox/SearchIdxShard.pm
index 83cbbb25..0051df93 100644
--- a/lib/PublicInbox/SearchIdxShard.pm
+++ b/lib/PublicInbox/SearchIdxShard.pm
@@ -21,6 +21,14 @@ sub new {
if ($v2w->{parallel}) {
local $self->{-v2w_afc} = $v2w;
$self->ipc_worker_spawn("shard[$shard]");
+ # F_SETPIPE_SZ = 1031 on Linux; increasing the pipe size for
+ # inputs speeds V2Writable batch imports across 8 cores by
+ # nearly 20%. Since all of our responses are small, make
+ # the response pipe as small as possible
+ if ($^O eq 'linux' && $self->{-ipc_req}) { # spawn may skip pipes w/o $enc
+ fcntl($self->{-ipc_req}, 1031, 1048576); # best-effort, unchecked
+ fcntl($self->{-ipc_res}, 1031, 4096);
+ }
}
$self;
}
@@ -36,7 +44,7 @@ sub _worker_done {
sub ipc_atfork_child { # called automatically before ipc_worker_loop
my ($self) = @_;
my $v2w = delete $self->{-v2w_afc} or die 'BUG: {-v2w_afc} missing';
- $v2w->atfork_child; # calls shard_atfork_child on our siblings
+ $v2w->atfork_child; # calls ipc_sibling_atfork_child on our siblings
$v2w->{current_info} = "[$self->{shard}]"; # for $SIG{__WARN__}
$self->begin_txn_lazy;
# caller must capture this:
@@ -49,17 +57,6 @@ sub index_eml {
$self->ipc_do('add_message', $eml, $smsg);
}
-# needed when there's multiple IPC workers and the parent forking
-# causes newer siblings to inherit older siblings sockets
-sub shard_atfork_child {
- my ($self) = @_;
- my $pid = delete($self->{-ipc_worker_pid}) or
- die "BUG: $$ no -ipc_worker_pid";
- my $s1 = delete($self->{-ipc_sock}) or die "BUG: $$ no -ipc_sock";
- $pid == $$ and die "BUG: $$ shard_atfork_child called on itself";
- close($s1) or die "close -ipc_sock: $!";
-}
-
# wait for return to determine when ipc_do('commit_txn_lazy') is done
sub echo {
shift;
@@ -80,7 +77,7 @@ sub shard_close {
sub shard_over_check {
my ($self, $over) = @_;
- if ($self->{-ipc_sock} && $over->{dbh}) {
+ if ($self->{-ipc_req} && $over->{dbh}) {
# can't send DB handles over IPC
$over = ref($over)->new($over->{dbh}->sqlite_db_filename);
}
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index c4efbdd2..6be95979 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -823,7 +823,7 @@ sub content_exists ($$$) {
sub atfork_child {
my ($self) = @_;
if (my $older_siblings = $self->{idx_shards}) {
- $_->shard_atfork_child for @$older_siblings;
+ $_->ipc_sibling_atfork_child for @$older_siblings;
}
if (my $im = $self->{im}) {
$im->atfork_child;