about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2024-04-26 11:29:43 +0000
committerEric Wong <e@80x24.org>2024-04-28 17:05:32 +0000
commitc807cdd1c2e85237e769532a36819888ca97f8b1 (patch)
tree5832f3904e63020b4c430538fd3abfa2dd4f3825
parent5f4c4163ad8d8045dbcfddaeddfdb986a79d4969 (diff)
downloadpublic-inbox-c807cdd1c2e85237e769532a36819888ca97f8b1.tar.gz
When read-only daemons reopen log files via SIGUSR1, be sure to
propagate it to Xapian helper processes to ensure old log files
can be closed and archived.
-rw-r--r--lib/PublicInbox/Daemon.pm37
-rw-r--r--lib/PublicInbox/TestCommon.pm11
-rw-r--r--lib/PublicInbox/XapHelper.pm16
-rw-r--r--lib/PublicInbox/xap_helper.h43
-rw-r--r--t/psgi_v2.t54
5 files changed, 142 insertions, 19 deletions
diff --git a/lib/PublicInbox/Daemon.pm b/lib/PublicInbox/Daemon.pm
index e08102e9..28458b19 100644
--- a/lib/PublicInbox/Daemon.pm
+++ b/lib/PublicInbox/Daemon.pm
@@ -388,10 +388,30 @@ sub worker_quit { # $_[0] = signal name or number (unused)
         @PublicInbox::DS::post_loop_do = (\&has_busy_clients, { -w => 0 })
 }
 
