about summary refs log tree commit homepage
path: root/lib/PublicInbox/LeiInput.pm
diff options
context:
space:
mode:
Diffstat (limited to 'lib/PublicInbox/LeiInput.pm')
-rw-r--r--lib/PublicInbox/LeiInput.pm244
1 files changed, 182 insertions, 62 deletions
diff --git a/lib/PublicInbox/LeiInput.pm b/lib/PublicInbox/LeiInput.pm
index 22bedba6..c388f7dc 100644
--- a/lib/PublicInbox/LeiInput.pm
+++ b/lib/PublicInbox/LeiInput.pm
@@ -1,10 +1,9 @@
-# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# Copyright (C) all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
 
 # parent class for LeiImport, LeiConvert, LeiIndex
 package PublicInbox::LeiInput;
-use strict;
-use v5.10.1;
+use v5.12;
 use PublicInbox::DS;
 use PublicInbox::Spawn qw(which popen_rd);
 use PublicInbox::InboxWritable qw(eml_from_path);
@@ -29,7 +28,9 @@ my %ERR = (
                 my ($label) = @_;
                 length($label) >= $L_MAX and
                         return "`$label' too long (must be <= $L_MAX)";
-                $label =~ m{\A[a-z0-9_](?:[a-z0-9_\-\./\@,]*[a-z0-9])?\z}i ?
+                $label =~ /[A-Z]/ and
+                        return "`$label' must be lowercase";
+                $label =~ m{\A[a-z0-9_](?:[a-z0-9_\-\./\@,]*[a-z0-9])?\z} ?
                         undef : "`$label' is invalid";
         },
         kw => sub {
@@ -57,19 +58,38 @@ sub check_input_format ($;$) {
         1;
 }
 
+sub input_mbox_cb { # base MboxReader callback
+        my ($eml, $self) = @_;
+        $eml->header_set($_) for (qw(Status X-Status));
+        $self->input_eml_cb($eml);
+}
+
+sub input_maildir_cb {
+        my ($fn, $kw, $eml, $self) = @_;
+        $self->input_eml_cb($eml);
+}
+
+sub input_mh_cb {
+        my ($dn, $n, $kw, $eml, $self) = @_;
+        $self->input_eml_cb($eml);
+}
+
+sub input_net_cb { # imap_each, nntp_each cb
+        my ($url, $uid, $kw, $eml, $self) = @_;
+        $self->input_eml_cb($eml);
+}
+
 # import a single file handle of $name
 # Subclass must define ->input_eml_cb and ->input_mbox_cb
 sub input_fh {
         my ($self, $ifmt, $fh, $name, @args) = @_;
         if ($ifmt eq 'eml') {
-                my $buf = do { local $/; <$fh> } //
-                        return $self->{lei}->child_error(0, <<"");
-error reading $name: $!
+                my $buf = eval { PublicInbox::IO::read_all $fh, 0 };
+                my $e = $@;
+                return $self->{lei}->child_error($?, <<"") if !$fh->close || $e;
+error reading $name: $! (\$?=$?) (\$@=$e)
 
-                # mutt pipes single RFC822 messages with a "From " line,
-                # but no Content-Length or "From " escaping.
-                # "git format-patch" also generates such files by default.
-                $buf =~ s/\A[\r\n]*From [^\r\n]*\r?\n//s;
+                PublicInbox::Eml::strip_from($buf);
 
                 # a user may feed just a body: git diff | lei rediff -U9
                 if ($self->{-force_eml}) {
@@ -96,15 +116,58 @@ sub handle_http_input ($$@) {
         push @$curl, '-s', @$curl_opt;
         my $cmd = $curl->for_uri($lei, $uri);
         $lei->qerr("# $cmd");
-        my $rdr = { 2 => $lei->{2}, pgid => 0 };
-        my ($fh, $pid) = popen_rd($cmd, undef, $rdr);
+        my $fh = popen_rd($cmd, undef, { 2 => $lei->{2} });
         grep(/\A--compressed\z/, @$curl) or
-                $fh = IO::Uncompress::Gunzip->new($fh, MultiStream => 1);
+                $fh = IO::Uncompress::Gunzip->new($fh,
+                                        MultiStream => 1, AutoClose => 1);
         eval { $self->input_fh('mboxrd', $fh, $url, @args) };
-        my $err = $@;
-        waitpid($pid, 0);
-        $? || $err and
-                $lei->child_error($?, "@$cmd failed".$err ? " $err" : '');
+        my $err = $@ ? ": $@" : '';
+        $lei->child_error($?, "@$cmd failed$err") if $err || $?;
+}
+
+sub oid2eml { # git->cat_async cb
+        my ($bref, $oid, $type, $size, $self) = @_;
+        if ($type eq 'blob') {
+                $self->input_eml_cb(PublicInbox::Eml->new($bref));
+        } else {
+                warn "W: $oid is type=$type\n";
+        }
+}
+
+sub each_ibx_eml_unindexed {
+        my ($self, $ibx, @args) = @_;
+        $ibx->isa('PublicInbox::Inbox') or return $self->{lei}->fail(<<EOM);
+unindexed extindex $ibx->{topdir} not supported
+EOM
+        require PublicInbox::SearchIdx;
+        my $n = $ibx->max_git_epoch;
+        my @g = defined($n) ? map { $ibx->git_epoch($_) } (0..$n) : ($ibx->git);
+        my $sync = { D => {}, ibx => $ibx }; # D => {} filters out deletes
+        my ($f, $at, $ct, $oid, $cmt);
+        for my $git (grep defined, @g) {
+                my $s = PublicInbox::SearchIdx::log2stack($sync, $git, 'HEAD');
+                while (($f, $at, $ct, $oid, $cmt) = $s->pop_rec) {
+                        $git->cat_async($oid, \&oid2eml, $self) if $f eq 'm';
+                }
+                $git->cleanup; # wait all
+        }
+}
+
+sub each_ibx_eml {
+        my ($self, $ibx, @args) = @_; # TODO: is @args used at all?
+        my $over = $ibx->over or return each_ibx_eml_unindexed(@_);
+        my $git = $ibx->git;
+        my $prev = 0;
+        my $smsg;
+        my $ids = $over->ids_after(\$prev);
+        while (@$ids) {
+                for (@$ids) {
+                        $smsg = $over->get_art($_) // next;
+                        $git->cat_async($smsg->{blob}, \&oid2eml, $self);
+                }
+                $ids = $over->ids_after(\$prev);
+        }
+        $git->cat_async_wait;
 }
 
 sub input_path_url {
@@ -132,7 +195,7 @@ sub input_path_url {
                 $ifmt = lc($1);
         } elsif ($input =~ /\.(?:patch|eml)\z/i) {
                 $ifmt = 'eml';
-        } elsif (-f $input && $input =~ m{\A(?:.+)/(?:new|cur)/([^/]+)\z}) {
+        } elsif ($input =~ m{\A(?:.+)/(?:new|cur)/([^/]+)\z} && -f $input) {
                 my $bn = $1;
                 my $fl = PublicInbox::MdirReader::maildir_basename_flags($bn);
                 return if index($fl, 'T') >= 0;
@@ -146,6 +209,10 @@ sub input_path_url {
         my $devfd = $lei->path_to_fd($input) // return;
         if ($devfd >= 0) {
                 $self->input_fh($ifmt, $lei->{$devfd}, $input, @args);
+        } elsif ($devfd < 0 && $input =~ m{\A(.+/)([0-9]+)\z} && -f $input) {
+                my ($dn, $n) = ($1, $2);
+                my $mhr = PublicInbox::MHreader->new($dn, $lei->{3});
+                $mhr->mh_read_one($n, $self->can('input_mh_cb'), $self);
         } elsif (-f $input && $ifmt eq 'eml') {
                 open my $fh, '<', $input or
                                         return $lei->fail("open($input): $!");
@@ -160,12 +227,9 @@ sub input_path_url {
                         $mbl->{fh} =
                              PublicInbox::MboxReader::zsfxcat($in, $zsfx, $lei);
                 }
-                local $PublicInbox::DS::in_loop = 0 if $zsfx; # dwaitpid
+                local $PublicInbox::DS::in_loop = 0 if $zsfx; # awaitpid
                 $self->input_fh($ifmt, $mbl->{fh}, $input, @args);
-        } elsif (-d _ && (-d "$input/cur" || -d "$input/new")) {
-                return $lei->fail(<<EOM) if $ifmt && $ifmt ne 'maildir';
-$input appears to be a maildir, not $ifmt
-EOM
+        } elsif (-d _ && $ifmt eq 'maildir') {
                 my $mdr = PublicInbox::MdirReader->new;
                 if (my $pmd = $self->{pmd}) {
                         $mdr->maildir_each_file($input,
@@ -176,8 +240,27 @@ EOM
                                                 $self->can('input_maildir_cb'),
                                                 $self, @args);
                 }
+        } elsif (-d _ && $ifmt eq 'mh') {
+                my $mhr = PublicInbox::MHreader->new($input.'/', $lei->{3});
+                $mhr->{sort} = $lei->{opt}->{sort} // [ 'sequence'];
+                $mhr->mh_each_eml($self->can('input_mh_cb'), $self, @args);
+        } elsif (-d _ && $ifmt =~ /\A(?:v1|v2)\z/) {
+                my $ibx = PublicInbox::Inbox->new({inboxdir => $input});
+                each_ibx_eml($self, $ibx, @args);
+        } elsif (-d _ && $ifmt eq 'extindex') {
+                my $esrch = PublicInbox::ExtSearch->new($input);
+                each_ibx_eml($self, $esrch, @args);
         } elsif ($self->{missing_ok} && !-e $input) { # don't ->fail
-                $self->folder_missing("$ifmt:$input");
+                if ($lei->{cmd} eq 'p2q') {
+                        my $fp = [ qw(git format-patch --stdout -1), $input ];
+                        my $rdr = { 2 => $lei->{2} };
+                        my $fh = popen_rd($fp, undef, $rdr);
+                        eval { $self->input_fh('eml', $fh, $input, @args) };
+                        my $err = $@ ? ": $@" : '';
+                        $lei->child_error($?, "@$fp failed$err") if $err || $?;
+                } else {
+                        $self->folder_missing("$ifmt:$input");
+                }
         } else {
                 $lei->fail("$ifmt_pfx$input unsupported (TODO)");
         }
@@ -230,6 +313,17 @@ sub prepare_http_input ($$$) {
         $self->{"-curl-$url"} = [ @curl_opt, $uri ]; # for handle_http_input
 }
 
+sub add_dir ($$$$) {
+        my ($lei, $istate, $ifmt, $input) = @_;
+        if ($istate->{-may_sync}) {
+                $$input = "$ifmt:".$lei->abs_path($$input);
+                push @{$istate->{-sync}->{ok}}, $$input if $istate->{-sync};
+        } else {
+                substr($$input, 0, 0) = "$ifmt:"; # prefix
+        }
+        push @{$istate->{$ifmt}}, $$input;
+}
+
 sub prepare_inputs { # returns undef on error
         my ($self, $lei, $inputs) = @_;
         my $in_fmt = $lei->{opt}->{'in-format'};
@@ -243,7 +337,8 @@ sub prepare_inputs { # returns undef on error
                 push @{$sync->{no}}, '/dev/stdin' if $sync;
         }
         my $net = $lei->{net}; # NetWriter may be created by l2m
-        my (@f, @md);
+        my @f;
+        my $istate = { -sync => $sync, -may_sync => $may_sync };
         # e.g. Maildir:/home/user/Mail/ or imaps://example.com/INBOX
         for my $input (@$inputs) {
                 my $input_path = $input;
@@ -263,26 +358,24 @@ sub prepare_inputs { # returns undef on error
 --in-format=$in_fmt and `$ifmt:' conflict
 
                         }
-                        if ($ifmt =~ /\A(?:maildir|mh)\z/i) {
-                                push @{$sync->{ok}}, $input if $sync;
-                        } else {
-                                push @{$sync->{no}}, $input if $sync;
-                        }
+                        ($sync && $ifmt !~ /\A(?:maildir|mh)\z/i) and
+                                push(@{$sync->{no}}, $input);
                         my $devfd = $lei->path_to_fd($input_path) // return;
                         if ($devfd >= 0 || (-f $input_path || -p _)) {
                                 require PublicInbox::MboxLock;
                                 require PublicInbox::MboxReader;
                                 PublicInbox::MboxReader->reads($ifmt) or return
                                         $lei->fail("$ifmt not supported");
-                        } elsif (-d $input_path) {
-                                $ifmt eq 'maildir' or return
-                                        $lei->fail("$ifmt not supported");
-                                $may_sync and $input = 'maildir:'.
-                                                $lei->abs_path($input_path);
-                                push @md, $input;
-                        } elsif ($self->{missing_ok} && !-e _) {
+                        } elsif (-d $input_path) { # TODO extindex
+                                $ifmt =~ /\A(?:maildir|mh|v1|v2|extindex)\z/ or
+                                        return$lei->fail("$ifmt not supported");
+                                $input = $input_path;
+                                add_dir $lei, $istate, $ifmt, \$input;
+                        } elsif ($self->{missing_ok} &&
+                                        $ifmt =~ /\A(?:maildir|mh)\z/ &&
+                                        !-e $input_path) {
                                 # for "lei rm-watch" on missing Maildir
-                                $may_sync and $input = 'maildir:'.
+                                $may_sync and $input = "$ifmt:".
                                                 $lei->abs_path($input_path);
                         } else {
                                 my $m = "Unable to handle $input";
@@ -295,7 +388,7 @@ sub prepare_inputs { # returns undef on error
 $input is `eml', not --in-format=$in_fmt
 
                         push @{$sync->{no}}, $input if $sync;
-                } elsif (-f $input && $input =~ m{\A(.+)/(new|cur)/([^/]+)\z}) {
+                } elsif ($input =~ m{\A(.+)/(new|cur)/([^/]+)\z} && -f $input) {
                         # single file in a Maildir
                         my ($mdir, $nc, $bn) = ($1, $2, $3);
                         my $other = $mdir . ($nc eq 'new' ? '/cur' : '/new');
@@ -307,25 +400,44 @@ $input is `eml', not --in-format=$in_fmt
 
                         if ($sync) {
                                 $input = $lei->abs_path($mdir) . "/$nc/$bn";
-                                push @{$sync->{ok}}, $input if $sync;
+                                push @{$sync->{ok}}, $input;
                         }
                         require PublicInbox::MdirReader;
                 } else {
                         my $devfd = $lei->path_to_fd($input) // return;
-                        if ($devfd >= 0 || -f $input || -p _) {
+                        if ($devfd < 0 && $input =~ m{\A(.+)/([0-9]+)\z} &&
+                                        -f $input) { # single file in MH dir
+                                my ($mh, $n) = ($1, $2);
+                                lc($in_fmt//'eml') eq 'eml' or
+                                                return $lei->fail(<<"");
+$input is `eml', not --in-format=$in_fmt
+
+                                if ($sync) {
+                                        $input = $lei->abs_path($mh)."/$n";
+                                        push @{$sync->{ok}}, $input;
+                                }
+                                require PublicInbox::MHreader;
+                        } elsif ($devfd >= 0 || -f $input || -p _) {
                                 push @{$sync->{no}}, $input if $sync;
                                 push @f, $input;
                         } elsif (-d "$input/new" && -d "$input/cur") {
-                                if ($may_sync) {
+                                add_dir $lei, $istate, 'maildir', \$input;
+                        } elsif (-e "$input/inbox.lock") {
+                                add_dir $lei, $istate, 'v2', \$input;
+                        } elsif (-e "$input/ssoma.lock") {
+                                add_dir $lei, $istate, 'v1', \$input;
+                        } elsif (-e "$input/ei.lock") {
+                                add_dir $lei, $istate, 'extindex', \$input;
+                        } elsif (-f "$input/.mh_sequences") {
+                                add_dir $lei, $istate, 'mh', \$input;
+                        } elsif ($self->{missing_ok} && !-e $input) {
+                                if ($lei->{cmd} eq 'p2q') {
+                                        # will run "git format-patch"
+                                } elsif ($may_sync) { # for lei rm-watch
+                                        # FIXME: support MH, here
                                         $input = 'maildir:'.
                                                 $lei->abs_path($input);
-                                        push @{$sync->{ok}}, $input if $sync;
                                 }
-                                push @md, $input;
-                        } elsif ($self->{missing_ok} && !-e $input) {
-                                # for lei rm-watch
-                                $may_sync and $input = 'maildir:'.
-                                                $lei->abs_path($input);
                         } else {
                                 return $lei->fail("Unable to handle $input")
                         }
@@ -337,8 +449,8 @@ $input is `eml', not --in-format=$in_fmt
 --mail-sync specified but no inputs support it
 
                 # non-fatal if some inputs support support sync
-                $lei->err("# --mail-sync will only be used for @{$sync->{ok}}");
-                $lei->err("# --mail-sync is not supported for: @{$sync->{no}}");
+                warn("# --mail-sync will only be used for @{$sync->{ok}}\n");
+                warn("# --mail-sync is not supported for: @{$sync->{no}}\n");
         }
         if ($net) {
                 $net->{-can_die} = 1;
@@ -350,20 +462,29 @@ $input is `eml', not --in-format=$in_fmt
                 $lei->{auth} //= PublicInbox::LeiAuth->new;
                 $lei->{net} //= $net;
         }
-        if (scalar(@md)) {
+        if (my $md = $istate->{maildir}) {
                 require PublicInbox::MdirReader;
                 if ($self->can('pmdir_cb')) {
                         require PublicInbox::LeiPmdir;
                         $self->{pmd} = PublicInbox::LeiPmdir->new($lei, $self);
                 }
+                grep(!m!\Amaildir:/!i, @$md) and die "BUG: @$md (no pfx)";
 
                 # start watching Maildirs ASAP
                 if ($may_sync && $lei->{sto}) {
-                        grep(!m!\Amaildir:/!i, @md) and die "BUG: @md (no pfx)";
-                        $lei->lms(1)->lms_write_prepare->add_folders(@md);
+                        $lei->lms(1)->lms_write_prepare->add_folders(@$md);
                         $lei->refresh_watches;
                 }
         }
+        if (my $mh = $istate->{mh}) {
+                require PublicInbox::MHreader;
+                grep(!m!\Amh:!i, @$mh) and die "BUG: @$mh (no pfx)";
+                if ($may_sync && $lei->{sto}) {
+                        $lei->lms(1)->lms_write_prepare->add_folders(@$mh);
+                        # $lei->refresh_watches; TODO
+                }
+        }
+        require PublicInbox::ExtSearch if $istate->{extindex};
         $self->{inputs} = $inputs;
 }
 
@@ -378,7 +499,7 @@ sub process_inputs {
         }
         # always commit first, even on error partial work is acceptable for
         # lei <import|tag|convert>
-        my $wait = $self->{lei}->{sto}->wq_do('done') if $self->{lei}->{sto};
+        $self->{lei}->sto_barrier_request;
         $self->{lei}->fail($err) if $err;
 }
 
@@ -395,30 +516,29 @@ sub input_only_atfork_child {
 sub input_only_net_merge_all_done {
         my ($self) = @_;
         $self->wq_io_do('process_inputs');
-        $self->wq_close(1);
+        $self->wq_close;
 }
 
 # like Getopt::Long, but for +kw:FOO and -kw:FOO to prepare
 # for update_xvmd -> update_vmd
 # returns something like { "+L" => [ @Labels ], ... }
 sub vmd_mod_extract {
-        my $argv = $_[-1];
-        my $vmd_mod = {};
-        my @new_argv;
+        my ($lei, $argv) = @_;
+        my (@new_argv, @err);
         for my $x (@$argv) {
                 if ($x =~ /\A(\+|\-)(kw|L):(.+)\z/) {
                         my ($op, $pfx, $val) = ($1, $2, $3);
                         if (my $err = $ERR{$pfx}->($val)) {
-                                push @{$vmd_mod->{err}}, $err;
+                                push @err, $err;
                         } else { # set "+kw", "+L", "-L", "-kw"
-                                push @{$vmd_mod->{$op.$pfx}}, $val;
+                                push @{$lei->{vmd_mod}->{$op.$pfx}}, $val;
                         }
                 } else {
                         push @new_argv, $x;
                 }
         }
         @$argv = @new_argv;
-        $vmd_mod;
+        @err;
 }
 
 1;