diff options
Diffstat (limited to 'lib/PublicInbox/LeiInput.pm')
-rw-r--r-- | lib/PublicInbox/LeiInput.pm | 244 |
1 files changed, 182 insertions, 62 deletions
diff --git a/lib/PublicInbox/LeiInput.pm b/lib/PublicInbox/LeiInput.pm index 22bedba6..c388f7dc 100644 --- a/lib/PublicInbox/LeiInput.pm +++ b/lib/PublicInbox/LeiInput.pm @@ -1,10 +1,9 @@ -# Copyright (C) 2021 all contributors <meta@public-inbox.org> +# Copyright (C) all contributors <meta@public-inbox.org> # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> # parent class for LeiImport, LeiConvert, LeiIndex package PublicInbox::LeiInput; -use strict; -use v5.10.1; +use v5.12; use PublicInbox::DS; use PublicInbox::Spawn qw(which popen_rd); use PublicInbox::InboxWritable qw(eml_from_path); @@ -29,7 +28,9 @@ my %ERR = ( my ($label) = @_; length($label) >= $L_MAX and return "`$label' too long (must be <= $L_MAX)"; - $label =~ m{\A[a-z0-9_](?:[a-z0-9_\-\./\@,]*[a-z0-9])?\z}i ? + $label =~ /[A-Z]/ and + return "`$label' must be lowercase"; + $label =~ m{\A[a-z0-9_](?:[a-z0-9_\-\./\@,]*[a-z0-9])?\z} ? undef : "`$label' is invalid"; }, kw => sub { @@ -57,19 +58,38 @@ sub check_input_format ($;$) { 1; } +sub input_mbox_cb { # base MboxReader callback + my ($eml, $self) = @_; + $eml->header_set($_) for (qw(Status X-Status)); + $self->input_eml_cb($eml); +} + +sub input_maildir_cb { + my ($fn, $kw, $eml, $self) = @_; + $self->input_eml_cb($eml); +} + +sub input_mh_cb { + my ($dn, $n, $kw, $eml, $self) = @_; + $self->input_eml_cb($eml); +} + +sub input_net_cb { # imap_each, nntp_each cb + my ($url, $uid, $kw, $eml, $self) = @_; + $self->input_eml_cb($eml); +} + # import a single file handle of $name # Subclass must define ->input_eml_cb and ->input_mbox_cb sub input_fh { my ($self, $ifmt, $fh, $name, @args) = @_; if ($ifmt eq 'eml') { - my $buf = do { local $/; <$fh> } // - return $self->{lei}->child_error(0, <<""); -error reading $name: $! + my $buf = eval { PublicInbox::IO::read_all $fh, 0 }; + my $e = $@; + return $self->{lei}->child_error($?, <<"") if !$fh->close || $e; +error reading $name: $! (\$?=$?) (\$@=$e) - # mutt pipes single RFC822 messages with a "From " line, - # but no Content-Length or "From " escaping. - # "git format-patch" also generates such files by default. - $buf =~ s/\A[\r\n]*From [^\r\n]*\r?\n//s; + PublicInbox::Eml::strip_from($buf); # a user may feed just a body: git diff | lei rediff -U9 if ($self->{-force_eml}) { @@ -96,15 +116,58 @@ sub handle_http_input ($$@) { push @$curl, '-s', @$curl_opt; my $cmd = $curl->for_uri($lei, $uri); $lei->qerr("# $cmd"); - my $rdr = { 2 => $lei->{2}, pgid => 0 }; - my ($fh, $pid) = popen_rd($cmd, undef, $rdr); + my $fh = popen_rd($cmd, undef, { 2 => $lei->{2} }); grep(/\A--compressed\z/, @$curl) or - $fh = IO::Uncompress::Gunzip->new($fh, MultiStream => 1); + $fh = IO::Uncompress::Gunzip->new($fh, + MultiStream => 1, AutoClose => 1); eval { $self->input_fh('mboxrd', $fh, $url, @args) }; - my $err = $@; - waitpid($pid, 0); - $? || $err and - $lei->child_error($?, "@$cmd failed".$err ? " $err" : ''); + my $err = $@ ? ": $@" : ''; + $lei->child_error($?, "@$cmd failed$err") if $err || $?; +} + +sub oid2eml { # git->cat_async cb + my ($bref, $oid, $type, $size, $self) = @_; + if ($type eq 'blob') { + $self->input_eml_cb(PublicInbox::Eml->new($bref)); + } else { + warn "W: $oid is type=$type\n"; + } +} + +sub each_ibx_eml_unindexed { + my ($self, $ibx, @args) = @_; + $ibx->isa('PublicInbox::Inbox') or return $self->{lei}->fail(<<EOM); +unindexed extindex $ibx->{topdir} not supported +EOM + require PublicInbox::SearchIdx; + my $n = $ibx->max_git_epoch; + my @g = defined($n) ? map { $ibx->git_epoch($_) } (0..$n) : ($ibx->git); + my $sync = { D => {}, ibx => $ibx }; # D => {} filters out deletes + my ($f, $at, $ct, $oid, $cmt); + for my $git (grep defined, @g) { + my $s = PublicInbox::SearchIdx::log2stack($sync, $git, 'HEAD'); + while (($f, $at, $ct, $oid, $cmt) = $s->pop_rec) { + $git->cat_async($oid, \&oid2eml, $self) if $f eq 'm'; + } + $git->cleanup; # wait all + } +} + +sub each_ibx_eml { + my ($self, $ibx, @args) = @_; # TODO: is @args used at all? + my $over = $ibx->over or return each_ibx_eml_unindexed(@_); + my $git = $ibx->git; + my $prev = 0; + my $smsg; + my $ids = $over->ids_after(\$prev); + while (@$ids) { + for (@$ids) { + $smsg = $over->get_art($_) // next; + $git->cat_async($smsg->{blob}, \&oid2eml, $self); + } + $ids = $over->ids_after(\$prev); + } + $git->cat_async_wait; } sub input_path_url { @@ -132,7 +195,7 @@ sub input_path_url { $ifmt = lc($1); } elsif ($input =~ /\.(?:patch|eml)\z/i) { $ifmt = 'eml'; - } elsif (-f $input && $input =~ m{\A(?:.+)/(?:new|cur)/([^/]+)\z}) { + } elsif ($input =~ m{\A(?:.+)/(?:new|cur)/([^/]+)\z} && -f $input) { my $bn = $1; my $fl = PublicInbox::MdirReader::maildir_basename_flags($bn); return if index($fl, 'T') >= 0; @@ -146,6 +209,10 @@ sub input_path_url { my $devfd = $lei->path_to_fd($input) // return; if ($devfd >= 0) { $self->input_fh($ifmt, $lei->{$devfd}, $input, @args); + } elsif ($devfd < 0 && $input =~ m{\A(.+/)([0-9]+)\z} && -f $input) { + my ($dn, $n) = ($1, $2); + my $mhr = PublicInbox::MHreader->new($dn, $lei->{3}); + $mhr->mh_read_one($n, $self->can('input_mh_cb'), $self); } elsif (-f $input && $ifmt eq 'eml') { open my $fh, '<', $input or return $lei->fail("open($input): $!"); @@ -160,12 +227,9 @@ sub input_path_url { $mbl->{fh} = PublicInbox::MboxReader::zsfxcat($in, $zsfx, $lei); } - local $PublicInbox::DS::in_loop = 0 if $zsfx; # dwaitpid + local $PublicInbox::DS::in_loop = 0 if $zsfx; # awaitpid $self->input_fh($ifmt, $mbl->{fh}, $input, @args); - } elsif (-d _ && (-d "$input/cur" || -d "$input/new")) { - return $lei->fail(<<EOM) if $ifmt && $ifmt ne 'maildir'; -$input appears to be a maildir, not $ifmt -EOM + } elsif (-d _ && $ifmt eq 'maildir') { my $mdr = PublicInbox::MdirReader->new; if (my $pmd = $self->{pmd}) { $mdr->maildir_each_file($input, @@ -176,8 +240,27 @@ EOM $self->can('input_maildir_cb'), $self, @args); } + } elsif (-d _ && $ifmt eq 'mh') { + my $mhr = PublicInbox::MHreader->new($input.'/', $lei->{3}); + $mhr->{sort} = $lei->{opt}->{sort} // [ 'sequence']; + $mhr->mh_each_eml($self->can('input_mh_cb'), $self, @args); + } elsif (-d _ && $ifmt =~ /\A(?:v1|v2)\z/) { + my $ibx = PublicInbox::Inbox->new({inboxdir => $input}); + each_ibx_eml($self, $ibx, @args); + } elsif (-d _ && $ifmt eq 'extindex') { + my $esrch = PublicInbox::ExtSearch->new($input); + each_ibx_eml($self, $esrch, @args); } elsif ($self->{missing_ok} && !-e $input) { # don't ->fail - $self->folder_missing("$ifmt:$input"); + if ($lei->{cmd} eq 'p2q') { + my $fp = [ qw(git format-patch --stdout -1), $input ]; + my $rdr = { 2 => $lei->{2} }; + my $fh = popen_rd($fp, undef, $rdr); + eval { $self->input_fh('eml', $fh, $input, @args) }; + my $err = $@ ? ": $@" : ''; + $lei->child_error($?, "@$fp failed$err") if $err || $?; + } else { + $self->folder_missing("$ifmt:$input"); + } } else { $lei->fail("$ifmt_pfx$input unsupported (TODO)"); } @@ -230,6 +313,17 @@ sub prepare_http_input ($$$) { $self->{"-curl-$url"} = [ @curl_opt, $uri ]; # for handle_http_input } +sub add_dir ($$$$) { + my ($lei, $istate, $ifmt, $input) = @_; + if ($istate->{-may_sync}) { + $$input = "$ifmt:".$lei->abs_path($$input); + push @{$istate->{-sync}->{ok}}, $$input if $istate->{-sync}; + } else { + substr($$input, 0, 0) = "$ifmt:"; # prefix + } + push @{$istate->{$ifmt}}, $$input; +} + sub prepare_inputs { # returns undef on error my ($self, $lei, $inputs) = @_; my $in_fmt = $lei->{opt}->{'in-format'}; @@ -243,7 +337,8 @@ sub prepare_inputs { # returns undef on error push @{$sync->{no}}, '/dev/stdin' if $sync; } my $net = $lei->{net}; # NetWriter may be created by l2m - my (@f, @md); + my @f; + my $istate = { -sync => $sync, -may_sync => $may_sync }; # e.g. Maildir:/home/user/Mail/ or imaps://example.com/INBOX for my $input (@$inputs) { my $input_path = $input; @@ -263,26 +358,24 @@ sub prepare_inputs { # returns undef on error --in-format=$in_fmt and `$ifmt:' conflict } - if ($ifmt =~ /\A(?:maildir|mh)\z/i) { - push @{$sync->{ok}}, $input if $sync; - } else { - push @{$sync->{no}}, $input if $sync; - } + ($sync && $ifmt !~ /\A(?:maildir|mh)\z/i) and + push(@{$sync->{no}}, $input); my $devfd = $lei->path_to_fd($input_path) // return; if ($devfd >= 0 || (-f $input_path || -p _)) { require PublicInbox::MboxLock; require PublicInbox::MboxReader; PublicInbox::MboxReader->reads($ifmt) or return $lei->fail("$ifmt not supported"); - } elsif (-d $input_path) { - $ifmt eq 'maildir' or return - $lei->fail("$ifmt not supported"); - $may_sync and $input = 'maildir:'. - $lei->abs_path($input_path); - push @md, $input; - } elsif ($self->{missing_ok} && !-e _) { + } elsif (-d $input_path) { # TODO extindex + $ifmt =~ /\A(?:maildir|mh|v1|v2|extindex)\z/ or + return$lei->fail("$ifmt not supported"); + $input = $input_path; + add_dir $lei, $istate, $ifmt, \$input; + } elsif ($self->{missing_ok} && + $ifmt =~ /\A(?:maildir|mh)\z/ && + !-e $input_path) { # for "lei rm-watch" on missing Maildir - $may_sync and $input = 'maildir:'. + $may_sync and $input = "$ifmt:". $lei->abs_path($input_path); } else { my $m = "Unable to handle $input"; @@ -295,7 +388,7 @@ sub prepare_inputs { # returns undef on error $input is `eml', not --in-format=$in_fmt push @{$sync->{no}}, $input if $sync; - } elsif (-f $input && $input =~ m{\A(.+)/(new|cur)/([^/]+)\z}) { + } elsif ($input =~ m{\A(.+)/(new|cur)/([^/]+)\z} && -f $input) { # single file in a Maildir my ($mdir, $nc, $bn) = ($1, $2, $3); my $other = $mdir . ($nc eq 'new' ? '/cur' : '/new'); @@ -307,25 +400,44 @@ $input is `eml', not --in-format=$in_fmt if ($sync) { $input = $lei->abs_path($mdir) . "/$nc/$bn"; - push @{$sync->{ok}}, $input if $sync; + push @{$sync->{ok}}, $input; } require PublicInbox::MdirReader; } else { my $devfd = $lei->path_to_fd($input) // return; - if ($devfd >= 0 || -f $input || -p _) { + if ($devfd < 0 && $input =~ m{\A(.+)/([0-9]+)\z} && + -f $input) { # single file in MH dir + my ($mh, $n) = ($1, $2); + lc($in_fmt//'eml') eq 'eml' or + return $lei->fail(<<""); +$input is `eml', not --in-format=$in_fmt + + if ($sync) { + $input = $lei->abs_path($mh)."/$n"; + push @{$sync->{ok}}, $input; + } + require PublicInbox::MHreader; + } elsif ($devfd >= 0 || -f $input || -p _) { push @{$sync->{no}}, $input if $sync; push @f, $input; } elsif (-d "$input/new" && -d "$input/cur") { - if ($may_sync) { + add_dir $lei, $istate, 'maildir', \$input; + } elsif (-e "$input/inbox.lock") { + add_dir $lei, $istate, 'v2', \$input; + } elsif (-e "$input/ssoma.lock") { + add_dir $lei, $istate, 'v1', \$input; + } elsif (-e "$input/ei.lock") { + add_dir $lei, $istate, 'extindex', \$input; + } elsif (-f "$input/.mh_sequences") { + add_dir $lei, $istate, 'mh', \$input; + } elsif ($self->{missing_ok} && !-e $input) { + if ($lei->{cmd} eq 'p2q') { + # will run "git format-patch" + } elsif ($may_sync) { # for lei rm-watch + # FIXME: support MH, here $input = 'maildir:'. $lei->abs_path($input); - push @{$sync->{ok}}, $input if $sync; } - push @md, $input; - } elsif ($self->{missing_ok} && !-e $input) { - # for lei rm-watch - $may_sync and $input = 'maildir:'. - $lei->abs_path($input); } else { return $lei->fail("Unable to handle $input") } @@ -337,8 +449,8 @@ $input is `eml', not --in-format=$in_fmt --mail-sync specified but no inputs support it # non-fatal if some inputs support support sync - $lei->err("# --mail-sync will only be used for @{$sync->{ok}}"); - $lei->err("# --mail-sync is not supported for: @{$sync->{no}}"); + warn("# --mail-sync will only be used for @{$sync->{ok}}\n"); + warn("# --mail-sync is not supported for: @{$sync->{no}}\n"); } if ($net) { $net->{-can_die} = 1; @@ -350,20 +462,29 @@ $input is `eml', not --in-format=$in_fmt $lei->{auth} //= PublicInbox::LeiAuth->new; $lei->{net} //= $net; } - if (scalar(@md)) { + if (my $md = $istate->{maildir}) { require PublicInbox::MdirReader; if ($self->can('pmdir_cb')) { require PublicInbox::LeiPmdir; $self->{pmd} = PublicInbox::LeiPmdir->new($lei, $self); } + grep(!m!\Amaildir:/!i, @$md) and die "BUG: @$md (no pfx)"; # start watching Maildirs ASAP if ($may_sync && $lei->{sto}) { - grep(!m!\Amaildir:/!i, @md) and die "BUG: @md (no pfx)"; - $lei->lms(1)->lms_write_prepare->add_folders(@md); + $lei->lms(1)->lms_write_prepare->add_folders(@$md); $lei->refresh_watches; } } + if (my $mh = $istate->{mh}) { + require PublicInbox::MHreader; + grep(!m!\Amh:!i, @$mh) and die "BUG: @$mh (no pfx)"; + if ($may_sync && $lei->{sto}) { + $lei->lms(1)->lms_write_prepare->add_folders(@$mh); + # $lei->refresh_watches; TODO + } + } + require PublicInbox::ExtSearch if $istate->{extindex}; $self->{inputs} = $inputs; } @@ -378,7 +499,7 @@ sub process_inputs { } # always commit first, even on error partial work is acceptable for # lei <import|tag|convert> - my $wait = $self->{lei}->{sto}->wq_do('done') if $self->{lei}->{sto}; + $self->{lei}->sto_barrier_request; $self->{lei}->fail($err) if $err; } @@ -395,30 +516,29 @@ sub input_only_atfork_child { sub input_only_net_merge_all_done { my ($self) = @_; $self->wq_io_do('process_inputs'); - $self->wq_close(1); + $self->wq_close; } # like Getopt::Long, but for +kw:FOO and -kw:FOO to prepare # for update_xvmd -> update_vmd # returns something like { "+L" => [ @Labels ], ... } sub vmd_mod_extract { - my $argv = $_[-1]; - my $vmd_mod = {}; - my @new_argv; + my ($lei, $argv) = @_; + my (@new_argv, @err); for my $x (@$argv) { if ($x =~ /\A(\+|\-)(kw|L):(.+)\z/) { my ($op, $pfx, $val) = ($1, $2, $3); if (my $err = $ERR{$pfx}->($val)) { - push @{$vmd_mod->{err}}, $err; + push @err, $err; } else { # set "+kw", "+L", "-L", "-kw" - push @{$vmd_mod->{$op.$pfx}}, $val; + push @{$lei->{vmd_mod}->{$op.$pfx}}, $val; } } else { push @new_argv, $x; } } @$argv = @new_argv; - $vmd_mod; + @err; } 1; |