/*
* Copyright (C) all contributors
* License: GPL-2.0+
* Note: GPL-2+ since it'll incorporate approxidate from git someday
*
* Standalone helper process using C and minimal C++ for Xapian,
* this is not linked to Perl in any way.
* C (not C++) is used as much as possible to lower the contribution
* barrier for hackers who mainly know C (this includes the maintainer).
* Yes, that means we use C stdlib stuff like hsearch and open_memstream
* instead of their equivalents in the C++ stdlib :P
* Everything here is an unstable internal API of public-inbox and
* NOT intended for ordinary users; only public-inbox hackers
*/
#ifndef _ALL_SOURCE
# define _ALL_SOURCE
#endif
#if defined(__NetBSD__) && !defined(_OPENBSD_SOURCE) // for reallocarray(3)
# define _OPENBSD_SOURCE
#endif
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include // BSD, glibc, and musl all have this
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include // our only reason for using C++
// Pack a (major, minor, revision) triple into one integer so versions
// can be compared with ordinary relational operators in #if
#define MY_VER(maj,min,rev) ((maj) << 16 | (min) << 8 | (rev))
// the Xapian version we are compiling against, in MY_VER form
#define XAP_VER \
MY_VER(XAPIAN_MAJOR_VERSION,XAPIAN_MINOR_VERSION,XAPIAN_REVISION)
// Xapian renamed several APIs; pick the names matching XAP_VER:
//   NRP               - numeric range processor class
//   ADD_RP            - QueryParser method to register a range processor
//   SET_MAX_EXPANSION - QueryParser method limiting wildcard expansion
#if XAP_VER >= MY_VER(1,3,6)
# define NRP Xapian::NumberRangeProcessor
# define ADD_RP add_rangeprocessor
# define SET_MAX_EXPANSION set_max_expansion // technically 1.3.3
#else
# define NRP Xapian::NumberValueRangeProcessor
# define ADD_RP add_valuerangeprocessor
# define SET_MAX_EXPANSION set_max_wildcard_expansion
#endif
// MY_DO_OPTRESET resets getopt(3) state so it can parse a new argv;
// glibc wants optind = 0, everything else uses optind = optreset = 1
#if defined(__GLIBC__)
# define MY_DO_OPTRESET() do { optind = 0; } while (0)
#else /* FreeBSD, musl, dfly, NetBSD, OpenBSD */
# define MY_DO_OPTRESET() do { optind = optreset = 1; } while (0)
#endif
// STDERR_ASSIGNABLE is non-zero on platforms where this file assigns
// directly to `stderr' (see stderr_set); elsewhere dup2(2) is used
#if defined(__DragonFly__) || defined(__FreeBSD__) || defined(__GLIBC__)
# define STDERR_ASSIGNABLE (1)
#else
# define STDERR_ASSIGNABLE (0)
#endif
// assert functions are used correctly (e.g. ensure hackers don't
// cause EINVAL/EFAULT).  Not for stuff that can fail due to HW
// failures.
// CHECK evaluates `expr' exactly once into a temporary of `type' and
// assert(3)s that it equals `expect'.  `expr' is still evaluated when
// NDEBUG is defined since it lives outside of the assert() call.
# define CHECK(type, expect, expr) do { \
type ckvar______ = (expr); \
assert(ckvar______ == (expect) && "BUG" && __FILE__ && __LINE__); \
} while (0)
// coredump on most usage errors since our only users are internal
// ABORT:  warnx(3) (no errno text) followed by abort(3)
// EABORT: warn(3) (appends strerror(errno)) followed by abort(3)
#define ABORT(...) do { warnx(__VA_ARGS__); abort(); } while (0)
#define EABORT(...) do { warn(__VA_ARGS__); abort(); } while (0)
// sock_fd is modified in signal handler, yes, it's SOCK_SEQPACKET
// (sigw() sets it to -1 to make recv_loop() stop)
static volatile int sock_fd = STDIN_FILENO;
// signal masks initialized in main():
//   fullset   - installed while a request is being processed
//   workerset - installed while a worker waits in recvmsg(2)
static sigset_t fullset, workerset;
// cleared by reaped_worker() once a worker exits with EX_NOINPUT
static bool alive = true;
#if STDERR_ASSIGNABLE
// the original stderr stream, put back by stderr_restore()
static FILE *orig_err = stderr;
#endif
// dup(2) of STDERR_FILENO, only set in main() when !STDERR_ASSIGNABLE
static int orig_err_fd = -1;
// cache of `struct srch' keyed by shard paths, ordered by srch_cmp()
static void *srch_tree; // tsearch + tdelete + twalk
static pid_t *worker_pids; // nr => pid
#define WORKER_MAX USHRT_MAX
// nworker is the desired number of workers, nworker_hwm is the highest
// value nworker ever reached (loops over worker_pids use it as a bound)
static unsigned long nworker, nworker_hwm;
// pipe written to by sigp() and read by the main() loop
static int pipefds[2];
static const char *stdout_path, *stderr_path; // for SIGUSR1
// set by sigw() on SIGUSR1, checked by recv_loop() after each request
static sig_atomic_t worker_needs_reopen;
// PublicInbox::Search and PublicInbox::CodeSearch generate these:
static void mail_nrp_init(void);
static void code_nrp_init(void);
static void qp_init_mail_search(Xapian::QueryParser *);
static void qp_init_code_search(Xapian::QueryParser *);
// Result codes for an iteration step which may hit an exception.
// NOTE(review): no user is visible in this file; presumably consumed by
// the included xh_mset.h / xh_cidx.h -- confirm there.
enum exc_iter {
ITER_OK = 0,
ITER_RETRY,
ITER_ABORT
};
// A cached search context for one set of Xapian shard paths.
// Instances live in srch_tree and are compared by (paths_len, paths);
// see srch_cmp().  Allocated via open_memstream(3) in dispatch() and
// released by free_srch().
struct srch {
int paths_len; // int for comparisons
unsigned qp_flags; // flags passed to QueryParser::parse_query
bool qp_extra_done; // srch_init_extra() already applied -Q prefixes
Xapian::Database *db; // all shards combined, owned (delete in free_srch)
Xapian::QueryParser *qp; // owned (delete in free_srch)
char paths[]; // $shard_path0\0$shard_path1\0...
};
// capacity of the argv/pfxv/qpfxv arrays in `struct req' and of the
// shard directory array in srch_init()
#define MY_ARG_MAX 256
// command callback; dispatch() warns when it returns false
typedef bool (*cmd)(struct req *);
// only one request per-process since we have RLIMIT_CPU timeout
// Most fields are filled in by dispatch() from getopt(3) switches.
struct req { // argv and pfxv point into global rbuf
char *argv[MY_ARG_MAX];
char *pfxv[MY_ARG_MAX]; // -A
char *qpfxv[MY_ARG_MAX]; // -Q [:=]
size_t *lenv; // -A LENGTH, free(3)-ed by req_cleanup()
struct srch *srch; // cached entry from srch_tree
char *Pgit_dir; // -g, points one byte before optarg for the "P" prefix
char *Oeidx_key; // -O, points one byte before optarg for the "O" prefix
cmd fn; // command looked up from argv[0]
unsigned long long max; // -m, zero means "no limit" (see enquire_mset)
unsigned long long off; // -o
unsigned long long threadid; // -T, ULLONG_MAX when unset
unsigned long timeout_sec; // -K, passed to alarm(2) when non-zero
size_t nr_out;
long sort_col; // value column, negative means BoolWeight
int argc;
int pfxc; // number of valid entries in pfxv
int qpfxc; // number of valid entries in qpfxv
FILE *fp[2]; // [0] response pipe or sock, [1] status/errors (optional)
bool has_input; // fp[0] is bidirectional
bool collapse_threads; // -t
bool code_search; // -c
bool relevance; // sort by relevance before column
bool asc; // ascending sort
};
// Pairs a worker process ID with its slot number.
// NOTE(review): nothing in this file uses this struct (worker_pids is a
// plain pid_t array); it may be a leftover -- confirm before relying on it.
struct worker {
pid_t pid;
unsigned nr;
};
// An open_memstream(3)-backed growable buffer, see fbuf_init() and
// fbuf_ensure().  Must be zero-initialized before fbuf_init().
struct fbuf {
FILE *fp; // memstream handle, NULL once closed
char *ptr; // buffer maintained by the memstream, free(3)-ed by fbuf_ensure
size_t len; // buffer length maintained by the memstream
};
#define SPLIT2ARGV(dst,buf,len) split2argv(dst,buf,len,MY_ARRAY_SIZE(dst))
// Splits `buf' (`len' bytes of NUL-terminated strings laid end-to-end)
// into `dst', storing a pointer to the start of each string.
// Aborts if `buf' starts with NUL, is empty, lacks a trailing NUL, or
// if the number of strings reaches `limit'.
// Returns the number of pointers stored in `dst'.
static size_t split2argv(char **dst, char *buf, size_t len, size_t limit)
{
	if (buf[0] == 0 || len == 0 || buf[len - 1] != 0)
		ABORT("bogus argument given");

	size_t argc = 0;
	char *arg = buf; // start of the string currently being scanned
	char *const end = buf + len;

	// buf[0] is known to be non-NUL, so scanning starts at buf[1]
	for (char *p = buf + 1; p < end; p++) {
		if (*p == 0) {
			dst[argc++] = arg;
			arg = p + 1;
		}
		if (argc == limit)
			ABORT("too many args: %zu == %zu", argc, limit);
	}
	if (argc == 0)
		ABORT("no argument given");
	if ((long)argc < 0)
		ABORT("too many args: %zu", argc);
	return (long)argc;
}
// True if the database metadata key "has_threadid" is exactly "1".
static bool has_threadid(const struct srch *srch)
{
	const std::string flag = srch->db->get_metadata("has_threadid");

	return flag == "1";
}
// Creates an Enquire on req->srch->db with ordering configured from
// req->sort_col, req->relevance and req->asc.  The query itself is
// left for the caller to set.
static Xapian::Enquire prep_enquire(const struct req *req)
{
	Xapian::Enquire enq(*req->srch->db);
	const bool reverse = !req->asc;

	if (req->sort_col >= 0) {
		if (req->relevance)
			enq.set_sort_by_relevance_then_value(req->sort_col,
								reverse);
		else
			enq.set_sort_by_value_then_relevance(req->sort_col,
								reverse);
	} else {
		// negative column: no weighting, order purely by docid
		enq.set_weighting_scheme(Xapian::BoolWeight());
		enq.set_docid_order(reverse ? Xapian::Enquire::DESCENDING
					: Xapian::Enquire::ASCENDING);
	}
	return enq;
}
// Runs `enq' for the [req->off, req->off + req->max) window.
// A zero req->max is replaced by the largest doccount-sized value.
// DatabaseModifiedError is retried (after reopen) up to 9 times; the
// final attempt lets any exception propagate to the caller.
static Xapian::MSet enquire_mset(struct req *req, Xapian::Enquire *enq)
{
	if (req->max == 0)
		req->max = sizeof(Xapian::doccount) == 4 ? UINT_MAX
							: ULLONG_MAX;

	int tries = 9;
	while (tries-- > 0) {
		try {
			return enq->get_mset(req->off, req->max);
		} catch (const Xapian::DatabaseModifiedError &) {
			req->srch->db->reopen();
		}
	}
	return enq->get_mset(req->off, req->max);
}
// for v1, v2, and extindex
// Parses `qry_str', narrows it by the optional extindex key (-O) and
// thread ID (-T), and returns the resulting MSet.
static Xapian::MSet mail_mset(struct req *req, const char *qry_str)
{
	struct srch *srch = req->srch;
	Xapian::Query qry = srch->qp->parse_query(qry_str, srch->qp_flags);
	char *eidx_key = req->Oeidx_key;

	if (eidx_key) {
		*eidx_key = 'O'; // modifies static rbuf
		const Xapian::Query only = Xapian::Query(eidx_key);
		qry = Xapian::Query(Xapian::Query::OP_FILTER, qry, only);
	}
	// TODO: uid_range
	if (req->threadid != ULLONG_MAX) {
		// a range with identical endpoints matches exactly one value
		const std::string tid =
				Xapian::sortable_serialise(req->threadid);
		const Xapian::Query in_thread(Xapian::Query::OP_VALUE_RANGE,
						THREADID, tid, tid);
		qry = Xapian::Query(Xapian::Query::OP_FILTER, qry, in_thread);
	}

	Xapian::Enquire enq = prep_enquire(req);
	enq.set_query(qry);
	// THREADID is a CPP macro defined on the CLI (see XapHelperCxx.pm)
	if (req->collapse_threads && has_threadid(srch))
		enq.set_collapse_key(THREADID);
	return enquire_mset(req, &enq);
}
// True if the first `pfx_len' bytes of `*s' equal those of `pfx'.
static bool starts_with(const std::string *s, const char *pfx, size_t pfx_len)
{
	if (s->size() < pfx_len)
		return false;
	return s->compare(0, pfx_len, pfx, pfx_len) == 0;
}
// Restricts `*qry' to commits sharing a root commit with the git
// directory given via -g (req->Pgit_dir).
//
// The document indexed under the "P$GIT_DIR" term is looked up and all
// of its "G"-prefixed terms are OR-ed together into an OP_FILTER.
// `*qry' is left untouched (with a warning) when the directory is not
// indexed or has no "G" terms.  DatabaseModifiedError triggers a
// reopen and retry, at most 9 attempts; after that `*qry' is silently
// left unfiltered.
static void apply_roots_filter(struct req *req, Xapian::Query *qry)
{
	if (!req->Pgit_dir) return;
	req->Pgit_dir[0] = 'P'; // modifies static rbuf
	Xapian::Database *xdb = req->srch->db;
	for (int i = 0; i < 9; i++) {
		try {
			std::string P = req->Pgit_dir;
			Xapian::PostingIterator p = xdb->postlist_begin(P);
			if (p == xdb->postlist_end(P)) {
				warnx("W: %s not indexed?", req->Pgit_dir + 1);
				return;
			}
			Xapian::TermIterator cur = xdb->termlist_begin(*p);
			Xapian::TermIterator end = xdb->termlist_end(*p);
			cur.skip_to("G");
			// skip_to() stops at the first term >= "G", which
			// is not necessarily a "G" term (the "P" term we
			// just looked up sorts after "G"), so the prefix
			// must be verified before using the term
			std::string tn;
			if (cur != end)
				tn = *cur;
			if (cur == end || !starts_with(&tn, "G", 1)) {
				warnx("W: %s has no root commits?",
					req->Pgit_dir + 1);
				return;
			}
			Xapian::Query f = Xapian::Query(tn);
			for (++cur; cur != end; ++cur) {
				tn = *cur;
				if (!starts_with(&tn, "G", 1))
					continue;
				f = Xapian::Query(Xapian::Query::OP_OR, f, tn);
			}
			*qry = Xapian::Query(Xapian::Query::OP_FILTER, *qry, f);
			return;
		} catch (const Xapian::DatabaseModifiedError & e) {
			xdb->reopen();
		}
	}
}
// for cindex
// Parses `qry_str' for code search, applies the optional -g roots
// filter, limits results to commit documents and returns the MSet.
static Xapian::MSet commit_mset(struct req *req, const char *qry_str)
{
	Xapian::QueryParser *qp = req->srch->qp;
	Xapian::Query qry = qp->parse_query(qry_str, req->srch->qp_flags);

	apply_roots_filter(req, &qry);

	// we only want commits:
	const Xapian::Query commits_only = Xapian::Query("T" "c");
	qry = Xapian::Query(Xapian::Query::OP_FILTER, qry, commits_only);

	Xapian::Enquire enq = prep_enquire(req);
	enq.set_query(qry);
	return enquire_mset(req, &enq);
}
// Writes a "mset.size=... nr_out=..." line to the status stream
// req->fp[1].  Aborts if the caller did not pass a second FD.
static void emit_mset_stats(struct req *req, const Xapian::MSet *mset)
{
	FILE *status = req->fp[1];

	if (!status)
		ABORT("BUG: %s caller only passed 1 FD", req->argv[0]);
	fprintf(status, "mset.size=%llu nr_out=%zu\n",
		(unsigned long long)mset->size(), req->nr_out);
}
// Switches `fp' to line-buffered mode, returning the setvbuf(3)
// result (zero on success).
static int my_setlinebuf(FILE *fp) // glibc setlinebuf(3) can't report errors
{
	// NULL buffer: let stdio allocate its own
	const int rc = setvbuf(fp, NULL, _IOLBF, 0);

	return rc;
}
// n.b. __cleanup__ works fine with C++ exceptions, but not longjmp.
// Only clang and g++ are supported since, AFAIK, there are no other
// relevant Free(-as-in-speech) C++ compilers.
#define CLEANUP_FBUF __attribute__((__cleanup__(fbuf_ensure)))
// __cleanup__ callback for `struct fbuf': closes the memstream if it
// is still open (exiting on failure) and frees the buffer.
static void fbuf_ensure(void *ptr)
{
	struct fbuf *fb = (struct fbuf *)ptr;
	FILE *fp = fb->fp;

	if (fp != NULL) {
		if (fclose(fp) != 0)
			err(EXIT_FAILURE, "fclose(fbuf->fp)"); // ENOMEM?
	}
	fb->fp = NULL;
	// read fb->ptr only now, fclose may have updated it
	free(fb->ptr);
}
// Opens a memstream writing into fbuf->ptr / fbuf->len.
// `fbuf' must not already own a buffer; exits on failure.
static void fbuf_init(struct fbuf *fbuf)
{
	assert(!fbuf->ptr);
	fbuf->fp = open_memstream(&fbuf->ptr, &fbuf->len);
	if (fbuf->fp != NULL)
		return;
	err(EXIT_FAILURE, "open_memstream(fbuf)");
}
// Writes the first `len' bytes of wbuf->ptr to `fd', looping over
// short writes.  Returns false (after perror) on error or when
// write(2) reports zero bytes written.
static bool write_all(int fd, const struct fbuf *wbuf, size_t len)
{
	const char *cur = wbuf->ptr;

	assert(wbuf->len >= len);
	for (;;) { // write to client FD
		const ssize_t w = write(fd, cur, len);

		if (w <= 0) {
			perror(w == 0 ? "write (zero bytes)" : "write");
			return false;
		}
		cur += w;
		len -= w;
		if (len == 0)
			return true;
	}
}
// ERR_FLUSH(f): exit if stream `f' has its error flag set or if
// fflush(3) fails.  Bitwise `|' (not `||') is deliberate so that
// fflush is always called.  The message embeds the literal text of the
// macro argument via `#f'.
#define ERR_FLUSH(f) do { \
if (ferror(f) | fflush(f)) err(EXIT_FAILURE, "ferror|fflush "#f); \
} while (0)
// ERR_CLOSE(f, e): always fclose(3) stream `f'; if it had an error or
// the close fails, exit with status `e' when `e' is non-zero, otherwise
// only report via perror(3).  The message embeds the argument text.
#define ERR_CLOSE(f, e) do { \
if (ferror(f) | fclose(f)) \
e ? err(e, "ferror|fclose "#f) : perror("ferror|fclose "#f); \
} while (0)
// close(2) which aborts on any failure other than EINTR.
static void xclose(int fd)
{
	if (close(fd) >= 0)
		return;
	if (errno == EINTR)
		return;
	EABORT("BUG: close");
}
// Converts an off_t to size_t, aborting if it is negative or does not
// fit.
static size_t off2size(off_t n)
{
	const bool in_range = n >= 0 && (uintmax_t)n <= SIZE_MAX;

	if (!in_range)
		ABORT("off_t out of size_t range: %lld\n", (long long)n);
	return (size_t)n;
}
// Returns the key pointer to hand to hsearch(3) with ENTER.
// On platforms where hdestroy(3) frees each key, a heap copy of `s' is
// returned (exiting on allocation failure); elsewhere `s' itself is.
static char *hsearch_enter_key(char *s)
{
#if defined(__OpenBSD__) || defined(__DragonFly__)
	// hdestroy frees each key on some platforms,
	// so give it something to free:
	char *copy = strdup(s);

	if (copy == NULL)
		err(EXIT_FAILURE, "strdup");
	return copy;
// AFAIK there's no way to detect musl, assume non-glibc Linux is musl:
#elif defined(__GLIBC__) || defined(__linux__) || \
	defined(__FreeBSD__) || defined(__NetBSD__)
	// do nothing on these platforms
#else
#warning untested platform detected, unsure if hdestroy(3) frees keys
#warning contact us at meta@public-inbox.org if you get segfaults
#endif
	return s;
}
// for test usage only, we need to ensure the compiler supports
// __cleanup__ when exceptions are thrown
struct inspect { struct req *req; };
// __cleanup__ callback: reports our PID and whether the database has
// thread IDs on the response stream.
static void inspect_ensure(struct inspect *x)
{
	struct req *req = x->req;
	const int tid = has_threadid(req->srch) ? 1 : 0;

	fprintf(req->fp[0], "pid=%d has_threadid=%d", (int)getpid(), tid);
}
// Test command: throws and catches an exception with a __cleanup__
// variable in scope, so inspect_ensure() output proves the cleanup
// handler runs on that path.
static bool cmd_test_inspect(struct req *req)
{
	__attribute__((__cleanup__(inspect_ensure))) struct inspect x;

	x.req = req;
	try {
		throw Xapian::InvalidArgumentError("test");
	} catch (Xapian::InvalidArgumentError) {
		return true;
	}
	// not reached: the throw above always lands in the catch block
	fputs("this should not be printed", req->fp[0]);
	return false;
}
// Test command: sleeps in 10ms poll(2) calls forever, never returning
// on its own.
static bool cmd_test_sleep(struct req *req)
{
	while (true)
		poll(NULL, 0, 10);
	return false;
}
#include "xh_mset.h" // read-only (WWW, IMAP, lei) stuff
#include "xh_cidx.h" // CodeSearchIdx.pm stuff
// CMD(n) builds a table entry for command `n': its name, the name's
// length (sans NUL), and the cmd_##n function implementing it
#define CMD(n) { .fn_len = sizeof(#n) - 1, .fn_name = #n, .fn = cmd_##n }
// Command table searched linearly by dispatch() using argv[0]
static const struct cmd_entry {
size_t fn_len; // strlen(fn_name), compared first to skip memcmp
const char *fn_name;
cmd fn;
} cmds[] = { // should be small enough to not need bsearch || gperf
// most common commands first
CMD(mset), // WWW and IMAP requests
CMD(dump_ibx), // many inboxes
CMD(dump_roots), // per-cidx shard
// least common commands last
CMD(test_inspect), // checks __cleanup__ runs when exceptions are thrown
CMD(test_sleep), // loops forever in poll(2)
};
// number of elements in a true array (not valid for pointers)
#define MY_ARRAY_SIZE(x) (sizeof(x)/sizeof((x)[0]))
// at most this many file descriptors are accepted per request,
// matching the size of `fp' in struct req
#define RECV_FD_CAPA 2
#define RECV_FD_SPACE (RECV_FD_CAPA * sizeof(int))
// Control message buffer for recvmsg(2); the union ensures cmsghdr
// alignment.  NOTE(review): the extra 16 bytes look like slack for
// CMSG alignment padding -- confirm.
union my_cmsg {
struct cmsghdr hdr;
char pad[sizeof(struct cmsghdr) + 16 + RECV_FD_SPACE];
};
static bool recv_req(struct req *req, char *rbuf, size_t *len)
{
union my_cmsg cmsg = {};
struct msghdr msg = {};
struct iovec iov;
ssize_t r;
iov.iov_base = rbuf;
iov.iov_len = *len;
msg.msg_iov = &iov;
msg.msg_iovlen = 1;
msg.msg_control = &cmsg.hdr;
msg.msg_controllen = CMSG_SPACE(RECV_FD_SPACE);
// allow SIGTERM to hit
CHECK(int, 0, sigprocmask(SIG_SETMASK, &workerset, NULL));
again:
r = recvmsg(sock_fd, &msg, 0);
if (r == 0) {
exit(EX_NOINPUT); /* grandparent went away */
} else if (r < 0) {
switch (errno) {
case EINTR: goto again;
case EBADF: if (sock_fd < 0) exit(0);
// fall-through
default: err(EXIT_FAILURE, "recvmsg");
}
}
// success! no signals for the rest of the request/response cycle
CHECK(int, 0, sigprocmask(SIG_SETMASK, &fullset, NULL));
if (r > 0 && msg.msg_flags)
ABORT("unexpected msg_flags");
*len = r;
if (cmsg.hdr.cmsg_level == SOL_SOCKET &&
cmsg.hdr.cmsg_type == SCM_RIGHTS) {
size_t clen = cmsg.hdr.cmsg_len;
int *fdp = (int *)CMSG_DATA(&cmsg.hdr);
size_t i;
for (i = 0; CMSG_LEN((i + 1) * sizeof(int)) <= clen; i++) {
int fd = *fdp++;
const char *mode = NULL;
int fl = fcntl(fd, F_GETFL);
if (fl == -1) {
errx(EXIT_FAILURE, "invalid fd=%d", fd);
} else if (fl & O_WRONLY) {
mode = "w";
} else if (fl & O_RDWR) {
mode = "r+";
if (i == 0) req->has_input = true;
} else {
errx(EXIT_FAILURE,
"invalid mode from F_GETFL: 0x%x", fl);
}
req->fp[i] = fdopen(fd, mode);
if (!req->fp[i])
err(EXIT_FAILURE, "fdopen(fd=%d)", fd);
}
return true;
}
errx(EXIT_FAILURE, "no FD received in %zd-byte request", r);
return false;
}
// Orders `struct srch' by paths_len first, then by the bytes of paths.
static int srch_cmp(const void *pa, const void *pb) // for tfind|tsearch
{
	const struct srch *a = (const struct srch *)pa;
	const struct srch *b = (const struct srch *)pb;

	if (a->paths_len != b->paths_len)
		return a->paths_len - b->paths_len;
	return memcmp(a->paths, b->paths, (size_t)a->paths_len);
}
// True if "$dir/iamchert" exists and is a regular file.
// Exits if the path does not fit into PATH_MAX bytes.
static bool is_chert(const char *dir)
{
	char path[PATH_MAX];
	const int n = snprintf(path, sizeof(path), "%s/iamchert", dir);

	if (n <= 0 || n >= (int)sizeof(path))
		err(EXIT_FAILURE, "BUG: snprintf(%s/iamchert)", dir);

	struct stat st;
	if (stat(path, &st) != 0)
		return false;
	return S_ISREG(st.st_mode) ? true : false;
}
// Opens every shard listed in req->srch->paths as one combined
// Xapian::Database and creates the QueryParser for it.
// Phrase search (FLAG_PHRASE) is disabled if any shard is a chert DB.
// Returns false after printing a warning on failure; srch->db and
// srch->qp may be partially set in that case and are released by the
// caller via free_srch().
static bool srch_init(struct req *req)
{
char *dirv[MY_ARG_MAX];
int i;
struct srch *srch = req->srch;
// paths is "$dir0\0$dir1\0...", split it into dirv in-place
int dirc = (int)SPLIT2ARGV(dirv, srch->paths, (size_t)srch->paths_len);
const unsigned FLAG_PHRASE = Xapian::QueryParser::FLAG_PHRASE;
srch->qp_flags = FLAG_PHRASE |
Xapian::QueryParser::FLAG_BOOLEAN |
Xapian::QueryParser::FLAG_LOVEHATE |
Xapian::QueryParser::FLAG_WILDCARD;
if (is_chert(dirv[0]))
srch->qp_flags &= ~FLAG_PHRASE;
try {
srch->db = new Xapian::Database(dirv[0]);
} catch (...) {
warn("E: Xapian::Database(%s)", dirv[0]);
return false;
}
try {
// remaining shards are added to the first database
for (i = 1; i < dirc; i++) {
// no need to stat again once phrase search is already off
if (srch->qp_flags & FLAG_PHRASE && is_chert(dirv[i]))
srch->qp_flags &= ~FLAG_PHRASE;
srch->db->add_database(Xapian::Database(dirv[i]));
}
} catch (...) {
// `i' still indexes the shard which failed to open
warn("E: add_database(%s)", dirv[i]);
return false;
}
try {
srch->qp = new Xapian::QueryParser;
} catch (...) {
perror("E: Xapian::QueryParser");
return false;
}
srch->qp->set_default_op(Xapian::Query::OP_AND);
srch->qp->set_database(*srch->db);
try {
srch->qp->set_stemmer(Xapian::Stem("english"));
} catch (...) {
perror("E: Xapian::Stem");
return false;
}
srch->qp->set_stemming_strategy(Xapian::QueryParser::STEM_SOME);
srch->qp->SET_MAX_EXPANSION(100);
// register prefixes generated from the Perl side
if (req->code_search)
qp_init_code_search(srch->qp); // CodeSearch.pm
else
qp_init_mail_search(srch->qp); // Search.pm
return true;
}
// setup query parser for altid and arbitrary headers
static void srch_init_extra(struct req *req)
{
const char *XPFX;
for (int i = 0; i < req->qpfxc; i++) {
size_t len = strlen(req->qpfxv[i]);
char *c = (char *)memchr(req->qpfxv[i], '=', len);
if (c) { // it's boolean "gmane=XGMANE"
XPFX = c + 1;
*c = 0;
req->srch->qp->add_boolean_prefix(req->qpfxv[i], XPFX);
continue;
}
// maybe it's a non-boolean prefix "blob:XBLOBID"
c = (char *)memchr(req->qpfxv[i], ':', len);
if (!c)
errx(EXIT_FAILURE, "bad -Q %s", req->qpfxv[i]);
XPFX = c + 1;
*c = 0;
req->srch->qp->add_prefix(req->qpfxv[i], XPFX);
}
req->srch->qp_extra_done = true;
}
// Releases a `struct srch' together with the Xapian objects it owns.
static void free_srch(void *p) // tdestroy
{
	struct srch *victim = (struct srch *)p;

	delete victim->qp;
	delete victim->db;
	free(victim);
}
// Runs one request: looks up the command named by req->argv[0] in
// cmds[], parses the remaining switches with getopt(3), finds or
// creates the cached `struct srch' for the given -d shard paths, and
// invokes the command with an optional alarm(2) timeout (-K).
// Aborts on unknown commands and malformed switches; failures of the
// command itself and exceptions are only warned about.
static void dispatch(struct req *req)
{
int c;
size_t size = strlen(req->argv[0]);
// the memstream buffer doubles as the `struct srch' allocation
union {
struct srch *srch;
char *ptr;
} kbuf;
char *end;
FILE *kfp;
struct srch **s;
req->threadid = ULLONG_MAX; // means "no -T given"
for (c = 0; c < (int)MY_ARRAY_SIZE(cmds); c++) {
if (cmds[c].fn_len == size &&
!memcmp(cmds[c].fn_name, req->argv[0], size)) {
req->fn = cmds[c].fn;
break;
}
}
if (!req->fn) ABORT("not handled: `%s'", req->argv[0]);
kfp = open_memstream(&kbuf.ptr, &size);
if (!kfp) err(EXIT_FAILURE, "open_memstream(kbuf)");
// write padding, first (contents don't matter)
// this reserves room for the fixed-size members preceding paths[]
fwrite(&req->argv[0], offsetof(struct srch, paths), 1, kfp);
// global getopt variables:
optopt = 0;
optarg = NULL;
MY_DO_OPTRESET();
// XH_SPEC is generated from @PublicInbox::Search::XH_SPEC
while ((c = getopt(req->argc, req->argv, XH_SPEC)) != -1) {
switch (c) {
case 'a': req->asc = true; break;
case 'c': req->code_search = true; break;
// each -d path (with its NUL) is appended to srch->paths
case 'd': fwrite(optarg, strlen(optarg) + 1, 1, kfp); break;
case 'g': req->Pgit_dir = optarg - 1; break; // pad "P" prefix
case 'k':
req->sort_col = strtol(optarg, &end, 10);
if (*end) ABORT("-k %s", optarg);
// LONG_MAX/LONG_MIN are what strtol returns on overflow
switch (req->sort_col) {
case LONG_MAX: case LONG_MIN: ABORT("-k %s", optarg);
}
break;
case 'm':
req->max = strtoull(optarg, &end, 10);
if (*end || req->max == ULLONG_MAX)
ABORT("-m %s", optarg);
break;
case 'o':
req->off = strtoull(optarg, &end, 10);
if (*end || req->off == ULLONG_MAX)
ABORT("-o %s", optarg);
break;
case 'r': req->relevance = true; break;
case 't': req->collapse_threads = true; break;
case 'A':
req->pfxv[req->pfxc++] = optarg;
if (MY_ARG_MAX == req->pfxc)
ABORT("too many -A");
break;
case 'K':
req->timeout_sec = strtoul(optarg, &end, 10);
if (*end || req->timeout_sec == ULONG_MAX)
ABORT("-K %s", optarg);
break;
case 'O': req->Oeidx_key = optarg - 1; break; // pad "O" prefix
case 'T':
req->threadid = strtoull(optarg, &end, 10);
if (*end || req->threadid == ULLONG_MAX)
ABORT("-T %s", optarg);
break;
case 'Q':
req->qpfxv[req->qpfxc++] = optarg;
if (MY_ARG_MAX == req->qpfxc) ABORT("too many -Q");
break;
default: ABORT("bad switch `-%c'", c);
}
}
ERR_CLOSE(kfp, EXIT_FAILURE); // may ENOMEM, sets kbuf.srch
// the padding written earlier is garbage, initialize the members
kbuf.srch->db = NULL;
kbuf.srch->qp = NULL;
kbuf.srch->qp_extra_done = false;
kbuf.srch->paths_len = size - offsetof(struct srch, paths);
if (kbuf.srch->paths_len <= 0)
ABORT("no -d args");
// tsearch returns the existing node if an equal key is present,
// otherwise it inserts kbuf.srch
s = (struct srch **)tsearch(kbuf.srch, &srch_tree, srch_cmp);
if (!s) err(EXIT_FAILURE, "tsearch"); // likely ENOMEM
req->srch = *s;
if (req->srch != kbuf.srch) { // reuse existing
free_srch(kbuf.srch);
} else if (!srch_init(req)) {
// do not keep a half-initialized entry in the cache
assert(kbuf.srch == *((struct srch **)tfind(
kbuf.srch, &srch_tree, srch_cmp)));
void *del = tdelete(kbuf.srch, &srch_tree, srch_cmp);
assert(del);
free_srch(kbuf.srch);
goto cmd_err; // srch_init already warned
}
// -Q prefixes are only applied once per cached srch
if (req->qpfxc && !req->srch->qp_extra_done)
srch_init_extra(req);
// alarm(2) takes an unsigned int, clamp larger timeouts
if (req->timeout_sec)
alarm(req->timeout_sec > UINT_MAX ?
UINT_MAX : (unsigned)req->timeout_sec);
try {
if (!req->fn(req))
warnx("`%s' failed", req->argv[0]);
} catch (const Xapian::Error & e) {
warnx("Xapian::Error: %s", e.get_description().c_str());
} catch (...) {
warn("unhandled exception");
}
// cancel the pending timeout, if any
if (req->timeout_sec)
alarm(0);
cmd_err:
return; // just be silent on errors, for now
}
// Frees the worker PID table and clears the global pointer to it.
static void cleanup_pids(void)
{
	pid_t *pids = worker_pids;

	worker_pids = NULL;
	free(pids);
}
// Redirects stderr output to `tmp_err' for the current request.
// Where `stderr' is assignable the stream is swapped (after making
// `tmp_err' line-buffered); elsewhere FD 2 is replaced via dup2(2).
static void stderr_set(FILE *tmp_err)
{
#if STDERR_ASSIGNABLE
	if (my_setlinebuf(tmp_err) != 0)
		perror("W: setlinebuf(tmp_err)");
	stderr = tmp_err;
#else
	const int fd = fileno(tmp_err);

	if (fd < 0)
		err(EXIT_FAILURE, "BUG: fileno(tmp_err)");
	for (;;) { // retry on EINTR
		if (dup2(fd, STDERR_FILENO) >= 0)
			break;
		if (errno != EINTR)
			err(EXIT_FAILURE, "dup2(%d => 2)", fd);
	}
#endif
}
// Undoes stderr_set(): puts the original stderr stream back, or
// flushes stderr and points FD 2 back at orig_err_fd on platforms
// where `stderr' is not assignable.
static void stderr_restore(FILE *tmp_err)
{
#if STDERR_ASSIGNABLE
	stderr = orig_err;
#else
	ERR_FLUSH(stderr);
	for (;;) { // retry on EINTR
		if (dup2(orig_err_fd, STDERR_FILENO) >= 0)
			break;
		if (errno != EINTR)
			err(EXIT_FAILURE, "dup2(%d => 2)", orig_err_fd);
	}
	clearerr(stderr);
#endif
}
// Only sets flags; the worker loop acts on them between requests.
static void sigw(int sig) // SIGTERM+SIGUSR1 handler for worker
{
	if (sig == SIGUSR1)
		worker_needs_reopen = 1;
	else
		sock_fd = -1; // break out of recv_loop
}
#define CLEANUP_REQ __attribute__((__cleanup__(req_cleanup)))
// __cleanup__ callback for `struct req': frees the lenv array.
static void req_cleanup(void *ptr)
{
	free(((struct req *)ptr)->lenv);
}
// Reopens stdout and stderr in append mode at stdout_path and
// stderr_path when those are set and non-empty; stderr is made
// line-buffered again afterwards.  Exits on any failure.
static void reopen_logs(void)
{
	const bool want_out = stdout_path && *stdout_path;

	if (want_out && !freopen(stdout_path, "a", stdout))
		err(EXIT_FAILURE, "freopen %s", stdout_path);

	if (!stderr_path || !*stderr_path)
		return;
	if (!freopen(stderr_path, "a", stderr))
		err(EXIT_FAILURE, "freopen %s", stderr_path);
	if (my_setlinebuf(stderr))
		err(EXIT_FAILURE, "setlinebuf(stderr)");
}
// Installs the worker signal handlers, then receives and dispatches
// requests until the SIGTERM handler invalidates sock_fd (or
// recv_req() exits the process).
static void recv_loop(void) // worker process loop
{
// request buffer, req.argv entries point into it
static char rbuf[4096 * 33]; // per-process
struct sigaction sa = {};
sa.sa_handler = sigw;
CHECK(int, 0, sigaction(SIGTERM, &sa, NULL));
CHECK(int, 0, sigaction(SIGUSR1, &sa, NULL));
// sigw() sets sock_fd to -1 on SIGTERM
while (sock_fd == 0) {
size_t len = sizeof(rbuf);
// a fresh zeroed request per iteration; req_cleanup() runs
// whenever `req' goes out of scope
CLEANUP_REQ struct req req = {};
if (!recv_req(&req, rbuf, &len))
continue;
// send warnings to the client if it gave us a second FD
if (req.fp[1])
stderr_set(req.fp[1]);
req.argc = (int)SPLIT2ARGV(req.argv, rbuf, len);
dispatch(&req);
// a zero exit code makes ERR_CLOSE only perror() on failure
ERR_CLOSE(req.fp[0], 0);
if (req.fp[1]) {
stderr_restore(req.fp[1]);
ERR_CLOSE(req.fp[1], 0);
}
// SIGUSR1 arrived at some point, act on it between requests
if (worker_needs_reopen) {
worker_needs_reopen = 0;
reopen_logs();
}
}
}
// Records `pid' as the occupant of worker slot `nr', which must be
// empty.
static void insert_pid(pid_t pid, unsigned nr)
{
	pid_t *slot = worker_pids + nr;

	assert(!worker_pids[nr]);
	*slot = pid;
}
// Forks the worker for slot `nr'.
// The parent records the child's PID in worker_pids; a failed fork
// is only warned about, leaving the slot empty.
// The child drops the parent-only state (PID table, signal pipe),
// resets the signal dispositions it inherited from the parent and
// serves requests in recv_loop() until told to stop; it never
// returns from here.
static void start_worker(unsigned nr)
{
	pid_t pid = fork();
	if (pid < 0) {
		warn("E: fork(worker=%u)", nr);
	} else if (pid > 0) {
		insert_pid(pid, nr);
	} else {
		cleanup_pids();
		xclose(pipefds[0]);
		xclose(pipefds[1]);
		if (signal(SIGCHLD, SIG_DFL) == SIG_ERR)
			err(EXIT_FAILURE, "signal CHLD");
		if (signal(SIGTTIN, SIG_IGN) == SIG_ERR)
			err(EXIT_FAILURE, "signal TTIN");
		if (signal(SIGTTOU, SIG_IGN) == SIG_ERR)
			err(EXIT_FAILURE, "signal TTOU");
		recv_loop();
		exit(0);
	}
}
// Starts a worker for every empty slot below nworker, with the
// `fullset' signal mask installed for the duration.
static void start_workers(void)
{
	sigset_t saved;
	unsigned long nr = 0;

	CHECK(int, 0, sigprocmask(SIG_SETMASK, &fullset, &saved));
	while (nr < nworker) {
		if (worker_pids[nr] == 0)
			start_worker(nr);
		nr++;
	}
	CHECK(int, 0, sigprocmask(SIG_SETMASK, &saved, NULL));
}
static void cleanup_all(void)
{
cleanup_pids();
#ifdef __GLIBC__
tdestroy(srch_tree, free_srch);
srch_tree = NULL;
#endif
}
// SIGUSR1 in the parent: reopens our own logs, then forwards SIGUSR1
// to every living worker so each one reopens its logs between
// requests (see sigw() and recv_loop()).
static void parent_reopen_logs(void)
{
	reopen_logs();
	// start at slot 0: the active workers occupy [0, nworker) and
	// workers still shutting down occupy [nworker, nworker_hwm)
	for (unsigned long nr = 0; nr < nworker_hwm; nr++) {
		pid_t pid = worker_pids[nr];
		if (pid != 0 && kill(pid, SIGUSR1))
			warn("BUG?: kill(%d, SIGUSR1)", (int)pid);
	}
}
// Translates a signal into a single byte written to pipefds[1] for
// the main() loop to act on outside of signal context:
//   SIGCHLD => '.', SIGTTOU => '-', SIGTTIN => '+', SIGUSR1 => '#'
// Only write(2) and _exit(2) are called here.  A full pipe (EAGAIN)
// drops the notification with a message on stderr; any other
// write failure or an unexpected signal terminates the process.
static void sigp(int sig) // parent signal handler
{
	static const char eagain[] = "signals coming in too fast";
	static const char bad_sig[] = "BUG: bad sig\n";
	static const char write_errno[] = "BUG: sigp write (errno)";
	static const char write_zero[] = "BUG: sigp write wrote zero bytes";
	// write(2) below may clobber errno while the interrupted code
	// is about to inspect it, so it is restored on every return
	const int saved_errno = errno;
	char c = 0;

	switch (sig) {
	case SIGCHLD: c = '.'; break;
	case SIGTTOU: c = '-'; break;
	case SIGTTIN: c = '+'; break;
	case SIGUSR1: c = '#'; break;
	default:
		write(STDERR_FILENO, bad_sig, sizeof(bad_sig) - 1);
		_exit(EXIT_FAILURE);
	}
	ssize_t w = write(pipefds[1], &c, 1);
	if (w > 0) {
		errno = saved_errno;
		return;
	}
	if (w < 0 && errno == EAGAIN) {
		write(STDERR_FILENO, eagain, sizeof(eagain) - 1);
		errno = saved_errno;
		return;
	} else if (w == 0) {
		write(STDERR_FILENO, write_zero, sizeof(write_zero) - 1);
	} else {
		// strerror isn't technically async-signal-safe, and
		// strerrordesc_np+strerrorname_np isn't portable
		write(STDERR_FILENO, write_errno, sizeof(write_errno) - 1);
	}
	_exit(EXIT_FAILURE);
}
// Bookkeeping for a child reaped with wait status `st': empties its
// slot, notes a clean EX_NOINPUT exit as the signal to shut down,
// and otherwise respawns missing workers.
static void reaped_worker(pid_t pid, int st)
{
	unsigned long nr;

	// find the slot this PID was occupying
	for (nr = 0; nr < nworker_hwm && worker_pids[nr] != pid; nr++)
		;
	if (nr >= nworker_hwm) {
		warnx("W: unknown pid=%d reaped $?=%d", (int)pid, st);
		return;
	}
	worker_pids[nr] = 0;

	const bool no_input = WIFEXITED(st) && WEXITSTATUS(st) == EX_NOINPUT;
	if (no_input)
		alive = false;
	else if (st)
		warnx("worker[%lu] died $?=%d alive=%d", nr, st, (int)alive);

	if (alive)
		start_workers();
}
// Reaps every child which has already exited, without blocking.
static void do_sigchld(void)
{
	for (;;) {
		int st;
		const pid_t pid = waitpid(-1, &st, WNOHANG);

		if (pid == 0)
			return; // children exist, none have exited
		if (pid > 0) {
			reaped_worker(pid, st);
			continue;
		}
		if (errno == ECHILD)
			return; // no children at all
		if (errno != EINTR) // can it happen w/ WNOHANG?
			err(EXIT_FAILURE, "BUG: waitpid");
	}
}
// SIGTTIN in the parent: increase the number of workers by one.
//
// worker_pids always holds nworker_hwm slots and never shrinks, so
// the array is only grown once nworker reaches that mark.  When a
// slot below the mark is reused, it is left untouched: it may still
// hold the PID of a worker which was sent SIGTERM by do_sigttou() but
// has not exited yet.  start_workers() skips such a slot and
// reaped_worker() respawns into it once the old worker is reaped.
static void do_sigttin(void)
{
	if (!alive) return;
	if (nworker >= WORKER_MAX) {
		warnx("workers cannot exceed %zu", (size_t)WORKER_MAX);
		return;
	}
	if (nworker < nworker_hwm) {
		// the slot already exists, no need to touch the array
		nworker++;
	} else {
		// nworker < WORKER_MAX, so the size cannot overflow
		void *p = realloc(worker_pids, (nworker + 1) * sizeof(pid_t));
		if (!p) {
			warn("realloc worker_pids");
			return;
		}
		worker_pids = (pid_t *)p;
		worker_pids[nworker++] = 0;
		nworker_hwm = nworker;
	}
	start_workers();
}
// SIGTTOU in the parent: decrease the number of workers by one (never
// below one) and SIGTERM every worker in a slot at or above the new
// count.
static void do_sigttou(void)
{
	if (!alive)
		return;
	if (nworker <= 1)
		return;

	// worker_pids array does not shrink
	nworker--;

	unsigned long nr = nworker;
	while (nr < nworker_hwm) {
		const pid_t pid = worker_pids[nr++];

		if (pid == 0)
			continue;
		if (kill(pid, SIGTERM))
			warn("BUG?: kill(%d, SIGTERM)", (int)pid);
	}
}
// Counts the occupied slots in worker_pids.
static size_t living_workers(void)
{
	size_t occupied = 0;
	unsigned long nr = nworker_hwm;

	while (nr-- > 0) {
		if (worker_pids[nr] != 0)
			occupied++;
	}
	return occupied;
}
// Entry point.  stdin must be a SOCK_SEQPACKET socket.
// With `-j 0' requests are served directly in this process; otherwise
// `-j N' (default 1) workers are forked and this process only
// supervises them, reacting to SIGCHLD, SIGTTIN, SIGTTOU and SIGUSR1
// as relayed by sigp() through a pipe.
int main(int argc, char *argv[])
{
int c;
socklen_t slen = (socklen_t)sizeof(c);
stdout_path = getenv("STDOUT_PATH");
stderr_path = getenv("STDERR_PATH");
if (getsockopt(sock_fd, SOL_SOCKET, SO_TYPE, &c, &slen))
err(EXIT_FAILURE, "getsockopt");
if (c != SOCK_SEQPACKET)
errx(EXIT_FAILURE, "stdin is not SOCK_SEQPACKET");
mail_nrp_init();
code_nrp_init();
atexit(cleanup_all);
// keep a copy of FD 2 for stderr_restore() to dup2 back
if (!STDERR_ASSIGNABLE) {
orig_err_fd = dup(STDERR_FILENO);
if (orig_err_fd < 0)
err(EXIT_FAILURE, "dup(2)");
}
nworker = 1;
// make warn/warnx/err multi-process friendly:
if (my_setlinebuf(stderr))
err(EXIT_FAILURE, "setlinebuf(stderr)");
// not using -W like Daemon.pm, since -W is reserved (glibc)
while ((c = getopt(argc, argv, "j:")) != -1) {
char *end;
switch (c) {
case 'j':
nworker = strtoul(optarg, &end, 10);
if (*end != 0 || nworker > WORKER_MAX)
errx(EXIT_FAILURE, "-j %s invalid", optarg);
break;
case ':':
errx(EXIT_FAILURE, "missing argument: `-%c'", optopt);
case '?':
errx(EXIT_FAILURE, "unrecognized: `-%c'", optopt);
default:
errx(EXIT_FAILURE, "BUG: `-%c'", c);
}
}
sigset_t pset; // parent-only
CHECK(int, 0, sigfillset(&pset));
// global sigsets:
CHECK(int, 0, sigfillset(&fullset));
CHECK(int, 0, sigfillset(&workerset));
// the signals below are never blocked in any of the three masks
#define DELSET(sig) do { \
CHECK(int, 0, sigdelset(&fullset, sig)); \
CHECK(int, 0, sigdelset(&workerset, sig)); \
CHECK(int, 0, sigdelset(&pset, sig)); \
} while (0)
DELSET(SIGABRT);
DELSET(SIGBUS);
DELSET(SIGFPE);
DELSET(SIGILL);
DELSET(SIGSEGV);
DELSET(SIGXCPU);
DELSET(SIGXFSZ);
#undef DELSET
// idle workers may be told to reopen logs
CHECK(int, 0, sigdelset(&workerset, SIGUSR1));
// SIGALRM stays unblocked during requests for the alarm(2) in dispatch()
CHECK(int, 0, sigdelset(&fullset, SIGALRM));
if (nworker == 0) { // no SIGTERM handling w/o workers
recv_loop();
return 0;
}
CHECK(int, 0, sigdelset(&workerset, SIGTERM));
CHECK(int, 0, sigdelset(&workerset, SIGCHLD));
nworker_hwm = nworker;
worker_pids = (pid_t *)calloc(nworker, sizeof(pid_t));
if (!worker_pids) err(EXIT_FAILURE, "calloc");
if (pipe(pipefds)) err(EXIT_FAILURE, "pipe");
// the write end must not block inside the signal handler
int fl = fcntl(pipefds[1], F_GETFL);
if (fl == -1) err(EXIT_FAILURE, "F_GETFL");
if (fcntl(pipefds[1], F_SETFL, fl | O_NONBLOCK))
err(EXIT_FAILURE, "F_SETFL");
// signals handled by sigp() must be deliverable in the parent
CHECK(int, 0, sigdelset(&pset, SIGCHLD));
CHECK(int, 0, sigdelset(&pset, SIGTTIN));
CHECK(int, 0, sigdelset(&pset, SIGTTOU));
CHECK(int, 0, sigdelset(&pset, SIGUSR1));
struct sigaction sa = {};
sa.sa_handler = sigp;
CHECK(int, 0, sigaction(SIGUSR1, &sa, NULL));
CHECK(int, 0, sigaction(SIGTTIN, &sa, NULL));
CHECK(int, 0, sigaction(SIGTTOU, &sa, NULL));
sa.sa_flags = SA_NOCLDSTOP;
CHECK(int, 0, sigaction(SIGCHLD, &sa, NULL));
CHECK(int, 0, sigprocmask(SIG_SETMASK, &pset, NULL));
start_workers();
// each byte read is one signal notification written by sigp()
char sbuf[64];
while (alive || living_workers()) {
ssize_t n = read(pipefds[0], &sbuf, sizeof(sbuf));
if (n < 0) {
if (errno == EINTR) continue;
err(EXIT_FAILURE, "read");
} else if (n == 0) {
errx(EXIT_FAILURE, "read EOF");
}
// reap unconditionally, whichever signals were reported
do_sigchld();
for (ssize_t i = 0; i < n; i++) {
switch (sbuf[i]) {
case '.': break; // do_sigchld already called
case '-': do_sigttou(); break;
case '+': do_sigttin(); break;
case '#': parent_reopen_logs(); break;
default: errx(EXIT_FAILURE, "BUG: c=%c", sbuf[i]);
}
}
}
return 0;
}