user/dev discussion of public-inbox itself
 help / color / mirror / code / Atom feed
Search results ordered by [date|relevance]  view[summary|nested|Atom feed]
thread overview below | download mbox.gz: |
* [PATCH 3/3] lei import: speed up repeated Maildir imports
  @ 2021-06-08  9:50  5% ` Eric Wong
  0 siblings, 0 replies; 3+ results
From: Eric Wong @ 2021-06-08  9:50 UTC (permalink / raw)
  To: meta

On a 4-core CPU, this speeds up "lei import" on a largish
Maildir inbox with 75K messages from ~8 minutes down to ~40s.

Parallelizing alone did not bring any improvement and may
even hurt performance slightly, depending on CPU availability.
However, creating the index on the "fid" and "name" columns in
blob2name yields us the same speedup we got for IMAP.

Parallelizing IMAP makes more sense because most IMAP
stores are non-local and subject to network latency.

Followup-to: bdecd7ed8e0dcf0b45491b947cd737ba8cfe38a3 ("lei import: speed up kw updates for old IMAP messages")
---
 MANIFEST                       |  1 +
 lib/PublicInbox/LEI.pm         | 11 +++---
 lib/PublicInbox/LeiImport.pm   | 36 ++++++++++++------
 lib/PublicInbox/LeiIndex.pm    |  2 +-
 lib/PublicInbox/LeiInput.pm    | 31 +++++++++++-----
 lib/PublicInbox/LeiMailSync.pm | 14 +++++++
 lib/PublicInbox/LeiPmdir.pm    | 67 ++++++++++++++++++++++++++++++++++
 lib/PublicInbox/MdirReader.pm  | 22 ++++++-----
 t/lei-import-maildir.t         |  2 +-
 9 files changed, 148 insertions(+), 38 deletions(-)
 create mode 100644 lib/PublicInbox/LeiPmdir.pm

diff --git a/MANIFEST b/MANIFEST
index 5a70a144..7bdbf252 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -221,6 +221,7 @@ lib/PublicInbox/LeiMailSync.pm
 lib/PublicInbox/LeiMirror.pm
 lib/PublicInbox/LeiOverview.pm
 lib/PublicInbox/LeiP2q.pm
+lib/PublicInbox/LeiPmdir.pm
 lib/PublicInbox/LeiQuery.pm
 lib/PublicInbox/LeiRediff.pm
 lib/PublicInbox/LeiRemote.pm
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index ed01e8de..77fc5b8f 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -240,7 +240,7 @@ our %CMD = ( # sorted in order of importance/use:
 	 @c_opt ],
 'import' => [ 'LOCATION...|--stdin',
 	'one-time import/update from URL or filesystem',
-	qw(stdin| offset=i recursive|r exclude=s include|I=s
+	qw(stdin| offset=i recursive|r exclude=s include|I=s jobs=s
 	lock=s@ in-format|F=s kw! verbose|v+ incremental! mail-sync!),
 	qw(no-torsocks torsocks=s), PublicInbox::LeiQuery::curl_opt(), @c_opt ],
 'forget-mail-sync' => [ 'LOCATION...',
@@ -421,7 +421,7 @@ my %CONFIG_KEYS = (
 	'leistore.dir' => 'top-level storage location',
 );
 
-my @WQ_KEYS = qw(lxs l2m wq1 ikw); # internal workers
+my @WQ_KEYS = qw(lxs l2m ikw pmd wq1); # internal workers
 
 sub _drop_wq {
 	my ($self) = @_;
@@ -566,7 +566,7 @@ sub pkt_op_pair {
 }
 
 sub workers_start {
-	my ($lei, $wq, $jobs, $ops) = @_;
+	my ($lei, $wq, $jobs, $ops, $flds) = @_;
 	$ops = {
 		'!' => [ \&fail_handler, $lei ],
 		'|' => [ \&sigpipe_handler, $lei ],
@@ -577,7 +577,8 @@ sub workers_start {
 	$ops->{''} //= [ $wq->can('_lei_wq_eof') || \&wq_eof, $lei ];
 	my $end = $lei->pkt_op_pair;
 	my $ident = $wq->{-wq_ident} // "lei-$lei->{cmd} worker";
-	$wq->wq_workers_start($ident, $jobs, $lei->oldset, { lei => $lei });
+	$flds->{lei} = $lei;
+	$wq->wq_workers_start($ident, $jobs, $lei->oldset, $flds);
 	delete $lei->{pkt_op_p};
 	my $op_c = delete $lei->{pkt_op_c};
 	# {-lei_sock} persists script/lei process until ops->{''} EOF callback
@@ -590,7 +591,7 @@ sub workers_start {
 # call this when we're ready to wait on events and yield to other clients
 sub wait_wq_events {
 	my ($lei, $op_c, $ops) = @_;
-	for my $wq (grep(defined, @$lei{qw(ikw)})) { # auxiliary WQs
+	for my $wq (grep(defined, @$lei{qw(ikw pmd)})) { # auxiliary WQs
 		$wq->wq_close(1);
 	}
 	$op_c->{ops} = $ops;
diff --git a/lib/PublicInbox/LeiImport.pm b/lib/PublicInbox/LeiImport.pm
index 222f75c8..b0e7ba6b 100644
--- a/lib/PublicInbox/LeiImport.pm
+++ b/lib/PublicInbox/LeiImport.pm
@@ -6,6 +6,7 @@ package PublicInbox::LeiImport;
 use strict;
 use v5.10.1;
 use parent qw(PublicInbox::IPC PublicInbox::LeiInput);
+use PublicInbox::InboxWritable qw(eml_from_path);
 
 # /^input_/ subs are used by (or override) PublicInbox::LeiInput superclass
 
@@ -28,17 +29,26 @@ sub input_mbox_cb { # MboxReader callback
 	input_eml_cb($self, $eml, $vmd);
 }
 
-sub input_maildir_cb { # maildir_each_eml cb
-	my ($f, $kw, $eml, $self) = @_;
+sub pmdir_cb { # called via wq_io_do from LeiPmdir->each_mdir_fn
+	my ($self, $f, @args) = @_;
+	my ($folder, $bn) = ($f =~ m!\A(.+?)/(?:new|cur)/([^/]+)\z!) or
+		die "BUG: $f was not from a Maildir?\n";
+	my $fl = PublicInbox::MdirReader::maildir_basename_flags($bn);
+	return if index($fl, 'T') >= 0; # no Trashed messages
+	my $kw = PublicInbox::MdirReader::flags2kw($fl);
+	substr($folder, 0, 0) = 'maildir:'; # add prefix
+	my $lms = $self->{-lms_ro};
+	my $oidbin = $lms ? $lms->name_oidbin($folder, $bn) : undef;
+	my @docids = defined($oidbin) ?
+			$self->{over}->oidbin_exists($oidbin) : ();
 	my $vmd = $self->{-import_kw} ? { kw => $kw } : undef;
-	if ($self->{-mail_sync}) {
-		if ($f =~ m!\A(.+?)/(?:new|cur)/([^/]+)\z!) { # ugh...
-			$vmd->{sync_info} = [ "maildir:$1", \(my $n = $2) ];
-		} else {
-			warn "E: $f was not from a Maildir?\n";
-		}
+	if (scalar @docids) {
+		$self->{lse}->kw_changed(undef, $kw, \@docids) or return;
+	}
+	if (my $eml = eml_from_path($f)) {
+		$vmd->{sync_info} = [ $folder, \$bn ] if $self->{-mail_sync};
+		$self->input_eml_cb($eml, $vmd);
 	}
-	$self->input_eml_cb($eml, $vmd);
 }
 
 sub input_net_cb { # imap_each / nntp_each
@@ -62,11 +72,13 @@ sub do_import_index ($$@) {
 	my $vmd_mod = $self->vmd_mod_extract(\@inputs);
 	return $lei->fail(join("\n", @{$vmd_mod->{err}})) if $vmd_mod->{err};
 	$self->{all_vmd} = $vmd_mod if scalar keys %$vmd_mod;
-	$self->prepare_inputs($lei, \@inputs) or return;
+	$lei->ale; # initialize for workers to read (before LeiPmdir->new)
 	$self->{-mail_sync} = $lei->{opt}->{'mail-sync'} // 1;
+	$self->prepare_inputs($lei, \@inputs) or return;
 
-	$lei->ale; # initialize for workers to read
-	my $j = $lei->{opt}->{jobs} // scalar(@{$self->{inputs}}) || 1;
+	my $j = $lei->{opt}->{jobs} // 0;
+	$j =~ /\A([0-9]+),[0-9]+\z/ and $j = $1 + 0;
+	$j ||= scalar(@{$self->{inputs}}) || 1;
 	my $ikw;
 	if (my $net = $lei->{net}) {
 		# $j = $net->net_concurrency($j); TODO
diff --git a/lib/PublicInbox/LeiIndex.pm b/lib/PublicInbox/LeiIndex.pm
index cc3e83e7..4be0c649 100644
--- a/lib/PublicInbox/LeiIndex.pm
+++ b/lib/PublicInbox/LeiIndex.pm
@@ -35,7 +35,7 @@ sub lei_index {
 
 no warnings 'once';
 no strict 'refs';
-for my $m (qw(input_maildir_cb input_net_cb)) {
+for my $m (qw(pmdir_cb input_net_cb)) {
 	*$m = PublicInbox::LeiImport->can($m);
 }
 
diff --git a/lib/PublicInbox/LeiInput.pm b/lib/PublicInbox/LeiInput.pm
index 4ff7a379..24211bf0 100644
--- a/lib/PublicInbox/LeiInput.pm
+++ b/lib/PublicInbox/LeiInput.pm
@@ -151,9 +151,16 @@ sub input_path_url {
 		return $lei->fail(<<EOM) if $ifmt && $ifmt ne 'maildir';
 $input appears to be a maildir, not $ifmt
 EOM
-		PublicInbox::MdirReader->new->maildir_each_eml($input,
-					$self->can('input_maildir_cb'),
-					$self, @args);
+		my $mdr = PublicInbox::MdirReader->new;
+		if (my $pmd = $self->{pmd}) {
+			$mdr->maildir_each_file($input,
+						$pmd->can('each_mdir_fn'),
+						$pmd, @args);
+		} else {
+			$mdr->maildir_each_eml($input,
+						$self->can('input_maildir_cb'),
+						$self, @args);
+		}
 	} else {
 		$lei->fail("$input unsupported (TODO)");
 	}
@@ -215,7 +222,7 @@ sub prepare_inputs { # returns undef on error
 		push @{$sync->{no}}, '/dev/stdin' if $sync;
 	}
 	my $net = $lei->{net}; # NetWriter may be created by l2m
-	my (@f, @d);
+	my (@f, @md);
 	# e.g. Maildir:/home/user/Mail/ or imaps://example.com/INBOX
 	for my $input (@$inputs) {
 		my $input_path = $input;
@@ -247,11 +254,11 @@ sub prepare_inputs { # returns undef on error
 				PublicInbox::MboxReader->reads($ifmt) or return
 					$lei->fail("$ifmt not supported");
 			} elsif (-d $input_path) {
-				require PublicInbox::MdirReader;
 				$ifmt eq 'maildir' or return
 					$lei->fail("$ifmt not supported");
 				$sync and $input = 'maildir:'.
 						$lei->abs_path($input_path);
+				push @md, $input;
 			} else {
 				return $lei->fail("Unable to handle $input");
 			}
@@ -266,21 +273,18 @@ $input is `eml', not --in-format=$in_fmt
 			if ($devfd >= 0 || -f $input || -p _) {
 				push @{$sync->{no}}, $input if $sync;
 				push @f, $input;
-			} elsif (-d $input) {
+			} elsif (-d "$input/new" && -d "$input/cur") {
 				if ($sync) {
 					$input = $lei->abs_path($input);
 					push @{$sync->{ok}}, $input;
 				}
-				push @d, $input;
+				push @md, $input;
 			} else {
 				return $lei->fail("Unable to handle $input")
 			}
 		}
 	}
 	if (@f) { check_input_format($lei, \@f) or return }
