about summary refs log tree commit homepage
diff options
context:
space:
mode:
-rw-r--r--MANIFEST1
-rw-r--r--lib/PublicInbox/LEI.pm11
-rw-r--r--lib/PublicInbox/LeiImport.pm36
-rw-r--r--lib/PublicInbox/LeiIndex.pm2
-rw-r--r--lib/PublicInbox/LeiInput.pm31
-rw-r--r--lib/PublicInbox/LeiMailSync.pm14
-rw-r--r--lib/PublicInbox/LeiPmdir.pm67
-rw-r--r--lib/PublicInbox/MdirReader.pm22
-rw-r--r--t/lei-import-maildir.t2
9 files changed, 148 insertions, 38 deletions
diff --git a/MANIFEST b/MANIFEST
index 5a70a144..7bdbf252 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -221,6 +221,7 @@ lib/PublicInbox/LeiMailSync.pm
 lib/PublicInbox/LeiMirror.pm
 lib/PublicInbox/LeiOverview.pm
 lib/PublicInbox/LeiP2q.pm
+lib/PublicInbox/LeiPmdir.pm
 lib/PublicInbox/LeiQuery.pm
 lib/PublicInbox/LeiRediff.pm
 lib/PublicInbox/LeiRemote.pm
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index ed01e8de..77fc5b8f 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -240,7 +240,7 @@ our %CMD = ( # sorted in order of importance/use:
          @c_opt ],
 'import' => [ 'LOCATION...|--stdin',
         'one-time import/update from URL or filesystem',
-        qw(stdin| offset=i recursive|r exclude=s include|I=s
+        qw(stdin| offset=i recursive|r exclude=s include|I=s jobs=s
         lock=s@ in-format|F=s kw! verbose|v+ incremental! mail-sync!),
         qw(no-torsocks torsocks=s), PublicInbox::LeiQuery::curl_opt(), @c_opt ],
 'forget-mail-sync' => [ 'LOCATION...',
@@ -421,7 +421,7 @@ my %CONFIG_KEYS = (
         'leistore.dir' => 'top-level storage location',
 );
 
-my @WQ_KEYS = qw(lxs l2m wq1 ikw); # internal workers
+my @WQ_KEYS = qw(lxs l2m ikw pmd wq1); # internal workers
 
 sub _drop_wq {
         my ($self) = @_;
@@ -566,7 +566,7 @@ sub pkt_op_pair {
 }
 
 sub workers_start {
-        my ($lei, $wq, $jobs, $ops) = @_;
+        my ($lei, $wq, $jobs, $ops, $flds) = @_;
         $ops = {
                 '!' => [ \&fail_handler, $lei ],
                 '|' => [ \&sigpipe_handler, $lei ],
@@ -577,7 +577,8 @@ sub workers_start {
         $ops->{''} //= [ $wq->can('_lei_wq_eof') || \&wq_eof, $lei ];
         my $end = $lei->pkt_op_pair;
         my $ident = $wq->{-wq_ident} // "lei-$lei->{cmd} worker";
-        $wq->wq_workers_start($ident, $jobs, $lei->oldset, { lei => $lei });
+        $flds->{lei} = $lei;
+        $wq->wq_workers_start($ident, $jobs, $lei->oldset, $flds);
         delete $lei->{pkt_op_p};
         my $op_c = delete $lei->{pkt_op_c};
         # {-lei_sock} persists script/lei process until ops->{''} EOF callback
@@ -590,7 +591,7 @@ sub workers_start {
 # call this when we're ready to wait on events and yield to other clients
 sub wait_wq_events {
         my ($lei, $op_c, $ops) = @_;
-        for my $wq (grep(defined, @$lei{qw(ikw)})) { # auxiliary WQs
+        for my $wq (grep(defined, @$lei{qw(ikw pmd)})) { # auxiliary WQs
                 $wq->wq_close(1);
         }
         $op_c->{ops} = $ops;
diff --git a/lib/PublicInbox/LeiImport.pm b/lib/PublicInbox/LeiImport.pm
index 222f75c8..b0e7ba6b 100644
--- a/lib/PublicInbox/LeiImport.pm
+++ b/lib/PublicInbox/LeiImport.pm
@@ -6,6 +6,7 @@ package PublicInbox::LeiImport;
 use strict;
 use v5.10.1;
 use parent qw(PublicInbox::IPC PublicInbox::LeiInput);
+use PublicInbox::InboxWritable qw(eml_from_path);
 
 # /^input_/ subs are used by (or override) PublicInbox::LeiInput superclass
 
@@ -28,17 +29,26 @@ sub input_mbox_cb { # MboxReader callback
         input_eml_cb($self, $eml, $vmd);
 }
 
-sub input_maildir_cb { # maildir_each_eml cb
-        my ($f, $kw, $eml, $self) = @_;
+sub pmdir_cb { # called via wq_io_do from LeiPmdir->each_mdir_fn
+        my ($self, $f, @args) = @_;
+        my ($folder, $bn) = ($f =~ m!\A(.+?)/(?:new|cur)/([^/]+)\z!) or
+                die "BUG: $f was not from a Maildir?\n";
+        my $fl = PublicInbox::MdirReader::maildir_basename_flags($bn);
+        return if index($fl, 'T') >= 0; # no Trashed messages
+        my $kw = PublicInbox::MdirReader::flags2kw($fl);
+        substr($folder, 0, 0) = 'maildir:'; # add prefix
+        my $lms = $self->{-lms_ro};
+        my $oidbin = $lms ? $lms->name_oidbin($folder, $bn) : undef;
+        my @docids = defined($oidbin) ?
+                        $self->{over}->oidbin_exists($oidbin) : ();
         my $vmd = $self->{-import_kw} ? { kw => $kw } : undef;
-        if ($self->{-mail_sync}) {
-                if ($f =~ m!\A(.+?)/(?:new|cur)/([^/]+)\z!) { # ugh...
-                        $vmd->{sync_info} = [ "maildir:$1", \(my $n = $2) ];
-                } else {
-                        warn "E: $f was not from a Maildir?\n";
-                }
+        if (scalar @docids) {
+                $self->{lse}->kw_changed(undef, $kw, \@docids) or return;
+        }
+        if (my $eml = eml_from_path($f)) {
+                $vmd->{sync_info} = [ $folder, \$bn ] if $self->{-mail_sync};
+                $self->input_eml_cb($eml, $vmd);
         }
-        $self->input_eml_cb($eml, $vmd);
 }
 
 sub input_net_cb { # imap_each / nntp_each
@@ -62,11 +72,13 @@ sub do_import_index ($$@) {
         my $vmd_mod = $self->vmd_mod_extract(\@inputs);
         return $lei->fail(join("\n", @{$vmd_mod->{err}})) if $vmd_mod->{err};
         $self->{all_vmd} = $vmd_mod if scalar keys %$vmd_mod;
-        $self->prepare_inputs($lei, \@inputs) or return;
+        $lei->ale; # initialize for workers to read (before LeiPmdir->new)
         $self->{-mail_sync} = $lei->{opt}->{'mail-sync'} // 1;
+        $self->prepare_inputs($lei, \@inputs) or return;
 
-        $lei->ale; # initialize for workers to read
-        my $j = $lei->{opt}->{jobs} // scalar(@{$self->{inputs}}) || 1;
+        my $j = $lei->{opt}->{jobs} // 0;
+        $j =~ /\A([0-9]+),[0-9]+\z/ and $j = $1 + 0;
+        $j ||= scalar(@{$self->{inputs}}) || 1;
         my $ikw;
         if (my $net = $lei->{net}) {
                 # $j = $net->net_concurrency($j); TODO
diff --git a/lib/PublicInbox/LeiIndex.pm b/lib/PublicInbox/LeiIndex.pm
index cc3e83e7..4be0c649 100644
--- a/lib/PublicInbox/LeiIndex.pm
+++ b/lib/PublicInbox/LeiIndex.pm
@@ -35,7 +35,7 @@ sub lei_index {
 
 no warnings 'once';
 no strict 'refs';
-for my $m (qw(input_maildir_cb input_net_cb)) {
+for my $m (qw(pmdir_cb input_net_cb)) {
         *$m = PublicInbox::LeiImport->can($m);
 }
 
diff --git a/lib/PublicInbox/LeiInput.pm b/lib/PublicInbox/LeiInput.pm
index 4ff7a379..24211bf0 100644
--- a/lib/PublicInbox/LeiInput.pm
+++ b/lib/PublicInbox/LeiInput.pm
@@ -151,9 +151,16 @@ sub input_path_url {
                 return $lei->fail(<<EOM) if $ifmt && $ifmt ne 'maildir';
 $input appears to be a maildir, not $ifmt
 EOM
-                PublicInbox::MdirReader->new->maildir_each_eml($input,
-                                        $self->can('input_maildir_cb'),
-                                        $self, @args);
+                my $mdr = PublicInbox::MdirReader->new;
+                if (my $pmd = $self->{pmd}) {
+                        $mdr->maildir_each_file($input,
+                                                $pmd->can('each_mdir_fn'),
+                                                $pmd, @args);
+                } else {
+                        $mdr->maildir_each_eml($input,
+                                                $self->can('input_maildir_cb'),
+                                                $self, @args);
+                }
         } else {
                 $lei->fail("$input unsupported (TODO)");
         }
@@ -215,7 +222,7 @@ sub prepare_inputs { # returns undef on error
                 push @{$sync->{no}}, '/dev/stdin' if $sync;
         }
         my $net = $lei->{net}; # NetWriter may be created by l2m
-        my (@f, @d);
+        my (@f, @md);
         # e.g. Maildir:/home/user/Mail/ or imaps://example.com/INBOX
         for my $input (@$inputs) {
                 my $input_path = $input;
@@ -247,11 +254,11 @@ sub prepare_inputs { # returns undef on error
                                 PublicInbox::MboxReader->reads($ifmt) or return
                                         $lei->fail("$ifmt not supported");
                         } elsif (-d $input_path) {
-                                require PublicInbox::MdirReader;
                                 $ifmt eq 'maildir' or return
                                         $lei->fail("$ifmt not supported");
                                 $sync and $input = 'maildir:'.
                                                 $lei->abs_path($input_path);
+                                push @md, $input;
                         } else {
                                 return $lei->fail("Unable to handle $input");
                         }
@@ -266,21 +273,18 @@ $input is `eml', not --in-format=$in_fmt
                         if ($devfd >= 0 || -f $input || -p _) {
                                 push @{$sync->{no}}, $input if $sync;
                                 push @f, $input;
-                        } elsif (-d $input) {
+                        } elsif (-d "$input/new" && -d "$input/cur") {
                                 if ($sync) {
                                         $input = $lei->abs_path($input);
                                         push @{$sync->{ok}}, $input;
                                 }
-                                push @d, $input;
+                                push @md, $input;
                         } else {
                                 return $lei->fail("Unable to handle $input")
                         }
                 }
         }
         if (@f) { check_input_format($lei, \@f) or return }
-        if (@d) { # TODO: check for MH vs Maildir, here
-                require PublicInbox::MdirReader;
-        }
         if ($sync && $sync->{no}) {
                 return $lei->fail(<<"") if !$sync->{ok};
 --mail-sync specified but no inputs support it
@@ -299,6 +303,13 @@ $input is `eml', not --in-format=$in_fmt
                 $lei->{auth} //= PublicInbox::LeiAuth->new;
                 $lei->{net} //= $net;
         }
+        if (scalar(@md)) {
+                require PublicInbox::MdirReader;
+                if ($self->can('pmdir_cb')) {
+                        require PublicInbox::LeiPmdir;
+                        $self->{pmd} = PublicInbox::LeiPmdir->new($lei, $self);
+                }
+        }
         $self->{inputs} = $inputs;
 }
 
diff --git a/lib/PublicInbox/LeiMailSync.pm b/lib/PublicInbox/LeiMailSync.pm
index 75603d89..ec05404a 100644
--- a/lib/PublicInbox/LeiMailSync.pm
+++ b/lib/PublicInbox/LeiMailSync.pm
@@ -66,6 +66,10 @@ CREATE TABLE IF NOT EXISTS blob2name (
         UNIQUE (oidbin, fid, name)
 )
 
+        # speeds up LeiImport->pmdir_cb (for "lei import") by ~6x:
+        $dbh->do(<<'');
+CREATE INDEX IF NOT EXISTS idx_fid_name ON blob2name(fid,name)
+
 }
 
 sub fid_for {
@@ -375,6 +379,16 @@ EOM
         $sth->fetchrow_array;
 }
 
+sub name_oidbin ($$$) {
+        my ($self, $mdir, $nm) = @_;
+        my $fid = $self->{fmap}->{$mdir} //= fid_for($self, $mdir) // return;
+        my $sth = $self->{dbh}->prepare_cached(<<EOM, undef, 1);
+SELECT oidbin FROM blob2name WHERE fid = ? AND name = ?
+EOM
+        $sth->execute($fid, $nm);
+        $sth->fetchrow_array;
+}
+
 sub imap_oid {
         my ($self, $lei, $uid_uri) = @_;
         my $mailbox_uri = $uid_uri->clone;
diff --git a/lib/PublicInbox/LeiPmdir.pm b/lib/PublicInbox/LeiPmdir.pm
new file mode 100644
index 00000000..5efb012e
--- /dev/null
+++ b/lib/PublicInbox/LeiPmdir.pm
@@ -0,0 +1,67 @@
+# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# WQ worker for dealing with parallel Maildir reads;
+# this does NOT use the {shard_info} field of LeiToMail
+# (and we may remove {shard_info})
+# WQ key: {pmd}
+package PublicInbox::LeiPmdir;
+use strict;
+use v5.10.1;
+use parent qw(PublicInbox::IPC);
+
+sub new {
+        my ($cls, $lei, $ipt) = @_;
+        my $self = bless { -wq_ident => 'lei Maildir worker' }, $cls;
+        my $jobs = $lei->{opt}->{jobs};
+        $jobs =~ /\A[0-9]+,([0-9]+)\z/ and $jobs = $1;
+        my $nproc = $jobs // do {
+                # untested with >=4 CPUs, though I suspect I/O latency
+                # of SATA SSD storage will make >=4 processes unnecessary,
+                # here.  NVMe users may wish to use '-j'
+                my $n = $self->detect_nproc;
+                $n = 4 if $n > 4;
+        };
+        my ($op_c, $ops) = $lei->workers_start($self, $nproc,
+                undef, { ipt => $ipt }); # LeiInput subclass
+        $op_c->{ops} = $ops; # for PktOp->event_step
+        $lei->{pmd} = $self;
+}
+
+sub ipc_atfork_child {
+        my ($self) = @_;
+        my $lei = $self->{lei};
+        $lei->_lei_atfork_child;
+        my $ipt = $self->{ipt} // die 'BUG: no self->{ipt}';
+        $ipt->{lei} = $lei;
+        $ipt->{sto} = $lei->{sto} // die 'BUG: no lei->{sto}';
+        $ipt->{lse} = $ipt->{sto}->search;
+        $ipt->{over} = $ipt->{lse}->over;
+        $ipt->{-lms_ro} //= $ipt->{lse}->lms; # may be undef or '0'
+        $self->SUPER::ipc_atfork_child;
+}
+
+sub each_mdir_fn { # maildir_each_file callback
+        my ($f, $self, @args) = @_;
+        $self->wq_io_do('mdir_iter', [], $f, @args);
+}
+
+sub mdir_iter { # via wq_io_do
+        my ($self, $f, @args) = @_;
+        $self->{ipt}->pmdir_cb($f, @args);
+}
+
+sub pmd_done_wait {
+        my ($arg, $pid) = @_;
+        my ($self, $lei) = @$arg;
+        my $wait = $lei->{sto}->ipc_do('done');
+        $lei->can('wq_done_wait')->($arg, $pid);
+}
+
+sub _lei_wq_eof { # EOF callback for main lei daemon
+        my ($lei) = @_;
+        my $pmd = delete $lei->{pmd} or return $lei->fail;
+        $pmd->wq_wait_old(\&pmd_done_wait, $lei);
+}
+
+1;
diff --git a/lib/PublicInbox/MdirReader.pm b/lib/PublicInbox/MdirReader.pm
index 304be63d..484bf0a8 100644
--- a/lib/PublicInbox/MdirReader.pm
+++ b/lib/PublicInbox/MdirReader.pm
@@ -87,17 +87,21 @@ sub maildir_each_eml {
 sub new { bless {}, __PACKAGE__ }
 
 sub flags2kw ($) {
-        my @unknown;
-        my %kw;
-        for (split(//, $_[0])) {
-                my $k = $c2kw{$_};
-                if (defined($k)) {
-                        $kw{$k} = 1;
-                } else {
-                        push @unknown, $_;
+        if (wantarray) {
+                my @unknown;
+                my %kw;
+                for (split(//, $_[0])) {
+                        my $k = $c2kw{$_};
+                        if (defined($k)) {
+                                $kw{$k} = 1;
+                        } else {
+                                push @unknown, $_;
+                        }
                 }
+                (\%kw, \@unknown);
+        } else {
+                [ sort(map { $c2kw{$_} // () } split(//, $_[0])) ];
         }
-        (\%kw, \@unknown);
 }
 
 1;
diff --git a/t/lei-import-maildir.t b/t/lei-import-maildir.t
index 688b10ce..c81e7805 100644
--- a/t/lei-import-maildir.t
+++ b/t/lei-import-maildir.t
@@ -28,7 +28,7 @@ test_lei(sub {
         is(scalar(keys %v), 1, 'inspect handles relative and absolute paths');
         my $inspect = json_utf8->decode([ keys %v ]->[0]);
         is_deeply($inspect, {"maildir:$md" => { 'name.count' => 1 }},
-                'inspect maildir: path had expected output');
+                'inspect maildir: path had expected output') or xbail($inspect);
 
         lei_ok(qw(q s:boolean));
         my $res = json_utf8->decode($lei_out);