about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2017-06-24 07:33:44 +0000
committerEric Wong <e@80x24.org>2017-06-26 03:44:30 +0000
commitf9b70eb6ebbf96c2fe79ab2738ea4954c5a124f3 (patch)
tree4f5785aa3252bbf4330c385f652926e48103dc42
parentd9c9dc5af637e097d545a828d887aae99ddcd2a7 (diff)
downloadpublic-inbox-f9b70eb6ebbf96c2fe79ab2738ea4954c5a124f3.tar.gz
We need to ensure new messages are being processed
fairly during full rescans, so have the ->scan subroutine
yield and reschedule itself.  Additionally, having a
long-running task inside the signal handler is dangerous
and subject to reentrancy bugs.

Due to the limitations of the Filesys::Notify::Simple interface,
we cannot rely on multiplexing I/O interfaces (select, IO::Poll,
Danga::Socket, etc...) for this.  Forking a separate process
was considered, but it is more expensive for a mostly-idle
process.

So, we use a variant of the "self-pipe trick" via inotify (or
whatever Filesys::Notify::Simple gives us).  Instead of writing
to our own pipe, we write to a file in our own temporary
directory watched by Filesys::Notify::Simple to trigger events
in signal handlers.
-rw-r--r--lib/PublicInbox/WatchMaildir.pm98
-rwxr-xr-xscript/public-inbox-watch2
-rw-r--r--t/watch_maildir.t12
3 files changed, 82 insertions, 30 deletions
diff --git a/lib/PublicInbox/WatchMaildir.pm b/lib/PublicInbox/WatchMaildir.pm
index f81a917c..0e2a6d2c 100644
--- a/lib/PublicInbox/WatchMaildir.pm
+++ b/lib/PublicInbox/WatchMaildir.pm
@@ -13,25 +13,27 @@ use PublicInbox::Git;
 use PublicInbox::Import;
 use PublicInbox::MDA;
 use PublicInbox::Spawn qw(spawn);
