about summary refs log tree commit homepage
path: root/lib
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2021-09-02 10:17:58 +0000
committerEric Wong <e@80x24.org>2021-09-02 21:22:50 +0000
commit4166c482b710acea785063de2c69a8a370c6d373 (patch)
treeb6dfb983908d485b66f6e6140b45f71a6fcae251 /lib
parent4f9c44e3fff9c413fc54050dcc633692d33f6968 (diff)
downloadpublic-inbox-4166c482b710acea785063de2c69a8a370c6d373.tar.gz
This works with existing inotify/EVFILT_VNODE functionality to
propagate changes made from one Maildir to another Maildir.

I chose the lei/store worker process to handle this since
propagating changes back into lei-daemon on a massive scale
could lead to dead-locking while both processes are attempting
to write to each other.  Eliminating IPC overhead is a nice
side effect, but could hurt performance if Maildirs are slow.

The code for "lei export-kw" is significantly revamped to match
the new code used in the "lei/store" daemon.  It should be more
correct w.r.t. corner-cases and stale entries, but perhaps
better tests need to be written.

squashed:
  t/lei-auto-watch: increase delay for FreeBSD kevent

  My FreeBSD VM seems to need longer for this test than inotify
  under Linux, likely because the kevent support code needs to be
  more complicated.
Diffstat (limited to 'lib')
-rw-r--r--lib/PublicInbox/LeiExportKw.pm24
-rw-r--r--lib/PublicInbox/LeiNoteEvent.pm14
-rw-r--r--lib/PublicInbox/LeiStore.pm105
3 files changed, 119 insertions, 24 deletions
diff --git a/lib/PublicInbox/LeiExportKw.pm b/lib/PublicInbox/LeiExportKw.pm
index 42a5ff22..78c6c6f9 100644
--- a/lib/PublicInbox/LeiExportKw.pm
+++ b/lib/PublicInbox/LeiExportKw.pm
@@ -25,12 +25,11 @@ sub export_kw_md { # LeiMailSync->each_src callback
         }
         $bn .= ':2,'.
                 PublicInbox::LeiToMail::kw2suffix([keys %$sto_kw], @$unknown);
+        return if $bn eq $$id;
         my $dst = "$mdir/cur/$bn";
-        my @fail;
         my $lei = $self->{lei};
         for my $d (@try) {
                 my $src = "$mdir/$d/$$id";
-                next if $src eq $dst;
 
                 # we use link(2) + unlink(2) since rename(2) may
                 # inadvertently clobber if the "uniquefilename" part wasn't
@@ -44,20 +43,19 @@ sub export_kw_md { # LeiMailSync->each_src callback
                         $lei->{sto}->ipc_do('lms_mv_src', "maildir:$mdir",
                                                 $oidbin, $id, $bn);
                         return; # success anyways if link(2) worked
-                }
-                if ($! == ENOENT && !-e $src) { # some other process moved it
-                        $lei->{sto}->ipc_do('lms_clear_src',
-                                                "maildir:$mdir", $id);
-                        next;
-                }
-                push @fail, $src if $! != EEXIST;
+                } elsif ($! == EEXIST) { # lost race with lei/store?
+                        return;
+                } elsif ($! != ENOENT) {
+                        $lei->child_error(1, "E: link($src -> $dst): $!");
+                } # else loop @try
         }
-        return unless @fail;
-        # both tries failed
         my $e = $!;
-        my $orig = '['.join('|', @fail).']';
+        # both tries failed
         my $oidhex = unpack('H*', $oidbin);
