about summary refs log tree commit homepage
path: root/lib/PublicInbox
diff options
Diffstat (limited to 'lib/PublicInbox')
6 files changed, 239 insertions, 21 deletions
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 1e720b89..91c95239 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -157,9 +157,10 @@ our %CMD = ( # sorted in order of importance/use:
 'plonk' => [ '--threads|--from=IDENT',
         'exclude mail matching From: or threads from non-Message-ID searches',
         qw(stdin| threads|t from|f=s mid=s oid=s), @c_opt ],
-'mark' => [ 'MESSAGE_FLAGS...',
-        'set/unset keywords on message(s) from stdin',
-        qw(stdin| oid=s exact by-mid|mid:s), @c_opt ],
+'mark' => [ 'KEYWORDS...',
+        'set/unset keywords on message(s)',
+        qw(stdin| in-format|F=s input|i=s@ oid=s@ mid=s@), @c_opt,
+        pass_through('-kw:foo for delete') ],
 'forget' => [ '[--stdin|--oid=OID|--by-mid=MID]',
         "exclude message(s) on stdin from `q' search results",
         qw(stdin| oid=s exact by-mid|mid:s), @c_opt ],
@@ -348,7 +349,7 @@ my %CONFIG_KEYS = (
         'leistore.dir' => 'top-level storage location',
-my @WQ_KEYS = qw(lxs l2m imp mrr cnv p2q); # internal workers
+my @WQ_KEYS = qw(lxs l2m imp mrr cnv p2q mark); # internal workers
 # pronounced "exit": x_it(1 << 8) => exit(1); x_it(13) => SIGPIPE
 sub x_it ($$) {
@@ -460,7 +461,7 @@ sub lei_atfork_child {
                 open STDERR, '+>&='.fileno($self->{2}) or warn "open $!";
                 delete $self->{0};
-        delete @$self{qw(cnv)};
+        delete @$self{qw(cnv mark imp)};
         for (delete @$self{qw(3 old_1 au_done)}) {
                 close($_) if defined($_);
@@ -690,10 +691,6 @@ sub lei_show {
         my ($self, @argv) = @_;
-sub lei_mark {
-        my ($self, @argv) = @_;
 sub _config {
         my ($self, @argv) = @_;
         my %env = (%{$self->{env}}, GIT_CONFIG => undef);
diff --git a/lib/PublicInbox/LeiImport.pm b/lib/PublicInbox/LeiImport.pm
index 9ad2ff12..21af28a3 100644
--- a/lib/PublicInbox/LeiImport.pm
+++ b/lib/PublicInbox/LeiImport.pm
@@ -78,16 +78,6 @@ sub lei_import { # the main "lei import" method
-sub ipc_atfork_child {
-        my ($self) = @_;
-        my $lei = $self->{lei};
-        delete $lei->{imp}; # drop circular ref
-        $lei->lei_atfork_child;
-        $self->SUPER::ipc_atfork_child;
-        $lei->{auth}->do_auth_atfork($self) if $lei->{auth};
-        undef;
 sub _import_maildir { # maildir_each_eml cb
         my ($f, $kw, $eml, $sto, $set_kw) = @_;
         $sto->ipc_do('set_eml', $eml, $set_kw ? { kw => $kw }: ());
@@ -137,6 +127,9 @@ sub import_stdin {
         $self->input_fh($lei->{opt}->{'in-format'}, $in, '<stdin>');
-no warnings 'once'; # the following works even when LeiAuth is lazy-loaded
+no warnings 'once';
+*ipc_atfork_child = \&PublicInbox::LeiInput::input_only_atfork_child;
+# the following works even when LeiAuth is lazy-loaded
 *net_merge_all = \&PublicInbox::LeiAuth::net_merge_all;
diff --git a/lib/PublicInbox/LeiInput.pm b/lib/PublicInbox/LeiInput.pm
index 859fdb11..6ad57772 100644
--- a/lib/PublicInbox/LeiInput.pm
+++ b/lib/PublicInbox/LeiInput.pm
@@ -45,7 +45,7 @@ error reading $name: $!
-sub prepare_inputs {
+sub prepare_inputs { # returns undef on error
         my ($self, $lei, $inputs) = @_;
         my $in_fmt = $lei->{opt}->{'in-format'};
         if ($lei->{opt}->{stdin}) {
@@ -103,4 +103,13 @@ sub prepare_inputs {
         $self->{inputs} = $inputs;
+sub input_only_atfork_child {
+        my ($self) = @_;
+        my $lei = $self->{lei};
+        $lei->lei_atfork_child;
+        PublicInbox::IPC::ipc_atfork_child($self);
+        $lei->{auth}->do_auth_atfork($self) if $lei->{auth};
+        undef;
diff --git a/lib/PublicInbox/LeiMark.pm b/lib/PublicInbox/LeiMark.pm
new file mode 100644
index 00000000..aa52ad5a
--- /dev/null
+++ b/lib/PublicInbox/LeiMark.pm
@@ -0,0 +1,177 @@
+# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+# handles "lei mark" command
+package PublicInbox::LeiMark;
+use strict;
+use v5.10.1;
+use parent qw(PublicInbox::IPC PublicInbox::LeiInput);
+use PublicInbox::Eml;
+use PublicInbox::PktOp qw(pkt_do);
+# JMAP RFC 8621 4.1.1
+my @KW = (qw(seen answered flagged draft), # system
+        qw(forwarded phishing junk notjunk)); # reserved
+# note: RFC 8621 states "Users may add arbitrary keywords to an Email",
+# but is it good idea?  Stick to the system and reserved ones, for now.
+# The "system" ones map to Maildir flags and mbox Status/X-Status headers.
+my %KW = map { $_ => 1 } @KW;
+my $L_MAX = 244; # Xapian term limit - length('L')
+# RFC 8621, sec 2 (Mailboxes) a "label" for us is a JMAP Mailbox "name"
+# "Servers MAY reject names that violate server policy"
+my %ERR = (
+        L => sub {
+                my ($label) = @_;
+                length($label) >= $L_MAX and
+                        return "`$label' too long (must be <= $L_MAX)";
+                $label =~ m{\A[a-z0-9_][a-z0-9_\-\./\@\!,]*[a-z0-9]\z} ?
+                        undef : "`$label' is invalid";
+        },
+        kw => sub {
+                my ($kw) = @_;
+                $KW{$kw} ? undef : <<EOM;
+`$kw' is not one of: `seen', `flagged', `answered', `draft'
+`junk', `notjunk', `phishing' or `forwarded'
+        }
+# like Getopt::Long, but for +kw:FOO and -kw:FOO to prepare
+# for update_xvmd -> update_vmd
+sub vmd_mod_extract {
+        my $argv = $_[-1];
+        my $vmd_mod = {};
+        my @new_argv;
+        for my $x (@$argv) {
+                if ($x =~ /\A(\+|\-)(kw|L):(.+)\z/) {
+                        my ($op, $pfx, $val) = ($1, $2, $3);
+                        if (my $err = $ERR{$pfx}->($val)) {
+                                push @{$vmd_mod->{err}}, $err;
+                        } else { # set "+kw", "+L", "-L", "-kw"
+                                push @{$vmd_mod->{$op.$pfx}}, $val;
+                        }
+                } else {
+                        push @new_argv, $x;
+                }
+        }
+        @$argv = @new_argv;
+        $vmd_mod;
+sub eml_cb { # used by PublicInbox::LeiInput::input_fh
+        my ($self, $eml) = @_;
+        if (my $xoids = $self->{lei}->{ale}->xoids_for($eml)) {
+                $self->{lei}->{sto}->ipc_do('update_xvmd', $xoids,
+                                                $self->{vmd_mod});
+        } else {
+                ++$self->{missing};
+        }
+sub mbox_cb { eml_cb($_[1], $_[0]) } # used by PublicInbox::LeiInput::input_fh
+sub mark_done_wait { # dwaitpid callback
+        my ($arg, $pid) = @_;
+        my ($mark, $lei) = @$arg;
+        $lei->child_error($?, 'non-fatal errors during mark') if $?;
+        my $sto = delete $lei->{sto};
+        my $wait = $sto->ipc_do('done') if $sto; # PublicInbox::LeiStore::done
+        $lei->dclose;
+sub mark_done { # EOF callback for main daemon
+        my ($lei) = @_;
+        my $mark = delete $lei->{mark} or return;
+        $mark->wq_wait_old(\&mark_done_wait, $lei);
+sub net_merge_complete { # callback used by LeiAuth
+        my ($self) = @_;
+        for my $input (@{$self->{inputs}}) {
+                $self->wq_io_do('mark_path_url', [], $input);
+        }
+        $self->wq_close(1);
+sub _mark_maildir { # maildir_each_eml cb
+        my ($f, $kw, $eml, $self) = @_;
+        eml_cb($self, $eml);
+sub _mark_net { # imap_each, nntp_each cb
+        my ($url, $uid, $kw, $eml, $self) = @_;
+        eml_cb($self, $eml)
+sub lei_mark { # the "lei mark" method
+        my ($lei, @argv) = @_;
+        my $sto = $lei->_lei_store(1);
+        my $self = $lei->{mark} = bless { missing => 0 }, __PACKAGE__;
+        $sto->write_prepare($lei);
+        $lei->ale; # refresh and prepare
+        my $vmd_mod = vmd_mod_extract(\@argv);
+        return $lei->fail(join("\n", @{$vmd_mod->{err}})) if $vmd_mod->{err};
+        $self->prepare_inputs($lei, \@argv) or return;
+        grep(defined, @$vmd_mod{qw(+kw +L -L -kw)}) or
+                return $lei->fail('no keywords or labels specified');
+        my $ops = { '' => [ \&mark_done, $lei ] };
+        $lei->{auth}->op_merge($ops, $self) if $lei->{auth};
+        $self->{vmd_mod} = $vmd_mod;
+        my $op = $lei->workers_start($self, 'lei_mark', 1, $ops);
+        $self->wq_io_do('mark_stdin', []) if $self->{0};
+        net_merge_complete($self) unless $lei->{auth};
+        while ($op && $op->{sock}) { $op->event_step }
+sub mark_path_url {
+        my ($self, $input) = @_;
+        my $lei = $self->{lei};
+        my $ifmt = lc($lei->{opt}->{'in-format'} // '');
+        # TODO auto-detect?
+        if ($input =~ m!\Aimaps?://!i) {
+                $lei->{net}->imap_each($input, \&_mark_net, $self);
+                return;
+        } elsif ($input =~ m!\A(?:nntps?|s?news)://!i) {
+                $lei->{net}->nntp_each($input, \&_mark_net, $self);
+                return;
+        } elsif ($input =~ s!\A([a-z0-9]+):!!i) {
+                $ifmt = lc $1;
+        }
+        if (-f $input) {
+                my $m = $lei->{opt}->{'lock'} // ($ifmt eq 'eml' ? ['none'] :
+                                PublicInbox::MboxLock->defaults);
+                my $mbl = PublicInbox::MboxLock->acq($input, 0, $m);
+                $self->input_fh($ifmt, $mbl->{fh}, $input);
+        } elsif (-d _ && (-d "$input/cur" || -d "$input/new")) {
+                return $lei->fail(<<EOM) if $ifmt && $ifmt ne 'maildir';
+$input appears to a be a maildir, not $ifmt
+                PublicInbox::MdirReader::maildir_each_eml($input,
+                                        \&_mark_maildir, $self);
+        } else {
+                $lei->fail("$input unsupported (TODO)");
+        }
+sub mark_stdin {
+        my ($self) = @_;
+        my $lei = $self->{lei};
+        my $in = delete $self->{0};
+        $self->input_fh($lei->{opt}->{'in-format'}, $in, '<stdin>');
+sub note_missing {
+        my ($self) = @_;
+        $self->{lei}->child_error(1 << 8) if $self->{missing};
+sub ipc_atfork_child {
+        my ($self) = @_;
+        PublicInbox::LeiInput::input_only_atfork_child($self);
+        # this goes out-of-scope at worker process exit:
+        PublicInbox::OnDestroy->new($$, \&note_missing, $self);
diff --git a/lib/PublicInbox/LeiStore.pm b/lib/PublicInbox/LeiStore.pm
index b390b318..b5d43b7e 100644
--- a/lib/PublicInbox/LeiStore.pm
+++ b/lib/PublicInbox/LeiStore.pm
@@ -228,12 +228,30 @@ sub set_eml {
                 set_eml_vmd($self, $eml, $vmd);
+sub update_xvmd {
+        my ($self, $xoids, $vmd_mod) = @_;
+        my $eidx = eidx_init($self);
+        my $oidx = $eidx->{oidx};
+        my %seen;
+        for my $oid (keys %$xoids) {
+                my @docids = $oidx->blob_exists($oid) or next;
+                scalar(@docids) > 1 and
+                        warn "W: $oid indexed as multiple docids: @docids\n";
+                for my $docid (@docids) {
+                        next if $seen{$docid}++;
+                        my $idx = $eidx->idx_shard($docid);
+                        $idx->ipc_do('update_vmd', $docid, $vmd_mod);
+                }
+        }
 # set or update keywords for external message, called via ipc_do
 sub set_xvmd {
         my ($self, $xoids, $eml, $vmd) = @_;
         my $eidx = eidx_init($self);
         my $oidx = $eidx->{oidx};
+        my %seen;
         # see if we can just update existing docs
         for my $oid (keys %$xoids) {
@@ -241,6 +259,7 @@ sub set_xvmd {
                 scalar(@docids) > 1 and
                         warn "W: $oid indexed as multiple docids: @docids\n";
                 for my $docid (@docids) {
+                        next if $seen{$docid}++;
                         my $idx = $eidx->idx_shard($docid);
                         $idx->ipc_do('set_vmd', $docid, $vmd);
diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm
index 3f933121..7d46489c 100644
--- a/lib/PublicInbox/SearchIdx.pm
+++ b/lib/PublicInbox/SearchIdx.pm
@@ -597,6 +597,29 @@ sub remove_vmd {
         $self->{xdb}->replace_document($docid, $doc) if $replace;
+sub update_vmd {
+        my ($self, $docid, $vmd_mod) = @_;
+        begin_txn_lazy($self);
+        my $doc = _get_doc($self, $docid) or return;
+        my $updated = 0;
+        my @x = @VMD_MAP;
+        while (my ($field, $pfx) = splice(@x, 0, 2)) {
+                # field: "label" or "kw"
+                for my $val (@{$vmd_mod->{"-$field"} // []}) {
+                        eval {
+                                $doc->remove_term($pfx . $val);
+                                ++$updated;
+                        };
+                }
+                for my $val (@{$vmd_mod->{"+$field"} // []}) {
+                        $doc->add_boolean_term($pfx . $val);
+                        ++$updated;
+                }
+        }
+        $self->{xdb}->replace_document($docid, $doc) if $updated;
+        $updated;
 sub xdb_remove {
         my ($self, @docids) = @_;