diff options
Diffstat (limited to 'lib/PublicInbox/xap_helper.h')
-rw-r--r-- | lib/PublicInbox/xap_helper.h | 1046 |
1 files changed, 1046 insertions, 0 deletions
diff --git a/lib/PublicInbox/xap_helper.h b/lib/PublicInbox/xap_helper.h new file mode 100644 index 00000000..5a89544a --- /dev/null +++ b/lib/PublicInbox/xap_helper.h @@ -0,0 +1,1046 @@ +/* + * Copyright (C) all contributors <meta@public-inbox.org> + * License: GPL-2.0+ <https://www.gnu.org/licenses/gpl-2.0.txt> + * Note: GPL-2+ since it'll incorporate approxidate from git someday + * + * Standalone helper process using C and minimal C++ for Xapian, + * this is not linked to Perl in any way. + * C (not C++) is used as much as possible to lower the contribution + * barrier for hackers who mainly know C (this includes the maintainer). + * Yes, that means we use C stdlib stuff like hsearch and open_memstream + * instead their equivalents in the C++ stdlib :P + * Everything here is an unstable internal API of public-inbox and + * NOT intended for ordinary users; only public-inbox hackers + */ +#ifndef _ALL_SOURCE +# define _ALL_SOURCE +#endif +#if defined(__NetBSD__) && !defined(_OPENBSD_SOURCE) // for reallocarray(3) +# define _OPENBSD_SOURCE +#endif +#include <sys/file.h> +#include <sys/mman.h> +#include <sys/resource.h> +#include <sys/socket.h> +#include <sys/stat.h> +#include <sys/time.h> +#include <sys/types.h> +#include <sys/uio.h> +#include <sys/wait.h> + +#include <assert.h> +#include <err.h> // BSD, glibc, and musl all have this +#include <errno.h> +#include <fcntl.h> +#include <limits.h> +#include <search.h> +#include <signal.h> +#include <stddef.h> +#include <stdint.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <sysexits.h> +#include <unistd.h> +#include <xapian.h> // our only reason for using C++ + +#define MY_VER(maj,min,rev) ((maj) << 16 | (min) << 8 | (rev)) +#define XAP_VER \ + MY_VER(XAPIAN_MAJOR_VERSION,XAPIAN_MINOR_VERSION,XAPIAN_REVISION) + +#if XAP_VER >= MY_VER(1,3,6) +# define NRP Xapian::NumberRangeProcessor +# define ADD_RP add_rangeprocessor +# define SET_MAX_EXPANSION set_max_expansion // technically 1.3.3 +#else +# define NRP Xapian::NumberValueRangeProcessor +# define ADD_RP add_valuerangeprocessor +# define SET_MAX_EXPANSION set_max_wildcard_expansion +#endif + +#if defined(__GLIBC__) +# define MY_DO_OPTRESET() do { optind = 0; } while (0) +#else /* FreeBSD, musl, dfly, NetBSD, OpenBSD */ +# define MY_DO_OPTRESET() do { optind = optreset = 1; } while (0) +#endif + +#if defined(__DragonFly__) || defined(__FreeBSD__) || defined(__GLIBC__) +# define STDERR_ASSIGNABLE (1) +#else +# define STDERR_ASSIGNABLE (0) +#endif + +// assert functions are used correctly (e.g. ensure hackers don't +// cause EINVAL/EFAULT). Not for stuff that can fail due to HW +// failures. +# define CHECK(type, expect, expr) do { \ + type ckvar______ = (expr); \ + assert(ckvar______ == (expect) && "BUG" && __FILE__ && __LINE__); \ +} while (0) + +// coredump on most usage errors since our only users are internal +#define ABORT(...) do { warnx(__VA_ARGS__); abort(); } while (0) +#define EABORT(...) do { warn(__VA_ARGS__); abort(); } while (0) + +// sock_fd is modified in signal handler, yes, it's SOCK_SEQPACKET +static volatile int sock_fd = STDIN_FILENO; +static sigset_t fullset, workerset; +static bool alive = true; +#if STDERR_ASSIGNABLE +static FILE *orig_err = stderr; +#endif +static int orig_err_fd = -1; +static void *srch_tree; // tsearch + tdelete + twalk +static pid_t *worker_pids; // nr => pid +#define WORKER_MAX USHRT_MAX +static unsigned long nworker, nworker_hwm; +static int pipefds[2]; + +// PublicInbox::Search and PublicInbox::CodeSearch generate these: +static void mail_nrp_init(void); +static void code_nrp_init(void); +static void qp_init_mail_search(Xapian::QueryParser *); +static void qp_init_code_search(Xapian::QueryParser *); + +enum exc_iter { + ITER_OK = 0, + ITER_RETRY, + ITER_ABORT +}; + +struct srch { + int paths_len; // int for comparisons + unsigned qp_flags; + Xapian::Database *db; + Xapian::QueryParser *qp; + char paths[]; // $shard_path0\0$shard_path1\0... +}; + +#define MY_ARG_MAX 256 +typedef bool (*cmd)(struct req *); + +// only one request per-process since we have RLIMIT_CPU timeout +struct req { // argv and pfxv point into global rbuf + char *argv[MY_ARG_MAX]; + char *pfxv[MY_ARG_MAX]; // -A <prefix> + size_t *lenv; // -A <prefix>LENGTH + struct srch *srch; + char *Pgit_dir; + char *Oeidx_key; + cmd fn; + unsigned long long max; + unsigned long long off; + unsigned long long threadid; + unsigned long timeout_sec; + size_t nr_out; + long sort_col; // value column, negative means BoolWeight + int argc; + int pfxc; + FILE *fp[2]; // [0] response pipe or sock, [1] status/errors (optional) + bool has_input; // fp[0] is bidirectional + bool collapse_threads; + bool code_search; + bool relevance; // sort by relevance before column + bool asc; // ascending sort +}; + +struct worker { + pid_t pid; + unsigned nr; +}; + +struct fbuf { + FILE *fp; + char *ptr; + size_t len; +}; + +#define SPLIT2ARGV(dst,buf,len) split2argv(dst,buf,len,MY_ARRAY_SIZE(dst)) +static size_t split2argv(char **dst, char *buf, size_t len, size_t limit) +{ + if (buf[0] == 0 || len == 0 || buf[len - 1] != 0) + ABORT("bogus argument given"); + size_t nr = 0; + char *c = buf; + for (size_t i = 1; i < len; i++) { + if (!buf[i]) { + dst[nr++] = c; + c = buf + i + 1; + } + if (nr == limit) + ABORT("too many args: %zu == %zu", nr, limit); + } + if (nr == 0) ABORT("no argument given"); + if ((long)nr < 0) ABORT("too many args: %zu", nr); + return (long)nr; +} + +static bool has_threadid(const struct srch *srch) +{ + return srch->db->get_metadata("has_threadid") == "1"; +} + +static Xapian::Enquire prep_enquire(const struct req *req) +{ + Xapian::Enquire enq(*req->srch->db); + if (req->sort_col < 0) { + enq.set_weighting_scheme(Xapian::BoolWeight()); + enq.set_docid_order(req->asc ? Xapian::Enquire::ASCENDING + : Xapian::Enquire::DESCENDING); + } else if (req->relevance) { + enq.set_sort_by_relevance_then_value(req->sort_col, !req->asc); + } else { + enq.set_sort_by_value_then_relevance(req->sort_col, !req->asc); + } + return enq; +} + +static Xapian::MSet enquire_mset(struct req *req, Xapian::Enquire *enq) +{ + if (!req->max) { + switch (sizeof(Xapian::doccount)) { + case 4: req->max = UINT_MAX; break; + default: req->max = ULLONG_MAX; + } + } + for (int i = 0; i < 9; i++) { + try { + Xapian::MSet mset = enq->get_mset(req->off, req->max); + return mset; + } catch (const Xapian::DatabaseModifiedError & e) { + req->srch->db->reopen(); + } + } + return enq->get_mset(req->off, req->max); +} + +// for v1, v2, and extindex +static Xapian::MSet mail_mset(struct req *req, const char *qry_str) +{ + struct srch *srch = req->srch; + Xapian::Query qry = srch->qp->parse_query(qry_str, srch->qp_flags); + if (req->Oeidx_key) { + req->Oeidx_key[0] = 'O'; // modifies static rbuf + qry = Xapian::Query(Xapian::Query::OP_FILTER, qry, + Xapian::Query(req->Oeidx_key)); + } + // TODO: uid_range + if (req->threadid != ULLONG_MAX) { + std::string tid = Xapian::sortable_serialise(req->threadid); + qry = Xapian::Query(Xapian::Query::OP_FILTER, qry, + Xapian::Query(Xapian::Query::OP_VALUE_RANGE, THREADID, + tid, tid)); + } + Xapian::Enquire enq = prep_enquire(req); + enq.set_query(qry); + // THREADID is a CPP macro defined on CLI (see) XapHelperCxx.pm + if (req->collapse_threads && has_threadid(srch)) + enq.set_collapse_key(THREADID); + + return enquire_mset(req, &enq); +} + +static bool starts_with(const std::string *s, const char *pfx, size_t pfx_len) +{ + return s->size() >= pfx_len && !memcmp(pfx, s->c_str(), pfx_len); +} + +static void apply_roots_filter(struct req *req, Xapian::Query *qry) +{ + if (!req->Pgit_dir) return; + req->Pgit_dir[0] = 'P'; // modifies static rbuf + Xapian::Database *xdb = req->srch->db; + for (int i = 0; i < 9; i++) { + try { + std::string P = req->Pgit_dir; + Xapian::PostingIterator p = xdb->postlist_begin(P); + if (p == xdb->postlist_end(P)) { + warnx("W: %s not indexed?", req->Pgit_dir + 1); + return; + } + Xapian::TermIterator cur = xdb->termlist_begin(*p); + Xapian::TermIterator end = xdb->termlist_end(*p); + cur.skip_to("G"); + if (cur == end) { + warnx("W: %s has no root commits?", + req->Pgit_dir + 1); + return; + } + Xapian::Query f = Xapian::Query(*cur); + for (++cur; cur != end; ++cur) { + std::string tn = *cur; + if (!starts_with(&tn, "G", 1)) + continue; + f = Xapian::Query(Xapian::Query::OP_OR, f, tn); + } + *qry = Xapian::Query(Xapian::Query::OP_FILTER, *qry, f); + return; + } catch (const Xapian::DatabaseModifiedError & e) { + xdb->reopen(); + } + } +} + +// for cindex +static Xapian::MSet commit_mset(struct req *req, const char *qry_str) +{ + struct srch *srch = req->srch; + Xapian::Query qry = srch->qp->parse_query(qry_str, srch->qp_flags); + apply_roots_filter(req, &qry); + + // we only want commits: + qry = Xapian::Query(Xapian::Query::OP_FILTER, qry, + Xapian::Query("T" "c")); + Xapian::Enquire enq = prep_enquire(req); + enq.set_query(qry); + return enquire_mset(req, &enq); +} + +static void emit_mset_stats(struct req *req, const Xapian::MSet *mset) +{ + if (req->fp[1]) + fprintf(req->fp[1], "mset.size=%llu nr_out=%zu\n", + (unsigned long long)mset->size(), req->nr_out); + else + ABORT("BUG: %s caller only passed 1 FD", req->argv[0]); +} + +static int my_setlinebuf(FILE *fp) // glibc setlinebuf(3) can't report errors +{ + return setvbuf(fp, NULL, _IOLBF, 0); +} + +// n.b. __cleanup__ works fine with C++ exceptions, but not longjmp +// Only clang and g++ are supported, as AFAIK there's no other +// relevant Free(-as-in-speech) C++ compilers. +#define CLEANUP_FBUF __attribute__((__cleanup__(fbuf_ensure))) +static void fbuf_ensure(void *ptr) +{ + struct fbuf *fbuf = (struct fbuf *)ptr; + if (fbuf->fp && fclose(fbuf->fp)) + err(EXIT_FAILURE, "fclose(fbuf->fp)"); // ENOMEM? + fbuf->fp = NULL; + free(fbuf->ptr); +} + +static void fbuf_init(struct fbuf *fbuf) +{ + assert(!fbuf->ptr); + fbuf->fp = open_memstream(&fbuf->ptr, &fbuf->len); + if (!fbuf->fp) err(EXIT_FAILURE, "open_memstream(fbuf)"); +} + +static bool write_all(int fd, const struct fbuf *wbuf, size_t len) +{ + const char *p = wbuf->ptr; + assert(wbuf->len >= len); + do { // write to client FD + ssize_t n = write(fd, p, len); + if (n > 0) { + len -= n; + p += n; + } else { + perror(n ? "write" : "write (zero bytes)"); + return false; + } + } while (len); + return true; +} + +#define ERR_FLUSH(f) do { \ + if (ferror(f) | fflush(f)) err(EXIT_FAILURE, "ferror|fflush "#f); \ +} while (0) + +#define ERR_CLOSE(f, e) do { \ + if (ferror(f) | fclose(f)) \ + e ? err(e, "ferror|fclose "#f) : perror("ferror|fclose "#f); \ +} while (0) + +static void xclose(int fd) +{ + if (close(fd) < 0 && errno != EINTR) + EABORT("BUG: close"); +} + +static size_t off2size(off_t n) +{ + if (n < 0 || (uintmax_t)n > SIZE_MAX) + ABORT("off_t out of size_t range: %lld\n", (long long)n); + return (size_t)n; +} + +static char *hsearch_enter_key(char *s) +{ +#if defined(__OpenBSD__) || defined(__DragonFly__) + // hdestroy frees each key on some platforms, + // so give it something to free: + char *ret = strdup(s); + if (!ret) err(EXIT_FAILURE, "strdup"); + return ret; +// AFAIK there's no way to detect musl, assume non-glibc Linux is musl: +#elif defined(__GLIBC__) || defined(__linux__) || \ + defined(__FreeBSD__) || defined(__NetBSD__) + // do nothing on these platforms +#else +#warning untested platform detected, unsure if hdestroy(3) frees keys +#warning contact us at meta@public-inbox.org if you get segfaults +#endif + return s; +} + +// for test usage only, we need to ensure the compiler supports +// __cleanup__ when exceptions are thrown +struct inspect { struct req *req; }; + +static void inspect_ensure(struct inspect *x) +{ + fprintf(x->req->fp[0], "pid=%d has_threadid=%d", + (int)getpid(), has_threadid(x->req->srch) ? 1 : 0); +} + +static bool cmd_test_inspect(struct req *req) +{ + __attribute__((__cleanup__(inspect_ensure))) struct inspect x; + x.req = req; + try { + throw Xapian::InvalidArgumentError("test"); + } catch (Xapian::InvalidArgumentError) { + return true; + } + fputs("this should not be printed", req->fp[0]); + return false; +} + +#include "xh_mset.h" // read-only (WWW, IMAP, lei) stuff +#include "xh_cidx.h" // CodeSearchIdx.pm stuff + +#define CMD(n) { .fn_len = sizeof(#n) - 1, .fn_name = #n, .fn = cmd_##n } +static const struct cmd_entry { + size_t fn_len; + const char *fn_name; + cmd fn; +} cmds[] = { // should be small enough to not need bsearch || gperf + // most common commands first + CMD(mset), // WWW and IMAP requests + CMD(dump_ibx), // many inboxes + CMD(dump_roots), // per-cidx shard + CMD(test_inspect), // least common commands last +}; + +#define MY_ARRAY_SIZE(x) (sizeof(x)/sizeof((x)[0])) +#define RECV_FD_CAPA 2 +#define RECV_FD_SPACE (RECV_FD_CAPA * sizeof(int)) +union my_cmsg { + struct cmsghdr hdr; + char pad[sizeof(struct cmsghdr) + 16 + RECV_FD_SPACE]; +}; + +static bool recv_req(struct req *req, char *rbuf, size_t *len) +{ + union my_cmsg cmsg = {}; + struct msghdr msg = {}; + struct iovec iov; + ssize_t r; + iov.iov_base = rbuf; + iov.iov_len = *len; + msg.msg_iov = &iov; + msg.msg_iovlen = 1; + msg.msg_control = &cmsg.hdr; + msg.msg_controllen = CMSG_SPACE(RECV_FD_SPACE); + + // allow SIGTERM to hit + CHECK(int, 0, sigprocmask(SIG_SETMASK, &workerset, NULL)); + +again: + r = recvmsg(sock_fd, &msg, 0); + if (r == 0) { + exit(EX_NOINPUT); /* grandparent went away */ + } else if (r < 0) { + switch (errno) { + case EINTR: goto again; + case EBADF: if (sock_fd < 0) exit(0); + // fall-through + default: err(EXIT_FAILURE, "recvmsg"); + } + } + + // success! no signals for the rest of the request/response cycle + CHECK(int, 0, sigprocmask(SIG_SETMASK, &fullset, NULL)); + if (r > 0 && msg.msg_flags) + ABORT("unexpected msg_flags"); + + *len = r; + if (cmsg.hdr.cmsg_level == SOL_SOCKET && + cmsg.hdr.cmsg_type == SCM_RIGHTS) { + size_t clen = cmsg.hdr.cmsg_len; + int *fdp = (int *)CMSG_DATA(&cmsg.hdr); + size_t i; + for (i = 0; CMSG_LEN((i + 1) * sizeof(int)) <= clen; i++) { + int fd = *fdp++; + const char *mode = NULL; + int fl = fcntl(fd, F_GETFL); + if (fl == -1) { + errx(EXIT_FAILURE, "invalid fd=%d", fd); + } else if (fl & O_WRONLY) { + mode = "w"; + } else if (fl & O_RDWR) { + mode = "r+"; + if (i == 0) req->has_input = true; + } else { + errx(EXIT_FAILURE, + "invalid mode from F_GETFL: 0x%x", fl); + } + req->fp[i] = fdopen(fd, mode); + if (!req->fp[i]) + err(EXIT_FAILURE, "fdopen(fd=%d)", fd); + } + return true; + } + errx(EXIT_FAILURE, "no FD received in %zd-byte request", r); + return false; +} + +static int srch_cmp(const void *pa, const void *pb) // for tfind|tsearch +{ + const struct srch *a = (const struct srch *)pa; + const struct srch *b = (const struct srch *)pb; + int diff = a->paths_len - b->paths_len; + + return diff ? diff : memcmp(a->paths, b->paths, (size_t)a->paths_len); +} + +static bool is_chert(const char *dir) +{ + char iamchert[PATH_MAX]; + struct stat sb; + int rc = snprintf(iamchert, sizeof(iamchert), "%s/iamchert", dir); + + if (rc <= 0 || rc >= (int)sizeof(iamchert)) + err(EXIT_FAILURE, "BUG: snprintf(%s/iamchert)", dir); + if (stat(iamchert, &sb) == 0 && S_ISREG(sb.st_mode)) + return true; + return false; +} + +static bool srch_init(struct req *req) +{ + char *dirv[MY_ARG_MAX]; + int i; + struct srch *srch = req->srch; + int dirc = (int)SPLIT2ARGV(dirv, srch->paths, (size_t)srch->paths_len); + const unsigned FLAG_PHRASE = Xapian::QueryParser::FLAG_PHRASE; + srch->qp_flags = FLAG_PHRASE | + Xapian::QueryParser::FLAG_BOOLEAN | + Xapian::QueryParser::FLAG_LOVEHATE | + Xapian::QueryParser::FLAG_WILDCARD; + if (is_chert(dirv[0])) + srch->qp_flags &= ~FLAG_PHRASE; + try { + srch->db = new Xapian::Database(dirv[0]); + } catch (...) { + warn("E: Xapian::Database(%s)", dirv[0]); + return false; + } + try { + for (i = 1; i < dirc; i++) { + if (srch->qp_flags & FLAG_PHRASE && is_chert(dirv[i])) + srch->qp_flags &= ~FLAG_PHRASE; + srch->db->add_database(Xapian::Database(dirv[i])); + } + } catch (...) { + warn("E: add_database(%s)", dirv[i]); + return false; + } + try { + srch->qp = new Xapian::QueryParser; + } catch (...) { + perror("E: Xapian::QueryParser"); + return false; + } + srch->qp->set_default_op(Xapian::Query::OP_AND); + srch->qp->set_database(*srch->db); + try { + srch->qp->set_stemmer(Xapian::Stem("english")); + } catch (...) { + perror("E: Xapian::Stem"); + return false; + } + srch->qp->set_stemming_strategy(Xapian::QueryParser::STEM_SOME); + srch->qp->SET_MAX_EXPANSION(100); + + if (req->code_search) + qp_init_code_search(srch->qp); // CodeSearch.pm + else + qp_init_mail_search(srch->qp); // Search.pm + return true; +} + +static void free_srch(void *p) // tdestroy +{ + struct srch *srch = (struct srch *)p; + delete srch->qp; + delete srch->db; + free(srch); +} + +static void dispatch(struct req *req) +{ + int c; + size_t size = strlen(req->argv[0]); + union { + struct srch *srch; + char *ptr; + } kbuf; + char *end; + FILE *kfp; + struct srch **s; + req->threadid = ULLONG_MAX; + for (c = 0; c < (int)MY_ARRAY_SIZE(cmds); c++) { + if (cmds[c].fn_len == size && + !memcmp(cmds[c].fn_name, req->argv[0], size)) { + req->fn = cmds[c].fn; + break; + } + } + if (!req->fn) ABORT("not handled: `%s'", req->argv[0]); + + kfp = open_memstream(&kbuf.ptr, &size); + if (!kfp) err(EXIT_FAILURE, "open_memstream(kbuf)"); + // write padding, first (contents don't matter) + fwrite(&req->argv[0], offsetof(struct srch, paths), 1, kfp); + + // global getopt variables: + optopt = 0; + optarg = NULL; + MY_DO_OPTRESET(); + + // XH_SPEC is generated from @PublicInbox::Search::XH_SPEC + while ((c = getopt(req->argc, req->argv, XH_SPEC)) != -1) { + switch (c) { + case 'a': req->asc = true; break; + case 'c': req->code_search = true; break; + case 'd': fwrite(optarg, strlen(optarg) + 1, 1, kfp); break; + case 'g': req->Pgit_dir = optarg - 1; break; // pad "P" prefix + case 'k': + req->sort_col = strtol(optarg, &end, 10); + if (*end) ABORT("-k %s", optarg); + switch (req->sort_col) { + case LONG_MAX: case LONG_MIN: ABORT("-k %s", optarg); + } + break; + case 'm': + req->max = strtoull(optarg, &end, 10); + if (*end || req->max == ULLONG_MAX) + ABORT("-m %s", optarg); + break; + case 'o': + req->off = strtoull(optarg, &end, 10); + if (*end || req->off == ULLONG_MAX) + ABORT("-o %s", optarg); + break; + case 'r': req->relevance = true; break; + case 't': req->collapse_threads = true; break; + case 'A': + req->pfxv[req->pfxc++] = optarg; + if (MY_ARG_MAX == req->pfxc) + ABORT("too many -A"); + break; + case 'K': + req->timeout_sec = strtoul(optarg, &end, 10); + if (*end || req->timeout_sec == ULONG_MAX) + ABORT("-K %s", optarg); + break; + case 'O': req->Oeidx_key = optarg - 1; break; // pad "O" prefix + case 'T': + req->threadid = strtoull(optarg, &end, 10); + if (*end || req->threadid == ULLONG_MAX) + ABORT("-T %s", optarg); + break; + default: ABORT("bad switch `-%c'", c); + } + } + ERR_CLOSE(kfp, EXIT_FAILURE); // may ENOMEM, sets kbuf.srch + kbuf.srch->db = NULL; + kbuf.srch->qp = NULL; + kbuf.srch->paths_len = size - offsetof(struct srch, paths); + if (kbuf.srch->paths_len <= 0) + ABORT("no -d args"); + s = (struct srch **)tsearch(kbuf.srch, &srch_tree, srch_cmp); + if (!s) err(EXIT_FAILURE, "tsearch"); // likely ENOMEM + req->srch = *s; + if (req->srch != kbuf.srch) { // reuse existing + free_srch(kbuf.srch); + } else if (!srch_init(req)) { + assert(kbuf.srch == *((struct srch **)tfind( + kbuf.srch, &srch_tree, srch_cmp))); + void *del = tdelete(kbuf.srch, &srch_tree, srch_cmp); + assert(del); + free_srch(kbuf.srch); + goto cmd_err; // srch_init already warned + } + try { + if (!req->fn(req)) + warnx("`%s' failed", req->argv[0]); + } catch (const Xapian::Error & e) { + warnx("Xapian::Error: %s", e.get_description().c_str()); + } catch (...) { + warn("unhandled exception"); + } +cmd_err: + return; // just be silent on errors, for now +} + +static void cleanup_pids(void) +{ + free(worker_pids); + worker_pids = NULL; +} + +static void stderr_set(FILE *tmp_err) +{ +#if STDERR_ASSIGNABLE + if (my_setlinebuf(tmp_err)) + perror("W: setlinebuf(tmp_err)"); + stderr = tmp_err; + return; +#endif + int fd = fileno(tmp_err); + if (fd < 0) err(EXIT_FAILURE, "BUG: fileno(tmp_err)"); + while (dup2(fd, STDERR_FILENO) < 0) { + if (errno != EINTR) + err(EXIT_FAILURE, "dup2(%d => 2)", fd); + } +} + +static void stderr_restore(FILE *tmp_err) +{ +#if STDERR_ASSIGNABLE + stderr = orig_err; + return; +#endif + ERR_FLUSH(stderr); + while (dup2(orig_err_fd, STDERR_FILENO) < 0) { + if (errno != EINTR) + err(EXIT_FAILURE, "dup2(%d => 2)", orig_err_fd); + } + clearerr(stderr); +} + +static void sigw(int sig) // SIGTERM handler for worker +{ + sock_fd = -1; // break out of recv_loop +} + +#define CLEANUP_REQ __attribute__((__cleanup__(req_cleanup))) +static void req_cleanup(void *ptr) +{ + struct req *req = (struct req *)ptr; + free(req->lenv); +} + +static void recv_loop(void) // worker process loop +{ + static char rbuf[4096 * 33]; // per-process + struct sigaction sa = {}; + sa.sa_handler = sigw; + + CHECK(int, 0, sigaction(SIGTERM, &sa, NULL)); + + while (sock_fd == 0) { + size_t len = sizeof(rbuf); + CLEANUP_REQ struct req req = {}; + + if (!recv_req(&req, rbuf, &len)) + continue; + if (req.fp[1]) + stderr_set(req.fp[1]); + req.argc = (int)SPLIT2ARGV(req.argv, rbuf, len); + dispatch(&req); + ERR_CLOSE(req.fp[0], 0); + if (req.fp[1]) { + stderr_restore(req.fp[1]); + ERR_CLOSE(req.fp[1], 0); + } + } +} + +static void insert_pid(pid_t pid, unsigned nr) +{ + assert(!worker_pids[nr]); + worker_pids[nr] = pid; +} + +static void start_worker(unsigned nr) +{ + pid_t pid = fork(); + if (pid < 0) { + warn("E: fork(worker=%u)", nr); + } else if (pid > 0) { + insert_pid(pid, nr); + } else { + cleanup_pids(); + xclose(pipefds[0]); + xclose(pipefds[1]); + if (signal(SIGCHLD, SIG_DFL) == SIG_ERR) + err(EXIT_FAILURE, "signal CHLD"); + if (signal(SIGTTIN, SIG_IGN) == SIG_ERR) + err(EXIT_FAILURE, "signal TTIN"); + if (signal(SIGTTOU, SIG_IGN) == SIG_ERR) + err(EXIT_FAILURE, "signal TTIN"); + recv_loop(); + exit(0); + } +} + +static void start_workers(void) +{ + sigset_t old; + + CHECK(int, 0, sigprocmask(SIG_SETMASK, &fullset, &old)); + for (unsigned long nr = 0; nr < nworker; nr++) { + if (!worker_pids[nr]) + start_worker(nr); + } + CHECK(int, 0, sigprocmask(SIG_SETMASK, &old, NULL)); +} + +static void cleanup_all(void) +{ + cleanup_pids(); +#ifdef __GLIBC__ + tdestroy(srch_tree, free_srch); + srch_tree = NULL; +#endif +} + +static void sigp(int sig) // parent signal handler +{ + static const char eagain[] = "signals coming in too fast"; + static const char bad_sig[] = "BUG: bad sig\n"; + static const char write_errno[] = "BUG: sigp write (errno)"; + static const char write_zero[] = "BUG: sigp write wrote zero bytes"; + char c = 0; + + switch (sig) { + case SIGCHLD: c = '.'; break; + case SIGTTOU: c = '-'; break; + case SIGTTIN: c = '+'; break; + default: + write(STDERR_FILENO, bad_sig, sizeof(bad_sig) - 1); + _exit(EXIT_FAILURE); + } + ssize_t w = write(pipefds[1], &c, 1); + if (w > 0) return; + if (w < 0 && errno == EAGAIN) { + write(STDERR_FILENO, eagain, sizeof(eagain) - 1); + return; + } else if (w == 0) { + write(STDERR_FILENO, write_zero, sizeof(write_zero) - 1); + } else { + // strerror isn't technically async-signal-safe, and + // strerrordesc_np+strerrorname_np isn't portable + write(STDERR_FILENO, write_errno, sizeof(write_errno) - 1); + } + _exit(EXIT_FAILURE); +} + +static void reaped_worker(pid_t pid, int st) +{ + unsigned long nr = 0; + for (; nr < nworker_hwm; nr++) { + if (worker_pids[nr] == pid) { + worker_pids[nr] = 0; + break; + } + } + if (nr >= nworker_hwm) { + warnx("W: unknown pid=%d reaped $?=%d", (int)pid, st); + return; + } + if (WIFEXITED(st) && WEXITSTATUS(st) == EX_NOINPUT) + alive = false; + else if (st) + warnx("worker[%lu] died $?=%d alive=%d", nr, st, (int)alive); + if (alive) + start_workers(); +} + +static void do_sigchld(void) +{ + while (1) { + int st; + pid_t pid = waitpid(-1, &st, WNOHANG); + if (pid > 0) { + reaped_worker(pid, st); + } else if (pid == 0) { + return; + } else { + switch (errno) { + case ECHILD: return; + case EINTR: break; // can it happen w/ WNOHANG? + default: err(EXIT_FAILURE, "BUG: waitpid"); + } + } + } +} + +static void do_sigttin(void) +{ + if (!alive) return; + if (nworker >= WORKER_MAX) { + warnx("workers cannot exceed %zu", (size_t)WORKER_MAX); + return; + } + void *p = realloc(worker_pids, (nworker + 1) * sizeof(pid_t)); + if (!p) { + warn("realloc worker_pids"); + } else { + worker_pids = (pid_t *)p; + worker_pids[nworker++] = 0; + if (nworker_hwm < nworker) + nworker_hwm = nworker; + start_workers(); + } +} + +static void do_sigttou(void) +{ + if (!alive || nworker <= 1) return; + + // worker_pids array does not shrink + --nworker; + for (unsigned long nr = nworker; nr < nworker_hwm; nr++) { + pid_t pid = worker_pids[nr]; + if (pid != 0 && kill(pid, SIGTERM)) + warn("BUG?: kill(%d, SIGTERM)", (int)pid); + } +} + +static size_t living_workers(void) +{ + size_t ret = 0; + + for (unsigned long nr = 0; nr < nworker_hwm; nr++) { + if (worker_pids[nr]) + ret++; + } + return ret; +} + +int main(int argc, char *argv[]) +{ + int c; + socklen_t slen = (socklen_t)sizeof(c); + + if (getsockopt(sock_fd, SOL_SOCKET, SO_TYPE, &c, &slen)) + err(EXIT_FAILURE, "getsockopt"); + if (c != SOCK_SEQPACKET) + errx(EXIT_FAILURE, "stdin is not SOCK_SEQPACKET"); + + mail_nrp_init(); + code_nrp_init(); + atexit(cleanup_all); + + if (!STDERR_ASSIGNABLE) { + orig_err_fd = dup(STDERR_FILENO); + if (orig_err_fd < 0) + err(EXIT_FAILURE, "dup(2)"); + } + + nworker = 1; + // make warn/warnx/err multi-process friendly: + if (my_setlinebuf(stderr)) + err(EXIT_FAILURE, "setlinebuf(stderr)"); + // not using -W<workers> like Daemon.pm, since -W is reserved (glibc) + while ((c = getopt(argc, argv, "j:")) != -1) { + char *end; + + switch (c) { + case 'j': + nworker = strtoul(optarg, &end, 10); + if (*end != 0 || nworker > WORKER_MAX) + errx(EXIT_FAILURE, "-j %s invalid", optarg); + break; + case ':': + errx(EXIT_FAILURE, "missing argument: `-%c'", optopt); + case '?': + errx(EXIT_FAILURE, "unrecognized: `-%c'", optopt); + default: + errx(EXIT_FAILURE, "BUG: `-%c'", c); + } + } + sigset_t pset; // parent-only + CHECK(int, 0, sigfillset(&pset)); + + // global sigsets: + CHECK(int, 0, sigfillset(&fullset)); + CHECK(int, 0, sigfillset(&workerset)); + +#define DELSET(sig) do { \ + CHECK(int, 0, sigdelset(&fullset, sig)); \ + CHECK(int, 0, sigdelset(&workerset, sig)); \ + CHECK(int, 0, sigdelset(&pset, sig)); \ +} while (0) + DELSET(SIGABRT); + DELSET(SIGBUS); + DELSET(SIGFPE); + DELSET(SIGILL); + DELSET(SIGSEGV); + DELSET(SIGXCPU); + DELSET(SIGXFSZ); +#undef DELSET + + if (nworker == 0) { // no SIGTERM handling w/o workers + recv_loop(); + return 0; + } + CHECK(int, 0, sigdelset(&workerset, SIGTERM)); + CHECK(int, 0, sigdelset(&workerset, SIGCHLD)); + nworker_hwm = nworker; + worker_pids = (pid_t *)calloc(nworker, sizeof(pid_t)); + if (!worker_pids) err(EXIT_FAILURE, "calloc"); + + if (pipe(pipefds)) err(EXIT_FAILURE, "pipe"); + int fl = fcntl(pipefds[1], F_GETFL); + if (fl == -1) err(EXIT_FAILURE, "F_GETFL"); + if (fcntl(pipefds[1], F_SETFL, fl | O_NONBLOCK)) + err(EXIT_FAILURE, "F_SETFL"); + + CHECK(int, 0, sigdelset(&pset, SIGCHLD)); + CHECK(int, 0, sigdelset(&pset, SIGTTIN)); + CHECK(int, 0, sigdelset(&pset, SIGTTOU)); + + struct sigaction sa = {}; + sa.sa_handler = sigp; + + CHECK(int, 0, sigaction(SIGTTIN, &sa, NULL)); + CHECK(int, 0, sigaction(SIGTTOU, &sa, NULL)); + sa.sa_flags = SA_NOCLDSTOP; + CHECK(int, 0, sigaction(SIGCHLD, &sa, NULL)); + + CHECK(int, 0, sigprocmask(SIG_SETMASK, &pset, NULL)); + + start_workers(); + + char sbuf[64]; + while (alive || living_workers()) { + ssize_t n = read(pipefds[0], &sbuf, sizeof(sbuf)); + if (n < 0) { + if (errno == EINTR) continue; + err(EXIT_FAILURE, "read"); + } else if (n == 0) { + errx(EXIT_FAILURE, "read EOF"); + } + do_sigchld(); + for (ssize_t i = 0; i < n; i++) { + switch (sbuf[i]) { + case '.': break; // do_sigchld already called + case '-': do_sigttou(); break; + case '+': do_sigttin(); break; + default: errx(EXIT_FAILURE, "BUG: c=%c", sbuf[i]); + } + } + } + + return 0; +} |