From ffd40d6260b01290b2dd060f69ed2c663eba4027 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Thu, 24 Sep 2015 21:28:51 +0000 Subject: nntpd: hoist out daemon management code We'll probably be supporting read-only IMAP, or maybe we'll just implement a custom HTTP server so users can manage/upgrade the same way as the nntpd while being immune to slow clients. --- public-inbox-nntpd | 391 +---------------------------------------------------- 1 file changed, 6 insertions(+), 385 deletions(-) (limited to 'public-inbox-nntpd') diff --git a/public-inbox-nntpd b/public-inbox-nntpd index 67fc90b2..f6042c2f 100644 --- a/public-inbox-nntpd +++ b/public-inbox-nntpd @@ -3,392 +3,13 @@ # License: AGPLv3 or later (https://www.gnu.org/licenses/agpl-3.0.txt) use strict; use warnings; -my @CMD = ($0, @ARGV); -require Danga::Socket; -require IO::Handle; -use Getopt::Long qw/:config gnu_getopt no_ignore_case auto_abbrev/; +require PublicInbox::Daemon; require PublicInbox::NewsGroup; -my $set_user; +require PublicInbox::NNTP; my $nntpd = PublicInbox::NNTPD->new; -my $refresh = sub { $nntpd->refresh_groups }; -$SIG{HUP} = $SIG{USR1} = $SIG{USR2} = $SIG{PIPE} = - $SIG{TTIN} = $SIG{TTOU} = $SIG{WINCH} = 'IGNORE'; - -$refresh->(); -my (@cfg_listen, $stdout, $stderr, $group, $user, $pid_file, $daemonize); -my $worker_processes = 0; -my %opts = ( - 'l|listen=s' => \@cfg_listen, - '1|stdout=s' => \$stdout, - '2|stderr=s' => \$stderr, - 'W|worker-processes=i' => \$worker_processes, - 'P|pid-file=s' => \$pid_file, - 'u|user=s' => \$user, - 'g|group=s' => \$group, - 'D|daemonize' => \$daemonize, -); -GetOptions(%opts) or die "bad command-line args\n"; - -if (defined $pid_file && $pid_file =~ /\.oldbin\z/) { - die "--pid-file cannot end with '.oldbin'\n"; -} - -my %pids; -my %listener_names; -my $reexec_pid; -my @listeners = inherit(); - -# ignore daemonize when inheriting -$daemonize = undef if scalar @listeners; - -# default NNTP listener if no listeners -push @cfg_listen, '0.0.0.0:119' unless (@listeners || @cfg_listen); - -foreach my $l (@cfg_listen) { - next if $listener_names{$l}; # already inherited - require IO::Socket::INET6; # works for IPv4, too - my %o = ( - LocalAddr => $l, - ReuseAddr => 1, - Proto => 'tcp', - ); - if (my $s = IO::Socket::INET6->new(%o)) { - $listener_names{sockname($s)} = $s; - push @listeners, $s; - } else { - warn "error binding $l: $!\n"; - } -} -die 'No listeners bound' unless @listeners; - -chdir '/' or die "chdir failed: $!\n"; -open(STDIN, '+<', '/dev/null') or die "redirect stdin failed: $!\n"; - -if (defined $pid_file || defined $group || defined $user || $daemonize) { - require Net::Server::Daemonize; - - Net::Server::Daemonize::check_pid_file($pid_file) if defined $pid_file; - my $uid = Net::Server::Daemonize::get_uid($user) if defined $user; - my $gid; - if (defined $group) { - $gid = Net::Server::Daemonize::get_gid($group); - $gid = (split /\s+/, $gid)[0]; - } elsif (defined $uid) { - $gid = (getpwuid($uid))[3]; - } - - # We change users in the worker to ensure upgradability, - # The upgrade will create the ".oldbin" pid file in the - # same directory as the given pid file. - $uid and $set_user = sub { - Net::Server::Daemonize::set_user($uid, $gid); - }; - - if ($daemonize) { - my ($pid, $err) = do_fork(); - die "could not fork: $err\n" unless defined $pid; - exit if $pid; - - open STDOUT, '>&STDIN' or die "redirect stdout failed: $!\n"; - open STDERR, '>&STDIN' or die "redirect stderr failed: $!\n"; - POSIX::setsid(); - ($pid, $err) = do_fork(); - die "could not fork: $err\n" unless defined $pid; - exit if $pid; - } - if (defined $pid_file) { - my $unlink_pid = $$; - Net::Server::Daemonize::create_pid_file($pid_file); - if ($uid and !chown($uid, $gid, $pid_file)) { - warn "could not chown $pid_file: $!\n"; - } - END { unlink_pid_file_safe_ish($unlink_pid, $pid_file) }; - } -} - -if ($worker_processes > 0) { - pipe(my ($p0, $p1)) or die "failed to create parent-pipe: $!\n"; - my %pwatch = ( fileno($p0) => sub { kill('TERM', $$) } ); - pipe(my ($r, $w)) or die "failed to create self-pipe: $!\n"; - IO::Handle::blocking($w, 0); - my $set_workers = $worker_processes; - my @caught; - my $master_pid = $$; - foreach my $s (qw(HUP CHLD QUIT INT TERM USR1 USR2 TTIN TTOU WINCH)) { - $SIG{$s} = sub { - return if $$ != $master_pid; - push @caught, $s; - syswrite($w, '.'); - }; - } - reopen_logs(); - # main loop - while (1) { - while (my $s = shift @caught) { - if ($s eq 'USR1') { - reopen_logs(); - kill_workers($s); - } elsif ($s eq 'USR2') { - upgrade(); - } elsif ($s =~ /\A(?:QUIT|TERM|INT)\z/) { - # drops pipes and causes children to die - exit - } elsif ($s eq 'WINCH') { - $worker_processes = 0; - } elsif ($s eq 'HUP') { - $worker_processes = $set_workers; - $refresh->(); - kill_workers($s); - } elsif ($s eq 'TTIN') { - if ($set_workers > $worker_processes) { - ++$worker_processes; - } else { - $worker_processes = ++$set_workers; - } - } elsif ($s eq 'TTOU') { - if ($set_workers > 0) { - $worker_processes = --$set_workers; - } - } elsif ($s eq 'CHLD') { - reap_children(); - } - } - - my $n = scalar keys %pids; - if ($n > $worker_processes) { - while (my ($k, $v) = each %pids) { - kill('TERM', $k) if $v >= $worker_processes; - } - $n = $worker_processes; - } - foreach my $i ($n..($worker_processes - 1)) { - my ($pid, $err) = do_fork(); - if (!defined $pid) { - warn "failed to fork worker[$i]: $err\n"; - } elsif ($pid == 0) { - $set_user->() if $set_user; - close($_) for ($w, $r, $p1); - Danga::Socket->AddOtherFds(%pwatch); - goto worker; - } else { - warn "PID=$pid is worker[$i]\n"; - $pids{$pid} = $i; - } - } - sysread($r, my $buf, 8); - } -} else { - $set_user->() if $set_user; - $SIG{USR2} = sub { worker_quit() if upgrade() }; -worker: - # this calls epoll_create: - reopen_logs(); - $SIG{QUIT} = $SIG{INT} = $SIG{TERM} = *worker_quit; - $SIG{USR1} = *reopen_logs; - $SIG{HUP} = $refresh; - PublicInbox::Listener->new($_) for @listeners; - Danga::Socket->EventLoop; -} - -# end of main - -sub worker_quit { - # killing again terminates immediately: - exit unless @listeners; - - @listeners = (); - - # give slow clients 30s to finish reading/writing whatever - Danga::Socket->AddTimer(30, sub { exit }); - - # drop idle connections and try to quit gracefully - Danga::Socket->SetPostLoopCallback(sub { - my ($dmap, undef) = @_; - my $n = 0; - - foreach my $s (values %$dmap) { - if ($s->can('busy') && $s->busy) { - ++$n; - } else { - $s->close; - } - } - $n; # true: loop continues, false: loop breaks - }); -} - -sub reopen_logs { - if ($stdout) { - open STDOUT, '>>', $stdout or - warn "failed to redirect stdout to $stdout: $!\n"; - } - if ($stderr) { - open STDERR, '>>', $stderr or - warn "failed to redirect stderr to $stderr: $!\n"; - } -} - -sub sockname { - my ($s) = @_; - my $n = getsockname($s) or return; - my ($port, $addr); - if (length($n) >= 28) { - require Socket6; - ($port, $addr) = Socket6::unpack_sockaddr_in6($n); - } else { - ($port, $addr) = Socket::sockaddr_in($n); - } - if (length($addr) == 4) { - $n = Socket::inet_ntoa($addr) - } else { - $n = '['.Socket6::inet_ntop(Socket6::AF_INET6(), $addr).']'; - } - $n .= ":$port"; -} - -sub inherit { - return () if ($ENV{LISTEN_PID} || 0) != $$; - my $fds = $ENV{LISTEN_FDS} or return (); - my $end = $fds + 2; # LISTEN_FDS_START - 1 - my @rv = (); - foreach my $fd (3..$end) { - my $s = IO::Handle->new; - $s->fdopen($fd, 'r'); - if (my $k = sockname($s)) { - $listener_names{$k} = $s; - push @rv, $s; - } else { - warn "failed to inherit fd=$fd (LISTEN_FDS=$fds)"; - } - } - @rv -} - -sub upgrade { - if ($reexec_pid) { - warn "upgrade in-progress: $reexec_pid\n"; - return; - } - if (defined $pid_file) { - if ($pid_file =~ /\.oldbin\z/) { - warn "BUG: .oldbin suffix exists: $pid_file\n"; - return; - } - unlink_pid_file_safe_ish($$, $pid_file); - $pid_file .= '.oldbin'; - Net::Server::Daemonize::create_pid_file($pid_file); - } - my ($pid, $err) = do_fork(); - unless (defined $pid) { - warn "fork failed: $err\n"; - return; - } - if ($pid == 0) { - use Fcntl qw(FD_CLOEXEC F_SETFD F_GETFD); - $ENV{LISTEN_FDS} = scalar @listeners; - $ENV{LISTEN_PID} = $$; - foreach my $s (@listeners) { - my $fl = fcntl($s, F_GETFD, 0); - fcntl($s, F_SETFD, $fl &= ~FD_CLOEXEC); - } - exec @CMD; - die "Failed to exec: $!\n"; - } - $reexec_pid = $pid; -} - -sub kill_workers { - my ($s) = @_; - - while (my ($pid, $id) = each %pids) { - kill $s, $pid; - } -} - -sub do_fork { - require POSIX; - my $new = POSIX::SigSet->new; - $new->fillset; - my $old = POSIX::SigSet->new; - POSIX::sigprocmask(&POSIX::SIG_BLOCK, $new, $old) or - die "SIG_BLOCK: $!\n"; - my $pid = fork; - my $err = $!; - POSIX::sigprocmask(&POSIX::SIG_SETMASK, $old) or - die "SIG_SETMASK: $!\n"; - ($pid, $err); -} - -sub upgrade_aborted ($) { - my ($p) = @_; - warn "reexec PID($p) died with: $?\n"; - $reexec_pid = undef; - return unless $pid_file; - - my $file = $pid_file; - $file =~ s/\.oldbin\z// or die "BUG: no '.oldbin' suffix in $file\n"; - unlink_pid_file_safe_ish($$, $pid_file); - $pid_file = $file; - eval { Net::Server::Daemonize::create_pid_file($pid_file) }; - warn $@, "\n" if $@; -} - -sub reap_children { - while (1) { - my $p = waitpid(-1, &POSIX::WNOHANG) or return; - if (defined $reexec_pid && $p == $reexec_pid) { - upgrade_aborted($p); - } elsif (defined(my $id = delete $pids{$p})) { - warn "worker[$id] PID($p) died with: $?\n"; - } elsif ($p > 0) { - warn "unknown PID($p) reaped: $?\n"; - } else { - return; - } - } -} - -sub unlink_pid_file_safe_ish ($$) { - my ($unlink_pid, $file) = @_; - return unless defined $unlink_pid && $unlink_pid == $$; - - open my $fh, '<', $file or return; - defined(my $read_pid = <$fh>) or return; - chomp $read_pid; - if ($read_pid == $unlink_pid) { - Net::Server::Daemonize::unlink_pid_file($file); - } -} - -1; -package PublicInbox::Listener; -use strict; -use warnings; -use base 'Danga::Socket'; -use Socket qw(SOL_SOCKET SO_KEEPALIVE IPPROTO_TCP TCP_NODELAY); -use PublicInbox::NNTP; - -sub new ($$) { - my ($class, $s) = @_; - setsockopt($s, SOL_SOCKET, SO_KEEPALIVE, 1); - setsockopt($s, IPPROTO_TCP, TCP_NODELAY, 1); - listen($s, 1024); - IO::Handle::blocking($s, 0); - my $self = fields::new($class); - $self->SUPER::new($s); # calls epoll_create for the first socket - $self->watch_read(1); - $self -} - -sub event_read { - my ($self) = @_; - # no loop here, we want to fairly distribute clients - # between multiple processes sharing the same socket - if (accept(my $c, $self->{sock})) { - IO::Handle::blocking($c, 0); # no accept4 :< - PublicInbox::NNTP->new($c, $nntpd); - } -} +daemon_run('0.0.0.0:119', + sub { $nntpd->refresh_groups }, + sub ($) { PublicInbox::NNTP->new($_[0], $nntpd) }); 1; package PublicInbox::NNTPD; @@ -406,7 +27,7 @@ sub new { $self; } -sub refresh_groups { +sub refresh_groups () { my ($self) = @_; require PublicInbox::Config; my $pi_config = PublicInbox::Config->new; -- cgit v1.2.3-24-ge0c7