user/dev discussion of public-inbox itself
 help / color / mirror / code / Atom feed
Search results ordered by [date|relevance]  view[summary|nested|Atom feed]
thread overview below | download mbox.gz: |
* [PATCH 0/7] "lei q -o imaps://..." support
@ 2021-02-21  7:41  7% Eric Wong
  2021-02-21  7:41  5% ` [PATCH 7/7] lei2mail: parallel augment for lock-free stores Eric Wong
  0 siblings, 1 reply; 2+ results
From: Eric Wong @ 2021-02-21  7:41 UTC (permalink / raw)
  To: meta

-a/--augment dedupe is now parallel for both Maildirs and IMAP
stores (it is probably not worth the serialization cost for mbox*).

LeiAuth remains inefficient, unfortunately, but wq_broadcast
has been added so that this can be addressed in the future.

The parallelization work for IMAP for "lei q" can also be done
for "lei convert" and "lei import", but it'll probably be opt-in
in case people care about preserving UID order.

Eric Wong (7):
  inbox_writable: require PublicInbox::MdirReader
  lei q: support IMAP/IMAPS --output destinations
  ipc: add wq_broadcast
  lei q: move augment into lei2mail workers
  ipc: support setting a locked number of WQ workers
  net_reader: use and accept URIimap objects in more places
  lei2mail: parallel augment for lock-free stores

 lib/PublicInbox/IPC.pm           |  35 +++++++--
 lib/PublicInbox/InboxWritable.pm |   1 +
 lib/PublicInbox/LeiAuth.pm       |   2 +-
 lib/PublicInbox/LeiOverview.pm   |   7 +-
 lib/PublicInbox/LeiQuery.pm      |  24 +++++--
 lib/PublicInbox/LeiToMail.pm     |  93 ++++++++++++++++++++++--
 lib/PublicInbox/LeiXSearch.pm    |  48 ++++++-------
 lib/PublicInbox/NetReader.pm     |  75 +++++++++++---------
 lib/PublicInbox/NetWriter.pm     |  12 ++++
 lib/PublicInbox/WQWorker.pm      |   8 +--
 lib/PublicInbox/Watch.pm         |  11 +--
 t/ipc.t                          |  39 +++++-----
 t/lei-externals.t                |   3 +-
 xt/net_writer-imap.t             | 118 ++++++++++++++++++++++++++++---
 14 files changed, 362 insertions(+), 114 deletions(-)


^ permalink raw reply	[relevance 7%]

* [PATCH 7/7] lei2mail: parallel augment for lock-free stores
  2021-02-21  7:41  7% [PATCH 0/7] "lei q -o imaps://..." support Eric Wong
@ 2021-02-21  7:41  5% ` Eric Wong
  0 siblings, 0 replies; 2+ results
From: Eric Wong @ 2021-02-21  7:41 UTC (permalink / raw)
  To: meta

This lets us make use of multiple cores on IMAP and Maildir
stores backed by SSD (or better) storage.  It benefits IMAP
stores with high network latency, but may still penalize IMAP
servers with rotational storage.
---
 lib/PublicInbox/LeiToMail.pm  | 32 ++++++++++++++++++++++++++++----
 lib/PublicInbox/LeiXSearch.pm | 26 ++++++++++++++++----------
 lib/PublicInbox/NetReader.pm  |  9 +++++++--
 3 files changed, 51 insertions(+), 16 deletions(-)

diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index b5d560c7..6efd398a 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -19,6 +19,7 @@ use Symbol qw(gensym);
 use IO::Handle; # ->autoflush
 use Fcntl qw(SEEK_SET SEEK_END O_CREAT O_EXCL O_WRONLY);
 use Errno qw(EEXIST ESPIPE ENOENT EPIPE);
+use Digest::SHA qw(sha256_hex);
 my ($maildir_each_file);
 
 # struggles with short-lived repos, Gcf2Client makes little sense with lei;
@@ -269,7 +270,15 @@ sub _mbox_write_cb ($$) {
 }
 
 sub _augment_file { # maildir_each_file cb
-	my ($f, $lei) = @_;
+	my ($f, $lei, $mod, $shard) = @_;
+	if ($mod) {
+		# can't get dirent.d_ino w/ pure Perl, so we extract the OID
+		# if it looks like one:
+		my $hex = $f =~ m!\b([a-f0-9]{40,})[^/]*\z! ?
+				$1 : sha256_hex($f);
+		my $recno = hex(substr($hex, 0, 8));
+		return if ($recno % $mod) != $shard;
+	}
 	my $eml = PublicInbox::InboxWritable::eml_from_path($f) or return;
 	_augment($eml, $lei);
 }
