about summary refs log tree commit homepage
path: root/lib/PublicInbox/xap_helper.h
diff options
context:
space:
mode:
Diffstat (limited to 'lib/PublicInbox/xap_helper.h')
-rw-r--r-- lib/PublicInbox/xap_helper.h 273
1 files changed, 164 insertions, 109 deletions
diff --git a/lib/PublicInbox/xap_helper.h b/lib/PublicInbox/xap_helper.h
index a30a8768..51ab48bf 100644
--- a/lib/PublicInbox/xap_helper.h
+++ b/lib/PublicInbox/xap_helper.h
@@ -7,7 +7,7 @@
  * this is not linked to Perl in any way.
  * C (not C++) is used as much as possible to lower the contribution
  * barrier for hackers who mainly know C (this includes the maintainer).
- * Yes, that means we use C stdlib stuff like hsearch and open_memstream
+ * Yes, that means we use C stdlib stuff like open_memstream
  * instead their equivalents in the C++ stdlib :P
  * Everything here is an unstable internal API of public-inbox and
  * NOT intended for ordinary users; only public-inbox hackers
@@ -15,6 +15,9 @@
 #ifndef _ALL_SOURCE
 #        define _ALL_SOURCE
 #endif
+#ifndef _GNU_SOURCE
+#        define _GNU_SOURCE
+#endif
 #if defined(__NetBSD__) && !defined(_OPENBSD_SOURCE) // for reallocarray(3)
 #        define _OPENBSD_SOURCE
 #endif
@@ -34,7 +37,6 @@
 #include <errno.h>
 #include <fcntl.h>
 #include <limits.h>
-#include <search.h>
 #include <signal.h>
 #include <stddef.h>
 #include <stdint.h>
@@ -83,6 +85,62 @@
 #define ABORT(...) do { warnx(__VA_ARGS__); abort(); } while (0)
 #define EABORT(...) do { warn(__VA_ARGS__); abort(); } while (0)
 
+static void *xcalloc(size_t nmemb, size_t size) // calloc(3) wrapper: aborts instead of returning NULL
+{
+        void *ret = calloc(nmemb, size);
+        if (!ret) EABORT("calloc(%zu, %zu)", nmemb, size); // EABORT = warn(3) + abort(3); NOTE(review): a zero-sized request may legally yield NULL and would abort here too
+        return ret;
+}
+
+#if defined(__GLIBC__) && defined(__GLIBC_MINOR__) && \
+                MY_VER(__GLIBC__, __GLIBC_MINOR__, 0) >= MY_VER(2, 28, 0)
+#        define HAVE_REALLOCARRAY 1
+#elif defined(__OpenBSD__) || defined(__DragonFly__) || \
+                defined(__FreeBSD__) || defined(__NetBSD__)
+#        define HAVE_REALLOCARRAY 1
+#endif
+
+static void *xreallocarray(void *ptr, size_t nmemb, size_t size) // resize ptr to hold nmemb elements of `size' bytes, or abort
+{
+#ifdef HAVE_REALLOCARRAY // set above for glibc >= 2.28 and Open/DragonFly/Free/NetBSD
+        void *ret = reallocarray(ptr, nmemb, size);
+#else // can't rely on __builtin_mul_overflow in gcc 4.x :<
+        void *ret = NULL;
+        if (nmemb && size > SIZE_MAX / nmemb) // nmemb * size would overflow size_t
+                errno = ENOMEM; // so the warn(3) inside EABORT reports a meaningful error
+        else
+                ret = realloc(ptr, nmemb * size);
+#endif
+        if (!ret) EABORT("reallocarray(..., %zu, %zu)", nmemb, size); // callers never see NULL
+        return ret;
+}
+
+#include "khashl.h"
+
+struct srch { // one srch_cache entry: an opened Xapian DB (all shards) plus its query parser
+        int ckey_len; // number of bytes in ckey; int for comparisons
+        unsigned qp_flags; // Xapian::QueryParser::FLAG_* bits chosen by srch_init
+        bool qp_extra_done; // set to true at the end of srch_init_extra
+        Xapian::Database *db; // deleted by srch_free
+        Xapian::QueryParser *qp; // deleted by srch_free
+        unsigned char ckey[]; // cache key built in dispatch: "-d\0$shard_path\0" and "-Q\0$user_prefix\0" records
+};
+
+static khint_t srch_hash(const struct srch *srch) // hash callback for srch_set: only the ckey bytes are hashed
+{
+        return kh_hash_bytes(srch->ckey_len, srch->ckey);
+}
+
+static int srch_eq(const struct srch *a, const struct srch *b) // equality callback for srch_set: nonzero iff both keys match
+{
+        return a->ckey_len == b->ckey_len ? // compare lengths first so memcmp never reads past the shorter key
+                !memcmp(a->ckey, b->ckey, (size_t)a->ckey_len) : 0;
+}
+
+KHASHL_CSET_INIT(KH_LOCAL, srch_set, srch_set, struct srch *,
+                srch_hash, srch_eq) // presumably generates the srch_set type and the srch_set_* functions used in this file (see khashl.h)
+static srch_set *srch_cache; // set of opened struct srch, looked up by ckey; created in main()
+static long my_fd_max, shard_nfd; // my_fd_max: RLIMIT_NOFILE soft limit minus 64 (set in main); shard_nfd: running sum of dirc * SHARD_COST (see srch_init)
 // sock_fd is modified in signal handler, yes, it's SOCK_SEQPACKET
 static volatile int sock_fd = STDIN_FILENO;
 static sigset_t fullset, workerset;
