From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id B923D1FA01 for ; Thu, 4 Mar 2021 09:03:16 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 4/6] lei q: --import-augment for mbox and mbox.gz Date: Thu, 4 Mar 2021 17:03:14 +0800 Message-Id: <20210304090316.9568-5-e@80x24.org> In-Reply-To: <20210304090316.9568-1-e@80x24.org> References: <20210304090316.9568-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: These are the trickiest output formats we support, due to the possibility of filesystem FIFOs and pipes as the destination (e.g. /dev/stdout). This completes another phase of keyword sync support. --- lib/PublicInbox/LeiToMail.pm | 65 ++++++++++++++++++++++--------- t/lei-q-kw.t | 74 +++++++++++++++++++++++++++++++++++- 2 files changed, 119 insertions(+), 20 deletions(-) diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm index b3228a59..6290f35e 100644 --- a/lib/PublicInbox/LeiToMail.pm +++ b/lib/PublicInbox/LeiToMail.pm @@ -246,6 +246,13 @@ sub _augment { # MboxReader eml_cb $lei->{dedupe}->is_dup($eml); } +sub _mbox_augment_kw_maybe { + my ($eml, $lei, $lse, $augment) = @_; + my @kw = PublicInbox::LeiStore::mbox_keywords($eml); + update_kw_maybe($lei, $lse, $eml, \@kw); + _augment($eml, $lei) if $augment; +} + sub _mbox_write_cb ($$) { my ($self, $lei) = @_; my $ovv = $lei->{ovv}; @@ -391,7 +398,7 @@ sub new { "$dst exists and is not a directory\n"; $lei->{ovv}->{dst} = $dst .= '/' if substr($dst, -1) ne '/'; } elsif (substr($fmt, 0, 4) eq 'mbox') { - require PublicInbox::MboxReader if $lei->{opt}->{augment}; + require PublicInbox::MboxReader; (-d $dst || (-e _ && !-w _)) and die "$dst exists and is not a writable file\n"; 
$self->can("eml2$fmt") or die "bad mbox format: $fmt\n"; @@ -485,8 +492,8 @@ sub _do_augment_imap { sub _pre_augment_mbox { my ($self, $lei) = @_; my $dst = $lei->{ovv}->{dst}; + my $out = $lei->{1}; if ($dst ne '/dev/stdout') { - my $out; if (-p $dst) { open $out, '>', $dst or die "open($dst): $!"; } elsif (-f _ || !-e _) { @@ -495,36 +502,56 @@ sub _pre_augment_mbox { PublicInbox::MboxLock->defaults; $self->{mbl} = PublicInbox::MboxLock->acq($dst, 1, $m); $out = $self->{mbl}->{fh}; - if (!$lei->{opt}->{augment} and !truncate($out, 0)) { - die "truncate($dst): $!"; - } } $lei->{old_1} = $lei->{1}; # keep for spawning MUA - $lei->{1} = $out; } # Perl does SEEK_END even with O_APPEND :< - $self->{seekable} = seek($lei->{1}, 0, SEEK_SET); + $self->{seekable} = seek($out, 0, SEEK_SET); if (!$self->{seekable} && $! != ESPIPE && $dst ne '/dev/stdout') { die "seek($dst): $!\n"; } + if (!$self->{seekable}) { + my $ia = $lei->{opt}->{'import-augment'}; + die "--import-augment specified but $dst is not seekable\n" + if $ia && !ref($ia); + die "--augment specified but $dst is not seekable\n" if + $lei->{opt}->{augment}; + } state $zsfx_allow = join('|', keys %zsfx2cmd); - ($self->{zsfx}) = ($dst =~ /\.($zsfx_allow)\z/) or return; - pipe(my ($r, $w)) or die "pipe: $!"; - $lei->{zpipe} = [ $r, $w ]; + if (($self->{zsfx}) = ($dst =~ /\.($zsfx_allow)\z/)) { + pipe(my ($r, $w)) or die "pipe: $!"; + $lei->{zpipe} = [ $r, $w ]; + } + $lei->{1} = $out; + undef; } sub _do_augment_mbox { my ($self, $lei) = @_; - return if !$lei->{opt}->{augment}; - my $dedupe = $lei->{dedupe}; - my $dst = $lei->{ovv}->{dst}; - die "cannot augment $dst, not seekable\n" if !$self->{seekable}; + return unless $self->{seekable}; + my $opt = $lei->{opt}; my $out = $lei->{1}; - if (-s $out && $dedupe && $dedupe->prepare_dedupe) { - my $zsfx = $self->{zsfx}; - my $rd = $zsfx ? 
decompress_src($out, $zsfx, $lei) : - dup_src($out); - my $fmt = $lei->{ovv}->{fmt}; + my ($fmt, $dst) = @{$lei->{ovv}}{qw(fmt dst)}; + return unless -s $out; + unless ($opt->{augment} || $opt->{'import-augment'}) { + truncate($out, 0) or die "truncate($dst): $!"; + return; + } + my $zsfx = $self->{zsfx}; + my $rd = $zsfx ? decompress_src($out, $zsfx, $lei) : dup_src($out); + my $dedupe; + if ($opt->{augment}) { + $dedupe = $lei->{dedupe}; + $dedupe->prepare_dedupe if $dedupe; + } + if ($opt->{'import-augment'}) { # the default + my $lse = $lei->{sto}->search; + PublicInbox::MboxReader->$fmt($rd, \&_mbox_augment_kw_maybe, + $lei, $lse, $opt->{augment}); + if (!$opt->{augment} and !truncate($out, 0)) { + die "truncate($dst): $!"; + } + } else { # --augment --no-import-augment PublicInbox::MboxReader->$fmt($rd, \&_augment, $lei); } # maybe some systems don't honor O_APPEND, Perl does this: diff --git a/t/lei-q-kw.t b/t/lei-q-kw.t index 97b2e08f..babe9749 100644 --- a/t/lei-q-kw.t +++ b/t/lei-q-kw.t @@ -2,6 +2,12 @@ # Copyright (C) 2020-2021 all contributors # License: AGPL-3.0+ use strict; use v5.10.1; use PublicInbox::TestCommon; +use POSIX qw(mkfifo); +use Fcntl qw(SEEK_SET O_RDONLY O_NONBLOCK); +use IO::Uncompress::Gunzip qw(gunzip); +use IO::Compress::Gzip qw(gzip); +use PublicInbox::MboxReader; +use PublicInbox::Spawn qw(popen_rd); test_lei(sub { lei_ok(qw(import -F eml t/plack-qp.eml)); my $o = "$ENV{HOME}/dst"; @@ -28,6 +34,72 @@ lei_ok(qw(q -o), "maildir:$o", qw(m:qp@example.com)); @fn = glob("$o/cur/*:2,S"); is(scalar(@fn), 1, "`seen' flag (but not `replied') set on Maildir file"); -# TODO: other destination types +SKIP: { + $o = "$ENV{HOME}/fifo"; + mkfifo($o, 0600) or skip("mkfifo not supported: $!", 1); + # cat(1) since lei() may not execve for FD_CLOEXEC to work + my $cat = popen_rd(['cat', $o]); + ok(!lei(qw(q --import-augment bogus -o), "mboxrd:$o"), + '--import-augment fails on non-seekable output'); + is(do { local $/; <$cat> }, '', 'no output on 
FIFO'); +}; + +lei_ok qw(import -F eml t/utf8.eml), \'for augment test'; +my $read_file = sub { + if ($_[0] =~ /\.gz\z/) { + gunzip($_[0] => \(my $buf = ''), MultiStream => 1) or + BAIL_OUT 'gunzip'; + $buf; + } else { + open my $fh, '+<', $_[0] or BAIL_OUT $!; + do { local $/; <$fh> }; + } +}; + +my $write_file = sub { + if ($_[0] =~ /\.gz\z/) { + gzip(\($_[1]), $_[0]) or BAIL_OUT 'gzip'; + } else { + open my $fh, '>', $_[0] or BAIL_OUT $!; + print $fh $_[1] or BAIL_OUT $!; + close $fh or BAIL_OUT; + } +}; + +my $exp = { + '' => eml_load('t/plack-qp.eml'), + '' => eml_load('t/utf8.eml'), +}; +$exp->{''}->header_set('Status', 'OR'); +$exp->{''}->header_set('Status', 'O'); +for my $sfx ('', '.gz') { + $o = "$ENV{HOME}/dst.mboxrd$sfx"; + lei_ok(qw(q -o), "mboxrd:$o", qw(m:qp@example.com)); + my $buf = $read_file->($o); + $buf =~ s/^Status: [^\n]*\n//sm or BAIL_OUT "no status in $buf"; + $write_file->($o, $buf); + lei_ok(qw(q -o), "mboxrd:$o", qw(rereadandimportkwchange)); + $buf = $read_file->($o); + is($buf, '', 'emptied'); + lei_ok(qw(q -o), "mboxrd:$o", qw(m:qp@example.com)); + $buf = $read_file->($o); + $buf =~ s/\nStatus: O\n\n/\nStatus: OR\n\n/s or + BAIL_OUT "no Status in $buf"; + $write_file->($o, $buf); + lei_ok(qw(q -a -o), "mboxrd:$o", qw(m:testmessage@example.com)); + $buf = $read_file->($o); + open my $fh, '<', \$buf or BAIL_OUT "PerlIO::scalar $!"; + my %res; + PublicInbox::MboxReader->mboxrd($fh, sub { + my ($eml) = @_; + $res{$eml->header_raw('Message-ID')} = $eml; + }); + is_deeply(\%res, $exp, '--augment worked'); + + lei_ok(qw(q -o), "mboxrd:/dev/stdout", qw(m:qp@example.com)) or + diag $lei_err; + like($lei_out, qr/^Status: OR\n/sm, 'Status set by previous augment'); +} + }); done_testing;