about summary refs log tree commit homepage
path: root/lib/PublicInbox
diff options
context:
space:
mode:
authorEric Wong <e@yhbt.net>2020-06-27 10:03:37 +0000
committerEric Wong <e@yhbt.net>2020-06-28 22:27:14 +0000
commit58c0333adbdd9f5f82309cb6eef3c379f0ff064e (patch)
tree5e0d2304c4003fcd9c858db4b955abab7e6e3a8f /lib/PublicInbox
parentb2b1006759730507731fcd3fc3e0de68239e3b92 (diff)
downloadpublic-inbox-58c0333adbdd9f5f82309cb6eef3c379f0ff064e.tar.gz
We can get rid of the janky wannabe
self-using-a-directory-instead-of-pipe thing we needed to
workaround Filesys::Notify::Simple being blocking.

For existing Maildir users, this should be more robust and
immune to missed wakeups for signalfd and kqueue-enabled
systems; as well as being immune to BOFHs clearing $TMPDIR
and preventing notifications from firing.

The IMAP IDLE code still uses normal Perl signals, so it's still
vulnerable to missed wakeups.  That will be addressed in future
commits.
Diffstat (limited to 'lib/PublicInbox')
-rw-r--r--lib/PublicInbox/Daemon.pm19
-rw-r--r--lib/PublicInbox/Sigfd.pm12
-rw-r--r--lib/PublicInbox/WatchMaildir.pm67
3 files changed, 55 insertions, 43 deletions
diff --git a/lib/PublicInbox/Daemon.pm b/lib/PublicInbox/Daemon.pm
index 2f63bd73..ab0c2226 100644
--- a/lib/PublicInbox/Daemon.pm
+++ b/lib/PublicInbox/Daemon.pm
@@ -18,9 +18,9 @@ use PublicInbox::DS qw(now);
 use PublicInbox::Syscall qw(SFD_NONBLOCK);
 require PublicInbox::Listener;
 require PublicInbox::ParentPipe;
-require PublicInbox::Sigfd;
+use PublicInbox::Sigfd;
 my @CMD;
-my ($set_user, $oldset, $newset);
+my ($set_user, $oldset);
 my (@cfg_listen, $stdout, $stderr, $group, $user, $pid_file, $daemonize);
 my $worker_processes = 1;
 my @listeners;
@@ -72,15 +72,10 @@ sub accept_tls_opt ($) {
         { SSL_server => 1, SSL_startHandshake => 0, SSL_reuse_ctx => $ctx };
 }
 
