about summary refs log tree commit homepage
path: root/lib/PublicInbox/LeiNoteEvent.pm
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2021-07-19 08:59:35 +0000
committerEric Wong <e@80x24.org>2021-07-22 02:29:00 +0000
commit5b4fde37adefa37508d131dbe013353ef3345051 (patch)
treebfd68b4f1fedf94464776a5760a4dc531f1d097e /lib/PublicInbox/LeiNoteEvent.pm
parent7e1c18af5468c7708e28de759911ec5542d23c4b (diff)
downloadpublic-inbox-5b4fde37adefa37508d131dbe013353ef3345051.tar.gz
This allows lei to automatically note keyword (message flag)
changes made to a Maildir and propagate it into lei/store:

	lei add-watch --state=tag-ro /path/to/Maildir

This doesn't persist across restarts, yet.  In the future,
it will be applied automatically to "lei q" output Maildirs
by default (with an option to disable it).

State values of tag-rw, index-<ro|rw>, import-<ro|rw> will all
be supported for Maildir.

This represents a fairly major internal change that's fairly
intrusive, but the whole daemon-oriented design was to
facilitate being able to automatically monitor (and propagate)
Maildir/IMAP flag changes.
Diffstat (limited to 'lib/PublicInbox/LeiNoteEvent.pm')
-rw-r--r--lib/PublicInbox/LeiNoteEvent.pm124
1 files changed, 124 insertions, 0 deletions
diff --git a/lib/PublicInbox/LeiNoteEvent.pm b/lib/PublicInbox/LeiNoteEvent.pm
new file mode 100644
index 00000000..bf15cd26
--- /dev/null
+++ b/lib/PublicInbox/LeiNoteEvent.pm
@@ -0,0 +1,124 @@
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# internal command for dealing with inotify, kqueue vnodes, etc
+package PublicInbox::LeiNoteEvent;
+use strict;
+use v5.10.1;
+use parent qw(PublicInbox::IPC);
+
+my $flush_timer;
+our $to_flush; # { cfgpath => $lei }
+
+sub flush_lei ($) {
+        my ($lei) = @_;
+        if (my $lne = delete $lei->{cfg}->{-lei_note_event}) {
+                $lne->wq_close(1, undef, $lei); # runs _lei_wq_eof;
+        } else { # lms_clear_src calls only:
+                my $wait = $lei->{sto}->ipc_do('done');
+        }
+}
+
+# we batch up writes and flush every 5s (matching Linux default
+# writeback behavior) since MUAs can trigger a storm of inotify events
+sub flush_task { # PublicInbox::DS timer callback
+        undef $flush_timer;
+        my $todo = $to_flush // return;
+        $to_flush = undef;
+        for my $lei (values %$todo) { flush_lei($lei) }
+}
+
+# sets a timer to flush
+sub note_event_arm_done ($) {
+        my ($lei) = @_;
+        $flush_timer //= PublicInbox::DS::add_timer(5, \&flush_task);
+        $to_flush->{$lei->{cfg}->{'-f'}} //= $lei;
+}
+
+sub eml_event ($$$$) {
+        my ($self, $eml, $kw, $state) = @_;
+        my $sto = $self->{lei}->{sto};
+        my $lse = $self->{lse} //= $sto->search;
+        my $vmd = { kw => $kw };
+        if ($state =~ /\Aimport-(?:rw|ro)\z/) {
+                $sto->ipc_do('set_eml', $eml, $vmd);
+        } elsif ($state =~ /\Aindex-(?:rw|ro)\z/) {
+                my $xoids = $self->{lei}->ale->xoids_for($eml);
+                $sto->ipc_do('index_eml_only', $eml, $vmd, $xoids);
+        } elsif ($state =~ /\Atag-(?:rw|ro)\z/) {
+                my $c = $lse->kw_changed($eml, $kw, my $docids = []);
+                if (scalar @$docids) { # already in lei/store
+                        $sto->ipc_do('set_eml_vmd', undef, $vmd, $docids) if $c;
+                } elsif (my $xoids = $self->{lei}->ale->xoids_for($eml)) {
+                        # it's in an external, only set kw, here
+                        $sto->ipc_do('set_xvmd', $xoids, $eml, $vmd);
+                } # else { totally unknown
+        } else {
+                warn "unknown state: $state (in $self->{lei}->{cfg}->{'-f'})\n";
+        }
+}
+
+sub maildir_event { # via wq_io_do
+        my ($self, $fn, $kw, $state) = @_;
+        my $eml = PublicInbox::InboxWritable::eml_from_path($fn) // return;
+        eml_event($self, $eml, $kw, $state);
+}
+
+sub lei_note_event {
+        my ($lei, $folder, $new_cur, $bn, $fn, @rest) = @_;
+        die "BUG: unexpected: @rest" if @rest;
+        my $cfg = $lei->_lei_cfg or return; # gone (race)
+        my $sto = $lei->_lei_store or return; # gone
+        return flush_lei($lei) if $folder eq 'done'; # special case
+        my $lms = $sto->search->lms or return;
+        my $err = $lms->arg2folder($lei, [ $folder ]);
+        return if $err->{fail};
+        undef $lms;
+        my $state = $cfg->get_1("watch.$folder", 'state') // 'pause';
+        return if $state eq 'pause';
+        $lei->ale; # prepare
+        $sto->write_prepare($lei);
+        if ($new_cur eq '') {
+                $sto->ipc_do('lms_clear_src', $folder, \$bn);
+                return note_event_arm_done($lei);
+        }
+        require PublicInbox::MdirReader;
+        my $self = $cfg->{-lei_note_event} //= do {
+                my $wq = bless {}, __PACKAGE__;
+                # MUAs such as mutt can trigger massive rename() storms so
+                # use all CPU power available:
+                my $jobs = $wq->detect_nproc // 1;
+                my ($op_c, $ops) = $lei->workers_start($wq, $jobs);
+                $lei->wait_wq_events($op_c, $ops);
+                note_event_arm_done($lei);
+                $lei->{lne} = $wq;
+        };
+        if ($folder =~ /\Amaildir:/i) {
+                my $fl = PublicInbox::MdirReader::maildir_basename_flags($bn)
+                        // return;
+                return if index($fl, 'T') >= 0;
+                my $kw = PublicInbox::MdirReader::flags2kw($fl);
+                $self->wq_io_do('maildir_event', [], $fn, $kw, $state);
+        } # else: TODO: imap
+}
+
+sub ipc_atfork_child {
+        my ($self) = @_;
+        $self->{lei}->_lei_atfork_child(1); # persistent, for a while
+        $self->SUPER::ipc_atfork_child;
+}
+
+sub lne_done_wait {
+        my ($arg, $pid) = @_;
+        my ($self, $lei) = @$arg;
+        $lei->can('wq_done_wait')->($arg, $pid);
+}
+
+sub _lei_wq_eof { # EOF callback for main lei daemon
+        my ($lei) = @_;
+        my $lne = delete $lei->{lne} or return $lei->fail;
+        my $wait = $lei->{sto}->ipc_do('done');
+        $lne->wq_wait_old(\&lne_done_wait, $lei);
+}
+
+1;