@@ -91,7 +149,6 @@ static bool alive = true;
 static FILE *orig_err = stderr;
 #endif
 static int orig_err_fd = -1;
-static void *srch_tree; // tsearch + tdelete + twalk
 static pid_t *worker_pids; // nr => pid
 #define WORKER_MAX USHRT_MAX
 static unsigned long nworker, nworker_hwm;
@@ -111,15 +168,6 @@ enum exc_iter {
         ITER_ABORT
 };
 
-struct srch {
-        int paths_len; // int for comparisons
-        unsigned qp_flags;
-        bool qp_extra_done;
-        Xapian::Database *db;
-        Xapian::QueryParser *qp;
-        char paths[]; // $shard_path0\0$shard_path1\0...
-};
-
 #define MY_ARG_MAX 256
 typedef bool (*cmd)(struct req *);
 
@@ -128,6 +176,7 @@ struct req { // argv and pfxv point into global rbuf
         char *argv[MY_ARG_MAX];
         char *pfxv[MY_ARG_MAX]; // -A <prefix>
         char *qpfxv[MY_ARG_MAX]; // -Q <user_prefix>[:=]<INTERNAL_PREFIX>
+        char *dirv[MY_ARG_MAX]; // -d /path/to/XDB(shard)
         size_t *lenv; // -A <prefix>LENGTH
         struct srch *srch;
         char *Pgit_dir;
@@ -139,9 +188,7 @@ struct req { // argv and pfxv point into global rbuf
         unsigned long timeout_sec;
         size_t nr_out;
         long sort_col; // value column, negative means BoolWeight
-        int argc;
-        int pfxc;
-        int qpfxc;
+        int argc, pfxc, qpfxc, dirc;
         FILE *fp[2]; // [0] response pipe or sock, [1] status/errors (optional)
         bool has_input; // fp[0] is bidirectional
         bool collapse_threads;
@@ -375,25 +422,6 @@ static size_t off2size(off_t n)
         return (size_t)n;
 }
 
-static char *hsearch_enter_key(char *s)
-{
-#if defined(__OpenBSD__) || defined(__DragonFly__)
-        // hdestroy frees each key on some platforms,
-        // so give it something to free:
-        char *ret = strdup(s);
-        if (!ret) err(EXIT_FAILURE, "strdup");
-        return ret;
-// AFAIK there's no way to detect musl, assume non-glibc Linux is musl:
-#elif defined(__GLIBC__) || defined(__linux__) || \
-        defined(__FreeBSD__) || defined(__NetBSD__)
-        // do nothing on these platforms
-#else
-#warning untested platform detected, unsure if hdestroy(3) frees keys
-#warning contact us at meta@public-inbox.org if you get segfaults
-#endif
-        return s;
-}
-
 // for test usage only, we need to ensure the compiler supports
 // __cleanup__ when exceptions are thrown
 struct inspect { struct req *req; };
@@ -512,15 +540,6 @@ again:
         return false;
 }
 
