diff options
author | Eric Wong <e@80x24.org> | 2023-09-04 10:36:04 +0000 |
---|---|---|
committer | Eric Wong <e@80x24.org> | 2023-09-05 03:01:40 +0000 |
commit | 0c32084032cc54c501234cf37b3289628e98a645 (patch) | |
tree | 3abeb8f9ad65bd0ce98aee332659e204e672e872 /lib/PublicInbox/xap_helper.h | |
parent | 078ad3d512f5b07ef491adae284350f79f4ae656 (diff) | |
download | public-inbox-0c32084032cc54c501234cf37b3289628e98a645.tar.gz |
Being able to tune worker process counts on-the-fly when xap_helper gets used with -{netd,httpd,imapd} will be useful for tuning new setups.
Diffstat (limited to 'lib/PublicInbox/xap_helper.h')
-rw-r--r-- | lib/PublicInbox/xap_helper.h | 282 |
1 files changed, 244 insertions, 38 deletions
diff --git a/lib/PublicInbox/xap_helper.h b/lib/PublicInbox/xap_helper.h index 871a381c..493a24f4 100644 --- a/lib/PublicInbox/xap_helper.h +++ b/lib/PublicInbox/xap_helper.h @@ -56,15 +56,26 @@ # define STDERR_ASSIGNABLE (0) #endif -static const int sock_fd = 0; // SOCK_SEQPACKET as stdin :P -static pid_t parent_pid; +// assert functions are used correctly (e.g. ensure hackers don't +// cause EINVAL/EFAULT). Not for stuff that can fail due to HW +// failures. +# define CHECK(type, expect, expr) do { \ + type ckvar______ = (expr); \ + assert(ckvar______ == (expect) && "BUG" && __FILE__ && __LINE__); \ +} while (0) + +static const int sock_fd = STDIN_FILENO; // SOCK_SEQPACKET as stdin :P +static volatile pid_t parent_pid; // may be set in worker sighandler (sigw) +static sigset_t fullset, workerset; +static bool alive = true; #if STDERR_ASSIGNABLE static FILE *orig_err = stderr; #endif static int orig_err_fd = -1; static void *srch_tree; // tsearch + tdelete + twalk static pid_t *worker_pids; // nr => pid -static unsigned long nworker; +static unsigned long nworker, nworker_hwm; +static int pipefds[2]; // PublicInbox::Search and PublicInbox::CodeSearch generate these: static void mail_nrp_init(void); @@ -598,11 +609,21 @@ static bool recv_req(struct req *req, char *rbuf, size_t *len) msg.msg_control = &cmsg.hdr; msg.msg_controllen = CMSG_SPACE(RECV_FD_SPACE); + // allow SIGTERM to hit + CHECK(int, 0, sigprocmask(SIG_SETMASK, &workerset, NULL)); + ssize_t r = recvmsg(sock_fd, &msg, 0); - if (r < 0) - err(EXIT_FAILURE, "recvmsg"); - if (r == 0) + if (r == 0) { exit(EX_NOINPUT); /* grandparent went away */ + } else if (r < 0) { + if (errno == EINTR) + return false; // retry recv_loop + err(EXIT_FAILURE, "recvmsg"); + } + + // success! no signals for the rest of the request/response cycle + CHECK(int, 0, sigprocmask(SIG_SETMASK, &fullset, NULL)); + *len = r; if (r > 0 && cmsg.hdr.cmsg_level == SOL_SOCKET && cmsg.hdr.cmsg_type == SCM_RIGHTS) { @@ -875,9 +896,19 @@ static void stderr_restore(FILE *tmp_err) clearerr(stderr); } +static void sigw(int sig) // SIGTERM handler for worker +{ + parent_pid = -1; // break out of recv_loop +} + static void recv_loop(void) // worker process loop { static char rbuf[4096 * 33]; // per-process + struct sigaction sa = {}; + sa.sa_handler = sigw; + + CHECK(int, 0, sigaction(SIGTERM, &sa, NULL)); + while (!parent_pid || getppid() == parent_pid) { size_t len = sizeof(rbuf); struct req req = {}; @@ -904,18 +935,6 @@ static void insert_pid(pid_t pid, unsigned nr) worker_pids[nr] = pid; } -static int delete_pid(pid_t pid) -{ - for (unsigned nr = 0; nr < nworker; nr++) { - if (worker_pids[nr] == pid) { - worker_pids[nr] = 0; - return nr; - } - } - warnx("W: unknown pid=%d reaped", (int)pid); - return -1; -} - static void start_worker(unsigned nr) { pid_t pid = fork(); @@ -925,11 +944,31 @@ static void start_worker(unsigned nr) insert_pid(pid, nr); } else { cleanup_pids(); + xclose(pipefds[0]); + xclose(pipefds[1]); + if (signal(SIGCHLD, SIG_DFL) == SIG_ERR) + err(EXIT_FAILURE, "signal CHLD"); + if (signal(SIGTTIN, SIG_IGN) == SIG_ERR) + err(EXIT_FAILURE, "signal TTIN"); + if (signal(SIGTTOU, SIG_IGN) == SIG_ERR) + err(EXIT_FAILURE, "signal TTIN"); recv_loop(); exit(0); } } +static void start_workers(void) +{ + sigset_t old; + + CHECK(int, 0, sigprocmask(SIG_SETMASK, &fullset, &old)); + for (unsigned long nr = 0; nr < nworker; nr++) { + if (!worker_pids[nr]) + start_worker(nr); + } + CHECK(int, 0, sigprocmask(SIG_SETMASK, &old, NULL)); +} + static void cleanup_all(void) { cleanup_pids(); @@ -939,6 +978,121 @@ static void cleanup_all(void) #endif } +static void sigp(int sig) // parent signal handler +{ + static const char eagain[] = "signals coming in too fast"; + static const char bad_sig[] = "BUG: bad sig\n"; + static const char write_err[] = "BUG: sigp write: "; + char c = 0; + + switch (sig) { + case SIGCHLD: c = '.'; break; + case SIGTTOU: c = '-'; break; + case SIGTTIN: c = '+'; break; + default: + write(STDERR_FILENO, bad_sig, sizeof(bad_sig) - 1); + _exit(EXIT_FAILURE); + } + ssize_t w = write(pipefds[1], &c, 1); + if (w == sizeof(c)) return; + int e = 0; + if (w < 0) { + e = errno; + if (e == EAGAIN) { + write(STDERR_FILENO, eagain, sizeof(eagain) - 1); + return; + } + } + struct iovec iov[3]; + iov[0].iov_base = (void *)write_err; + iov[0].iov_len = sizeof(write_err) - 1; + iov[1].iov_base = (void *)(e ? strerror(e) : "zero write"); + iov[1].iov_len = strlen((const char *)iov[1].iov_base); + iov[2].iov_base = (void *)"\n"; + iov[2].iov_len = 1; + (void)writev(STDERR_FILENO, iov, MY_ARRAY_SIZE(iov)); + _exit(EXIT_FAILURE); +} + +static void reaped_worker(pid_t pid, int st) +{ + unsigned long nr = 0; + for (; nr < nworker_hwm; nr++) { + if (worker_pids[nr] == pid) { + worker_pids[nr] = 0; + break; + } + } + if (nr >= nworker_hwm) { + warnx("W: unknown pid=%d reaped $?=%d", (int)pid, st); + return; + } + if (WIFEXITED(st) && WEXITSTATUS(st) == EX_NOINPUT) + alive = false; + else if (st) + warnx("worker[%lu] died $?=%d", nr, st); + if (alive) + start_workers(); +} + +static void do_sigchld(void) +{ + while (1) { + int st; + pid_t pid = waitpid(-1, &st, WNOHANG); + if (pid > 0) { + reaped_worker(pid, st); + } else if (pid == 0) { + return; + } else { + switch (errno) { + case ECHILD: return; + case EINTR: break; // can it happen w/ WNOHANG? + default: err(EXIT_FAILURE, "BUG: waitpid"); + } + } + } +} + +static void do_sigttin(void) +{ + if (!alive) return; + void *p = reallocarray(worker_pids, nworker + 1, sizeof(pid_t)); + if (!p) { + warn("reallocarray"); + } else { + worker_pids = (pid_t *)p; + worker_pids[nworker++] = 0; + if (nworker_hwm < nworker) + nworker_hwm = nworker; + start_workers(); + } +} + +static void do_sigttou(void) +{ + if (!alive || nworker <= 1) return; + + // worker_pids array does not shrink + --nworker; + for (unsigned long nr = nworker; nr < nworker_hwm; nr++) { + pid_t pid = worker_pids[nr]; + if (pid != 0 && kill(pid, SIGTERM)) + warn("BUG?: kill(%d, SIGTERM)", (int)pid); + } +} + +static size_t living_workers(void) +{ + size_t ret = 0; + + for (unsigned long nr = 0; nr < nworker_hwm; nr++) { + if (worker_pids[nr]) + ret++; + } + return ret; +} + int main(int argc, char *argv[]) { int c; @@ -953,7 +1107,7 @@ int main(int argc, char *argv[]) err(EXIT_FAILURE, "dup(2)"); } - nworker = 0; + nworker = 1; #ifdef _SC_NPROCESSORS_ONLN long j = sysconf(_SC_NPROCESSORS_ONLN); if (j > 0) @@ -981,27 +1135,79 @@ int main(int argc, char *argv[]) errx(EXIT_FAILURE, "BUG: `-%c'", c); } } - if (nworker == 0) { + sigset_t pset; // parent-only + CHECK(int, 0, sigfillset(&pset)); + + // global sigsets: + CHECK(int, 0, sigfillset(&fullset)); + CHECK(int, 0, sigfillset(&workerset)); + +#define DELSET(sig) do { \ + CHECK(int, 0, sigdelset(&fullset, sig)); \ + CHECK(int, 0, sigdelset(&workerset, sig)); \ + CHECK(int, 0, sigdelset(&pset, sig)); \ +} while (0) + DELSET(SIGABRT); + DELSET(SIGBUS); + DELSET(SIGFPE); + DELSET(SIGILL); + DELSET(SIGSEGV); + DELSET(SIGXCPU); + DELSET(SIGXFSZ); +#undef DELSET + + if (nworker == 0) { // no SIGTERM handling w/o workers recv_loop(); - } else { - parent_pid = getpid(); - worker_pids = (pid_t *)calloc(nworker, sizeof(pid_t)); - if (!worker_pids) - err(EXIT_FAILURE, "calloc"); - for (unsigned i = 0; i < nworker; i++) - start_worker(i); - - int st; - pid_t pid; - bool quit = false; - while ((pid = wait(&st)) > 0) { - int nr = delete_pid(pid); - if (nr < 0) continue; - if (WIFEXITED(st) && WEXITSTATUS(st) == EX_NOINPUT) - quit = true; - if (!quit) - start_worker(nr); + return 0; + } + CHECK(int, 0, sigdelset(&workerset, SIGTERM)); + CHECK(int, 0, sigdelset(&workerset, SIGCHLD)); + parent_pid = getpid(); + nworker_hwm = nworker; + worker_pids = (pid_t *)calloc(nworker, sizeof(pid_t)); + if (!worker_pids) err(EXIT_FAILURE, "calloc"); + + if (pipe(pipefds)) err(EXIT_FAILURE, "pipe"); + int fl = fcntl(F_GETFL, pipefds[1]); + if (fl == -1) err(EXIT_FAILURE, "F_GETFL"); + if (fcntl(F_SETFL, pipefds[1], fl | O_NONBLOCK)) + err(EXIT_FAILURE, "F_SETFL"); + + CHECK(int, 0, sigdelset(&pset, SIGCHLD)); + CHECK(int, 0, sigdelset(&pset, SIGTTIN)); + CHECK(int, 0, sigdelset(&pset, SIGTTOU)); + + struct sigaction sa = {}; + sa.sa_handler = sigp; + + CHECK(int, 0, sigaction(SIGTTIN, &sa, NULL)); + CHECK(int, 0, sigaction(SIGTTOU, &sa, NULL)); + sa.sa_flags = SA_NOCLDSTOP; + CHECK(int, 0, sigaction(SIGCHLD, &sa, NULL)); + + CHECK(int, 0, sigprocmask(SIG_SETMASK, &pset, NULL)); + + start_workers(); + + char sbuf[64]; + while (alive || living_workers()) { + ssize_t n = read(pipefds[0], &sbuf, sizeof(sbuf)); + if (n < 0) { + if (errno == EINTR) continue; + err(EXIT_FAILURE, "read"); + } else if (n == 0) { + errx(EXIT_FAILURE, "read EOF"); + } + do_sigchld(); + for (ssize_t i = 0; i < n; i++) { + switch (sbuf[i]) { + case '.': break; // do_sigchld already called + case '-': do_sigttou(); break; + case '+': do_sigttin(); break; + default: errx(EXIT_FAILURE, "BUG: c=%c", sbuf[i]); + } } } + return 0; } |