about summary refs log tree commit homepage
path: root/lib/PublicInbox/xap_helper.h
diff options
context:
space:
mode:
Diffstat (limited to 'lib/PublicInbox/xap_helper.h')
-rw-r--r-- lib/PublicInbox/xap_helper.h 109
1 files changed, 97 insertions, 12 deletions
diff --git a/lib/PublicInbox/xap_helper.h b/lib/PublicInbox/xap_helper.h
index 3456910b..a30a8768 100644
--- a/lib/PublicInbox/xap_helper.h
+++ b/lib/PublicInbox/xap_helper.h
@@ -27,6 +27,7 @@
 #include <sys/types.h>
 #include <sys/uio.h>
 #include <sys/wait.h>
+#include <poll.h>
 
 #include <assert.h>
 #include <err.h> // BSD, glibc, and musl all have this
@@ -95,6 +96,8 @@ static pid_t *worker_pids; // nr => pid
 #define WORKER_MAX USHRT_MAX
 static unsigned long nworker, nworker_hwm;
 static int pipefds[2];
+static const char *stdout_path, *stderr_path; // for SIGUSR1
+static sig_atomic_t worker_needs_reopen;
 
 // PublicInbox::Search and PublicInbox::CodeSearch generate these:
 static void mail_nrp_init(void);
@@ -111,6 +114,7 @@ enum exc_iter {
 struct srch {
         int paths_len; // int for comparisons
         unsigned qp_flags;
+        bool qp_extra_done;
         Xapian::Database *db;
         Xapian::QueryParser *qp;
         char paths[]; // $shard_path0\0$shard_path1\0...
@@ -123,6 +127,7 @@ typedef bool (*cmd)(struct req *);
 struct req { // argv and pfxv point into global rbuf
         char *argv[MY_ARG_MAX];
         char *pfxv[MY_ARG_MAX]; // -A <prefix>
+        char *qpfxv[MY_ARG_MAX]; // -Q <user_prefix>[:=]<INTERNAL_PREFIX>
         size_t *lenv; // -A <prefix>LENGTH
         struct srch *srch;
         char *Pgit_dir;
@@ -136,13 +141,12 @@ struct req { // argv and pfxv point into global rbuf
         long sort_col; // value column, negative means BoolWeight
         int argc;
         int pfxc;
+        int qpfxc;
         FILE *fp[2]; // [0] response pipe or sock, [1] status/errors (optional)
         bool has_input; // fp[0] is bidirectional
         bool collapse_threads;
         bool code_search;
         bool relevance; // sort by relevance before column
-        bool emit_percent;
-        bool emit_docdata;
         bool asc; // ascending sort
 };
 
@@ -226,6 +230,13 @@ static Xapian::MSet mail_mset(struct req *req, const char *qry_str)
                 qry = Xapian::Query(Xapian::Query::OP_FILTER, qry,
                                         Xapian::Query(req->Oeidx_key));
         }
+        // TODO: uid_range
+        if (req->threadid != ULLONG_MAX) {
+                std::string tid = Xapian::sortable_serialise(req->threadid);
+                qry = Xapian::Query(Xapian::Query::OP_FILTER, qry,
+                        Xapian::Query(Xapian::Query::OP_VALUE_RANGE, THREADID,
+                                        tid, tid));
+        }
         Xapian::Enquire enq = prep_enquire(req);
         enq.set_query(qry);
         // THREADID is a CPP macro defined on CLI (see) XapHelperCxx.pm
@@ -406,6 +417,11 @@ static bool cmd_test_inspect(struct req *req)
         return false;
 }
 
+static bool cmd_test_sleep(struct req *req) // test-only command that never returns; req is unused (presumably exercises the -K alarm() timeout -- confirm)
+{
+        for (;;) poll(NULL, 0, 10); // no fds are polled: sleeps in 10ms slices and never leaves the loop on its own
+        return false; // unreachable; present to satisfy the `cmd' signature
+}
 #include "xh_mset.h" // read-only (WWW, IMAP, lei) stuff
 #include "xh_cidx.h" // CodeSearchIdx.pm stuff
 
@@ -420,6 +436,7 @@ static const struct cmd_entry {
         CMD(dump_ibx), // many inboxes
         CMD(dump_roots), // per-cidx shard
         CMD(test_inspect), // least common commands last
+        CMD(test_sleep), // least common commands last
 };
 
 #define MY_ARRAY_SIZE(x)        (sizeof(x)/sizeof((x)[0]))
@@ -570,6 +587,31 @@ static bool srch_init(struct req *req)
         return true;
 }
 
