about summary refs log tree commit homepage
path: root/lib/PublicInbox/xap_helper.h
diff options
authorEric Wong <e@80x24.org>2023-08-24 01:22:33 +0000
committerEric Wong <e@80x24.org>2023-08-24 07:47:51 +0000
commitb18ecb7707e83cb8cb38c3736aecd984999ca0a7 (patch)
tree0f159212810c98aa07d26b6f7f28f4b8dbc9b302 /lib/PublicInbox/xap_helper.h
parentcf96412eb8f193ebd334fae340b2d91b6b7f2afe (diff)
This allows us to perform the expensive "dump_ibx" operations in
native C++ code using the Xapian C++ library.  This provides the
majority of the speedup with the -cindex --associate switch.

Eventually this may be expanded to cover all uses of Xapian
within the project to ensure we have access to Xapian APIs which
aren't available in XS|SWIG bindings; and also for
ease-of-installation on systems which don't provide
pre-packaged Perl Xapian bindings (e.g. OpenBSD 7.3) but
do provide Xapian development libraries.

Most of the C++ code is still C, as I'm not remotely familiar
with C++ compared to C.  I suspect many users and potential
hackers being from git, Linux kernel, and glibc world are in the
same boat.
Diffstat (limited to 'lib/PublicInbox/xap_helper.h')
1 files changed, 654 insertions, 0 deletions
diff --git a/lib/PublicInbox/xap_helper.h b/lib/PublicInbox/xap_helper.h
new file mode 100644
index 00000000..52db92b7
--- /dev/null
+++ b/lib/PublicInbox/xap_helper.h
@@ -0,0 +1,654 @@
+ * Copyright (C) all contributors <meta@public-inbox.org>
+ * License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+ *
+ * Standalone helper process using C and minimal C++ for Xapian,
+ * this is not linked to Perl in any way.
+ * C (not C++) is used as much as possible to lower the contribution
+ * barrier for hackers who mainly know C (this includes the maintainer).
+ * Everything here is an unstable internal API of public-inbox and
+ * NOT intended for ordinary users; only public-inbox hackers
+ */
+#ifndef _ALL_SOURCE
+#        define _ALL_SOURCE
+#include <sys/resource.h>
+#include <sys/socket.h>
+#include <sys/stat.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <sys/uio.h>
+#include <sys/wait.h>
+#include <assert.h>
+#include <err.h> // BSD, glibc, and musl all have this
+#include <errno.h>
+#include <fcntl.h>
+#include <limits.h>
+#include <search.h>
+#include <stdio.h>
+#include <string.h>
+#include <sysexits.h>
+#include <unistd.h>
+#include <xapian.h> // our only reason for using C++
+#define MY_VER(maj,min,rev) ((maj) << 16 | (min) << 8 | (rev))
+#define XAP_VER \
+#if XAP_VER >= MY_VER(1,3,6)
+#        define NRP Xapian::NumberRangeProcessor
+#        define ADD_RP add_rangeprocessor
+#        define SET_MAX_EXPANSION set_max_expansion // technically 1.3.3
+#        define NRP Xapian::NumberValueRangeProcessor
+#        define ADD_RP add_valuerangeprocessor
+#        define SET_MAX_EXPANSION set_max_wildcard_expansion
+static const int sock_fd = 0; // SOCK_SEQPACKET as stdin :P
+static pid_t parent_pid;
+static FILE *orig_err = stderr;
+static void *srch_tree; // tsearch + tdelete + twalk
+static pid_t *worker_pids; // nr => pid
+static unsigned long nworker;
+// PublicInbox::Search and PublicInbox::CodeSearch generate these:
+static void mail_nrp_init(void);
+static void code_nrp_init(void);
+static void qp_init_mail_search(Xapian::QueryParser *);
+static void qp_init_code_search(Xapian::QueryParser *);
+struct srch {
+        int paths_len; // int for comparisons
+        unsigned qp_flags;
+        Xapian::Database *db;
+        Xapian::QueryParser *qp;
+        char paths[]; // $shard_path0\0$shard_path1\0...
+#define MY_ARG_MAX 256
+typedef bool (*cmd)(struct req *);
+// only one request per-process since we have RLIMIT_CPU timeout
+struct req { // argv and pfxv point into global rbuf
+        char *argv[MY_ARG_MAX];
+        char *pfxv[MY_ARG_MAX]; // -A <prefix>
+        struct srch *srch;
+        char *Oeidx_key;
+        cmd fn;
+        unsigned long long max;
+        unsigned long long off;
+        unsigned long timeout_sec;
+        long sort_col; // value column, negative means BoolWeight
+        int argc;
+        int pfxc;
+        FILE *fp[2]; // [0] response pipe or sock, [1] status/errors (optional)
+        bool has_input; // fp[0] is bidirectional
+        bool collapse_threads;
+        bool code_search;
+        bool relevance; // sort by relevance before column
+        bool asc; // ascending sort
+struct worker {
+        pid_t pid;
+        unsigned nr;
+static bool has_threadid(const struct srch *srch)
+        return srch->db->get_metadata("has_threadid") == "1";
+static Xapian::Enquire prep_enquire(const struct req *req)
+        Xapian::Enquire enq(*req->srch->db);
+        if (req->sort_col < 0) {
+                enq.set_weighting_scheme(Xapian::BoolWeight());
+                enq.set_docid_order(req->asc ? Xapian::Enquire::ASCENDING
+                                        : Xapian::Enquire::DESCENDING);
+        } else if (req->relevance) {
+                enq.set_sort_by_relevance_then_value(req->sort_col, !req->asc);
+        } else {
+                enq.set_sort_by_value_then_relevance(req->sort_col, !req->asc);
+        }
+        return enq;
+static Xapian::MSet enquire_mset(struct req *req, Xapian::Enquire *enq)
+        if (!req->max)
+                req->max = 50;
+        for (int i = 0; i < 9; i++) {
+                try {
+                        Xapian::MSet mset = enq->get_mset(req->off, req->max);
+                        return mset;
+                } catch (const Xapian::DatabaseModifiedError & e) {
+                        req->srch->db->reopen();
+                }
+        }
+        return enq->get_mset(req->off, req->max);
+static Xapian::MSet mail_mset(struct req *req, const char *qry_str)
+        struct srch *srch = req->srch;
+        Xapian::Query qry = srch->qp->parse_query(qry_str, srch->qp_flags);
+        if (req->Oeidx_key) {
+                req->Oeidx_key[0] = 'O'; // modifies static rbuf
+                fprintf(stderr, "dbg eidxkey:%s>\n", req->Oeidx_key);
+                qry = Xapian::Query(Xapian::Query::OP_FILTER, qry,
+                                        Xapian::Query(req->Oeidx_key));
+        }
+        Xapian::Enquire enq = prep_enquire(req);
+        enq.set_query(qry);
+        // THREADID is a CPP macro defined on CLI (see) XapHelperCxx.pm
+        if (req->collapse_threads && has_threadid(srch))
+                enq.set_collapse_key(THREADID);
+        return enquire_mset(req, &enq);
+static bool starts_with(const std::string *s, const char *pfx, size_t pfx_len)
+        return s->size() >= pfx_len && !memcmp(pfx, s->c_str(), pfx_len);
+static void dump_ibx_term(struct req *req, const char *pfx,
+                        Xapian::Document *doc, const char *ibx_id)
+        Xapian::TermIterator cur = doc->termlist_begin();
+        Xapian::TermIterator end = doc->termlist_end();
+        size_t pfx_len = strlen(pfx);
+        for (cur.skip_to(pfx); cur != end; cur++) {
+                std::string tn = *cur;
+                if (starts_with(&tn, pfx, pfx_len))
+                        fprintf(req->fp[0], "%s %s\n",
+                                tn.c_str() + pfx_len, ibx_id);
+        }
+static int my_setlinebuf(FILE *fp) // glibc setlinebuf(3) can't report errors
+        return setvbuf(fp, NULL, _IOLBF, 0);
+static bool cmd_dump_ibx(struct req *req)
+        if ((optind + 1) >= req->argc) {
+                warnx("usage: dump_ibx [OPTIONS] IBX_ID QRY_STR");
+                return false; // need ibx_id + qry_str
+        }
+        if (!req->pfxc) {
+                warnx("dump_ibx requires -A PREFIX");
+                return false;
+        }
+        const char *ibx_id = req->argv[optind];
+        if (my_setlinebuf(req->fp[0])) { // for sort(1) pipe
+                perror("setlinebuf(fp[0])");
+                return false;
+        }
+        req->asc = true;
+        req->sort_col = -1;
+        req->max = (unsigned long long)req->srch->db->get_doccount();
+        Xapian::MSet mset = mail_mset(req, req->argv[optind + 1]);
+        for (Xapian::MSetIterator i = mset.begin(); i != mset.end(); i++) {
+                try {
+                        Xapian::Document doc = i.get_document();
+                        for (int p = 0; p < req->pfxc; p++)
+                                dump_ibx_term(req, req->pfxv[p], &doc, ibx_id);
+                } catch (const Xapian::Error & e) {
+                        fprintf(orig_err, "W: %s (#%ld)\n",
+                                e.get_description().c_str(), (long)(*i));
+                        continue;
+                }
+        }
+        if (req->fp[1])
+                fprintf(req->fp[1], "mset.size=%llu\n",
+                        (unsigned long long)mset.size());
+        return true;
+// internal usage only
+static bool cmd_test_inspect(struct req *req)
+        fprintf(req->fp[0], "pid=%d has_threadid=%d",
+                (int)getpid(), has_threadid(req->srch) ? 1 : 0);
+        return true;
+#define CMD(n) { .fn_len = sizeof(#n) - 1, .fn_name = #n, .fn = cmd_##n }
+static const struct cmd_entry {
+        size_t fn_len;
+        const char *fn_name;
+        cmd fn;
+} cmds[] = { // should be small enough to not need bsearch || gperf
+        // most common commands first
+        CMD(dump_ibx),
+        CMD(test_inspect), // least common commands last
+#define MY_ARRAY_SIZE(x)        (sizeof(x)/sizeof((x)[0]))
+#define RECV_FD_CAPA 2
+#define RECV_FD_SPACE        (RECV_FD_CAPA * sizeof(int))
+union my_cmsg {
+        struct cmsghdr hdr;
+        char pad[sizeof(struct cmsghdr) + 16 + RECV_FD_SPACE];
+static void xclose(int fd)
+        if (close(fd) < 0 && errno != EINTR)
+                err(EXIT_FAILURE, "BUG: close");
+static bool recv_req(struct req *req, char *rbuf, size_t *len)
+        union my_cmsg cmsg = { 0 };
+        struct msghdr msg = { .msg_iovlen = 1 };
+        struct iovec iov;
+        iov.iov_base = rbuf;
+        iov.iov_len = *len;
+        msg.msg_iov = &iov;
+        msg.msg_control = &cmsg.hdr;
+        msg.msg_controllen = CMSG_SPACE(RECV_FD_SPACE);
+        ssize_t r = recvmsg(sock_fd, &msg, 0);
+        if (r < 0)
+                err(EXIT_FAILURE, "recvmsg");
+        if (r == 0)
+                exit(EX_NOINPUT); /* grandparent went away */
+        *len = r;
+        if (r > 0 && cmsg.hdr.cmsg_level == SOL_SOCKET &&
+                        cmsg.hdr.cmsg_type == SCM_RIGHTS) {
+                size_t len = cmsg.hdr.cmsg_len;
+                int *fdp = (int *)CMSG_DATA(&cmsg.hdr);
+                size_t i;
+                bool fd_ok = true;
+                for (i = 0; CMSG_LEN((i + 1) * sizeof(int)) <= len; i++) {
+                        int fd = *fdp++;
+                        const char *mode = NULL;
+                        int fl = fd_ok ? fcntl(fd, F_GETFL) : 0;
+                        switch (fl) {
+                        case 0: break; // hit previous error
+                        case -1:
+                                warnx("invalid fd=%d", fd);
+                                fd_ok = false;
+                                break;
+                        case O_WRONLY: mode = "w"; break;
+                        case O_RDWR:
+                                mode = "r+";
+                                if (i == 0) req->has_input = true;
+                                break;
+                        default:
+                                warnx("invalid mode from F_GETFL: 0x%x", fl);
+                                fd_ok = false;
+                        }
+                        if (!fd_ok) {
+                                xclose(fd);
+                        } else {
+                                req->fp[i] = fdopen(fd, mode);
+                                if (!req->fp[i]) {
+                                        warn("fdopen(fd=%d)", fd);
+                                        fd_ok = false;
+                                }
+                        }
+                }
+                for (i = 0; !fd_ok && i < MY_ARRAY_SIZE(req->fp); i++)
+                        if (req->fp[i]) fclose(req->fp[i]);
+                return fd_ok;
+        }
+        warnx("no FD received in %zd-byte request", r);
+        return false;
+#define SPLIT2ARGV(dst,buf,len) split2argv(dst,buf,len,MY_ARRAY_SIZE(dst))
+static int split2argv(char **dst, char *buf, size_t len, size_t limit)
+        if (buf[0] == 0 || len == 0 || buf[len - 1] != 0) {
+                warnx("bogus argument given");
+                return 0;
+        }
+        size_t nr = 0;
+        char *c = buf;
+        for (size_t i = 1; i < len; i++) {
+                if (!buf[i]) {
+                        dst[nr++] = c;
+                        c = buf + i + 1;
+                }
+                if (nr == limit) {
+                        warnx("too many args: %zu", nr);
+                        return 0;
+                }
+        }
+        return (int)nr;
+static int srch_cmp(const void *pa, const void *pb) // for tfind|tsearch
+        const struct srch *a = (const struct srch *)pa;
+        const struct srch *b = (const struct srch *)pb;
+        int diff = a->paths_len - b->paths_len;
+        return diff ? diff : memcmp(a->paths, b->paths, (size_t)a->paths_len);
+static bool is_chert(const char *dir)
+        char iamchert[PATH_MAX];
+        struct stat sb;
+        int rc = snprintf(iamchert, sizeof(iamchert), "%s/iamchert", dir);
+        if (rc <= 0 || rc >= (int)sizeof(iamchert))
+                err(EXIT_FAILURE, "BUG: snprintf(%s/iamchert)", dir);
+        if (stat(iamchert, &sb) == 0 && S_ISREG(sb.st_mode))
+                return true;
+        return false;
+static bool srch_init(struct req *req)
+        char *dirv[MY_ARG_MAX];
+        int i;
+        struct srch *srch = req->srch;
+        int dirc = SPLIT2ARGV(dirv, srch->paths, (size_t)srch->paths_len);
+        const unsigned FLAG_PHRASE = Xapian::QueryParser::FLAG_PHRASE;
+        srch->qp_flags = FLAG_PHRASE |
+                        Xapian::QueryParser::FLAG_BOOLEAN |
+                        Xapian::QueryParser::FLAG_LOVEHATE |
+                        Xapian::QueryParser::FLAG_WILDCARD;
+        if (is_chert(dirv[0]))
+                srch->qp_flags &= ~FLAG_PHRASE;
+        try {
+                srch->db = new Xapian::Database(dirv[0]);
+        } catch (...) {
+                warn("E: Xapian::Database(%s)", dirv[0]);
+                return false;
+        }
+        try {
+                for (i = 1; i < dirc; i++) {
+                        if (srch->qp_flags & FLAG_PHRASE && is_chert(dirv[i]))
+                                srch->qp_flags &= ~FLAG_PHRASE;
+                        srch->db->add_database(Xapian::Database(dirv[i]));
+                }
+        } catch (...) {
+                warn("E: add_database(%s)", dirv[i]);
+                return false;
+        }
+        try {
+                srch->qp = new Xapian::QueryParser;
+        } catch (...) {
+                perror("E: Xapian::QueryParser");
+                return false;
+        }
+        srch->qp->set_default_op(Xapian::Query::OP_AND);
+        srch->qp->set_database(*srch->db);
+        try {
+                srch->qp->set_stemmer(Xapian::Stem("english"));
+        } catch (...) {
+                perror("E: Xapian::Stem");
+                return false;
+        }
+        srch->qp->set_stemming_strategy(Xapian::QueryParser::STEM_SOME);
+        srch->qp->SET_MAX_EXPANSION(100);
+        if (req->code_search)
+                qp_init_code_search(srch->qp); // CodeSearch.pm
+        else
+                qp_init_mail_search(srch->qp); // Search.pm
+        return true;
+static void free_srch(void *p) // tdestroy
+        struct srch *srch = (struct srch *)p;
+        delete srch->qp;
+        delete srch->db;
+        free(srch);
+static void dispatch(struct req *req)
+        int c;
+        size_t size = strlen(req->argv[0]);
+        union {
+                struct srch *srch;
+                char *ptr;
+        } fbuf;
+        char *end;
+        FILE *kfp;
+        struct srch **s;
+        req->fn = NULL;
+        for (c = 0; c < (int)MY_ARRAY_SIZE(cmds); c++) {
+                if (cmds[c].fn_len == size &&
+                        !memcmp(cmds[c].fn_name, req->argv[0], size)) {
+                        req->fn = cmds[c].fn;
+                        break;
+                }
+        }
+        if (!req->fn) goto cmd_err;
+        kfp = open_memstream(&fbuf.ptr, &size);
+        // write padding, first
+        fwrite(&req->argv[0], offsetof(struct srch, paths), 1, kfp);
+        // global getopt variables:
+        optind = 1;
+        opterr = optopt = 0;
+        optarg = NULL;
+        // keep sync with @PublicInbox::XapHelper::SPEC
+        while ((c = getopt(req->argc, req->argv, "acd:k:m:o:rtA:O:T:")) != -1) {
+                switch (c) {
+                case 'a': req->asc = true; break;
+                case 'c': req->code_search = true; break;
+                case 'd': fwrite(optarg, strlen(optarg) + 1, 1, kfp); break;
+                case 'k':
+                        req->sort_col = strtol(optarg, &end, 10);
+                        if (*end) goto cmd_err;
+                        switch (req->sort_col) {
+                        case LONG_MAX: case LONG_MIN: goto cmd_err;
+                        }
+                        break;
+                case 'm':
+                        req->max = strtoull(optarg, &end, 10);
+                        if (*end) goto cmd_err;
+                        if (req->max == ULLONG_MAX) goto cmd_err;
+                        break;
+                case 'o':
+                        req->off = strtoull(optarg, &end, 10);
+                        if (*end) goto cmd_err;
+                        if (req->off == ULLONG_MAX) goto cmd_err;
+                        break;
+                case 'r': req->relevance = true; break;
+                case 't': req->collapse_threads = true; break;
+                case 'A':
+                        req->pfxv[req->pfxc++] = optarg;
+                        if (MY_ARG_MAX == req->pfxc) goto cmd_err;
+                        break;
+                case 'O': req->Oeidx_key = optarg - 1; break; // pad "O" prefix
+                case 'T':
+                        req->timeout_sec = strtoul(optarg, &end, 10);
+                        if (*end) goto cmd_err;
+                        if (req->timeout_sec == ULONG_MAX) goto cmd_err;
+                        break;
+                default: goto cmd_err;
+                }
+        }
+        if (ferror(kfp) | fclose(kfp)) {
+                perror("ferror|fclose");
+                goto cmd_err;
+        }
+        fbuf.srch->db = NULL;
+        fbuf.srch->qp = NULL;
+        fbuf.srch->paths_len = size - offsetof(struct srch, paths);
+        if (fbuf.srch->paths_len <= 0) {
+                free_srch(fbuf.srch);
+                warnx("no -d args");
+                goto cmd_err;
+        }
+        s = (struct srch **)tsearch(fbuf.srch, &srch_tree, srch_cmp);
+        if (!s) {
+                perror("tsearch");
+                goto cmd_err;
+        }
+        req->srch = *s;
+        if (req->srch != fbuf.srch) { // reuse existing
+                free_srch(fbuf.srch);
+        } else if (!srch_init(req)) {
+                assert(fbuf.srch == *((struct srch **)tfind(
+                                        fbuf.srch, &srch_tree, srch_cmp)));
+                void *del = tdelete(fbuf.srch, &srch_tree, srch_cmp);
+                assert(del);
+                free_srch(fbuf.srch);
+                goto cmd_err;
+        }
+        try {
+                if (!req->fn(req))
+                        goto cmd_err;
+        } catch (const Xapian::Error & e) {
+                warnx("Xapian::Error: %s", e.get_description().c_str());
+        } catch (...) {
+                warn("unhandled exception");
+        }
+        return; // just be silent on errors, for now
+static void cleanup_pids(void)
+        free(worker_pids);
+        worker_pids = NULL;
+static void recv_loop(void) // worker process loop
+        static char rbuf[4096 * 33]; // per-process
+        while (!parent_pid || getppid() == parent_pid) {
+                size_t len = sizeof(rbuf);
+                struct req req = { 0 };
+                if (!recv_req(&req, rbuf, &len))
+                        continue;
+                if (req.fp[1]) {
+                        if (my_setlinebuf(req.fp[1]))
+                                perror("W: setlinebuf(req.fp[1])");
+                        stderr = req.fp[1];
+                }
+                req.argc = SPLIT2ARGV(req.argv, rbuf, len);
+                if (req.argc > 0)
+                        dispatch(&req);
+                if (ferror(req.fp[0]) | fclose(req.fp[0]))
+                        perror("ferror|fclose fp[0]");
+                if (req.fp[1]) {
+                        stderr = orig_err;
+                        if (ferror(req.fp[1]) | fclose(req.fp[1]))
+                                perror("ferror|fclose fp[1]");
+                }
+        }
+static void insert_pid(pid_t pid, unsigned nr)
+        assert(!worker_pids[nr]);
+        worker_pids[nr] = pid;
+static int delete_pid(pid_t pid)
+        for (unsigned nr = 0; nr < nworker; nr++) {
+                if (worker_pids[nr] == pid) {
+                        worker_pids[nr] = 0;
+                        return nr;
+                }
+        }
+        warnx("W: unknown pid=%d reaped", (int)pid);
+        return -1;
+static void start_worker(unsigned nr)
+        pid_t pid = fork();
+        if (pid < 0) {
+                warn("E: fork(worker=%u)", nr);
+        } else if (pid > 0) {
+                insert_pid(pid, nr);
+        } else {
+                cleanup_pids();
+                recv_loop();
+                exit(0);
+        }
+static void cleanup_all(void)
+        cleanup_pids();
+#ifdef __GLIBC__
+        tdestroy(srch_tree, free_srch);
+        srch_tree = NULL;
+int main(int argc, char *argv[])
+        int c;
+        mail_nrp_init();
+        code_nrp_init();
+        atexit(cleanup_all);
+        nworker = 0;
+        long j = sysconf(_SC_NPROCESSORS_ONLN);
+        if (j > 0)
+                nworker = j > UCHAR_MAX ? UCHAR_MAX : j;
+        // make warn/warnx/err multi-process friendly:
+        if (my_setlinebuf(stderr))
+                err(EXIT_FAILURE, "setlinebuf(stderr)");
+        // not using -W<workers> like Daemon.pm, since -W is reserved (glibc)
+        while ((c = getopt(argc, argv, "j:")) != -1) {
+                char *end;
+                switch (c) {
+                case 'j':
+                        nworker = strtoul(optarg, &end, 10);
+                        if (*end != 0 || nworker > USHRT_MAX)
+                                errx(EXIT_FAILURE, "-j %s invalid", optarg);
+                        break;
+                case ':':
+                        errx(EXIT_FAILURE, "missing argument: `-%c'", optopt);
+                case '?':
+                        errx(EXIT_FAILURE, "unrecognized: `-%c'", optopt);
+                default:
+                        errx(EXIT_FAILURE, "BUG: `-%c'", c);
+                }
+        }
+        if (nworker == 0) {
+                recv_loop();
+        } else {
+                parent_pid = getpid();
+                worker_pids = (pid_t *)calloc(nworker, sizeof(pid_t));
+                if (!worker_pids)
+                        err(EXIT_FAILURE, "calloc");
+                for (unsigned i = 0; i < nworker; i++)
+                        start_worker(i);
+                int st;
+                pid_t pid;
+                bool quit = false;
+                while ((pid = wait(&st)) > 0) {
+                        int nr = delete_pid(pid);
+                        if (nr < 0) continue;
+                        if (WIFEXITED(st) && WEXITSTATUS(st) == EX_NOINPUT)
+                                quit = true;
+                        if (!quit)
+                                start_worker(nr);
+                }
+        }
+        return 0;