about summary refs log tree commit homepage
path: root/public-inbox-nntpd
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2015-09-24 21:28:51 +0000
committerEric Wong <e@80x24.org>2015-09-24 21:41:07 +0000
commitffd40d6260b01290b2dd060f69ed2c663eba4027 (patch)
tree7136b92f6cd8486e9468b780a6572d1d7d65ec0f /public-inbox-nntpd
parent36711d6bdf8a767177e0f2f305723354121f8327 (diff)
downloadpublic-inbox-ffd40d6260b01290b2dd060f69ed2c663eba4027.tar.gz
We'll probably be supporting read-only IMAP, or maybe
we'll just implement a custom HTTP server so users can
manage/upgrade the same way as the nntpd while being
immune to slow clients.
Diffstat (limited to 'public-inbox-nntpd')
-rw-r--r--public-inbox-nntpd391
1 files changed, 6 insertions, 385 deletions
diff --git a/public-inbox-nntpd b/public-inbox-nntpd
index 67fc90b2..f6042c2f 100644
--- a/public-inbox-nntpd
+++ b/public-inbox-nntpd
@@ -3,392 +3,13 @@
 # License: AGPLv3 or later (https://www.gnu.org/licenses/agpl-3.0.txt)
 use strict;
 use warnings;
-my @CMD = ($0, @ARGV);
-require Danga::Socket;
-require IO::Handle;
-use Getopt::Long qw/:config gnu_getopt no_ignore_case auto_abbrev/;
+require PublicInbox::Daemon;
 require PublicInbox::NewsGroup;
-my $set_user;
+require PublicInbox::NNTP;
 my $nntpd = PublicInbox::NNTPD->new;
