diff options
Diffstat (limited to 'lib/PublicInbox/xap_helper.h')
-rw-r--r-- | lib/PublicInbox/xap_helper.h | 372 |
1 files changed, 256 insertions, 116 deletions
diff --git a/lib/PublicInbox/xap_helper.h b/lib/PublicInbox/xap_helper.h index 3456910b..51ab48bf 100644 --- a/lib/PublicInbox/xap_helper.h +++ b/lib/PublicInbox/xap_helper.h @@ -7,7 +7,7 @@ * this is not linked to Perl in any way. * C (not C++) is used as much as possible to lower the contribution * barrier for hackers who mainly know C (this includes the maintainer). - * Yes, that means we use C stdlib stuff like hsearch and open_memstream + * Yes, that means we use C stdlib stuff like open_memstream * instead their equivalents in the C++ stdlib :P * Everything here is an unstable internal API of public-inbox and * NOT intended for ordinary users; only public-inbox hackers @@ -15,6 +15,9 @@ #ifndef _ALL_SOURCE # define _ALL_SOURCE #endif +#ifndef _GNU_SOURCE +# define _GNU_SOURCE +#endif #if defined(__NetBSD__) && !defined(_OPENBSD_SOURCE) // for reallocarray(3) # define _OPENBSD_SOURCE #endif @@ -27,13 +30,13 @@ #include <sys/types.h> #include <sys/uio.h> #include <sys/wait.h> +#include <poll.h> #include <assert.h> #include <err.h> // BSD, glibc, and musl all have this #include <errno.h> #include <fcntl.h> #include <limits.h> -#include <search.h> #include <signal.h> #include <stddef.h> #include <stdint.h> @@ -82,6 +85,62 @@ #define ABORT(...) do { warnx(__VA_ARGS__); abort(); } while (0) #define EABORT(...) do { warn(__VA_ARGS__); abort(); } while (0) +static void *xcalloc(size_t nmemb, size_t size) +{ + void *ret = calloc(nmemb, size); + if (!ret) EABORT("calloc(%zu, %zu)", nmemb, size); + return ret; +} + +#if defined(__GLIBC__) && defined(__GLIBC_MINOR__) && \ + MY_VER(__GLIBC__, __GLIBC_MINOR__, 0) >= MY_VER(2, 28, 0) +# define HAVE_REALLOCARRAY 1 +#elif defined(__OpenBSD__) || defined(__DragonFly__) || \ + defined(__FreeBSD__) || defined(__NetBSD__) +# define HAVE_REALLOCARRAY 1 +#endif + +static void *xreallocarray(void *ptr, size_t nmemb, size_t size) +{ +#ifdef HAVE_REALLOCARRAY + void *ret = reallocarray(ptr, nmemb, size); +#else // can't rely on __builtin_mul_overflow in gcc 4.x :< + void *ret = NULL; + if (nmemb && size > SIZE_MAX / nmemb) + errno = ENOMEM; + else + ret = realloc(ptr, nmemb * size); +#endif + if (!ret) EABORT("reallocarray(..., %zu, %zu)", nmemb, size); + return ret; +} + +#include "khashl.h" + +struct srch { + int ckey_len; // int for comparisons + unsigned qp_flags; + bool qp_extra_done; + Xapian::Database *db; + Xapian::QueryParser *qp; + unsigned char ckey[]; // $shard_path0\0$shard_path1\0... +}; + +static khint_t srch_hash(const struct srch *srch) +{ + return kh_hash_bytes(srch->ckey_len, srch->ckey); +} + +static int srch_eq(const struct srch *a, const struct srch *b) +{ + return a->ckey_len == b->ckey_len ? + !memcmp(a->ckey, b->ckey, (size_t)a->ckey_len) : 0; +} + +KHASHL_CSET_INIT(KH_LOCAL, srch_set, srch_set, struct srch *, + srch_hash, srch_eq) +static srch_set *srch_cache; +static long my_fd_max, shard_nfd; // sock_fd is modified in signal handler, yes, it's SOCK_SEQPACKET static volatile int sock_fd = STDIN_FILENO; static sigset_t fullset, workerset; @@ -90,11 +149,12 @@ static bool alive = true; static FILE *orig_err = stderr; #endif static int orig_err_fd = -1; -static void *srch_tree; // tsearch + tdelete + twalk static pid_t *worker_pids; // nr => pid #define WORKER_MAX USHRT_MAX static unsigned long nworker, nworker_hwm; static int pipefds[2]; +static const char *stdout_path, *stderr_path; // for SIGUSR1 +static sig_atomic_t worker_needs_reopen; // PublicInbox::Search and PublicInbox::CodeSearch generate these: static void mail_nrp_init(void); @@ -108,14 +168,6 @@ enum exc_iter { ITER_ABORT }; -struct srch { - int paths_len; // int for comparisons - unsigned qp_flags; - Xapian::Database *db; - Xapian::QueryParser *qp; - char paths[]; // $shard_path0\0$shard_path1\0... -}; - #define MY_ARG_MAX 256 typedef bool (*cmd)(struct req *); @@ -123,6 +175,8 @@ typedef bool (*cmd)(struct req *); struct req { // argv and pfxv point into global rbuf char *argv[MY_ARG_MAX]; char *pfxv[MY_ARG_MAX]; // -A <prefix> + char *qpfxv[MY_ARG_MAX]; // -Q <user_prefix>[:=]<INTERNAL_PREFIX> + char *dirv[MY_ARG_MAX]; // -d /path/to/XDB(shard) size_t *lenv; // -A <prefix>LENGTH struct srch *srch; char *Pgit_dir; @@ -134,15 +188,12 @@ struct req { // argv and pfxv point into global rbuf unsigned long timeout_sec; size_t nr_out; long sort_col; // value column, negative means BoolWeight - int argc; - int pfxc; + int argc, pfxc, qpfxc, dirc; FILE *fp[2]; // [0] response pipe or sock, [1] status/errors (optional) bool has_input; // fp[0] is bidirectional bool collapse_threads; bool code_search; bool relevance; // sort by relevance before column - bool emit_percent; - bool emit_docdata; bool asc; // ascending sort }; @@ -226,6 +277,13 @@ static Xapian::MSet mail_mset(struct req *req, const char *qry_str) qry = Xapian::Query(Xapian::Query::OP_FILTER, qry, Xapian::Query(req->Oeidx_key)); } + // TODO: uid_range + if (req->threadid != ULLONG_MAX) { + std::string tid = Xapian::sortable_serialise(req->threadid); + qry = Xapian::Query(Xapian::Query::OP_FILTER, qry, + Xapian::Query(Xapian::Query::OP_VALUE_RANGE, THREADID, + tid, tid)); + } Xapian::Enquire enq = prep_enquire(req); enq.set_query(qry); // THREADID is a CPP macro defined on CLI (see) XapHelperCxx.pm @@ -364,25 +422,6 @@ static size_t off2size(off_t n) return (size_t)n; } -static char *hsearch_enter_key(char *s) -{ -#if defined(__OpenBSD__) || defined(__DragonFly__) - // hdestroy frees each key on some platforms, - // so give it something to free: - char *ret = strdup(s); - if (!ret) err(EXIT_FAILURE, "strdup"); - return ret; -// AFAIK there's no way to detect musl, assume non-glibc Linux is musl: -#elif defined(__GLIBC__) || defined(__linux__) || \ - defined(__FreeBSD__) || defined(__NetBSD__) - // do nothing on these platforms -#else -#warning untested platform detected, unsure if hdestroy(3) frees keys -#warning contact us at meta@public-inbox.org if you get segfaults -#endif - return s; -} - // for test usage only, we need to ensure the compiler supports // __cleanup__ when exceptions are thrown struct inspect { struct req *req; }; @@ -406,6 +445,11 @@ static bool cmd_test_inspect(struct req *req) return false; } +static bool cmd_test_sleep(struct req *req) +{ + for (;;) poll(NULL, 0, 10); + return false; +} #include "xh_mset.h" // read-only (WWW, IMAP, lei) stuff #include "xh_cidx.h" // CodeSearchIdx.pm stuff @@ -420,6 +464,7 @@ static const struct cmd_entry { CMD(dump_ibx), // many inboxes CMD(dump_roots), // per-cidx shard CMD(test_inspect), // least common commands last + CMD(test_sleep), // least common commands last }; #define MY_ARRAY_SIZE(x) (sizeof(x)/sizeof((x)[0])) @@ -495,15 +540,6 @@ again: return false; } -static int srch_cmp(const void *pa, const void *pb) // for tfind|tsearch -{ - const struct srch *a = (const struct srch *)pa; - const struct srch *b = (const struct srch *)pb; - int diff = a->paths_len - b->paths_len; - - return diff ? diff : memcmp(a->paths, b->paths, (size_t)a->paths_len); -} - static bool is_chert(const char *dir) { char iamchert[PATH_MAX]; @@ -517,49 +553,85 @@ static bool is_chert(const char *dir) return false; } -static bool srch_init(struct req *req) +static void srch_free(struct srch *srch) +{ + delete srch->qp; + delete srch->db; + free(srch); +} + +static void srch_cache_renew(struct srch *keep) +{ + khint_t k; + + // can't delete while iterating, so just free each + clear + for (k = kh_begin(srch_cache); k != kh_end(srch_cache); k++) { + if (!kh_exist(srch_cache, k)) continue; + struct srch *cur = kh_key(srch_cache, k); + + if (cur != keep) + srch_free(cur); + } + srch_set_cs_clear(srch_cache); + if (keep) { + int absent; + k = srch_set_put(srch_cache, keep, &absent); + assert(absent); + assert(k < kh_end(srch_cache)); + } +} + +static void srch_init(struct req *req) { - char *dirv[MY_ARG_MAX]; int i; struct srch *srch = req->srch; - int dirc = (int)SPLIT2ARGV(dirv, srch->paths, (size_t)srch->paths_len); const unsigned FLAG_PHRASE = Xapian::QueryParser::FLAG_PHRASE; - srch->qp_flags = FLAG_PHRASE | - Xapian::QueryParser::FLAG_BOOLEAN | + srch->qp_flags = Xapian::QueryParser::FLAG_BOOLEAN | Xapian::QueryParser::FLAG_LOVEHATE | Xapian::QueryParser::FLAG_WILDCARD; - if (is_chert(dirv[0])) - srch->qp_flags &= ~FLAG_PHRASE; - try { - srch->db = new Xapian::Database(dirv[0]); - } catch (...) { - warn("E: Xapian::Database(%s)", dirv[0]); - return false; + long nfd = req->dirc * SHARD_COST; + + shard_nfd += nfd; + if (shard_nfd > my_fd_max) { + srch_cache_renew(srch); + shard_nfd = nfd; } - try { - for (i = 1; i < dirc; i++) { - if (srch->qp_flags & FLAG_PHRASE && is_chert(dirv[i])) + for (int retried = 0; retried < 2; retried++) { + srch->qp_flags |= FLAG_PHRASE; + i = 0; + try { + srch->db = new Xapian::Database(req->dirv[i]); + if (is_chert(req->dirv[0])) srch->qp_flags &= ~FLAG_PHRASE; - srch->db->add_database(Xapian::Database(dirv[i])); + for (i = 1; i < req->dirc; i++) { + const char *dir = req->dirv[i]; + if (srch->qp_flags & FLAG_PHRASE && + is_chert(dir)) + srch->qp_flags &= ~FLAG_PHRASE; + srch->db->add_database(Xapian::Database(dir)); + } + break; + } catch (const Xapian::Error & e) { + warnx("E: Xapian::Error: %s (%s)", + e.get_description().c_str(), req->dirv[i]); + } catch (...) { // does this happen? + warn("E: add_database(%s)", req->dirv[i]); + } + if (retried) { + errx(EXIT_FAILURE, "E: can't open %s", req->dirv[i]); + } else { + warnx("retrying..."); + if (srch->db) + delete srch->db; + srch->db = NULL; + srch_cache_renew(srch); } - } catch (...) { - warn("E: add_database(%s)", dirv[i]); - return false; - } - try { - srch->qp = new Xapian::QueryParser; - } catch (...) { - perror("E: Xapian::QueryParser"); - return false; } + // these will raise and die on ENOMEM or other errors + srch->qp = new Xapian::QueryParser; srch->qp->set_default_op(Xapian::Query::OP_AND); srch->qp->set_database(*srch->db); - try { - srch->qp->set_stemmer(Xapian::Stem("english")); - } catch (...) { - perror("E: Xapian::Stem"); - return false; - } + srch->qp->set_stemmer(Xapian::Stem("english")); srch->qp->set_stemming_strategy(Xapian::QueryParser::STEM_SOME); srch->qp->SET_MAX_EXPANSION(100); @@ -567,15 +639,31 @@ static bool srch_init(struct req *req) qp_init_code_search(srch->qp); // CodeSearch.pm else qp_init_mail_search(srch->qp); // Search.pm - return true; } -static void free_srch(void *p) // tdestroy +// setup query parser for altid and arbitrary headers +static void srch_init_extra(struct req *req) { - struct srch *srch = (struct srch *)p; - delete srch->qp; - delete srch->db; - free(srch); + const char *XPFX; + for (int i = 0; i < req->qpfxc; i++) { + size_t len = strlen(req->qpfxv[i]); + char *c = (char *)memchr(req->qpfxv[i], '=', len); + + if (c) { // it's boolean "gmane=XGMANE" + XPFX = c + 1; + *c = 0; + req->srch->qp->add_boolean_prefix(req->qpfxv[i], XPFX); + continue; + } + // maybe it's a non-boolean prefix "blob:XBLOBID" + c = (char *)memchr(req->qpfxv[i], ':', len); + if (!c) + errx(EXIT_FAILURE, "bad -Q %s", req->qpfxv[i]); + XPFX = c + 1; + *c = 0; + req->srch->qp->add_prefix(req->qpfxv[i], XPFX); + } + req->srch->qp_extra_done = true; } static void dispatch(struct req *req) @@ -588,7 +676,6 @@ static void dispatch(struct req *req) } kbuf; char *end; FILE *kfp; - struct srch **s; req->threadid = ULLONG_MAX; for (c = 0; c < (int)MY_ARRAY_SIZE(cmds); c++) { if (cmds[c].fn_len == size && @@ -602,7 +689,7 @@ static void dispatch(struct req *req) kfp = open_memstream(&kbuf.ptr, &size); if (!kfp) err(EXIT_FAILURE, "open_memstream(kbuf)"); // write padding, first (contents don't matter) - fwrite(&req->argv[0], offsetof(struct srch, paths), 1, kfp); + fwrite(&req->argv[0], offsetof(struct srch, ckey), 1, kfp); // global getopt variables: optopt = 0; @@ -614,7 +701,11 @@ static void dispatch(struct req *req) switch (c) { case 'a': req->asc = true; break; case 'c': req->code_search = true; break; - case 'd': fwrite(optarg, strlen(optarg) + 1, 1, kfp); break; + case 'd': + req->dirv[req->dirc++] = optarg; + if (MY_ARG_MAX == req->dirc) ABORT("too many -d"); + fprintf(kfp, "-d%c%s%c", 0, optarg, 0); + break; case 'g': req->Pgit_dir = optarg - 1; break; // pad "P" prefix case 'k': req->sort_col = strtol(optarg, &end, 10); @@ -633,7 +724,6 @@ static void dispatch(struct req *req) if (*end || req->off == ULLONG_MAX) ABORT("-o %s", optarg); break; - case 'p': req->emit_percent = true; break; case 'r': req->relevance = true; break; case 't': req->collapse_threads = true; break; case 'A': @@ -641,7 +731,6 @@ static void dispatch(struct req *req) if (MY_ARG_MAX == req->pfxc) ABORT("too many -A"); break; - case 'D': req->emit_docdata = true; break; case 'K': req->timeout_sec = strtoul(optarg, &end, 10); if (*end || req->timeout_sec == ULONG_MAX) @@ -653,28 +742,38 @@ static void dispatch(struct req *req) if (*end || req->threadid == ULLONG_MAX) ABORT("-T %s", optarg); break; + case 'Q': + req->qpfxv[req->qpfxc++] = optarg; + if (MY_ARG_MAX == req->qpfxc) ABORT("too many -Q"); + fprintf(kfp, "-Q%c%s%c", 0, optarg, 0); + break; default: ABORT("bad switch `-%c'", c); } } ERR_CLOSE(kfp, EXIT_FAILURE); // may ENOMEM, sets kbuf.srch kbuf.srch->db = NULL; kbuf.srch->qp = NULL; - kbuf.srch->paths_len = size - offsetof(struct srch, paths); - if (kbuf.srch->paths_len <= 0) - ABORT("no -d args"); - s = (struct srch **)tsearch(kbuf.srch, &srch_tree, srch_cmp); - if (!s) err(EXIT_FAILURE, "tsearch"); // likely ENOMEM - req->srch = *s; - if (req->srch != kbuf.srch) { // reuse existing - free_srch(kbuf.srch); - } else if (!srch_init(req)) { - assert(kbuf.srch == *((struct srch **)tfind( - kbuf.srch, &srch_tree, srch_cmp))); - void *del = tdelete(kbuf.srch, &srch_tree, srch_cmp); - assert(del); - free_srch(kbuf.srch); - goto cmd_err; // srch_init already warned + kbuf.srch->qp_extra_done = false; + kbuf.srch->ckey_len = size - offsetof(struct srch, ckey); + if (kbuf.srch->ckey_len <= 0 || !req->dirc) + ABORT("no -d args (or too many)"); + + int absent; + khint_t ki = srch_set_put(srch_cache, kbuf.srch, &absent); + assert(ki < kh_end(srch_cache)); + req->srch = kh_key(srch_cache, ki); + if (absent) { + srch_init(req); + } else { + assert(req->srch != kbuf.srch); + srch_free(kbuf.srch); + req->srch->db->reopen(); } + if (req->qpfxc && !req->srch->qp_extra_done) + srch_init_extra(req); + if (req->timeout_sec) + alarm(req->timeout_sec > UINT_MAX ? + UINT_MAX : (unsigned)req->timeout_sec); try { if (!req->fn(req)) warnx("`%s' failed", req->argv[0]); @@ -683,8 +782,8 @@ static void dispatch(struct req *req) } catch (...) { warn("unhandled exception"); } -cmd_err: - return; // just be silent on errors, for now + if (req->timeout_sec) + alarm(0); } static void cleanup_pids(void) @@ -723,9 +822,12 @@ static void stderr_restore(FILE *tmp_err) clearerr(stderr); } -static void sigw(int sig) // SIGTERM handler for worker +static void sigw(int sig) // SIGTERM+SIGUSR1 handler for worker { - sock_fd = -1; // break out of recv_loop + switch (sig) { + case SIGUSR1: worker_needs_reopen = 1; break; + default: sock_fd = -1; // break out of recv_loop + } } #define CLEANUP_REQ __attribute__((__cleanup__(req_cleanup))) @@ -735,6 +837,18 @@ static void req_cleanup(void *ptr) free(req->lenv); } +static void reopen_logs(void) +{ + if (stdout_path && *stdout_path && !freopen(stdout_path, "a", stdout)) + err(EXIT_FAILURE, "freopen %s", stdout_path); + if (stderr_path && *stderr_path) { + if (!freopen(stderr_path, "a", stderr)) + err(EXIT_FAILURE, "freopen %s", stderr_path); + if (my_setlinebuf(stderr)) + err(EXIT_FAILURE, "setlinebuf(stderr)"); + } +} + static void recv_loop(void) // worker process loop { static char rbuf[4096 * 33]; // per-process @@ -742,6 +856,7 @@ static void recv_loop(void) // worker process loop sa.sa_handler = sigw; CHECK(int, 0, sigaction(SIGTERM, &sa, NULL)); + CHECK(int, 0, sigaction(SIGUSR1, &sa, NULL)); while (sock_fd == 0) { size_t len = sizeof(rbuf); @@ -758,6 +873,10 @@ static void recv_loop(void) // worker process loop stderr_restore(req.fp[1]); ERR_CLOSE(req.fp[1], 0); } + if (worker_needs_reopen) { + worker_needs_reopen = 0; + reopen_logs(); + } } } @@ -804,10 +923,21 @@ static void start_workers(void) static void cleanup_all(void) { cleanup_pids(); -#ifdef __GLIBC__ - tdestroy(srch_tree, free_srch); - srch_tree = NULL; -#endif + if (!srch_cache) + return; + srch_cache_renew(NULL); + srch_set_destroy(srch_cache); + srch_cache = NULL; +} + +static void parent_reopen_logs(void) +{ + reopen_logs(); + for (unsigned long nr = nworker; nr < nworker_hwm; nr++) { + pid_t pid = worker_pids[nr]; + if (pid != 0 && kill(pid, SIGUSR1)) + warn("BUG?: kill(%d, SIGUSR1)", (int)pid); + } } static void sigp(int sig) // parent signal handler @@ -822,6 +952,7 @@ static void sigp(int sig) // parent signal handler case SIGCHLD: c = '.'; break; case SIGTTOU: c = '-'; break; case SIGTTIN: c = '+'; break; + case SIGUSR1: c = '#'; break; default: write(STDERR_FILENO, bad_sig, sizeof(bad_sig) - 1); _exit(EXIT_FAILURE); @@ -928,14 +1059,25 @@ int main(int argc, char *argv[]) { int c; socklen_t slen = (socklen_t)sizeof(c); + stdout_path = getenv("STDOUT_PATH"); + stderr_path = getenv("STDERR_PATH"); + struct rlimit rl; if (getsockopt(sock_fd, SOL_SOCKET, SO_TYPE, &c, &slen)) err(EXIT_FAILURE, "getsockopt"); if (c != SOCK_SEQPACKET) errx(EXIT_FAILURE, "stdin is not SOCK_SEQPACKET"); + if (getrlimit(RLIMIT_NOFILE, &rl)) + err(EXIT_FAILURE, "getrlimit"); + my_fd_max = rl.rlim_cur; + if (my_fd_max < 72) + warnx("W: RLIMIT_NOFILE=%ld too low\n", my_fd_max); + my_fd_max -= 64; + mail_nrp_init(); code_nrp_init(); + srch_cache = srch_set_init(); atexit(cleanup_all); if (!STDERR_ASSIGNABLE) { @@ -945,12 +1087,6 @@ int main(int argc, char *argv[]) } nworker = 1; -#ifdef _SC_NPROCESSORS_ONLN - long j = sysconf(_SC_NPROCESSORS_ONLN); - if (j > 0) - nworker = j > WORKER_MAX ? WORKER_MAX : j; -#endif // _SC_NPROCESSORS_ONLN - // make warn/warnx/err multi-process friendly: if (my_setlinebuf(stderr)) err(EXIT_FAILURE, "setlinebuf(stderr)"); @@ -992,6 +1128,8 @@ int main(int argc, char *argv[]) DELSET(SIGXCPU); DELSET(SIGXFSZ); #undef DELSET + CHECK(int, 0, sigdelset(&workerset, SIGUSR1)); + CHECK(int, 0, sigdelset(&fullset, SIGALRM)); if (nworker == 0) { // no SIGTERM handling w/o workers recv_loop(); @@ -1000,8 +1138,7 @@ int main(int argc, char *argv[]) CHECK(int, 0, sigdelset(&workerset, SIGTERM)); CHECK(int, 0, sigdelset(&workerset, SIGCHLD)); nworker_hwm = nworker; - worker_pids = (pid_t *)calloc(nworker, sizeof(pid_t)); - if (!worker_pids) err(EXIT_FAILURE, "calloc"); + worker_pids = (pid_t *)xcalloc(nworker, sizeof(pid_t)); if (pipe(pipefds)) err(EXIT_FAILURE, "pipe"); int fl = fcntl(pipefds[1], F_GETFL); @@ -1012,10 +1149,12 @@ int main(int argc, char *argv[]) CHECK(int, 0, sigdelset(&pset, SIGCHLD)); CHECK(int, 0, sigdelset(&pset, SIGTTIN)); CHECK(int, 0, sigdelset(&pset, SIGTTOU)); + CHECK(int, 0, sigdelset(&pset, SIGUSR1)); struct sigaction sa = {}; sa.sa_handler = sigp; + CHECK(int, 0, sigaction(SIGUSR1, &sa, NULL)); CHECK(int, 0, sigaction(SIGTTIN, &sa, NULL)); CHECK(int, 0, sigaction(SIGTTOU, &sa, NULL)); sa.sa_flags = SA_NOCLDSTOP; @@ -1040,6 +1179,7 @@ int main(int argc, char *argv[]) case '.': break; // do_sigchld already called case '-': do_sigttou(); break; case '+': do_sigttin(); break; + case '#': parent_reopen_logs(); break; default: errx(EXIT_FAILURE, "BUG: c=%c", sbuf[i]); } } |