-        $lei->child_error(1, "link($orig, $dst) ($oidhex): $e");
+        my $src = "$mdir/{".join(',', @try)."}/$$id";
+        $lei->child_error(1, "link($src -> $dst) ($oidhex): $e");
+        for (@try) { return if -e "$mdir/$_/$$id" }
+        $lei->{sto}->ipc_do('lms_clear_src', "maildir:$mdir", $id);
 }
 
 sub export_kw_imap { # LeiMailSync->each_src callback
diff --git a/lib/PublicInbox/LeiNoteEvent.pm b/lib/PublicInbox/LeiNoteEvent.pm
index 6a40ba39..41415346 100644
--- a/lib/PublicInbox/LeiNoteEvent.pm
+++ b/lib/PublicInbox/LeiNoteEvent.pm
@@ -36,32 +36,31 @@ sub note_event_arm_done ($) {
 }
 
 sub eml_event ($$$$) {
-        my ($self, $eml, $kw, $state) = @_;
+        my ($self, $eml, $vmd, $state) = @_;
         my $sto = $self->{lei}->{sto};
         my $lse = $self->{lse} //= $sto->search;
-        my $vmd = { kw => $kw };
         if ($state =~ /\Aimport-(?:rw|ro)\z/) {
                 $sto->ipc_do('set_eml', $eml, $vmd);
         } elsif ($state =~ /\Aindex-(?:rw|ro)\z/) {
                 my $xoids = $self->{lei}->ale->xoids_for($eml);
                 $sto->ipc_do('index_eml_only', $eml, $vmd, $xoids);
         } elsif ($state =~ /\Atag-(?:rw|ro)\z/) {
-                my $c = $lse->kw_changed($eml, $kw, my $docids = []);
+                my $c = $lse->kw_changed($eml, $vmd->{kw}, my $docids = []);
                 if (scalar @$docids) { # already in lei/store
                         $sto->ipc_do('set_eml_vmd', undef, $vmd, $docids) if $c;
                 } elsif (my $xoids = $self->{lei}->ale->xoids_for($eml)) {
                         # it's in an external, only set kw, here
                         $sto->ipc_do('set_xvmd', $xoids, $eml, $vmd);
-                } # else { totally unknown
+                } # else { totally unknown: ignore
         } else {
                 warn "unknown state: $state (in $self->{lei}->{cfg}->{'-f'})\n";
         }
 }
 
 sub maildir_event { # via wq_io_do
-        my ($self, $fn, $kw, $state) = @_;
+        my ($self, $fn, $vmd, $state) = @_;
         my $eml = PublicInbox::InboxWritable::eml_from_path($fn) // return;
-        eml_event($self, $eml, $kw, $state);
+        eml_event($self, $eml, $vmd, $state);
 }
 
 sub lei_note_event {
@@ -98,7 +97,8 @@ sub lei_note_event {
                         // return;
                 return if index($fl, 'T') >= 0;
                 my $kw = PublicInbox::MdirReader::flags2kw($fl);
-                $self->wq_io_do('maildir_event', [], $fn, $kw, $state);
+                my $vmd = { kw => $kw, sync_info => [ $folder, \$bn ] };
+                $self->wq_io_do('maildir_event', [], $fn, $vmd, $state);
         } # else: TODO: imap
 }
 
diff --git a/lib/PublicInbox/LeiStore.pm b/lib/PublicInbox/LeiStore.pm
index 0fa2d3c0..a91b30f7 100644
--- a/lib/PublicInbox/LeiStore.pm
+++ b/lib/PublicInbox/LeiStore.pm
@@ -25,10 +25,14 @@ use PublicInbox::MID qw(mids);
 use PublicInbox::LeiSearch;
 use PublicInbox::MDA;
 use PublicInbox::Spawn qw(spawn);
+use PublicInbox::MdirReader;
+use PublicInbox::LeiToMail;
 use List::Util qw(max);
 use File::Temp ();
 use POSIX ();
 use IO::Handle (); # ->autoflush
