about summary refs log tree commit homepage
path: root/lib/PublicInbox/LeiImport.pm
diff options
context:
space:
mode:
Diffstat (limited to 'lib/PublicInbox/LeiImport.pm')
-rw-r--r-- lib/PublicInbox/LeiImport.pm 36
1 files changed, 24 insertions, 12 deletions
diff --git a/lib/PublicInbox/LeiImport.pm b/lib/PublicInbox/LeiImport.pm
index 222f75c8..b0e7ba6b 100644
--- a/lib/PublicInbox/LeiImport.pm
+++ b/lib/PublicInbox/LeiImport.pm
@@ -6,6 +6,7 @@ package PublicInbox::LeiImport;
 use strict;
 use v5.10.1;
 use parent qw(PublicInbox::IPC PublicInbox::LeiInput);
+use PublicInbox::InboxWritable qw(eml_from_path);
 
 # /^input_/ subs are used by (or override) PublicInbox::LeiInput superclass
 
@@ -28,17 +29,26 @@ sub input_mbox_cb { # MboxReader callback
         input_eml_cb($self, $eml, $vmd);
 }
 
-sub input_maildir_cb { # maildir_each_eml cb
-        my ($f, $kw, $eml, $self) = @_;
+sub pmdir_cb { # called via wq_io_do from LeiPmdir->each_mdir_fn
+        my ($self, $f, @args) = @_;
+        my ($folder, $bn) = ($f =~ m!\A(.+?)/(?:new|cur)/([^/]+)\z!) or
+                die "BUG: $f was not from a Maildir?\n";
+        my $fl = PublicInbox::MdirReader::maildir_basename_flags($bn);
+        return if index($fl, 'T') >= 0; # no Trashed messages
+        my $kw = PublicInbox::MdirReader::flags2kw($fl);
+        substr($folder, 0, 0) = 'maildir:'; # add prefix
+        my $lms = $self->{-lms_ro};
+        my $oidbin = $lms ? $lms->name_oidbin($folder, $bn) : undef;
+        my @docids = defined($oidbin) ?
+                        $self->{over}->oidbin_exists($oidbin) : ();
         my $vmd = $self->{-import_kw} ? { kw => $kw } : undef;
-        if ($self->{-mail_sync}) {
-                if ($f =~ m!\A(.+?)/(?:new|cur)/([^/]+)\z!) { # ugh...
-                        $vmd->{sync_info} = [ "maildir:$1", \(my $n = $2) ];
-                } else {
-                        warn "E: $f was not from a Maildir?\n";
-                }
+        if (scalar @docids) {
+                $self->{lse}->kw_changed(undef, $kw, \@docids) or return;
+        }
+        if (my $eml = eml_from_path($f)) {
+                $vmd->{sync_info} = [ $folder, \$bn ] if $self->{-mail_sync};
+                $self->input_eml_cb($eml, $vmd);
         }
-        $self->input_eml_cb($eml, $vmd);
 }
 
 sub input_net_cb { # imap_each / nntp_each
@@ -62,11 +72,13 @@ sub do_import_index ($$@) {
         my $vmd_mod = $self->vmd_mod_extract(\@inputs);
         return $lei->fail(join("\n", @{$vmd_mod->{err}})) if $vmd_mod->{err};
         $self->{all_vmd} = $vmd_mod if scalar keys %$vmd_mod;
-        $self->prepare_inputs($lei, \@inputs) or return;
+        $lei->ale; # initialize for workers to read (before LeiPmdir->new)
         $self->{-mail_sync} = $lei->{opt}->{'mail-sync'} // 1;
+        $self->prepare_inputs($lei, \@inputs) or return;
 
-        $lei->ale; # initialize for workers to read
-        my $j = $lei->{opt}->{jobs} // scalar(@{$self->{inputs}}) || 1;
+        my $j = $lei->{opt}->{jobs} // 0;
+        $j =~ /\A([0-9]+),[0-9]+\z/ and $j = $1 + 0;
+        $j ||= scalar(@{$self->{inputs}}) || 1;
         my $ikw;
         if (my $net = $lei->{net}) {
                 # $j = $net->net_concurrency($j); TODO