about summary refs log tree commit homepage
path: root/lib/PublicInbox/xap_helper.h
diff options
context:
space:
mode:
Diffstat (limited to 'lib/PublicInbox/xap_helper.h')
-rw-r--r-- lib/PublicInbox/xap_helper.h | 282
1 files changed, 244 insertions, 38 deletions
diff --git a/lib/PublicInbox/xap_helper.h b/lib/PublicInbox/xap_helper.h
index 871a381c..493a24f4 100644
--- a/lib/PublicInbox/xap_helper.h
+++ b/lib/PublicInbox/xap_helper.h
@@ -56,15 +56,26 @@
 #        define STDERR_ASSIGNABLE (0)
 #endif
 
-static const int sock_fd = 0; // SOCK_SEQPACKET as stdin :P
-static pid_t parent_pid;
+// CHECK(type, expect, expr): assert functions are used correctly (e.g.
+// ensure hackers don't cause EINVAL/EFAULT), not for HW-related failures.
+// `expr' is evaluated outside of assert(), so it still runs under NDEBUG.
+# define CHECK(type, expect, expr) do { \
+        type ckvar______ = (expr); \
+        assert(ckvar______ == (expect) && "BUG" && __FILE__ && __LINE__); \
+} while (0)
+
+static const int sock_fd = STDIN_FILENO; // SOCK_SEQPACKET as stdin :P
+static volatile pid_t parent_pid; // may be set in worker sighandler (sigw)
+static sigset_t fullset, workerset;
+static bool alive = true;
 #if STDERR_ASSIGNABLE
 static FILE *orig_err = stderr;
 #endif
 static int orig_err_fd = -1;
 static void *srch_tree; // tsearch + tdelete + twalk
 static pid_t *worker_pids; // nr => pid
-static unsigned long nworker;
+static unsigned long nworker, nworker_hwm;
+static int pipefds[2];
 
 // PublicInbox::Search and PublicInbox::CodeSearch generate these:
 static void mail_nrp_init(void);
@@ -598,11 +609,21 @@ static bool recv_req(struct req *req, char *rbuf, size_t *len)
         msg.msg_control = &cmsg.hdr;
         msg.msg_controllen = CMSG_SPACE(RECV_FD_SPACE);
 
+        // allow SIGTERM to hit
+        CHECK(int, 0, sigprocmask(SIG_SETMASK, &workerset, NULL));
+
         ssize_t r = recvmsg(sock_fd, &msg, 0);
-        if (r < 0)
-                err(EXIT_FAILURE, "recvmsg");
-        if (r == 0)
+        if (r == 0) {
                 exit(EX_NOINPUT); /* grandparent went away */
+        } else if (r < 0) {
+                if (errno == EINTR)
+                        return false; // retry recv_loop
+                err(EXIT_FAILURE, "recvmsg");
+        }
+
+        // success! no signals for the rest of the request/response cycle
+        CHECK(int, 0, sigprocmask(SIG_SETMASK, &fullset, NULL));
+
         *len = r;
         if (r > 0 && cmsg.hdr.cmsg_level == SOL_SOCKET &&
                         cmsg.hdr.cmsg_type == SCM_RIGHTS) {
@@ -875,9 +896,19 @@ static void stderr_restore(FILE *tmp_err)
         clearerr(stderr);
 }
 
+static void sigw(int sig) // SIGTERM handler for worker, installed by recv_loop
+{
+        parent_pid = -1; // nonzero and never == getppid(): recv_loop's while ends
+}
+
 static void recv_loop(void) // worker process loop
 {
         static char rbuf[4096 * 33]; // per-process
+        struct sigaction sa = {};
+        sa.sa_handler = sigw;
+
+        CHECK(int, 0, sigaction(SIGTERM, &sa, NULL));
+
         while (!parent_pid || getppid() == parent_pid) {
                 size_t len = sizeof(rbuf);
                 struct req req = {};
@@ -904,18 +935,6 @@ static void insert_pid(pid_t pid, unsigned nr)
         worker_pids[nr] = pid;
 }
 
-static int delete_pid(pid_t pid)
-{
-        for (unsigned nr = 0; nr < nworker; nr++) {
-                if (worker_pids[nr] == pid) {
-                        worker_pids[nr] = 0;
-                        return nr;
-                }
-        }
-        warnx("W: unknown pid=%d reaped", (int)pid);
-        return -1;
-}
-
 static void start_worker(unsigned nr)
 {
         pid_t pid = fork();
@@ -925,11 +944,31 @@ static void start_worker(unsigned nr)
                 insert_pid(pid, nr);
         } else {
                 cleanup_pids();
+                xclose(pipefds[0]);
+                xclose(pipefds[1]);
+                if (signal(SIGCHLD, SIG_DFL) == SIG_ERR) // drop inherited sigp
+                        err(EXIT_FAILURE, "signal CHLD");
+                if (signal(SIGTTIN, SIG_IGN) == SIG_ERR) // parent-only knobs
+                        err(EXIT_FAILURE, "signal TTIN");
+                if (signal(SIGTTOU, SIG_IGN) == SIG_ERR)
+                        err(EXIT_FAILURE, "signal TTOU");
                 recv_loop();
                 exit(0);
         }
 }
 