+use Sys::Syslog qw(syslog openlog);
+use Errno qw(EEXIST ENOENT);
 
 sub new {
         my (undef, $dir, $opt) = @_;
@@ -165,12 +169,92 @@ sub _docids_for ($$) {
         sort { $a <=> $b } values %docids;
 }
 
+# n.b. similar to LeiExportKw->export_kw_md, but this is for a single eml
+sub export1_kw_md ($$$$$) {
+        my ($self, $mdir, $bn, $oidbin, $vmdish) = @_; # vmd/vmd_mod
+        my $orig = $bn;
+        my (@try, $unkn, $kw);
+        if ($bn =~ s/:2,([a-zA-Z]*)\z//) {
+                ($kw, $unkn) = PublicInbox::MdirReader::flags2kw($1);
+                if (my $set = $vmdish->{kw}) {
+                        $kw = $set;
+                } elsif (my $add = $vmdish->{'+kw'}) {
+                        @$kw{@$add} = ();
+                } elsif (my $del = $vmdish->{-kw}) {
+                        delete @$kw{@$del};
+                } # else no changes...
+                @try = qw(cur new);
+        } else { # no keywords, yet, could be in new/
+                @try = qw(new cur);
+                $unkn = [];
+                if (my $set = $vmdish->{kw}) {
+                        $kw = $set;
+                } elsif (my $add = $vmdish->{'+kw'}) {
+                        @$kw{@$add} = (); # auto-vivify
+                } else { # ignore $vmdish->{-kw}
+                        $kw = [];
+                }
+        }
+        $kw = [ keys %$kw ] if ref($kw) eq 'HASH';
+        $bn .= ':2,'. PublicInbox::LeiToMail::kw2suffix($kw, @$unkn);
+        return if $orig eq $bn; # no change
+
+        # we use link(2) + unlink(2) since rename(2) may
+        # inadvertently clobber if the "uniquefilename" part wasn't
+        # actually unique.
+        my $dst = "$mdir/cur/$bn";
+        for my $d (@try) {
+                my $src = "$mdir/$d/$orig";
+                if (link($src, $dst)) {
+                        if (!unlink($src) and $! != ENOENT) {
+                                syslog('warning', "unlink($src): $!");
+                        }
+                        # TODO: verify oidbin?
+                        lms_mv_src($self, "maildir:$mdir",
+                                        $oidbin, \$orig, $bn);
+                        return;
+                } elsif ($! == EEXIST) { # lost race with "lei export-kw"?
+                        return;
+                } elsif ($! == ENOENT) {
+                        syslog('warning', "link($src -> $dst): $!")
+                } # else loop @try
+        }
+        my $e = $!;
+        my $src = "$mdir/{".join(',', @try)."}/$orig";
+        my $oidhex = unpack('H*', $oidbin);
+        syslog('warning', "link($src -> $dst) ($oidhex): $e");
+        for (@try) { return if -e "$mdir/$_/$orig" };
+        lms_clear_src($self, "maildir:$mdir", \$orig);
+}
+
+sub sto_export_kw ($$$) {
+        my ($self, $docid, $vmdish) = @_; # vmdish (vmd or vmd_mod)
+        my ($eidx, $tl) = eidx_init($self);
+        my $lms = _lms_rw($self) // return;
+        my $xr3 = $eidx->{oidx}->get_xref3($docid, 1);
+        for my $row (@$xr3) {
+                my (undef, undef, $oidbin) = @$row;
+                my $locs = $lms->locations_for($oidbin) // next;
+                while (my ($loc, $ids) = each %$locs) {
+                        if ($loc =~ s!\Amaildir:!!i) {
+                                for my $id (@$ids) {
+                                        export1_kw_md($self, $loc, $id,
+                                                        $oidbin, $vmdish);
+                                }
+                        }
+                        # TODO: IMAP
+                }
+        }
+}
+
+# vmd = { kw => [ qw(seen ...) ], L => [ qw(inbox ...) ] }
 sub set_eml_vmd {
         my ($self, $eml, $vmd, $docids) = @_;
         my ($eidx, $tl) = eidx_init($self);
         $docids //= [ _docids_for($self, $eml) ];
         for my $docid (@$docids) {
                 $eidx->idx_shard($docid)->ipc_do('set_vmd', $docid, $vmd);
+                sto_export_kw($self, $docid, $vmd);
         }
         $docids;
 }
@@ -284,6 +368,12 @@ EOF
         $docid;
 }
 
+sub _add_vmd ($$$$) {
+        my ($self, $idx, $docid, $vmd) = @_;
+        $idx->ipc_do('add_vmd', $docid, $vmd);
+        sto_export_kw($self, $docid, $vmd);
+}
+
 sub add_eml {
         my ($self, $eml, $vmd, $xoids) = @_;
         my $im = $self->{-fake_im} // $self->importer; # may create new epoch
@@ -310,7 +400,7 @@ sub add_eml {
                         @$vivify_xvmd = sort { $a <=> $b } keys(%docids);
                 }
         }
