From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id B62C21F5AE; Thu, 3 Jun 2021 01:05:20 +0000 (UTC) Date: Thu, 3 Jun 2021 01:05:20 +0000 From: Eric Wong To: meta@public-inbox.org Subject: [PATCH v2] lei import: speed up kw updates for old IMAP messages Message-ID: <20210603010520.GA13508@dcvr> References: <20210603001737.31369-1-e@80x24.org> MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Disposition: inline In-Reply-To: <20210603001737.31369-1-e@80x24.org> List-Id: On a 4-core CPU, this speeds up "lei import" on a largish IMAP inbox with 75K messages from ~21 minutes down to 40s. Parallelizing with the new LeiImportKw WQ worker class gave a near-linear speedup and brought the runtime down to ~5 minutes 40 seconds. The new idx_fid_uid index on the "fid" and "uid" columns of blob2num in mail_sync.sqlite3 brought us the final speedup. An additional index on over.sqlite3#xref3(oidbin) did not help, since idx_nntp already exists and speeds up the new ->oidbin_exists internal API. I initially experimented with a separate "lei import-kw" command but decided against it since it's useless outside of IMAP+JMAP and would require extra cognitive overhead for both users and hackers. So LeiImportKw is just a WQ worker used by "lei import" and not its own user-visible command.
v2: fix ikw_done_wait arg handling (ugh, confusing API :x) --- Interdiff against v1: diff --git a/lib/PublicInbox/LeiImportKw.pm b/lib/PublicInbox/LeiImportKw.pm index e13dce07..2878cbdf 100644 --- a/lib/PublicInbox/LeiImportKw.pm +++ b/lib/PublicInbox/LeiImportKw.pm @@ -40,9 +40,10 @@ sub ck_update_kw { # via wq_io_do } sub ikw_done_wait { - my ($lei) = @_; + my ($arg, $pid) = @_; + my ($self, $lei) = @$arg; my $wait = $lei->{sto}->ipc_do('done'); - $lei->wq_done_wait; + $lei->can('wq_done_wait')->($arg, $pid); } sub _lei_wq_eof { # EOF callback for main lei daemon MANIFEST | 1 + lib/PublicInbox/LEI.pm | 2 +- lib/PublicInbox/LeiImport.pm | 25 ++++++++-------- lib/PublicInbox/LeiImportKw.pm | 55 ++++++++++++++++++++++++++++++++++ lib/PublicInbox/LeiMailSync.pm | 17 ++++++----- lib/PublicInbox/NetReader.pm | 1 + lib/PublicInbox/Over.pm | 10 ++++--- 7 files changed, 86 insertions(+), 25 deletions(-) create mode 100644 lib/PublicInbox/LeiImportKw.pm diff --git a/MANIFEST b/MANIFEST index 0b4bb380..5a70a144 100644 --- a/MANIFEST +++ b/MANIFEST @@ -208,6 +208,7 @@ lib/PublicInbox/LeiForgetMailSync.pm lib/PublicInbox/LeiForgetSearch.pm lib/PublicInbox/LeiHelp.pm lib/PublicInbox/LeiImport.pm +lib/PublicInbox/LeiImportKw.pm lib/PublicInbox/LeiIndex.pm lib/PublicInbox/LeiInit.pm lib/PublicInbox/LeiInput.pm diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index 30f90798..7bda9408 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -421,7 +421,7 @@ my %CONFIG_KEYS = ( 'leistore.dir' => 'top-level storage location', ); -my @WQ_KEYS = qw(lxs l2m wq1); # internal workers +my @WQ_KEYS = qw(lxs l2m wq1 ikw); # internal workers sub _drop_wq { my ($self) = @_; diff --git a/lib/PublicInbox/LeiImport.pm b/lib/PublicInbox/LeiImport.pm index 860a2c98..2efd4935 100644 --- a/lib/PublicInbox/LeiImport.pm +++ b/lib/PublicInbox/LeiImport.pm @@ -43,18 +43,14 @@ sub input_maildir_cb { # maildir_each_eml cb sub input_net_cb { # imap_each / nntp_each my ($uri, $uid, 
$kw, $eml, $self) = @_; - my $vmd = $self->{-import_kw} ? { kw => $kw } : undef; - $vmd->{sync_info} = [ $$uri, $uid ] if $self->{-mail_sync}; if (defined $eml) { + my $vmd = $self->{-import_kw} ? { kw => $kw } : undef; + $vmd->{sync_info} = [ $$uri, $uid ] if $self->{-mail_sync}; $self->input_eml_cb($eml, $vmd); - } elsif ($vmd) { # old message, kw only - my $oid = $self->{-lms_ro}->imap_oid2($uri, $uid) // return; - my @docids = $self->{lse}->over->blob_exists($oid) or return; - $self->{lse}->kw_changed(undef, $kw, \@docids) or return; - my $lei = $self->{lei}; - $lei->qerr("# $oid => @$kw\n") if $lei->{opt}->{verbose}; - $self->{lei}->{sto}->ipc_do('set_eml_vmd', undef, - $vmd, \@docids); + } elsif (my $ikw = $self->{lei}->{ikw}) { # old message, kw only + # we send $uri as a bare SCALAR and not a URIimap ref to + # reduce socket traffic: + $ikw->wq_io_do('ck_update_kw', [], $$uri, $uid, $kw); } } @@ -71,15 +67,17 @@ sub do_import_index ($$@) { $lei->ale; # initialize for workers to read my $j = $lei->{opt}->{jobs} // scalar(@{$self->{inputs}}) || 1; + my $ikw; if (my $net = $lei->{net}) { # $j = $net->net_concurrency($j); TODO if ($lei->{opt}->{incremental} // 1) { $net->{incremental} = 1; $net->{-lms_ro} = $sto->search->lms // 0; - if ($self->{-import_kw}) { + if ($self->{-import_kw} && $net->{-lms_ro} && + $net->{imap_order}) { + require PublicInbox::LeiImportKw; + $ikw = PublicInbox::LeiImportKw->new($lei); $net->{each_old} = 1; - $self->{-lms_ro} = $net->{-lms_ro}; - $self->{lse} = $sto->search; } } } else { @@ -93,6 +91,7 @@ sub do_import_index ($$@) { (my $op_c, $ops) = $lei->workers_start($self, $j, $ops); $lei->{wq1} = $self; $lei->{-err_type} = 'non-fatal'; + $ikw->wq_close(1) if $ikw; net_merge_all_done($self) unless $lei->{auth}; $op_c->op_wait_event($ops); } diff --git a/lib/PublicInbox/LeiImportKw.pm b/lib/PublicInbox/LeiImportKw.pm new file mode 100644 index 00000000..2878cbdf --- /dev/null +++ b/lib/PublicInbox/LeiImportKw.pm @@ -0,0 +1,55 @@ +# 
Copyright (C) 2021 all contributors +# License: AGPL-3.0+ + +# WQ worker for dealing with LeiImport IMAP flags on already-imported messages +# WQ key: {ikw} +package PublicInbox::LeiImportKw; +use strict; +use v5.10.1; +use parent qw(PublicInbox::IPC); + +sub new { + my ($cls, $lei) = @_; + my $self = bless { -wq_ident => 'lei import_kw worker' }, $cls; + my ($op_c, $ops) = $lei->workers_start($self, $self->detect_nproc); + $op_c->{ops} = $ops; # for PktOp->event_step + $lei->{ikw} = $self; +} + +sub ipc_atfork_child { + my ($self) = @_; + my $lei = $self->{lei}; + $lei->_lei_atfork_child; + my $net = delete $lei->{net} // die 'BUG: no lei->{net}'; + $self->{sto} = $lei->{sto} // die 'BUG: no lei->{sto}'; + $self->{verbose} = $lei->{opt}->{verbose}; + $self->{lse} = $self->{sto}->search; + $self->{over} = $self->{lse}->over; + $self->{-lms_ro} = $net->{-lms_ro} || die 'BUG: net->{-lms_ro} FALSE'; + $self->SUPER::ipc_atfork_child; +} + +sub ck_update_kw { # via wq_io_do + my ($self, $url, $uid, $kw) = @_; + my $oidbin = $self->{-lms_ro}->imap_oidbin($url, $uid) // return; + my @docids = $self->{over}->oidbin_exists($oidbin) or return; + $self->{lse}->kw_changed(undef, $kw, \@docids) or return; + $self->{verbose} and + $self->{lei}->qerr('# '.unpack('H*', $oidbin)." 
=> @$kw\n"); + $self->{sto}->ipc_do('set_eml_vmd', undef, { kw => $kw }, \@docids); +} + +sub ikw_done_wait { + my ($arg, $pid) = @_; + my ($self, $lei) = @$arg; + my $wait = $lei->{sto}->ipc_do('done'); + $lei->can('wq_done_wait')->($arg, $pid); +} + +sub _lei_wq_eof { # EOF callback for main lei daemon + my ($lei) = @_; + my $ikw = delete $lei->{ikw} or return $lei->fail; + $ikw->wq_wait_old(\&ikw_done_wait, $lei); +} + +1; diff --git a/lib/PublicInbox/LeiMailSync.pm b/lib/PublicInbox/LeiMailSync.pm index 22ee1109..75603d89 100644 --- a/lib/PublicInbox/LeiMailSync.pm +++ b/lib/PublicInbox/LeiMailSync.pm @@ -54,6 +54,10 @@ CREATE TABLE IF NOT EXISTS blob2num ( UNIQUE (oidbin, fid, uid) ) + # speeds up LeiImportKw->ck_update_kw (for "lei import") by 5-6x: + $dbh->do(<<''); +CREATE INDEX IF NOT EXISTS idx_fid_uid ON blob2num(fid,uid) + $dbh->do(<<''); CREATE TABLE IF NOT EXISTS blob2name ( oidbin VARBINARY NOT NULL, @@ -361,15 +365,14 @@ sub forget_folder { $dbh->do('DELETE FROM folders WHERE fid = ?', undef, $fid); } -sub imap_oid2 ($$$) { - my ($self, $uri, $uid) = @_; # $uri MUST have UIDVALIDITY - my $fid = $self->{fmap}->{"$uri"} //= fid_for($self, "$uri") // return; +sub imap_oidbin ($$$) { + my ($self, $url, $uid) = @_; # $url MUST have UIDVALIDITY + my $fid = $self->{fmap}->{$url} //= fid_for($self, $url) // return; my $sth = $self->{dbh}->prepare_cached(<execute($fid, $uid); - my ($oidbin) = $sth->fetchrow_array; - $oidbin ? unpack('H*', $oidbin) : undef; + $sth->fetchrow_array; } sub imap_oid { @@ -384,10 +387,10 @@ sub imap_oid { } $lei->qerr(@{$err->{qerr}}) if $err->{qerr}; } - imap_oid2($self, $folders->[0], $uid_uri->uid); + my $oidbin = imap_oidbin($self, $folders->[0], $uid_uri->uid); + $oidbin ? unpack('H*', $oidbin) : undef; } - # FIXME: something with "lei " is causing uncommitted transaction # warnings, not sure what...
sub DESTROY { diff --git a/lib/PublicInbox/NetReader.pm b/lib/PublicInbox/NetReader.pm index 39a8f7fc..058f4313 100644 --- a/lib/PublicInbox/NetReader.pm +++ b/lib/PublicInbox/NetReader.pm @@ -471,6 +471,7 @@ sub each_old_flags ($$$$) { while (my ($uid, $per_uid) = each %$r) { my $kw = flags2kw($self, $uri, $uid, $per_uid->{FLAGS}) // next; + # LeiImport->input_net_cb $eml_cb->($uri, $uid, $kw, undef, @args); } } diff --git a/lib/PublicInbox/Over.pm b/lib/PublicInbox/Over.pm index 0e191c47..58fdea0e 100644 --- a/lib/PublicInbox/Over.pm +++ b/lib/PublicInbox/Over.pm @@ -349,13 +349,13 @@ sub check_inodes { } } -sub blob_exists { - my ($self, $oidhex) = @_; +sub oidbin_exists { + my ($self, $oidbin) = @_; if (wantarray) { my $sth = $self->dbh->prepare_cached(<<'', undef, 1); SELECT docid FROM xref3 WHERE oidbin = ? ORDER BY docid ASC - $sth->bind_param(1, pack('H*', $oidhex), SQL_BLOB); + $sth->bind_param(1, $oidbin, SQL_BLOB); $sth->execute; my $tmp = $sth->fetchall_arrayref; map { $_->[0] } @$tmp; @@ -363,10 +363,12 @@ SELECT docid FROM xref3 WHERE oidbin = ? ORDER BY docid ASC my $sth = $self->dbh->prepare_cached(<<'', undef, 1); SELECT COUNT(*) FROM xref3 WHERE oidbin = ? - $sth->bind_param(1, pack('H*', $oidhex), SQL_BLOB); + $sth->bind_param(1, $oidbin, SQL_BLOB); $sth->execute; $sth->fetchrow_array; } } +sub blob_exists { oidbin_exists($_[0], pack('H*', $_[1])) } + 1;