about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2023-08-24 01:22:33 +0000
committerEric Wong <e@80x24.org>2023-08-24 07:47:51 +0000
commitb18ecb7707e83cb8cb38c3736aecd984999ca0a7 (patch)
tree0f159212810c98aa07d26b6f7f28f4b8dbc9b302
parentcf96412eb8f193ebd334fae340b2d91b6b7f2afe (diff)
downloadpublic-inbox-b18ecb7707e83cb8cb38c3736aecd984999ca0a7.tar.gz
This allows us to perform the expensive "dump_ibx" operations in
native C++ code using the Xapian C++ library.  This provides the
majority of the speedup with the -cindex --associate switch.

Eventually this may be expanded to cover all uses of Xapian
within the project to ensure we have access to Xapian APIs which
aren't available in XS|SWIG bindings; and also for
ease-of-installation on systems which don't provide
pre-packaged Perl Xapian bindings (e.g. OpenBSD 7.3) but
do provide Xapian development libraries.

Most of the C++ code is still C, as I'm not remotely familiar
with C++ compared to C.  I suspect many users and potential
hackers being from git, Linux kernel, and glibc world are in the
same boat.
-rw-r--r--MANIFEST7
-rw-r--r--lib/PublicInbox/CidxDumpIbx.pm59
-rw-r--r--lib/PublicInbox/CidxXapHelperAux.pm48
-rw-r--r--lib/PublicInbox/CodeSearch.pm54
-rw-r--r--lib/PublicInbox/CodeSearchIdx.pm78
-rw-r--r--lib/PublicInbox/Isearch.pm5
-rw-r--r--lib/PublicInbox/Search.pm56
-rw-r--r--lib/PublicInbox/XapClient.pm50
-rw-r--r--lib/PublicInbox/XapHelper.pm144
-rw-r--r--lib/PublicInbox/XapHelperCxx.pm93
-rw-r--r--lib/PublicInbox/xap_helper.h654
-rw-r--r--t/xap_helper.t147
12 files changed, 1278 insertions, 117 deletions
diff --git a/MANIFEST b/MANIFEST
index 162e3038..4f61af42 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -162,10 +162,10 @@ lib/PublicInbox/AltId.pm
 lib/PublicInbox/AutoReap.pm
 lib/PublicInbox/Cgit.pm
 lib/PublicInbox/CidxComm.pm
-lib/PublicInbox/CidxDumpIbx.pm
 lib/PublicInbox/CidxDumpShardRoots.pm
 lib/PublicInbox/CidxLogP.pm
 lib/PublicInbox/CidxRecvIbx.pm
+lib/PublicInbox/CidxXapHelperAux.pm
 lib/PublicInbox/CmdIPC4.pm
 lib/PublicInbox/CodeSearch.pm
 lib/PublicInbox/CodeSearchIdx.pm
@@ -368,8 +368,12 @@ lib/PublicInbox/WwwListing.pm
 lib/PublicInbox/WwwStatic.pm
 lib/PublicInbox/WwwStream.pm
 lib/PublicInbox/WwwText.pm
+lib/PublicInbox/XapClient.pm
+lib/PublicInbox/XapHelper.pm
+lib/PublicInbox/XapHelperCxx.pm
 lib/PublicInbox/Xapcmd.pm
 lib/PublicInbox/gcf2_libgit2.h
+lib/PublicInbox/xap_helper.h
 sa_config/Makefile
 sa_config/README
 sa_config/root/etc/spamassassin/public-inbox.pre
@@ -610,6 +614,7 @@ t/www_altid.t
 t/www_listing.t
 t/www_static.t
 t/x-unknown-alpine.eml
+t/xap_helper.t
 t/xcpdb-reshard.t
 version-gen.perl
 xt/cmp-msgstr.t
diff --git a/lib/PublicInbox/CidxDumpIbx.pm b/lib/PublicInbox/CidxDumpIbx.pm
deleted file mode 100644
index e1bc273d..00000000
--- a/lib/PublicInbox/CidxDumpIbx.pm
+++ /dev/null
@@ -1,59 +0,0 @@
-# Copyright (C) all contributors <meta@public-inbox.org>
-# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
-
-# Intended for PublicInbox::DS::event_loop for -cindex --associate
-# Iterating through mset->items is slow in Perl due to method dispatch
-# and that loop may implemented in C++ using Xapian directly
-package PublicInbox::CidxDumpIbx;
-use v5.12;
-use PublicInbox::Search qw(xap_terms);
-use PublicInbox::DS;
-use Socket qw(MSG_EOR);
-
-sub start {
-        my ($rcvibx, $ibx_id) = @_;
-        my $cidx = $rcvibx->{cidx};
-        my $ibx = $cidx->{IBX}->[$ibx_id] // die "BUG: no IBX[$ibx_id]";
-        my $self = bless { rcvibx => $rcvibx, ekey => $ibx->eidx_key,
-                ibx_id => $ibx_id }, __PACKAGE__;
-        $self->{srch} = $ibx->isrch // do {
-                warn("W: $self->{ekey} has no search index (ignoring)\n");
-                return undef;
-        };
-        my $opt = { limit => $cidx->assoc_max_init, relevance => -2 };
-        $self->{mset} = $self->{srch}->mset($rcvibx->{qry_str}, $opt);
-        $self->{iter} = 0;
-        event_step($self);
-}
-
-sub event_step {
-        my ($self) = @_;
-        my $rcvibx = $self->{rcvibx} // die 'BUG: no rcvibx';
-        return if $rcvibx->{cidx}->do_quit;
-        my $last = $self->{mset}->size - 1;
-        my $cur = $self->{iter};
-        my $end = $cur + 9999;
-        if ($end >= $last) {
-                send($rcvibx->{op_p}, 'index_next', MSG_EOR);
-                $end = $last;
-        }
-        $self->{iter} = $end + 1;
-        local $0 = "dumping $self->{ekey} $cur..$end";
-
-        my $sort_w = $rcvibx->{sort_w};
-        my $ibx_id = $self->{ibx_id};
-        local $0 = "dumping $self->{ekey} $cur..$end";
-        $rcvibx->{cidx}->progress($0);
-        for my $x (($self->{mset}->items)[$cur..$end]) { # FIXME: slow loop
-                my $doc = $x->get_document;
-                for my $p (@{$rcvibx->{cidx}->{ASSOC_PFX}}) {
-                        for (xap_terms($p, $doc)) {
-                                print $sort_w "$_ $ibx_id\n" or die "print: $!";
-                        }
-                }
-        }
-        $end < $last && !$rcvibx->{cidx}->do_quit and
-                PublicInbox::DS::requeue($self);
-}
-
-1;
diff --git a/lib/PublicInbox/CidxXapHelperAux.pm b/lib/PublicInbox/CidxXapHelperAux.pm
new file mode 100644
index 00000000..c9a5ddad
--- /dev/null
+++ b/lib/PublicInbox/CidxXapHelperAux.pm
@@ -0,0 +1,48 @@
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# Intended for PublicInbox::DS::event_loop for -cindex --associate,
+# this reports auxilliary status while dumping
+package PublicInbox::CidxXapHelperAux;
+use v5.12;
+use parent qw(PublicInbox::DS);
+use PublicInbox::Syscall qw(EPOLLIN);
+
+# rpipe connects to req->fp[1] in xap_helper.h
+sub new {
+        my ($cls, $rpipe, $cidx, $pfx, $associate) = @_;
+        my $self = bless {
+                cidx => $cidx,
+                pfx => $pfx,
+                associate => $associate
+        }, $cls;
+        $rpipe->blocking(0);
+        $self->SUPER::new($rpipe, EPOLLIN);
+}
+
+sub event_step {
+        my ($self) = @_; # xap_helper.h is line-buffered
+        my $buf = delete($self->{buf}) // '';
+        my $n = sysread($self->{sock}, $buf, 65536, length($buf));
+        if (!defined($n)) {
+                return if $!{EAGAIN};
+                die "sysread: $!";
+        }
+        my $pfx = $self->{pfx};
+        if ($n == 0) {
+                $self->{cidx}->progress("$pfx $buf") if $buf ne '';
+                return $self->close;
+        }
+        my @lines = split(/^/m, $buf);
+        $self->{buf} = pop @lines if substr($lines[-1], -1) ne "\n";
+        for my $l (@lines) {
+                if ($l =~ /\Amset\.size=[0-9]+\n\z/) {
+                        delete $self->{cidx}->{PENDING}->{$pfx};
+                        $self->{cidx}->index_next;
+                }
+                chomp $l;
+                $self->{cidx}->progress("$pfx $l");
+        }
+}
+
+1;
diff --git a/lib/PublicInbox/CodeSearch.pm b/lib/PublicInbox/CodeSearch.pm
index a5ccce03..6234e259 100644
--- a/lib/PublicInbox/CodeSearch.pm
+++ b/lib/PublicInbox/CodeSearch.pm
@@ -16,6 +16,12 @@ use constant {
         # in refs/{heads,tags}.  AT(col=0) may be used to store disk usage
         # in the future, but disk usage calculation is espensive w/ alternates
 };