-static int srch_cmp(const void *pa, const void *pb) // for tfind|tsearch
-{
-        const struct srch *a = (const struct srch *)pa;
-        const struct srch *b = (const struct srch *)pb;
-        int diff = a->paths_len - b->paths_len;
-
-        return diff ? diff : memcmp(a->paths, b->paths, (size_t)a->paths_len);
-}
-
 static bool is_chert(const char *dir)
 {
         char iamchert[PATH_MAX];
@@ -534,49 +553,85 @@ static bool is_chert(const char *dir)
         return false;
 }
 
-static bool srch_init(struct req *req)
+static void srch_free(struct srch *srch) // destroy one struct srch together with its Xapian objects
+{
+        delete srch->qp; // qp and db are NULL for a lookup-only key from dispatch; delete on NULL is a no-op
+        delete srch->db;
+        free(srch); // presumably the open_memstream buffer from dispatch, hence free() rather than delete
+}
+
+static void srch_cache_renew(struct srch *keep) // free every cached srch except `keep' (which may be NULL) and empty the set
+{ // NOTE(review): shard_nfd is not touched here; srch_init resets it after its first call but not after the retry call -- confirm intended
+        khint_t k;
+
+        // can't delete while iterating, so just free each + clear
+        for (k = kh_begin(srch_cache); k != kh_end(srch_cache); k++) {
+                if (!kh_exist(srch_cache, k)) continue; // unoccupied bucket
+                struct srch *cur = kh_key(srch_cache, k);
+
+                if (cur != keep)
+                        srch_free(cur);
+        }
+        srch_set_cs_clear(srch_cache); // stored pointers are stale now, drop them all at once
+        if (keep) { // put the survivor back into the emptied set
+                int absent;
+                k = srch_set_put(srch_cache, keep, &absent);
+                assert(absent); // the set was just cleared, so keep cannot already be present
+                assert(k < kh_end(srch_cache));
+        }
+}
+
+static void srch_init(struct req *req)
 {
-        char *dirv[MY_ARG_MAX];
         int i;
         struct srch *srch = req->srch;
-        int dirc = (int)SPLIT2ARGV(dirv, srch->paths, (size_t)srch->paths_len);
         const unsigned FLAG_PHRASE = Xapian::QueryParser::FLAG_PHRASE;
-        srch->qp_flags = FLAG_PHRASE |
-                        Xapian::QueryParser::FLAG_BOOLEAN |
+        srch->qp_flags = Xapian::QueryParser::FLAG_BOOLEAN |
                         Xapian::QueryParser::FLAG_LOVEHATE |
                         Xapian::QueryParser::FLAG_WILDCARD;
-        if (is_chert(dirv[0]))
-                srch->qp_flags &= ~FLAG_PHRASE;
-        try {
-                srch->db = new Xapian::Database(dirv[0]);
-        } catch (...) {
-                warn("E: Xapian::Database(%s)", dirv[0]);
-                return false;
+        long nfd = req->dirc * SHARD_COST;
+
+        shard_nfd += nfd;
+        if (shard_nfd > my_fd_max) {
+                srch_cache_renew(srch);
+                shard_nfd = nfd;
         }
-        try {
-                for (i = 1; i < dirc; i++) {
-                        if (srch->qp_flags & FLAG_PHRASE && is_chert(dirv[i]))
+        for (int retried = 0; retried < 2; retried++) {
+                srch->qp_flags |= FLAG_PHRASE;
+                i = 0;
+                try {
+                        srch->db = new Xapian::Database(req->dirv[i]);
+                        if (is_chert(req->dirv[0]))
                                 srch->qp_flags &= ~FLAG_PHRASE;
-                        srch->db->add_database(Xapian::Database(dirv[i]));
+                        for (i = 1; i < req->dirc; i++) {
+                                const char *dir = req->dirv[i];
+                                if (srch->qp_flags & FLAG_PHRASE &&
+                                                is_chert(dir))
+                                        srch->qp_flags &= ~FLAG_PHRASE;
+                                srch->db->add_database(Xapian::Database(dir));
+                        }
+                        break;
+                } catch (const Xapian::Error & e) {
+                        warnx("E: Xapian::Error: %s (%s)",
+                                e.get_description().c_str(), req->dirv[i]);
+                } catch (...) { // does this happen?
+                        warn("E: add_database(%s)", req->dirv[i]);
+                }
+                if (retried) {
+                        errx(EXIT_FAILURE, "E: can't open %s", req->dirv[i]);
+                } else {
+                        warnx("retrying...");
+                        if (srch->db)
+                                delete srch->db;
+                        srch->db = NULL;
+                        srch_cache_renew(srch);
                 }
-        } catch (...) {
-                warn("E: add_database(%s)", dirv[i]);
-                return false;
-        }
-        try {
-                srch->qp = new Xapian::QueryParser;
-        } catch (...) {
-                perror("E: Xapian::QueryParser");
-                return false;
         }
+        // these will raise and die on ENOMEM or other errors
+        srch->qp = new Xapian::QueryParser;
         srch->qp->set_default_op(Xapian::Query::OP_AND);
         srch->qp->set_database(*srch->db);
-        try {
-                srch->qp->set_stemmer(Xapian::Stem("english"));
-        } catch (...) {
-                perror("E: Xapian::Stem");
-                return false;
-        }
+        srch->qp->set_stemmer(Xapian::Stem("english"));
         srch->qp->set_stemming_strategy(Xapian::QueryParser::STEM_SOME);
         srch->qp->SET_MAX_EXPANSION(100);
 
@@ -584,7 +639,6 @@ static bool srch_init(struct req *req)
                 qp_init_code_search(srch->qp); // CodeSearch.pm
         else
                 qp_init_mail_search(srch->qp); // Search.pm
-        return true;
 }
 
 // setup query parser for altid and arbitrary headers
@@ -612,14 +666,6 @@ static void srch_init_extra(struct req *req)
         req->srch->qp_extra_done = true;
 }
 
-static void free_srch(void *p) // tdestroy
-{
-        struct srch *srch = (struct srch *)p;
-        delete srch->qp;
-        delete srch->db;
-        free(srch);
-}
-
 static void dispatch(struct req *req)
 {
         int c;
@@ -630,7 +676,6 @@ static void dispatch(struct req *req)
         } kbuf;
         char *end;
         FILE *kfp;
-        struct srch **s;
         req->threadid = ULLONG_MAX;
         for (c = 0; c < (int)MY_ARRAY_SIZE(cmds); c++) {
                 if (cmds[c].fn_len == size &&
@@ -644,7 +689,7 @@ static void dispatch(struct req *req)
         kfp = open_memstream(&kbuf.ptr, &size);
         if (!kfp) err(EXIT_FAILURE, "open_memstream(kbuf)");
         // write padding, first (contents don't matter)
-        fwrite(&req->argv[0], offsetof(struct srch, paths), 1, kfp);
+        fwrite(&req->argv[0], offsetof(struct srch, ckey), 1, kfp);
 
         // global getopt variables:
         optopt = 0;
@@ -656,7 +701,11 @@ static void dispatch(struct req *req)
                 switch (c) {
                 case 'a': req->asc = true; break;
                 case 'c': req->code_search = true; break;
-                case 'd': fwrite(optarg, strlen(optarg) + 1, 1, kfp); break;
+                case 'd':
+                        req->dirv[req->dirc++] = optarg;
+                        if (MY_ARG_MAX == req->dirc) ABORT("too many -d");
+                        fprintf(kfp, "-d%c%s%c", 0, optarg, 0);
+                        break;
                 case 'g': req->Pgit_dir = optarg - 1; break; // pad "P" prefix
                 case 'k':
                         req->sort_col = strtol(optarg, &end, 10);
@@ -696,6 +745,7 @@ static void dispatch(struct req *req)
                 case 'Q':
                         req->qpfxv[req->qpfxc++] = optarg;
                         if (MY_ARG_MAX == req->qpfxc) ABORT("too many -Q");
+                        fprintf(kfp, "-Q%c%s%c", 0, optarg, 0);
                         break;
                 default: ABORT("bad switch `-%c'", c);
                 }
@@ -704,22 +754,20 @@ static void dispatch(struct req *req)
         kbuf.srch->db = NULL;
         kbuf.srch->qp = NULL;
         kbuf.srch->qp_extra_done = false;
-        kbuf.srch->paths_len = size - offsetof(struct srch, paths);
-        if (kbuf.srch->paths_len <= 0)
-                ABORT("no -d args");
-        s = (struct srch **)tsearch(kbuf.srch, &srch_tree, srch_cmp);
-        if (!s) err(EXIT_FAILURE, "tsearch"); // likely ENOMEM
-        req->srch = *s;
-        if (req->srch != kbuf.srch) { // reuse existing
-                free_srch(kbuf.srch);
+        kbuf.srch->ckey_len = size - offsetof(struct srch, ckey);
+        if (kbuf.srch->ckey_len <= 0 || !req->dirc)
+                ABORT("no -d args (or too many)");
+
+        int absent;
+        khint_t ki = srch_set_put(srch_cache, kbuf.srch, &absent);
+        assert(ki < kh_end(srch_cache));
+        req->srch = kh_key(srch_cache, ki);
+        if (absent) {
+                srch_init(req);
+        } else {
+                assert(req->srch != kbuf.srch);
+                srch_free(kbuf.srch);
                 req->srch->db->reopen();
-        } else if (!srch_init(req)) {
-                assert(kbuf.srch == *((struct srch **)tfind(
-                                        kbuf.srch, &srch_tree, srch_cmp)));
-                void *del = tdelete(kbuf.srch, &srch_tree, srch_cmp);
-                assert(del);
-                free_srch(kbuf.srch);
-                goto cmd_err; // srch_init already warned
         }
         if (req->qpfxc && !req->srch->qp_extra_done)
                 srch_init_extra(req);
@@ -736,8 +784,6 @@ static void dispatch(struct req *req)
         }
         if (req->timeout_sec)
                 alarm(0);
-cmd_err:
-        return; // just be silent on errors, for now
 }
 
 static void cleanup_pids(void)
@@ -877,10 +923,11 @@ static void start_workers(void)
 static void cleanup_all(void)
 {
         cleanup_pids();
-#ifdef __GLIBC__
-        tdestroy(srch_tree, free_srch);
-        srch_tree = NULL;
-#endif
+        if (!srch_cache) // nothing to release, e.g. already cleaned up (it is set to NULL below)
+                return;
+        srch_cache_renew(NULL); // keep == NULL: srch_free() every cached entry
+        srch_set_destroy(srch_cache); // then release the hash set itself
+        srch_cache = NULL;
 }
 
 static void parent_reopen_logs(void)
@@ -1014,14 +1061,23 @@ int main(int argc, char *argv[])
         socklen_t slen = (socklen_t)sizeof(c);
         stdout_path = getenv("STDOUT_PATH");
         stderr_path = getenv("STDERR_PATH");
+        struct rlimit rl;
 
         if (getsockopt(sock_fd, SOL_SOCKET, SO_TYPE, &c, &slen))
                 err(EXIT_FAILURE, "getsockopt");
         if (c != SOCK_SEQPACKET)
                 errx(EXIT_FAILURE, "stdin is not SOCK_SEQPACKET");
 
+        if (getrlimit(RLIMIT_NOFILE, &rl))
+                err(EXIT_FAILURE, "getrlimit");
+        my_fd_max = rl.rlim_cur;
+        if (my_fd_max < 72)
+                warnx("W: RLIMIT_NOFILE=%ld too low\n", my_fd_max);
+        my_fd_max -= 64;
+
         mail_nrp_init();
         code_nrp_init();
+        srch_cache = srch_set_init();
         atexit(cleanup_all);
 
         if (!STDERR_ASSIGNABLE) {
@@ -1082,8 +1138,7 @@ int main(int argc, char *argv[])
         CHECK(int, 0, sigdelset(&workerset, SIGTERM));
         CHECK(int, 0, sigdelset(&workerset, SIGCHLD));
         nworker_hwm = nworker;
-        worker_pids = (pid_t *)calloc(nworker, sizeof(pid_t));
-        if (!worker_pids) err(EXIT_FAILURE, "calloc");
+        worker_pids = (pid_t *)xcalloc(nworker, sizeof(pid_t));
 
         if (pipe(pipefds)) err(EXIT_FAILURE, "pipe");
         int fl = fcntl(pipefds[1], F_GETFL);