-	if (@d) { # TODO: check for MH vs Maildir, here
-		require PublicInbox::MdirReader;
-	}
 	if ($sync && $sync->{no}) {
 		return $lei->fail(<<"") if !$sync->{ok};
 --mail-sync specified but no inputs support it
@@ -299,6 +303,13 @@ $input is `eml', not --in-format=$in_fmt
 		$lei->{auth} //= PublicInbox::LeiAuth->new;
 		$lei->{net} //= $net;
 	}
+	if (scalar(@md)) {
+		require PublicInbox::MdirReader;
+		if ($self->can('pmdir_cb')) {
+			require PublicInbox::LeiPmdir;
+			$self->{pmd} = PublicInbox::LeiPmdir->new($lei, $self);
+		}
+	}
 	$self->{inputs} = $inputs;
 }
 
diff --git a/lib/PublicInbox/LeiMailSync.pm b/lib/PublicInbox/LeiMailSync.pm
index 75603d89..ec05404a 100644
--- a/lib/PublicInbox/LeiMailSync.pm
+++ b/lib/PublicInbox/LeiMailSync.pm
@@ -66,6 +66,10 @@ CREATE TABLE IF NOT EXISTS blob2name (
 	UNIQUE (oidbin, fid, name)
 )
 
+	# speeds up LeiImport->pmdir_cb (for "lei import") by ~6x:
+	$dbh->do(<<'');
+CREATE INDEX IF NOT EXISTS idx_fid_name ON blob2name(fid,name)
+
 }
 
 sub fid_for {
@@ -375,6 +379,16 @@ EOM
 	$sth->fetchrow_array;
 }
 
+sub name_oidbin ($$$) {
+	my ($self, $mdir, $nm) = @_;
+	my $fid = $self->{fmap}->{$mdir} //= fid_for($self, $mdir) // return;
+	my $sth = $self->{dbh}->prepare_cached(<<EOM, undef, 1);
+SELECT oidbin FROM blob2name WHERE fid = ? AND name = ?
+EOM
+	$sth->execute($fid, $nm);
+	$sth->fetchrow_array;
+}
+
 sub imap_oid {
 	my ($self, $lei, $uid_uri) = @_;
 	my $mailbox_uri = $uid_uri->clone;
diff --git a/lib/PublicInbox/LeiPmdir.pm b/lib/PublicInbox/LeiPmdir.pm
new file mode 100644
index 00000000..5efb012e
--- /dev/null
+++ b/lib/PublicInbox/LeiPmdir.pm
@@ -0,0 +1,67 @@
+# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# WQ worker for dealing with parallel Maildir reads;
+# this does NOT use the {shard_info} field of LeiToMail
+# (and we may remove {shard_info})
+# WQ key: {pmd}
+package PublicInbox::LeiPmdir;
+use strict;
+use v5.10.1;
+use parent qw(PublicInbox::IPC);
+
+sub new {
+	my ($cls, $lei, $ipt) = @_;
+	my $self = bless { -wq_ident => 'lei Maildir worker' }, $cls;
+	my $jobs = $lei->{opt}->{jobs};
+	$jobs =~ /\A[0-9]+,([0-9]+)\z/ and $jobs = $1;
+	my $nproc = $jobs // do {
+		# untested with >=4 CPUs, though I suspect I/O latency
+		# of SATA SSD storage will make >=4 processes unnecessary,
+		# here.  NVMe users may wish to use '-j'
+		my $n = $self->detect_nproc;
+		$n = 4 if $n > 4;
+	};
+	my ($op_c, $ops) = $lei->workers_start($self, $nproc,
+		undef, { ipt => $ipt }); # LeiInput subclass
+	$op_c->{ops} = $ops; # for PktOp->event_step
+	$lei->{pmd} = $self;
+}
+
+sub ipc_atfork_child {
+	my ($self) = @_;
+	my $lei = $self->{lei};
+	$lei->_lei_atfork_child;
+	my $ipt = $self->{ipt} // die 'BUG: no self->{ipt}';
+	$ipt->{lei} = $lei;
+	$ipt->{sto} = $lei->{sto} // die 'BUG: no lei->{sto}';
+	$ipt->{lse} = $ipt->{sto}->search;
+	$ipt->{over} = $ipt->{lse}->over;
+	$ipt->{-lms_ro} //= $ipt->{lse}->lms; # may be undef or '0'
+	$self->SUPER::ipc_atfork_child;
+}
+
+sub each_mdir_fn { # maildir_each_file callback
+	my ($f, $self, @args) = @_;
+	$self->wq_io_do('mdir_iter', [], $f, @args);
+}
+
+sub mdir_iter { # via wq_io_do
+	my ($self, $f, @args) = @_;
+	$self->{ipt}->pmdir_cb($f, @args);
+}
+
+sub pmd_done_wait {
+	my ($arg, $pid) = @_;
+	my ($self, $lei) = @$arg;
+	my $wait = $lei->{sto}->ipc_do('done');
+	$lei->can('wq_done_wait')->($arg, $pid);
+}
+
+sub _lei_wq_eof { # EOF callback for main lei daemon
+	my ($lei) = @_;
+	my $pmd = delete $lei->{pmd} or return $lei->fail;
+	$pmd->wq_wait_old(\&pmd_done_wait, $lei);
+}
+
+1;
diff --git a/lib/PublicInbox/MdirReader.pm b/lib/PublicInbox/MdirReader.pm
index 304be63d..484bf0a8 100644
--- a/lib/PublicInbox/MdirReader.pm
+++ b/lib/PublicInbox/MdirReader.pm
@@ -87,17 +87,21 @@ sub maildir_each_eml {
 sub new { bless {}, __PACKAGE__ }
 
 sub flags2kw ($) {
-	my @unknown;
-	my %kw;
-	for (split(//, $_[0])) {
-		my $k = $c2kw{$_};
-		if (defined($k)) {
-			$kw{$k} = 1;
-		} else {
-			push @unknown, $_;
+	if (wantarray) {
+		my @unknown;
+		my %kw;
+		for (split(//, $_[0])) {
+			my $k = $c2kw{$_};
+			if (defined($k)) {
+				$kw{$k} = 1;
+			} else {
+				push @unknown, $_;
+			}
 		}
+		(\%kw, \@unknown);
+	} else {
+		[ sort(map { $c2kw{$_} // () } split(//, $_[0])) ];
 	}
-	(\%kw, \@unknown);
 }
 
 1;
diff --git a/t/lei-import-maildir.t b/t/lei-import-maildir.t
index 688b10ce..c81e7805 100644
--- a/t/lei-import-maildir.t
+++ b/t/lei-import-maildir.t
@@ -28,7 +28,7 @@ test_lei(sub {
 	is(scalar(keys %v), 1, 'inspect handles relative and absolute paths');
 	my $inspect = json_utf8->decode([ keys %v ]->[0]);
 	is_deeply($inspect, {"maildir:$md" => { 'name.count' => 1 }},
-		'inspect maildir: path had expected output');
+		'inspect maildir: path had expected output') or xbail($inspect);
 
 	lei_ok(qw(q s:boolean));
 	my $res = json_utf8->decode($lei_out);

^ permalink raw reply related	[relevance 5%]

* [PATCH v2] lei import: speed up kw updates for old IMAP messages
  2021-06-03  0:17  7% [PATCH] lei import: speed up kw updates for old IMAP messages Eric Wong
@ 2021-06-03  1:05  6% ` Eric Wong
  0 siblings, 0 replies; 3+ results
From: Eric Wong @ 2021-06-03  1:05 UTC (permalink / raw)
  To: meta

On a 4-core CPU, this speeds up "lei import" on a largish IMAP
inbox with 75K messages from ~21 minutes down to 40s.

Parallelizing with the new LeiImportKw WQ worker class gave a
near-linear speedup and brought the runtime down to ~5:40.

The new idx_fid_uid index on the "fid" and "uid" columns of
blob2num in mail_sync.sqlite3 brought us the final speedup.

An additional index on over.sqlite3#xref3(oidbin) did not help,
since idx_nntp already exists and speeds up the new ->oidbin_exists
internal API.

I initially experimented with a separate "lei import-kw" command
but decided against it since it's useless outside of IMAP+JMAP
and would require extra cognitive overhead for both users and
hackers.  So LeiImportKw is just a WQ worker used by "lei import"
and not its own user-visible command.

v2: fix ikw_done_wait arg handling (ugh, confusing API :x)
---
Interdiff against v1:
  diff --git a/lib/PublicInbox/LeiImportKw.pm b/lib/PublicInbox/LeiImportKw.pm
  index e13dce07..2878cbdf 100644
  --- a/lib/PublicInbox/LeiImportKw.pm
  +++ b/lib/PublicInbox/LeiImportKw.pm
  @@ -40,9 +40,10 @@ sub ck_update_kw { # via wq_io_do
   }
   
   sub ikw_done_wait {
  -	my ($lei) = @_;
  +	my ($arg, $pid) = @_;
  +	my ($self, $lei) = @$arg;
   	my $wait = $lei->{sto}->ipc_do('done');
  -	$lei->wq_done_wait;
  +	$lei->can('wq_done_wait')->($arg, $pid);
   }
   
   sub _lei_wq_eof { # EOF callback for main lei daemon

 MANIFEST                       |  1 +
 lib/PublicInbox/LEI.pm         |  2 +-
 lib/PublicInbox/LeiImport.pm   | 25 ++++++++--------
 lib/PublicInbox/LeiImportKw.pm | 55 ++++++++++++++++++++++++++++++++++
 lib/PublicInbox/LeiMailSync.pm | 17 ++++++-----
 lib/PublicInbox/NetReader.pm   |  1 +
 lib/PublicInbox/Over.pm        | 10 ++++---
 7 files changed, 86 insertions(+), 25 deletions(-)
 create mode 100644 lib/PublicInbox/LeiImportKw.pm

diff --git a/MANIFEST b/MANIFEST
index 0b4bb380..5a70a144 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -208,6 +208,7 @@ lib/PublicInbox/LeiForgetMailSync.pm
 lib/PublicInbox/LeiForgetSearch.pm
 lib/PublicInbox/LeiHelp.pm
 lib/PublicInbox/LeiImport.pm
+lib/PublicInbox/LeiImportKw.pm
 lib/PublicInbox/LeiIndex.pm
 lib/PublicInbox/LeiInit.pm
 lib/PublicInbox/LeiInput.pm
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 30f90798..7bda9408 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -421,7 +421,7 @@ my %CONFIG_KEYS = (
 	'leistore.dir' => 'top-level storage location',
 );
 
-my @WQ_KEYS = qw(lxs l2m wq1); # internal workers
+my @WQ_KEYS = qw(lxs l2m wq1 ikw); # internal workers
 
 sub _drop_wq {
 	my ($self) = @_;
diff --git a/lib/PublicInbox/LeiImport.pm b/lib/PublicInbox/LeiImport.pm
index 860a2c98..2efd4935 100644
--- a/lib/PublicInbox/LeiImport.pm
+++ b/lib/PublicInbox/LeiImport.pm
@@ -43,18 +43,14 @@ sub input_maildir_cb { # maildir_each_eml cb
 
 sub input_net_cb { # imap_each / nntp_each
 	my ($uri, $uid, $kw, $eml, $self) = @_;
-	my $vmd = $self->{-import_kw} ? { kw => $kw } : undef;
-	$vmd->{sync_info} = [ $$uri, $uid ] if $self->{-mail_sync};
 	if (defined $eml) {
+		my $vmd = $self->{-import_kw} ? { kw => $kw } : undef;
+		$vmd->{sync_info} = [ $$uri, $uid ] if $self->{-mail_sync};
 		$self->input_eml_cb($eml, $vmd);
-	} elsif ($vmd) { # old message, kw only
-		my $oid = $self->{-lms_ro}->imap_oid2($uri, $uid) // return;
-		my @docids = $self->{lse}->over->blob_exists($oid) or return;
-		$self->{lse}->kw_changed(undef, $kw, \@docids) or return;
-		my $lei = $self->{lei};
-		$lei->qerr("# $oid => @$kw\n") if $lei->{opt}->{verbose};
-		$self->{lei}->{sto}->ipc_do('set_eml_vmd', undef,
-						$vmd, \@docids);
+	} elsif (my $ikw = $self->{lei}->{ikw}) { # old message, kw only
+		# we send $uri as a bare SCALAR and not a URIimap ref to
+		# reduce socket traffic:
+		$ikw->wq_io_do('ck_update_kw', [], $$uri, $uid, $kw);
 	}
 }
 
@@ -71,15 +67,17 @@ sub do_import_index ($$@) {
 
 	$lei->ale; # initialize for workers to read
 	my $j = $lei->{opt}->{jobs} // scalar(@{$self->{inputs}}) || 1;
+	my $ikw;
 	if (my $net = $lei->{net}) {
 		# $j = $net->net_concurrency($j); TODO
 		if ($lei->{opt}->{incremental} // 1) {
 			$net->{incremental} = 1;
 			$net->{-lms_ro} = $sto->search->lms // 0;
-			if ($self->{-import_kw}) {
+			if ($self->{-import_kw} && $net->{-lms_ro} &&
+					$net->{imap_order}) {
+				require PublicInbox::LeiImportKw;
+				$ikw = PublicInbox::LeiImportKw->new($lei);
 				$net->{each_old} = 1;
-				$self->{-lms_ro} = $net->{-lms_ro};
-				$self->{lse} = $sto->search;
 			}
 		}
 	} else {
@@ -93,6 +91,7 @@ sub do_import_index ($$@) {
 	(my $op_c, $ops) = $lei->workers_start($self, $j, $ops);
 	$lei->{wq1} = $self;
 	$lei->{-err_type} = 'non-fatal';
+	$ikw->wq_close(1) if $ikw;
 	net_merge_all_done($self) unless $lei->{auth};
 	$op_c->op_wait_event($ops);
 }
diff --git a/lib/PublicInbox/LeiImportKw.pm b/lib/PublicInbox/LeiImportKw.pm
new file mode 100644
index 00000000..2878cbdf
--- /dev/null
+++ b/lib/PublicInbox/LeiImportKw.pm
@@ -0,0 +1,55 @@
+# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# WQ worker for dealing with LeiImport IMAP flags on already-imported messages
+# WQ key: {ikw}
+package PublicInbox::LeiImportKw;
+use strict;
+use v5.10.1;
+use parent qw(PublicInbox::IPC);
+
+sub new {
+	my ($cls, $lei) = @_;
+	my $self = bless { -wq_ident => 'lei import_kw worker' }, $cls;
+	my ($op_c, $ops) = $lei->workers_start($self, $self->detect_nproc);
+	$op_c->{ops} = $ops; # for PktOp->event_step
+	$lei->{ikw} = $self;
+}
+
+sub ipc_atfork_child {
+	my ($self) = @_;
+	my $lei = $self->{lei};
+	$lei->_lei_atfork_child;
+	my $net = delete $lei->{net} // die 'BUG: no lei->{net}';
+	$self->{sto} = $lei->{sto} // die 'BUG: no lei->{sto}';
+	$self->{verbose} = $lei->{opt}->{verbose};
+	$self->{lse} = $self->{sto}->search;
+	$self->{over} = $self->{lse}->over;
+	$self->{-lms_ro} = $net->{-lms_ro} || die 'BUG: net->{-lms_ro} FALSE';
+	$self->SUPER::ipc_atfork_child;
+}
+
+sub ck_update_kw { # via wq_io_do
+	my ($self, $url, $uid, $kw) = @_;
+	my $oidbin = $self->{-lms_ro}->imap_oidbin($url, $uid) // return;
+	my @docids = $self->{over}->oidbin_exists($oidbin) or return;
+	$self->{lse}->kw_changed(undef, $kw, \@docids) or return;
+	$self->{verbose} and
+		$self->{lei}->qerr('# '.unpack('H*', $oidbin)." => @$kw\n");
+	$self->{sto}->ipc_do('set_eml_vmd', undef, { kw => $kw }, \@docids);
+}
+
+sub ikw_done_wait {
+	my ($arg, $pid) = @_;
+	my ($self, $lei) = @$arg;
+	my $wait = $lei->{sto}->ipc_do('done');
+	$lei->can('wq_done_wait')->($arg, $pid);
+}
+
+sub _lei_wq_eof { # EOF callback for main lei daemon
+	my ($lei) = @_;
+	my $ikw = delete $lei->{ikw} or return $lei->fail;
+	$ikw->wq_wait_old(\&ikw_done_wait, $lei);
+}
+
+1;
diff --git a/lib/PublicInbox/LeiMailSync.pm b/lib/PublicInbox/LeiMailSync.pm
index 22ee1109..75603d89 100644
--- a/lib/PublicInbox/LeiMailSync.pm
+++ b/lib/PublicInbox/LeiMailSync.pm
@@ -54,6 +54,10 @@ CREATE TABLE IF NOT EXISTS blob2num (
 	UNIQUE (oidbin, fid, uid)
 )
 
+	# speeds up LeiImport->ck_update_kw (for "lei import") by 5-6x:
+	$dbh->do(<<'');
+CREATE INDEX IF NOT EXISTS idx_fid_uid ON blob2num(fid,uid)
+
 	$dbh->do(<<'');
 CREATE TABLE IF NOT EXISTS blob2name (
 	oidbin VARBINARY NOT NULL,
@@ -361,15 +365,14 @@ sub forget_folder {
 	$dbh->do('DELETE FROM folders WHERE fid = ?', undef, $fid);
 }
 
-sub imap_oid2 ($$$) {
-	my ($self, $uri, $uid) = @_; # $uri MUST have UIDVALIDITY
-	my $fid = $self->{fmap}->{"$uri"} //= fid_for($self, "$uri") // return;
+sub imap_oidbin ($$$) {
+	my ($self, $url, $uid) = @_; # $url MUST have UIDVALIDITY
+	my $fid = $self->{fmap}->{$url} //= fid_for($self, $url) // return;
 	my $sth = $self->{dbh}->prepare_cached(<<EOM, undef, 1);
 SELECT oidbin FROM blob2num WHERE fid = ? AND uid = ?
 EOM
 	$sth->execute($fid, $uid);
-	my ($oidbin) = $sth->fetchrow_array;
-	$oidbin ? unpack('H*', $oidbin) : undef;
+	$sth->fetchrow_array;
 }
 
 sub imap_oid {
@@ -384,10 +387,10 @@ sub imap_oid {
 		}
 		$lei->qerr(@{$err->{qerr}}) if $err->{qerr};
 	}
-	imap_oid2($self, $folders->[0], $uid_uri->uid);
+	my $oidbin = imap_oidbin($self, $folders->[0], $uid_uri->uid);
+	$oidbin ? unpack('H*', $oidbin) : undef;
 }
 
-
 # FIXME: something with "lei <up|q>" is causing uncommitted transaction
 # warnings, not sure what...
 sub DESTROY {
diff --git a/lib/PublicInbox/NetReader.pm b/lib/PublicInbox/NetReader.pm
index 39a8f7fc..058f4313 100644
--- a/lib/PublicInbox/NetReader.pm
+++ b/lib/PublicInbox/NetReader.pm
@@ -471,6 +471,7 @@ sub each_old_flags ($$$$) {
 		while (my ($uid, $per_uid) = each %$r) {
 			my $kw = flags2kw($self, $uri, $uid, $per_uid->{FLAGS})
 				// next;
+			# LeiImport->input_net_cb
 			$eml_cb->($uri, $uid, $kw, undef, @args);
 		}
 	}
diff --git a/lib/PublicInbox/Over.pm b/lib/PublicInbox/Over.pm
index 0e191c47..58fdea0e 100644
--- a/lib/PublicInbox/Over.pm
+++ b/lib/PublicInbox/Over.pm
@@ -349,13 +349,13 @@ sub check_inodes {
 	}
 }
 
-sub blob_exists {
-	my ($self, $oidhex) = @_;
+sub oidbin_exists {
+	my ($self, $oidbin) = @_;
 	if (wantarray) {
 		my $sth = $self->dbh->prepare_cached(<<'', undef, 1);
 SELECT docid FROM xref3 WHERE oidbin = ? ORDER BY docid ASC
 
-		$sth->bind_param(1, pack('H*', $oidhex), SQL_BLOB);
+		$sth->bind_param(1, $oidbin, SQL_BLOB);
 		$sth->execute;
 		my $tmp = $sth->fetchall_arrayref;
 		map { $_->[0] } @$tmp;
@@ -363,10 +363,12 @@ SELECT docid FROM xref3 WHERE oidbin = ? ORDER BY docid ASC
 		my $sth = $self->dbh->prepare_cached(<<'', undef, 1);
 SELECT COUNT(*) FROM xref3 WHERE oidbin = ?
 
-		$sth->bind_param(1, pack('H*', $oidhex), SQL_BLOB);
+		$sth->bind_param(1, $oidbin, SQL_BLOB);
 		$sth->execute;
 		$sth->fetchrow_array;
 	}
 }
 
+sub blob_exists { oidbin_exists($_[0], pack('H*', $_[1])) }
+
 1;

^ permalink raw reply related	[relevance 6%]

* [PATCH] lei import: speed up kw updates for old IMAP messages
@ 2021-06-03  0:17  7% Eric Wong
  2021-06-03  1:05  6% ` [PATCH v2] " Eric Wong
  0 siblings, 1 reply; 3+ results
From: Eric Wong @ 2021-06-03  0:17 UTC (permalink / raw)
  To: meta

On a 4-core CPU, this speeds up "lei import" on a largish IMAP
inbox with 75K messages from ~21 minutes down to 40s.

Parallelizing with the new LeiImportKw WQ worker class gave a
near-linear speedup and brought the runtime down to ~5:40.

The new idx_fid_uid index on the "fid" and "uid" columns of
blob2num in mail_sync.sqlite3 brought us the final speedup.

An additional index on over.sqlite3#xref3(oidbin) did not help,
since idx_nntp already exists and speeds up the new ->oidbin_exists
internal API.

I initially experimented with a separate "lei import-kw" command
but decided against it since it's useless outside of IMAP+JMAP
and would require extra cognitive overhead for both users and
hackers.  So LeiImportKw is just a WQ worker used by "lei import"
and not its own user-visible command.
---
 MANIFEST                       |  1 +
 lib/PublicInbox/LEI.pm         |  2 +-
 lib/PublicInbox/LeiImport.pm   | 25 ++++++++--------
 lib/PublicInbox/LeiImportKw.pm | 54 ++++++++++++++++++++++++++++++++++
 lib/PublicInbox/LeiMailSync.pm | 17 ++++++-----
 lib/PublicInbox/NetReader.pm   |  1 +
 lib/PublicInbox/Over.pm        | 10 ++++---
 7 files changed, 85 insertions(+), 25 deletions(-)
 create mode 100644 lib/PublicInbox/LeiImportKw.pm

diff --git a/MANIFEST b/MANIFEST
index 0b4bb380..5a70a144 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -208,6 +208,7 @@ lib/PublicInbox/LeiForgetMailSync.pm
 lib/PublicInbox/LeiForgetSearch.pm
 lib/PublicInbox/LeiHelp.pm
 lib/PublicInbox/LeiImport.pm
+lib/PublicInbox/LeiImportKw.pm
 lib/PublicInbox/LeiIndex.pm
 lib/PublicInbox/LeiInit.pm
 lib/PublicInbox/LeiInput.pm
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 30f90798..7bda9408 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -421,7 +421,7 @@ my %CONFIG_KEYS = (
 	'leistore.dir' => 'top-level storage location',
 );
 
-my @WQ_KEYS = qw(lxs l2m wq1); # internal workers
+my @WQ_KEYS = qw(lxs l2m wq1 ikw); # internal workers
 
 sub _drop_wq {
 	my ($self) = @_;
diff --git a/lib/PublicInbox/LeiImport.pm b/lib/PublicInbox/LeiImport.pm
index 860a2c98..2efd4935 100644
--- a/lib/PublicInbox/LeiImport.pm
+++ b/lib/PublicInbox/LeiImport.pm
@@ -43,18 +43,14 @@ sub input_maildir_cb { # maildir_each_eml cb
 
 sub input_net_cb { # imap_each / nntp_each
 	my ($uri, $uid, $kw, $eml, $self) = @_;
-	my $vmd = $self->{-import_kw} ? { kw => $kw } : undef;
-	$vmd->{sync_info} = [ $$uri, $uid ] if $self->{-mail_sync};
 	if (defined $eml) {
+		my $vmd = $self->{-import_kw} ? { kw => $kw } : undef;
+		$vmd->{sync_info} = [ $$uri, $uid ] if $self->{-mail_sync};
 		$self->input_eml_cb($eml, $vmd);
-	} elsif ($vmd) { # old message, kw only
-		my $oid = $self->{-lms_ro}->imap_oid2($uri, $uid) // return;
-		my @docids = $self->{lse}->over->blob_exists($oid) or return;
-		$self->{lse}->kw_changed(undef, $kw, \@docids) or return;
-		my $lei = $self->{lei};
-		$lei->qerr("# $oid => @$kw\n") if $lei->{opt}->{verbose};
-		$self->{lei}->{sto}->ipc_do('set_eml_vmd', undef,
-						$vmd, \@docids);
+	} elsif (my $ikw = $self->{lei}->{ikw}) { # old message, kw only
+		# we send $uri as a bare SCALAR and not a URIimap ref to
+		# reduce socket traffic:
+		$ikw->wq_io_do('ck_update_kw', [], $$uri, $uid, $kw);
 	}
 }
 
@@ -71,15 +67,17 @@ sub do_import_index ($$@) {
 
 	$lei->ale; # initialize for workers to read
 	my $j = $lei->{opt}->{jobs} // scalar(@{$self->{inputs}}) || 1;
+	my $ikw;
 	if (my $net = $lei->{net}) {
 		# $j = $net->net_concurrency($j); TODO
 		if ($lei->{opt}->{incremental} // 1) {
 			$net->{incremental} = 1;
 			$net->{-lms_ro} = $sto->search->lms // 0;
-			if ($self->{-import_kw}) {
+			if ($self->{-import_kw} && $net->{-lms_ro} &&
+					$net->{imap_order}) {
+				require PublicInbox::LeiImportKw;
+				$ikw = PublicInbox::LeiImportKw->new($lei);
 				$net->{each_old} = 1;
-				$self->{-lms_ro} = $net->{-lms_ro};
-				$self->{lse} = $sto->search;
 			}
 		}
 	} else {
@@ -93,6 +91,7 @@ sub do_import_index ($$@) {
 	(my $op_c, $ops) = $lei->workers_start($self, $j, $ops);
 	$lei->{wq1} = $self;
 	$lei->{-err_type} = 'non-fatal';
+	$ikw->wq_close(1) if $ikw;
 	net_merge_all_done($self) unless $lei->{auth};
 	$op_c->op_wait_event($ops);
 }
diff --git a/lib/PublicInbox/LeiImportKw.pm b/lib/PublicInbox/LeiImportKw.pm
new file mode 100644
index 00000000..e13dce07
--- /dev/null
+++ b/lib/PublicInbox/LeiImportKw.pm
@@ -0,0 +1,54 @@
+# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# WQ worker for dealing with LeiImport IMAP flags on already-imported messages
+# WQ key: {ikw}
+package PublicInbox::LeiImportKw;
+use strict;
+use v5.10.1;
+use parent qw(PublicInbox::IPC);
+
+sub new {
+	my ($cls, $lei) = @_;
+	my $self = bless { -wq_ident => 'lei import_kw worker' }, $cls;
+	my ($op_c, $ops) = $lei->workers_start($self, $self->detect_nproc);
+	$op_c->{ops} = $ops; # for PktOp->event_step
+	$lei->{ikw} = $self;
+}
+
+sub ipc_atfork_child {
+	my ($self) = @_;
+	my $lei = $self->{lei};
+	$lei->_lei_atfork_child;
+	my $net = delete $lei->{net} // die 'BUG: no lei->{net}';
+	$self->{sto} = $lei->{sto} // die 'BUG: no lei->{sto}';
+	$self->{verbose} = $lei->{opt}->{verbose};
+	$self->{lse} = $self->{sto}->search;
+	$self->{over} = $self->{lse}->over;
+	$self->{-lms_ro} = $net->{-lms_ro} || die 'BUG: net->{-lms_ro} FALSE';
+	$self->SUPER::ipc_atfork_child;
+}
+
+sub ck_update_kw { # via wq_io_do
+	my ($self, $url, $uid, $kw) = @_;
+	my $oidbin = $self->{-lms_ro}->imap_oidbin($url, $uid) // return;
+	my @docids = $self->{over}->oidbin_exists($oidbin) or return;
+	$self->{lse}->kw_changed(undef, $kw, \@docids) or return;
+	$self->{verbose} and
+		$self->{lei}->qerr('# '.unpack('H*', $oidbin)." => @$kw\n");
+	$self->{sto}->ipc_do('set_eml_vmd', undef, { kw => $kw }, \@docids);
+}
+
+sub ikw_done_wait {
+	my ($lei) = @_;
+	my $wait = $lei->{sto}->ipc_do('done');
+	$lei->wq_done_wait;
+}
+
+sub _lei_wq_eof { # EOF callback for main lei daemon
+	my ($lei) = @_;
+	my $ikw = delete $lei->{ikw} or return $lei->fail;
+	$ikw->wq_wait_old(\&ikw_done_wait, $lei);
+}
+
+1;
diff --git a/lib/PublicInbox/LeiMailSync.pm b/lib/PublicInbox/LeiMailSync.pm
index 22ee1109..75603d89 100644
--- a/lib/PublicInbox/LeiMailSync.pm
+++ b/lib/PublicInbox/LeiMailSync.pm
@@ -54,6 +54,10 @@ CREATE TABLE IF NOT EXISTS blob2num (
 	UNIQUE (oidbin, fid, uid)
 )
 
+	# speeds up LeiImport->ck_update_kw (for "lei import") by 5-6x:
+	$dbh->do(<<'');
+CREATE INDEX IF NOT EXISTS idx_fid_uid ON blob2num(fid,uid)
+
 	$dbh->do(<<'');
 CREATE TABLE IF NOT EXISTS blob2name (
 	oidbin VARBINARY NOT NULL,
@@ -361,15 +365,14 @@ sub forget_folder {
 	$dbh->do('DELETE FROM folders WHERE fid = ?', undef, $fid);
 }
 
-sub imap_oid2 ($$$) {
-	my ($self, $uri, $uid) = @_; # $uri MUST have UIDVALIDITY
-	my $fid = $self->{fmap}->{"$uri"} //= fid_for($self, "$uri") // return;
+sub imap_oidbin ($$$) {
+	my ($self, $url, $uid) = @_; # $url MUST have UIDVALIDITY
+	my $fid = $self->{fmap}->{$url} //= fid_for($self, $url) // return;
 	my $sth = $self->{dbh}->prepare_cached(<<EOM, undef, 1);
 SELECT oidbin FROM blob2num WHERE fid = ? AND uid = ?
 EOM
 	$sth->execute($fid, $uid);
-	my ($oidbin) = $sth->fetchrow_array;
-	$oidbin ? unpack('H*', $oidbin) : undef;
+	$sth->fetchrow_array;
 }
 
 sub imap_oid {
@@ -384,10 +387,10 @@ sub imap_oid {
 		}
 		$lei->qerr(@{$err->{qerr}}) if $err->{qerr};
 	}
-	imap_oid2($self, $folders->[0], $uid_uri->uid);
+	my $oidbin = imap_oidbin($self, $folders->[0], $uid_uri->uid);
+	$oidbin ? unpack('H*', $oidbin) : undef;
 }
 
-
 # FIXME: something with "lei <up|q>" is causing uncommitted transaction
 # warnings, not sure what...
 sub DESTROY {
diff --git a/lib/PublicInbox/NetReader.pm b/lib/PublicInbox/NetReader.pm
index 39a8f7fc..058f4313 100644
--- a/lib/PublicInbox/NetReader.pm
+++ b/lib/PublicInbox/NetReader.pm
@@ -471,6 +471,7 @@ sub each_old_flags ($$$$) {
 		while (my ($uid, $per_uid) = each %$r) {
 			my $kw = flags2kw($self, $uri, $uid, $per_uid->{FLAGS})
 				// next;
+			# LeiImport->input_net_cb
 			$eml_cb->($uri, $uid, $kw, undef, @args);
 		}
 	}
diff --git a/lib/PublicInbox/Over.pm b/lib/PublicInbox/Over.pm
index 0e191c47..58fdea0e 100644
--- a/lib/PublicInbox/Over.pm
+++ b/lib/PublicInbox/Over.pm
@@ -349,13 +349,13 @@ sub check_inodes {
 	}
 }
 
-sub blob_exists {
-	my ($self, $oidhex) = @_;
+sub oidbin_exists {
+	my ($self, $oidbin) = @_;
 	if (wantarray) {
 		my $sth = $self->dbh->prepare_cached(<<'', undef, 1);
 SELECT docid FROM xref3 WHERE oidbin = ? ORDER BY docid ASC
 
-		$sth->bind_param(1, pack('H*', $oidhex), SQL_BLOB);
+		$sth->bind_param(1, $oidbin, SQL_BLOB);
 		$sth->execute;
 		my $tmp = $sth->fetchall_arrayref;
 		map { $_->[0] } @$tmp;
@@ -363,10 +363,12 @@ SELECT docid FROM xref3 WHERE oidbin = ? ORDER BY docid ASC
 		my $sth = $self->dbh->prepare_cached(<<'', undef, 1);
 SELECT COUNT(*) FROM xref3 WHERE oidbin = ?
 
-		$sth->bind_param(1, pack('H*', $oidhex), SQL_BLOB);
+		$sth->bind_param(1, $oidbin, SQL_BLOB);
 		$sth->execute;
 		$sth->fetchrow_array;
 	}
 }
 
+sub blob_exists { oidbin_exists($_[0], pack('H*', $_[1])) }
+
 1;

^ permalink raw reply related	[relevance 7%]

Results 1-3 of 3 | reverse | options above
-- pct% links below jump to the message on this page, permalinks otherwise --
2021-06-03  0:17  7% [PATCH] lei import: speed up kw updates for old IMAP messages Eric Wong
2021-06-03  1:05  6% ` [PATCH v2] " Eric Wong
2021-06-08  9:50     [PATCH 0/3] lei import: speedup repeated Maildir import Eric Wong
2021-06-08  9:50  5% ` [PATCH 3/3] lei import: speed up repeated Maildir imports Eric Wong

Code repositories for project(s) associated with this public inbox

	https://80x24.org/public-inbox.git

This is a public inbox; see the mirroring instructions
for how to clone and mirror all data and code used for this inbox,
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).