about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2023-11-28 14:56:19 +0000
committerEric Wong <e@80x24.org>2023-11-29 02:13:20 +0000
commit87b7f633f2414a76c55f84da73cd7dd43f964533 (patch)
treea1ef018d3bcbd522171abb28971c41a250a917f3
parenta6abd43b2df02f258d5fc3493ce185f76dd98cd9 (diff)
downloadpublic-inbox-87b7f633f2414a76c55f84da73cd7dd43f964533.tar.gz
The C++ version will allow us to take full advantage of Xapian's
APIs for better queries, and the Perl bindings version can still
be advantageous in the future since we'll be able to support
timeouts effectively.
-rw-r--r--MANIFEST1
-rw-r--r--Makefile.PL8
-rw-r--r--lib/PublicInbox/Search.pm25
-rw-r--r--lib/PublicInbox/XapHelper.pm51
-rw-r--r--lib/PublicInbox/XapHelperCxx.pm6
-rw-r--r--lib/PublicInbox/xap_helper.h110
-rw-r--r--lib/PublicInbox/xh_cidx.h37
-rw-r--r--lib/PublicInbox/xh_mset.h96
-rw-r--r--t/cindex.t52
-rw-r--r--t/xap_helper.t49
10 files changed, 363 insertions, 72 deletions
diff --git a/MANIFEST b/MANIFEST
index bbbe0b91..7b6178f9 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -379,6 +379,7 @@ lib/PublicInbox/Xapcmd.pm
 lib/PublicInbox/gcf2_libgit2.h
 lib/PublicInbox/xap_helper.h
 lib/PublicInbox/xh_cidx.h
+lib/PublicInbox/xh_mset.h
 sa_config/Makefile
 sa_config/README
 sa_config/root/etc/spamassassin/public-inbox.pre
diff --git a/Makefile.PL b/Makefile.PL
index 38e030f5..28f8263e 100644
--- a/Makefile.PL
+++ b/Makefile.PL
@@ -273,14 +273,16 @@ pm_to_blib : lib/PublicInbox.pm
 lib/PublicInbox.pm : FORCE
         VERSION=\$(VERSION) \$(PERL) -w ./version-gen.perl
 
+XH_TESTS = t/xap_helper.t t/cindex.t
+
 test-asan : pure_all
-        TEST_XH_CXX_ONLY=1 CXXFLAGS='-O0 -Wall -ggdb3 -fsanitize=address' \\
-                prove -bvw t/xap_helper.t
+        TEST_XH_CXX_ONLY=1 CXXFLAGS='-Wall -ggdb3 -fsanitize=address' \\
+                prove -bvw \$(XH_TESTS)
 
 VG_OPT = -v --trace-children=yes --track-fds=yes
 VG_OPT += --leak-check=yes --track-origins=yes
 test-valgrind : pure_all
         TEST_XH_CXX_ONLY=1 VALGRIND="valgrind \$(VG_OPT)" \\
-                prove -bvw t/xap_helper.t
+                prove -bvw \$(XH_TESTS)
 EOF
 }
diff --git a/lib/PublicInbox/Search.pm b/lib/PublicInbox/Search.pm
index 477f77dc..6145b027 100644
--- a/lib/PublicInbox/Search.pm
+++ b/lib/PublicInbox/Search.pm
@@ -76,6 +76,25 @@ our @MAIL_VMAP = (
 );
 our @MAIL_NRP;
 