+sub spawn_xh () {
+        $xh_workers // return;
+        require PublicInbox::XhcMset;
+        local $) = $gid if defined $gid;
+        local $( = $gid if defined $gid;
+        local $> = $uid if defined $uid;
+        local $< = $uid if defined $uid;
+        $PublicInbox::Search::XHC = eval {
+                local $ENV{STDERR_PATH} = $stderr;
+                local $ENV{STDOUT_PATH} = $stdout;
+                PublicInbox::XapClient::start_helper('-j', $xh_workers)
+        };
+        warn "E: $@" if $@;
+        awaitpid($PublicInbox::Search::XHC->{io}->attached_pid, \&respawn_xh)
+                if $PublicInbox::Search::XHC;
+}
+
 sub reopen_logs {
+        my ($sig) = @_;
         $logs{$stdout} //= \*STDOUT if defined $stdout;
         $logs{$stderr} //= \*STDERR if defined $stderr;
         while (my ($p, $fh) = each %logs) { open_log_path($fh, $p) }
+        ($sig && defined($xh_workers) && $PublicInbox::Search::XHC) and
+                kill('USR1', $PublicInbox::Search::XHC->{io}->attached_pid);
 }
 
 sub sockname ($) {
@@ -548,6 +568,7 @@ sub start_worker ($) {
         my $pid = PublicInbox::DS::fork_persist;
         if ($pid == 0) {
                 undef %WORKERS;
+                undef $xh_workers;
                 local $PublicInbox::DS::Poller; # allow epoll/kqueue
                 $set_user->() if $set_user;
                 PublicInbox::EOFpipe->new($parent_pipe, \&worker_quit);
@@ -575,8 +596,9 @@ sub master_loop {
         pipe($parent_pipe, my $p1) or die "failed to create parent-pipe: $!";
         my $set_workers = $nworker; # for SIGWINCH
         reopen_logs();
+        spawn_xh;
         my $msig = {
-                USR1 => sub { reopen_logs(); kill_workers($_[0]); },
+                USR1 => sub { reopen_logs($_[0]); kill_workers($_[0]); },
                 USR2 => \&upgrade,
                 QUIT => \&master_quit,
                 INT => \&master_quit,
@@ -675,6 +697,7 @@ sub daemon_loop () {
 sub worker_loop {
         $uid = $gid = undef;
         reopen_logs();
+        spawn_xh; # only for -W0
         @listeners = map {;
                 my $l = sockname($_);
                 my $tls_cb = $POST_ACCEPT{$l};
@@ -695,8 +718,7 @@ sub respawn_xh { # awaitpid cb
         my ($pid) = @_;
         return unless @listeners;
         warn "W: xap_helper PID:$pid died: \$?=$?, respawning...\n";
-        $PublicInbox::Search::XHC =
-                PublicInbox::XapClient::start_helper('-j', $xh_workers);
+        spawn_xh;
 }
 
 sub run {
@@ -712,14 +734,7 @@ sub run {
         local $SIG{__WARN__} = PublicInbox::Eml::warn_ignore_cb();
         local %WORKER_SIG = %WORKER_SIG;
         local $PublicInbox::XapClient::tries = 0;
-
-        local $PublicInbox::Search::XHC = PublicInbox::XapClient::start_helper(
-                        '-j', $xh_workers) if defined($xh_workers);
-        if ($PublicInbox::Search::XHC) {
-                require PublicInbox::XhcMset;
-                awaitpid($PublicInbox::Search::XHC->{io}->attached_pid,
-                        \&respawn_xh);
-        }
+        local $PublicInbox::Search::XHC if defined($xh_workers);
 
         daemon_loop();
         # $unlink_on_leave runs
diff --git a/lib/PublicInbox/TestCommon.pm b/lib/PublicInbox/TestCommon.pm
index 708fa698..aeff5d1d 100644
--- a/lib/PublicInbox/TestCommon.pm
+++ b/lib/PublicInbox/TestCommon.pm
@@ -20,7 +20,7 @@ use autodie qw(chdir close fcntl mkdir open opendir seek unlink);
 $ENV{XDG_CACHE_HOME} //= "$ENV{HOME}/.cache"; # reuse C++ xap_helper builds
 
 $_ = File::Spec->rel2abs($_) for (grep(!m!^/!, @INC));
-
+our $CURRENT_DAEMON;
 BEGIN {
         @EXPORT = qw(tmpdir tcp_server tcp_connect require_git require_mods
                 run_script start_script key2sub xsys xsys_e xqx eml_load tick
@@ -566,9 +566,9 @@ sub start_script {
         my $run_mode = $ENV{TEST_RUN_MODE} // $opt->{run_mode} // 2;
         my $sub = $run_mode == 0 ? undef : key2sub($key);
         my $tail;
-        my $xh = $ENV{TEST_DAEMON_XH};
-        $xh && $key =~ /-(?:imapd|netd|httpd|pop3d|nntpd)\z/ and
-                push @argv, split(/\s+/, $xh);
+        my @xh = split(/\s+/, $ENV{TEST_DAEMON_XH} // '');
+        @xh = () if $key !~ /-(?:imapd|netd|httpd|pop3d|nntpd)\z/;
+        push @argv, @xh;
         if ($tail_cmd) {
                 my @paths;
                 for (@argv) {
@@ -616,7 +616,7 @@ sub start_script {
                         $ENV{LISTEN_FDS} = $fds;
                 }
                 if ($opt->{-C}) { chdir($opt->{-C}) }
-                $0 = join(' ', @$cmd);
+                $0 = join(' ', @$cmd, @xh);
                 local @SIG{keys %SIG} = map { undef } values %SIG;
                 local $SIG{FPE} = 'IGNORE'; # Perl default
                 undef $tmp_mask;
@@ -952,6 +952,7 @@ sub test_httpd ($$;$$) {
                 local $ENV{PLACK_TEST_EXTERNALSERVER_URI} = "http://$h:$p";
                 my $ua = LWP::UserAgent->new;
                 $ua->max_redirect(0);
+                local $CURRENT_DAEMON = $td;
                 Plack::Test::ExternalServer::test_psgi(client => $client,
                                                         ua => $ua);
                 $cb->() if $cb;
diff --git a/lib/PublicInbox/XapHelper.pm b/lib/PublicInbox/XapHelper.pm
index c55a72ce..746b4d62 100644
--- a/lib/PublicInbox/XapHelper.pm
+++ b/lib/PublicInbox/XapHelper.pm
@@ -200,6 +200,7 @@ sub recv_loop {
         local $SIG{__WARN__} = sub { print $stderr @_ };
         my $rbuf;
         local $SIG{TERM} = sub { undef $in };
+        local $SIG{USR1} = \&reopen_logs;
         while (defined($in)) {
                 PublicInbox::DS::sig_setmask($workerset);
                 my @fds = eval { # we undef $in in SIG{TERM}
@@ -263,6 +264,18 @@ sub do_sigttou {
         }
 }
 
+sub reopen_logs {
+        my $p = $ENV{STDOUT_PATH};
+        defined($p) && open(STDOUT, '>>', $p) and STDOUT->autoflush(1);
+        $p = $ENV{STDERR_PATH};
+        defined($p) && open(STDERR, '>>', $p) and STDERR->autoflush(1);
+}
+
+sub parent_reopen_logs {
+        reopen_logs();
+        kill('USR1', values %WORKERS);
+}
+
 sub xh_alive { $in || scalar(keys %WORKERS) }
 
 sub start (@) {
@@ -276,7 +289,7 @@ sub start (@) {
                 die 'bad args';
         local $workerset = POSIX::SigSet->new;
         $workerset->fillset or die "fillset: $!";
-        for (@PublicInbox::DS::UNBLOCKABLE) {
+        for (@PublicInbox::DS::UNBLOCKABLE, POSIX::SIGUSR1) {
                 $workerset->delset($_) or die "delset($_): $!";
         }
 
@@ -295,6 +308,7 @@ sub start (@) {
                 },
                 TTOU => \&do_sigttou,
                 CHLD => \&PublicInbox::DS::enqueue_reap,
+                USR1 => \&parent_reopen_logs,
         };
         PublicInbox::DS::block_signals();
         start_workers();
diff --git a/lib/PublicInbox/xap_helper.h b/lib/PublicInbox/xap_helper.h
index 5a89544a..7ecea264 100644
--- a/lib/PublicInbox/xap_helper.h
+++ b/lib/PublicInbox/xap_helper.h
@@ -95,6 +95,8 @@ static pid_t *worker_pids; // nr => pid
 #define WORKER_MAX USHRT_MAX
 static unsigned long nworker, nworker_hwm;
 static int pipefds[2];
+static const char *stdout_path, *stderr_path; // for SIGUSR1
+static sig_atomic_t worker_needs_reopen;
 
 // PublicInbox::Search and PublicInbox::CodeSearch generate these:
 static void mail_nrp_init(void);
@@ -726,9 +728,12 @@ static void stderr_restore(FILE *tmp_err)
         clearerr(stderr);
 }
 
-static void sigw(int sig) // SIGTERM handler for worker
+static void sigw(int sig) // SIGTERM+SIGUSR1 handler for worker
 {
-        sock_fd = -1; // break out of recv_loop
+        switch (sig) {
+        case SIGUSR1: worker_needs_reopen = 1; break;
+        default: sock_fd = -1; // break out of recv_loop
+        }
 }
 
 #define CLEANUP_REQ __attribute__((__cleanup__(req_cleanup)))
@@ -738,6 +743,18 @@ static void req_cleanup(void *ptr)
         free(req->lenv);
 }
 
+static void reopen_logs(void)
+{
+        if (stdout_path && *stdout_path && !freopen(stdout_path, "a", stdout))
+                err(EXIT_FAILURE, "freopen %s", stdout_path);
+        if (stderr_path && *stderr_path) {
+                if (!freopen(stderr_path, "a", stderr))
+                        err(EXIT_FAILURE, "freopen %s", stderr_path);
+                if (my_setlinebuf(stderr))
+                        err(EXIT_FAILURE, "setlinebuf(stderr)");
+        }
+}
+
 static void recv_loop(void) // worker process loop
 {
         static char rbuf[4096 * 33]; // per-process
@@ -745,6 +762,7 @@ static void recv_loop(void) // worker process loop
         sa.sa_handler = sigw;
 
         CHECK(int, 0, sigaction(SIGTERM, &sa, NULL));
+        CHECK(int, 0, sigaction(SIGUSR1, &sa, NULL));
 
         while (sock_fd == 0) {
                 size_t len = sizeof(rbuf);
@@ -761,6 +779,10 @@ static void recv_loop(void) // worker process loop
                         stderr_restore(req.fp[1]);
                         ERR_CLOSE(req.fp[1], 0);
                 }
+                if (worker_needs_reopen) {
+                        worker_needs_reopen = 0;
+                        reopen_logs();
+                }
         }
 }
 
@@ -813,6 +835,16 @@ static void cleanup_all(void)
 #endif
 }
 
+static void parent_reopen_logs(void)
+{
+        reopen_logs();
+        for (unsigned long nr = nworker; nr < nworker_hwm; nr++) {
+                pid_t pid = worker_pids[nr];
+                if (pid != 0 && kill(pid, SIGUSR1))
+                        warn("BUG?: kill(%d, SIGUSR1)", (int)pid);
+        }
+}
+
 static void sigp(int sig) // parent signal handler
 {
         static const char eagain[] = "signals coming in too fast";
@@ -825,6 +857,7 @@ static void sigp(int sig) // parent signal handler
         case SIGCHLD: c = '.'; break;
         case SIGTTOU: c = '-'; break;
         case SIGTTIN: c = '+'; break;
+        case SIGUSR1: c = '#'; break;
         default:
                 write(STDERR_FILENO, bad_sig, sizeof(bad_sig) - 1);
                 _exit(EXIT_FAILURE);
@@ -931,6 +964,8 @@ int main(int argc, char *argv[])
 {
         int c;
         socklen_t slen = (socklen_t)sizeof(c);
+        stdout_path = getenv("STDOUT_PATH");
+        stderr_path = getenv("STDERR_PATH");
 
         if (getsockopt(sock_fd, SOL_SOCKET, SO_TYPE, &c, &slen))
                 err(EXIT_FAILURE, "getsockopt");
@@ -989,6 +1024,7 @@ int main(int argc, char *argv[])
         DELSET(SIGXCPU);
         DELSET(SIGXFSZ);
 #undef DELSET
+        CHECK(int, 0, sigdelset(&workerset, SIGUSR1));
 
         if (nworker == 0) { // no SIGTERM handling w/o workers
                 recv_loop();
@@ -1009,10 +1045,12 @@ int main(int argc, char *argv[])
         CHECK(int, 0, sigdelset(&pset, SIGCHLD));
         CHECK(int, 0, sigdelset(&pset, SIGTTIN));
         CHECK(int, 0, sigdelset(&pset, SIGTTOU));
+        CHECK(int, 0, sigdelset(&pset, SIGUSR1));
 
         struct sigaction sa = {};
         sa.sa_handler = sigp;
 
+        CHECK(int, 0, sigaction(SIGUSR1, &sa, NULL));
         CHECK(int, 0, sigaction(SIGTTIN, &sa, NULL));
         CHECK(int, 0, sigaction(SIGTTOU, &sa, NULL));
         sa.sa_flags = SA_NOCLDSTOP;
@@ -1037,6 +1075,7 @@ int main(int argc, char *argv[])
                         case '.': break; // do_sigchld already called
                         case '-': do_sigttou(); break;
                         case '+': do_sigttin(); break;
+                        case '#': parent_reopen_logs(); break;
                         default: errx(EXIT_FAILURE, "BUG: c=%c", sbuf[i]);
                         }
                 }
diff --git a/t/psgi_v2.t b/t/psgi_v2.t
index d5c328f0..2b678fd8 100644
--- a/t/psgi_v2.t
+++ b/t/psgi_v2.t
@@ -9,6 +9,7 @@ require_git(2.6);
 use PublicInbox::Eml;
 use PublicInbox::Config;
 use PublicInbox::MID qw(mids);
+use autodie qw(kill rename);
 require_mods(qw(DBD::SQLite Xapian HTTP::Request::Common Plack::Test
                 URI::Escape Plack::Builder HTTP::Date));
 use_ok($_) for (qw(HTTP::Request::Common Plack::Test));
@@ -394,4 +395,57 @@ my $client3 = sub {
 test_psgi(sub { $www->call(@_) }, $client3);
 test_httpd($env, $client3, 4);
 
+if ($^O eq 'linux' && -r "/proc/$$/stat") {
+        my $args;
+        my $search_xh_pid = sub {
+                my ($pid) = @_;
+                for my $f (glob('/proc/*/stat')) {
+                        open my $fh, '<', $f or next;
+                        my @s = split /\s+/, readline($fh) // next;
+                        next if $s[3] ne $pid; # look for matching PPID
+                        open $fh, '<', "/proc/$s[0]/cmdline" or next;
+                        my $cmdline = readline($fh) // next;
+                        if ($cmdline =~ /\0-MPublicInbox::XapHelper\0-e\0/ ||
+                                        $cmdline =~ m!/xap_helper\0!) {
+                                return $s[0];
+                        }
+                }
+                undef;
+        };
+        my $usr1_test = sub {
+                my ($cb) = @_;
+                my $td = $PublicInbox::TestCommon::CURRENT_DAEMON;
+                my $pid = $td->{pid};
+                my $res = $cb->(GET('/v2test/?q=m:a-mid@b'));
+                is $res->code, 200, '-httpd is running w/ search';
+
+                $search_xh_pid->($pid);
+                my $xh_pid = $search_xh_pid->($pid) or
+                        BAIL_OUT "can't find XH pid with $args";
+                my $xh_err = readlink "/proc/$xh_pid/fd/2";
+                is $xh_err, "$env->{TMPDIR}/stderr.log",
+                        "initial stderr expected ($args)";
+                rename "$env->{TMPDIR}/stderr.log",
+                        "$env->{TMPDIR}/stderr.old";
+                $xh_err = readlink "/proc/$xh_pid/fd/2";
+                is $xh_err, "$env->{TMPDIR}/stderr.old",
+                        "stderr followed rename ($args)";
+                kill 'USR1', $pid;
+                tick;
+                $res = $cb->(GET('/v2test/?q=m:a-mid@b'));
+                is $res->code, 200, '-httpd still running w/ search';
+                my $new_xh_pid = $search_xh_pid->($pid) or
+                        BAIL_OUT "can't find new XH pid with $args";
+                is $new_xh_pid, $xh_pid, "XH pid unchanged ($args)";
+                $xh_err = readlink "/proc/$new_xh_pid/fd/2";
+                is $xh_err, "$env->{TMPDIR}/stderr.log",
+                        "stderr updated ($args)";
+        };
+        for my $x ('-X0', '-X1', '-X0 -W1', '-X1 -W1') {
+                $args = $x;
+                local $ENV{TEST_DAEMON_XH} = $args;
+                test_httpd($env, $usr1_test);
+        }
+}
+
 done_testing;