+// set up the query parser with the -Q user prefixes (altid and arbitrary headers)
+static void srch_init_extra(struct req *req)
+{
+        const char *XPFX; // internal prefix: the text after '=' or ':' in a -Q arg
+        for (int i = 0; i < req->qpfxc; i++) {
+                size_t len = strlen(req->qpfxv[i]);
+                char *c = (char *)memchr(req->qpfxv[i], '=', len); // '=' is looked for before ':'
+
+                if (c) { // it's boolean "gmane=XGMANE"
+                        XPFX = c + 1;
+                        *c = 0; // split in place: qpfxv[i] is now only the user-facing name
+                        req->srch->qp->add_boolean_prefix(req->qpfxv[i], XPFX);
+                        continue;
+                }
+                // maybe it's a non-boolean prefix "blob:XBLOBID"
+                c = (char *)memchr(req->qpfxv[i], ':', len);
+                if (!c) // neither separator found: errx() exits the process
+                        errx(EXIT_FAILURE, "bad -Q %s", req->qpfxv[i]);
+                XPFX = c + 1;
+                *c = 0; // split in place, same as the boolean case
+                req->srch->qp->add_prefix(req->qpfxv[i], XPFX);
+        }
+        req->srch->qp_extra_done = true; // dispatch() checks this flag and will not call us again for this srch
+}
+
 static void free_srch(void *p) // tdestroy
 {
         struct srch *srch = (struct srch *)p;
@@ -633,7 +675,6 @@ static void dispatch(struct req *req)
                         if (*end || req->off == ULLONG_MAX)
                                 ABORT("-o %s", optarg);
                         break;
-                case 'p': req->emit_percent = true; break;
                 case 'r': req->relevance = true; break;
                 case 't': req->collapse_threads = true; break;
                 case 'A':
@@ -641,7 +682,6 @@ static void dispatch(struct req *req)
                         if (MY_ARG_MAX == req->pfxc)
                                 ABORT("too many -A");
                         break;
-                case 'D': req->emit_docdata = true; break;
                 case 'K':
                         req->timeout_sec = strtoul(optarg, &end, 10);
                         if (*end || req->timeout_sec == ULONG_MAX)
@@ -653,12 +693,17 @@ static void dispatch(struct req *req)
                         if (*end || req->threadid == ULLONG_MAX)
                                 ABORT("-T %s", optarg);
                         break;
+                case 'Q':
+                        req->qpfxv[req->qpfxc++] = optarg;
+                        if (MY_ARG_MAX == req->qpfxc) ABORT("too many -Q");
+                        break;
                 default: ABORT("bad switch `-%c'", c);
                 }
         }
         ERR_CLOSE(kfp, EXIT_FAILURE); // may ENOMEM, sets kbuf.srch
         kbuf.srch->db = NULL;
         kbuf.srch->qp = NULL;
+        kbuf.srch->qp_extra_done = false;
         kbuf.srch->paths_len = size - offsetof(struct srch, paths);
         if (kbuf.srch->paths_len <= 0)
                 ABORT("no -d args");
@@ -667,6 +712,7 @@ static void dispatch(struct req *req)
         req->srch = *s;
         if (req->srch != kbuf.srch) { // reuse existing
                 free_srch(kbuf.srch);
+                req->srch->db->reopen();
         } else if (!srch_init(req)) {
                 assert(kbuf.srch == *((struct srch **)tfind(
                                         kbuf.srch, &srch_tree, srch_cmp)));
@@ -675,6 +721,11 @@ static void dispatch(struct req *req)
                 free_srch(kbuf.srch);
                 goto cmd_err; // srch_init already warned
         }
+        if (req->qpfxc && !req->srch->qp_extra_done)
+                srch_init_extra(req);
+        if (req->timeout_sec)
+                alarm(req->timeout_sec > UINT_MAX ?
+                        UINT_MAX : (unsigned)req->timeout_sec);
         try {
                 if (!req->fn(req))
                         warnx("`%s' failed", req->argv[0]);
@@ -683,6 +734,8 @@ static void dispatch(struct req *req)
         } catch (...) {
                 warn("unhandled exception");
         }
+        if (req->timeout_sec)
+                alarm(0);
 cmd_err:
         return; // just be silent on errors, for now
 }
@@ -723,9 +776,12 @@ static void stderr_restore(FILE *tmp_err)
         clearerr(stderr);
 }
 
-static void sigw(int sig) // SIGTERM handler for worker
+static void sigw(int sig) // SIGTERM+SIGUSR1 handler for worker; only sets flags
 {
-        sock_fd = -1; // break out of recv_loop
+        switch (sig) {
+        case SIGUSR1: worker_needs_reopen = 1; break; // recv_loop calls reopen_logs() when it sees this flag
+        default: sock_fd = -1; // any other signal (SIGTERM): break out of recv_loop
+        }
 }
 
 #define CLEANUP_REQ __attribute__((__cleanup__(req_cleanup)))
@@ -735,6 +791,18 @@ static void req_cleanup(void *ptr)
         free(req->lenv);
 }
 
