diff options
author | Eric Wong <e@80x24.org> | 2023-08-24 01:22:33 +0000 |
---|---|---|
committer | Eric Wong <e@80x24.org> | 2023-08-24 07:47:51 +0000 |
commit | b18ecb7707e83cb8cb38c3736aecd984999ca0a7 (patch) | |
tree | 0f159212810c98aa07d26b6f7f28f4b8dbc9b302 /lib/PublicInbox/xap_helper.h | |
parent | cf96412eb8f193ebd334fae340b2d91b6b7f2afe (diff) | |
download | public-inbox-b18ecb7707e83cb8cb38c3736aecd984999ca0a7.tar.gz |
This allows us to perform the expensive "dump_ibx" operations in native C++ code using the Xapian C++ library. This provides the majority of the speedup with the -cindex --associate switch. Eventually this may be expanded to cover all uses of Xapian within the project to ensure we have access to Xapian APIs which aren't available in XS|SWIG bindings; and also for ease-of-installation on systems which don't provide pre-packaged Perl Xapian bindings (e.g. OpenBSD 7.3) but do provide Xapian development libraries. Most of the C++ code is still C, as I'm not remotely familiar with C++ compared to C. I suspect many users and potential hackers being from git, Linux kernel, and glibc world are in the same boat.
Diffstat (limited to 'lib/PublicInbox/xap_helper.h')
-rw-r--r-- | lib/PublicInbox/xap_helper.h | 654 |
1 files changed, 654 insertions, 0 deletions
diff --git a/lib/PublicInbox/xap_helper.h b/lib/PublicInbox/xap_helper.h new file mode 100644 index 00000000..52db92b7 --- /dev/null +++ b/lib/PublicInbox/xap_helper.h @@ -0,0 +1,654 @@ +/* + * Copyright (C) all contributors <meta@public-inbox.org> + * License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> + * + * Standalone helper process using C and minimal C++ for Xapian, + * this is not linked to Perl in any way. + * C (not C++) is used as much as possible to lower the contribution + * barrier for hackers who mainly know C (this includes the maintainer). + * Everything here is an unstable internal API of public-inbox and + * NOT intended for ordinary users; only public-inbox hackers + */ +#ifndef _ALL_SOURCE +# define _ALL_SOURCE +#endif +#include <sys/resource.h> +#include <sys/socket.h> +#include <sys/stat.h> +#include <sys/time.h> +#include <sys/types.h> +#include <sys/uio.h> +#include <sys/wait.h> + +#include <assert.h> +#include <err.h> // BSD, glibc, and musl all have this +#include <errno.h> +#include <fcntl.h> +#include <limits.h> +#include <search.h> +#include <stdio.h> +#include <string.h> +#include <sysexits.h> +#include <unistd.h> +#include <xapian.h> // our only reason for using C++ + +#define MY_VER(maj,min,rev) ((maj) << 16 | (min) << 8 | (rev)) +#define XAP_VER \ + MY_VER(XAPIAN_MAJOR_VERSION,XAPIAN_MINOR_VERSION,XAPIAN_REVISION) + +#if XAP_VER >= MY_VER(1,3,6) +# define NRP Xapian::NumberRangeProcessor +# define ADD_RP add_rangeprocessor +# define SET_MAX_EXPANSION set_max_expansion // technically 1.3.3 +#else +# define NRP Xapian::NumberValueRangeProcessor +# define ADD_RP add_valuerangeprocessor +# define SET_MAX_EXPANSION set_max_wildcard_expansion +#endif + +static const int sock_fd = 0; // SOCK_SEQPACKET as stdin :P +static pid_t parent_pid; +static FILE *orig_err = stderr; +static void *srch_tree; // tsearch + tdelete + twalk +static pid_t *worker_pids; // nr => pid +static unsigned long nworker; + +// PublicInbox::Search and PublicInbox::CodeSearch generate these: +static void mail_nrp_init(void); +static void code_nrp_init(void); +static void qp_init_mail_search(Xapian::QueryParser *); +static void qp_init_code_search(Xapian::QueryParser *); + +struct srch { + int paths_len; // int for comparisons + unsigned qp_flags; + Xapian::Database *db; + Xapian::QueryParser *qp; + char paths[]; // $shard_path0\0$shard_path1\0... +}; + +#define MY_ARG_MAX 256 +typedef bool (*cmd)(struct req *); + +// only one request per-process since we have RLIMIT_CPU timeout +struct req { // argv and pfxv point into global rbuf + char *argv[MY_ARG_MAX]; + char *pfxv[MY_ARG_MAX]; // -A <prefix> + struct srch *srch; + char *Oeidx_key; + cmd fn; + unsigned long long max; + unsigned long long off; + unsigned long timeout_sec; + long sort_col; // value column, negative means BoolWeight + int argc; + int pfxc; + FILE *fp[2]; // [0] response pipe or sock, [1] status/errors (optional) + bool has_input; // fp[0] is bidirectional + bool collapse_threads; + bool code_search; + bool relevance; // sort by relevance before column + bool asc; // ascending sort +}; + +struct worker { + pid_t pid; + unsigned nr; +}; + +static bool has_threadid(const struct srch *srch) +{ + return srch->db->get_metadata("has_threadid") == "1"; +} + +static Xapian::Enquire prep_enquire(const struct req *req) +{ + Xapian::Enquire enq(*req->srch->db); + if (req->sort_col < 0) { + enq.set_weighting_scheme(Xapian::BoolWeight()); + enq.set_docid_order(req->asc ? Xapian::Enquire::ASCENDING + : Xapian::Enquire::DESCENDING); + } else if (req->relevance) { + enq.set_sort_by_relevance_then_value(req->sort_col, !req->asc); + } else { + enq.set_sort_by_value_then_relevance(req->sort_col, !req->asc); + } + return enq; +} + +static Xapian::MSet enquire_mset(struct req *req, Xapian::Enquire *enq) +{ + if (!req->max) + req->max = 50; + for (int i = 0; i < 9; i++) { + try { + Xapian::MSet mset = enq->get_mset(req->off, req->max); + return mset; + } catch (const Xapian::DatabaseModifiedError & e) { + req->srch->db->reopen(); + } + } + return enq->get_mset(req->off, req->max); +} + +static Xapian::MSet mail_mset(struct req *req, const char *qry_str) +{ + struct srch *srch = req->srch; + Xapian::Query qry = srch->qp->parse_query(qry_str, srch->qp_flags); + if (req->Oeidx_key) { + req->Oeidx_key[0] = 'O'; // modifies static rbuf + fprintf(stderr, "dbg eidxkey:%s>\n", req->Oeidx_key); + qry = Xapian::Query(Xapian::Query::OP_FILTER, qry, + Xapian::Query(req->Oeidx_key)); + } + Xapian::Enquire enq = prep_enquire(req); + enq.set_query(qry); + // THREADID is a CPP macro defined on CLI (see) XapHelperCxx.pm + if (req->collapse_threads && has_threadid(srch)) + enq.set_collapse_key(THREADID); + + return enquire_mset(req, &enq); +} + +static bool starts_with(const std::string *s, const char *pfx, size_t pfx_len) +{ + return s->size() >= pfx_len && !memcmp(pfx, s->c_str(), pfx_len); +} + +static void dump_ibx_term(struct req *req, const char *pfx, + Xapian::Document *doc, const char *ibx_id) +{ + Xapian::TermIterator cur = doc->termlist_begin(); + Xapian::TermIterator end = doc->termlist_end(); + size_t pfx_len = strlen(pfx); + + for (cur.skip_to(pfx); cur != end; cur++) { + std::string tn = *cur; + + if (starts_with(&tn, pfx, pfx_len)) + fprintf(req->fp[0], "%s %s\n", + tn.c_str() + pfx_len, ibx_id); + } +} + +static int my_setlinebuf(FILE *fp) // glibc setlinebuf(3) can't report errors +{ + return setvbuf(fp, NULL, _IOLBF, 0); +} + +static bool cmd_dump_ibx(struct req *req) +{ + if ((optind + 1) >= req->argc) { + warnx("usage: dump_ibx [OPTIONS] IBX_ID QRY_STR"); + return false; // need ibx_id + qry_str + } + if (!req->pfxc) { + warnx("dump_ibx requires -A PREFIX"); + return false; + } + + const char *ibx_id = req->argv[optind]; + if (my_setlinebuf(req->fp[0])) { // for sort(1) pipe + perror("setlinebuf(fp[0])"); + return false; + } + req->asc = true; + req->sort_col = -1; + req->max = (unsigned long long)req->srch->db->get_doccount(); + Xapian::MSet mset = mail_mset(req, req->argv[optind + 1]); + for (Xapian::MSetIterator i = mset.begin(); i != mset.end(); i++) { + try { + Xapian::Document doc = i.get_document(); + for (int p = 0; p < req->pfxc; p++) + dump_ibx_term(req, req->pfxv[p], &doc, ibx_id); + } catch (const Xapian::Error & e) { + fprintf(orig_err, "W: %s (#%ld)\n", + e.get_description().c_str(), (long)(*i)); + continue; + } + } + if (req->fp[1]) + fprintf(req->fp[1], "mset.size=%llu\n", + (unsigned long long)mset.size()); + return true; +} + +// internal usage only +static bool cmd_test_inspect(struct req *req) +{ + fprintf(req->fp[0], "pid=%d has_threadid=%d", + (int)getpid(), has_threadid(req->srch) ? 1 : 0); + return true; +} + +#define CMD(n) { .fn_len = sizeof(#n) - 1, .fn_name = #n, .fn = cmd_##n } +static const struct cmd_entry { + size_t fn_len; + const char *fn_name; + cmd fn; +} cmds[] = { // should be small enough to not need bsearch || gperf + // most common commands first + CMD(dump_ibx), + CMD(test_inspect), // least common commands last +}; + +#define MY_ARRAY_SIZE(x) (sizeof(x)/sizeof((x)[0])) +#define RECV_FD_CAPA 2 +#define RECV_FD_SPACE (RECV_FD_CAPA * sizeof(int)) +union my_cmsg { + struct cmsghdr hdr; + char pad[sizeof(struct cmsghdr) + 16 + RECV_FD_SPACE]; +}; + +static void xclose(int fd) +{ + if (close(fd) < 0 && errno != EINTR) + err(EXIT_FAILURE, "BUG: close"); +} + +static bool recv_req(struct req *req, char *rbuf, size_t *len) +{ + union my_cmsg cmsg = { 0 }; + struct msghdr msg = { .msg_iovlen = 1 }; + struct iovec iov; + iov.iov_base = rbuf; + iov.iov_len = *len; + msg.msg_iov = &iov; + msg.msg_control = &cmsg.hdr; + msg.msg_controllen = CMSG_SPACE(RECV_FD_SPACE); + + ssize_t r = recvmsg(sock_fd, &msg, 0); + if (r < 0) + err(EXIT_FAILURE, "recvmsg"); + if (r == 0) + exit(EX_NOINPUT); /* grandparent went away */ + *len = r; + if (r > 0 && cmsg.hdr.cmsg_level == SOL_SOCKET && + cmsg.hdr.cmsg_type == SCM_RIGHTS) { + size_t len = cmsg.hdr.cmsg_len; + int *fdp = (int *)CMSG_DATA(&cmsg.hdr); + size_t i; + bool fd_ok = true; + for (i = 0; CMSG_LEN((i + 1) * sizeof(int)) <= len; i++) { + int fd = *fdp++; + const char *mode = NULL; + int fl = fd_ok ? fcntl(fd, F_GETFL) : 0; + switch (fl) { + case 0: break; // hit previous error + case -1: + warnx("invalid fd=%d", fd); + fd_ok = false; + break; + case O_WRONLY: mode = "w"; break; + case O_RDWR: + mode = "r+"; + if (i == 0) req->has_input = true; + break; + default: + warnx("invalid mode from F_GETFL: 0x%x", fl); + fd_ok = false; + } + if (!fd_ok) { + xclose(fd); + } else { + req->fp[i] = fdopen(fd, mode); + if (!req->fp[i]) { + warn("fdopen(fd=%d)", fd); + fd_ok = false; + } + } + } + for (i = 0; !fd_ok && i < MY_ARRAY_SIZE(req->fp); i++) + if (req->fp[i]) fclose(req->fp[i]); + return fd_ok; + } + warnx("no FD received in %zd-byte request", r); + return false; +} + +#define SPLIT2ARGV(dst,buf,len) split2argv(dst,buf,len,MY_ARRAY_SIZE(dst)) +static int split2argv(char **dst, char *buf, size_t len, size_t limit) +{ + if (buf[0] == 0 || len == 0 || buf[len - 1] != 0) { + warnx("bogus argument given"); + return 0; + } + size_t nr = 0; + char *c = buf; + for (size_t i = 1; i < len; i++) { + if (!buf[i]) { + dst[nr++] = c; + c = buf + i + 1; + } + if (nr == limit) { + warnx("too many args: %zu", nr); + return 0; + } + } + return (int)nr; +} + +static int srch_cmp(const void *pa, const void *pb) // for tfind|tsearch +{ + const struct srch *a = (const struct srch *)pa; + const struct srch *b = (const struct srch *)pb; + int diff = a->paths_len - b->paths_len; + + return diff ? diff : memcmp(a->paths, b->paths, (size_t)a->paths_len); +} + +static bool is_chert(const char *dir) +{ + char iamchert[PATH_MAX]; + struct stat sb; + int rc = snprintf(iamchert, sizeof(iamchert), "%s/iamchert", dir); + + if (rc <= 0 || rc >= (int)sizeof(iamchert)) + err(EXIT_FAILURE, "BUG: snprintf(%s/iamchert)", dir); + if (stat(iamchert, &sb) == 0 && S_ISREG(sb.st_mode)) + return true; + return false; +} + +static bool srch_init(struct req *req) +{ + char *dirv[MY_ARG_MAX]; + int i; + struct srch *srch = req->srch; + int dirc = SPLIT2ARGV(dirv, srch->paths, (size_t)srch->paths_len); + const unsigned FLAG_PHRASE = Xapian::QueryParser::FLAG_PHRASE; + srch->qp_flags = FLAG_PHRASE | + Xapian::QueryParser::FLAG_BOOLEAN | + Xapian::QueryParser::FLAG_LOVEHATE | + Xapian::QueryParser::FLAG_WILDCARD; + if (is_chert(dirv[0])) + srch->qp_flags &= ~FLAG_PHRASE; + try { + srch->db = new Xapian::Database(dirv[0]); + } catch (...) { + warn("E: Xapian::Database(%s)", dirv[0]); + return false; + } + try { + for (i = 1; i < dirc; i++) { + if (srch->qp_flags & FLAG_PHRASE && is_chert(dirv[i])) + srch->qp_flags &= ~FLAG_PHRASE; + srch->db->add_database(Xapian::Database(dirv[i])); + } + } catch (...) { + warn("E: add_database(%s)", dirv[i]); + return false; + } + try { + srch->qp = new Xapian::QueryParser; + } catch (...) { + perror("E: Xapian::QueryParser"); + return false; + } + srch->qp->set_default_op(Xapian::Query::OP_AND); + srch->qp->set_database(*srch->db); + try { + srch->qp->set_stemmer(Xapian::Stem("english")); + } catch (...) { + perror("E: Xapian::Stem"); + return false; + } + srch->qp->set_stemming_strategy(Xapian::QueryParser::STEM_SOME); + srch->qp->SET_MAX_EXPANSION(100); + + if (req->code_search) + qp_init_code_search(srch->qp); // CodeSearch.pm + else + qp_init_mail_search(srch->qp); // Search.pm + return true; +} + +static void free_srch(void *p) // tdestroy +{ + struct srch *srch = (struct srch *)p; + delete srch->qp; + delete srch->db; + free(srch); +} + +static void dispatch(struct req *req) +{ + int c; + size_t size = strlen(req->argv[0]); + union { + struct srch *srch; + char *ptr; + } fbuf; + char *end; + FILE *kfp; + struct srch **s; + req->fn = NULL; + for (c = 0; c < (int)MY_ARRAY_SIZE(cmds); c++) { + if (cmds[c].fn_len == size && + !memcmp(cmds[c].fn_name, req->argv[0], size)) { + req->fn = cmds[c].fn; + break; + } + } + if (!req->fn) goto cmd_err; + + kfp = open_memstream(&fbuf.ptr, &size); + // write padding, first + fwrite(&req->argv[0], offsetof(struct srch, paths), 1, kfp); + + // global getopt variables: + optind = 1; + opterr = optopt = 0; + optarg = NULL; + + // keep sync with @PublicInbox::XapHelper::SPEC + while ((c = getopt(req->argc, req->argv, "acd:k:m:o:rtA:O:T:")) != -1) { + switch (c) { + case 'a': req->asc = true; break; + case 'c': req->code_search = true; break; + case 'd': fwrite(optarg, strlen(optarg) + 1, 1, kfp); break; + case 'k': + req->sort_col = strtol(optarg, &end, 10); + if (*end) goto cmd_err; + switch (req->sort_col) { + case LONG_MAX: case LONG_MIN: goto cmd_err; + } + break; + case 'm': + req->max = strtoull(optarg, &end, 10); + if (*end) goto cmd_err; + if (req->max == ULLONG_MAX) goto cmd_err; + break; + case 'o': + req->off = strtoull(optarg, &end, 10); + if (*end) goto cmd_err; + if (req->off == ULLONG_MAX) goto cmd_err; + break; + case 'r': req->relevance = true; break; + case 't': req->collapse_threads = true; break; + case 'A': + req->pfxv[req->pfxc++] = optarg; + if (MY_ARG_MAX == req->pfxc) goto cmd_err; + break; + case 'O': req->Oeidx_key = optarg - 1; break; // pad "O" prefix + case 'T': + req->timeout_sec = strtoul(optarg, &end, 10); + if (*end) goto cmd_err; + if (req->timeout_sec == ULONG_MAX) goto cmd_err; + break; + default: goto cmd_err; + } + } + if (ferror(kfp) | fclose(kfp)) { + perror("ferror|fclose"); + goto cmd_err; + } + fbuf.srch->db = NULL; + fbuf.srch->qp = NULL; + fbuf.srch->paths_len = size - offsetof(struct srch, paths); + if (fbuf.srch->paths_len <= 0) { + free_srch(fbuf.srch); + warnx("no -d args"); + goto cmd_err; + } + s = (struct srch **)tsearch(fbuf.srch, &srch_tree, srch_cmp); + if (!s) { + perror("tsearch"); + goto cmd_err; + } + req->srch = *s; + if (req->srch != fbuf.srch) { // reuse existing + free_srch(fbuf.srch); + } else if (!srch_init(req)) { + assert(fbuf.srch == *((struct srch **)tfind( + fbuf.srch, &srch_tree, srch_cmp))); + void *del = tdelete(fbuf.srch, &srch_tree, srch_cmp); + assert(del); + free_srch(fbuf.srch); + goto cmd_err; + } + try { + if (!req->fn(req)) + goto cmd_err; + } catch (const Xapian::Error & e) { + warnx("Xapian::Error: %s", e.get_description().c_str()); + } catch (...) { + warn("unhandled exception"); + } +cmd_err: + return; // just be silent on errors, for now +} + +static void cleanup_pids(void) +{ + free(worker_pids); + worker_pids = NULL; +} + +static void recv_loop(void) // worker process loop +{ + static char rbuf[4096 * 33]; // per-process + while (!parent_pid || getppid() == parent_pid) { + size_t len = sizeof(rbuf); + struct req req = { 0 }; + if (!recv_req(&req, rbuf, &len)) + continue; + if (req.fp[1]) { + if (my_setlinebuf(req.fp[1])) + perror("W: setlinebuf(req.fp[1])"); + stderr = req.fp[1]; + } + req.argc = SPLIT2ARGV(req.argv, rbuf, len); + if (req.argc > 0) + dispatch(&req); + if (ferror(req.fp[0]) | fclose(req.fp[0])) + perror("ferror|fclose fp[0]"); + if (req.fp[1]) { + stderr = orig_err; + if (ferror(req.fp[1]) | fclose(req.fp[1])) + perror("ferror|fclose fp[1]"); + } + } +} + +static void insert_pid(pid_t pid, unsigned nr) +{ + assert(!worker_pids[nr]); + worker_pids[nr] = pid; +} + +static int delete_pid(pid_t pid) +{ + for (unsigned nr = 0; nr < nworker; nr++) { + if (worker_pids[nr] == pid) { + worker_pids[nr] = 0; + return nr; + } + } + warnx("W: unknown pid=%d reaped", (int)pid); + return -1; +} + +static void start_worker(unsigned nr) +{ + pid_t pid = fork(); + if (pid < 0) { + warn("E: fork(worker=%u)", nr); + } else if (pid > 0) { + insert_pid(pid, nr); + } else { + cleanup_pids(); + recv_loop(); + exit(0); + } +} + +static void cleanup_all(void) +{ + cleanup_pids(); +#ifdef __GLIBC__ + tdestroy(srch_tree, free_srch); + srch_tree = NULL; +#endif +} + +int main(int argc, char *argv[]) +{ + int c; + + mail_nrp_init(); + code_nrp_init(); + atexit(cleanup_all); + + nworker = 0; +#ifdef _SC_NPROCESSORS_ONLN + long j = sysconf(_SC_NPROCESSORS_ONLN); + if (j > 0) + nworker = j > UCHAR_MAX ? UCHAR_MAX : j; +#endif // _SC_NPROCESSORS_ONLN + + // make warn/warnx/err multi-process friendly: + if (my_setlinebuf(stderr)) + err(EXIT_FAILURE, "setlinebuf(stderr)"); + // not using -W<workers> like Daemon.pm, since -W is reserved (glibc) + while ((c = getopt(argc, argv, "j:")) != -1) { + char *end; + + switch (c) { + case 'j': + nworker = strtoul(optarg, &end, 10); + if (*end != 0 || nworker > USHRT_MAX) + errx(EXIT_FAILURE, "-j %s invalid", optarg); + break; + case ':': + errx(EXIT_FAILURE, "missing argument: `-%c'", optopt); + case '?': + errx(EXIT_FAILURE, "unrecognized: `-%c'", optopt); + default: + errx(EXIT_FAILURE, "BUG: `-%c'", c); + } + } + if (nworker == 0) { + recv_loop(); + } else { + parent_pid = getpid(); + worker_pids = (pid_t *)calloc(nworker, sizeof(pid_t)); + if (!worker_pids) + err(EXIT_FAILURE, "calloc"); + for (unsigned i = 0; i < nworker; i++) + start_worker(i); + + int st; + pid_t pid; + bool quit = false; + while ((pid = wait(&st)) > 0) { + int nr = delete_pid(pid); + if (nr < 0) continue; + if (WIFEXITED(st) && WEXITSTATUS(st) == EX_NOINPUT) + quit = true; + if (!quit) + start_worker(nr); + } + } + return 0; +} |