diff options
author | Eric Wong <e@yhbt.net> | 2020-06-27 10:03:37 +0000 |
---|---|---|
committer | Eric Wong <e@yhbt.net> | 2020-06-28 22:27:14 +0000 |
commit | 58c0333adbdd9f5f82309cb6eef3c379f0ff064e (patch) | |
tree | 5e0d2304c4003fcd9c858db4b955abab7e6e3a8f /lib/PublicInbox | |
parent | b2b1006759730507731fcd3fc3e0de68239e3b92 (diff) | |
download | public-inbox-58c0333adbdd9f5f82309cb6eef3c379f0ff064e.tar.gz |
We can get rid of the janky wannabe self-using-a-directory-instead-of-pipe thing we needed to workaround Filesys::Notify::Simple being blocking. For existing Maildir users, this should be more robust and immune to missed wakeups for signalfd and kqueue-enabled systems; as well as being immune to BOFHs clearing $TMPDIR and preventing notifications from firing. The IMAP IDLE code still uses normal Perl signals, so it's still vulnerable to missed wakeups. That will be addressed in future commits.
Diffstat (limited to 'lib/PublicInbox')
-rw-r--r-- | lib/PublicInbox/Daemon.pm | 19 | ||||
-rw-r--r-- | lib/PublicInbox/Sigfd.pm | 12 | ||||
-rw-r--r-- | lib/PublicInbox/WatchMaildir.pm | 67 |
3 files changed, 55 insertions, 43 deletions
diff --git a/lib/PublicInbox/Daemon.pm b/lib/PublicInbox/Daemon.pm index 2f63bd73..ab0c2226 100644 --- a/lib/PublicInbox/Daemon.pm +++ b/lib/PublicInbox/Daemon.pm @@ -18,9 +18,9 @@ use PublicInbox::DS qw(now); use PublicInbox::Syscall qw(SFD_NONBLOCK); require PublicInbox::Listener; require PublicInbox::ParentPipe; -require PublicInbox::Sigfd; +use PublicInbox::Sigfd; my @CMD; -my ($set_user, $oldset, $newset); +my ($set_user, $oldset); my (@cfg_listen, $stdout, $stderr, $group, $user, $pid_file, $daemonize); my $worker_processes = 1; my @listeners; @@ -72,15 +72,10 @@ sub accept_tls_opt ($) { { SSL_server => 1, SSL_startHandshake => 0, SSL_reuse_ctx => $ctx }; } -sub sig_setmask { sigprocmask(SIG_SETMASK, @_) or die "sigprocmask: $!" } - sub daemon_prepare ($) { my ($default_listen) = @_; my $listener_names = {}; # sockname => IO::Handle - $oldset = POSIX::SigSet->new(); - $newset = POSIX::SigSet->new(); - $newset->fillset or die "fillset: $!"; - sig_setmask($newset, $oldset); + my $oldset = PublicInbox::Sigfd::block_signals(); @CMD = ($0, @ARGV); my %opts = ( 'l|listen=s' => \@cfg_listen, @@ -515,7 +510,7 @@ EOF }; my $sigfd = PublicInbox::Sigfd->new($sig, 0); local %SIG = (%SIG, %$sig) if !$sigfd; - sig_setmask($oldset) if !$sigfd; + PublicInbox::restore_signals($oldset) if !$sigfd; while (1) { # main loop my $n = scalar keys %pids; unless (@listeners) { @@ -531,7 +526,7 @@ EOF } my $want = $worker_processes - 1; if ($n <= $want) { - sig_setmask($newset) if !$sigfd; + PublicInbox::Sigfd::block_signals() if !$sigfd; for my $i ($n..$want) { my $pid = fork; if (!defined $pid) { @@ -544,7 +539,7 @@ EOF $pids{$pid} = $i; } } - sig_setmask($oldset) if !$sigfd; + PubliInbox::Sigfd::set_sigmask($oldset) if !$sigfd; } if ($sigfd) { # Linux and IO::KQueue users: @@ -632,7 +627,7 @@ sub daemon_loop ($$$$) { if (!$sigfd) { # wake up every second to accept signals if we don't # have signalfd or IO::KQueue: - sig_setmask($oldset); + PublicInbox::Sigfd::set_sigmask($oldset); PublicInbox::DS->SetLoopTimeout(1000); } PublicInbox::DS->EventLoop; diff --git a/lib/PublicInbox/Sigfd.pm b/lib/PublicInbox/Sigfd.pm index f500902e..17456592 100644 --- a/lib/PublicInbox/Sigfd.pm +++ b/lib/PublicInbox/Sigfd.pm @@ -5,7 +5,7 @@ use strict; use parent qw(PublicInbox::DS); use fields qw(sig); # hashref similar to %SIG, but signal numbers as keys use PublicInbox::Syscall qw(signalfd EPOLLIN EPOLLET SFD_NONBLOCK); -use POSIX (); +use POSIX qw(:signal_h); use IO::Handle (); # returns a coderef to unblock signals if neither signalfd or kqueue @@ -62,4 +62,14 @@ sub event_step { while (wait_once($_[0])) {} # non-blocking } +sub sig_setmask { sigprocmask(SIG_SETMASK, @_) or die "sigprocmask: $!" } + +sub block_signals () { + my $oldset = POSIX::SigSet->new; + my $newset = POSIX::SigSet->new; + $newset->fillset or die "fillset: $!"; + sig_setmask($newset, $oldset); + $oldset; +} + 1; diff --git a/lib/PublicInbox/WatchMaildir.pm b/lib/PublicInbox/WatchMaildir.pm index 22f19036..4d3cd032 100644 --- a/lib/PublicInbox/WatchMaildir.pm +++ b/lib/PublicInbox/WatchMaildir.pm @@ -8,9 +8,9 @@ use strict; use warnings; use PublicInbox::Eml; use PublicInbox::InboxWritable; -use File::Temp 0.19 (); # 0.19 for ->newdir use PublicInbox::Filter::Base qw(REJECT); use PublicInbox::Spamcheck; +use PublicInbox::Sigfd; use PublicInbox::DS qw(now); use POSIX qw(_exit WNOHANG); *mime_from_path = \&PublicInbox::InboxWritable::mime_from_path; @@ -108,6 +108,7 @@ sub new { imap => scalar keys %imap ? \%imap : undef, importers => {}, opendirs => {}, # dirname => dirhandle (in progress scans) + ops => [], # 'quit', 'full' }, $class; } @@ -195,7 +196,9 @@ sub _try_path { sub quit { my ($self) = @_; - trigger_scan($self, 'quit') or $self->{quit} = 1; + $self->{quit} = 1; + %{$self->{opendirs}} = (); + _done_for_now($self); if (my $imap_pid = $self->{-imap_pid}) { kill('QUIT', $imap_pid); } @@ -213,24 +216,15 @@ sub quit { sub watch_fs { my ($self) = @_; require PublicInbox::DirIdle; - my $scan = File::Temp->newdir("public-inbox-watch.$$.scan.XXXXXX", - TMPDIR => 1); - my $scandir = $self->{scandir} = $scan->dirname; - my $scan_re = qr!\A$scandir/!; my $done = sub { delete $self->{done_timer}; _done_for_now($self); }; my $cb = sub { - my $path = $_[0]->fullname; - if ($path =~ $scan_re) { - scan($self, $path); - } else { - _try_path($self, $path); - } + _try_path($self, $_[0]->fullname); $self->{done_timer} //= PublicInbox::DS::requeue($done); }; - my $di = PublicInbox::DirIdle->new([@{$self->{mdir}}, $scandir], $cb); + my $di = PublicInbox::DirIdle->new($self->{mdir}, $cb); PublicInbox::DS->SetPostLoopCallback(sub { !$self->{quit} }); PublicInbox::DS->EventLoop; _done_for_now($self); @@ -485,6 +479,12 @@ sub watch_imap_idle_1 ($$$) { } } +sub watch_atfork_child ($) { + my ($self) = @_; + PublicInbox::Sigfd::sig_setmask($self->{oldset}); + %SIG = (%SIG, %{$self->{sig}}); +} + sub watch_imap_idle_all ($$) { my ($self, $idle) = @_; # $idle = [[ uri1, intvl1 ], [ uri2, intvl2 ]] $self->{mics} = {}; # going to be forking, so disconnect @@ -494,6 +494,7 @@ sub watch_imap_idle_all ($$) { my ($uri, $intvl) = @$uri_intvl; defined(my $pid = fork) or die "fork: $!"; if ($pid == 0) { + watch_atfork_child($self); delete $self->{idle_pids}; watch_imap_idle_1($self, $uri, $intvl); _exit(0); @@ -564,15 +565,20 @@ sub watch_imap ($) { } sub watch { - my ($self) = @_; + my ($self, $sig, $oldset) = @_; + $self->{oldset} = $oldset; + $self->{sig} = $sig; if ($self->{mdre} && $self->{imap}) { defined(my $pid = fork) or die "fork: $!"; if ($pid == 0) { + watch_atfork_child($self); imap_start($self); goto &watch_imap; } $self->{-imap_pid} = $pid; } elsif ($self->{imap}) { + # not a child process, but no signalfd, yet: + watch_atfork_child($self); imap_start($self); goto &watch_imap; } @@ -580,23 +586,18 @@ sub watch { } sub trigger_scan { - my ($self, $base) = @_; - my $dir = $self->{scandir} or return; - open my $fh, '>', "$dir/$base" or die "open $dir/$base failed: $!\n"; - close $fh or die "close $dir/$base failed: $!\n"; + my ($self, $op) = @_; + push @{$self->{ops}}, $op; + PublicInbox::DS::requeue($self); } -sub scan { - my ($self, $path) = @_; - if ($path =~ /quit\z/) { - %{$self->{opendirs}} = (); - _done_for_now($self); - delete $self->{scandir}; - $self->{quit} = 1; - return; - } - # else: $path =~ /(cont|full)\z/ +# called directly, and by PublicInbox::DS +sub event_step ($) { + my ($self) = @_; return if $self->{quit}; + my $op = shift @{$self->{ops}}; + + # continue existing scan my $max = 10; my $opendirs = $self->{opendirs}; my @dirnames = keys %$opendirs; @@ -609,7 +610,7 @@ sub scan { } $opendirs->{$dir} = $dh if $n < 0; } - if ($path =~ /full\z/) { + if ($op && $op eq 'full') { foreach my $dir (@{$self->{mdir}}) { next if $opendirs->{$dir}; # already in progress my $ok = opendir(my $dh, $dir); @@ -627,7 +628,13 @@ sub scan { } _done_for_now($self); # do we have more work to do? - trigger_scan($self, 'cont') if keys %$opendirs; + PublicInbox::DS::requeue($self) if keys %$opendirs; +} + +sub scan { + my ($self, $op) = @_; + push @{$self->{ops}}, $op; + goto &event_step; } sub _importer_for { |