+static void reopen_logs(void) // reopen stdout/stderr in append mode at stdout_path/stderr_path; any failure exits via err()
+{
+        if (stdout_path && *stdout_path && !freopen(stdout_path, "a", stdout)) // NULL or empty path: stdout is left alone
+                err(EXIT_FAILURE, "freopen %s", stdout_path);
+        if (stderr_path && *stderr_path) { // NULL or empty path: stderr is left alone
+                if (!freopen(stderr_path, "a", stderr))
+                        err(EXIT_FAILURE, "freopen %s", stderr_path);
+                if (my_setlinebuf(stderr)) // same line buffering main() applies to stderr at startup
+                        err(EXIT_FAILURE, "setlinebuf(stderr)");
+        }
+}
+
 static void recv_loop(void) // worker process loop
 {
         static char rbuf[4096 * 33]; // per-process
@@ -742,6 +810,7 @@ static void recv_loop(void) // worker process loop
         sa.sa_handler = sigw;
 
         CHECK(int, 0, sigaction(SIGTERM, &sa, NULL));
+        CHECK(int, 0, sigaction(SIGUSR1, &sa, NULL));
 
         while (sock_fd == 0) {
                 size_t len = sizeof(rbuf);
@@ -758,6 +827,10 @@ static void recv_loop(void) // worker process loop
                         stderr_restore(req.fp[1]);
                         ERR_CLOSE(req.fp[1], 0);
                 }
+                if (worker_needs_reopen) {
+                        worker_needs_reopen = 0;
+                        reopen_logs();
+                }
         }
 }
 
@@ -810,6 +883,16 @@ static void cleanup_all(void)
 #endif
 }
 
+static void parent_reopen_logs(void) // parent's SIGUSR1 action ('#' in the signal loop): reopen own logs, then notify workers
+{
+        reopen_logs();
+        for (unsigned long nr = 0; nr < nworker_hwm; nr++) { // start at slot 0: starting at nworker would skip every active worker
+                pid_t pid = worker_pids[nr];
+                if (pid != 0 && kill(pid, SIGUSR1)) // pid 0 is skipped (presumably an empty slot -- confirm); kill() failure is only warned about
+                        warn("BUG?: kill(%d, SIGUSR1)", (int)pid);
+        }
+}
+
 static void sigp(int sig) // parent signal handler
 {
         static const char eagain[] = "signals coming in too fast";
@@ -822,6 +905,7 @@ static void sigp(int sig) // parent signal handler
         case SIGCHLD: c = '.'; break;
         case SIGTTOU: c = '-'; break;
         case SIGTTIN: c = '+'; break;
+        case SIGUSR1: c = '#'; break;
         default:
                 write(STDERR_FILENO, bad_sig, sizeof(bad_sig) - 1);
                 _exit(EXIT_FAILURE);
@@ -928,6 +1012,8 @@ int main(int argc, char *argv[])
 {
         int c;
         socklen_t slen = (socklen_t)sizeof(c);
+        stdout_path = getenv("STDOUT_PATH");
+        stderr_path = getenv("STDERR_PATH");
 
         if (getsockopt(sock_fd, SOL_SOCKET, SO_TYPE, &c, &slen))
                 err(EXIT_FAILURE, "getsockopt");
@@ -945,12 +1031,6 @@ int main(int argc, char *argv[])
         }
 
         nworker = 1;
-#ifdef _SC_NPROCESSORS_ONLN
-        long j = sysconf(_SC_NPROCESSORS_ONLN);
-        if (j > 0)
-                nworker = j > WORKER_MAX ? WORKER_MAX : j;
-#endif // _SC_NPROCESSORS_ONLN
-
         // make warn/warnx/err multi-process friendly:
         if (my_setlinebuf(stderr))
                 err(EXIT_FAILURE, "setlinebuf(stderr)");
@@ -992,6 +1072,8 @@ int main(int argc, char *argv[])
         DELSET(SIGXCPU);
         DELSET(SIGXFSZ);
 #undef DELSET
+        CHECK(int, 0, sigdelset(&workerset, SIGUSR1));
+        CHECK(int, 0, sigdelset(&fullset, SIGALRM));
 
         if (nworker == 0) { // no SIGTERM handling w/o workers
                 recv_loop();
@@ -1012,10 +1094,12 @@ int main(int argc, char *argv[])
         CHECK(int, 0, sigdelset(&pset, SIGCHLD));
         CHECK(int, 0, sigdelset(&pset, SIGTTIN));
         CHECK(int, 0, sigdelset(&pset, SIGTTOU));
+        CHECK(int, 0, sigdelset(&pset, SIGUSR1));
 
         struct sigaction sa = {};
         sa.sa_handler = sigp;
 
+        CHECK(int, 0, sigaction(SIGUSR1, &sa, NULL));
         CHECK(int, 0, sigaction(SIGTTIN, &sa, NULL));
         CHECK(int, 0, sigaction(SIGTTOU, &sa, NULL));
         sa.sa_flags = SA_NOCLDSTOP;
@@ -1040,6 +1124,7 @@ int main(int argc, char *argv[])
                         case '.': break; // do_sigchld already called
                         case '-': do_sigttou(); break;
                         case '+': do_sigttin(); break;
+                        case '#': parent_reopen_logs(); break;
                         default: errx(EXIT_FAILURE, "BUG: c=%c", sbuf[i]);
                         }
                 }