From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id BE7171FA12 for ; Sun, 3 Jan 2021 02:06:17 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 2/7] searchidxshard: use PublicInbox::IPC to kill lots of code Date: Sun, 3 Jan 2021 02:06:12 +0000 Message-Id: <20210103020617.15719-3-e@80x24.org> In-Reply-To: <20210103020617.15719-1-e@80x24.org> References: <20210103020617.15719-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: It's nice to prove the new code works by swapping it into the current V2Writable / SearchIdxShard packages. This is only the first step for the core bits, and we'll be able to delete more code in a subsequent patch. --- lib/PublicInbox/IPC.pm | 2 +- lib/PublicInbox/SearchIdx.pm | 9 +- lib/PublicInbox/SearchIdxShard.pm | 237 +++++++----------------------- lib/PublicInbox/V2Writable.pm | 63 +++----- 4 files changed, 74 insertions(+), 237 deletions(-) diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm index 79cd34fe..6b7b3c7a 100644 --- a/lib/PublicInbox/IPC.pm +++ b/lib/PublicInbox/IPC.pm @@ -92,7 +92,7 @@ sub ipc_worker_spawn { $SIG{$_} = 'IGNORE' for (qw(TERM INT QUIT)); local $0 = $ident; PublicInbox::Sigfd::sig_setmask($oldset); - $self->ipc_atfork_child; + my $on_destroy = $self->ipc_atfork_child; eval { ipc_worker_loop($self, $s2) }; die "worker $ident PID:$$ died: $@\n" if $@; exit; diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm index 88349faa..d83fd4ca 100644 --- a/lib/PublicInbox/SearchIdx.pm +++ b/lib/PublicInbox/SearchIdx.pm @@ -414,6 +414,7 @@ sub _msgmap_init ($) { sub add_message { # mime = PublicInbox::Eml or Email::MIME object my ($self, $mime, $smsg, $sync) = @_; + begin_txn_lazy($self); my $mids = mids_for_index($mime); $smsg //= bless { blob => '' }, 'PublicInbox::Smsg'; # test-only compat $smsg->{mid} //= $mids->[0]; # v1 compatibility @@ -1002,14 +1003,6 @@ sub commit_txn_lazy { $self->with_umask(\&_commit_txn, $self); } -sub worker_done { - my ($self) = @_; - if (need_xapian($self)) { - die "$$ $0 xdb not released\n" if $self->{xdb}; - } - die "$$ $0 still in transaction\n" if $self->{txn}; -} - sub eidx_shard_new { my ($class, $eidx, $shard) = @_; my $self = bless { diff --git a/lib/PublicInbox/SearchIdxShard.pm b/lib/PublicInbox/SearchIdxShard.pm index 45240e07..68644bc0 100644 --- a/lib/PublicInbox/SearchIdxShard.pm +++ b/lib/PublicInbox/SearchIdxShard.pm @@ -6,11 +6,8 @@ package PublicInbox::SearchIdxShard; use strict; use v5.10.1; -use parent qw(PublicInbox::SearchIdx); -use bytes qw(length); -use IO::Handle (); # autoflush -use PublicInbox::Eml; -use PublicInbox::Sigfd; +use parent qw(PublicInbox::SearchIdx PublicInbox::IPC); +use PublicInbox::OnDestroy; sub new { my ($class, $v2w, $shard) = @_; # v2w may be ExtSearchIdx @@ -21,238 +18,108 @@ sub new { $self->idx_acquire; $self->set_metadata_once; $self->idx_release; - $self->spawn_worker($v2w, $shard) if $v2w->{parallel}; + if ($v2w->{parallel}) { + local $self->{-v2w_afc} = $v2w; + $self->ipc_worker_spawn("shard[$shard]"); + } $self; } -sub spawn_worker { - my ($self, $v2w, $shard) = @_; - my ($r, $w); - pipe($r, $w) or die "pipe failed: $!\n"; - $w->autoflush(1); - my $oldset = PublicInbox::Sigfd::block_signals(); - my $pid = fork; - defined $pid or die "fork failed: $!\n"; - if ($pid == 0) { - eval { PublicInbox::DS->Reset }; - # these signals are localized in parent - $SIG{$_} = 'IGNORE' for (qw(TERM INT QUIT)); - PublicInbox::Sigfd::sig_setmask($oldset); - my $bnote = $v2w->atfork_child; - close $w or die "failed to close: $!"; - - # F_SETPIPE_SZ = 1031 on Linux; increasing the pipe size here - # speeds V2Writable batch imports across 8 cores by nearly 20% - fcntl($r, 1031, 1048576) if $^O eq 'linux'; - - eval { shard_worker_loop($self, $v2w, $r, $shard, $bnote) }; - die "worker $shard died: $@\n" if $@; - die "unexpected MM $self->{mm}" if $self->{mm}; - exit; +sub _worker_done { + my ($self) = @_; + if ($self->need_xapian) { + die "$$ $0 xdb not released\n" if $self->{xdb}; } - PublicInbox::Sigfd::sig_setmask($oldset); - $self->{pid} = $pid; - $self->{w} = $w; - close $r or die "failed to close: $!"; -} - -sub eml ($$) { - my ($r, $len) = @_; - return if $len == 0; - my $n = read($r, my $bref, $len) or die "read: $!\n"; - $n == $len or die "short read: $n != $len\n"; - PublicInbox::Eml->new(\$bref); + die "$$ $0 still in transaction\n" if $self->{txn}; } -# this reads all the writes to $self->{w} from the parent process -sub shard_worker_loop ($$$$$) { - my ($self, $v2w, $r, $shard, $bnote) = @_; - $0 = "shard[$shard]"; +sub ipc_atfork_child { # called automatically before ipc_worker_loop + my ($self) = @_; + my $v2w = delete $self->{-v2w_afc} or die 'BUG: {-v2w_afc} missing'; + $v2w->atfork_child; # calls shard_atfork_child on our siblings + $v2w->{current_info} = "[$self->{shard}]"; # for $SIG{__WARN__} $self->begin_txn_lazy; - while (my $line = readline($r)) { - chomp $line; - $v2w->{current_info} = "[$shard] $line"; - if ($line eq 'commit') { - $self->commit_txn_lazy; - } elsif ($line eq 'close') { - $self->idx_release; - } elsif ($line eq 'barrier') { - $self->commit_txn_lazy; - # no need to lock < 512 bytes is atomic under POSIX - print $bnote "barrier $shard\n" or - die "write failed for barrier $!\n"; - } elsif ($line =~ /\AD ([0-9]+)\z/s) { - $self->remove_by_docid($1 + 0); - } elsif ($line =~ s/\A\+X //) { - my ($len, $docid, $eidx_key) = split(/ /, $line, 3); - $self->add_eidx_info($docid, $eidx_key, eml($r, $len)); - } elsif ($line =~ s/\A-X //) { - my ($len, $docid, $eidx_key) = split(/ /, $line, 3); - $self->remove_eidx_info($docid, $eidx_key, - eml($r, $len)); - } elsif ($line =~ s/\A=K (\d+) //) { - $self->set_keywords($1 + 0, split(/ /, $line)); - } elsif ($line =~ s/\A-K (\d+) //) { - $self->remove_keywords($1 + 0, split(/ /, $line)); - } elsif ($line =~ s/\A\+K (\d+) //) { - $self->add_keywords($1 + 0, split(/ /, $line)); - } elsif ($line =~ s/\AO ([^\n]+)//) { - my $over_fn = $1; - $over_fn =~ tr/\0/\n/; - $self->over_check(PublicInbox::Over->new($over_fn)); - } else { - my $eidx_key; - if ($line =~ s/\AX=(.+)\0//) { - $eidx_key = $1; - $v2w->{current_info} =~ s/\0/\\0 /; - } - # n.b. $mid may contain spaces(!) - my ($len, $bytes, $num, $oid, $ds, $ts, $tid, $mid) - = split(/ /, $line, 8); - $self->begin_txn_lazy; - my $smsg = bless { - bytes => $bytes, - num => $num + 0, - blob => $oid, - mid => $mid, - tid => $tid, - ds => $ds, - ts => $ts, - }, 'PublicInbox::Smsg'; - $smsg->{eidx_key} = $eidx_key if defined($eidx_key); - $self->add_message(eml($r, $len), $smsg); - } - } - $self->worker_done; + # caller must capture this: + PublicInbox::OnDestroy->new($$, \&_worker_done, $self); } sub index_raw { my ($self, $msgref, $eml, $smsg, $eidx_key) = @_; - if (my $w = $self->{w}) { - my @ekey = defined($eidx_key) ? ("X=$eidx_key\0") : (); - $msgref //= \($eml->as_string); - $smsg->{raw_bytes} //= length($$msgref); - # mid must be last, it can contain spaces (but not LF) - print $w @ekey, join(' ', @$smsg{qw(raw_bytes bytes - num blob ds ts tid mid)}), - "\n", $$msgref or die "failed to write shard $!\n"; - } else { - if ($eml) { - undef($$msgref) if $msgref; - } else { # --xapian-only + --sequential-shard: - $eml = PublicInbox::Eml->new($msgref); - } - $self->begin_txn_lazy; - $smsg->{eidx_key} = $eidx_key if defined $eidx_key; - $self->add_message($eml, $smsg); + if ($eml) { + undef($$msgref) if $msgref; + } else { # --xapian-only + --sequential-shard: + $eml = PublicInbox::Eml->new($msgref); } + $smsg->{eidx_key} = $eidx_key if defined $eidx_key; + $self->ipc_do('add_message', $eml, $smsg); } sub shard_add_eidx_info { my ($self, $docid, $eidx_key, $eml) = @_; - if (my $w = $self->{w}) { - my $hdr = $eml->header_obj->as_string; - my $len = length($hdr); - print $w "+X $len $docid $eidx_key\n", $hdr or - die "failed to write shard: $!"; - } else { - $self->add_eidx_info($docid, $eidx_key, $eml); - } + $self->ipc_do('add_eidx_info', $docid, $eidx_key, $eml); } sub shard_remove_eidx_info { my ($self, $docid, $eidx_key, $eml) = @_; - if (my $w = $self->{w}) { - my $hdr = $eml ? $eml->header_obj->as_string : ''; - my $len = length($hdr); - print $w "-X $len $docid $eidx_key\n", $hdr or - die "failed to write shard: $!"; - } else { - $self->remove_eidx_info($docid, $eidx_key, $eml); - } + $self->ipc_do('remove_eidx_info', $docid, $eidx_key, $eml); } -sub atfork_child { - close $_[0]->{w} or die "failed to close write pipe: $!\n"; +# needed when there's multiple IPC workers and the parent forking +# causes newer siblings to inherit older siblings sockets +sub shard_atfork_child { + my ($self) = @_; + my $pid = delete($self->{-ipc_worker_pid}) or + die "BUG: $$ no -ipc_worker_pid"; + my $s1 = delete($self->{-ipc_sock}) or die "BUG: $$ no -ipc_sock"; + $pid == $$ and die "BUG: $$ shard_atfork_child called on itself"; + close($s1) or die "close -ipc_sock: $!"; } -sub shard_barrier { - my ($self) = @_; - if (my $w = $self->{w}) { - print $w "barrier\n" or die "failed to print: $!"; - } else { - $self->commit_txn_lazy; - } +# wait for return to determine when ipc_do('commit_txn_lazy') is done +sub echo { + shift; + "@_"; } -sub shard_commit { +sub idx_close { my ($self) = @_; - if (my $w = $self->{w}) { - print $w "commit\n" or die "failed to write commit: $!"; - } else { - $self->commit_txn_lazy; - } + die "transaction in progress $self\n" if $self->{txn}; + $self->idx_release if $self->{xdb}; } sub shard_close { my ($self) = @_; - if (my $w = delete $self->{w}) { - my $pid = delete $self->{pid} or die "no process to wait on\n"; - print $w "close\n" or die "failed to write to pid:$pid: $!\n"; - close $w or die "failed to close pipe for pid:$pid: $!\n"; - waitpid($pid, 0) == $pid or die "remote process did not finish"; - $? == 0 or die ref($self)." pid:$pid exited with: $?"; - } else { - die "transaction in progress $self\n" if $self->{txn}; - $self->idx_release if $self->{xdb}; - } + $self->ipc_do('idx_close'); + $self->ipc_worker_stop; } sub shard_remove { my ($self, $num) = @_; - if (my $w = $self->{w}) { # triggers remove_by_docid in a shard child - print $w "D $num\n" or die "failed to write remove $!"; - } else { # same process - $self->remove_by_docid($num); - } + $self->ipc_do('remove_by_docid', $num); } sub shard_set_keywords { my ($self, $docid, @kw) = @_; - if (my $w = $self->{w}) { # triggers remove_by_docid in a shard child - print $w "=K $docid @kw\n" or die "failed to write: $!"; - } else { # same process - $self->set_keywords($docid, @kw); - } + $self->ipc_do('set_keywords', $docid, @kw); } sub shard_remove_keywords { my ($self, $docid, @kw) = @_; - if (my $w = $self->{w}) { # triggers remove_by_docid in a shard child - print $w "-K $docid @kw\n" or die "failed to write: $!"; - } else { # same process - $self->remove_keywords($docid, @kw); - } + $self->ipc_do('remove_keywords', $docid, @kw); } sub shard_add_keywords { my ($self, $docid, @kw) = @_; - if (my $w = $self->{w}) { # triggers remove_by_docid in a shard child - print $w "+K $docid @kw\n" or die "failed to write: $!"; - } else { # same process - $self->add_keywords($docid, @kw); - } + $self->ipc_do('add_keywords', $docid, @kw); } sub shard_over_check { my ($self, $over) = @_; - if (my $w = $self->{w}) { # triggers remove_by_docid in a shard child - my ($over_fn) = $over->{dbh}->sqlite_db_filename; - $over_fn =~ tr/\n/\0/; - print $w "O $over_fn\n" or die "failed to write over $!"; - } else { - $self->over_check($over); + if ($self->{-ipc_sock} && $over->{dbh}) { + # can't send DB handles over IPC + $over = ref($over)->new($over->{dbh}->sqlite_db_filename); } + $self->ipc_do('over_check', $over); } 1; diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm index 724fa804..cad559c5 100644 --- a/lib/PublicInbox/V2Writable.pm +++ b/lib/PublicInbox/V2Writable.pm @@ -287,16 +287,7 @@ sub _idx_init { # with_umask callback sub parallel_init ($$) { my ($self, $indexlevel) = @_; - if (($indexlevel // 'full') eq 'basic') { - $self->{parallel} = 0; - } else { - pipe(my ($r, $w)) or die "pipe failed: $!"; - # pipe for barrier notifications doesn't need to be big, - # 1031: F_SETPIPE_SZ - fcntl($w, 1031, 4096) if $^O eq 'linux'; - $self->{bnote} = [ $r, $w ]; - $w->autoflush(1); - } + $self->{parallel} = 0 if ($indexlevel // 'full') eq 'basic'; } # idempotent @@ -574,24 +565,6 @@ sub set_last_commits ($) { # this is NOT for ExtSearchIdx } } -sub barrier_init { - my ($self, $n) = @_; - $self->{bnote} or return; - --$n; - my $barrier = { map { $_ => 1 } (0..$n) }; -} - -sub barrier_wait { - my ($self, $barrier) = @_; - my $bnote = $self->{bnote} or return; - my $r = $bnote->[0]; - while (scalar keys %$barrier) { - defined(my $l = readline($r)) or die "EOF on barrier_wait: $!"; - $l =~ /\Abarrier (\d+)/ or die "bad line on barrier_wait: $l"; - delete $barrier->{$1} or die "bad shard[$1] on barrier wait"; - } -} - # public sub checkpoint ($;$) { my ($self, $wait) = @_; @@ -615,16 +588,23 @@ sub checkpoint ($;$) { $self->{oidx}->commit_lazy; # Now deal with Xapian - if ($wait) { - my $barrier = barrier_init($self, scalar @$shards); - - # each shard needs to issue a barrier command - $_->shard_barrier for @$shards; - # wait for each Xapian shard - barrier_wait($self, $barrier); - } else { - $_->shard_commit for @$shards; + # start commit_txn_lazy asynchronously on all parallel shards + # (non-parallel waits here) + $_->ipc_do('commit_txn_lazy') for @$shards; + + # transactions started on parallel shards, + # wait for them by issuing an echo command (echo can only + # run after commit_txn_lazy is done) + if ($wait && $self->{parallel}) { + my $i = 0; + for my $shard (@$shards) { + my $echo = $shard->ipc_do('echo', $i); + $echo == $i or die <<""; +shard[$i] bad echo:$echo != $i waiting for txn commit + + ++$i; + } } my $midx = $self->{midx}; # misc index @@ -679,7 +659,6 @@ sub done { eval { $self->{oidx}->dbh_close }; $err .= "over close: $@\n" if $@; delete $self->{midx}; - delete $self->{bnote}; my $nbytes = $self->{total_bytes}; $self->{total_bytes} = 0; $self->lock_release(!!$nbytes) if $shards; @@ -846,15 +825,13 @@ sub content_exists ($$$) { sub atfork_child { my ($self) = @_; - if (my $shards = $self->{idx_shards}) { - $_->atfork_child foreach @$shards; + if (my $older_siblings = $self->{idx_shards}) { + $_->shard_atfork_child for @$older_siblings; } if (my $im = $self->{im}) { $im->atfork_child; } - die "unexpected mm" if $self->{mm}; - close $self->{bnote}->[0] or die "close bnote[0]: $!\n"; - $self->{bnote}->[1]; + die "BUG: unexpected mm" if $self->{mm}; } sub reindex_checkpoint ($$) {