-sub sig_setmask { sigprocmask(SIG_SETMASK, @_) or die "sigprocmask: $!" }
-
 sub daemon_prepare ($) {
         my ($default_listen) = @_;
         my $listener_names = {}; # sockname => IO::Handle
-        $oldset = POSIX::SigSet->new();
-        $newset = POSIX::SigSet->new();
-        $newset->fillset or die "fillset: $!";
-        sig_setmask($newset, $oldset);
+        my $oldset = PublicInbox::Sigfd::block_signals();
         @CMD = ($0, @ARGV);
         my %opts = (
                 'l|listen=s' => \@cfg_listen,
@@ -515,7 +510,7 @@ EOF
         };
         my $sigfd = PublicInbox::Sigfd->new($sig, 0);
         local %SIG = (%SIG, %$sig) if !$sigfd;
-        sig_setmask($oldset) if !$sigfd;
+        PublicInbox::restore_signals($oldset) if !$sigfd;
         while (1) { # main loop
                 my $n = scalar keys %pids;
                 unless (@listeners) {
@@ -531,7 +526,7 @@ EOF
                 }
                 my $want = $worker_processes - 1;
                 if ($n <= $want) {
-                        sig_setmask($newset) if !$sigfd;
+                        PublicInbox::Sigfd::block_signals() if !$sigfd;
                         for my $i ($n..$want) {
                                 my $pid = fork;
                                 if (!defined $pid) {
@@ -544,7 +539,7 @@ EOF
                                         $pids{$pid} = $i;
                                 }
                         }
-                        sig_setmask($oldset) if !$sigfd;
+                        PubliInbox::Sigfd::set_sigmask($oldset) if !$sigfd;
                 }
 
                 if ($sigfd) { # Linux and IO::KQueue users:
@@ -632,7 +627,7 @@ sub daemon_loop ($$$$) {
         if (!$sigfd) {
                 # wake up every second to accept signals if we don't
                 # have signalfd or IO::KQueue:
-                sig_setmask($oldset);
+                PublicInbox::Sigfd::set_sigmask($oldset);
                 PublicInbox::DS->SetLoopTimeout(1000);
         }
         PublicInbox::DS->EventLoop;
diff --git a/lib/PublicInbox/Sigfd.pm b/lib/PublicInbox/Sigfd.pm
index f500902e..17456592 100644
--- a/lib/PublicInbox/Sigfd.pm
+++ b/lib/PublicInbox/Sigfd.pm
@@ -5,7 +5,7 @@ use strict;
 use parent qw(PublicInbox::DS);
 use fields qw(sig); # hashref similar to %SIG, but signal numbers as keys
 use PublicInbox::Syscall qw(signalfd EPOLLIN EPOLLET SFD_NONBLOCK);
-use POSIX ();
+use POSIX qw(:signal_h);
 use IO::Handle ();
 
 # returns a coderef to unblock signals if neither signalfd or kqueue
@@ -62,4 +62,14 @@ sub event_step {
         while (wait_once($_[0])) {} # non-blocking
 }
 
+sub sig_setmask { sigprocmask(SIG_SETMASK, @_) or die "sigprocmask: $!" }
+
+sub block_signals () {
+        my $oldset = POSIX::SigSet->new;
+        my $newset = POSIX::SigSet->new;
+        $newset->fillset or die "fillset: $!";
+        sig_setmask($newset, $oldset);
+        $oldset;
+}
+
 1;
diff --git a/lib/PublicInbox/WatchMaildir.pm b/lib/PublicInbox/WatchMaildir.pm
index 22f19036..4d3cd032 100644
--- a/lib/PublicInbox/WatchMaildir.pm
+++ b/lib/PublicInbox/WatchMaildir.pm
@@ -8,9 +8,9 @@ use strict;
 use warnings;
 use PublicInbox::Eml;
 use PublicInbox::InboxWritable;
-use File::Temp 0.19 (); # 0.19 for ->newdir
 use PublicInbox::Filter::Base qw(REJECT);
 use PublicInbox::Spamcheck;
+use PublicInbox::Sigfd;
 use PublicInbox::DS qw(now);
 use POSIX qw(_exit WNOHANG);
 *mime_from_path = \&PublicInbox::InboxWritable::mime_from_path;
@@ -108,6 +108,7 @@ sub new {
                 imap => scalar keys %imap ? \%imap : undef,
                 importers => {},
                 opendirs => {}, # dirname => dirhandle (in progress scans)
+                ops => [], # 'quit', 'full'
         }, $class;
 }
 
@@ -195,7 +196,9 @@ sub _try_path {
 
 sub quit {
         my ($self) = @_;
-        trigger_scan($self, 'quit') or $self->{quit} = 1;
+        $self->{quit} = 1;
+        %{$self->{opendirs}} = ();
+        _done_for_now($self);
         if (my $imap_pid = $self->{-imap_pid}) {
                 kill('QUIT', $imap_pid);
         }
@@ -213,24 +216,15 @@ sub quit {
 sub watch_fs {
         my ($self) = @_;
         require PublicInbox::DirIdle;
-        my $scan = File::Temp->newdir("public-inbox-watch.$$.scan.XXXXXX",
-                                        TMPDIR => 1);
-        my $scandir = $self->{scandir} = $scan->dirname;
-        my $scan_re = qr!\A$scandir/!;
         my $done = sub {
                 delete $self->{done_timer};
                 _done_for_now($self);
         };
         my $cb = sub {
-                my $path = $_[0]->fullname;
-                if ($path =~ $scan_re) {
-                        scan($self, $path);
-                } else {
-                        _try_path($self, $path);
-                }
+                _try_path($self, $_[0]->fullname);
                 $self->{done_timer} //= PublicInbox::DS::requeue($done);
         };
-        my $di = PublicInbox::DirIdle->new([@{$self->{mdir}}, $scandir], $cb);
+        my $di = PublicInbox::DirIdle->new($self->{mdir}, $cb);
         PublicInbox::DS->SetPostLoopCallback(sub { !$self->{quit} });
         PublicInbox::DS->EventLoop;
         _done_for_now($self);
@@ -485,6 +479,12 @@ sub watch_imap_idle_1 ($$$) {
         }
 }
 
+sub watch_atfork_child ($) {
+        my ($self) = @_;
+        PublicInbox::Sigfd::sig_setmask($self->{oldset});
+        %SIG = (%SIG, %{$self->{sig}});
+}
+
 sub watch_imap_idle_all ($$) {
         my ($self, $idle) = @_; # $idle = [[ uri1, intvl1 ], [ uri2, intvl2 ]]
         $self->{mics} = {}; # going to be forking, so disconnect
@@ -494,6 +494,7 @@ sub watch_imap_idle_all ($$) {
                         my ($uri, $intvl) = @$uri_intvl;
                         defined(my $pid = fork) or die "fork: $!";
                         if ($pid == 0) {
+                                watch_atfork_child($self);
                                 delete $self->{idle_pids};
                                 watch_imap_idle_1($self, $uri, $intvl);
                                 _exit(0);
@@ -564,15 +565,20 @@ sub watch_imap ($) {
 }
 
 sub watch {
-        my ($self) = @_;
+        my ($self, $sig, $oldset) = @_;
+        $self->{oldset} = $oldset;
+        $self->{sig} = $sig;
         if ($self->{mdre} && $self->{imap}) {
                 defined(my $pid = fork) or die "fork: $!";
                 if ($pid == 0) {
+                        watch_atfork_child($self);
                         imap_start($self);
                         goto &watch_imap;
                 }
                 $self->{-imap_pid} = $pid;
         } elsif ($self->{imap}) {
+                # not a child process, but no signalfd, yet:
+                watch_atfork_child($self);
                 imap_start($self);
                 goto &watch_imap;
         }
@@ -580,23 +586,18 @@ sub watch {
 }
 
 sub trigger_scan {
-        my ($self, $base) = @_;
-        my $dir = $self->{scandir} or return;
-        open my $fh, '>', "$dir/$base" or die "open $dir/$base failed: $!\n";
-        close $fh or die "close $dir/$base failed: $!\n";
+        my ($self, $op) = @_;
+        push @{$self->{ops}}, $op;
+        PublicInbox::DS::requeue($self);
 }
 
-sub scan {
-        my ($self, $path) = @_;
-        if ($path =~ /quit\z/) {
-                %{$self->{opendirs}} = ();
-                _done_for_now($self);
-                delete $self->{scandir};
-                $self->{quit} = 1;
-                return;
-        }
-        # else: $path =~ /(cont|full)\z/
+# called directly, and by PublicInbox::DS
+sub event_step ($) {
+        my ($self) = @_;
         return if $self->{quit};
+        my $op = shift @{$self->{ops}};
+
+        # continue existing scan
         my $max = 10;
         my $opendirs = $self->{opendirs};
         my @dirnames = keys %$opendirs;
@@ -609,7 +610,7 @@ sub scan {
                 }
                 $opendirs->{$dir} = $dh if $n < 0;
         }
-        if ($path =~ /full\z/) {
+        if ($op && $op eq 'full') {
                 foreach my $dir (@{$self->{mdir}}) {
                         next if $opendirs->{$dir}; # already in progress
                         my $ok = opendir(my $dh, $dir);
@@ -627,7 +628,13 @@ sub scan {
         }
         _done_for_now($self);
         # do we have more work to do?
-        trigger_scan($self, 'cont') if keys %$opendirs;
+        PublicInbox::DS::requeue($self) if keys %$opendirs;
+}
+
+sub scan {
+        my ($self, $op) = @_;
+        push @{$self->{ops}}, $op;
+        goto &event_step;
 }
 
 sub _importer_for {