-        if (@$vivify_xvmd) {
+        if (@$vivify_xvmd) { # docids list
                 $xoids //= {};
                 $xoids->{$smsg->{blob}} = 1;
                 for my $docid (@$vivify_xvmd) {
@@ -327,7 +417,7 @@ sub add_eml {
                         for my $oid (keys %$xoids) {
                                 $oidx->add_xref3($docid, -1, $oid, '.');
                         }
-                        $idx->ipc_do('add_vmd', $docid, $vmd) if $vmd;
+                        _add_vmd($self, $idx, $docid, $vmd) if $vmd;
                 }
                 $vivify_xvmd;
         } elsif (my @docids = _docids_for($self, $eml)) {
@@ -337,7 +427,7 @@ sub add_eml {
                         $oidx->add_xref3($docid, -1, $smsg->{blob}, '.');
                         # add_eidx_info for List-Id
                         $idx->ipc_do('add_eidx_info', $docid, '.', $eml);
-                        $idx->ipc_do('add_vmd', $docid, $vmd) if $vmd;
+                        _add_vmd($self, $idx, $docid, $vmd) if $vmd;
                 }
                 \@docids;
         } else { # totally new message
@@ -347,7 +437,7 @@ sub add_eml {
                 $oidx->add_xref3($smsg->{num}, -1, $smsg->{blob}, '.');
                 my $idx = $eidx->idx_shard($smsg->{num});
                 $idx->index_eml($eml, $smsg);
-                $idx->ipc_do('add_vmd', $smsg->{num}, $vmd) if $vmd;
+                _add_vmd($self, $idx, $smsg->{num}, $vmd) if $vmd;
                 $smsg;
         }
 }
@@ -365,6 +455,7 @@ sub index_eml_only {
         set_eml($self, $eml, $vmd, $xoids);
 }
 
+# store {kw} / {L} info for a message which is only in an external
 sub _external_only ($$$) {
         my ($self, $xoids, $eml) = @_;
         my $eidx = $self->{priv_eidx};
@@ -398,6 +489,7 @@ sub update_xvmd {
                 next if $seen{$docid}++;
                 my $idx = $eidx->idx_shard($docid);
                 $idx->ipc_do('update_vmd', $docid, $vmd_mod);
+                sto_export_kw($self, $docid, $vmd_mod);
         }
         return unless scalar(keys(%$xoids));
 
@@ -410,12 +502,14 @@ sub update_xvmd {
                         }
                         my $idx = $eidx->idx_shard($docid);
                         $idx->ipc_do('update_vmd', $docid, $vmd_mod);
+                        sto_export_kw($self, $docid, $vmd_mod);
                 }
                 return;
         }
         # totally unseen
         my ($smsg, $idx) = _external_only($self, $xoids, $eml);
         $idx->ipc_do('update_vmd', $smsg->{num}, $vmd_mod);
+        sto_export_kw($self, $smsg->{num}, $vmd_mod);
 }
 
 # set or update keywords for external message, called via ipc_do
@@ -433,6 +527,7 @@ sub set_xvmd {
                 next if $seen{$docid}++;
                 my $idx = $eidx->idx_shard($docid);
                 $idx->ipc_do('set_vmd', $docid, $vmd);
+                sto_export_kw($self, $docid, $vmd);
         }
         return unless scalar(keys(%$xoids));
 
@@ -443,6 +538,7 @@ sub set_xvmd {
         # totally unseen:
         my ($smsg, $idx) = _external_only($self, $xoids, $eml);
         $idx->ipc_do('add_vmd', $smsg->{num}, $vmd);
+        sto_export_kw($self, $smsg->{num}, $vmd);
 }
 
 sub checkpoint {
@@ -497,6 +593,7 @@ sub ipc_atfork_child {
         if (my $to_close = delete($self->{to_close})) {
                 close($_) for @$to_close;
         }
+        openlog('lei/store', 'pid,nowait,nofatal,ndelay', 'user');
         $self->SUPER::ipc_atfork_child;
 }