+static void start_workers(void) // fork a worker into every empty slot
+{
+        sigset_t prev_mask;
+
+        // block everything in fullset while forking, then restore the old mask
+        CHECK(int, 0, sigprocmask(SIG_SETMASK, &fullset, &prev_mask));
+        for (unsigned long slot = 0; slot < nworker; ++slot) {
+                if (worker_pids[slot] == 0) start_worker(slot);
+        }
+        CHECK(int, 0, sigprocmask(SIG_SETMASK, &prev_mask, NULL));
+}
+
 static void cleanup_all(void)
 {
         cleanup_pids();
@@ -939,6 +978,121 @@ static void cleanup_all(void)
 #endif
 }
 
+static void sigp(int sig) // parent signal handler: queue sig for main loop
+{
+        static const char eagain[] = "signals coming in too fast\n";
+        static const char bad_sig[] = "BUG: bad sig\n";
+        static const char write_err[] = "BUG: sigp write: ";
+        const int saved_errno = errno; // write(2) below must not clobber it
+        char c = 0;
+
+        switch (sig) { // one byte per signal, decoded by the read loop in main
+        case SIGCHLD: c = '.'; break;
+        case SIGTTOU: c = '-'; break;
+        case SIGTTIN: c = '+'; break;
+        default:
+                write(STDERR_FILENO, bad_sig, sizeof(bad_sig) - 1);
+                _exit(EXIT_FAILURE);
+        }
+        ssize_t w = write(pipefds[1], &c, 1);
+        int e = w < 0 ? errno : 0;
+        if (w == sizeof(c) || e == EAGAIN) {
+                if (e == EAGAIN) // pipe is full: complain, drop this signal
+                        write(STDERR_FILENO, eagain, sizeof(eagain) - 1);
+                errno = saved_errno; // interrupted code sees its own errno
+                return;
+        }
+        // unexpected write error or zero-length write: report it and die
+        struct iovec iov[3];
+        iov[0].iov_base = (void *)write_err;
+        iov[0].iov_len = sizeof(write_err) - 1;
+        iov[1].iov_base = (void *)(e ? strerror(e) : "zero write");
+        iov[1].iov_len = strlen((const char *)iov[1].iov_base);
+        iov[2].iov_base = (void *)"\n";
+        iov[2].iov_len = 1;
+        (void)writev(STDERR_FILENO, iov, MY_ARRAY_SIZE(iov));
+        _exit(EXIT_FAILURE);
+}
+
+static void reaped_worker(pid_t pid, int st) // account for one reaped child
+{
+        unsigned long slot = 0;
+        while (slot < nworker_hwm && worker_pids[slot] != pid)
+                slot++;
+        if (slot >= nworker_hwm) { // pid is not in worker_pids
+                warnx("W: unknown pid=%d reaped $?=%d", (int)pid, st);
+                return;
+        }
+        worker_pids[slot] = 0; // slot is empty again for start_workers
+
+        const bool noinput = WIFEXITED(st) && WEXITSTATUS(st) == EX_NOINPUT;
+        if (noinput) // e.g. recv_req got EOF on sock_fd: stop respawning
+                alive = false;
+        else if (st != 0)
+                warnx("worker[%lu] died $?=%d", slot, st);
+
+        if (alive)
+                start_workers();
+}
+
+static void do_sigchld(void) // reap all children that have exited so far
+{
+        for (;;) {
+                int status;
+                const pid_t reaped = waitpid(-1, &status, WNOHANG);
+
+                if (reaped == 0) // children exist, none left to reap
+                        return;
+                if (reaped > 0) {
+                        reaped_worker(reaped, status);
+                        continue;
+                }
+                const int e = errno; // waitpid failed
+                if (e == ECHILD) return; // no children at all
+                if (e != EINTR) // EINTR: retry (can it happen w/ WNOHANG?)
+                        err(EXIT_FAILURE, "BUG: waitpid");
+        }
+}
+
+static void do_sigttin(void) // SIGTTIN ('+' from sigp): allow one more worker
+{
+        if (!alive) return;
+        if (nworker >= nworker_hwm) { // no retired slot to reuse: grow by one
+                void *p = reallocarray(worker_pids, nworker + 1, sizeof(pid_t));
+                if (!p) { warn("reallocarray"); return; }
+                worker_pids = (pid_t *)p;
+                worker_pids[nworker] = 0;
+                nworker_hwm = nworker + 1;
+        } // else: array keeps nworker_hwm entries; slot may hold an unreaped pid
+        // a slot still holding a pid is respawned by reaped_worker once it dies
+        nworker++;
+        start_workers();
+}
+
+static void do_sigttou(void) // SIGTTOU ('-' from sigp): run one worker fewer
+{
+        if (!alive || nworker < 2) return; // always keep at least one worker
+
+        nworker--; // worker_pids array does not shrink
+        for (unsigned long i = nworker; i < nworker_hwm; i++) {
+                const pid_t victim = worker_pids[i];
+                if (victim == 0) continue; // slot already empty
+                if (kill(victim, SIGTERM) != 0)
+                        warn("BUG?: kill(%d, SIGTERM)", (int)victim);
+        }
+}
+
+static size_t living_workers(void) // count non-empty worker_pids slots
+{
+        size_t busy = 0;
+        const pid_t *end = worker_pids + nworker_hwm;
+
+        for (const pid_t *p = worker_pids; p != end; p++)
+                busy += (*p != 0);
+
+        return busy;
+}
+
 int main(int argc, char *argv[])
 {
         int c;
@@ -953,7 +1107,7 @@ int main(int argc, char *argv[])
                         err(EXIT_FAILURE, "dup(2)");
         }
 