@@ -421,7 +430,9 @@ sub _do_augment_maildir {
 	if ($lei->{opt}->{augment}) {
 		my $dedupe = $lei->{dedupe};
 		if ($dedupe && $dedupe->prepare_dedupe) {
-			$maildir_each_file->($dst, \&_augment_file, $lei);
+			my ($mod, $shard) = @{$self->{shard_info} // []};
+			$maildir_each_file->($dst, \&_augment_file,
+						$lei, $mod, $shard);
 			$dedupe->pause_dedupe;
 		}
 	} else { # clobber existing Maildir
@@ -516,11 +527,24 @@ sub ipc_atfork_child {
 	my ($self) = @_;
 	my $lei = delete $self->{lei};
 	$lei->lei_atfork_child;
-	if ($self->{-wq_worker_nr} == 0) {
+	my $aug;
+	if (lock_free($self)) {
+		my $mod = $self->{-wq_nr_workers};
+		my $shard = $self->{-wq_worker_nr};
+		if (my $nwr = $lei->{nwr}) {
+			$nwr->{shard_info} = [ $mod, $shard ];
+		} else { # Maildir (MH?)
+			$self->{shard_info} = [ $mod, $shard ];
+		}
+		$aug = '+'; # incr_post_augment
+	} elsif ($self->{-wq_worker_nr} == 0) {
+		$aug = '.'; # do_post_augment
+	}
+	if ($aug) {
 		local $0 = 'do_augment';
 		eval { do_augment($self, $lei) };
 		$lei->fail($@) if $@;
-		pkt_do($lei->{pkt_op_p}, '.') == 1 or
+		pkt_do($lei->{pkt_op_p}, $aug) == 1 or
 					die "do_post_augment trigger: $!";
 	}
 	if (my $zpipe = delete $lei->{zpipe}) {
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index 524f4d1c..e982165f 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -331,18 +331,16 @@ Error closing $lei->{ovv}->{dst}: $!
 
 sub do_post_augment {
 	my ($lei) = @_;
-	my $l2m = $lei->{l2m};
+	my $l2m = $lei->{l2m} or die 'BUG: unexpected do_post_augment';
 	my $err;
-	if ($l2m) {
-		eval { $l2m->post_augment($lei) };
-		$err = $@;
-		if ($err) {
-			if (my $lxs = delete $lei->{lxs}) {
-				$lxs->wq_kill;
-				$lxs->wq_close(0, undef, $lei);
-			}
-			$lei->fail("$err");
+	eval { $l2m->post_augment($lei) };
+	$err = $@;
+	if ($err) {
+		if (my $lxs = delete $lei->{lxs}) {
+			$lxs->wq_kill;
+			$lxs->wq_close(0, undef, $lei);
 		}
+		$lei->fail("$err");
 	}
 	if (!$err && delete $lei->{early_mua}) { # non-augment case
 		$lei->start_mua;
@@ -350,6 +348,13 @@ sub do_post_augment {
 	close(delete $lei->{au_done}); # triggers wait_startq in lei_xsearch
 }
 
+sub incr_post_augment { # called whenever an l2m shard finishes
+	my ($lei) = @_;
+	my $l2m = $lei->{l2m} or die 'BUG: unexpected incr_post_augment';
+	return if ++$lei->{nr_post_augment} != $l2m->{-wq_nr_workers};
+	do_post_augment($lei);
+}
+
 my $MAX_PER_HOST = 4;
 
 sub concurrency {
@@ -392,6 +397,7 @@ sub do_query {
 		'|' => [ $lei->can('sigpipe_handler'), $lei ],
 		'!' => [ $lei->can('fail_handler'), $lei ],
 		'.' => [ \&do_post_augment, $lei ],
+		'+' => [ \&incr_post_augment, $lei ],
 		'' => [ \&query_done, $lei ],
 		'mset_progress' => [ \&mset_progress, $lei ],
 		'x_it' => [ $lei->can('x_it'), $lei ],
diff --git a/lib/PublicInbox/NetReader.pm b/lib/PublicInbox/NetReader.pm
index 4c412491..0956d5da 100644
--- a/lib/PublicInbox/NetReader.pm
+++ b/lib/PublicInbox/NetReader.pm
@@ -363,8 +363,11 @@ sub _imap_fetch_all ($$$) {
 	}
 	return if $l_uid >= $r_uid; # nothing to do
 	$l_uid ||= 1;
-
-	warn "# $uri fetching UID $l_uid:$r_uid\n" unless $self->{quiet};
+	my ($mod, $shard) = @{$self->{shard_info} // []};
+	unless ($self->{quiet}) {
+		my $m = $mod ? " [(UID % $mod) == $shard]" : '';
+		warn "# $uri fetching UID $l_uid:$r_uid$m\n";
+	}
 	$mic->Uid(1); # the default, we hope
 	my $bs = $self->{imap_opt}->{$sec}->{batch_size} // 1;
 	my $req = $mic->imap4rev1 ? 'BODY.PEEK[]' : 'RFC822.PEEK';
@@ -391,6 +394,8 @@ sub _imap_fetch_all ($$$) {
 		$l_uid = $uids->[-1] + 1; # for next search
 		my $last_uid;
 		my $n = $self->{max_batch};
+
+		@$uids = grep { ($_ % $mod) == $shard } @$uids if $mod;
 		while (scalar @$uids) {
 			my @batch = splice(@$uids, 0, $bs);
 			$batch = join(',', @batch);

^ permalink raw reply related	[relevance 5%]

Results 1-2 of 2 | reverse | options above
-- pct% links below jump to the message on this page, permalinks otherwise --
2021-02-21  7:41  7% [PATCH 0/7] "lei q -o imaps://..." support Eric Wong
2021-02-21  7:41  5% ` [PATCH 7/7] lei2mail: parallel augment for lock-free stores Eric Wong

Code repositories for project(s) associated with this public inbox

	https://80x24.org/public-inbox.git

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).