+# Getopt::Long spec, only short options for portability in C++ implementation
+our @XH_SPEC = (
+        'a', # ascending sort
+        'c', # code search
+        'd=s@', # shard dirs
+        'g=s', # git dir (with -c)
+        'k=i', # sort column (like sort(1))
+        'm=i', # maximum number of results
+        'o=i', # offset
+        'p', # show percent
+        'r', # 1=relevance then column
+        't', # collapse threads
+        'A=s@', # prefixes
+        'D', # emit docdata
+        'K=i', # timeout kill after i seconds
+        'O=s', # eidx_key
+        'T=i', # threadid
+);
+
 sub load_xapian () {
         return 1 if defined $Xap;
         # n.b. PI_XAPIAN is intended for development use only
@@ -247,6 +266,12 @@ sub mdocid {
         int(($docid - 1) / $nshard) + 1;
 }
 
+sub docids_to_artnums {
+        my $nshard = shift->{nshard};
+        # XXX does array vs arrayref make a difference in modern Perls?
+        map { int(($_ - 1) / $nshard) + 1 } @_;
+}
+
 sub mset_to_artnums {
         my ($self, $mset) = @_;
         my $nshard = $self->{nshard};
diff --git a/lib/PublicInbox/XapHelper.pm b/lib/PublicInbox/XapHelper.pm
index fe831b8f..b21e70a2 100644
--- a/lib/PublicInbox/XapHelper.pm
+++ b/lib/PublicInbox/XapHelper.pm
@@ -21,21 +21,6 @@ my $X = \%PublicInbox::Search::X;
 our (%SRCH, %WORKERS, $nworker, $workerset, $in);
 our $stderr = \*STDERR;
 
-# only short options for portability in C++ implementation
-our @SPEC = (
-        'a', # ascending sort
-        'c', # code search
-        'd=s@', # shard dirs
-        'k=i', # sort column (like sort(1))
-        'm=i', # maximum number of results
-        'o=i', # offset
-        'r', # 1=relevance then column
-        't', # collapse threads
-        'A=s@', # prefixes
-        'O=s', # eidx_key
-        'T=i', # timeout in seconds
-);
-
 sub cmd_test_inspect {
         my ($req) = @_;
         print { $req->{0} } "pid=$$ has_threadid=",
@@ -144,10 +129,44 @@ sub cmd_dump_roots {
         emit_mset_stats($req, $mset);
 }
 
+sub mset_iter ($$) {
+        my ($req, $it) = @_;
+        eval {
+                my $buf = $it->get_docid;
+                $buf .= "\0".$it->get_percent if $req->{p};
+                my $doc = ($req->{A} || $req->{D}) ? $it->get_document : undef;
+                for my $p (@{$req->{A}}) {
+                        $buf .= "\0".$p.$_ for xap_terms($p, $doc);
+                }
+                $buf .= "\0".$doc->get_data if $req->{D};
+                say { $req->{0} } $buf;
+        };
+        $@ ? iter_retry_check($req) : 0;
+}
+
+sub cmd_mset { # to be used by WWW + IMAP
+        my ($req, $qry_str) = @_;
+        $qry_str // die 'usage: mset [OPTIONS] QRY_STR';
+        my $opt = { limit => $req->{'m'}, offset => $req->{o} // 0 };
+        $opt->{relevance} = 1 if $req->{r};
+        $opt->{threads} = 1 if defined $req->{t};
+        $opt->{git_dir} = $req->{g} if defined $req->{g};
+        $opt->{eidx_key} = $req->{O} if defined $req->{O};
+        $opt->{threadid} = $req->{T} if defined $req->{T};
+        my $mset = $req->{srch}->mset($qry_str, $opt);
+        say { $req->{0} } 'mset.size=', $mset->size;
+        for my $it ($mset->items) {
+                for (my $t = 10; $t > 0; --$t) {
+                        $t = mset_iter($req, $it) // $t;
+                }
+        }
+}
+
 sub dispatch {
         my ($req, $cmd, @argv) = @_;
         my $fn = $req->can("cmd_$cmd") or return;
-        $GLP->getoptionsfromarray(\@argv, $req, @SPEC) or return;
+        $GLP->getoptionsfromarray(\@argv, $req, @PublicInbox::Search::XH_SPEC)
+                or return;
         my $dirs = delete $req->{d} or die 'no -d args';
         my $key = join("\0", @$dirs);
         $req->{srch} = $SRCH{$key} //= do {
diff --git a/lib/PublicInbox/XapHelperCxx.pm b/lib/PublicInbox/XapHelperCxx.pm
index 8a66fdcd..1aa75f2a 100644
--- a/lib/PublicInbox/XapHelperCxx.pm
+++ b/lib/PublicInbox/XapHelperCxx.pm
@@ -20,13 +20,15 @@ $ENV{PERL_INLINE_DIRECTORY} // die('BUG: PERL_INLINE_DIRECTORY unset');
 substr($dir, 0, 0) = "$ENV{PERL_INLINE_DIRECTORY}/";
 my $bin = "$dir/xap_helper";
 my ($srcpfx) = (__FILE__ =~ m!\A(.+/)[^/]+\z!);
-my @srcs = map { $srcpfx.$_ } qw(xap_helper.h xh_cidx.h);
+my @srcs = map { $srcpfx.$_ } qw(xh_mset.h xh_cidx.h xap_helper.h);
 my @pm_dep = map { $srcpfx.$_ } qw(Search.pm CodeSearch.pm);
 my $ldflags = '-Wl,-O1';
 $ldflags .= ' -Wl,--compress-debug-sections=zlib' if $^O ne 'openbsd';
 my $xflags = ($ENV{CXXFLAGS} // '-Wall -ggdb3 -pipe') . ' ' .
         ' -DTHREADID=' . PublicInbox::Search::THREADID .
-        ' ' . ($ENV{LDFLAGS} // $ldflags);
+        ' -DXH_SPEC="'.join('',
+                map { s/=.*/:/; $_ } @PublicInbox::Search::XH_SPEC) . '" ' .
+        ($ENV{LDFLAGS} // $ldflags);
 my $xap_modversion;
 
 sub xap_cfg (@) {
diff --git a/lib/PublicInbox/xap_helper.h b/lib/PublicInbox/xap_helper.h
index 89d151d9..18665567 100644
--- a/lib/PublicInbox/xap_helper.h
+++ b/lib/PublicInbox/xap_helper.h
@@ -124,10 +124,12 @@ struct req { // argv and pfxv point into global rbuf
         char *argv[MY_ARG_MAX];
         char *pfxv[MY_ARG_MAX]; // -A <prefix>
         struct srch *srch;
+        char *Pgit_dir;
         char *Oeidx_key;
         cmd fn;
         unsigned long long max;
         unsigned long long off;
+        unsigned long long threadid;
         unsigned long timeout_sec;
         size_t nr_out;
         long sort_col; // value column, negative means BoolWeight
@@ -138,6 +140,8 @@ struct req { // argv and pfxv point into global rbuf
         bool collapse_threads;
         bool code_search;
         bool relevance; // sort by relevance before column
+        bool emit_percent;
+        bool emit_docdata;
         bool asc; // ascending sort
 };
 
@@ -230,12 +234,53 @@ static Xapian::MSet mail_mset(struct req *req, const char *qry_str)
         return enquire_mset(req, &enq);
 }
 
+static bool starts_with(const std::string *s, const char *pfx, size_t pfx_len)
+{
+        return s->size() >= pfx_len && !memcmp(pfx, s->c_str(), pfx_len);
+}
+
+static void apply_roots_filter(struct req *req, Xapian::Query *qry)
+{
+        if (!req->Pgit_dir) return;
+        req->Pgit_dir[0] = 'P'; // modifies static rbuf
+        Xapian::Database *xdb = req->srch->db;
+        for (int i = 0; i < 9; i++) {
+                try {
+                        std::string P = req->Pgit_dir;
+                        Xapian::PostingIterator p = xdb->postlist_begin(P);
+                        if (p == xdb->postlist_end(P)) {
+                                warnx("W: %s not indexed?", req->Pgit_dir + 1);
+                                return;
+                        }
+                        Xapian::TermIterator cur = xdb->termlist_begin(*p);
+                        Xapian::TermIterator end = xdb->termlist_end(*p);
+                        cur.skip_to("G");
+                        if (cur == end) {
+                                warnx("W: %s has no root commits?",
+                                        req->Pgit_dir + 1);
+                                return;
+                        }
+                        Xapian::Query f = Xapian::Query(*cur);
+                        for (++cur; cur != end; ++cur) {
+                                std::string tn = *cur;
+                                if (!starts_with(&tn, "G", 1))
+                                        continue;
+                                f = Xapian::Query(Xapian::Query::OP_OR, f, tn);
+                        }
+                        *qry = Xapian::Query(Xapian::Query::OP_FILTER, *qry, f);
+                        return;
+                } catch (const Xapian::DatabaseModifiedError & e) {
+                        xdb->reopen();
+                }
+        }
+}
+
 // for cindex
 static Xapian::MSet commit_mset(struct req *req, const char *qry_str)
 {
         struct srch *srch = req->srch;
         Xapian::Query qry = srch->qp->parse_query(qry_str, srch->qp_flags);
-        // TODO: git_dir + roots_filter
+        apply_roots_filter(req, &qry);
 
         // we only want commits:
         qry = Xapian::Query(Xapian::Query::OP_FILTER, qry,
@@ -254,11 +299,6 @@ static void emit_mset_stats(struct req *req, const Xapian::MSet *mset)
                 ABORT("BUG: %s caller only passed 1 FD", req->argv[0]);
 }
 
-static bool starts_with(const std::string *s, const char *pfx, size_t pfx_len)
-{
-        return s->size() >= pfx_len && !memcmp(pfx, s->c_str(), pfx_len);
-}
-
 static int my_setlinebuf(FILE *fp) // glibc setlinebuf(3) can't report errors
 {
         return setvbuf(fp, NULL, _IOLBF, 0);
@@ -284,6 +324,32 @@ static void fbuf_init(struct fbuf *fbuf)
         if (!fbuf->fp) err(EXIT_FAILURE, "open_memstream(fbuf)");
 }
 
+static bool write_all(int fd, const struct fbuf *wbuf, size_t len)
+{
+        const char *p = wbuf->ptr;
+        assert(wbuf->len >= len);
+        do { // write to client FD
+                ssize_t n = write(fd, p, len);
+                if (n > 0) {
+                        len -= n;
+                        p += n;
+                } else {
+                        perror(n ? "write" : "write (zero bytes)");
+                        return false;
+                }
+        } while (len);
+        return true;
+}
+
+#define ERR_FLUSH(f) do { \
+        if (ferror(f) | fflush(f)) err(EXIT_FAILURE, "ferror|fflush "#f); \
+} while (0)
+
+#define ERR_CLOSE(f, e) do { \
+        if (ferror(f) | fclose(f)) \
+                e ? err(e, "ferror|fclose "#f) : perror("ferror|fclose "#f); \
+} while (0)
+
 static void xclose(int fd)
 {
         if (close(fd) < 0 && errno != EINTR)
@@ -339,6 +405,7 @@ static bool cmd_test_inspect(struct req *req)
         return false;
 }
 
+#include "xh_mset.h" // read-only (WWW, IMAP, lei) stuff
 #include "xh_cidx.h" // CodeSearchIdx.pm stuff
 
 #define CMD(n) { .fn_len = sizeof(#n) - 1, .fn_name = #n, .fn = cmd_##n }
@@ -348,6 +415,7 @@ static const struct cmd_entry {
         cmd fn;
 } cmds[] = { // should be small enough to not need bsearch || gperf
         // most common commands first
+        CMD(mset), // WWW and IMAP requests
         CMD(dump_ibx), // many inboxes
         CMD(dump_roots), // per-cidx shard
         CMD(test_inspect), // least common commands last
@@ -520,7 +588,7 @@ static void dispatch(struct req *req)
         char *end;
         FILE *kfp;
         struct srch **s;
-        req->fn = NULL;
+        req->threadid = ULLONG_MAX;
         for (c = 0; c < (int)MY_ARRAY_SIZE(cmds); c++) {
                 if (cmds[c].fn_len == size &&
                         !memcmp(cmds[c].fn_name, req->argv[0], size)) {
@@ -540,12 +608,13 @@ static void dispatch(struct req *req)
         optarg = NULL;
         MY_DO_OPTRESET();
 
-        // keep sync with @PublicInbox::XapHelper::SPEC
-        while ((c = getopt(req->argc, req->argv, "acd:k:m:o:rtA:O:T:")) != -1) {
+        // XH_SPEC is generated from @PublicInbox::Search::XH_SPEC
+        while ((c = getopt(req->argc, req->argv, XH_SPEC)) != -1) {
                 switch (c) {
                 case 'a': req->asc = true; break;
                 case 'c': req->code_search = true; break;
                 case 'd': fwrite(optarg, strlen(optarg) + 1, 1, kfp); break;
+                case 'g': req->Pgit_dir = optarg - 1; break; // pad "P" prefix
                 case 'k':
                         req->sort_col = strtol(optarg, &end, 10);
                         if (*end) ABORT("-k %s", optarg);
@@ -563,6 +632,7 @@ static void dispatch(struct req *req)
                         if (*end || req->off == ULLONG_MAX)
                                 ABORT("-o %s", optarg);
                         break;
+                case 'p': req->emit_percent = true; break;
                 case 'r': req->relevance = true; break;
                 case 't': req->collapse_threads = true; break;
                 case 'A':
@@ -570,17 +640,22 @@ static void dispatch(struct req *req)
                         if (MY_ARG_MAX == req->pfxc)
                                 ABORT("too many -A");
                         break;
-                case 'O': req->Oeidx_key = optarg - 1; break; // pad "O" prefix
-                case 'T':
+                case 'D': req->emit_docdata = true; break;
+                case 'K':
                         req->timeout_sec = strtoul(optarg, &end, 10);
                         if (*end || req->timeout_sec == ULONG_MAX)
+                                ABORT("-K %s", optarg);
+                        break;
+                case 'O': req->Oeidx_key = optarg - 1; break; // pad "O" prefix
+                case 'T':
+                        req->threadid = strtoull(optarg, &end, 10);
+                        if (*end || req->threadid == ULLONG_MAX)
                                 ABORT("-T %s", optarg);
                         break;
                 default: ABORT("bad switch `-%c'", c);
                 }
         }
-        if (ferror(kfp) | fclose(kfp)) /* sets kbuf.srch */
-                err(EXIT_FAILURE, "ferror|fclose"); // likely ENOMEM
+        ERR_CLOSE(kfp, EXIT_FAILURE); // may ENOMEM, sets kbuf.srch
         kbuf.srch->db = NULL;
         kbuf.srch->qp = NULL;
         kbuf.srch->paths_len = size - offsetof(struct srch, paths);
@@ -639,8 +714,7 @@ static void stderr_restore(FILE *tmp_err)
         stderr = orig_err;
         return;
 #endif
-        if (ferror(stderr) | fflush(stderr))
-                err(EXIT_FAILURE, "ferror|fflush stderr");
+        ERR_CLOSE(stderr, EXIT_FAILURE);
         while (dup2(orig_err_fd, STDERR_FILENO) < 0) {
                 if (errno != EINTR)
                         err(EXIT_FAILURE, "dup2(%d => 2)", orig_err_fd);
@@ -670,12 +744,10 @@ static void recv_loop(void) // worker process loop
                         stderr_set(req.fp[1]);
                 req.argc = (int)SPLIT2ARGV(req.argv, rbuf, len);
                 dispatch(&req);
-                if (ferror(req.fp[0]) | fclose(req.fp[0]))
-                        perror("ferror|fclose fp[0]");
+                ERR_CLOSE(req.fp[0], 0);
                 if (req.fp[1]) {
                         stderr_restore(req.fp[1]);
-                        if (ferror(req.fp[1]) | fclose(req.fp[1]))
-                                perror("ferror|fclose fp[1]");
+                        ERR_CLOSE(req.fp[1], 0);
                 }
         }
 }
diff --git a/lib/PublicInbox/xh_cidx.h b/lib/PublicInbox/xh_cidx.h
index c2d94162..1980f9f6 100644
--- a/lib/PublicInbox/xh_cidx.h
+++ b/lib/PublicInbox/xh_cidx.h
@@ -107,8 +107,7 @@ static bool root2offs_str(struct fbuf *root_offs, Xapian::Document *doc)
                 fputs((const char *)ep->data, root_offs->fp);
         }
         fputc('\n', root_offs->fp);
-        if (ferror(root_offs->fp) | fclose(root_offs->fp))
-                err(EXIT_FAILURE, "ferror|fclose(root_offs)"); // ENOMEM
+        ERR_CLOSE(root_offs->fp, EXIT_FAILURE); // ENOMEM
         root_offs->fp = NULL;
         return true;
 }
@@ -138,38 +137,24 @@ static void dump_roots_term(struct req *req, const char *pfx,
 // buffering and rely on flock(2), here
 static bool dump_roots_flush(struct req *req, struct dump_roots_tmp *drt)
 {
-        char *p;
-        int fd = fileno(req->fp[0]);
         bool ok = true;
+        off_t off = ftello(drt->wbuf.fp);
+        if (off < 0) EABORT("ftello");
+        if (!off) return ok;
+
+        ERR_FLUSH(drt->wbuf.fp); // ENOMEM
+        int fd = fileno(req->fp[0]);
 
-        if (!drt->wbuf.fp) return true;
-        if (fd < 0) EABORT("BUG: fileno");
-        if (ferror(drt->wbuf.fp) | fclose(drt->wbuf.fp)) // ENOMEM?
-                err(EXIT_FAILURE, "ferror|fclose(drt->wbuf.fp)");
-        drt->wbuf.fp = NULL;
-        if (!drt->wbuf.len) goto done_free;
         while (flock(drt->root2off_fd, LOCK_EX)) {
                 if (errno == EINTR) continue;
                 err(EXIT_FAILURE, "LOCK_EX"); // ENOLCK?
         }
-        p = drt->wbuf.ptr;
-        do { // write to client FD
-                ssize_t n = write(fd, p, drt->wbuf.len);
-                if (n > 0) {
-                        drt->wbuf.len -= n;
-                        p += n;
-                } else {
-                        perror(n ? "write" : "write (zero bytes)");
-                        return false;
-                }
-        } while (drt->wbuf.len);
+        ok = write_all(fd, &drt->wbuf, (size_t)off);
         while (flock(drt->root2off_fd, LOCK_UN)) {
                 if (errno == EINTR) continue;
                 err(EXIT_FAILURE, "LOCK_UN"); // ENOLCK?
         }
-done_free: // OK to skip on errors, dump_roots_ensure calls fbuf_ensure
-        free(drt->wbuf.ptr);
-        drt->wbuf.ptr = NULL;
+        if (fseeko(drt->wbuf.fp, 0, SEEK_SET)) EABORT("fseeko");
         return ok;
 }
 
@@ -238,11 +223,11 @@ static bool cmd_dump_roots(struct req *req)
         req->sort_col = -1;
         Xapian::MSet mset = commit_mset(req, req->argv[optind + 1]);
 
+        fbuf_init(&drt.wbuf);
+
         // @UNIQ_FOLD in CodeSearchIdx.pm can handle duplicate lines fine
         // in case we need to retry on DB reopens
         for (Xapian::MSetIterator i = mset.begin(); i != mset.end(); i++) {
-                if (!drt.wbuf.fp)
-                        fbuf_init(&drt.wbuf);
                 for (int t = 10; t > 0; --t)
                         switch (dump_roots_iter(req, &drt, &i)) {
                         case ITER_OK: t = 0; break; // leave inner loop
diff --git a/lib/PublicInbox/xh_mset.h b/lib/PublicInbox/xh_mset.h
new file mode 100644
index 00000000..056fe22b
--- /dev/null
+++ b/lib/PublicInbox/xh_mset.h
@@ -0,0 +1,96 @@
+// Copyright (C) all contributors <meta@public-inbox.org>
+// License: GPL-2.0+ <https://www.gnu.org/licenses/gpl-2.0.txt>
+// This file is only intended to be included by xap_helper.h
+// it implements pieces used by WWW, IMAP and lei
+
+static void emit_doc_term(FILE *fp, const char *pfx, Xapian::Document *doc)
+{
+        Xapian::TermIterator cur = doc->termlist_begin();
+        Xapian::TermIterator end = doc->termlist_end();
+        size_t pfx_len = strlen(pfx);
+
+        for (cur.skip_to(pfx); cur != end; cur++) {
+                std::string tn = *cur;
+                if (!starts_with(&tn, pfx, pfx_len)) continue;
+                fputc(0, fp);
+                fwrite(tn.data(), tn.size(), 1, fp);
+        }
+}
+
+static enum exc_iter mset_iter(const struct req *req, FILE *fp, off_t off,
+                                Xapian::MSetIterator *i)
+{
+        try {
+                fprintf(fp, "%llu", (unsigned long long)(*(*i))); // get_docid
+                if (req->emit_percent)
+                        fprintf(fp, "%c%d", 0, i->get_percent());
+                if (req->pfxc || req->emit_docdata) {
+                        Xapian::Document doc = i->get_document();
+                        for (int p = 0; p < req->pfxc; p++)
+                                emit_doc_term(fp, req->pfxv[p], &doc);
+                        if (req->emit_docdata) {
+                                std::string d = doc.get_data();
+                                fputc(0, fp);
+                                fwrite(d.data(), d.size(), 1, fp);
+                        }
+                }
+                fputc('\n', fp);
+        } catch (const Xapian::DatabaseModifiedError & e) {
+                req->srch->db->reopen();
+                if (fseeko(fp, off, SEEK_SET) < 0) EABORT("fseeko");
+                return ITER_RETRY;
+        } catch (const Xapian::DocNotFoundError & e) { // oh well...
+                warnx("doc not found: %s", e.get_description().c_str());
+                if (fseeko(fp, off, SEEK_SET) < 0) EABORT("fseeko");
+        }
+        return ITER_OK;
+}
+
+#ifndef WBUF_FLUSH_THRESHOLD
+#        define WBUF_FLUSH_THRESHOLD (BUFSIZ - 1000)
+#endif
+#if WBUF_FLUSH_THRESHOLD < 0
+#        undef WBUF_FLUSH_THRESHOLD
+#        define WBUF_FLUSH_THRESHOLD BUFSIZ
+#endif
+
+static bool cmd_mset(struct req *req)
+{
+        if (optind >= req->argc) ABORT("usage: mset [OPTIONS] WANT QRY_STR");
+        if (req->fp[1]) ABORT("mset only accepts 1 FD");
+        const char *qry_str = req->argv[optind];
+        CLEANUP_FBUF struct fbuf wbuf = {};
+        Xapian::MSet mset = req->code_search ? commit_mset(req, qry_str) :
+                                                mail_mset(req, qry_str);
+        fbuf_init(&wbuf);
+        fprintf(wbuf.fp, "mset.size=%llu\n", (unsigned long long)mset.size());
+        int fd = fileno(req->fp[0]);
+        for (Xapian::MSetIterator i = mset.begin(); i != mset.end(); i++) {
+                off_t off = ftello(wbuf.fp);
+                if (off < 0) EABORT("ftello");
+                /*
+                 * TODO verify our fflush + fseeko use isn't affected by a
+                 * glibc <2.25 bug:
+                 * https://sourceware.org/bugzilla/show_bug.cgi?id=20181
+                 * CentOS 7.x only has glibc 2.17.  In any case, bug #20181
+                 * shouldn't affect us since our use of fseeko is used to
+                 * effectively discard data.
+                 */
+                if (off > WBUF_FLUSH_THRESHOLD) {
+                        ERR_FLUSH(wbuf.fp);
+                        if (!write_all(fd, &wbuf, (size_t)off)) return false;
+                        if (fseeko(wbuf.fp, 0, SEEK_SET)) EABORT("fseeko");
+                        off = 0;
+                }
+                for (int t = 10; t > 0; --t)
+                        switch (mset_iter(req, wbuf.fp, off, &i)) {
+                        case ITER_OK: t = 0; break; // leave inner loop
+                        case ITER_RETRY: break; // continue for-loop
+                        case ITER_ABORT: return false; // error
+                        }
+        }
+        off_t off = ftello(wbuf.fp);
+        if (off < 0) EABORT("ftello");
+        ERR_FLUSH(wbuf.fp);
+        return off > 0 ? write_all(fd, &wbuf, (size_t)off) : true;
+}
diff --git a/t/cindex.t b/t/cindex.t
index 261945bf..a9075092 100644
--- a/t/cindex.t
+++ b/t/cindex.t
@@ -121,22 +121,70 @@ my $no_metadata_set = sub {
 
 use_ok 'PublicInbox::CodeSearch';
 
+
+my @xh_args;
+my $exp = [ 'initial with NUL character', 'remove NUL character' ];
+my $zp_git = abs_path("$zp/.git");
 if ('multi-repo search') {
         my $csrch = PublicInbox::CodeSearch->new("$tmp/ext");
         my $mset = $csrch->mset('NUL');
         is(scalar($mset->items), 2, 'got results');
-        my $exp = [ 'initial with NUL character', 'remove NUL character' ];
         my @have = sort(map { $_->get_document->get_data } $mset->items);
         is_xdeeply(\@have, $exp, 'got expected subjects');
 
         $mset = $csrch->mset('NUL', { git_dir => "$tmp/wt0/.git" });
         is(scalar($mset->items), 0, 'no results with other GIT_DIR');
 
-        $mset = $csrch->mset('NUL', { git_dir => abs_path("$zp/.git") });
+        $mset = $csrch->mset('NUL', { git_dir => $zp_git });
         @have = sort(map { $_->get_document->get_data } $mset->items);
         is_xdeeply(\@have, $exp, 'got expected subjects w/ GIT_DIR filter');
         my @xdb = $csrch->xdb_shards_flat;
         $no_metadata_set->(0, ['indexlevel'], \@xdb);
+        @xh_args = $csrch->xh_args;
+}
+
+my $test_xhc = sub {
+        my ($xhc) = @_;
+        my $impl = $xhc->{impl};
+        my ($r, @l);
+        $r = $xhc->mkreq([], qw(mset -D -c -g), $zp_git, @xh_args, 'NUL');
+        chomp(@l = <$r>);
+        is(shift(@l), 'mset.size=2', "got expected header $impl");
+        my %docid2data;
+        my @got = sort map {
+                my @f = split /\0/;
+                is scalar(@f), 2, 'got 2 entries';
+                $docid2data{$f[0]} = $f[1];
+                $f[1];
+        } @l;
+        is_deeply(\@got, $exp, "expected doc_data $impl");
+
+        $r = $xhc->mkreq([], qw(mset -c -g), "$tmp/wt0/.git", @xh_args, 'NUL');
+        chomp(@l = <$r>);
+        is(shift(@l), 'mset.size=0', "got miss in wrong dir $impl");
+        is_deeply(\@l, [], "no extra lines $impl");
+
+        my $csrch = PublicInbox::CodeSearch->new("$tmp/ext");
+        while (my ($did, $expect) = each %docid2data) {
+                is_deeply($csrch->xdb->get_document($did)->get_data,
+                        $expect, "docid=$did data matches");
+        }
+        ok(!$xhc->{io}->close, "$impl close");
+        is($?, 66 << 8, "got EX_NOINPUT from $impl exit");
+};
+
+SKIP: {
+        require_mods('+SCM_RIGHTS', 1);
+        require PublicInbox::XapClient;
+        my $xhc = PublicInbox::XapClient::start_helper('-j0');
+        $test_xhc->($xhc);
+        skip 'PI_NO_CXX set', 1 if $ENV{PI_NO_CXX};
+        $xhc->{impl} =~ /Cxx/ or
+                skip 'C++ compiler or xapian development libs missing', 1;
+        skip 'TEST_XH_CXX_ONLY set', 1 if $ENV{TEST_XH_CXX_ONLY};
+        local $ENV{PI_NO_CXX} = 1; # force XS or SWIG binding test
+        $xhc = PublicInbox::XapClient::start_helper('-j0');
+        $test_xhc->($xhc);
 }
 
 if ('--update') {
diff --git a/t/xap_helper.t b/t/xap_helper.t
index e3abeded..ee25b2dc 100644
--- a/t/xap_helper.t
+++ b/t/xap_helper.t
@@ -40,6 +40,7 @@ my $v2 = create_inbox 'v2', indexlevel => 'medium', version => 2,
 };
 
 my @ibx_idx = glob("$v2->{inboxdir}/xap*/?");
+my @ibx_shard_args = map { ('-d', $_) } @ibx_idx;
 my (@int) = glob("$crepo/public-inbox-cindex/cidx*/?");
 my (@ext) = glob("$crepo/cidx-ext/cidx*/?");
 is(scalar(@ext), 2, 'have 2 external shards') or diag explain(\@ext);
@@ -76,8 +77,7 @@ my $test = sub {
         is($cinfo{has_threadid}, '0', 'has_threadid false for cindex');
         is($cinfo{pid}, $info{pid}, 'PID unchanged for cindex');
 
-        my @dump = (qw(dump_ibx -A XDFID), (map { ('-d', $_) } @ibx_idx),
-                        qw(13 rt:0..));
+        my @dump = (qw(dump_ibx -A XDFID), @ibx_shard_args, qw(13 rt:0..));
         $r = $doreq->($s, @dump);
         my @res;
         while (sysread($r, my $buf, 512) != 0) { push @res, $buf }
@@ -89,7 +89,8 @@ my $test = sub {
         my $res = do { local $/; <$r> };
         is(join('', @res), $res, 'got identical response w/ error pipe');
         my $stats = do { local $/; <$err_rd> };
-        is($stats, "mset.size=6 nr_out=6\n", 'mset.size reported');
+        is($stats, "mset.size=6 nr_out=6\n", 'mset.size reported') or
+                diag "res=$res";
 
         return wantarray ? ($ar, $s) : $ar if $cinfo{pid} == $pid;
 
@@ -198,7 +199,47 @@ for my $n (@NO_CXX) {
         is(scalar(@res), scalar(grep(/\A[0-9a-f]{40,} [0-9]+\n\z/, @res)),
                 'entries match format');
         $err = do { local $/; <$err_r> };
-        is($err, "mset.size=6 nr_out=5\n", "got expected status ($xhc->{impl})");
+        is $err, "mset.size=6 nr_out=5\n", "got expected status ($xhc->{impl})";
+
+        $r = $xhc->mkreq([], qw(mset -p -A XDFID -A Q), @ibx_shard_args,
+                                'dfn:lib/PublicInbox/Search.pm');
+        chomp((my $hdr, @res) = readline($r));
+        is $hdr, 'mset.size=1', "got expected header via mset ($xhc->{impl}";
+        is scalar(@res), 1, 'got one result';
+        @res = split /\0/, $res[0];
+        {
+                my $doc = $v2->search->xdb->get_document($res[0]);
+                my @q = PublicInbox::Search::xap_terms('Q', $doc);
+                is_deeply \@q, [ $mid ], 'docid usable';
+        }
+        ok $res[1] > 0 && $res[1] <= 100, 'pct > 0 && <= 100';
+        is $res[2], 'XDFID'.$dfid, 'XDFID result matches';
+        is $res[3], 'Q'.$mid, 'Q (msgid) mset result matches';
+        is scalar(@res), 4, 'only 4 columns in result';
+
+        $r = $xhc->mkreq([], qw(mset -p -A XDFID -A Q), @ibx_shard_args,
+                                'dt:19700101'.'000000..');
+        chomp(($hdr, @res) = readline($r));
+        is $hdr, 'mset.size=6',
+                "got expected header via multi-result mset ($xhc->{impl}";
+        is(scalar(@res), 6, 'got 6 rows');
+        for my $r (@res) {
+                my ($docid, $pct, @rest) = split /\0/, $r;
+                my $doc = $v2->search->xdb->get_document($docid);
+                ok $pct > 0 && $pct <= 100,
+                        "pct > 0 && <= 100 #$docid ($xhc->{impl})";
+                my %terms;
+                for (@rest) {
+                        s/\A([A-Z]+)// or xbail 'no prefix=', \@rest;
+                        push @{$terms{$1}}, $_;
+                }
+                while (my ($pfx, $vals) = each %terms) {
+                        @$vals = sort @$vals;
+                        my @q = PublicInbox::Search::xap_terms($pfx, $doc);
+                        is_deeply $vals, \@q,
+                                "#$docid $pfx as expected ($xhc->{impl})";
+                }
+        }
 }
 
 done_testing;