+use File::Temp qw//;
 
 sub new {
         my ($class, $config) = @_;
-        my (%mdmap, @mdir, $spamc);
+        my (%mdmap, @mdir, $spamc, $spamdir);
 
         # "publicinboxwatch" is the documented namespace
         # "publicinboxlearn" is legacy but may be supported
         # indefinitely...
         foreach my $pfx (qw(publicinboxwatch publicinboxlearn)) {
                 my $k = "$pfx.watchspam";
-                if (my $spamdir = $config->{$k}) {
-                        if ($spamdir =~ s/\Amaildir://) {
-                                $spamdir =~ s!/+\z!!;
+                if (my $dir = $config->{$k}) {
+                        if ($dir =~ s/\Amaildir://) {
+                                $dir =~ s!/+\z!!;
                                 # skip "new", no MUA has seen it, yet.
-                                my $cur = "$spamdir/cur";
+                                my $cur = "$dir/cur";
+                                $spamdir = $cur;
                                 push @mdir, $cur;
                                 $mdmap{$cur} = 'watchspam';
                         } else {
-                                warn "unsupported $k=$spamdir\n";
+                                warn "unsupported $k=$dir\n";
                         }
                 }
         }
@@ -77,21 +79,41 @@ sub new {
         $mdre = qr!\A($mdre)/!;
         bless {
                 spamcheck => $spamcheck,
+                spamdir => $spamdir,
                 mdmap => \%mdmap,
                 mdir => \@mdir,
                 mdre => $mdre,
                 config => $config,
                 importers => {},
+                opendirs => {}, # dirname => dirhandle (in progress scans)
         }, $class;
 }
 
 sub _done_for_now {
-        $_->done foreach values %{$_[0]->{importers}};
+        my ($self) = @_;
+        my $opendirs = $self->{opendirs};
+
+        # spamdir scanning means every importer remains open
+        my $spamdir = $self->{spamdir};
+        return if defined($spamdir) && $opendirs->{$spamdir};
+
+        foreach my $im (values %{$self->{importers}}) {
+                # not done if we're scanning
+                next if $opendirs->{$im->{git}->{git_dir}};
+                $im->done;
+        }
 }
 
 sub _try_fsn_paths {
-        my ($self, $paths) = @_;
-        _try_path($self, $_->{path}) foreach @$paths;
+        my ($self, $scan_re, $paths) = @_;
+        foreach (@$paths) {
+                my $path = $_->{path};
+                if ($path =~ $scan_re) {
+                        scan($self, $path);
+                } else {
+                        _try_path($self, $path);
+                }
+        }
         _done_for_now($self);
 }
 
@@ -183,31 +205,61 @@ sub quit { $_[0]->{quit} = 1 }
 
 sub watch {
         my ($self) = @_;
-        my $cb = sub { _try_fsn_paths($self, \@_) };
-        my $mdir = $self->{mdir};
+        my $scan = File::Temp->newdir("public-inbox-watch.$$.scan.XXXXXX",
+                                        TMPDIR => 1);
+        my $scandir = $self->{scandir} = $scan->dirname;
+        my $re = qr!\A$scandir/!;
+        my $cb = sub { _try_fsn_paths($self, $re, \@_) };
 
         # lazy load here, we may support watching via IMAP IDLE
         # in the future...
         require Filesys::Notify::Simple;
-        my $watcher = Filesys::Notify::Simple->new($mdir);
-        $watcher->wait($cb) until ($self->{quit});
+        my $fsn = Filesys::Notify::Simple->new([@{$self->{mdir}}, $scandir]);
+        $fsn->wait($cb) until ($self->{quit});
+}
+
+sub trigger_scan {
+        my ($self, $base) = @_;
+        my $dir = $self->{scandir} or die "not watch-ing, yet\n";
+        open my $fh, '>', "$dir/$base" or die "open $dir/$base failed: $!\n";
+        close $fh or die "close $dir/$base failed: $!\n";
 }
 
 sub scan {
-        my ($self) = @_;
-        my $mdir = $self->{mdir};
-        foreach my $dir (@$mdir) {
-                my $ok = opendir(my $dh, $dir);
-                unless ($ok) {
-                        warn "failed to open $dir: $!\n";
-                        next;
-                }
+        my ($self, $path) = @_;
+        my $max = 10;
+        my $opendirs = $self->{opendirs};
+        my @dirnames = keys %$opendirs;
+        foreach my $dir (@dirnames) {
+                my $dh = delete $opendirs->{$dir};
+                my $n = $max;
                 while (my $fn = readdir($dh)) {
                         _try_path($self, "$dir/$fn");
+                        last if --$n < 0;
                 }
-                closedir $dh;
+                $opendirs->{$dir} = $dh if $n < 0;
+        }
+        if ($path =~ /full\z/) {
+                foreach my $dir (@{$self->{mdir}}) {
+                        next if $opendirs->{$dir}; # already in progress
+                        my $ok = opendir(my $dh, $dir);
+                        unless ($ok) {
+                                warn "failed to open $dir: $!\n";
+                                next;
+                        }
+                        my $n = $max;
+                        while (my $fn = readdir($dh)) {
+                                _try_path($self, "$dir/$fn");
+                                last if --$n < 0;
+                        }
+                        $opendirs->{$dir} = $dh if $n < 0;
+                }
+        }
+        if (keys %$opendirs) { # do we have more work to do?
+                trigger_scan($self, 'cont');
+        } else {
+                _done_for_now($self);
         }
-        _done_for_now($self);
 }
 
 sub _path_to_mime {
diff --git a/script/public-inbox-watch b/script/public-inbox-watch
index a72180c9..51f1baa2 100755
--- a/script/public-inbox-watch
+++ b/script/public-inbox-watch
@@ -13,7 +13,7 @@ my $reload = sub {
 };
 $reload->();
 if ($watch_md) {
-        my $scan = sub { $watch_md->scan if $watch_md };
+        my $scan = sub { $watch_md->trigger_scan('full') if $watch_md };
         $SIG{HUP} = $reload;
         $SIG{USR1} = $scan;
         $SIG{ALRM} = sub { $SIG{ALRM} = 'DEFAULT'; $scan->() };
diff --git a/t/watch_maildir.t b/t/watch_maildir.t
index 3969c80d..e12e0836 100644
--- a/t/watch_maildir.t
+++ b/t/watch_maildir.t
@@ -42,7 +42,7 @@ my $config = PublicInbox::Config->new({
         "publicinboxlearn.watchspam" => "maildir:$spamdir",
 });
 
-PublicInbox::WatchMaildir->new($config)->scan;
+PublicInbox::WatchMaildir->new($config)->scan('full');
 my $git = PublicInbox::Git->new($git_dir);
 my @list = $git->qx(qw(rev-list refs/heads/master));
 is(scalar @list, 1, 'one revision in rev-list');
@@ -59,7 +59,7 @@ my $write_spam = sub {
 };
 $write_spam->();
 is(unlink(glob("$maildir/new/*")), 1, 'unlinked old spam');
-PublicInbox::WatchMaildir->new($config)->scan;
+PublicInbox::WatchMaildir->new($config)->scan('full');
 @list = $git->qx(qw(rev-list refs/heads/master));
 is(scalar @list, 2, 'two revisions in rev-list');
 @list = $git->qx(qw(ls-tree -r --name-only refs/heads/master));
@@ -72,7 +72,7 @@ To unsubscribe from this list: send the line "unsubscribe git" in
 the body of a message to majordomo\@vger.kernel.org
 More majordomo info at  http://vger.kernel.org/majordomo-info.html\n);
         PublicInbox::Emergency->new($maildir)->prepare(\$msg);
-        PublicInbox::WatchMaildir->new($config)->scan;
+        PublicInbox::WatchMaildir->new($config)->scan('full');
         @list = $git->qx(qw(ls-tree -r --name-only refs/heads/master));
         is(scalar @list, 1, 'tree has one file');
         my $mref = $git->cat_file('HEAD:'.$list[0]);
@@ -80,7 +80,7 @@ More majordomo info at  http://vger.kernel.org/majordomo-info.html\n);
 
         is(unlink(glob("$maildir/new/*")), 1, 'unlinked spam');
         $write_spam->();
-        PublicInbox::WatchMaildir->new($config)->scan;
+        PublicInbox::WatchMaildir->new($config)->scan('full');
         @list = $git->qx(qw(ls-tree -r --name-only refs/heads/master));
         is(scalar @list, 0, 'tree is empty');
         @list = $git->qx(qw(rev-list refs/heads/master));
@@ -96,7 +96,7 @@ More majordomo info at  http://vger.kernel.org/majordomo-info.html\n);
         $config->{'publicinboxwatch.spamcheck'} = 'spamc';
         {
                 local $SIG{__WARN__} = sub {}; # quiet spam check warning
-                PublicInbox::WatchMaildir->new($config)->scan;
+                PublicInbox::WatchMaildir->new($config)->scan('full');
         }
         @list = $git->qx(qw(ls-tree -r --name-only refs/heads/master));
         is(scalar @list, 0, 'tree has no files spamc checked');
@@ -111,7 +111,7 @@ More majordomo info at  http://vger.kernel.org/majordomo-info.html\n);
         PublicInbox::Emergency->new($maildir)->prepare(\$msg);
         $config->{'publicinboxwatch.spamcheck'} = 'spamc';
         @list = $git->qx(qw(ls-tree -r --name-only refs/heads/master));
-        PublicInbox::WatchMaildir->new($config)->scan;
+        PublicInbox::WatchMaildir->new($config)->scan('full');
         @list = $git->qx(qw(ls-tree -r --name-only refs/heads/master));
         is(scalar @list, 1, 'tree has one file after spamc checked');