diff options
Diffstat (limited to 'lib/PublicInbox')
-rw-r--r-- | lib/PublicInbox/LEI.pm | 11 | ||||
-rw-r--r-- | lib/PublicInbox/LeiImport.pm | 36 | ||||
-rw-r--r-- | lib/PublicInbox/LeiIndex.pm | 2 | ||||
-rw-r--r-- | lib/PublicInbox/LeiInput.pm | 31 | ||||
-rw-r--r-- | lib/PublicInbox/LeiMailSync.pm | 14 | ||||
-rw-r--r-- | lib/PublicInbox/LeiPmdir.pm | 67 | ||||
-rw-r--r-- | lib/PublicInbox/MdirReader.pm | 22 |
7 files changed, 146 insertions, 37 deletions
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index ed01e8de..77fc5b8f 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -240,7 +240,7 @@ our %CMD = ( # sorted in order of importance/use: @c_opt ], 'import' => [ 'LOCATION...|--stdin', 'one-time import/update from URL or filesystem', - qw(stdin| offset=i recursive|r exclude=s include|I=s + qw(stdin| offset=i recursive|r exclude=s include|I=s jobs=s lock=s@ in-format|F=s kw! verbose|v+ incremental! mail-sync!), qw(no-torsocks torsocks=s), PublicInbox::LeiQuery::curl_opt(), @c_opt ], 'forget-mail-sync' => [ 'LOCATION...', @@ -421,7 +421,7 @@ my %CONFIG_KEYS = ( 'leistore.dir' => 'top-level storage location', ); -my @WQ_KEYS = qw(lxs l2m wq1 ikw); # internal workers +my @WQ_KEYS = qw(lxs l2m ikw pmd wq1); # internal workers sub _drop_wq { my ($self) = @_; @@ -566,7 +566,7 @@ sub pkt_op_pair { } sub workers_start { - my ($lei, $wq, $jobs, $ops) = @_; + my ($lei, $wq, $jobs, $ops, $flds) = @_; $ops = { '!' => [ \&fail_handler, $lei ], '|' => [ \&sigpipe_handler, $lei ], @@ -577,7 +577,8 @@ sub workers_start { $ops->{''} //= [ $wq->can('_lei_wq_eof') || \&wq_eof, $lei ]; my $end = $lei->pkt_op_pair; my $ident = $wq->{-wq_ident} // "lei-$lei->{cmd} worker"; - $wq->wq_workers_start($ident, $jobs, $lei->oldset, { lei => $lei }); + $flds->{lei} = $lei; + $wq->wq_workers_start($ident, $jobs, $lei->oldset, $flds); delete $lei->{pkt_op_p}; my $op_c = delete $lei->{pkt_op_c}; # {-lei_sock} persists script/lei process until ops->{''} EOF callback @@ -590,7 +591,7 @@ sub workers_start { # call this when we're ready to wait on events and yield to other clients sub wait_wq_events { my ($lei, $op_c, $ops) = @_; - for my $wq (grep(defined, @$lei{qw(ikw)})) { # auxiliary WQs + for my $wq (grep(defined, @$lei{qw(ikw pmd)})) { # auxiliary WQs $wq->wq_close(1); } $op_c->{ops} = $ops; diff --git a/lib/PublicInbox/LeiImport.pm b/lib/PublicInbox/LeiImport.pm index 222f75c8..b0e7ba6b 100644 --- a/lib/PublicInbox/LeiImport.pm +++ b/lib/PublicInbox/LeiImport.pm @@ -6,6 +6,7 @@ package PublicInbox::LeiImport; use strict; use v5.10.1; use parent qw(PublicInbox::IPC PublicInbox::LeiInput); +use PublicInbox::InboxWritable qw(eml_from_path); # /^input_/ subs are used by (or override) PublicInbox::LeiInput superclass @@ -28,17 +29,26 @@ sub input_mbox_cb { # MboxReader callback input_eml_cb($self, $eml, $vmd); } -sub input_maildir_cb { # maildir_each_eml cb - my ($f, $kw, $eml, $self) = @_; +sub pmdir_cb { # called via wq_io_do from LeiPmdir->each_mdir_fn + my ($self, $f, @args) = @_; + my ($folder, $bn) = ($f =~ m!\A(.+?)/(?:new|cur)/([^/]+)\z!) or + die "BUG: $f was not from a Maildir?\n"; + my $fl = PublicInbox::MdirReader::maildir_basename_flags($bn); + return if index($fl, 'T') >= 0; # no Trashed messages + my $kw = PublicInbox::MdirReader::flags2kw($fl); + substr($folder, 0, 0) = 'maildir:'; # add prefix + my $lms = $self->{-lms_ro}; + my $oidbin = $lms ? $lms->name_oidbin($folder, $bn) : undef; + my @docids = defined($oidbin) ? + $self->{over}->oidbin_exists($oidbin) : (); my $vmd = $self->{-import_kw} ? { kw => $kw } : undef; - if ($self->{-mail_sync}) { - if ($f =~ m!\A(.+?)/(?:new|cur)/([^/]+)\z!) { # ugh... - $vmd->{sync_info} = [ "maildir:$1", \(my $n = $2) ]; - } else { - warn "E: $f was not from a Maildir?\n"; - } + if (scalar @docids) { + $self->{lse}->kw_changed(undef, $kw, \@docids) or return; + } + if (my $eml = eml_from_path($f)) { + $vmd->{sync_info} = [ $folder, \$bn ] if $self->{-mail_sync}; + $self->input_eml_cb($eml, $vmd); } - $self->input_eml_cb($eml, $vmd); } sub input_net_cb { # imap_each / nntp_each @@ -62,11 +72,13 @@ sub do_import_index ($$@) { my $vmd_mod = $self->vmd_mod_extract(\@inputs); return $lei->fail(join("\n", @{$vmd_mod->{err}})) if $vmd_mod->{err}; $self->{all_vmd} = $vmd_mod if scalar keys %$vmd_mod; - $self->prepare_inputs($lei, \@inputs) or return; + $lei->ale; # initialize for workers to read (before LeiPmdir->new) $self->{-mail_sync} = $lei->{opt}->{'mail-sync'} // 1; + $self->prepare_inputs($lei, \@inputs) or return; - $lei->ale; # initialize for workers to read - my $j = $lei->{opt}->{jobs} // scalar(@{$self->{inputs}}) || 1; + my $j = $lei->{opt}->{jobs} // 0; + $j =~ /\A([0-9]+),[0-9]+\z/ and $j = $1 + 0; + $j ||= scalar(@{$self->{inputs}}) || 1; my $ikw; if (my $net = $lei->{net}) { # $j = $net->net_concurrency($j); TODO diff --git a/lib/PublicInbox/LeiIndex.pm b/lib/PublicInbox/LeiIndex.pm index cc3e83e7..4be0c649 100644 --- a/lib/PublicInbox/LeiIndex.pm +++ b/lib/PublicInbox/LeiIndex.pm @@ -35,7 +35,7 @@ sub lei_index { no warnings 'once'; no strict 'refs'; -for my $m (qw(input_maildir_cb input_net_cb)) { +for my $m (qw(pmdir_cb input_net_cb)) { *$m = PublicInbox::LeiImport->can($m); } diff --git a/lib/PublicInbox/LeiInput.pm b/lib/PublicInbox/LeiInput.pm index 4ff7a379..24211bf0 100644 --- a/lib/PublicInbox/LeiInput.pm +++ b/lib/PublicInbox/LeiInput.pm @@ -151,9 +151,16 @@ sub input_path_url { return $lei->fail(<<EOM) if $ifmt && $ifmt ne 'maildir'; $input appears to be a maildir, not $ifmt EOM - PublicInbox::MdirReader->new->maildir_each_eml($input, - $self->can('input_maildir_cb'), - $self, @args); + my $mdr = PublicInbox::MdirReader->new; + if (my $pmd = $self->{pmd}) { + $mdr->maildir_each_file($input, + $pmd->can('each_mdir_fn'), + $pmd, @args); + } else { + $mdr->maildir_each_eml($input, + $self->can('input_maildir_cb'), + $self, @args); + } } else { $lei->fail("$input unsupported (TODO)"); } @@ -215,7 +222,7 @@ sub prepare_inputs { # returns undef on error push @{$sync->{no}}, '/dev/stdin' if $sync; } my $net = $lei->{net}; # NetWriter may be created by l2m - my (@f, @d); + my (@f, @md); # e.g. Maildir:/home/user/Mail/ or imaps://example.com/INBOX for my $input (@$inputs) { my $input_path = $input; @@ -247,11 +254,11 @@ sub prepare_inputs { # returns undef on error PublicInbox::MboxReader->reads($ifmt) or return $lei->fail("$ifmt not supported"); } elsif (-d $input_path) { - require PublicInbox::MdirReader; $ifmt eq 'maildir' or return $lei->fail("$ifmt not supported"); $sync and $input = 'maildir:'. $lei->abs_path($input_path); + push @md, $input; } else { return $lei->fail("Unable to handle $input"); } @@ -266,21 +273,18 @@ $input is `eml', not --in-format=$in_fmt if ($devfd >= 0 || -f $input || -p _) { push @{$sync->{no}}, $input if $sync; push @f, $input; - } elsif (-d $input) { + } elsif (-d "$input/new" && -d "$input/cur") { if ($sync) { $input = $lei->abs_path($input); push @{$sync->{ok}}, $input; } - push @d, $input; + push @md, $input; } else { return $lei->fail("Unable to handle $input") } } } if (@f) { check_input_format($lei, \@f) or return } - if (@d) { # TODO: check for MH vs Maildir, here - require PublicInbox::MdirReader; - } if ($sync && $sync->{no}) { return $lei->fail(<<"") if !$sync->{ok}; --mail-sync specified but no inputs support it @@ -299,6 +303,13 @@ $input is `eml', not --in-format=$in_fmt $lei->{auth} //= PublicInbox::LeiAuth->new; $lei->{net} //= $net; } + if (scalar(@md)) { + require PublicInbox::MdirReader; + if ($self->can('pmdir_cb')) { + require PublicInbox::LeiPmdir; + $self->{pmd} = PublicInbox::LeiPmdir->new($lei, $self); + } + } $self->{inputs} = $inputs; } diff --git a/lib/PublicInbox/LeiMailSync.pm b/lib/PublicInbox/LeiMailSync.pm index 75603d89..ec05404a 100644 --- a/lib/PublicInbox/LeiMailSync.pm +++ b/lib/PublicInbox/LeiMailSync.pm @@ -66,6 +66,10 @@ CREATE TABLE IF NOT EXISTS blob2name ( UNIQUE (oidbin, fid, name) ) + # speeds up LeiImport->pmdir_cb (for "lei import") by ~6x: + $dbh->do(<<''); +CREATE INDEX IF NOT EXISTS idx_fid_name ON blob2name(fid,name) + } sub fid_for { @@ -375,6 +379,16 @@ EOM $sth->fetchrow_array; } +sub name_oidbin ($$$) { + my ($self, $mdir, $nm) = @_; + my $fid = $self->{fmap}->{$mdir} //= fid_for($self, $mdir) // return; + my $sth = $self->{dbh}->prepare_cached(<<EOM, undef, 1); +SELECT oidbin FROM blob2name WHERE fid = ? AND name = ? +EOM + $sth->execute($fid, $nm); + $sth->fetchrow_array; +} + sub imap_oid { my ($self, $lei, $uid_uri) = @_; my $mailbox_uri = $uid_uri->clone; diff --git a/lib/PublicInbox/LeiPmdir.pm b/lib/PublicInbox/LeiPmdir.pm new file mode 100644 index 00000000..5efb012e --- /dev/null +++ b/lib/PublicInbox/LeiPmdir.pm @@ -0,0 +1,67 @@ +# Copyright (C) 2021 all contributors <meta@public-inbox.org> +# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> + +# WQ worker for dealing with parallel Maildir reads; +# this does NOT use the {shard_info} field of LeiToMail +# (and we may remove {shard_info}) +# WQ key: {pmd} +package PublicInbox::LeiPmdir; +use strict; +use v5.10.1; +use parent qw(PublicInbox::IPC); + +sub new { + my ($cls, $lei, $ipt) = @_; + my $self = bless { -wq_ident => 'lei Maildir worker' }, $cls; + my $jobs = $lei->{opt}->{jobs}; + $jobs =~ /\A[0-9]+,([0-9]+)\z/ and $jobs = $1; + my $nproc = $jobs // do { + # untested with >=4 CPUs, though I suspect I/O latency + # of SATA SSD storage will make >=4 processes unnecessary, + # here. NVMe users may wish to use '-j' + my $n = $self->detect_nproc; + $n = 4 if $n > 4; + }; + my ($op_c, $ops) = $lei->workers_start($self, $nproc, + undef, { ipt => $ipt }); # LeiInput subclass + $op_c->{ops} = $ops; # for PktOp->event_step + $lei->{pmd} = $self; +} + +sub ipc_atfork_child { + my ($self) = @_; + my $lei = $self->{lei}; + $lei->_lei_atfork_child; + my $ipt = $self->{ipt} // die 'BUG: no self->{ipt}'; + $ipt->{lei} = $lei; + $ipt->{sto} = $lei->{sto} // die 'BUG: no lei->{sto}'; + $ipt->{lse} = $ipt->{sto}->search; + $ipt->{over} = $ipt->{lse}->over; + $ipt->{-lms_ro} //= $ipt->{lse}->lms; # may be undef or '0' + $self->SUPER::ipc_atfork_child; +} + +sub each_mdir_fn { # maildir_each_file callback + my ($f, $self, @args) = @_; + $self->wq_io_do('mdir_iter', [], $f, @args); +} + +sub mdir_iter { # via wq_io_do + my ($self, $f, @args) = @_; + $self->{ipt}->pmdir_cb($f, @args); +} + +sub pmd_done_wait { + my ($arg, $pid) = @_; + my ($self, $lei) = @$arg; + my $wait = $lei->{sto}->ipc_do('done'); + $lei->can('wq_done_wait')->($arg, $pid); +} + +sub _lei_wq_eof { # EOF callback for main lei daemon + my ($lei) = @_; + my $pmd = delete $lei->{pmd} or return $lei->fail; + $pmd->wq_wait_old(\&pmd_done_wait, $lei); +} + +1; diff --git a/lib/PublicInbox/MdirReader.pm b/lib/PublicInbox/MdirReader.pm index 304be63d..484bf0a8 100644 --- a/lib/PublicInbox/MdirReader.pm +++ b/lib/PublicInbox/MdirReader.pm @@ -87,17 +87,21 @@ sub maildir_each_eml { sub new { bless {}, __PACKAGE__ } sub flags2kw ($) { - my @unknown; - my %kw; - for (split(//, $_[0])) { - my $k = $c2kw{$_}; - if (defined($k)) { - $kw{$k} = 1; - } else { - push @unknown, $_; + if (wantarray) { + my @unknown; + my %kw; + for (split(//, $_[0])) { + my $k = $c2kw{$_}; + if (defined($k)) { + $kw{$k} = 1; + } else { + push @unknown, $_; + } } + (\%kw, \@unknown); + } else { + [ sort(map { $c2kw{$_} // () } split(//, $_[0])) ]; } - (\%kw, \@unknown); } 1; |