about summary refs log tree commit homepage
path: root/lib/PublicInbox/WatchMaildir.pm
diff options
context:
space:
mode:
Diffstat (limited to 'lib/PublicInbox/WatchMaildir.pm')
-rw-r--r--lib/PublicInbox/WatchMaildir.pm170
1 files changed, 70 insertions, 100 deletions
diff --git a/lib/PublicInbox/WatchMaildir.pm b/lib/PublicInbox/WatchMaildir.pm
index 4d3cd032..431350be 100644
--- a/lib/PublicInbox/WatchMaildir.pm
+++ b/lib/PublicInbox/WatchMaildir.pm
@@ -12,7 +12,7 @@ use PublicInbox::Filter::Base qw(REJECT);
 use PublicInbox::Spamcheck;
 use PublicInbox::Sigfd;
 use PublicInbox::DS qw(now);
-use POSIX qw(_exit WNOHANG);
+use POSIX qw(_exit);
 *mime_from_path = \&PublicInbox::InboxWritable::mime_from_path;
 
 sub compile_watchheaders ($) {
@@ -213,9 +213,8 @@ sub quit {
         }
 }
 
-sub watch_fs {
+sub watch_fs_init ($) {
         my ($self) = @_;
-        require PublicInbox::DirIdle;
         my $done = sub {
                 delete $self->{done_timer};
                 _done_for_now($self);
@@ -224,10 +223,8 @@ sub watch_fs {
                 _try_path($self, $_[0]->fullname);
                 $self->{done_timer} //= PublicInbox::DS::requeue($done);
         };
-        my $di = PublicInbox::DirIdle->new($self->{mdir}, $cb);
-        PublicInbox::DS->SetPostLoopCallback(sub { !$self->{quit} });
-        PublicInbox::DS->EventLoop;
-        _done_for_now($self);
+        require PublicInbox::DirIdle;
+        PublicInbox::DirIdle->new($self->{mdir}, $cb); # EPOLL_CTL_ADD
 }
 
 # returns the git config section name, e.g [imap "imaps://user@example.com"]
@@ -334,25 +331,6 @@ sub mic_for ($$$) { # mic = Mail::IMAPClient
         $mic;
 }
 
-sub imap_start ($) {
-        my ($self) = @_;
-        eval { require PublicInbox::IMAPClient } or
-                die "Mail::IMAPClient is required for IMAP:\n$@\n";
-        eval { require Git } or
-                die "Git (Perl module) is required for IMAP:\n$@\n";
-        eval { require PublicInbox::IMAPTracker } or
-                die "DBD::SQLite is required for IMAP\n:$@\n";
-
-        my $mic_args = imap_common_init($self);
-        # make sure we can connect and cache the credentials in memory
-        $self->{mic_arg} = {}; # schema://authority => IMAPClient->new args
-        my $mics = $self->{mics} = {}; # schema://authority => IMAPClient obj
-        for my $url (sort keys %{$self->{imap}}) {
-                my $uri = PublicInbox::URIimap->new($url);
-                $mics->{imap_section($uri)} //= mic_for($self, $uri, $mic_args);
-        }
-}
-
 sub imap_fetch_all ($$$) {
         my ($self, $mic, $uri) = @_;
         my $sec = imap_section($uri);
@@ -481,74 +459,76 @@ sub watch_imap_idle_1 ($$$) {
 
 sub watch_atfork_child ($) {
         my ($self) = @_;
+        delete $self->{idle_pids};
+        PublicInbox::DS->Reset;
         PublicInbox::Sigfd::sig_setmask($self->{oldset});
         %SIG = (%SIG, %{$self->{sig}});
 }
 
-sub watch_imap_idle_all ($$) {
-        my ($self, $idle) = @_; # $idle = [[ uri1, intvl1 ], [ uri2, intvl2 ]]
-        $self->{mics} = {}; # going to be forking, so disconnect
-        my $idle_pids = $self->{idle_pids} = {};
-        until ($self->{quit}) {
-                while (my $uri_intvl = shift @$idle) {
-                        my ($uri, $intvl) = @$uri_intvl;
-                        defined(my $pid = fork) or die "fork: $!";
-                        if ($pid == 0) {
-                                watch_atfork_child($self);
-                                delete $self->{idle_pids};
-                                watch_imap_idle_1($self, $uri, $intvl);
-                                _exit(0);
-                        }
-                        $idle_pids->{$pid} = $uri_intvl;
-                }
-                my $pid = waitpid(-1, 0) or next;
-                if ($pid < 0) {
-                        warn "W: no idling children: $!";
-                        if (@$idle) {
-                                sleep 60;
-                        } else {
-                                warn "W: nothing to respawn, quitting IDLE\n";
-                                last;
-                        }
-                }
-                if (my $uri_intvl = delete $idle_pids->{$pid}) {
-                        my ($uri, $intvl) = @$uri_intvl;
-                        my $url = $uri->as_string;
-                        if ($? || !$self->{quit}) {
-                                warn "W: PID=$pid on $url died: \$?=$?\n";
-                        }
-                        push @$idle, $uri_intvl;
-                } else {
-                        warn "W: PID=$pid (unknown) reaped: \$?=$?\n";
-                }
+sub imap_idle_reap { # PublicInbox::DS::dwaitpid callback
+        my ($self, $pid) = @_;
+        my $uri_intvl = delete $self->{idle_pids}->{$pid} or
+                die "BUG: PID=$pid (unknown) reaped: \$?=$?\n";
+
+        my ($uri, $intvl) = @$uri_intvl;
+        my $url = $uri->as_string;
+        return if $self->{quit};
+        warn "W: PID=$pid on $url died: \$?=$?\n" if $?;
+        push @{$self->{idle_todo}}, $uri_intvl;
+        PubicInbox::DS::requeue($self); # call ->event_step to respawn
+}
+
+sub imap_idle_fork ($$) {
+        my ($self, $uri_intvl) = @_;
+        my ($uri, $intvl) = @$uri_intvl;
+        defined(my $pid = fork) or die "fork: $!";
+        if ($pid == 0) {
+                watch_atfork_child($self);
+                watch_imap_idle_1($self, $uri, $intvl);
+                _exit(0);
         }
+        $self->{idle_pids}->{$pid} = $uri_intvl;
+        PublicInbox::DS::dwaitpid($pid, \&imap_idle_reap, $self);
+}
 
-        # tear it all down
-        kill('QUIT', $_) for (keys %$idle_pids);
-        while (scalar keys %$idle_pids) {
-                if (my $pid = waitpid(-1, WNOHANG)) {
-                        if ($pid < 0) {
-                                warn "E: no children? $! (PIDs: ",
-                                        join(', ', keys %$idle_pids),")\n";
-                                last;
-                        } else {
-                                delete $idle_pids->{$pid};
-                        }
-                } else { # signals aren't that reliable w/o signalfd/kevent
-                        sleep 1;
-                        kill('QUIT', $_) for (keys %$idle_pids);
+sub event_step {
+        my ($self) = @_;
+        return if $self->{quit};
+        my $idle_todo = $self->{idle_todo};
+        if ($idle_todo && @$idle_todo) {
+                $self->{mics} = {}; # going to be forking, so disconnect
+                while (my $uri_intvl = shift(@$idle_todo)) {
+                        imap_idle_fork($self, $uri_intvl);
                 }
         }
+        goto(&fs_scan_step) if $self->{mdre};
 }
 
-sub watch_imap ($) {
+sub watch_imap_init ($) {
         my ($self) = @_;
-        my $idle = []; # [ [ uri1, intvl1 ], [uri2, intvl2] ];
+        eval { require PublicInbox::IMAPClient } or
+                die "Mail::IMAPClient is required for IMAP:\n$@\n";
+        eval { require Git } or
+                die "Git (Perl module) is required for IMAP:\n$@\n";
+        eval { require PublicInbox::IMAPTracker } or
+                die "DBD::SQLite is required for IMAP\n:$@\n";
+
+        my $mic_args = imap_common_init($self); # read args from config
+
+        # make sure we can connect and cache the credentials in memory
+        $self->{mic_arg} = {}; # schema://authority => IMAPClient->new args
+        my $mics = $self->{mics} = {}; # schema://authority => IMAPClient obj
+        for my $url (sort keys %{$self->{imap}}) {
+                my $uri = PublicInbox::URIimap->new($url);
+                $mics->{imap_section($uri)} //= mic_for($self, $uri, $mic_args);
+        }
+
+        my $idle = []; # [ [ uri1, intvl1 ], [uri2, intvl2] ]
         my $poll = {}; # intvl_seconds => [ uri1, uri2 ]
         for my $url (keys %{$self->{imap}}) {
                 my $uri = PublicInbox::URIimap->new($url);
                 my $sec = imap_section($uri);
-                my $mic = $self->{mics}->{$sec};
+                my $mic = $mics->{$sec};
                 my $intvl = $self->{imap_opt}->{$sec}->{poll_intvl};
                 if ($mic->has_capability('IDLE') && !$intvl) {
                         $intvl = $self->{imap_opt}->{$sec}->{idle_intvl};
@@ -557,9 +537,10 @@ sub watch_imap ($) {
                         push @{$poll->{$intvl || 120}}, $uri;
                 }
         }
-        my $nr_poll = scalar keys %$poll;
-        if (scalar @$idle && !$nr_poll) { # multiple idlers, need fork
-                watch_imap_idle_all($self, $idle);
+        if (scalar @$idle) {
+                $self->{idle_pids} = {};
+                $self->{idle_todo} = $idle;
+                PublicInbox::DS::requeue($self); # ->event_step to fork
         }
         # TODO: polling
 }
@@ -568,21 +549,11 @@ sub watch {
         my ($self, $sig, $oldset) = @_;
         $self->{oldset} = $oldset;
         $self->{sig} = $sig;
-        if ($self->{mdre} && $self->{imap}) {
-                defined(my $pid = fork) or die "fork: $!";
-                if ($pid == 0) {
-                        watch_atfork_child($self);
-                        imap_start($self);
-                        goto &watch_imap;
-                }
-                $self->{-imap_pid} = $pid;
-        } elsif ($self->{imap}) {
-                # not a child process, but no signalfd, yet:
-                watch_atfork_child($self);
-                imap_start($self);
-                goto &watch_imap;
-        }
-        goto &watch_fs;
+        watch_imap_init($self) if $self->{imap};
+        watch_fs_init($self) if $self->{mdre};
+        PublicInbox::DS->SetPostLoopCallback(sub {});
+        PublicInbox::DS->EventLoop until $self->{quit};
+        _done_for_now($self);
 }
 
 sub trigger_scan {
@@ -591,8 +562,7 @@ sub trigger_scan {
         PublicInbox::DS::requeue($self);
 }
 
-# called directly, and by PublicInbox::DS
-sub event_step ($) {
+sub fs_scan_step {
         my ($self) = @_;
         return if $self->{quit};
         my $op = shift @{$self->{ops}};
@@ -634,7 +604,7 @@ sub event_step ($) {
 sub scan {
         my ($self, $op) = @_;
         push @{$self->{ops}}, $op;
-        goto &event_step;
+        goto &fs_scan_step;
 }
 
 sub _importer_for {