-my $refresh = sub { $nntpd->refresh_groups };
-$SIG{HUP} = $SIG{USR1} = $SIG{USR2} = $SIG{PIPE} =
-        $SIG{TTIN} = $SIG{TTOU} = $SIG{WINCH} = 'IGNORE';
-
-$refresh->();
-my (@cfg_listen, $stdout, $stderr, $group, $user, $pid_file, $daemonize);
-my $worker_processes = 0;
-my %opts = (
-        'l|listen=s' => \@cfg_listen,
-        '1|stdout=s' => \$stdout,
-        '2|stderr=s' => \$stderr,
-        'W|worker-processes=i' => \$worker_processes,
-        'P|pid-file=s' => \$pid_file,
-        'u|user=s' => \$user,
-        'g|group=s' => \$group,
-        'D|daemonize' => \$daemonize,
-);
-GetOptions(%opts) or die "bad command-line args\n";
-
-if (defined $pid_file && $pid_file =~ /\.oldbin\z/) {
-        die "--pid-file cannot end with '.oldbin'\n";
-}
-
-my %pids;
-my %listener_names;
-my $reexec_pid;
-my @listeners = inherit();
-
-# ignore daemonize when inheriting
-$daemonize = undef if scalar @listeners;
-
-# default NNTP listener if no listeners
-push @cfg_listen, '0.0.0.0:119' unless (@listeners || @cfg_listen);
-
-foreach my $l (@cfg_listen) {
-        next if $listener_names{$l}; # already inherited
-        require IO::Socket::INET6; # works for IPv4, too
-        my %o = (
-                LocalAddr => $l,
-                ReuseAddr => 1,
-                Proto => 'tcp',
-        );
-        if (my $s = IO::Socket::INET6->new(%o)) {
-                $listener_names{sockname($s)} = $s;
-                push @listeners, $s;
-        } else {
-                warn "error binding $l: $!\n";
-        }
-}
-die 'No listeners bound' unless @listeners;
-
-chdir '/' or die "chdir failed: $!\n";
-open(STDIN, '+<', '/dev/null') or die "redirect stdin failed: $!\n";
-
-if (defined $pid_file || defined $group || defined $user || $daemonize) {
-        require Net::Server::Daemonize;
-
-        Net::Server::Daemonize::check_pid_file($pid_file) if defined $pid_file;
-        my $uid = Net::Server::Daemonize::get_uid($user) if defined $user;
-        my $gid;
-        if (defined $group) {
-                $gid = Net::Server::Daemonize::get_gid($group);
-                $gid = (split /\s+/, $gid)[0];
-        } elsif (defined $uid) {
-                $gid = (getpwuid($uid))[3];
-        }
-
-        # We change users in the worker to ensure upgradability,
-        # The upgrade will create the ".oldbin" pid file in the
-        # same directory as the given pid file.
-        $uid and $set_user = sub {
-                Net::Server::Daemonize::set_user($uid, $gid);
-        };
-
-        if ($daemonize) {
-                my ($pid, $err) = do_fork();
-                die "could not fork: $err\n" unless defined $pid;
-                exit if $pid;
-
-                open STDOUT, '>&STDIN' or die "redirect stdout failed: $!\n";
-                open STDERR, '>&STDIN' or die "redirect stderr failed: $!\n";
-                POSIX::setsid();
-                ($pid, $err) = do_fork();
-                die "could not fork: $err\n" unless defined $pid;
-                exit if $pid;
-        }
-        if (defined $pid_file) {
-                my $unlink_pid = $$;
-                Net::Server::Daemonize::create_pid_file($pid_file);
-                if ($uid and !chown($uid, $gid, $pid_file)) {
-                        warn "could not chown $pid_file: $!\n";
-                }
-                END { unlink_pid_file_safe_ish($unlink_pid, $pid_file) };
-        }
-}
-
-if ($worker_processes > 0) {
-        pipe(my ($p0, $p1)) or die "failed to create parent-pipe: $!\n";
-        my %pwatch = ( fileno($p0) => sub { kill('TERM', $$) } );
-        pipe(my ($r, $w)) or die "failed to create self-pipe: $!\n";
-        IO::Handle::blocking($w, 0);
-        my $set_workers = $worker_processes;
-        my @caught;
-        my $master_pid = $$;
-        foreach my $s (qw(HUP CHLD QUIT INT TERM USR1 USR2 TTIN TTOU WINCH)) {
-                $SIG{$s} = sub {
-                        return if $$ != $master_pid;
-                        push @caught, $s;
-                        syswrite($w, '.');
-                };
-        }
-        reopen_logs();
-        # main loop
-        while (1) {
-                while (my $s = shift @caught) {
-                        if ($s eq 'USR1') {
-                                reopen_logs();
-                                kill_workers($s);
-                        } elsif ($s eq 'USR2') {
-                                upgrade();
-                        } elsif ($s =~ /\A(?:QUIT|TERM|INT)\z/) {
-                                # drops pipes and causes children to die
-                                exit
-                        } elsif ($s eq 'WINCH') {
-                                $worker_processes = 0;
-                        } elsif ($s eq 'HUP') {
-                                $worker_processes = $set_workers;
-                                $refresh->();
-                                kill_workers($s);
-                        } elsif ($s eq 'TTIN') {
-                                if ($set_workers > $worker_processes) {
-                                        ++$worker_processes;
-                                } else {
-                                        $worker_processes = ++$set_workers;
-                                }
-                        } elsif ($s eq 'TTOU') {
-                                if ($set_workers > 0) {
-                                        $worker_processes = --$set_workers;
-                                }
-                        } elsif ($s eq 'CHLD') {
-                                reap_children();
-                        }
-                }
-
-                my $n = scalar keys %pids;
-                if ($n > $worker_processes) {
-                        while (my ($k, $v) = each %pids) {
-                                kill('TERM', $k) if $v >= $worker_processes;
-                        }
-                        $n = $worker_processes;
-                }
-                foreach my $i ($n..($worker_processes - 1)) {
-                        my ($pid, $err) = do_fork();
-                        if (!defined $pid) {
-                                warn "failed to fork worker[$i]: $err\n";
-                        } elsif ($pid == 0) {
-                                $set_user->() if $set_user;
-                                close($_) for ($w, $r, $p1);
-                                Danga::Socket->AddOtherFds(%pwatch);
-                                goto worker;
-                        } else {
-                                warn "PID=$pid is worker[$i]\n";
-                                $pids{$pid} = $i;
-                        }
-                }
-                sysread($r, my $buf, 8);
-        }
-} else {
-        $set_user->() if $set_user;
-        $SIG{USR2} = sub { worker_quit() if upgrade() };
-worker:
-        # this calls epoll_create:
-        reopen_logs();
-        $SIG{QUIT} = $SIG{INT} = $SIG{TERM} = *worker_quit;
-        $SIG{USR1} = *reopen_logs;
-        $SIG{HUP} = $refresh;
-        PublicInbox::Listener->new($_) for @listeners;
-        Danga::Socket->EventLoop;
-}
-
-# end of main
-
-sub worker_quit {
-        # killing again terminates immediately:
-        exit unless @listeners;
-
-        @listeners = ();
-
-        # give slow clients 30s to finish reading/writing whatever
-        Danga::Socket->AddTimer(30, sub { exit });
-
-        # drop idle connections and try to quit gracefully
-        Danga::Socket->SetPostLoopCallback(sub {
-                my ($dmap, undef) = @_;
-                my $n = 0;
-
-                foreach my $s (values %$dmap) {
-                        if ($s->can('busy') && $s->busy) {
-                                ++$n;
-                        } else {
-                                $s->close;
-                        }
-                }
-                $n; # true: loop continues, false: loop breaks
-        });
-}
-
-sub reopen_logs {
-        if ($stdout) {
-                open STDOUT, '>>', $stdout or
-                        warn "failed to redirect stdout to $stdout: $!\n";
-        }
-        if ($stderr) {
-                open STDERR, '>>', $stderr or
-                        warn "failed to redirect stderr to $stderr: $!\n";
-        }
-}
-
-sub sockname {
-        my ($s) = @_;
-        my $n = getsockname($s) or return;
-        my ($port, $addr);
-        if (length($n) >= 28) {
-                require Socket6;
-                ($port, $addr) = Socket6::unpack_sockaddr_in6($n);
-        } else {
-                ($port, $addr) = Socket::sockaddr_in($n);
-        }
-        if (length($addr) == 4) {
-                $n = Socket::inet_ntoa($addr)
-        } else {
-                $n = '['.Socket6::inet_ntop(Socket6::AF_INET6(), $addr).']';
-        }
-        $n .= ":$port";
-}
-
-sub inherit {
-        return () if ($ENV{LISTEN_PID} || 0) != $$;
-        my $fds = $ENV{LISTEN_FDS} or return ();
-        my $end = $fds + 2; # LISTEN_FDS_START - 1
-        my @rv = ();
-        foreach my $fd (3..$end) {
-                my $s = IO::Handle->new;
-                $s->fdopen($fd, 'r');
-                if (my $k = sockname($s)) {
-                        $listener_names{$k} = $s;
-                        push @rv, $s;
-                } else {
-                        warn "failed to inherit fd=$fd (LISTEN_FDS=$fds)";
-                }
-        }
-        @rv
-}
-
-sub upgrade {
-        if ($reexec_pid) {
-                warn "upgrade in-progress: $reexec_pid\n";
-                return;
-        }
-        if (defined $pid_file) {
-                if ($pid_file =~ /\.oldbin\z/) {
-                        warn "BUG: .oldbin suffix exists: $pid_file\n";
-                        return;
-                }
-                unlink_pid_file_safe_ish($$, $pid_file);
-                $pid_file .= '.oldbin';
-                Net::Server::Daemonize::create_pid_file($pid_file);
-        }
-        my ($pid, $err) = do_fork();
-        unless (defined $pid) {
-                warn "fork failed: $err\n";
-                return;
-        }
-        if ($pid == 0) {
-                use Fcntl qw(FD_CLOEXEC F_SETFD F_GETFD);
-                $ENV{LISTEN_FDS} = scalar @listeners;
-                $ENV{LISTEN_PID} = $$;
-                foreach my $s (@listeners) {
-                        my $fl = fcntl($s, F_GETFD, 0);
-                        fcntl($s, F_SETFD, $fl &= ~FD_CLOEXEC);
-                }
-                exec @CMD;
-                die "Failed to exec: $!\n";
-        }
-        $reexec_pid = $pid;
-}
-
-sub kill_workers {
-        my ($s) = @_;
-
-        while (my ($pid, $id) = each %pids) {
-                kill $s, $pid;
-        }
-}
-
-sub do_fork {
-        require POSIX;
-        my $new = POSIX::SigSet->new;
-        $new->fillset;
-        my $old = POSIX::SigSet->new;
-        POSIX::sigprocmask(&POSIX::SIG_BLOCK, $new, $old) or
-                                die "SIG_BLOCK: $!\n";
-        my $pid = fork;
-        my $err = $!;
-        POSIX::sigprocmask(&POSIX::SIG_SETMASK, $old) or
-                                die "SIG_SETMASK: $!\n";
-        ($pid, $err);
-}
-
-sub upgrade_aborted ($) {
-        my ($p) = @_;
-        warn "reexec PID($p) died with: $?\n";
-        $reexec_pid = undef;
-        return unless $pid_file;
-
-        my $file = $pid_file;
-        $file =~ s/\.oldbin\z// or die "BUG: no '.oldbin' suffix in $file\n";
-        unlink_pid_file_safe_ish($$, $pid_file);
-        $pid_file = $file;
-        eval { Net::Server::Daemonize::create_pid_file($pid_file) };
-        warn $@, "\n" if $@;
-}
-
-sub reap_children {
-        while (1) {
-                my $p = waitpid(-1, &POSIX::WNOHANG) or return;
-                if (defined $reexec_pid && $p == $reexec_pid) {
-                        upgrade_aborted($p);
-                } elsif (defined(my $id = delete $pids{$p})) {
-                        warn "worker[$id] PID($p) died with: $?\n";
-                } elsif ($p > 0) {
-                        warn "unknown PID($p) reaped: $?\n";
-                } else {
-                        return;
-                }
-        }
-}
-
-sub unlink_pid_file_safe_ish ($$) {
-        my ($unlink_pid, $file) = @_;
-        return unless defined $unlink_pid && $unlink_pid == $$;
-
-        open my $fh, '<', $file or return;
-        defined(my $read_pid = <$fh>) or return;
-        chomp $read_pid;
-        if ($read_pid == $unlink_pid) {
-                Net::Server::Daemonize::unlink_pid_file($file);
-        }
-}
-
-1;
-package PublicInbox::Listener;
-use strict;
-use warnings;
-use base 'Danga::Socket';
-use Socket qw(SOL_SOCKET SO_KEEPALIVE IPPROTO_TCP TCP_NODELAY);
-use PublicInbox::NNTP;
-
-sub new ($$) {
-        my ($class, $s) = @_;
-        setsockopt($s, SOL_SOCKET, SO_KEEPALIVE, 1);
-        setsockopt($s, IPPROTO_TCP, TCP_NODELAY, 1);
-        listen($s, 1024);
-        IO::Handle::blocking($s, 0);
-        my $self = fields::new($class);
-        $self->SUPER::new($s); # calls epoll_create for the first socket
-        $self->watch_read(1);
-        $self
-}
-
-sub event_read {
-        my ($self) = @_;
-        # no loop here, we want to fairly distribute clients
-        # between multiple processes sharing the same socket
-        if (accept(my $c, $self->{sock})) {
-                IO::Handle::blocking($c, 0); # no accept4 :<
-                PublicInbox::NNTP->new($c, $nntpd);
-        }
-}
+daemon_run('0.0.0.0:119',
+        sub { $nntpd->refresh_groups },
+        sub ($) { PublicInbox::NNTP->new($_[0], $nntpd) });
 
 1;
 package PublicInbox::NNTPD;
@@ -406,7 +27,7 @@ sub new {
         $self;
 }
 
-sub refresh_groups {
+sub refresh_groups () {
         my ($self) = @_;
         require PublicInbox::Config;
         my $pi_config = PublicInbox::Config->new;