diff options
Diffstat (limited to 'lib/PublicInbox/xap_helper.h')
-rw-r--r-- | lib/PublicInbox/xap_helper.h | 109 |
1 files changed, 97 insertions, 12 deletions
diff --git a/lib/PublicInbox/xap_helper.h b/lib/PublicInbox/xap_helper.h index 3456910b..a30a8768 100644 --- a/lib/PublicInbox/xap_helper.h +++ b/lib/PublicInbox/xap_helper.h @@ -27,6 +27,7 @@ #include <sys/types.h> #include <sys/uio.h> #include <sys/wait.h> +#include <poll.h> #include <assert.h> #include <err.h> // BSD, glibc, and musl all have this @@ -95,6 +96,8 @@ static pid_t *worker_pids; // nr => pid #define WORKER_MAX USHRT_MAX static unsigned long nworker, nworker_hwm; static int pipefds[2]; +static const char *stdout_path, *stderr_path; // for SIGUSR1 +static sig_atomic_t worker_needs_reopen; // PublicInbox::Search and PublicInbox::CodeSearch generate these: static void mail_nrp_init(void); @@ -111,6 +114,7 @@ enum exc_iter { struct srch { int paths_len; // int for comparisons unsigned qp_flags; + bool qp_extra_done; Xapian::Database *db; Xapian::QueryParser *qp; char paths[]; // $shard_path0\0$shard_path1\0... @@ -123,6 +127,7 @@ typedef bool (*cmd)(struct req *); struct req { // argv and pfxv point into global rbuf char *argv[MY_ARG_MAX]; char *pfxv[MY_ARG_MAX]; // -A <prefix> + char *qpfxv[MY_ARG_MAX]; // -Q <user_prefix>[:=]<INTERNAL_PREFIX> size_t *lenv; // -A <prefix>LENGTH struct srch *srch; char *Pgit_dir; @@ -136,13 +141,12 @@ struct req { // argv and pfxv point into global rbuf long sort_col; // value column, negative means BoolWeight int argc; int pfxc; + int qpfxc; FILE *fp[2]; // [0] response pipe or sock, [1] status/errors (optional) bool has_input; // fp[0] is bidirectional bool collapse_threads; bool code_search; bool relevance; // sort by relevance before column - bool emit_percent; - bool emit_docdata; bool asc; // ascending sort }; @@ -226,6 +230,13 @@ static Xapian::MSet mail_mset(struct req *req, const char *qry_str) qry = Xapian::Query(Xapian::Query::OP_FILTER, qry, Xapian::Query(req->Oeidx_key)); } + // TODO: uid_range + if (req->threadid != ULLONG_MAX) { + std::string tid = Xapian::sortable_serialise(req->threadid); + qry = Xapian::Query(Xapian::Query::OP_FILTER, qry, + Xapian::Query(Xapian::Query::OP_VALUE_RANGE, THREADID, + tid, tid)); + } Xapian::Enquire enq = prep_enquire(req); enq.set_query(qry); // THREADID is a CPP macro defined on CLI (see) XapHelperCxx.pm @@ -406,6 +417,11 @@ static bool cmd_test_inspect(struct req *req) return false; } +static bool cmd_test_sleep(struct req *req) +{ + for (;;) poll(NULL, 0, 10); + return false; +} #include "xh_mset.h" // read-only (WWW, IMAP, lei) stuff #include "xh_cidx.h" // CodeSearchIdx.pm stuff @@ -420,6 +436,7 @@ static const struct cmd_entry { CMD(dump_ibx), // many inboxes CMD(dump_roots), // per-cidx shard CMD(test_inspect), // least common commands last + CMD(test_sleep), // least common commands last }; #define MY_ARRAY_SIZE(x) (sizeof(x)/sizeof((x)[0])) @@ -570,6 +587,31 @@ static bool srch_init(struct req *req) return true; } +// setup query parser for altid and arbitrary headers +static void srch_init_extra(struct req *req) +{ + const char *XPFX; + for (int i = 0; i < req->qpfxc; i++) { + size_t len = strlen(req->qpfxv[i]); + char *c = (char *)memchr(req->qpfxv[i], '=', len); + + if (c) { // it's boolean "gmane=XGMANE" + XPFX = c + 1; + *c = 0; + req->srch->qp->add_boolean_prefix(req->qpfxv[i], XPFX); + continue; + } + // maybe it's a non-boolean prefix "blob:XBLOBID" + c = (char *)memchr(req->qpfxv[i], ':', len); + if (!c) + errx(EXIT_FAILURE, "bad -Q %s", req->qpfxv[i]); + XPFX = c + 1; + *c = 0; + req->srch->qp->add_prefix(req->qpfxv[i], XPFX); + } + req->srch->qp_extra_done = true; +} + static void free_srch(void *p) // tdestroy { struct srch *srch = (struct srch *)p; @@ -633,7 +675,6 @@ static void dispatch(struct req *req) if (*end || req->off == ULLONG_MAX) ABORT("-o %s", optarg); break; - case 'p': req->emit_percent = true; break; case 'r': req->relevance = true; break; case 't': req->collapse_threads = true; break; case 'A': @@ -641,7 +682,6 @@ static void dispatch(struct req *req) if (MY_ARG_MAX == req->pfxc) ABORT("too many -A"); break; - case 'D': req->emit_docdata = true; break; case 'K': req->timeout_sec = strtoul(optarg, &end, 10); if (*end || req->timeout_sec == ULONG_MAX) @@ -653,12 +693,17 @@ static void dispatch(struct req *req) if (*end || req->threadid == ULLONG_MAX) ABORT("-T %s", optarg); break; + case 'Q': + req->qpfxv[req->qpfxc++] = optarg; + if (MY_ARG_MAX == req->qpfxc) ABORT("too many -Q"); + break; default: ABORT("bad switch `-%c'", c); } } ERR_CLOSE(kfp, EXIT_FAILURE); // may ENOMEM, sets kbuf.srch kbuf.srch->db = NULL; kbuf.srch->qp = NULL; + kbuf.srch->qp_extra_done = false; kbuf.srch->paths_len = size - offsetof(struct srch, paths); if (kbuf.srch->paths_len <= 0) ABORT("no -d args"); @@ -667,6 +712,7 @@ static void dispatch(struct req *req) req->srch = *s; if (req->srch != kbuf.srch) { // reuse existing free_srch(kbuf.srch); + req->srch->db->reopen(); } else if (!srch_init(req)) { assert(kbuf.srch == *((struct srch **)tfind( kbuf.srch, &srch_tree, srch_cmp))); @@ -675,6 +721,11 @@ static void dispatch(struct req *req) free_srch(kbuf.srch); goto cmd_err; // srch_init already warned } + if (req->qpfxc && !req->srch->qp_extra_done) + srch_init_extra(req); + if (req->timeout_sec) + alarm(req->timeout_sec > UINT_MAX ? + UINT_MAX : (unsigned)req->timeout_sec); try { if (!req->fn(req)) warnx("`%s' failed", req->argv[0]); @@ -683,6 +734,8 @@ static void dispatch(struct req *req) } catch (...) { warn("unhandled exception"); } + if (req->timeout_sec) + alarm(0); cmd_err: return; // just be silent on errors, for now } @@ -723,9 +776,12 @@ static void stderr_restore(FILE *tmp_err) clearerr(stderr); } -static void sigw(int sig) // SIGTERM handler for worker +static void sigw(int sig) // SIGTERM+SIGUSR1 handler for worker { - sock_fd = -1; // break out of recv_loop + switch (sig) { + case SIGUSR1: worker_needs_reopen = 1; break; + default: sock_fd = -1; // break out of recv_loop + } } #define CLEANUP_REQ __attribute__((__cleanup__(req_cleanup))) @@ -735,6 +791,18 @@ static void req_cleanup(void *ptr) free(req->lenv); } +static void reopen_logs(void) +{ + if (stdout_path && *stdout_path && !freopen(stdout_path, "a", stdout)) + err(EXIT_FAILURE, "freopen %s", stdout_path); + if (stderr_path && *stderr_path) { + if (!freopen(stderr_path, "a", stderr)) + err(EXIT_FAILURE, "freopen %s", stderr_path); + if (my_setlinebuf(stderr)) + err(EXIT_FAILURE, "setlinebuf(stderr)"); + } +} + static void recv_loop(void) // worker process loop { static char rbuf[4096 * 33]; // per-process @@ -742,6 +810,7 @@ static void recv_loop(void) // worker process loop sa.sa_handler = sigw; CHECK(int, 0, sigaction(SIGTERM, &sa, NULL)); + CHECK(int, 0, sigaction(SIGUSR1, &sa, NULL)); while (sock_fd == 0) { size_t len = sizeof(rbuf); @@ -758,6 +827,10 @@ static void recv_loop(void) // worker process loop stderr_restore(req.fp[1]); ERR_CLOSE(req.fp[1], 0); } + if (worker_needs_reopen) { + worker_needs_reopen = 0; + reopen_logs(); + } } } @@ -810,6 +883,16 @@ static void cleanup_all(void) #endif } +static void parent_reopen_logs(void) +{ + reopen_logs(); + for (unsigned long nr = nworker; nr < nworker_hwm; nr++) { + pid_t pid = worker_pids[nr]; + if (pid != 0 && kill(pid, SIGUSR1)) + warn("BUG?: kill(%d, SIGUSR1)", (int)pid); + } +} + static void sigp(int sig) // parent signal handler { static const char eagain[] = "signals coming in too fast"; @@ -822,6 +905,7 @@ static void sigp(int sig) // parent signal handler case SIGCHLD: c = '.'; break; case SIGTTOU: c = '-'; break; case SIGTTIN: c = '+'; break; + case SIGUSR1: c = '#'; break; default: write(STDERR_FILENO, bad_sig, sizeof(bad_sig) - 1); _exit(EXIT_FAILURE); @@ -928,6 +1012,8 @@ int main(int argc, char *argv[]) { int c; socklen_t slen = (socklen_t)sizeof(c); + stdout_path = getenv("STDOUT_PATH"); + stderr_path = getenv("STDERR_PATH"); if (getsockopt(sock_fd, SOL_SOCKET, SO_TYPE, &c, &slen)) err(EXIT_FAILURE, "getsockopt"); @@ -945,12 +1031,6 @@ int main(int argc, char *argv[]) } nworker = 1; -#ifdef _SC_NPROCESSORS_ONLN - long j = sysconf(_SC_NPROCESSORS_ONLN); - if (j > 0) - nworker = j > WORKER_MAX ? WORKER_MAX : j; -#endif // _SC_NPROCESSORS_ONLN - // make warn/warnx/err multi-process friendly: if (my_setlinebuf(stderr)) err(EXIT_FAILURE, "setlinebuf(stderr)"); @@ -992,6 +1072,8 @@ int main(int argc, char *argv[]) DELSET(SIGXCPU); DELSET(SIGXFSZ); #undef DELSET + CHECK(int, 0, sigdelset(&workerset, SIGUSR1)); + CHECK(int, 0, sigdelset(&fullset, SIGALRM)); if (nworker == 0) { // no SIGTERM handling w/o workers recv_loop(); @@ -1012,10 +1094,12 @@ int main(int argc, char *argv[]) CHECK(int, 0, sigdelset(&pset, SIGCHLD)); CHECK(int, 0, sigdelset(&pset, SIGTTIN)); CHECK(int, 0, sigdelset(&pset, SIGTTOU)); + CHECK(int, 0, sigdelset(&pset, SIGUSR1)); struct sigaction sa = {}; sa.sa_handler = sigp; + CHECK(int, 0, sigaction(SIGUSR1, &sa, NULL)); CHECK(int, 0, sigaction(SIGTTIN, &sa, NULL)); CHECK(int, 0, sigaction(SIGTTOU, &sa, NULL)); sa.sa_flags = SA_NOCLDSTOP; @@ -1040,6 +1124,7 @@ int main(int argc, char *argv[]) case '.': break; // do_sigchld already called case '-': do_sigttou(); break; case '+': do_sigttin(); break; + case '#': parent_reopen_logs(); break; default: errx(EXIT_FAILURE, "BUG: c=%c", sbuf[i]); } } |