about summary refs log tree commit homepage
diff options
context:
space:
mode:
-rw-r--r-- lib/PublicInbox/IPC.pm            115
-rw-r--r-- lib/PublicInbox/SearchIdxShard.pm  23
-rw-r--r-- lib/PublicInbox/V2Writable.pm       2
3 files changed, 78 insertions, 62 deletions
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 6b7b3c7a..c1f6f920 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -6,7 +6,6 @@
 package PublicInbox::IPC;
 use strict;
 use v5.10.1;
-use Socket qw(AF_UNIX SOCK_STREAM);
 use Carp qw(confess croak);
 use PublicInbox::Sigfd;
 my ($enc, $dec);
@@ -34,39 +33,42 @@ if ($enc && $dec) { # should be custom ops
 }
 
 sub _get_rec ($) {
-        my ($sock) = @_;
-        local $/ = "\n";
-        defined(my $len = <$sock>) or return;
+        my ($r) = @_;
+        defined(my $len = <$r>) or return;
         chop($len) eq "\n" or croak "no LF byte in $len";
-        defined(my $r = read($sock, my $buf, $len)) or croak "read error: $!";
-        $r == $len or croak "short read: $r != $len";
+        defined(my $n = read($r, my $buf, $len)) or croak "read error: $!";
+        $n == $len or croak "short read: $n != $len";
         thaw($buf);
 }
 
 sub _send_rec ($$) {
-        my ($sock, $ref) = @_;
+        my ($w, $ref) = @_;
         my $buf = freeze($ref);
-        print $sock length($buf), "\n", $buf or croak "print: $!";
+        print $w length($buf), "\n", $buf or croak "print: $!";
 }
 
 sub ipc_return ($$$) {
-        my ($s2, $ret, $exc) = @_;
-        _send_rec($s2, $exc ? bless(\$exc, 'PublicInbox::IPC::Die') : $ret);
+        my ($w, $ret, $exc) = @_;
+        _send_rec($w, $exc ? bless(\$exc, 'PublicInbox::IPC::Die') : $ret);
 }
 
