diff options
Diffstat (limited to 'lib/PublicInbox/xap_helper.h')
-rw-r--r-- | lib/PublicInbox/xap_helper.h | 273 |
1 files changed, 164 insertions, 109 deletions
diff --git a/lib/PublicInbox/xap_helper.h b/lib/PublicInbox/xap_helper.h index a30a8768..51ab48bf 100644 --- a/lib/PublicInbox/xap_helper.h +++ b/lib/PublicInbox/xap_helper.h @@ -7,7 +7,7 @@ * this is not linked to Perl in any way. * C (not C++) is used as much as possible to lower the contribution * barrier for hackers who mainly know C (this includes the maintainer). - * Yes, that means we use C stdlib stuff like hsearch and open_memstream + * Yes, that means we use C stdlib stuff like open_memstream * instead their equivalents in the C++ stdlib :P * Everything here is an unstable internal API of public-inbox and * NOT intended for ordinary users; only public-inbox hackers @@ -15,6 +15,9 @@ #ifndef _ALL_SOURCE # define _ALL_SOURCE #endif +#ifndef _GNU_SOURCE +# define _GNU_SOURCE +#endif #if defined(__NetBSD__) && !defined(_OPENBSD_SOURCE) // for reallocarray(3) # define _OPENBSD_SOURCE #endif @@ -34,7 +37,6 @@ #include <errno.h> #include <fcntl.h> #include <limits.h> -#include <search.h> #include <signal.h> #include <stddef.h> #include <stdint.h> @@ -83,6 +85,62 @@ #define ABORT(...) do { warnx(__VA_ARGS__); abort(); } while (0) #define EABORT(...) do { warn(__VA_ARGS__); abort(); } while (0) +static void *xcalloc(size_t nmemb, size_t size) +{ + void *ret = calloc(nmemb, size); + if (!ret) EABORT("calloc(%zu, %zu)", nmemb, size); + return ret; +} + +#if defined(__GLIBC__) && defined(__GLIBC_MINOR__) && \ + MY_VER(__GLIBC__, __GLIBC_MINOR__, 0) >= MY_VER(2, 28, 0) +# define HAVE_REALLOCARRAY 1 +#elif defined(__OpenBSD__) || defined(__DragonFly__) || \ + defined(__FreeBSD__) || defined(__NetBSD__) +# define HAVE_REALLOCARRAY 1 +#endif + +static void *xreallocarray(void *ptr, size_t nmemb, size_t size) +{ +#ifdef HAVE_REALLOCARRAY + void *ret = reallocarray(ptr, nmemb, size); +#else // can't rely on __builtin_mul_overflow in gcc 4.x :< + void *ret = NULL; + if (nmemb && size > SIZE_MAX / nmemb) + errno = ENOMEM; + else + ret = realloc(ptr, nmemb * size); +#endif + if (!ret) EABORT("reallocarray(..., %zu, %zu)", nmemb, size); + return ret; +} + +#include "khashl.h" + +struct srch { + int ckey_len; // int for comparisons + unsigned qp_flags; + bool qp_extra_done; + Xapian::Database *db; + Xapian::QueryParser *qp; + unsigned char ckey[]; // $shard_path0\0$shard_path1\0... +}; + +static khint_t srch_hash(const struct srch *srch) +{ + return kh_hash_bytes(srch->ckey_len, srch->ckey); +} + +static int srch_eq(const struct srch *a, const struct srch *b) +{ + return a->ckey_len == b->ckey_len ? + !memcmp(a->ckey, b->ckey, (size_t)a->ckey_len) : 0; +} + +KHASHL_CSET_INIT(KH_LOCAL, srch_set, srch_set, struct srch *, + srch_hash, srch_eq) +static srch_set *srch_cache; +static long my_fd_max, shard_nfd; // sock_fd is modified in signal handler, yes, it's SOCK_SEQPACKET static volatile int sock_fd = STDIN_FILENO; static sigset_t fullset, workerset; @@ -91,7 +149,6 @@ static bool alive = true; static FILE *orig_err = stderr; #endif static int orig_err_fd = -1; -static void *srch_tree; // tsearch + tdelete + twalk static pid_t *worker_pids; // nr => pid #define WORKER_MAX USHRT_MAX static unsigned long nworker, nworker_hwm; @@ -111,15 +168,6 @@ enum exc_iter { ITER_ABORT }; -struct srch { - int paths_len; // int for comparisons - unsigned qp_flags; - bool qp_extra_done; - Xapian::Database *db; - Xapian::QueryParser *qp; - char paths[]; // $shard_path0\0$shard_path1\0... -}; - #define MY_ARG_MAX 256 typedef bool (*cmd)(struct req *); @@ -128,6 +176,7 @@ struct req { // argv and pfxv point into global rbuf char *argv[MY_ARG_MAX]; char *pfxv[MY_ARG_MAX]; // -A <prefix> char *qpfxv[MY_ARG_MAX]; // -Q <user_prefix>[:=]<INTERNAL_PREFIX> + char *dirv[MY_ARG_MAX]; // -d /path/to/XDB(shard) size_t *lenv; // -A <prefix>LENGTH struct srch *srch; char *Pgit_dir; @@ -139,9 +188,7 @@ struct req { // argv and pfxv point into global rbuf unsigned long timeout_sec; size_t nr_out; long sort_col; // value column, negative means BoolWeight - int argc; - int pfxc; - int qpfxc; + int argc, pfxc, qpfxc, dirc; FILE *fp[2]; // [0] response pipe or sock, [1] status/errors (optional) bool has_input; // fp[0] is bidirectional bool collapse_threads; @@ -375,25 +422,6 @@ static size_t off2size(off_t n) return (size_t)n; } -static char *hsearch_enter_key(char *s) -{ -#if defined(__OpenBSD__) || defined(__DragonFly__) - // hdestroy frees each key on some platforms, - // so give it something to free: - char *ret = strdup(s); - if (!ret) err(EXIT_FAILURE, "strdup"); - return ret; -// AFAIK there's no way to detect musl, assume non-glibc Linux is musl: -#elif defined(__GLIBC__) || defined(__linux__) || \ - defined(__FreeBSD__) || defined(__NetBSD__) - // do nothing on these platforms -#else -#warning untested platform detected, unsure if hdestroy(3) frees keys -#warning contact us at meta@public-inbox.org if you get segfaults -#endif - return s; -} - // for test usage only, we need to ensure the compiler supports // __cleanup__ when exceptions are thrown struct inspect { struct req *req; }; @@ -512,15 +540,6 @@ again: return false; } -static int srch_cmp(const void *pa, const void *pb) // for tfind|tsearch -{ - const struct srch *a = (const struct srch *)pa; - const struct srch *b = (const struct srch *)pb; - int diff = a->paths_len - b->paths_len; - - return diff ? diff : memcmp(a->paths, b->paths, (size_t)a->paths_len); -} - static bool is_chert(const char *dir) { char iamchert[PATH_MAX]; @@ -534,49 +553,85 @@ static bool is_chert(const char *dir) return false; } -static bool srch_init(struct req *req) +static void srch_free(struct srch *srch) +{ + delete srch->qp; + delete srch->db; + free(srch); +} + +static void srch_cache_renew(struct srch *keep) +{ + khint_t k; + + // can't delete while iterating, so just free each + clear + for (k = kh_begin(srch_cache); k != kh_end(srch_cache); k++) { + if (!kh_exist(srch_cache, k)) continue; + struct srch *cur = kh_key(srch_cache, k); + + if (cur != keep) + srch_free(cur); + } + srch_set_cs_clear(srch_cache); + if (keep) { + int absent; + k = srch_set_put(srch_cache, keep, &absent); + assert(absent); + assert(k < kh_end(srch_cache)); + } +} + +static void srch_init(struct req *req) { - char *dirv[MY_ARG_MAX]; int i; struct srch *srch = req->srch; - int dirc = (int)SPLIT2ARGV(dirv, srch->paths, (size_t)srch->paths_len); const unsigned FLAG_PHRASE = Xapian::QueryParser::FLAG_PHRASE; - srch->qp_flags = FLAG_PHRASE | - Xapian::QueryParser::FLAG_BOOLEAN | + srch->qp_flags = Xapian::QueryParser::FLAG_BOOLEAN | Xapian::QueryParser::FLAG_LOVEHATE | Xapian::QueryParser::FLAG_WILDCARD; - if (is_chert(dirv[0])) - srch->qp_flags &= ~FLAG_PHRASE; - try { - srch->db = new Xapian::Database(dirv[0]); - } catch (...) { - warn("E: Xapian::Database(%s)", dirv[0]); - return false; + long nfd = req->dirc * SHARD_COST; + + shard_nfd += nfd; + if (shard_nfd > my_fd_max) { + srch_cache_renew(srch); + shard_nfd = nfd; } - try { - for (i = 1; i < dirc; i++) { - if (srch->qp_flags & FLAG_PHRASE && is_chert(dirv[i])) + for (int retried = 0; retried < 2; retried++) { + srch->qp_flags |= FLAG_PHRASE; + i = 0; + try { + srch->db = new Xapian::Database(req->dirv[i]); + if (is_chert(req->dirv[0])) srch->qp_flags &= ~FLAG_PHRASE; - srch->db->add_database(Xapian::Database(dirv[i])); + for (i = 1; i < req->dirc; i++) { + const char *dir = req->dirv[i]; + if (srch->qp_flags & FLAG_PHRASE && + is_chert(dir)) + srch->qp_flags &= ~FLAG_PHRASE; + srch->db->add_database(Xapian::Database(dir)); + } + break; + } catch (const Xapian::Error & e) { + warnx("E: Xapian::Error: %s (%s)", + e.get_description().c_str(), req->dirv[i]); + } catch (...) { // does this happen? + warn("E: add_database(%s)", req->dirv[i]); + } + if (retried) { + errx(EXIT_FAILURE, "E: can't open %s", req->dirv[i]); + } else { + warnx("retrying..."); + if (srch->db) + delete srch->db; + srch->db = NULL; + srch_cache_renew(srch); } - } catch (...) { - warn("E: add_database(%s)", dirv[i]); - return false; - } - try { - srch->qp = new Xapian::QueryParser; - } catch (...) { - perror("E: Xapian::QueryParser"); - return false; } + // these will raise and die on ENOMEM or other errors + srch->qp = new Xapian::QueryParser; srch->qp->set_default_op(Xapian::Query::OP_AND); srch->qp->set_database(*srch->db); - try { - srch->qp->set_stemmer(Xapian::Stem("english")); - } catch (...) { - perror("E: Xapian::Stem"); - return false; - } + srch->qp->set_stemmer(Xapian::Stem("english")); srch->qp->set_stemming_strategy(Xapian::QueryParser::STEM_SOME); srch->qp->SET_MAX_EXPANSION(100); @@ -584,7 +639,6 @@ static bool srch_init(struct req *req) qp_init_code_search(srch->qp); // CodeSearch.pm else qp_init_mail_search(srch->qp); // Search.pm - return true; } // setup query parser for altid and arbitrary headers @@ -612,14 +666,6 @@ static void srch_init_extra(struct req *req) req->srch->qp_extra_done = true; } -static void free_srch(void *p) // tdestroy -{ - struct srch *srch = (struct srch *)p; - delete srch->qp; - delete srch->db; - free(srch); -} - static void dispatch(struct req *req) { int c; @@ -630,7 +676,6 @@ static void dispatch(struct req *req) } kbuf; char *end; FILE *kfp; - struct srch **s; req->threadid = ULLONG_MAX; for (c = 0; c < (int)MY_ARRAY_SIZE(cmds); c++) { if (cmds[c].fn_len == size && @@ -644,7 +689,7 @@ static void dispatch(struct req *req) kfp = open_memstream(&kbuf.ptr, &size); if (!kfp) err(EXIT_FAILURE, "open_memstream(kbuf)"); // write padding, first (contents don't matter) - fwrite(&req->argv[0], offsetof(struct srch, paths), 1, kfp); + fwrite(&req->argv[0], offsetof(struct srch, ckey), 1, kfp); // global getopt variables: optopt = 0; @@ -656,7 +701,11 @@ static void dispatch(struct req *req) switch (c) { case 'a': req->asc = true; break; case 'c': req->code_search = true; break; - case 'd': fwrite(optarg, strlen(optarg) + 1, 1, kfp); break; + case 'd': + req->dirv[req->dirc++] = optarg; + if (MY_ARG_MAX == req->dirc) ABORT("too many -d"); + fprintf(kfp, "-d%c%s%c", 0, optarg, 0); + break; case 'g': req->Pgit_dir = optarg - 1; break; // pad "P" prefix case 'k': req->sort_col = strtol(optarg, &end, 10); @@ -696,6 +745,7 @@ static void dispatch(struct req *req) case 'Q': req->qpfxv[req->qpfxc++] = optarg; if (MY_ARG_MAX == req->qpfxc) ABORT("too many -Q"); + fprintf(kfp, "-Q%c%s%c", 0, optarg, 0); break; default: ABORT("bad switch `-%c'", c); } @@ -704,22 +754,20 @@ static void dispatch(struct req *req) kbuf.srch->db = NULL; kbuf.srch->qp = NULL; kbuf.srch->qp_extra_done = false; - kbuf.srch->paths_len = size - offsetof(struct srch, paths); - if (kbuf.srch->paths_len <= 0) - ABORT("no -d args"); - s = (struct srch **)tsearch(kbuf.srch, &srch_tree, srch_cmp); - if (!s) err(EXIT_FAILURE, "tsearch"); // likely ENOMEM - req->srch = *s; - if (req->srch != kbuf.srch) { // reuse existing - free_srch(kbuf.srch); + kbuf.srch->ckey_len = size - offsetof(struct srch, ckey); + if (kbuf.srch->ckey_len <= 0 || !req->dirc) + ABORT("no -d args (or too many)"); + + int absent; + khint_t ki = srch_set_put(srch_cache, kbuf.srch, &absent); + assert(ki < kh_end(srch_cache)); + req->srch = kh_key(srch_cache, ki); + if (absent) { + srch_init(req); + } else { + assert(req->srch != kbuf.srch); + srch_free(kbuf.srch); req->srch->db->reopen(); - } else if (!srch_init(req)) { - assert(kbuf.srch == *((struct srch **)tfind( - kbuf.srch, &srch_tree, srch_cmp))); - void *del = tdelete(kbuf.srch, &srch_tree, srch_cmp); - assert(del); - free_srch(kbuf.srch); - goto cmd_err; // srch_init already warned } if (req->qpfxc && !req->srch->qp_extra_done) srch_init_extra(req); @@ -736,8 +784,6 @@ static void dispatch(struct req *req) } if (req->timeout_sec) alarm(0); -cmd_err: - return; // just be silent on errors, for now } static void cleanup_pids(void) @@ -877,10 +923,11 @@ static void start_workers(void) static void cleanup_all(void) { cleanup_pids(); -#ifdef __GLIBC__ - tdestroy(srch_tree, free_srch); - srch_tree = NULL; -#endif + if (!srch_cache) + return; + srch_cache_renew(NULL); + srch_set_destroy(srch_cache); + srch_cache = NULL; } static void parent_reopen_logs(void) @@ -1014,14 +1061,23 @@ int main(int argc, char *argv[]) socklen_t slen = (socklen_t)sizeof(c); stdout_path = getenv("STDOUT_PATH"); stderr_path = getenv("STDERR_PATH"); + struct rlimit rl; if (getsockopt(sock_fd, SOL_SOCKET, SO_TYPE, &c, &slen)) err(EXIT_FAILURE, "getsockopt"); if (c != SOCK_SEQPACKET) errx(EXIT_FAILURE, "stdin is not SOCK_SEQPACKET"); + if (getrlimit(RLIMIT_NOFILE, &rl)) + err(EXIT_FAILURE, "getrlimit"); + my_fd_max = rl.rlim_cur; + if (my_fd_max < 72) + warnx("W: RLIMIT_NOFILE=%ld too low\n", my_fd_max); + my_fd_max -= 64; + mail_nrp_init(); code_nrp_init(); + srch_cache = srch_set_init(); atexit(cleanup_all); if (!STDERR_ASSIGNABLE) { @@ -1082,8 +1138,7 @@ int main(int argc, char *argv[]) CHECK(int, 0, sigdelset(&workerset, SIGTERM)); CHECK(int, 0, sigdelset(&workerset, SIGCHLD)); nworker_hwm = nworker; - worker_pids = (pid_t *)calloc(nworker, sizeof(pid_t)); - if (!worker_pids) err(EXIT_FAILURE, "calloc"); + worker_pids = (pid_t *)xcalloc(nworker, sizeof(pid_t)); if (pipe(pipefds)) err(EXIT_FAILURE, "pipe"); int fl = fcntl(pipefds[1], F_GETFL); |