+our @CODE_NRP;
+our @CODE_VMAP = (
+        [ AT, 'd:' ], # mairix compat
+        [ AT, 'dt:' ], # mail compat
+        [ CT, 'ct:' ],
+);
 
 # note: the non-X term prefix allocations are shared with Xapian omega,
 # see xapian-applications/omega/docs/termprefixes.rst
@@ -45,15 +51,17 @@ sub new {
         bless { xpfx => "$dir/cidx".CIDX_SCHEMA_VER }, $cls;
 }
 
-sub cqparse_new ($) {
+sub qparse_new ($) {
         my ($self) = @_;
         my $qp = $self->qp_init_common;
         my $cb = $qp->can('add_valuerangeprocessor') //
                 $qp->can('add_rangeprocessor'); # Xapian 1.5.0+
-        $cb->($qp, $PublicInbox::Search::NVRP->new(AT, 'd:')); # mairix compat
-        $cb->($qp, $PublicInbox::Search::NVRP->new(AT, 'dt:')); # mail compat
-        $cb->($qp, $PublicInbox::Search::NVRP->new(CT, 'ct:'));
-
+        if (!@CODE_NRP) {
+                @CODE_NRP = map {
+                        $PublicInbox::Search::NVRP->new(@$_)
+                } @CODE_VMAP;
+        }
+        $cb->($qp, $_) for @CODE_NRP;
         while (my ($name, $pfx) = each %bool_pfx_external) {
                 $qp->add_boolean_prefix($name, $_) for split(/ /, $pfx);
         }
@@ -63,6 +71,40 @@ sub cqparse_new ($) {
         $qp;
 }
 
+sub generate_cxx () { # generates snippet for xap_helper.h
+        my ($line, $file) = (__LINE__ + 2, __FILE__);
+        my $ret = <<EOM;
+# line ${\__LINE__} "${\__FILE__}"
+static NRP *code_nrp[${\scalar(@CODE_VMAP)}];
+static void code_nrp_init(void)
+{
+EOM
+        for (0..$#CODE_VMAP) {
+                my $x = $CODE_VMAP[$_];
+                $ret .= qq{\tcode_nrp[$_] = new NRP($x->[0], "$x->[1]");\n}
+        }
+$ret .= <<EOM;
+}
+
+# line ${\__LINE__} "${\__FILE__}"
+static void qp_init_code_search(Xapian::QueryParser *qp)
+{
+        for (size_t i = 0; i < MY_ARRAY_SIZE(code_nrp); i++)
+                qp->ADD_RP(code_nrp[i]);
+EOM
+        for my $name (sort keys %bool_pfx_external) {
+                for (split(/ /, $bool_pfx_external{$name})) {
+                        $ret .= qq{\tqp->add_boolean_prefix("$name", "$_");\n}
+                }
+        }
+        for my $name (sort keys %prob_prefix) {
+                for (split(/ /, $prob_prefix{$name})) {
+                        $ret .= qq{\tqp->add_prefix("$name", "$_");\n}
+                }
+        }
+        $ret .= "}\n";
+}
+
 # returns a Xapian::Query to filter by roots
 sub roots_filter { # retry_reopen callback
         my ($self, $git_dir) = @_;
@@ -89,7 +131,7 @@ sub roots_filter { # retry_reopen callback
 
 sub mset {
         my ($self, $qry_str, $opt) = @_;
-        my $qp = $self->{qp} //= cqparse_new($self);
+        my $qp = $self->{qp} //= qparse_new($self);
         my $qry = $qp->parse_query($qry_str, $self->{qp_flags});
 
         # limit to commits with shared roots
diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index e795c2b3..b8afecd2 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -42,6 +42,7 @@ use PublicInbox::IPC qw(nproc_shards);
 use POSIX qw(WNOHANG SEEK_SET);
 use File::Path ();
 use File::Spec ();
+use List::Util qw(max);
 use PublicInbox::SHA qw(sha256_hex);
 use PublicInbox::Search qw(xap_terms);
 use PublicInbox::SearchIdx qw(add_val);
@@ -69,6 +70,9 @@ our (
         @GIT_DIR_GONE, # [ git_dir1, git_dir2 ]
         $PRUNE_DONE, # marks off prune completions
         $NCHANGE, # current number of changes
+        $NPROC,
+        $XH_PID, # XapHelper PID
+        $XHC, # XapClient
         $REPO_CTX, # current repo being indexed in shards
         $IDX_TODO, # [ $git0, $root0, $git1, $root1, ...]
         $GIT_TODO, # [ GIT_DIR0, GIT_DIR1, ...]
@@ -81,8 +85,8 @@ our (
         @AWK, @COMM, @SORT, # awk(1), comm(1), sort(1) commands
         @ASSOC_PFX, # any combination of XDFID, XDFPRE, XDFPOST
         $QRY_STR, # common query string for both code and inbox associations
-        $IBXDIR_FEED, # SOCK_SEQPACKET
-        @DUMP_SHARD_ROOTS_OK, @RECV_IBX_OK, # for associate
+        @DUMP_SHARD_ROOTS_OK, # for associate
+        $DUMP_IBX_WPIPE, # goes to sort(1)
         @ID2ROOT,
 );
 
@@ -529,45 +533,29 @@ sub dump_roots_once {
         progress($self, 'waiting on dump_shard_roots sort');
 }
 
-sub recv_ibx_done { # via PktOp on recv_ibx completion
-        my ($self, $pid, $n) = @_;
-        return if $DO_QUIT;
-        progress($self, "recv_ibx [$n] done");
-        $RECV_IBX_OK[$n] = 1;
-}
-
-# causes a worker to become a dumper for inbox/extindex
-sub recv_ibx { # wq_io_do
-        my ($self, $qry_str) = @_;
-        PublicInbox::CidxRecvIbx->new($self, $qry_str);
-}
-
-sub dump_ibx { # sends to PublicInbox::CidxRecvIbx::event_step
-        my ($self, $id_dir) = @_; # id_dir: "$IBX_ID=$INBOXDIR"
-        my $n = length($id_dir);
-        my $w = send($IBXDIR_FEED, $id_dir, MSG_EOR) // die "send: $!";
-        $n == $w or die "send($id_dir) $w != $n";
+sub dump_ibx { # sends to xap_helper.h
+        my ($self, $ibx_id) = @_;
+        my $ibx = $IBX[$ibx_id] // die "BUG: no IBX[$ibx_id]";
+        my @cmd = ('dump_ibx', $ibx->isrch->xh_args,
+                        (map { ('-A', $_) } @ASSOC_PFX),
+                        $ibx_id, $QRY_STR);
+        pipe(my ($r, $w)) or die "pipe: $!";
+        $XHC->mkreq([$DUMP_IBX_WPIPE, $w], @cmd);
+        my $ekey = $ibx->eidx_key;
+        $self->{PENDING}->{$ekey} = undef;
+        PublicInbox::CidxXapHelperAux->new($r, $self, $ekey, $TODO{associate});
 }
 
-# repurpose shard workers to dump inbox patchids with perfect balance
 sub dump_ibx_start {
         my ($self, $associate) = @_;
-        pipe(my ($sort_r, $sort_w)) or die "pipe: $!";
+        pipe(my $sort_r, $DUMP_IBX_WPIPE) or die "pipe: $!";
         my @sort = (@SORT, '-k1,1');
         my $dst = "$TMPDIR/to_ibx_id";
         open my $fh, '>', $dst or die "open($dst): $!";
         my $sort_pid = spawn(\@sort, $CMD_ENV, { 0 => $sort_r, 1 => $fh });
         close $sort_r or die "close: $!";
         awaitpid($sort_pid, \&cmd_done, \@sort, $associate);
-
-        my ($r, $w);
-        socketpair($r, $w, AF_UNIX, SOCK_SEQPACKET, 0) or die "socketpair: $!";
-        my ($c, $p) = PublicInbox::PktOp->pair;
-        $c->{ops}->{recv_ibx_done} = [ $self, $associate ];
-        $c->{ops}->{index_next} = [ $self ];
-        my $io = [ $p->{op_p}, $r, $sort_w ];
-        $_->wq_io_do('recv_ibx', $io, $QRY_STR) for @IDX_SHARDS;
-        $IBXDIR_FEED = $w;
+        ($XHC, $XH_PID) = PublicInbox::XapClient::start_helper("-j$NPROC");
 }
 
 sub index_next ($) {
@@ -584,8 +572,8 @@ sub index_next ($) {
         } elsif ($TMPDIR) {
                 delete $TODO{dump_ibx_start}; # runs OnDestroy once
                 return dump_ibx($self, shift @IBXQ) if @IBXQ;
-                progress($self, 'done dumping inboxes') if $IBXDIR_FEED;
-                undef $IBXDIR_FEED; # done dumping inboxes, dump roots
+                progress($self, 'done dumping inboxes') if $DUMP_IBX_WPIPE;
+                undef $DUMP_IBX_WPIPE; # done dumping inboxes, dump roots
                 dump_roots_once($self, delete($TODO{associate}) // return);
         }
         # else: wait for shards_active (post_loop_do) callback
@@ -795,7 +783,7 @@ sub kill_shards { $_->wq_kill(@_) for (@IDX_SHARDS) }
 
 sub parent_quit {
         $DO_QUIT = POSIX->can("SIG$_[0]")->();
-        $IBXDIR_FEED = undef;
+        $XHC = undef;
         kill_shards(@_);
         warn "# SIG$_[0] received, quitting...\n";
 }
@@ -875,8 +863,8 @@ sub associate {
         @IDX_SHARDS or return warn("# aborting on no shards\n");
         grep(defined, @DUMP_SHARD_ROOTS_OK) == @IDX_SHARDS or
                 die "E: shards not dumped properly\n";
-        grep(defined, @RECV_IBX_OK) == @IDX_SHARDS or
-                die "E: inboxes not dumped properly\n";
+        my @pending = keys %{$self->{PENDING}};
+        die "E: pending=@pending jobs not done\n" if @pending;
         progress($self, 'associating...');
         my @join = ('time', @JOIN, 'to_ibx_id', 'to_root_id');
         my $rd = popen_rd(\@join, $CMD_ENV, { -C => "$TMPDIR" });
@@ -1032,8 +1020,9 @@ sub cidx_read_comm { # via PublicInbox::CidxComm::event_step
 sub init_associate_prefork ($) {
         my ($self) = @_;
         return unless $self->{-opt}->{associate};
-        require PublicInbox::CidxRecvIbx;
         require PublicInbox::CidxDumpShardRoots;
+        require PublicInbox::CidxXapHelperAux;
+        require PublicInbox::XapClient;
         $self->{-pi_cfg} = PublicInbox::Config->new;
         my @unknown;
         my @pfx = @{$self->{-opt}->{'associate-prefixes'} // [ 'patchid' ]};
@@ -1055,7 +1044,7 @@ EOM
 sub _prep_ibx { # each_inbox callback
         my ($ibx, $self, $incl) = @_;
         ($self->{-opt}->{all} || exists($incl->{$ibx->{inboxdir}})) and
-                push @{$self->{IBX}}, $ibx;
+                push @IBX, $ibx;
 }
 
 sub show_roots { # for diagnostics
@@ -1102,13 +1091,13 @@ sub cidx_run { # main entry point
         local $GIT_TODO = [];
         local ($DO_QUIT, $REINDEX, $TXN_BYTES, @GIT_DIR_GONE, @PRUNE_QUEUE,
                 $REPO_CTX, %ALT_FH, $TMPDIR, @AWK, @COMM, $CMD_ENV,
-                %TODO, @IBXQ, @IBX, @JOIN, @ASSOC_PFX, $IBXDIR_FEED, @ID2ROOT,
-                @DUMP_SHARD_ROOTS_OK, @RECV_IBX_OK);
+                %TODO, @IBXQ, @IBX, @JOIN, @ASSOC_PFX, $DUMP_IBX_WPIPE,
+                @ID2ROOT, @DUMP_SHARD_ROOTS_OK, $XH_PID, $XHC);
         local $BATCH_BYTES = $self->{-opt}->{batch_size} //
                                 $PublicInbox::SearchIdx::BATCH_BYTES;
         local @SORT = (undef, '-u');
-        local $self->{IBX} = \@IBX;
         local $self->{ASSOC_PFX} = \@ASSOC_PFX;
+        local $self->{PENDING} = {};
         local $self->{-pi_cfg};
         if (grep { $_ } @{$self->{-opt}}{qw(prune associate)}) {
                 require File::Temp;
@@ -1165,13 +1154,14 @@ sub cidx_run { # main entry point
                 @GIT_DIR_GONE = uniqstr @GIT_DIR_GONE, @excl;
         }
         local $NCHANGE = 0;
-        local $LIVE_JOBS = $self->{-opt}->{jobs} ||
-                        PublicInbox::IPC::detect_nproc() || 2;
+        local $NPROC = PublicInbox::IPC::detect_nproc();
+        local $LIVE_JOBS = $self->{-opt}->{jobs} || $NPROC || 2;
         local @RDONLY_XDB = $self->xdb_shards_flat;
         init_prune($self);
         init_associate_postfork($self);
         scan_git_dirs($self) if $self->{-opt}->{scan} // 1;
-        index_next($self) for (1..$LIVE_JOBS);
+        my $max = $TODO{associate} ? max($LIVE_JOBS, $NPROC) : $LIVE_JOBS;
+        index_next($self) for (1..$max);
 
         # FreeBSD ignores/discards SIGCHLD while signals are blocked and
         # EVFILT_SIGNAL is inactive, so we pretend we have a SIGCHLD pending
diff --git a/lib/PublicInbox/Isearch.pm b/lib/PublicInbox/Isearch.pm
index 5cac08ba..62112171 100644
--- a/lib/PublicInbox/Isearch.pm
+++ b/lib/PublicInbox/Isearch.pm
@@ -123,4 +123,9 @@ sub has_threadid { 1 }
 
 sub help { $_[0]->{es}->help }
 
+sub xh_args { # prep getopt args to feed to xap_helper.h socket
+        my ($self, $opt) = @_; # TODO uid_range
+        ($self->{es}->xh_args, '-O', $self->{eidx_key});
+}
+
 1;
diff --git a/lib/PublicInbox/Search.pm b/lib/PublicInbox/Search.pm
index d5b0bceb..2e784646 100644
--- a/lib/PublicInbox/Search.pm
+++ b/lib/PublicInbox/Search.pm
@@ -66,6 +66,15 @@ our $NVRP; # '$Xap::'.('NumberValueRangeProcessor' or 'NumberRangeProcessor')
 # let's hope the ABI is stable
 our $ENQ_DESCENDING = 0;
 our $ENQ_ASCENDING = 1;
+our @MAIL_VMAP = (
+        [ YYYYMMDD, 'd:'],
+        [ DT, 'dt:' ],
+        # these are undocumented for WWW, but lei and IMAP use them
+        [ BYTES, 'z:' ],
+        [ TS, 'rt:' ],
+        [ UID, 'uid:' ]
+);
+our @MAIL_NRP;
 
 sub load_xapian () {
         return 1 if defined $Xap;
@@ -101,6 +110,7 @@ sub load_xapian () {
                 # or make indexlevel=medium as default
                 $QP_FLAGS = FLAG_PHRASE() | FLAG_BOOLEAN() | FLAG_LOVEHATE() |
                                 FLAG_WILDCARD();
+                @MAIL_NRP = map { $NVRP->new(@$_) } @MAIL_VMAP;
                 return 1;
         }
         undef;
@@ -490,14 +500,8 @@ sub qparse_new {
         my $qp = qp_init_common($self);
         my $cb = $qp->can('add_valuerangeprocessor') //
                 $qp->can('add_rangeprocessor'); # Xapian 1.5.0+
-        $cb->($qp, $NVRP->new(YYYYMMDD, 'd:'));
-        $cb->($qp, $NVRP->new(DT, 'dt:'));
-
-        # for IMAP, undocumented for WWW and may be split off go away
-        $cb->($qp, $NVRP->new(BYTES, 'z:'));
-        $cb->($qp, $NVRP->new(TS, 'rt:'));
-        $cb->($qp, $NVRP->new(UID, 'uid:'));
 
+        $cb->($qp, $_) for @MAIL_NRP;
         while (my ($name, $prefix) = each %bool_pfx_external) {
                 $qp->add_boolean_prefix($name, $_) foreach split(/ /, $prefix);
         }
@@ -527,6 +531,40 @@ EOF
         $qp;
 }
 
+sub generate_cxx () { # generates snippet for xap_helper.h
+        my $ret = <<EOM;
+# line ${\__LINE__} "${\__FILE__}"
+static NRP *mail_nrp[${\scalar(@MAIL_VMAP)}];
+static void mail_nrp_init(void)
+{
+EOM
+        for (0..$#MAIL_VMAP) {
+                my $x = $MAIL_VMAP[$_];
+                $ret .= qq{\tmail_nrp[$_] = new NRP($x->[0], "$x->[1]");\n}
+        }
+$ret .= <<EOM;
+}
+
+# line ${\__LINE__} "${\__FILE__}"
+static void qp_init_mail_search(Xapian::QueryParser *qp)
+{
+        for (size_t i = 0; i < MY_ARRAY_SIZE(mail_nrp); i++)
+                qp->ADD_RP(mail_nrp[i]);
+EOM
+        for my $name (sort keys %bool_pfx_external) {
+                for (split(/ /, $bool_pfx_external{$name})) {
+                        $ret .= qq{\tqp->add_boolean_prefix("$name", "$_");\n}
+                }
+        }
+        # TODO: altid support
+        for my $name (sort keys %prob_prefix) {
+                for (split(/ /, $prob_prefix{$name})) {
+                        $ret .= qq{\tqp->add_prefix("$name", "$_");\n}
+                }
+        }
+        $ret .= "}\n";
+}
+
 sub help {
         my ($self) = @_;
         $self->{qp} //= $self->qparse_new; # parse altids
@@ -585,4 +623,8 @@ sub all_terms {
         wantarray ? (sort keys %ret) : \%ret;
 }
 
+sub xh_args { # prep getopt args to feed to xap_helper.h socket
+        map { ('-d', $_) } shard_dirs($_[0]);
+}
+
 1;
diff --git a/lib/PublicInbox/XapClient.pm b/lib/PublicInbox/XapClient.pm
new file mode 100644
index 00000000..56e3c3b4
--- /dev/null
+++ b/lib/PublicInbox/XapClient.pm
@@ -0,0 +1,50 @@
+#!perl -w
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# This talks to (XapHelperCxx.pm + xap_helper.h) or XapHelper.pm
+# and will eventually allow users with neither XS nor SWIG Perl
+# bindings to use Xapian as long as they have Xapian development
+# headers/libs and a C++ compiler
+package PublicInbox::XapClient;
+use v5.12;
+use PublicInbox::Spawn qw(spawn);
+use Socket qw(AF_UNIX SOCK_SEQPACKET MSG_EOR);
+use PublicInbox::IPC;
+
+sub mkreq {
+        my ($self, $ios, @arg) = @_;
+        my ($r, $w, $n);
+        if (!defined($ios->[0])) {
+                pipe($r, $w) or die "pipe: $!";
+                $ios->[0] = $w;
+        }
+        my @fds = map fileno($_), @$ios;
+        my $buf = join("\0", @arg, '');
+        $n = PublicInbox::IPC::send_cmd($self->{io}, \@fds, $buf, MSG_EOR) //
+                die "send_cmd: $!";
+        $n == length($buf) or die "send_cmd: $n != ".length($buf);
+        $r;
+}
+
+sub start_helper {
+        my @argv = @_;
+        socketpair(my $sock, my $in, AF_UNIX, SOCK_SEQPACKET, 0) or
+                die "socketpair: $!";
+        my $cls = ($ENV{PI_NO_CXX} ? undef : eval {
+                        require PublicInbox::XapHelperCxx;
+                        PublicInbox::XapHelperCxx::check_build();
+                        'PublicInbox::XapHelperCxx';
+                }) // do {
+                        require PublicInbox::XapHelper;
+                        'PublicInbox::XapHelper';
+                };
+        # ensure the child process has the same @INC we do:
+        my $env = { PERL5LIB => join(':', @INC) };
+        my $pid = spawn([$^X, ($^W ? ('-w') : ()), "-M$cls", '-e',
+                                $cls.'::start(@ARGV)', '--', @argv],
+                        $env, { 0 => $in });
+        ((bless { io => $sock, impl => $cls }, __PACKAGE__), $pid);
+}
+
+1;
diff --git a/lib/PublicInbox/XapHelper.pm b/lib/PublicInbox/XapHelper.pm
new file mode 100644
index 00000000..bf2f99a2
--- /dev/null
+++ b/lib/PublicInbox/XapHelper.pm
@@ -0,0 +1,144 @@
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# Perl + SWIG||XS implementation if XapHelperCxx / xap_helper.h isn't usable.
+package PublicInbox::XapHelper;
+use v5.12;
+use Getopt::Long (); # good API even if we only use short options
+our $GLP = Getopt::Long::Parser->new;
+$GLP->configure(qw(require_order bundling no_ignore_case no_auto_abbrev));
+use PublicInbox::Search qw(xap_terms);
+use PublicInbox::IPC;
+my $X = \%PublicInbox::Search::X;
+our (%SRCH, %PIDS, $parent_pid);
+our $stderr = \*STDERR;
+
+# only short options for portability in C++ implementation
+our @SPEC = (
+        'a', # ascending sort
+        'c', # code search
+        'd=s@', # shard dirs
+        'k=i', # sort column (like sort(1))
+        'm=i', # maximum number of results
+        'o=i', # offset
+        'r', # 1=relevance then column
+        't', # collapse threads
+        'A=s@', # prefixes
+        'O=s', # eidx_key
+        'T=i', # timeout in seconds
+);
+
+sub cmd_test_inspect {
+        my ($req) = @_;
+        print { $req->{0} } "pid=$$ has_threadid=",
+                ($req->{srch}->has_threadid ? 1 : 0)
+}
+
+sub cmd_dump_ibx {
+        my ($req, $ibx_id, $qry_str) = @_;
+        $qry_str // return warn('usage: dump_ibx [OPTIONS] IBX_ID QRY_STR');
+        my @pfx = @{$req->{A}} or return warn('dump_ibx requires -A PREFIX');
+        my $max = $req->{srch}->{xdb}->get_doccount;
+        my $opt = { relevance => -1, limit => $max, offset => $req->{o} // 0 };
+        $opt->{eidx_key} = $req->{O} if defined $req->{O};
+        my $mset = $req->{srch}->mset($qry_str, $opt);
+        my $out = $req->{0};
+        $out->autoflush(1);
+        for my $it ($mset->items) {
+                my $doc = $it->get_document;
+                for my $p (@pfx) {
+                        for (xap_terms($p, $doc)) {
+                                print $out "$_ $ibx_id\n" or die "print: $!";
+                        }
+                }
+        }
+        if (my $err = $req->{1}) { say $err 'mset.size=', $mset->size }
+}
+
+sub dispatch {
+        my ($req, $cmd, @argv) = @_;
+        my $fn = $req->can("cmd_$cmd") or return;
+        $GLP->getoptionsfromarray(\@argv, $req, @SPEC) or return;
+        my $dirs = delete $req->{d} or return warn 'no -d args';
+        my $key = join("\0", @$dirs);
+        $req->{srch} = $SRCH{$key} //= do {
+                my $new = { qp_flags => $PublicInbox::Search::QP_FLAGS };
+                my $first = shift @$dirs;
+                my $slow_phrase = -f "$first/iamchert";
+                $new->{xdb} = $X->{Database}->new($first);
+                for (@$dirs) {
+                        $slow_phrase ||= -f "$_/iamchert";
+                        $new->{xdb}->add_database($X->{Database}->new($_));
+                }
+                $slow_phrase or
+                        $new->{qp_flags} |= PublicInbox::Search::FLAG_PHRASE();
+                bless $new, $req->{c} ? 'PublicInbox::CodeSearch' :
+                                        'PublicInbox::Search';
+                $new->{qp} = $new->qparse_new;
+                $new;
+        };
+        eval { $fn->($req, @argv) };
+        warn "E: $@" if $@;
+}
+
+sub recv_loop {
+        local $SIG{__WARN__} = sub { print $stderr @_ };
+        my $rbuf;
+        while (!defined($parent_pid) || getppid != $parent_pid) {
+                my $req = bless {}, __PACKAGE__;
+                my @fds = PublicInbox::IPC::recv_cmd(\*STDIN, $rbuf, 4096*33);
+                scalar(@fds) or exit(66); # EX_NOINPUT
+                $fds[0] // die "recvmsg: $!";
+                my $i = 0;
+                for my $fd (@fds) {
+                        open($req->{$i++}, '+<&=', $fd) and next;
+                        warn("open(+<&=$fd) (FD=$i): $!");
+                        undef $req;
+                        last;
+                }
+                $req or next;
+                local $stderr = $req->{1} // \*STDERR;
+                if (chop($rbuf) ne "\0") {
+                        warn "not NUL-terminated";
+                        next;
+                }
+                my @argv = split(/\0/, $rbuf);
+                eval { $req->dispatch(@argv) } if @argv;
+        }
+}
+
+sub start_worker ($) {
+        my ($nr) = @_;
+        my $pid = fork // return warn("fork: $!");
+        if ($pid == 0) {
+                undef %PIDS;
+                recv_loop();
+                exit(0);
+        } else {
+                $PIDS{$pid} = $nr;
+        }
+}
+
+sub start (@) {
+        my (@argv) = @_;
+        local (%SRCH, %PIDS, $parent_pid);
+        PublicInbox::Search::load_xapian();
+        $GLP->getoptionsfromarray(\@argv, my $opt = { j => 1 }, 'j=i') or
+                die 'bad args';
+        return recv_loop() if !$opt->{j};
+        die '-j must be >= 0' if $opt->{j} < 0;
+        start_worker($_) for (1..($opt->{j}));
+
+        my $quit;
+        until ($quit) {
+                my $p = waitpid(-1, 0) or return;
+                if (defined(my $nr = delete $PIDS{$p})) {
+                        $quit = 1 if ($? >> 8) == 66; # EX_NOINPUT
+                        start_worker($nr) if !$quit;
+                } else {
+                        warn "W: unknown pid=$p reaped\n";
+                }
+        }
+}
+
+1;
diff --git a/lib/PublicInbox/XapHelperCxx.pm b/lib/PublicInbox/XapHelperCxx.pm
new file mode 100644
index 00000000..4571676b
--- /dev/null
+++ b/lib/PublicInbox/XapHelperCxx.pm
@@ -0,0 +1,93 @@
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# Just-ahead-of-time builder for the lib/PublicInbox/xap_helper.h shim.
+# I never want users to be without source code for repairs, so this
+# aims to replicate the feel of a scripting language using C++.
+# The resulting executable is not linked to Perl in any way.
+package PublicInbox::XapHelperCxx;
+use v5.12;
+use PublicInbox::Spawn;
+use PublicInbox::Search;
+my $dir = ($ENV{PERL_INLINE_DIRECTORY} //
+        die('BUG: PERL_INLINE_DIRECTORY unset')) . '/cxx';
+my $bin = "$dir/xap_helper";
+my ($srcpfx) = (__FILE__ =~ m!\A(.+/)[^/]+\z!);
+my @srcs = map { $srcpfx.$_ } qw(xap_helper.h);
+my @pm_dep = map { $srcpfx.$_ } qw(Search.pm CodeSearch.pm);
+my $xflags = ($ENV{CXXFLAGS} // '-Wall -ggdb3 -O0') . ' ' .
+        ($ENV{LDFLAGS} // '-Wl,-O1 -Wl,--compress-debug-sections=zlib') .
+        qq{ -DTHREADID=}.PublicInbox::Search::THREADID;
+
+sub xflags_chg () {
+        open my $fh, '<', "$dir/XFLAGS" or return 1;
+        chomp(my $prev = <$fh>);
+        $prev ne $xflags;
+}
+
+sub build () {
+        if (!-d $dir) {
+                my $err;
+                mkdir($dir) or $err = $!;
+                die "mkdir($dir): $err" if !-d $dir;
+        }
+        use autodie;
+        require File::Temp;
+        require PublicInbox::CodeSearch;
+        my ($prog) = ($bin =~ m!/([^/]+)\z!);
+        my $pkg_config = $ENV{PKG_CONFIG} // 'pkg-config';
+        my $tmp = File::Temp->newdir(DIR => $dir) // die "newdir: $!";
+        my $src = "$tmp/$prog.cpp";
+        open my $fh, '>', $src;
+        for (@srcs) {
+                say $fh qq(# line 1 "$_");
+                open my $rfh, '<', $_;
+                local $/;
+                print $fh readline($rfh);
+        }
+        print $fh PublicInbox::Search::generate_cxx();
+        print $fh PublicInbox::CodeSearch::generate_cxx();
+        close $fh;
+
+        my $cmd = "$pkg_config --libs --cflags xapian-core";
+        chomp(my $fl = `$cmd`);
+        die "$cmd failed: \$?=$?" if $?;
+        my $cxx = $ENV{CXX} // 'c++';
+        $cmd = "$cxx $src $fl $xflags -o $tmp/$prog";
+        system($cmd) and die "$cmd failed: \$?=$?";
+        my $cf = "$tmp/XFLAGS";
+        open $fh, '>', $cf;
+        say $fh $xflags;
+        close $fh;
+        # not quite atomic, but close enough :P
+        rename("$tmp/$_", "$dir/$_") for ($prog, 'XFLAGS');
+}
+
+sub check_build () {
+        use Time::HiRes qw(stat);
+        my $ctime = 0;
+        my @bin = stat($bin) or return build();
+        for (@srcs, @pm_dep) {
+                my @st = stat($_) or die "stat $_: $!";
+                if ($st[10] > $ctime) {
+                        $ctime = $st[10];
+                        return build() if $ctime > $bin[10];
+                }
+        }
+        xflags_chg() ? build() : 0;
+}
+
+sub start (@) {
+        check_build();
+        my @cmd;
+        if (my $v = $ENV{VALGRIND}) {
+                $v = 'valgrind -v' if $v eq '1';
+                @cmd = split(/\s+/, $v);
+        }
+        push @cmd, $bin, @_;
+        my $prog = $cmd[0];
+        $cmd[0] =~ s!\A.*?/([^/]+)\z!$1!;
+        exec { $prog } @cmd;
+}
+
+1;
diff --git a/lib/PublicInbox/xap_helper.h b/lib/PublicInbox/xap_helper.h
new file mode 100644
index 00000000..52db92b7
--- /dev/null
+++ b/lib/PublicInbox/xap_helper.h
@@ -0,0 +1,654 @@
+/*
+ * Copyright (C) all contributors <meta@public-inbox.org>
+ * License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+ *
+ * Standalone helper process using C and minimal C++ for Xapian,
+ * this is not linked to Perl in any way.
+ * C (not C++) is used as much as possible to lower the contribution
+ * barrier for hackers who mainly know C (this includes the maintainer).
+ * Everything here is an unstable internal API of public-inbox and
+ * NOT intended for ordinary users; only public-inbox hackers
+ */
+#ifndef _ALL_SOURCE
+#        define _ALL_SOURCE
+#endif
+#include <sys/resource.h>
+#include <sys/socket.h>
+#include <sys/stat.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <sys/uio.h>
+#include <sys/wait.h>
+
+#include <assert.h>
+#include <err.h> // BSD, glibc, and musl all have this
+#include <errno.h>
+#include <fcntl.h>
+#include <limits.h>
+#include <search.h>
+#include <stdio.h>
+#include <string.h>
+#include <sysexits.h>
+#include <unistd.h>
+#include <xapian.h> // our only reason for using C++
+
+#define MY_VER(maj,min,rev) ((maj) << 16 | (min) << 8 | (rev))
+#define XAP_VER \
+        MY_VER(XAPIAN_MAJOR_VERSION,XAPIAN_MINOR_VERSION,XAPIAN_REVISION)
+
+#if XAP_VER >= MY_VER(1,3,6)
+#        define NRP Xapian::NumberRangeProcessor
+#        define ADD_RP add_rangeprocessor
+#        define SET_MAX_EXPANSION set_max_expansion // technically 1.3.3
+#else
+#        define NRP Xapian::NumberValueRangeProcessor
+#        define ADD_RP add_valuerangeprocessor
+#        define SET_MAX_EXPANSION set_max_wildcard_expansion
+#endif
+
+static const int sock_fd = 0; // SOCK_SEQPACKET as stdin :P
+static pid_t parent_pid;
+static FILE *orig_err = stderr;
+static void *srch_tree; // tsearch + tdelete + twalk
+static pid_t *worker_pids; // nr => pid
+static unsigned long nworker;
+
+// PublicInbox::Search and PublicInbox::CodeSearch generate these:
+static void mail_nrp_init(void);
+static void code_nrp_init(void);
+static void qp_init_mail_search(Xapian::QueryParser *);
+static void qp_init_code_search(Xapian::QueryParser *);
+
+struct srch {
+        int paths_len; // int for comparisons
+        unsigned qp_flags;
+        Xapian::Database *db;
+        Xapian::QueryParser *qp;
+        char paths[]; // $shard_path0\0$shard_path1\0...
+};
+
+#define MY_ARG_MAX 256
+typedef bool (*cmd)(struct req *);
+
+// only one request per-process since we have RLIMIT_CPU timeout
+struct req { // argv and pfxv point into global rbuf
+        char *argv[MY_ARG_MAX];
+        char *pfxv[MY_ARG_MAX]; // -A <prefix>
+        struct srch *srch;
+        char *Oeidx_key;
+        cmd fn;
+        unsigned long long max;
+        unsigned long long off;
+        unsigned long timeout_sec;
+        long sort_col; // value column, negative means BoolWeight
+        int argc;
+        int pfxc;
+        FILE *fp[2]; // [0] response pipe or sock, [1] status/errors (optional)
+        bool has_input; // fp[0] is bidirectional
+        bool collapse_threads;
+        bool code_search;
+        bool relevance; // sort by relevance before column
+        bool asc; // ascending sort
+};
+
+struct worker {
+        pid_t pid;
+        unsigned nr;
+};
+
+static bool has_threadid(const struct srch *srch)
+{
+        return srch->db->get_metadata("has_threadid") == "1";
+}
+
+static Xapian::Enquire prep_enquire(const struct req *req)
+{
+        Xapian::Enquire enq(*req->srch->db);
+        if (req->sort_col < 0) {
+                enq.set_weighting_scheme(Xapian::BoolWeight());
+                enq.set_docid_order(req->asc ? Xapian::Enquire::ASCENDING
+                                        : Xapian::Enquire::DESCENDING);
+        } else if (req->relevance) {
+                enq.set_sort_by_relevance_then_value(req->sort_col, !req->asc);
+        } else {
+                enq.set_sort_by_value_then_relevance(req->sort_col, !req->asc);
+        }
+        return enq;
+}
+
+static Xapian::MSet enquire_mset(struct req *req, Xapian::Enquire *enq)
+{
+        if (!req->max)
+                req->max = 50;
+        for (int i = 0; i < 9; i++) {
+                try {
+                        Xapian::MSet mset = enq->get_mset(req->off, req->max);
+                        return mset;
+                } catch (const Xapian::DatabaseModifiedError & e) {
+                        req->srch->db->reopen();
+                }
+        }
+        return enq->get_mset(req->off, req->max);
+}
+
+static Xapian::MSet mail_mset(struct req *req, const char *qry_str)
+{
+        struct srch *srch = req->srch;
+        Xapian::Query qry = srch->qp->parse_query(qry_str, srch->qp_flags);
+        if (req->Oeidx_key) {
+                req->Oeidx_key[0] = 'O'; // modifies static rbuf
+                fprintf(stderr, "dbg eidxkey:%s>\n", req->Oeidx_key);
+                qry = Xapian::Query(Xapian::Query::OP_FILTER, qry,
+                                        Xapian::Query(req->Oeidx_key));
+        }
+        Xapian::Enquire enq = prep_enquire(req);
+        enq.set_query(qry);
+        // THREADID is a CPP macro defined on CLI (see) XapHelperCxx.pm
+        if (req->collapse_threads && has_threadid(srch))
+                enq.set_collapse_key(THREADID);
+
+        return enquire_mset(req, &enq);
+}
+
+static bool starts_with(const std::string *s, const char *pfx, size_t pfx_len)
+{
+        return s->size() >= pfx_len && !memcmp(pfx, s->c_str(), pfx_len);
+}
+
+static void dump_ibx_term(struct req *req, const char *pfx,
+                        Xapian::Document *doc, const char *ibx_id)
+{
+        Xapian::TermIterator cur = doc->termlist_begin();
+        Xapian::TermIterator end = doc->termlist_end();
+        size_t pfx_len = strlen(pfx);
+
+        for (cur.skip_to(pfx); cur != end; cur++) {
+                std::string tn = *cur;
+
+                if (starts_with(&tn, pfx, pfx_len))
+                        fprintf(req->fp[0], "%s %s\n",
+                                tn.c_str() + pfx_len, ibx_id);
+        }
+}
+
+static int my_setlinebuf(FILE *fp) // glibc setlinebuf(3) can't report errors
+{
+        return setvbuf(fp, NULL, _IOLBF, 0);
+}
+
+static bool cmd_dump_ibx(struct req *req)
+{
+        if ((optind + 1) >= req->argc) {
+                warnx("usage: dump_ibx [OPTIONS] IBX_ID QRY_STR");
+                return false; // need ibx_id + qry_str
+        }
+        if (!req->pfxc) {
+                warnx("dump_ibx requires -A PREFIX");
+                return false;
+        }
+
+        const char *ibx_id = req->argv[optind];
+        if (my_setlinebuf(req->fp[0])) { // for sort(1) pipe
+                perror("setlinebuf(fp[0])");
+                return false;
+        }
+        req->asc = true;
+        req->sort_col = -1;
+        req->max = (unsigned long long)req->srch->db->get_doccount();
+        Xapian::MSet mset = mail_mset(req, req->argv[optind + 1]);
+        for (Xapian::MSetIterator i = mset.begin(); i != mset.end(); i++) {
+                try {
+                        Xapian::Document doc = i.get_document();
+                        for (int p = 0; p < req->pfxc; p++)
+                                dump_ibx_term(req, req->pfxv[p], &doc, ibx_id);
+                } catch (const Xapian::Error & e) {
+                        fprintf(orig_err, "W: %s (#%ld)\n",
+                                e.get_description().c_str(), (long)(*i));
+                        continue;
+                }
+        }
+        if (req->fp[1])
+                fprintf(req->fp[1], "mset.size=%llu\n",
+                        (unsigned long long)mset.size());
+        return true;
+}
+
+// internal usage only
+static bool cmd_test_inspect(struct req *req)
+{
+        fprintf(req->fp[0], "pid=%d has_threadid=%d",
+                (int)getpid(), has_threadid(req->srch) ? 1 : 0);
+        return true;
+}
+
+#define CMD(n) { .fn_len = sizeof(#n) - 1, .fn_name = #n, .fn = cmd_##n }
+static const struct cmd_entry {
+        size_t fn_len;
+        const char *fn_name;
+        cmd fn;
+} cmds[] = { // should be small enough to not need bsearch || gperf
+        // most common commands first
+        CMD(dump_ibx),
+        CMD(test_inspect), // least common commands last
+};
+
+#define MY_ARRAY_SIZE(x)        (sizeof(x)/sizeof((x)[0]))
+#define RECV_FD_CAPA 2
+#define RECV_FD_SPACE        (RECV_FD_CAPA * sizeof(int))
+union my_cmsg {
+        struct cmsghdr hdr;
+        char pad[sizeof(struct cmsghdr) + 16 + RECV_FD_SPACE];
+};
+
+static void xclose(int fd)
+{
+        if (close(fd) < 0 && errno != EINTR)
+                err(EXIT_FAILURE, "BUG: close");
+}
+
+static bool recv_req(struct req *req, char *rbuf, size_t *len)
+{
+        union my_cmsg cmsg = { 0 };
+        struct msghdr msg = { .msg_iovlen = 1 };
+        struct iovec iov;
+        iov.iov_base = rbuf;
+        iov.iov_len = *len;
+        msg.msg_iov = &iov;
+        msg.msg_control = &cmsg.hdr;
+        msg.msg_controllen = CMSG_SPACE(RECV_FD_SPACE);
+
+        ssize_t r = recvmsg(sock_fd, &msg, 0);
+        if (r < 0)
+                err(EXIT_FAILURE, "recvmsg");
+        if (r == 0)
+                exit(EX_NOINPUT); /* grandparent went away */
+        *len = r;
+        if (r > 0 && cmsg.hdr.cmsg_level == SOL_SOCKET &&
+                        cmsg.hdr.cmsg_type == SCM_RIGHTS) {
+                size_t len = cmsg.hdr.cmsg_len;
+                int *fdp = (int *)CMSG_DATA(&cmsg.hdr);
+                size_t i;
+                bool fd_ok = true;
+                for (i = 0; CMSG_LEN((i + 1) * sizeof(int)) <= len; i++) {
+                        int fd = *fdp++;
+                        const char *mode = NULL;
+                        int fl = fd_ok ? fcntl(fd, F_GETFL) : 0;
+                        switch (fl) {
+                        case 0: break; // hit previous error
+                        case -1:
+                                warnx("invalid fd=%d", fd);
+                                fd_ok = false;
+                                break;
+                        case O_WRONLY: mode = "w"; break;
+                        case O_RDWR:
+                                mode = "r+";
+                                if (i == 0) req->has_input = true;
+                                break;
+                        default:
+                                warnx("invalid mode from F_GETFL: 0x%x", fl);
+                                fd_ok = false;
+                        }
+                        if (!fd_ok) {
+                                xclose(fd);
+                        } else {
+                                req->fp[i] = fdopen(fd, mode);
+                                if (!req->fp[i]) {
+                                        warn("fdopen(fd=%d)", fd);
+                                        fd_ok = false;
+                                }
+                        }
+                }
+                for (i = 0; !fd_ok && i < MY_ARRAY_SIZE(req->fp); i++)
+                        if (req->fp[i]) fclose(req->fp[i]);
+                return fd_ok;
+        }
+        warnx("no FD received in %zd-byte request", r);
+        return false;
+}
+
+#define SPLIT2ARGV(dst,buf,len) split2argv(dst,buf,len,MY_ARRAY_SIZE(dst))
+static int split2argv(char **dst, char *buf, size_t len, size_t limit)
+{
+        if (buf[0] == 0 || len == 0 || buf[len - 1] != 0) {
+                warnx("bogus argument given");
+                return 0;
+        }
+        size_t nr = 0;
+        char *c = buf;
+        for (size_t i = 1; i < len; i++) {
+                if (!buf[i]) {
+                        dst[nr++] = c;
+                        c = buf + i + 1;
+                }
+                if (nr == limit) {
+                        warnx("too many args: %zu", nr);
+                        return 0;
+                }
+        }
+        return (int)nr;
+}
+
+static int srch_cmp(const void *pa, const void *pb) // for tfind|tsearch
+{
+        const struct srch *a = (const struct srch *)pa;
+        const struct srch *b = (const struct srch *)pb;
+        int diff = a->paths_len - b->paths_len;
+
+        return diff ? diff : memcmp(a->paths, b->paths, (size_t)a->paths_len);
+}
+
+static bool is_chert(const char *dir)
+{
+        char iamchert[PATH_MAX];
+        struct stat sb;
+        int rc = snprintf(iamchert, sizeof(iamchert), "%s/iamchert", dir);
+
+        if (rc <= 0 || rc >= (int)sizeof(iamchert))
+                err(EXIT_FAILURE, "BUG: snprintf(%s/iamchert)", dir);
+        if (stat(iamchert, &sb) == 0 && S_ISREG(sb.st_mode))
+                return true;
+        return false;
+}
+
+static bool srch_init(struct req *req)
+{
+        char *dirv[MY_ARG_MAX];
+        int i;
+        struct srch *srch = req->srch;
+        int dirc = SPLIT2ARGV(dirv, srch->paths, (size_t)srch->paths_len);
+        const unsigned FLAG_PHRASE = Xapian::QueryParser::FLAG_PHRASE;
+        srch->qp_flags = FLAG_PHRASE |
+                        Xapian::QueryParser::FLAG_BOOLEAN |
+                        Xapian::QueryParser::FLAG_LOVEHATE |
+                        Xapian::QueryParser::FLAG_WILDCARD;
+        if (is_chert(dirv[0]))
+                srch->qp_flags &= ~FLAG_PHRASE;
+        try {
+                srch->db = new Xapian::Database(dirv[0]);
+        } catch (...) {
+                warn("E: Xapian::Database(%s)", dirv[0]);
+                return false;
+        }
+        try {
+                for (i = 1; i < dirc; i++) {
+                        if (srch->qp_flags & FLAG_PHRASE && is_chert(dirv[i]))
+                                srch->qp_flags &= ~FLAG_PHRASE;
+                        srch->db->add_database(Xapian::Database(dirv[i]));
+                }
+        } catch (...) {
+                warn("E: add_database(%s)", dirv[i]);
+                return false;
+        }
+        try {
+                srch->qp = new Xapian::QueryParser;
+        } catch (...) {
+                perror("E: Xapian::QueryParser");
+                return false;
+        }
+        srch->qp->set_default_op(Xapian::Query::OP_AND);
+        srch->qp->set_database(*srch->db);
+        try {
+                srch->qp->set_stemmer(Xapian::Stem("english"));
+        } catch (...) {
+                perror("E: Xapian::Stem");
+                return false;
+        }
+        srch->qp->set_stemming_strategy(Xapian::QueryParser::STEM_SOME);
+        srch->qp->SET_MAX_EXPANSION(100);
+
+        if (req->code_search)
+                qp_init_code_search(srch->qp); // CodeSearch.pm
+        else
+                qp_init_mail_search(srch->qp); // Search.pm
+        return true;
+}
+
+static void free_srch(void *p) // tdestroy
+{
+        struct srch *srch = (struct srch *)p;
+        delete srch->qp;
+        delete srch->db;
+        free(srch);
+}
+
+static void dispatch(struct req *req)
+{
+        int c;
+        size_t size = strlen(req->argv[0]);
+        union {
+                struct srch *srch;
+                char *ptr;
+        } fbuf;
+        char *end;
+        FILE *kfp;
+        struct srch **s;
+        req->fn = NULL;
+        for (c = 0; c < (int)MY_ARRAY_SIZE(cmds); c++) {
+                if (cmds[c].fn_len == size &&
+                        !memcmp(cmds[c].fn_name, req->argv[0], size)) {
+                        req->fn = cmds[c].fn;
+                        break;
+                }
+        }
+        if (!req->fn) goto cmd_err;
+
+        kfp = open_memstream(&fbuf.ptr, &size);
+        // write padding, first
+        fwrite(&req->argv[0], offsetof(struct srch, paths), 1, kfp);
+
+        // global getopt variables:
+        optind = 1;
+        opterr = optopt = 0;
+        optarg = NULL;
+
+        // keep sync with @PublicInbox::XapHelper::SPEC
+        while ((c = getopt(req->argc, req->argv, "acd:k:m:o:rtA:O:T:")) != -1) {
+                switch (c) {
+                case 'a': req->asc = true; break;
+                case 'c': req->code_search = true; break;
+                case 'd': fwrite(optarg, strlen(optarg) + 1, 1, kfp); break;
+                case 'k':
+                        req->sort_col = strtol(optarg, &end, 10);
+                        if (*end) goto cmd_err;
+                        switch (req->sort_col) {
+                        case LONG_MAX: case LONG_MIN: goto cmd_err;
+                        }
+                        break;
+                case 'm':
+                        req->max = strtoull(optarg, &end, 10);
+                        if (*end) goto cmd_err;
+                        if (req->max == ULLONG_MAX) goto cmd_err;
+                        break;
+                case 'o':
+                        req->off = strtoull(optarg, &end, 10);
+                        if (*end) goto cmd_err;
+                        if (req->off == ULLONG_MAX) goto cmd_err;
+                        break;
+                case 'r': req->relevance = true; break;
+                case 't': req->collapse_threads = true; break;
+                case 'A':
+                        req->pfxv[req->pfxc++] = optarg;
+                        if (MY_ARG_MAX == req->pfxc) goto cmd_err;
+                        break;
+                case 'O': req->Oeidx_key = optarg - 1; break; // pad "O" prefix
+                case 'T':
+                        req->timeout_sec = strtoul(optarg, &end, 10);
+                        if (*end) goto cmd_err;
+                        if (req->timeout_sec == ULONG_MAX) goto cmd_err;
+                        break;
+                default: goto cmd_err;
+                }
+        }
+        if (ferror(kfp) | fclose(kfp)) {
+                perror("ferror|fclose");
+                goto cmd_err;
+        }
+        fbuf.srch->db = NULL;
+        fbuf.srch->qp = NULL;
+        fbuf.srch->paths_len = size - offsetof(struct srch, paths);
+        if (fbuf.srch->paths_len <= 0) {
+                free_srch(fbuf.srch);
+                warnx("no -d args");
+                goto cmd_err;
+        }
+        s = (struct srch **)tsearch(fbuf.srch, &srch_tree, srch_cmp);
+        if (!s) {
+                perror("tsearch");
+                goto cmd_err;
+        }
+        req->srch = *s;
+        if (req->srch != fbuf.srch) { // reuse existing
+                free_srch(fbuf.srch);
+        } else if (!srch_init(req)) {
+                assert(fbuf.srch == *((struct srch **)tfind(
+                                        fbuf.srch, &srch_tree, srch_cmp)));
+                void *del = tdelete(fbuf.srch, &srch_tree, srch_cmp);
+                assert(del);
+                free_srch(fbuf.srch);
+                goto cmd_err;
+        }
+        try {
+                if (!req->fn(req))
+                        goto cmd_err;
+        } catch (const Xapian::Error & e) {
+                warnx("Xapian::Error: %s", e.get_description().c_str());
+        } catch (...) {
+                warn("unhandled exception");
+        }
+cmd_err:
+        return; // just be silent on errors, for now
+}
+
+static void cleanup_pids(void)
+{
+        free(worker_pids);
+        worker_pids = NULL;
+}
+
+static void recv_loop(void) // worker process loop
+{
+        static char rbuf[4096 * 33]; // per-process
+        while (!parent_pid || getppid() == parent_pid) {
+                size_t len = sizeof(rbuf);
+                struct req req = { 0 };
+                if (!recv_req(&req, rbuf, &len))
+                        continue;
+                if (req.fp[1]) {
+                        if (my_setlinebuf(req.fp[1]))
+                                perror("W: setlinebuf(req.fp[1])");
+                        stderr = req.fp[1];
+                }
+                req.argc = SPLIT2ARGV(req.argv, rbuf, len);
+                if (req.argc > 0)
+                        dispatch(&req);
+                if (ferror(req.fp[0]) | fclose(req.fp[0]))
+                        perror("ferror|fclose fp[0]");
+                if (req.fp[1]) {
+                        stderr = orig_err;
+                        if (ferror(req.fp[1]) | fclose(req.fp[1]))
+                                perror("ferror|fclose fp[1]");
+                }
+        }
+}
+
+static void insert_pid(pid_t pid, unsigned nr)
+{
+        assert(!worker_pids[nr]);
+        worker_pids[nr] = pid;
+}
+
+static int delete_pid(pid_t pid)
+{
+        for (unsigned nr = 0; nr < nworker; nr++) {
+                if (worker_pids[nr] == pid) {
+                        worker_pids[nr] = 0;
+                        return nr;
+                }
+        }
+        warnx("W: unknown pid=%d reaped", (int)pid);
+        return -1;
+}
+
+static void start_worker(unsigned nr)
+{
+        pid_t pid = fork();
+        if (pid < 0) {
+                warn("E: fork(worker=%u)", nr);
+        } else if (pid > 0) {
+                insert_pid(pid, nr);
+        } else {
+                cleanup_pids();
+                recv_loop();
+                exit(0);
+        }
+}
+
+static void cleanup_all(void)
+{
+        cleanup_pids();
+#ifdef __GLIBC__
+        tdestroy(srch_tree, free_srch);
+        srch_tree = NULL;
+#endif
+}
+
+int main(int argc, char *argv[])
+{
+        int c;
+
+        mail_nrp_init();
+        code_nrp_init();
+        atexit(cleanup_all);
+
+        nworker = 0;
+#ifdef _SC_NPROCESSORS_ONLN
+        long j = sysconf(_SC_NPROCESSORS_ONLN);
+        if (j > 0)
+                nworker = j > UCHAR_MAX ? UCHAR_MAX : j;
+#endif // _SC_NPROCESSORS_ONLN
+
+        // make warn/warnx/err multi-process friendly:
+        if (my_setlinebuf(stderr))
+                err(EXIT_FAILURE, "setlinebuf(stderr)");
+        // not using -W<workers> like Daemon.pm, since -W is reserved (glibc)
+        while ((c = getopt(argc, argv, "j:")) != -1) {
+                char *end;
+
+                switch (c) {
+                case 'j':
+                        nworker = strtoul(optarg, &end, 10);
+                        if (*end != 0 || nworker > USHRT_MAX)
+                                errx(EXIT_FAILURE, "-j %s invalid", optarg);
+                        break;
+                case ':':
+                        errx(EXIT_FAILURE, "missing argument: `-%c'", optopt);
+                case '?':
+                        errx(EXIT_FAILURE, "unrecognized: `-%c'", optopt);
+                default:
+                        errx(EXIT_FAILURE, "BUG: `-%c'", c);
+                }
+        }
+        if (nworker == 0) {
+                recv_loop();
+        } else {
+                parent_pid = getpid();
+                worker_pids = (pid_t *)calloc(nworker, sizeof(pid_t));
+                if (!worker_pids)
+                        err(EXIT_FAILURE, "calloc");
+                for (unsigned i = 0; i < nworker; i++)
+                        start_worker(i);
+
+                int st;
+                pid_t pid;
+                bool quit = false;
+                while ((pid = wait(&st)) > 0) {
+                        int nr = delete_pid(pid);
+                        if (nr < 0) continue;
+                        if (WIFEXITED(st) && WEXITSTATUS(st) == EX_NOINPUT)
+                                quit = true;
+                        if (!quit)
+                                start_worker(nr);
+                }
+        }
+        return 0;
+}
diff --git a/t/xap_helper.t b/t/xap_helper.t
new file mode 100644
index 00000000..f00a845a
--- /dev/null
+++ b/t/xap_helper.t
@@ -0,0 +1,147 @@
+#!perl -w
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+use v5.12;
+use PublicInbox::TestCommon;
+require_mods(qw(DBD::SQLite Search::Xapian));
+use PublicInbox::Spawn qw(spawn);
+use Socket qw(AF_UNIX SOCK_SEQPACKET SOCK_STREAM MSG_EOR);
+require PublicInbox::AutoReap;
+require PublicInbox::IPC;
+require PublicInbox::XapClient;
+use autodie;
+my ($tmp, $for_destroy) = tmpdir();
+
+my $fi_data = './t/git.fast-import-data';
+open my $fi_fh, '<', $fi_data;
+open my $dh, '<', '.';
+my $crepo = create_coderepo 'for-cindex', sub {
+        my ($d) = @_;
+        xsys_e([qw(git init -q --bare)]);
+        xsys_e([qw(git fast-import --quiet)], undef, { 0 => $fi_fh });
+        chdir($dh);
+        run_script([qw(-cindex --dangerous -L medium --no-fsync -q -j1), $d])
+                or xbail '-cindex internal';
+        run_script([qw(-cindex --dangerous -L medium --no-fsync -q -j3 -d),
+                "$d/cidx-ext", $d]) or xbail '-cindex "external"';
+};
+$dh = $fi_fh = undef;
+
+my $v2 = create_inbox 'v2', indexlevel => 'medium', version => 2,
+                        tmpdir => "$tmp/v2", sub {
+        my ($im) = @_;
+        for my $f (qw(t/data/0001.patch t/data/binary.patch
+                        t/data/message_embed.eml
+                        t/solve/0001-simple-mod.patch
+                        t/solve/0002-rename-with-modifications.patch
+                        t/solve/bare.patch)) {
+                $im->add(eml_load($f)) or BAIL_OUT;
+        }
+};
+
+my @ibx_idx = glob("$v2->{inboxdir}/xap*/?");
+my (@int) = glob("$crepo/public-inbox-cindex/cidx*/?");
+my (@ext) = glob("$crepo/cidx-ext/cidx*/?");
+is(scalar(@ext), 2, 'have 2 external shards') or diag explain(\@ext);
+is(scalar(@int), 1, 'have 1 internal shard') or diag explain(\@int);
+
+my $doreq = sub {
+        my ($s, @arg) = @_;
+        my $err = pop @arg if ref($arg[-1]);
+        pipe(my $x, my $y);
+        my $buf = join("\0", @arg, '');
+        my @fds = fileno($y);
+        push @fds, fileno($err) if $err;
+        my $n = PublicInbox::IPC::send_cmd($s, \@fds, $buf, MSG_EOR);
+        $n // xbail "send: $!";
+        my $arg = "@arg";
+        $arg =~ s/\Q$tmp\E/\$TMP/gs;
+        is(length($buf), $n, "req $arg sent");
+        $x;
+};
+
+my $env = { PERL5LIB => join(':', @INC) };
+my $test = sub {
+        my (@arg) = @_;
+        socketpair(my $s, my $y, AF_UNIX, SOCK_SEQPACKET, 0);
+        my $pid = spawn([$^X, '-w', @arg], $env, { 0 => $y });
+        my $ar = PublicInbox::AutoReap->new($pid);
+        diag "$arg[-1] running pid=$pid";
+        close $y;
+        my $r = $doreq->($s, qw(test_inspect -d), $ibx_idx[0]);
+        my %info = map { split(/=/, $_, 2) } split(/ /, do { local $/; <$r> });
+        is($info{has_threadid}, '1', 'has_threadid true for inbox');
+        like($info{pid}, qr/\A\d+\z/, 'got PID from inbox inspect');
+
+        $r = $doreq->($s, qw(test_inspect -d), $int[0]);
+        my %cinfo = map { split(/=/, $_, 2) } split(/ /, do { local $/; <$r> });
+        is($cinfo{has_threadid}, '0', 'has_threadid false for cindex');
+        is($cinfo{pid}, $info{pid}, 'PID unchanged for cindex');
+
+        my @dump = (qw(dump_ibx -A XDFID), (map { ('-d', $_) } @ibx_idx),
+                        qw(13 rt:0..));
+        $r = $doreq->($s, @dump);
+        my @res;
+        while (sysread($r, my $buf, 512) != 0) { push @res, $buf }
+        is(grep(/\n\z/s, @res), scalar(@res), 'line buffered');
+
+        pipe(my $err_rd, my $err_wr);
+        $r = $doreq->($s, @dump, $err_wr);
+        close $err_wr;
+        my $res = do { local $/; <$r> };
+        is(join('', @res), $res, 'got identical response w/ error pipe');
+        my $stats = do { local $/; <$err_rd> };
+        is($stats, "mset.size=6\n", 'mset.size reported');
+
+        if ($arg[-1] !~ /\('-j0'\)/) {
+                kill('KILL', $cinfo{pid});
+                $r = $doreq->($s, qw(test_inspect -d), $ibx_idx[0]);
+                %info = map {
+                        split(/=/, $_, 2)
+                } split(/ /, do { local $/; <$r> });
+                isnt($info{pid}, $cinfo{pid}, 'spawned new worker');
+        }
+        $ar;
+};
+my $ar;
+
+$ar = $test->(qw[-MPublicInbox::XapHelper -e
+                PublicInbox::XapHelper::start('-j0')]);
+$ar = $test->(qw[-MPublicInbox::XapHelper -e
+                PublicInbox::XapHelper::start('-j1')]);
+
+my @NO_CXX = (0);
+SKIP: {
+        eval {
+                require PublicInbox::XapHelperCxx;
+                PublicInbox::XapHelperCxx::check_build();
+        };
+        skip "XapHelperCxx build: $@", 1 if $@;
+        push @NO_CXX, 1;
+
+        $ar = $test->(qw[-MPublicInbox::XapHelperCxx -e
+                        PublicInbox::XapHelperCxx::start('-j0')]);
+        $ar = $test->(qw[-MPublicInbox::XapHelperCxx -e
+                        PublicInbox::XapHelperCxx::start('-j1')]);
+};
+
+for my $n (@NO_CXX) {
+        local $ENV{PI_NO_CXX} = $n;
+        my ($xhc, $pid) = PublicInbox::XapClient::start_helper('-j0');
+        $ar = PublicInbox::AutoReap->new($pid);
+        pipe(my $err_r, my $err_w);
+
+        # git patch-id --stable <t/data/0001.patch | awk '{print $1}'
+        my $dfid = '91ee6b761fc7f47cad9f2b09b10489f313eb5b71';
+        my $mid = '20180720072141.GA15957@example';
+        my $r = $xhc->mkreq([ undef, $err_w ], qw(dump_ibx -A XDFID -A Q),
+                                (map { ('-d', $_) } @ibx_idx),
+                                9, "mid:$mid");
+        close $err_w;
+        my $res = do { local $/; <$r> };
+        is($res, "$dfid 9\n$mid 9\n", "got expected result ($xhc->{impl})");
+        my $err = do { local $/; <$err_r> };
+        is($err, "mset.size=1\n", "got expected status ($xhc->{impl})");
+}
+
+done_testing;