From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id D649E1FC0B for ; Sun, 21 Feb 2021 07:41:35 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 7/7] lei2mail: parallel augment for lock-free stores Date: Sun, 21 Feb 2021 07:41:34 +0000 Message-Id: <20210221074134.15084-8-e@80x24.org> In-Reply-To: <20210221074134.15084-1-e@80x24.org> References: <20210221074134.15084-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: This lets us make use of multiple cores on IMAP and Maildir backed by SSD (or better) storage. This benefits IMAP stores with high network latency, but may still penalize IMAP servers with rotational storage. --- lib/PublicInbox/LeiToMail.pm | 32 ++++++++++++++++++++++++++++---- lib/PublicInbox/LeiXSearch.pm | 26 ++++++++++++++++---------- lib/PublicInbox/NetReader.pm | 9 +++++++-- 3 files changed, 51 insertions(+), 16 deletions(-) diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm index b5d560c7..6efd398a 100644 --- a/lib/PublicInbox/LeiToMail.pm +++ b/lib/PublicInbox/LeiToMail.pm @@ -19,6 +19,7 @@ use Symbol qw(gensym); use IO::Handle; # ->autoflush use Fcntl qw(SEEK_SET SEEK_END O_CREAT O_EXCL O_WRONLY); use Errno qw(EEXIST ESPIPE ENOENT EPIPE); +use Digest::SHA qw(sha256_hex); my ($maildir_each_file); # struggles with short-lived repos, Gcf2Client makes little sense with lei; @@ -269,7 +270,15 @@ sub _mbox_write_cb ($$) { } sub _augment_file { # maildir_each_file cb - my ($f, $lei) = @_; + my ($f, $lei, $mod, $shard) = @_; + if ($mod) { + # can't get dirent.d_ino w/ pure Perl, so we extract the OID + # if it looks like one: + my $hex = $f =~ m!\b([a-f0-9]{40,})[^/]*\z! ? + $1 : sha256_hex($f); + my $recno = hex(substr($hex, 0, 8)); + return if ($recno % $mod) != $shard; + } my $eml = PublicInbox::InboxWritable::eml_from_path($f) or return; _augment($eml, $lei); } @@ -421,7 +430,9 @@ sub _do_augment_maildir { if ($lei->{opt}->{augment}) { my $dedupe = $lei->{dedupe}; if ($dedupe && $dedupe->prepare_dedupe) { - $maildir_each_file->($dst, \&_augment_file, $lei); + my ($mod, $shard) = @{$self->{shard_info} // []}; + $maildir_each_file->($dst, \&_augment_file, + $lei, $mod, $shard); $dedupe->pause_dedupe; } } else { # clobber existing Maildir @@ -516,11 +527,24 @@ sub ipc_atfork_child { my ($self) = @_; my $lei = delete $self->{lei}; $lei->lei_atfork_child; - if ($self->{-wq_worker_nr} == 0) { + my $aug; + if (lock_free($self)) { + my $mod = $self->{-wq_nr_workers}; + my $shard = $self->{-wq_worker_nr}; + if (my $nwr = $lei->{nwr}) { + $nwr->{shard_info} = [ $mod, $shard ]; + } else { # Maildir (MH?) + $self->{shard_info} = [ $mod, $shard ]; + } + $aug = '+'; # incr_post_augment + } elsif ($self->{-wq_worker_nr} == 0) { + $aug = '.'; # do_post_augment + } + if ($aug) { local $0 = 'do_augment'; eval { do_augment($self, $lei) }; $lei->fail($@) if $@; - pkt_do($lei->{pkt_op_p}, '.') == 1 or + pkt_do($lei->{pkt_op_p}, $aug) == 1 or die "do_post_augment trigger: $!"; } if (my $zpipe = delete $lei->{zpipe}) { diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index 524f4d1c..e982165f 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -331,18 +331,16 @@ Error closing $lei->{ovv}->{dst}: $! sub do_post_augment { my ($lei) = @_; - my $l2m = $lei->{l2m}; + my $l2m = $lei->{l2m} or die 'BUG: unexpected do_post_augment'; my $err; - if ($l2m) { - eval { $l2m->post_augment($lei) }; - $err = $@; - if ($err) { - if (my $lxs = delete $lei->{lxs}) { - $lxs->wq_kill; - $lxs->wq_close(0, undef, $lei); - } - $lei->fail("$err"); + eval { $l2m->post_augment($lei) }; + $err = $@; + if ($err) { + if (my $lxs = delete $lei->{lxs}) { + $lxs->wq_kill; + $lxs->wq_close(0, undef, $lei); } + $lei->fail("$err"); } if (!$err && delete $lei->{early_mua}) { # non-augment case $lei->start_mua; @@ -350,6 +348,13 @@ sub do_post_augment { close(delete $lei->{au_done}); # triggers wait_startq in lei_xsearch } +sub incr_post_augment { # called whenever an l2m shard finishes + my ($lei) = @_; + my $l2m = $lei->{l2m} or die 'BUG: unexpected incr_post_augment'; + return if ++$lei->{nr_post_augment} != $l2m->{-wq_nr_workers}; + do_post_augment($lei); +} + my $MAX_PER_HOST = 4; sub concurrency { @@ -392,6 +397,7 @@ sub do_query { '|' => [ $lei->can('sigpipe_handler'), $lei ], '!' => [ $lei->can('fail_handler'), $lei ], '.' => [ \&do_post_augment, $lei ], + '+' => [ \&incr_post_augment, $lei ], '' => [ \&query_done, $lei ], 'mset_progress' => [ \&mset_progress, $lei ], 'x_it' => [ $lei->can('x_it'), $lei ], diff --git a/lib/PublicInbox/NetReader.pm b/lib/PublicInbox/NetReader.pm index 4c412491..0956d5da 100644 --- a/lib/PublicInbox/NetReader.pm +++ b/lib/PublicInbox/NetReader.pm @@ -363,8 +363,11 @@ sub _imap_fetch_all ($$$) { } return if $l_uid >= $r_uid; # nothing to do $l_uid ||= 1; - - warn "# $uri fetching UID $l_uid:$r_uid\n" unless $self->{quiet}; + my ($mod, $shard) = @{$self->{shard_info} // []}; + unless ($self->{quiet}) { + my $m = $mod ? " [(UID % $mod) == $shard]" : ''; + warn "# $uri fetching UID $l_uid:$r_uid$m\n"; + } $mic->Uid(1); # the default, we hope my $bs = $self->{imap_opt}->{$sec}->{batch_size} // 1; my $req = $mic->imap4rev1 ? 'BODY.PEEK[]' : 'RFC822.PEEK'; @@ -391,6 +394,8 @@ sub _imap_fetch_all ($$$) { $l_uid = $uids->[-1] + 1; # for next search my $last_uid; my $n = $self->{max_batch}; + + @$uids = grep { ($_ % $mod) == $shard } @$uids if $mod; while (scalar @$uids) { my @batch = splice(@$uids, 0, $bs); $batch = join(',', @batch);