user/dev discussion of public-inbox itself
 help / color / mirror / code / Atom feed
From: Eric Wong <e@80x24.org>
To: meta@public-inbox.org
Subject: [PATCH v2] lei import: speed up kw updates for old IMAP messages
Date: Thu, 3 Jun 2021 01:05:20 +0000	[thread overview]
Message-ID: <20210603010520.GA13508@dcvr> (raw)
In-Reply-To: <20210603001737.31369-1-e@80x24.org>

On a 4-core CPU, this speeds up "lei import" on a largish IMAP
inbox with 75K messages from ~21 minutes down to 40s.

Parallelizing with the new LeiImportKw WQ worker class gave a
near-linear speedup and brought the runtime down to ~5m40s.

The new idx_fid_uid index on the "fid" and "uid" columns of
blob2num in mail_sync.sqlite3 brought us the final speedup.

An additional index on over.sqlite3#xref3(oidbin) did not help,
since the existing idx_nntp index already speeds up the new
->oidbin_exists internal API.

I initially experimented with a separate "lei import-kw" command
but decided against it, since it's useless outside of IMAP+JMAP
and would add cognitive overhead for both users and hackers.
So LeiImportKw is just a WQ worker used by "lei import" and not
its own user-visible command.

v2: fix ikw_done_wait arg handling (ugh, confusing API :x)
---
Interdiff against v1:
  diff --git a/lib/PublicInbox/LeiImportKw.pm b/lib/PublicInbox/LeiImportKw.pm
  index e13dce07..2878cbdf 100644
  --- a/lib/PublicInbox/LeiImportKw.pm
  +++ b/lib/PublicInbox/LeiImportKw.pm
  @@ -40,9 +40,10 @@ sub ck_update_kw { # via wq_io_do
   }
   
   sub ikw_done_wait {
  -	my ($lei) = @_;
  +	my ($arg, $pid) = @_;
  +	my ($self, $lei) = @$arg;
   	my $wait = $lei->{sto}->ipc_do('done');
  -	$lei->wq_done_wait;
  +	$lei->can('wq_done_wait')->($arg, $pid);
   }
   
   sub _lei_wq_eof { # EOF callback for main lei daemon

 MANIFEST                       |  1 +
 lib/PublicInbox/LEI.pm         |  2 +-
 lib/PublicInbox/LeiImport.pm   | 25 ++++++++--------
 lib/PublicInbox/LeiImportKw.pm | 55 ++++++++++++++++++++++++++++++++++
 lib/PublicInbox/LeiMailSync.pm | 17 ++++++-----
 lib/PublicInbox/NetReader.pm   |  1 +
 lib/PublicInbox/Over.pm        | 10 ++++---
 7 files changed, 86 insertions(+), 25 deletions(-)
 create mode 100644 lib/PublicInbox/LeiImportKw.pm

diff --git a/MANIFEST b/MANIFEST
index 0b4bb380..5a70a144 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -208,6 +208,7 @@ lib/PublicInbox/LeiForgetMailSync.pm
 lib/PublicInbox/LeiForgetSearch.pm
 lib/PublicInbox/LeiHelp.pm
 lib/PublicInbox/LeiImport.pm
+lib/PublicInbox/LeiImportKw.pm
 lib/PublicInbox/LeiIndex.pm
 lib/PublicInbox/LeiInit.pm
 lib/PublicInbox/LeiInput.pm
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 30f90798..7bda9408 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -421,7 +421,7 @@ my %CONFIG_KEYS = (
 	'leistore.dir' => 'top-level storage location',
 );
 
-my @WQ_KEYS = qw(lxs l2m wq1); # internal workers
+my @WQ_KEYS = qw(lxs l2m wq1 ikw); # internal workers
 
 sub _drop_wq {
 	my ($self) = @_;
diff --git a/lib/PublicInbox/LeiImport.pm b/lib/PublicInbox/LeiImport.pm
index 860a2c98..2efd4935 100644
--- a/lib/PublicInbox/LeiImport.pm
+++ b/lib/PublicInbox/LeiImport.pm
@@ -43,18 +43,14 @@ sub input_maildir_cb { # maildir_each_eml cb
 
 sub input_net_cb { # imap_each / nntp_each
 	my ($uri, $uid, $kw, $eml, $self) = @_;
-	my $vmd = $self->{-import_kw} ? { kw => $kw } : undef;
-	$vmd->{sync_info} = [ $$uri, $uid ] if $self->{-mail_sync};
 	if (defined $eml) {
+		my $vmd = $self->{-import_kw} ? { kw => $kw } : undef;
+		$vmd->{sync_info} = [ $$uri, $uid ] if $self->{-mail_sync};
 		$self->input_eml_cb($eml, $vmd);
-	} elsif ($vmd) { # old message, kw only
-		my $oid = $self->{-lms_ro}->imap_oid2($uri, $uid) // return;
-		my @docids = $self->{lse}->over->blob_exists($oid) or return;
-		$self->{lse}->kw_changed(undef, $kw, \@docids) or return;
-		my $lei = $self->{lei};
-		$lei->qerr("# $oid => @$kw\n") if $lei->{opt}->{verbose};
-		$self->{lei}->{sto}->ipc_do('set_eml_vmd', undef,
-						$vmd, \@docids);
+	} elsif (my $ikw = $self->{lei}->{ikw}) { # old message, kw only
+		# we send $uri as a bare SCALAR and not a URIimap ref to
+		# reduce socket traffic:
+		$ikw->wq_io_do('ck_update_kw', [], $$uri, $uid, $kw);
 	}
 }
 
@@ -71,15 +67,17 @@ sub do_import_index ($$@) {
 
 	$lei->ale; # initialize for workers to read
 	my $j = $lei->{opt}->{jobs} // scalar(@{$self->{inputs}}) || 1;
+	my $ikw;
 	if (my $net = $lei->{net}) {
 		# $j = $net->net_concurrency($j); TODO
 		if ($lei->{opt}->{incremental} // 1) {
 			$net->{incremental} = 1;
 			$net->{-lms_ro} = $sto->search->lms // 0;
-			if ($self->{-import_kw}) {
+			if ($self->{-import_kw} && $net->{-lms_ro} &&
+					$net->{imap_order}) {
+				require PublicInbox::LeiImportKw;
+				$ikw = PublicInbox::LeiImportKw->new($lei);
 				$net->{each_old} = 1;
-				$self->{-lms_ro} = $net->{-lms_ro};
-				$self->{lse} = $sto->search;
 			}
 		}
 	} else {
@@ -93,6 +91,7 @@ sub do_import_index ($$@) {
 	(my $op_c, $ops) = $lei->workers_start($self, $j, $ops);
 	$lei->{wq1} = $self;
 	$lei->{-err_type} = 'non-fatal';
+	$ikw->wq_close(1) if $ikw;
 	net_merge_all_done($self) unless $lei->{auth};
 	$op_c->op_wait_event($ops);
 }
diff --git a/lib/PublicInbox/LeiImportKw.pm b/lib/PublicInbox/LeiImportKw.pm
new file mode 100644
index 00000000..2878cbdf
--- /dev/null
+++ b/lib/PublicInbox/LeiImportKw.pm
@@ -0,0 +1,55 @@
+# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# WQ worker for dealing with LeiImport IMAP flags on already-imported messages
+# WQ key: {ikw}
+package PublicInbox::LeiImportKw;
+use strict;
+use v5.10.1;
+use parent qw(PublicInbox::IPC);
+
+sub new {
+	my ($cls, $lei) = @_;
+	my $self = bless { -wq_ident => 'lei import_kw worker' }, $cls;
+	my ($op_c, $ops) = $lei->workers_start($self, $self->detect_nproc);
+	$op_c->{ops} = $ops; # for PktOp->event_step
+	$lei->{ikw} = $self;
+}
+
+sub ipc_atfork_child {
+	my ($self) = @_;
+	my $lei = $self->{lei};
+	$lei->_lei_atfork_child;
+	my $net = delete $lei->{net} // die 'BUG: no lei->{net}';
+	$self->{sto} = $lei->{sto} // die 'BUG: no lei->{sto}';
+	$self->{verbose} = $lei->{opt}->{verbose};
+	$self->{lse} = $self->{sto}->search;
+	$self->{over} = $self->{lse}->over;
+	$self->{-lms_ro} = $net->{-lms_ro} || die 'BUG: net->{-lms_ro} FALSE';
+	$self->SUPER::ipc_atfork_child;
+}
+
+sub ck_update_kw { # via wq_io_do
+	my ($self, $url, $uid, $kw) = @_;
+	my $oidbin = $self->{-lms_ro}->imap_oidbin($url, $uid) // return;
+	my @docids = $self->{over}->oidbin_exists($oidbin) or return;
+	$self->{lse}->kw_changed(undef, $kw, \@docids) or return;
+	$self->{verbose} and
+		$self->{lei}->qerr('# '.unpack('H*', $oidbin)." => @$kw\n");
+	$self->{sto}->ipc_do('set_eml_vmd', undef, { kw => $kw }, \@docids);
+}
+
+sub ikw_done_wait {
+	my ($arg, $pid) = @_;
+	my ($self, $lei) = @$arg;
+	my $wait = $lei->{sto}->ipc_do('done');
+	$lei->can('wq_done_wait')->($arg, $pid);
+}
+
+sub _lei_wq_eof { # EOF callback for main lei daemon
+	my ($lei) = @_;
+	my $ikw = delete $lei->{ikw} or return $lei->fail;
+	$ikw->wq_wait_old(\&ikw_done_wait, $lei);
+}
+
+1;
diff --git a/lib/PublicInbox/LeiMailSync.pm b/lib/PublicInbox/LeiMailSync.pm
index 22ee1109..75603d89 100644
--- a/lib/PublicInbox/LeiMailSync.pm
+++ b/lib/PublicInbox/LeiMailSync.pm
@@ -54,6 +54,10 @@ CREATE TABLE IF NOT EXISTS blob2num (
 	UNIQUE (oidbin, fid, uid)
 )
 
+	# speeds up LeiImportKw->ck_update_kw (for "lei import") by 5-6x:
+	$dbh->do(<<'');
+CREATE INDEX IF NOT EXISTS idx_fid_uid ON blob2num(fid,uid)
+
 	$dbh->do(<<'');
 CREATE TABLE IF NOT EXISTS blob2name (
 	oidbin VARBINARY NOT NULL,
@@ -361,15 +365,14 @@ sub forget_folder {
 	$dbh->do('DELETE FROM folders WHERE fid = ?', undef, $fid);
 }
 
-sub imap_oid2 ($$$) {
-	my ($self, $uri, $uid) = @_; # $uri MUST have UIDVALIDITY
-	my $fid = $self->{fmap}->{"$uri"} //= fid_for($self, "$uri") // return;
+sub imap_oidbin ($$$) {
+	my ($self, $url, $uid) = @_; # $url MUST have UIDVALIDITY
+	my $fid = $self->{fmap}->{$url} //= fid_for($self, $url) // return;
 	my $sth = $self->{dbh}->prepare_cached(<<EOM, undef, 1);
 SELECT oidbin FROM blob2num WHERE fid = ? AND uid = ?
 EOM
 	$sth->execute($fid, $uid);
-	my ($oidbin) = $sth->fetchrow_array;
-	$oidbin ? unpack('H*', $oidbin) : undef;
+	$sth->fetchrow_array;
 }
 
 sub imap_oid {
@@ -384,10 +387,10 @@ sub imap_oid {
 		}
 		$lei->qerr(@{$err->{qerr}}) if $err->{qerr};
 	}
-	imap_oid2($self, $folders->[0], $uid_uri->uid);
+	my $oidbin = imap_oidbin($self, $folders->[0], $uid_uri->uid);
+	$oidbin ? unpack('H*', $oidbin) : undef;
 }
 
-
 # FIXME: something with "lei <up|q>" is causing uncommitted transaction
 # warnings, not sure what...
 sub DESTROY {
diff --git a/lib/PublicInbox/NetReader.pm b/lib/PublicInbox/NetReader.pm
index 39a8f7fc..058f4313 100644
--- a/lib/PublicInbox/NetReader.pm
+++ b/lib/PublicInbox/NetReader.pm
@@ -471,6 +471,7 @@ sub each_old_flags ($$$$) {
 		while (my ($uid, $per_uid) = each %$r) {
 			my $kw = flags2kw($self, $uri, $uid, $per_uid->{FLAGS})
 				// next;
+			# LeiImport->input_net_cb
 			$eml_cb->($uri, $uid, $kw, undef, @args);
 		}
 	}
diff --git a/lib/PublicInbox/Over.pm b/lib/PublicInbox/Over.pm
index 0e191c47..58fdea0e 100644
--- a/lib/PublicInbox/Over.pm
+++ b/lib/PublicInbox/Over.pm
@@ -349,13 +349,13 @@ sub check_inodes {
 	}
 }
 
-sub blob_exists {
-	my ($self, $oidhex) = @_;
+sub oidbin_exists {
+	my ($self, $oidbin) = @_;
 	if (wantarray) {
 		my $sth = $self->dbh->prepare_cached(<<'', undef, 1);
 SELECT docid FROM xref3 WHERE oidbin = ? ORDER BY docid ASC
 
-		$sth->bind_param(1, pack('H*', $oidhex), SQL_BLOB);
+		$sth->bind_param(1, $oidbin, SQL_BLOB);
 		$sth->execute;
 		my $tmp = $sth->fetchall_arrayref;
 		map { $_->[0] } @$tmp;
@@ -363,10 +363,12 @@ SELECT docid FROM xref3 WHERE oidbin = ? ORDER BY docid ASC
 		my $sth = $self->dbh->prepare_cached(<<'', undef, 1);
 SELECT COUNT(*) FROM xref3 WHERE oidbin = ?
 
-		$sth->bind_param(1, pack('H*', $oidhex), SQL_BLOB);
+		$sth->bind_param(1, $oidbin, SQL_BLOB);
 		$sth->execute;
 		$sth->fetchrow_array;
 	}
 }
 
+sub blob_exists { oidbin_exists($_[0], pack('H*', $_[1])) }
+
 1;

  reply	other threads:[~2021-06-03  1:05 UTC|newest]

Thread overview: 3+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2021-06-03  0:17 [PATCH] lei import: speed up kw updates for old IMAP messages Eric Wong
2021-06-03  1:05 ` Eric Wong [this message]
2021-06-03 10:05   ` TODO: Maildir import speedups Eric Wong

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

  List information: https://public-inbox.org/README

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20210603010520.GA13508@dcvr \
    --to=e@80x24.org \
    --cc=meta@public-inbox.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
Code repositories for project(s) associated with this public inbox

	https://80x24.org/public-inbox.git

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).