From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 558E61F8F2 for ; Sat, 27 Jun 2020 10:04:02 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 11/34] watch: use signalfd for Maildir watching Date: Sat, 27 Jun 2020 10:03:37 +0000 Message-Id: <20200627100400.9871-12-e@yhbt.net> In-Reply-To: <20200627100400.9871-1-e@yhbt.net> References: <20200627100400.9871-1-e@yhbt.net> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: We can get rid of the janky wannabe self-pipe-using-a-directory-instead-of-pipe thing we needed to work around Filesys::Notify::Simple being blocking. For existing Maildir users, this should be more robust and immune to missed wakeups on signalfd- and kqueue-enabled systems, as well as being immune to BOFHs clearing $TMPDIR and preventing notifications from firing. The IMAP IDLE code still uses normal Perl signals, so it's still vulnerable to missed wakeups. That will be addressed in future commits. 
--- lib/PublicInbox/Daemon.pm | 19 ++++------ lib/PublicInbox/Sigfd.pm | 12 +++++- lib/PublicInbox/WatchMaildir.pm | 67 ++++++++++++++++++--------------- script/public-inbox-watch | 24 +++++++++--- t/watch_maildir.t | 4 +- 5 files changed, 75 insertions(+), 51 deletions(-) diff --git a/lib/PublicInbox/Daemon.pm b/lib/PublicInbox/Daemon.pm index 2f63bd73b4a..ab0c2226e40 100644 --- a/lib/PublicInbox/Daemon.pm +++ b/lib/PublicInbox/Daemon.pm @@ -18,9 +18,9 @@ use PublicInbox::DS qw(now); use PublicInbox::Syscall qw(SFD_NONBLOCK); require PublicInbox::Listener; require PublicInbox::ParentPipe; -require PublicInbox::Sigfd; +use PublicInbox::Sigfd; my @CMD; -my ($set_user, $oldset, $newset); +my ($set_user, $oldset); my (@cfg_listen, $stdout, $stderr, $group, $user, $pid_file, $daemonize); my $worker_processes = 1; my @listeners; @@ -72,15 +72,10 @@ sub accept_tls_opt ($) { { SSL_server => 1, SSL_startHandshake => 0, SSL_reuse_ctx => $ctx }; } -sub sig_setmask { sigprocmask(SIG_SETMASK, @_) or die "sigprocmask: $!" 
} - sub daemon_prepare ($) { my ($default_listen) = @_; my $listener_names = {}; # sockname => IO::Handle - $oldset = POSIX::SigSet->new(); - $newset = POSIX::SigSet->new(); - $newset->fillset or die "fillset: $!"; - sig_setmask($newset, $oldset); + $oldset = PublicInbox::Sigfd::block_signals(); @CMD = ($0, @ARGV); my %opts = ( 'l|listen=s' => \@cfg_listen, @@ -515,7 +510,7 @@ EOF }; my $sigfd = PublicInbox::Sigfd->new($sig, 0); local %SIG = (%SIG, %$sig) if !$sigfd; - sig_setmask($oldset) if !$sigfd; + PublicInbox::Sigfd::sig_setmask($oldset) if !$sigfd; while (1) { # main loop my $n = scalar keys %pids; unless (@listeners) { @@ -531,7 +526,7 @@ EOF } my $want = $worker_processes - 1; if ($n <= $want) { - sig_setmask($newset) if !$sigfd; + PublicInbox::Sigfd::block_signals() if !$sigfd; for my $i ($n..$want) { my $pid = fork; if (!defined $pid) { @@ -544,7 +539,7 @@ EOF $pids{$pid} = $i; } } - sig_setmask($oldset) if !$sigfd; + PublicInbox::Sigfd::sig_setmask($oldset) if !$sigfd; } if ($sigfd) { # Linux and IO::KQueue users: @@ -632,7 +627,7 @@ sub daemon_loop ($$$$) { if (!$sigfd) { # wake up every second to accept signals if we don't # have signalfd or IO::KQueue: - sig_setmask($oldset); + PublicInbox::Sigfd::sig_setmask($oldset); PublicInbox::DS->SetLoopTimeout(1000); } PublicInbox::DS->EventLoop; diff --git a/lib/PublicInbox/Sigfd.pm b/lib/PublicInbox/Sigfd.pm index f500902ea67..17456592a7e 100644 --- a/lib/PublicInbox/Sigfd.pm +++ b/lib/PublicInbox/Sigfd.pm @@ -5,7 +5,7 @@ use strict; use parent qw(PublicInbox::DS); use fields qw(sig); # hashref similar to %SIG, but signal numbers as keys use PublicInbox::Syscall qw(signalfd EPOLLIN EPOLLET SFD_NONBLOCK); -use POSIX (); +use POSIX qw(:signal_h); use IO::Handle (); # returns a coderef to unblock signals if neither signalfd or kqueue @@ -62,4 +62,14 @@ sub event_step { while (wait_once($_[0])) {} # non-blocking } +sub sig_setmask { sigprocmask(SIG_SETMASK, @_) or die "sigprocmask: $!" 
} + +sub block_signals () { + my $oldset = POSIX::SigSet->new; + my $newset = POSIX::SigSet->new; + $newset->fillset or die "fillset: $!"; + sig_setmask($newset, $oldset); + $oldset; +} + 1; diff --git a/lib/PublicInbox/WatchMaildir.pm b/lib/PublicInbox/WatchMaildir.pm index 22f190366a4..4d3cd032e5a 100644 --- a/lib/PublicInbox/WatchMaildir.pm +++ b/lib/PublicInbox/WatchMaildir.pm @@ -8,9 +8,9 @@ use strict; use warnings; use PublicInbox::Eml; use PublicInbox::InboxWritable; -use File::Temp 0.19 (); # 0.19 for ->newdir use PublicInbox::Filter::Base qw(REJECT); use PublicInbox::Spamcheck; +use PublicInbox::Sigfd; use PublicInbox::DS qw(now); use POSIX qw(_exit WNOHANG); *mime_from_path = \&PublicInbox::InboxWritable::mime_from_path; @@ -108,6 +108,7 @@ sub new { imap => scalar keys %imap ? \%imap : undef, importers => {}, opendirs => {}, # dirname => dirhandle (in progress scans) + ops => [], # 'quit', 'full' }, $class; } @@ -195,7 +196,9 @@ sub _try_path { sub quit { my ($self) = @_; - trigger_scan($self, 'quit') or $self->{quit} = 1; + $self->{quit} = 1; + %{$self->{opendirs}} = (); + _done_for_now($self); if (my $imap_pid = $self->{-imap_pid}) { kill('QUIT', $imap_pid); } @@ -213,24 +216,15 @@ sub quit { sub watch_fs { my ($self) = @_; require PublicInbox::DirIdle; - my $scan = File::Temp->newdir("public-inbox-watch.$$.scan.XXXXXX", - TMPDIR => 1); - my $scandir = $self->{scandir} = $scan->dirname; - my $scan_re = qr!\A$scandir/!; my $done = sub { delete $self->{done_timer}; _done_for_now($self); }; my $cb = sub { - my $path = $_[0]->fullname; - if ($path =~ $scan_re) { - scan($self, $path); - } else { - _try_path($self, $path); - } + _try_path($self, $_[0]->fullname); $self->{done_timer} //= PublicInbox::DS::requeue($done); }; - my $di = PublicInbox::DirIdle->new([@{$self->{mdir}}, $scandir], $cb); + my $di = PublicInbox::DirIdle->new($self->{mdir}, $cb); PublicInbox::DS->SetPostLoopCallback(sub { !$self->{quit} }); PublicInbox::DS->EventLoop; 
_done_for_now($self); @@ -485,6 +479,12 @@ sub watch_imap_idle_1 ($$$) { } } +sub watch_atfork_child ($) { + my ($self) = @_; + PublicInbox::Sigfd::sig_setmask($self->{oldset}); + %SIG = (%SIG, %{$self->{sig}}); +} + sub watch_imap_idle_all ($$) { my ($self, $idle) = @_; # $idle = [[ uri1, intvl1 ], [ uri2, intvl2 ]] $self->{mics} = {}; # going to be forking, so disconnect @@ -494,6 +494,7 @@ sub watch_imap_idle_all ($$) { my ($uri, $intvl) = @$uri_intvl; defined(my $pid = fork) or die "fork: $!"; if ($pid == 0) { + watch_atfork_child($self); delete $self->{idle_pids}; watch_imap_idle_1($self, $uri, $intvl); _exit(0); @@ -564,15 +565,20 @@ sub watch_imap ($) { } sub watch { - my ($self) = @_; + my ($self, $sig, $oldset) = @_; + $self->{oldset} = $oldset; + $self->{sig} = $sig; if ($self->{mdre} && $self->{imap}) { defined(my $pid = fork) or die "fork: $!"; if ($pid == 0) { + watch_atfork_child($self); imap_start($self); goto &watch_imap; } $self->{-imap_pid} = $pid; } elsif ($self->{imap}) { + # not a child process, but no signalfd, yet: + watch_atfork_child($self); imap_start($self); goto &watch_imap; } @@ -580,23 +586,18 @@ sub watch { } sub trigger_scan { - my ($self, $base) = @_; - my $dir = $self->{scandir} or return; - open my $fh, '>', "$dir/$base" or die "open $dir/$base failed: $!\n"; - close $fh or die "close $dir/$base failed: $!\n"; + my ($self, $op) = @_; + push @{$self->{ops}}, $op; + PublicInbox::DS::requeue($self); } -sub scan { - my ($self, $path) = @_; - if ($path =~ /quit\z/) { - %{$self->{opendirs}} = (); - _done_for_now($self); - delete $self->{scandir}; - $self->{quit} = 1; - return; - } - # else: $path =~ /(cont|full)\z/ +# called directly, and by PublicInbox::DS +sub event_step ($) { + my ($self) = @_; return if $self->{quit}; + my $op = shift @{$self->{ops}}; + + # continue existing scan my $max = 10; my $opendirs = $self->{opendirs}; my @dirnames = keys %$opendirs; @@ -609,7 +610,7 @@ sub scan { } $opendirs->{$dir} = $dh if $n < 0; } - if 
($path =~ /full\z/) { + if ($op && $op eq 'full') { foreach my $dir (@{$self->{mdir}}) { next if $opendirs->{$dir}; # already in progress my $ok = opendir(my $dh, $dir); @@ -627,7 +628,13 @@ sub scan { } _done_for_now($self); # do we have more work to do? - trigger_scan($self, 'cont') if keys %$opendirs; + PublicInbox::DS::requeue($self) if keys %$opendirs; +} + +sub scan { + my ($self, $op) = @_; + push @{$self->{ops}}, $op; + goto &event_step; } sub _importer_for { diff --git a/script/public-inbox-watch b/script/public-inbox-watch index 2057066a2a9..b6d545adad7 100755 --- a/script/public-inbox-watch +++ b/script/public-inbox-watch @@ -5,6 +5,10 @@ use strict; use warnings; use PublicInbox::WatchMaildir; use PublicInbox::Config; +use PublicInbox::DS; +use PublicInbox::Sigfd; +use PublicInbox::Syscall qw(SFD_NONBLOCK); +my $oldset = PublicInbox::Sigfd::block_signals(); my ($config, $watch_md); my $reload = sub { $config = PublicInbox::Config->new; @@ -14,14 +18,22 @@ my $reload = sub { $reload->(); if ($watch_md) { my $scan = sub { $watch_md->trigger_scan('full') if $watch_md }; - $SIG{HUP} = $reload; - $SIG{USR1} = $scan; - $SIG{ALRM} = sub { $SIG{ALRM} = 'DEFAULT'; $scan->() }; - $SIG{QUIT} = $SIG{TERM} = $SIG{INT} = sub { + my $quit = sub { $watch_md->quit if $watch_md; $watch_md = undef; }; + my $sig = { HUP => $reload, USR1 => $scan }; + $sig->{QUIT} = $sig->{TERM} = $sig->{INT} = $quit; + # --no-scan is only intended for testing atm, undocumented. 
- alarm(1) unless (grep(/\A--no-scan\z/, @ARGV)); - $watch_md->watch while ($watch_md); + unless (grep(/\A--no-scan\z/, @ARGV)) { + PublicInbox::DS::requeue($scan); + } + my $sigfd = PublicInbox::Sigfd->new($sig, SFD_NONBLOCK); + local %SIG = (%SIG, %$sig) if !$sigfd; + if (!$sigfd) { + PublicInbox::Sigfd::sig_setmask($oldset); + PublicInbox::DS->SetLoopTimeout(1000); + } + $watch_md->watch($sig, $oldset) while ($watch_md); } diff --git a/t/watch_maildir.t b/t/watch_maildir.t index a2c09b0351b..c8658140cf2 100644 --- a/t/watch_maildir.t +++ b/t/watch_maildir.t @@ -184,10 +184,10 @@ More majordomo info at http://vger.kernel.org/majordomo-info.html\n); my $ino_fdinfo = "/proc/$wm->{pid}/fdinfo/$ino_fd"; while (time < $end && open(my $fh, '<', $ino_fdinfo)) { @ino_info = grep(/^inotify wd:/, <$fh>); - last if @ino_info >= 4; + last if @ino_info >= 3; tick; } - $sleep = undef if @ino_info >= 4; + $sleep = undef if @ino_info >= 3; } } if ($sleep) {