-sub ipc_worker_loop ($$) {
-        my ($self, $s2) = @_;
-        while (my $rec = _get_rec($s2)) {
-                my ($wantarray, $sub, @args) = @$rec;
-                if (!defined($wantarray)) { # no waiting if client doesn't care
+sub ipc_worker_loop ($$$) {
+        my ($self, $r_req, $w_res) = @_;
+        my ($rec, $wantarray, $sub, @args);
+        local $/ = "\n";
+        while ($rec = _get_rec($r_req)) {
+                ($wantarray, $sub, @args) = @$rec;
+                # no waiting if client doesn't care,
+                # this is the overwhelmingly likely case
+                if (!defined($wantarray)) {
                         eval { $self->$sub(@args) };
-                        eval { warn "die: $@ (from nowait $sub)\n" } if $@;
+                        eval { warn "$$ die: $@ (from nowait $sub)\n" } if $@;
                 } elsif ($wantarray) {
                         my @ret = eval { $self->$sub(@args) };
-                        ipc_return($s2, \@ret, $@);
-                } else {
+                        ipc_return($w_res, \@ret, $@);
+                } else { # '' => wantscalar
                         my $ret = eval { $self->$sub(@args) };
-                        ipc_return($s2, \$ret, $@);
+                        ipc_return($w_res, \$ret, $@);
                 }
         }
 }
@@ -75,33 +77,34 @@ sub ipc_worker_loop ($$) {
 sub ipc_worker_spawn {
         my ($self, $ident, $oldset) = @_;
         return unless $enc; # no Sereal or Storable
-        my $pid = $self->{-ipc_worker_pid};
-        confess "BUG: already spawned PID:$pid" if $pid;
-        confess "BUG: already have worker socket" if $self->{-ipc_sock};
-        my ($s1, $s2);
-        socketpair($s1, $s2, AF_UNIX, SOCK_STREAM, 0) or die "socketpair: $!";
+        return if ($self->{-ipc_ppid} // -1) == $$; # idempotent
+        delete(@$self{qw(-ipc_req -ipc_res -ipc_ppid -ipc_pid)});
+        pipe(my ($r_req, $w_req)) or die "pipe: $!";
+        pipe(my ($r_res, $w_res)) or die "pipe: $!";
         my $sigset = $oldset // PublicInbox::Sigfd::block_signals();
         my $parent = $$;
         $self->ipc_atfork_parent;
-        defined($pid = fork) or die "fork: $!";
+        defined(my $pid = fork) or die "fork: $!";
         if ($pid == 0) {
                 eval { PublicInbox::DS->Reset };
                 $self->{-ipc_parent_pid} = $parent;
-                close $s1 or die "close(\$s1): $!";
-                $s2->autoflush(1);
+                $w_req = $r_res = undef;
+                $w_res->autoflush(1);
                 $SIG{$_} = 'IGNORE' for (qw(TERM INT QUIT));
                 local $0 = $ident;
                 PublicInbox::Sigfd::sig_setmask($oldset);
                 my $on_destroy = $self->ipc_atfork_child;
-                eval { ipc_worker_loop($self, $s2) };
+                eval { ipc_worker_loop($self, $r_req, $w_res) };
                 die "worker $ident PID:$$ died: $@\n" if $@;
                 exit;
         }
         PublicInbox::Sigfd::sig_setmask($sigset) unless $oldset;
-        close $s2 or die "close(\$s2): $!";
-        $s1->autoflush(1);
-        $self->{-ipc_sock} = $s1;
-        $self->{-ipc_worker_pid} = $pid;
+        $r_req = $w_res = undef;
+        $w_req->autoflush(1);
+        $self->{-ipc_req} = $w_req;
+        $self->{-ipc_res} = $r_res;
+        $self->{-ipc_ppid} = $$;
+        $self->{-ipc_pid} = $pid;
 }
 
 sub ipc_worker_reap { # dwaitpid callback
@@ -122,15 +125,18 @@ sub ipc_worker_exit {
 # idempotent, can be called regardless of whether worker is active or not
 sub ipc_worker_stop {
         my ($self) = @_;
-        my $pid;
-        my $s1 = delete $self->{-ipc_sock} or do {
-                $pid = delete $self->{-ipc_worker_pid} and
-                        die "unexpected PID:$pid without ipc_sock";
-                return;
-        };
-        $pid = delete $self->{-ipc_worker_pid} or die "no PID?";
-        _send_rec($s1, [ undef, 'ipc_worker_exit', 0 ]);
-        shutdown($s1, 2) or die "shutdown(\$s1) for PID:$pid";
+        my ($pid, $ppid) = delete(@$self{qw(-ipc_pid -ipc_ppid)});
+        my ($w_req, $r_res) = delete(@$self{qw(-ipc_req -ipc_res)});
+        if (!$w_req && !$r_res) {
+                die "unexpected PID:$pid without IPC pipes" if $pid;
+                return; # idempotent
+        }
+        die 'no PID with IPC pipes' unless $pid;
+        _send_rec($w_req, [ undef, 'ipc_worker_exit', 0 ]);
+        $w_req = $r_res = undef;
+
+        # allow any sibling to send ipc_worker_exit, but siblings can't wait
+        return if $$ != $ppid;
         eval {
                 my $reap = $self->can('ipc_worker_reap');
                 PublicInbox::DS::dwaitpid($pid, $reap, $self);
@@ -153,17 +159,30 @@ sub ipc_lock_init {
 # call $self->$sub(@args), on a worker if ipc_worker_spawn was used
 sub ipc_do {
         my ($self, $sub, @args) = @_;
-        if (my $s1 = $self->{-ipc_sock}) {
+        if (my $w_req = $self->{-ipc_req}) { # run in worker
                 my $ipc_lock = $self->{-ipc_lock};
                 my $lock = $ipc_lock ? $ipc_lock->lock_for_scope : undef;
-                _send_rec($s1, [ wantarray, $sub, @args ]);
-                return unless defined(wantarray);
-                my $ret = _get_rec($s1) // die "no response on $sub";
-                die $$ret if ref($ret) eq 'PublicInbox::IPC::Die';
-                wantarray ? @$ret : $$ret;
-        } else {
+                if (defined(wantarray)) {
+                        my $r_res = $self->{-ipc_res} or die 'no ipc_res';
+                        _send_rec($w_req, [ wantarray, $sub, @args ]);
+                        my $ret = _get_rec($r_res) // die "no response on $sub";
+                        die $$ret if ref($ret) eq 'PublicInbox::IPC::Die';
+                        wantarray ? @$ret : $$ret;
+                } else { # likely, fire-and-forget into pipe
+                        _send_rec($w_req, [ undef , $sub, @args ]);
+                }
+        } else { # run locally
                 $self->$sub(@args);
         }
 }
 
+# needed when there are multiple IPC workers and the parent forking
+# causes newer siblings to inherit older siblings' pipes
+sub ipc_sibling_atfork_child {
+        my ($self) = @_;
+        my ($pid, undef) = delete(@$self{qw(-ipc_pid -ipc_ppid)});
+        delete(@$self{qw(-ipc_req -ipc_res)});
+        $pid == $$ and die "BUG: $$ ipc_atfork_child called on itself";
+}
+
 1;
diff --git a/lib/PublicInbox/SearchIdxShard.pm b/lib/PublicInbox/SearchIdxShard.pm
index 83cbbb25..0051df93 100644
--- a/lib/PublicInbox/SearchIdxShard.pm
+++ b/lib/PublicInbox/SearchIdxShard.pm
@@ -21,6 +21,14 @@ sub new {
         if ($v2w->{parallel}) {
                 local $self->{-v2w_afc} = $v2w;
                 $self->ipc_worker_spawn("shard[$shard]");
+                # F_SETPIPE_SZ = 1031 on Linux; increasing the pipe size for
+                # inputs speeds V2Writable batch imports across 8 cores by
+                # nearly 20%.  Since any of our responses are small, make
+                # the response pipe as small as possible
+                if ($^O eq 'linux') {
+                        fcntl($self->{-ipc_req}, 1031, 1048576);
+                        fcntl($self->{-ipc_res}, 1031, 4096);
+                }
         }
         $self;
 }
@@ -36,7 +44,7 @@ sub _worker_done {
 sub ipc_atfork_child { # called automatically before ipc_worker_loop
         my ($self) = @_;
         my $v2w = delete $self->{-v2w_afc} or die 'BUG: {-v2w_afc} missing';
-        $v2w->atfork_child; # calls shard_atfork_child on our siblings
+        $v2w->atfork_child; # calls ipc_sibling_atfork_child on our siblings
         $v2w->{current_info} = "[$self->{shard}]"; # for $SIG{__WARN__}
         $self->begin_txn_lazy;
         # caller must capture this:
@@ -49,17 +57,6 @@ sub index_eml {
         $self->ipc_do('add_message', $eml, $smsg);
 }
 
-# needed when there's multiple IPC workers and the parent forking
-# causes newer siblings to inherit older siblings sockets
-sub shard_atfork_child {
-        my ($self) = @_;
-        my $pid = delete($self->{-ipc_worker_pid}) or
-                        die "BUG: $$ no -ipc_worker_pid";
-        my $s1 = delete($self->{-ipc_sock}) or die "BUG: $$ no -ipc_sock";
-        $pid == $$ and die "BUG: $$ shard_atfork_child called on itself";
-        close($s1) or die "close -ipc_sock: $!";
-}
-
 # wait for return to determine when ipc_do('commit_txn_lazy') is done
 sub echo {
         shift;
@@ -80,7 +77,7 @@ sub shard_close {
 
 sub shard_over_check {
         my ($self, $over) = @_;
-        if ($self->{-ipc_sock} && $over->{dbh}) {
+        if ($self->{-ipc_req} && $over->{dbh}) {
                 # can't send DB handles over IPC
                 $over = ref($over)->new($over->{dbh}->sqlite_db_filename);
         }
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index c4efbdd2..6be95979 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -823,7 +823,7 @@ sub content_exists ($$$) {
 sub atfork_child {
         my ($self) = @_;
         if (my $older_siblings = $self->{idx_shards}) {
-                $_->shard_atfork_child for @$older_siblings;
+                $_->ipc_sibling_atfork_child for @$older_siblings;
         }
         if (my $im = $self->{im}) {
                 $im->atfork_child;