-        nworker = 0;
+        nworker = 1;
 #ifdef _SC_NPROCESSORS_ONLN
         long j = sysconf(_SC_NPROCESSORS_ONLN);
         if (j > 0)
@@ -981,27 +1135,79 @@ int main(int argc, char *argv[])
                         errx(EXIT_FAILURE, "BUG: `-%c'", c);
                 }
         }
-        if (nworker == 0) {
+        sigset_t pset; // parent-only
+        CHECK(int, 0, sigfillset(&pset));
+
+        // global sigsets:
+        CHECK(int, 0, sigfillset(&fullset));
+        CHECK(int, 0, sigfillset(&workerset));
+
+#define DELSET(sig) do { \
+        CHECK(int, 0, sigdelset(&fullset, sig)); \
+        CHECK(int, 0, sigdelset(&workerset, sig)); \
+        CHECK(int, 0, sigdelset(&pset, sig)); \
+} while (0)
+        DELSET(SIGABRT);
+        DELSET(SIGBUS);
+        DELSET(SIGFPE);
+        DELSET(SIGILL);
+        DELSET(SIGSEGV);
+        DELSET(SIGXCPU);
+        DELSET(SIGXFSZ);
+#undef DELSET
+
+        if (nworker == 0) { // no SIGTERM handling w/o workers
                 recv_loop();
-        } else {
-                parent_pid = getpid();
-                worker_pids = (pid_t *)calloc(nworker, sizeof(pid_t));
-                if (!worker_pids)
-                        err(EXIT_FAILURE, "calloc");
-                for (unsigned i = 0; i < nworker; i++)
-                        start_worker(i);
-
-                int st;
-                pid_t pid;
-                bool quit = false;
-                while ((pid = wait(&st)) > 0) {
-                        int nr = delete_pid(pid);
-                        if (nr < 0) continue;
-                        if (WIFEXITED(st) && WEXITSTATUS(st) == EX_NOINPUT)
-                                quit = true;
-                        if (!quit)
-                                start_worker(nr);
+                return 0;
+        }
+        CHECK(int, 0, sigdelset(&workerset, SIGTERM));
+        CHECK(int, 0, sigdelset(&workerset, SIGCHLD));
+        parent_pid = getpid();
+        nworker_hwm = nworker;
+        worker_pids = (pid_t *)calloc(nworker, sizeof(pid_t));
+        if (!worker_pids) err(EXIT_FAILURE, "calloc");
+
+        if (pipe(pipefds)) err(EXIT_FAILURE, "pipe"); // sigp writes, we read
+        int fl = fcntl(pipefds[1], F_GETFL); // fcntl(2) takes (fd, cmd, ...)
+        if (fl == -1) err(EXIT_FAILURE, "F_GETFL");
+        if (fcntl(pipefds[1], F_SETFL, fl | O_NONBLOCK)) // sigp must not block
+                err(EXIT_FAILURE, "F_SETFL");
+
+        CHECK(int, 0, sigdelset(&pset, SIGCHLD));
+        CHECK(int, 0, sigdelset(&pset, SIGTTIN));
+        CHECK(int, 0, sigdelset(&pset, SIGTTOU));
+
+        struct sigaction sa = {};
+        sa.sa_handler = sigp;
+
+        CHECK(int, 0, sigaction(SIGTTIN, &sa, NULL));
+        CHECK(int, 0, sigaction(SIGTTOU, &sa, NULL));
+        sa.sa_flags = SA_NOCLDSTOP;
+        CHECK(int, 0, sigaction(SIGCHLD, &sa, NULL));
+
+        CHECK(int, 0, sigprocmask(SIG_SETMASK, &pset, NULL));
+
+        start_workers();
+
+        char sbuf[64];
+        while (alive || living_workers()) {
+                ssize_t n = read(pipefds[0], &sbuf, sizeof(sbuf));
+                if (n < 0) {
+                        if (errno == EINTR) continue;
+                        err(EXIT_FAILURE, "read");
+                } else if (n == 0) {
+                        errx(EXIT_FAILURE, "read EOF");
+                }
+                do_sigchld();
+                for (ssize_t i = 0; i < n; i++) {
+                        switch (sbuf[i]) {
+                        case '.': break; // do_sigchld already called
+                        case '-': do_sigttou(); break;
+                        case '+': do_sigttin(); break;
+                        default: errx(EXIT_FAILURE, "BUG: c=%c", sbuf[i]);
+                        }
                 }
         }
+
         return 0;
 }