about summary refs log tree commit homepage
path: root/lib/PublicInbox/Daemon.pm
diff options
context:
space:
mode:
Diffstat (limited to 'lib/PublicInbox/Daemon.pm')
-rw-r--r--lib/PublicInbox/Daemon.pm107
1 files changed, 65 insertions, 42 deletions
diff --git a/lib/PublicInbox/Daemon.pm b/lib/PublicInbox/Daemon.pm
index c9594a37..37aa4187 100644
--- a/lib/PublicInbox/Daemon.pm
+++ b/lib/PublicInbox/Daemon.pm
@@ -9,11 +9,13 @@ use Getopt::Long qw/:config gnu_getopt no_ignore_case auto_abbrev/;
 use IO::Handle;
 use IO::Socket;
 use Cwd qw/abs_path/;
+use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC);
 STDOUT->autoflush(1);
 STDERR->autoflush(1);
 require Danga::Socket;
 require POSIX;
 require PublicInbox::Listener;
+require PublicInbox::ParentPipe;
 my @CMD;
 my $set_user;
 my (@cfg_listen, $stdout, $stderr, $group, $user, $pid_file, $daemonize);
@@ -101,17 +103,18 @@ sub check_absolute ($$) {
 }
 
 sub daemonize () {
-        foreach my $i (0..$#ARGV) {
-                my $arg = $ARGV[$i];
-                next unless -e $arg;
-                $ARGV[$i] = abs_path($arg);
-        }
-        check_absolute('stdout', $stdout);
-        check_absolute('stderr', $stderr);
-        check_absolute('pid-file', $pid_file);
+        if ($daemonize) {
+                foreach my $i (0..$#ARGV) {
+                        my $arg = $ARGV[$i];
+                        next unless -e $arg;
+                        $ARGV[$i] = abs_path($arg);
+                }
+                check_absolute('stdout', $stdout);
+                check_absolute('stderr', $stderr);
+                check_absolute('pid-file', $pid_file);
 
-        chdir '/' or die "chdir failed: $!";
-        open(STDIN, '+<', '/dev/null') or die "redirect stdin failed: $!";
+                chdir '/' or die "chdir failed: $!";
+        }
 
         return unless (defined $pid_file || defined $group || defined $user
                         || $daemonize);
@@ -140,15 +143,17 @@ sub daemonize () {
         };
 
         if ($daemonize) {
-                my ($pid, $err) = do_fork();
-                die "could not fork: $err\n" unless defined $pid;
+                my $pid = fork;
+                die "could not fork: $!\n" unless defined $pid;
                 exit if $pid;
 
+                open(STDIN, '+<', '/dev/null') or
+                                        die "redirect stdin failed: $!\n";
                 open STDOUT, '>&STDIN' or die "redirect stdout failed: $!\n";
                 open STDERR, '>&STDIN' or die "redirect stderr failed: $!\n";
                 POSIX::setsid();
-                ($pid, $err) = do_fork();
-                die "could not fork: $err\n" unless defined $pid;
+                $pid = fork;
+                die "could not fork: $!\n" unless defined $pid;
                 exit if $pid;
         }
         if (defined $pid_file) {
@@ -161,29 +166,44 @@ sub daemonize () {
         }
 }
 
-sub worker_quit () {
+
+sub worker_quit {
+        my ($reason) = @_;
         # killing again terminates immediately:
         exit unless @listeners;
 
         $_->close foreach @listeners; # call Danga::Socket::close
         @listeners = ();
+        $reason->close if ref($reason) eq 'PublicInbox::ParentPipe';
 
-        # give slow clients 30s to finish reading/writing whatever
-        Danga::Socket->AddTimer(30, sub { exit });
-
+        my $proc_name;
+        my $warn = 0;
         # drop idle connections and try to quit gracefully
         Danga::Socket->SetPostLoopCallback(sub {
                 my ($dmap, undef) = @_;
                 my $n = 0;
+                my $now = clock_gettime(CLOCK_MONOTONIC);
 
                 foreach my $s (values %$dmap) {
-                        if ($s->can('busy') && $s->busy) {
-                                $n = 1;
+                        $s->can('busy') or next;
+                        if ($s->busy($now)) {
+                                ++$n;
                         } else {
                                 # close as much as possible, early as possible
                                 $s->close;
                         }
                 }
+                if ($n) {
+                        if (($warn + 5) < time) {
+                                warn "$$ quitting, $n client(s) left\n";
+                                $warn = time;
+                        }
+                        unless (defined $proc_name) {
+                                $proc_name = (split(/\s+/, $0))[0];
+                                $proc_name =~ s!\A.*?([^/]+)\z!$1!;
+                        }
+                        $0 = "$proc_name quitting, $n client(s) left";
+                }
                 $n; # true: loop continues, false: loop breaks
         });
 }
@@ -264,9 +284,9 @@ sub upgrade () {
                 $pid_file .= '.oldbin';
                 write_pid($pid_file);
         }
-        my ($pid, $err) = do_fork();
+        my $pid = fork;
         unless (defined $pid) {
-                warn "fork failed: $err\n";
+                warn "fork failed: $!\n";
                 return;
         }
         if ($pid == 0) {
@@ -291,17 +311,6 @@ sub kill_workers ($) {
         }
 }
 
-sub do_fork () {
-        my $new = POSIX::SigSet->new;
-        $new->fillset;
-        my $old = POSIX::SigSet->new;
-        POSIX::sigprocmask(&POSIX::SIG_BLOCK, $new, $old) or die "SIG_BLOCK: $!";
-        my $pid = fork;
-        my $err = $!;
-        POSIX::sigprocmask(&POSIX::SIG_SETMASK, $old) or die "SIG_SETMASK: $!";
-        ($pid, $err);
-}
-
 sub upgrade_aborted ($) {
         my ($p) = @_;
         warn "reexec PID($p) died with: $?\n";
@@ -336,6 +345,7 @@ sub unlink_pid_file_safe_ish ($$) {
         return unless defined $unlink_pid && $unlink_pid == $$;
 
         open my $fh, '<', $file or return;
+        local $/ = "\n";
         defined(my $read_pid = <$fh>) or return;
         chomp $read_pid;
         if ($read_pid == $unlink_pid) {
@@ -359,6 +369,7 @@ sub master_loop {
         }
         reopen_logs();
         # main loop
+        my $quit = 0;
         while (1) {
                 while (my $s = shift @caught) {
                         if ($s eq 'USR1') {
@@ -367,10 +378,16 @@ sub master_loop {
                         } elsif ($s eq 'USR2') {
                                 upgrade();
                         } elsif ($s =~ /\A(?:QUIT|TERM|INT)\z/) {
-                                # drops pipes and causes children to die
-                                exit
+                                exit if $quit++;
+                                kill_workers($s);
                         } elsif ($s eq 'WINCH') {
-                                $worker_processes = 0;
+                                if (-t STDIN || -t STDOUT || -t STDERR) {
+                                        warn
+"ignoring SIGWINCH since we are not daemonized\n";
+                                        $SIG{WINCH} = 'IGNORE';
+                                } else {
+                                        $worker_processes = 0;
+                                }
                         } elsif ($s eq 'HUP') {
                                 $worker_processes = $set_workers;
                                 kill_workers($s);
@@ -390,6 +407,11 @@ sub master_loop {
                 }
 
                 my $n = scalar keys %pids;
+                if ($quit) {
+                        exit if $n == 0;
+                        $set_workers = $worker_processes = $n = 0;
+                }
+
                 if ($n > $worker_processes) {
                         while (my ($k, $v) = each %pids) {
                                 kill('TERM', $k) if $v >= $worker_processes;
@@ -397,9 +419,9 @@ sub master_loop {
                         $n = $worker_processes;
                 }
                 foreach my $i ($n..($worker_processes - 1)) {
-                        my ($pid, $err) = do_fork();
+                        my $pid = fork;
                         if (!defined $pid) {
-                                warn "failed to fork worker[$i]: $err\n";
+                                warn "failed to fork worker[$i]: $!\n";
                         } elsif ($pid == 0) {
                                 $set_user->() if $set_user;
                                 return $p0; # run normal work code
@@ -419,13 +441,12 @@ sub daemon_loop ($$) {
         my $parent_pipe;
         if ($worker_processes > 0) {
                 $refresh->(); # preload by default
-                $parent_pipe = master_loop(); # returns if in child process
-                my $fd = fileno($parent_pipe);
-                Danga::Socket->AddOtherFds($fd => *worker_quit);
+                my $fh = master_loop(); # returns if in child process
+                $parent_pipe = PublicInbox::ParentPipe->new($fh, *worker_quit);
         } else {
                 reopen_logs();
                 $set_user->() if $set_user;
-                $SIG{USR2} = sub { worker_quit() if upgrade() };
+                $SIG{USR2} = sub { worker_quit('USR2') if upgrade() };
                 $refresh->();
         }
         $uid = $gid = undef;
@@ -433,6 +454,8 @@ sub daemon_loop ($$) {
         $SIG{QUIT} = $SIG{INT} = $SIG{TERM} = *worker_quit;
         $SIG{USR1} = *reopen_logs;
         $SIG{HUP} = $refresh;
+        $SIG{CHLD} = 'DEFAULT';
+        $SIG{$_} = 'IGNORE' for qw(USR2 TTIN TTOU WINCH);
         # this calls epoll_create:
         @listeners = map {
                 PublicInbox::Listener->new($_, $post_accept)