From: Eric Wong <e@80x24.org>
To: meta@public-inbox.org
Subject: [PATCH 4/7] introduce optional C++ xap_helper
Date: Thu, 24 Aug 2023 01:22:33 +0000
Message-ID: <20230824012236.3968030-5-e@80x24.org>
In-Reply-To: <20230824012236.3968030-1-e@80x24.org>

This allows us to perform the expensive "dump_ibx" operations in
native C++ code using the Xapian C++ library. This provides the
majority of the speedup with the -cindex --associate switch.

Eventually this may be expanded to cover all uses of Xapian
within the project, both to ensure we have access to Xapian APIs
which aren't available in the XS or SWIG bindings, and to ease
installation on systems which don't provide pre-packaged Perl
Xapian bindings (e.g. OpenBSD 7.3) but do provide Xapian
development libraries.

Most of the C++ code is still C, as I'm not remotely as familiar
with C++ as I am with C. I suspect many users and potential
hackers from the git, Linux kernel, and glibc worlds are in the
same boat.
---
MANIFEST | 7 +-
lib/PublicInbox/CidxDumpIbx.pm | 59 ---
lib/PublicInbox/CidxXapHelperAux.pm | 48 ++
lib/PublicInbox/CodeSearch.pm | 54 ++-
lib/PublicInbox/CodeSearchIdx.pm | 78 ++--
lib/PublicInbox/Isearch.pm | 5 +
lib/PublicInbox/Search.pm | 56 ++-
lib/PublicInbox/XapClient.pm | 50 +++
lib/PublicInbox/XapHelper.pm | 144 ++++++
lib/PublicInbox/XapHelperCxx.pm | 93 ++++
lib/PublicInbox/xap_helper.h | 654 ++++++++++++++++++++++++++++
t/xap_helper.t | 147 +++++++
12 files changed, 1278 insertions(+), 117 deletions(-)
delete mode 100644 lib/PublicInbox/CidxDumpIbx.pm
create mode 100644 lib/PublicInbox/CidxXapHelperAux.pm
create mode 100644 lib/PublicInbox/XapClient.pm
create mode 100644 lib/PublicInbox/XapHelper.pm
create mode 100644 lib/PublicInbox/XapHelperCxx.pm
create mode 100644 lib/PublicInbox/xap_helper.h
create mode 100644 t/xap_helper.t
diff --git a/MANIFEST b/MANIFEST
index 162e3038..4f61af42 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -162,10 +162,10 @@ lib/PublicInbox/AltId.pm
lib/PublicInbox/AutoReap.pm
lib/PublicInbox/Cgit.pm
lib/PublicInbox/CidxComm.pm
-lib/PublicInbox/CidxDumpIbx.pm
lib/PublicInbox/CidxDumpShardRoots.pm
lib/PublicInbox/CidxLogP.pm
lib/PublicInbox/CidxRecvIbx.pm
+lib/PublicInbox/CidxXapHelperAux.pm
lib/PublicInbox/CmdIPC4.pm
lib/PublicInbox/CodeSearch.pm
lib/PublicInbox/CodeSearchIdx.pm
@@ -368,8 +368,12 @@ lib/PublicInbox/WwwListing.pm
lib/PublicInbox/WwwStatic.pm
lib/PublicInbox/WwwStream.pm
lib/PublicInbox/WwwText.pm
+lib/PublicInbox/XapClient.pm
+lib/PublicInbox/XapHelper.pm
+lib/PublicInbox/XapHelperCxx.pm
lib/PublicInbox/Xapcmd.pm
lib/PublicInbox/gcf2_libgit2.h
+lib/PublicInbox/xap_helper.h
sa_config/Makefile
sa_config/README
sa_config/root/etc/spamassassin/public-inbox.pre
@@ -610,6 +614,7 @@ t/www_altid.t
t/www_listing.t
t/www_static.t
t/x-unknown-alpine.eml
+t/xap_helper.t
t/xcpdb-reshard.t
version-gen.perl
xt/cmp-msgstr.t
diff --git a/lib/PublicInbox/CidxDumpIbx.pm b/lib/PublicInbox/CidxDumpIbx.pm
deleted file mode 100644
index e1bc273d..00000000
--- a/lib/PublicInbox/CidxDumpIbx.pm
+++ /dev/null
@@ -1,59 +0,0 @@
-# Copyright (C) all contributors <meta@public-inbox.org>
-# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
-
-# Intended for PublicInbox::DS::event_loop for -cindex --associate
-# Iterating through mset->items is slow in Perl due to method dispatch
-# and that loop may implemented in C++ using Xapian directly
-package PublicInbox::CidxDumpIbx;
-use v5.12;
-use PublicInbox::Search qw(xap_terms);
-use PublicInbox::DS;
-use Socket qw(MSG_EOR);
-
-sub start {
- my ($rcvibx, $ibx_id) = @_;
- my $cidx = $rcvibx->{cidx};
- my $ibx = $cidx->{IBX}->[$ibx_id] // die "BUG: no IBX[$ibx_id]";
- my $self = bless { rcvibx => $rcvibx, ekey => $ibx->eidx_key,
- ibx_id => $ibx_id }, __PACKAGE__;
- $self->{srch} = $ibx->isrch // do {
- warn("W: $self->{ekey} has no search index (ignoring)\n");
- return undef;
- };
- my $opt = { limit => $cidx->assoc_max_init, relevance => -2 };
- $self->{mset} = $self->{srch}->mset($rcvibx->{qry_str}, $opt);
- $self->{iter} = 0;
- event_step($self);
-}
-
-sub event_step {
- my ($self) = @_;
- my $rcvibx = $self->{rcvibx} // die 'BUG: no rcvibx';
- return if $rcvibx->{cidx}->do_quit;
- my $last = $self->{mset}->size - 1;
- my $cur = $self->{iter};
- my $end = $cur + 9999;
- if ($end >= $last) {
- send($rcvibx->{op_p}, 'index_next', MSG_EOR);
- $end = $last;
- }
- $self->{iter} = $end + 1;
- local $0 = "dumping $self->{ekey} $cur..$end";
-
- my $sort_w = $rcvibx->{sort_w};
- my $ibx_id = $self->{ibx_id};
- local $0 = "dumping $self->{ekey} $cur..$end";
- $rcvibx->{cidx}->progress($0);
- for my $x (($self->{mset}->items)[$cur..$end]) { # FIXME: slow loop
- my $doc = $x->get_document;
- for my $p (@{$rcvibx->{cidx}->{ASSOC_PFX}}) {
- for (xap_terms($p, $doc)) {
- print $sort_w "$_ $ibx_id\n" or die "print: $!";
- }
- }
- }
- $end < $last && !$rcvibx->{cidx}->do_quit and
- PublicInbox::DS::requeue($self);
-}
-
-1;
diff --git a/lib/PublicInbox/CidxXapHelperAux.pm b/lib/PublicInbox/CidxXapHelperAux.pm
new file mode 100644
index 00000000..c9a5ddad
--- /dev/null
+++ b/lib/PublicInbox/CidxXapHelperAux.pm
@@ -0,0 +1,48 @@
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# Intended for PublicInbox::DS::event_loop for -cindex --associate,
+# this reports auxiliary status while dumping
+package PublicInbox::CidxXapHelperAux;
+use v5.12;
+use parent qw(PublicInbox::DS);
+use PublicInbox::Syscall qw(EPOLLIN);
+
+# rpipe connects to req->fp[1] in xap_helper.h
+sub new {
+ my ($cls, $rpipe, $cidx, $pfx, $associate) = @_;
+ my $self = bless {
+ cidx => $cidx,
+ pfx => $pfx,
+ associate => $associate
+ }, $cls;
+ $rpipe->blocking(0);
+ $self->SUPER::new($rpipe, EPOLLIN);
+}
+
+sub event_step {
+ my ($self) = @_; # xap_helper.h is line-buffered
+ my $buf = delete($self->{buf}) // '';
+ my $n = sysread($self->{sock}, $buf, 65536, length($buf));
+ if (!defined($n)) {
+ return if $!{EAGAIN};
+ die "sysread: $!";
+ }
+ my $pfx = $self->{pfx};
+ if ($n == 0) {
+ $self->{cidx}->progress("$pfx $buf") if $buf ne '';
+ return $self->close;
+ }
+ my @lines = split(/^/m, $buf);
+ $self->{buf} = pop @lines if substr($lines[-1], -1) ne "\n";
+ for my $l (@lines) {
+ if ($l =~ /\Amset\.size=[0-9]+\n\z/) {
+ delete $self->{cidx}->{PENDING}->{$pfx};
+ $self->{cidx}->index_next;
+ }
+ chomp $l;
+ $self->{cidx}->progress("$pfx $l");
+ }
+}
+
+1;
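CidxXapHelperAux::event_step reassembles the line-buffered status stream from xap_helper.h. A single sysread may end mid-line, so the unterminated tail is kept in $self->{buf} until the next read, and a complete "mset.size=N" line marks that inbox as done. Below is a minimal C sketch of the same buffering idea. The names line_buf, line_buf_feed and is_mset_size are hypothetical, not public-inbox code. The fixed 4096-byte partial-line limit is also an assumption.

```c
#include <stddef.h>
#include <string.h>

/* Hypothetical line assembler (not public-inbox code): chunks from
 * read(2) go in, complete '\n'-terminated lines come out. */
struct line_buf {
	char data[4096]; /* assumed maximum length of a partial line */
	size_t len;
};

typedef void (*line_cb)(const char *line, size_t len, void *arg);

/* Appends a chunk and calls cb once per complete line (without '\n').
 * The unterminated remainder stays buffered for the next call.
 * Returns 0 on success, -1 if the chunk would overflow the buffer. */
static int line_buf_feed(struct line_buf *lb, const char *chunk, size_t n,
			 line_cb cb, void *arg)
{
	size_t start = 0;

	if (n > sizeof(lb->data) - lb->len)
		return -1;
	memcpy(lb->data + lb->len, chunk, n);
	lb->len += n;
	for (size_t i = 0; i < lb->len; i++) {
		if (lb->data[i] == '\n') {
			cb(lb->data + start, i - start, arg);
			start = i + 1;
		}
	}
	memmove(lb->data, lb->data + start, lb->len - start);
	lb->len -= start;
	return 0;
}

/* Same test as /\Amset\.size=[0-9]+\n\z/ applied to a chomped line */
static int is_mset_size(const char *line, size_t len)
{
	static const char pfx[] = "mset.size=";
	const size_t plen = sizeof(pfx) - 1;

	if (len <= plen || memcmp(line, pfx, plen) != 0)
		return 0;
	for (size_t i = plen; i < len; i++)
		if (line[i] < '0' || line[i] > '9')
			return 0;
	return 1;
}
```

Like the Perl version, a partial "mset.si" is held back until the rest of the line arrives. It is only then recognized as the completion marker.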
diff --git a/lib/PublicInbox/CodeSearch.pm b/lib/PublicInbox/CodeSearch.pm
index a5ccce03..6234e259 100644
--- a/lib/PublicInbox/CodeSearch.pm
+++ b/lib/PublicInbox/CodeSearch.pm
@@ -16,6 +16,12 @@ use constant {
# in refs/{heads,tags}. AT(col=0) may be used to store disk usage
# in the future, but disk usage calculation is espensive w/ alternates
};
+our @CODE_NRP;
+our @CODE_VMAP = (
+ [ AT, 'd:' ], # mairix compat
+ [ AT, 'dt:' ], # mail compat
+ [ CT, 'ct:' ],
+);
# note: the non-X term prefix allocations are shared with Xapian omega,
# see xapian-applications/omega/docs/termprefixes.rst
@@ -45,15 +51,17 @@ sub new {
bless { xpfx => "$dir/cidx".CIDX_SCHEMA_VER }, $cls;
}
-sub cqparse_new ($) {
+sub qparse_new ($) {
my ($self) = @_;
my $qp = $self->qp_init_common;
my $cb = $qp->can('add_valuerangeprocessor') //
$qp->can('add_rangeprocessor'); # Xapian 1.5.0+
- $cb->($qp, $PublicInbox::Search::NVRP->new(AT, 'd:')); # mairix compat
- $cb->($qp, $PublicInbox::Search::NVRP->new(AT, 'dt:')); # mail compat
- $cb->($qp, $PublicInbox::Search::NVRP->new(CT, 'ct:'));
-
+ if (!@CODE_NRP) {
+ @CODE_NRP = map {
+ $PublicInbox::Search::NVRP->new(@$_)
+ } @CODE_VMAP;
+ }
+ $cb->($qp, $_) for @CODE_NRP;
while (my ($name, $pfx) = each %bool_pfx_external) {
$qp->add_boolean_prefix($name, $_) for split(/ /, $pfx);
}
@@ -63,6 +71,40 @@ sub cqparse_new ($) {
$qp;
}
+sub generate_cxx () { # generates snippet for xap_helper.h
+ my ($line, $file) = (__LINE__ + 2, __FILE__);
+ my $ret = <<EOM;
+# line ${\__LINE__} "${\__FILE__}"
+static NRP *code_nrp[${\scalar(@CODE_VMAP)}];
+static void code_nrp_init(void)
+{
+EOM
+ for (0..$#CODE_VMAP) {
+ my $x = $CODE_VMAP[$_];
+ $ret .= qq{\tcode_nrp[$_] = new NRP($x->[0], "$x->[1]");\n}
+ }
+$ret .= <<EOM;
+}
+
+# line ${\__LINE__} "${\__FILE__}"
+static void qp_init_code_search(Xapian::QueryParser *qp)
+{
+ for (size_t i = 0; i < MY_ARRAY_SIZE(code_nrp); i++)
+ qp->ADD_RP(code_nrp[i]);
+EOM
+ for my $name (sort keys %bool_pfx_external) {
+ for (split(/ /, $bool_pfx_external{$name})) {
+ $ret .= qq{\tqp->add_boolean_prefix("$name", "$_");\n}
+ }
+ }
+ for my $name (sort keys %prob_prefix) {
+ for (split(/ /, $prob_prefix{$name})) {
+ $ret .= qq{\tqp->add_prefix("$name", "$_");\n}
+ }
+ }
+ $ret .= "}\n";
+}
+
# returns a Xapian::Query to filter by roots
sub roots_filter { # retry_reopen callback
my ($self, $git_dir) = @_;
@@ -89,7 +131,7 @@ sub roots_filter { # retry_reopen callback
sub mset {
my ($self, $qry_str, $opt) = @_;
- my $qp = $self->{qp} //= cqparse_new($self);
+ my $qp = $self->{qp} //= qparse_new($self);
my $qry = $qp->parse_query($qry_str, $self->{qp_flags});
# limit to commits with shared roots
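generate_cxx in CodeSearch.pm emits C++ from the same @CODE_VMAP table that qparse_new feeds to the Perl QueryParser. This keeps a single list of (value column, prefix) pairs for both implementations. Its "# line" directives point compiler diagnostics back at the .pm file. The C sketch below covers only the table-to-source step. struct vmap, gen_nrp_init and the column numbers in the test are hypothetical placeholders. The real columns come from the AT and CT constants, whose values this patch does not show.

```c
#include <stddef.h>
#include <stdio.h>

/* Hypothetical (value column, query prefix) pair, like one row of
 * @CODE_VMAP or @MAIL_VMAP */
struct vmap {
	unsigned col;
	const char *pfx;
};

/* Writes a C++ array declaration plus an init function that creates one
 * NRP per table row, in the same shape generate_cxx() produces. */
static void gen_nrp_init(FILE *out, const char *name,
			 const struct vmap *v, size_t n)
{
	fprintf(out, "static NRP *%s_nrp[%zu];\n", name, n);
	fprintf(out, "static void %s_nrp_init(void)\n{\n", name);
	for (size_t i = 0; i < n; i++)
		fprintf(out, "\t%s_nrp[%zu] = new NRP(%u, \"%s\");\n",
			name, i, v[i].col, v[i].pfx);
	fputs("}\n", out);
}
```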
diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index e795c2b3..b8afecd2 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -42,6 +42,7 @@ use PublicInbox::IPC qw(nproc_shards);
use POSIX qw(WNOHANG SEEK_SET);
use File::Path ();
use File::Spec ();
+use List::Util qw(max);
use PublicInbox::SHA qw(sha256_hex);
use PublicInbox::Search qw(xap_terms);
use PublicInbox::SearchIdx qw(add_val);
@@ -69,6 +70,9 @@ our (
@GIT_DIR_GONE, # [ git_dir1, git_dir2 ]
$PRUNE_DONE, # marks off prune completions
$NCHANGE, # current number of changes
+ $NPROC,
+ $XH_PID, # XapHelper PID
+ $XHC, # XapClient
$REPO_CTX, # current repo being indexed in shards
$IDX_TODO, # [ $git0, $root0, $git1, $root1, ...]
$GIT_TODO, # [ GIT_DIR0, GIT_DIR1, ...]
@@ -81,8 +85,8 @@ our (
@AWK, @COMM, @SORT, # awk(1), comm(1), sort(1) commands
@ASSOC_PFX, # any combination of XDFID, XDFPRE, XDFPOST
$QRY_STR, # common query string for both code and inbox associations
- $IBXDIR_FEED, # SOCK_SEQPACKET
- @DUMP_SHARD_ROOTS_OK, @RECV_IBX_OK, # for associate
+ @DUMP_SHARD_ROOTS_OK, # for associate
+ $DUMP_IBX_WPIPE, # goes to sort(1)
@ID2ROOT,
);
@@ -529,45 +533,29 @@ sub dump_roots_once {
progress($self, 'waiting on dump_shard_roots sort');
}
-sub recv_ibx_done { # via PktOp on recv_ibx completion
- my ($self, $pid, $n) = @_;
- return if $DO_QUIT;
- progress($self, "recv_ibx [$n] done");
- $RECV_IBX_OK[$n] = 1;
-}
-
-# causes a worker to become a dumper for inbox/extindex
-sub recv_ibx { # wq_io_do
- my ($self, $qry_str) = @_;
- PublicInbox::CidxRecvIbx->new($self, $qry_str);
-}
-
-sub dump_ibx { # sends to PublicInbox::CidxRecvIbx::event_step
- my ($self, $id_dir) = @_; # id_dir: "$IBX_ID=$INBOXDIR"
- my $n = length($id_dir);
- my $w = send($IBXDIR_FEED, $id_dir, MSG_EOR) // die "send: $!";
- $n == $w or die "send($id_dir) $w != $n";
+sub dump_ibx { # sends to xap_helper.h
+ my ($self, $ibx_id) = @_;
+ my $ibx = $IBX[$ibx_id] // die "BUG: no IBX[$ibx_id]";
+ my @cmd = ('dump_ibx', $ibx->isrch->xh_args,
+ (map { ('-A', $_) } @ASSOC_PFX),
+ $ibx_id, $QRY_STR);
+ pipe(my ($r, $w)) or die "pipe: $!";
+ $XHC->mkreq([$DUMP_IBX_WPIPE, $w], @cmd);
+ my $ekey = $ibx->eidx_key;
+ $self->{PENDING}->{$ekey} = undef;
+ PublicInbox::CidxXapHelperAux->new($r, $self, $ekey, $TODO{associate});
}
-# repurpose shard workers to dump inbox patchids with perfect balance
sub dump_ibx_start {
my ($self, $associate) = @_;
- pipe(my ($sort_r, $sort_w)) or die "pipe: $!";
+ pipe(my $sort_r, $DUMP_IBX_WPIPE) or die "pipe: $!";
my @sort = (@SORT, '-k1,1');
my $dst = "$TMPDIR/to_ibx_id";
open my $fh, '>', $dst or die "open($dst): $!";
my $sort_pid = spawn(\@sort, $CMD_ENV, { 0 => $sort_r, 1 => $fh });
close $sort_r or die "close: $!";
awaitpid($sort_pid, \&cmd_done, \@sort, $associate);
-
- my ($r, $w);
- socketpair($r, $w, AF_UNIX, SOCK_SEQPACKET, 0) or die "socketpair: $!";
- my ($c, $p) = PublicInbox::PktOp->pair;
- $c->{ops}->{recv_ibx_done} = [ $self, $associate ];
- $c->{ops}->{index_next} = [ $self ];
- my $io = [ $p->{op_p}, $r, $sort_w ];
- $_->wq_io_do('recv_ibx', $io, $QRY_STR) for @IDX_SHARDS;
- $IBXDIR_FEED = $w;
+ ($XHC, $XH_PID) = PublicInbox::XapClient::start_helper("-j$NPROC");
}
sub index_next ($) {
@@ -584,8 +572,8 @@ sub index_next ($) {
} elsif ($TMPDIR) {
delete $TODO{dump_ibx_start}; # runs OnDestroy once
return dump_ibx($self, shift @IBXQ) if @IBXQ;
- progress($self, 'done dumping inboxes') if $IBXDIR_FEED;
- undef $IBXDIR_FEED; # done dumping inboxes, dump roots
+ progress($self, 'done dumping inboxes') if $DUMP_IBX_WPIPE;
+ undef $DUMP_IBX_WPIPE; # done dumping inboxes, dump roots
dump_roots_once($self, delete($TODO{associate}) // return);
}
# else: wait for shards_active (post_loop_do) callback
@@ -795,7 +783,7 @@ sub kill_shards { $_->wq_kill(@_) for (@IDX_SHARDS) }
sub parent_quit {
$DO_QUIT = POSIX->can("SIG$_[0]")->();
- $IBXDIR_FEED = undef;
+ $XHC = undef;
kill_shards(@_);
warn "# SIG$_[0] received, quitting...\n";
}
@@ -875,8 +863,8 @@ sub associate {
@IDX_SHARDS or return warn("# aborting on no shards\n");
grep(defined, @DUMP_SHARD_ROOTS_OK) == @IDX_SHARDS or
die "E: shards not dumped properly\n";
- grep(defined, @RECV_IBX_OK) == @IDX_SHARDS or
- die "E: inboxes not dumped properly\n";
+ my @pending = keys %{$self->{PENDING}};
+ die "E: pending=@pending jobs not done\n" if @pending;
progress($self, 'associating...');
my @join = ('time', @JOIN, 'to_ibx_id', 'to_root_id');
my $rd = popen_rd(\@join, $CMD_ENV, { -C => "$TMPDIR" });
@@ -1032,8 +1020,9 @@ sub cidx_read_comm { # via PublicInbox::CidxComm::event_step
sub init_associate_prefork ($) {
my ($self) = @_;
return unless $self->{-opt}->{associate};
- require PublicInbox::CidxRecvIbx;
require PublicInbox::CidxDumpShardRoots;
+ require PublicInbox::CidxXapHelperAux;
+ require PublicInbox::XapClient;
$self->{-pi_cfg} = PublicInbox::Config->new;
my @unknown;
my @pfx = @{$self->{-opt}->{'associate-prefixes'} // [ 'patchid' ]};
@@ -1055,7 +1044,7 @@ EOM
sub _prep_ibx { # each_inbox callback
my ($ibx, $self, $incl) = @_;
($self->{-opt}->{all} || exists($incl->{$ibx->{inboxdir}})) and
- push @{$self->{IBX}}, $ibx;
+ push @IBX, $ibx;
}
sub show_roots { # for diagnostics
@@ -1102,13 +1091,13 @@ sub cidx_run { # main entry point
local $GIT_TODO = [];
local ($DO_QUIT, $REINDEX, $TXN_BYTES, @GIT_DIR_GONE, @PRUNE_QUEUE,
$REPO_CTX, %ALT_FH, $TMPDIR, @AWK, @COMM, $CMD_ENV,
- %TODO, @IBXQ, @IBX, @JOIN, @ASSOC_PFX, $IBXDIR_FEED, @ID2ROOT,
- @DUMP_SHARD_ROOTS_OK, @RECV_IBX_OK);
+ %TODO, @IBXQ, @IBX, @JOIN, @ASSOC_PFX, $DUMP_IBX_WPIPE,
+ @ID2ROOT, @DUMP_SHARD_ROOTS_OK, $XH_PID, $XHC);
local $BATCH_BYTES = $self->{-opt}->{batch_size} //
$PublicInbox::SearchIdx::BATCH_BYTES;
local @SORT = (undef, '-u');
- local $self->{IBX} = \@IBX;
local $self->{ASSOC_PFX} = \@ASSOC_PFX;
+ local $self->{PENDING} = {};
local $self->{-pi_cfg};
if (grep { $_ } @{$self->{-opt}}{qw(prune associate)}) {
require File::Temp;
@@ -1165,13 +1154,14 @@ sub cidx_run { # main entry point
@GIT_DIR_GONE = uniqstr @GIT_DIR_GONE, @excl;
}
local $NCHANGE = 0;
- local $LIVE_JOBS = $self->{-opt}->{jobs} ||
- PublicInbox::IPC::detect_nproc() || 2;
+ local $NPROC = PublicInbox::IPC::detect_nproc();
+ local $LIVE_JOBS = $self->{-opt}->{jobs} || $NPROC || 2;
local @RDONLY_XDB = $self->xdb_shards_flat;
init_prune($self);
init_associate_postfork($self);
scan_git_dirs($self) if $self->{-opt}->{scan} // 1;
- index_next($self) for (1..$LIVE_JOBS);
+ my $max = $TODO{associate} ? max($LIVE_JOBS, $NPROC) : $LIVE_JOBS;
+ index_next($self) for (1..$max);
# FreeBSD ignores/discards SIGCHLD while signals are blocked and
# EVFILT_SIGNAL is inactive, so we pretend we have a SIGCHLD pending
diff --git a/lib/PublicInbox/Isearch.pm b/lib/PublicInbox/Isearch.pm
index 5cac08ba..62112171 100644
--- a/lib/PublicInbox/Isearch.pm
+++ b/lib/PublicInbox/Isearch.pm
@@ -123,4 +123,9 @@ sub has_threadid { 1 }
sub help { $_[0]->{es}->help }
+sub xh_args { # prep getopt args to feed to xap_helper.h socket
+ my ($self, $opt) = @_; # TODO uid_range
+ ($self->{es}->xh_args, '-O', $self->{eidx_key});
+}
+
1;
diff --git a/lib/PublicInbox/Search.pm b/lib/PublicInbox/Search.pm
index d5b0bceb..2e784646 100644
--- a/lib/PublicInbox/Search.pm
+++ b/lib/PublicInbox/Search.pm
@@ -66,6 +66,15 @@ our $NVRP; # '$Xap::'.('NumberValueRangeProcessor' or 'NumberRangeProcessor')
# let's hope the ABI is stable
our $ENQ_DESCENDING = 0;
our $ENQ_ASCENDING = 1;
+our @MAIL_VMAP = (
+ [ YYYYMMDD, 'd:'],
+ [ DT, 'dt:' ],
+ # these are undocumented for WWW, but lei and IMAP use them
+ [ BYTES, 'z:' ],
+ [ TS, 'rt:' ],
+ [ UID, 'uid:' ]
+);
+our @MAIL_NRP;
sub load_xapian () {
return 1 if defined $Xap;
@@ -101,6 +110,7 @@ sub load_xapian () {
# or make indexlevel=medium as default
$QP_FLAGS = FLAG_PHRASE() | FLAG_BOOLEAN() | FLAG_LOVEHATE() |
FLAG_WILDCARD();
+ @MAIL_NRP = map { $NVRP->new(@$_) } @MAIL_VMAP;
return 1;
}
undef;
@@ -490,14 +500,8 @@ sub qparse_new {
my $qp = qp_init_common($self);
my $cb = $qp->can('add_valuerangeprocessor') //
$qp->can('add_rangeprocessor'); # Xapian 1.5.0+
- $cb->($qp, $NVRP->new(YYYYMMDD, 'd:'));
- $cb->($qp, $NVRP->new(DT, 'dt:'));
-
- # for IMAP, undocumented for WWW and may be split off go away
- $cb->($qp, $NVRP->new(BYTES, 'z:'));
- $cb->($qp, $NVRP->new(TS, 'rt:'));
- $cb->($qp, $NVRP->new(UID, 'uid:'));
+ $cb->($qp, $_) for @MAIL_NRP;
while (my ($name, $prefix) = each %bool_pfx_external) {
$qp->add_boolean_prefix($name, $_) foreach split(/ /, $prefix);
}
@@ -527,6 +531,40 @@ EOF
$qp;
}
+sub generate_cxx () { # generates snippet for xap_helper.h
+ my $ret = <<EOM;
+# line ${\__LINE__} "${\__FILE__}"
+static NRP *mail_nrp[${\scalar(@MAIL_VMAP)}];
+static void mail_nrp_init(void)
+{
+EOM
+ for (0..$#MAIL_VMAP) {
+ my $x = $MAIL_VMAP[$_];
+ $ret .= qq{\tmail_nrp[$_] = new NRP($x->[0], "$x->[1]");\n}
+ }
+$ret .= <<EOM;
+}
+
+# line ${\__LINE__} "${\__FILE__}"
+static void qp_init_mail_search(Xapian::QueryParser *qp)
+{
+ for (size_t i = 0; i < MY_ARRAY_SIZE(mail_nrp); i++)
+ qp->ADD_RP(mail_nrp[i]);
+EOM
+ for my $name (sort keys %bool_pfx_external) {
+ for (split(/ /, $bool_pfx_external{$name})) {
+ $ret .= qq{\tqp->add_boolean_prefix("$name", "$_");\n}
+ }
+ }
+ # TODO: altid support
+ for my $name (sort keys %prob_prefix) {
+ for (split(/ /, $prob_prefix{$name})) {
+ $ret .= qq{\tqp->add_prefix("$name", "$_");\n}
+ }
+ }
+ $ret .= "}\n";
+}
+
sub help {
my ($self) = @_;
$self->{qp} //= $self->qparse_new; # parse altids
@@ -585,4 +623,8 @@ sub all_terms {
wantarray ? (sort keys %ret) : \%ret;
}
+sub xh_args { # prep getopt args to feed to xap_helper.h socket
+ map { ('-d', $_) } shard_dirs($_[0]);
+}
+
1;
diff --git a/lib/PublicInbox/XapClient.pm b/lib/PublicInbox/XapClient.pm
new file mode 100644
index 00000000..56e3c3b4
--- /dev/null
+++ b/lib/PublicInbox/XapClient.pm
@@ -0,0 +1,50 @@
+#!perl -w
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# This talks to (XapHelperCxx.pm + xap_helper.h) or XapHelper.pm
+# and will eventually allow users with neither XS nor SWIG Perl
+# bindings to use Xapian as long as they have Xapian development
+# headers/libs and a C++ compiler
+package PublicInbox::XapClient;
+use v5.12;
+use PublicInbox::Spawn qw(spawn);
+use Socket qw(AF_UNIX SOCK_SEQPACKET MSG_EOR);
+use PublicInbox::IPC;
+
+sub mkreq {
+ my ($self, $ios, @arg) = @_;
+ my ($r, $w, $n);
+ if (!defined($ios->[0])) {
+ pipe($r, $w) or die "pipe: $!";
+ $ios->[0] = $w;
+ }
+ my @fds = map fileno($_), @$ios;
+ my $buf = join("\0", @arg, '');
+ $n = PublicInbox::IPC::send_cmd($self->{io}, \@fds, $buf, MSG_EOR) //
+ die "send_cmd: $!";
+ $n == length($buf) or die "send_cmd: $n != ".length($buf);
+ $r;
+}
+
+sub start_helper {
+ my @argv = @_;
+ socketpair(my $sock, my $in, AF_UNIX, SOCK_SEQPACKET, 0) or
+ die "socketpair: $!";
+ my $cls = ($ENV{PI_NO_CXX} ? undef : eval {
+ require PublicInbox::XapHelperCxx;
+ PublicInbox::XapHelperCxx::check_build();
+ 'PublicInbox::XapHelperCxx';
+ }) // do {
+ require PublicInbox::XapHelper;
+ 'PublicInbox::XapHelper';
+ };
+ # ensure the child process has the same @INC we do:
+ my $env = { PERL5LIB => join(':', @INC) };
+ my $pid = spawn([$^X, ($^W ? ('-w') : ()), "-M$cls", '-e',
+ $cls.'::start(@ARGV)', '--', @argv],
+ $env, { 0 => $in });
+ ((bless { io => $sock, impl => $cls }, __PACKAGE__), $pid);
+}
+
+1;
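XapClient::mkreq and XapHelper::recv_loop share a small wire format. Each request is one SOCK_SEQPACKET message whose payload is the command and its arguments, each terminated by a NUL byte. The response pipe and optional status descriptor travel alongside as SCM_RIGHTS ancillary data. The C sketch below round-trips one such message. send_req, recv_req, MAX_FDS and the 4096-byte payload limit are assumptions for illustration, not public-inbox's actual helpers. It also requires AF_UNIX SOCK_SEQPACKET support (e.g. Linux).

```c
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <unistd.h>

#define MAX_FDS 8 /* assumed per-request limit */

/* Control buffer with the alignment cmsg(3) requires */
union fd_cbuf {
	char raw[CMSG_SPACE(sizeof(int) * MAX_FDS)];
	struct cmsghdr align;
};

/* Sends "arg0\0arg1\0...\0" as one record, passing fds via SCM_RIGHTS */
static ssize_t send_req(int sock, const int *fds, size_t nfd,
			char *const *argv, int argc)
{
	char buf[4096];
	size_t len = 0;
	union fd_cbuf cbuf;
	struct iovec iov;
	struct msghdr msg;

	for (int i = 0; i < argc; i++) {
		size_t n = strlen(argv[i]) + 1; /* keep the terminating NUL */
		if (len + n > sizeof(buf))
			return -1;
		memcpy(buf + len, argv[i], n);
		len += n;
	}
	if (nfd > MAX_FDS)
		return -1;
	memset(&msg, 0, sizeof(msg));
	memset(&cbuf, 0, sizeof(cbuf));
	iov.iov_base = buf;
	iov.iov_len = len;
	msg.msg_iov = &iov;
	msg.msg_iovlen = 1;
	if (nfd) {
		struct cmsghdr *c;
		msg.msg_control = cbuf.raw;
		msg.msg_controllen = CMSG_SPACE(sizeof(int) * nfd);
		c = CMSG_FIRSTHDR(&msg);
		c->cmsg_level = SOL_SOCKET;
		c->cmsg_type = SCM_RIGHTS;
		c->cmsg_len = CMSG_LEN(sizeof(int) * nfd);
		memcpy(CMSG_DATA(c), fds, sizeof(int) * nfd);
	}
	return sendmsg(sock, &msg, MSG_EOR);
}

/* Receives one record; returns argc, or -1 on error or on a payload that
 * is not NUL-terminated (the same check recv_loop does with chop) */
static int recv_req(int sock, char *buf, size_t bufsz, char **argv,
		    int max_argv, int *fds, size_t *nfd)
{
	union fd_cbuf cbuf;
	struct iovec iov = { .iov_base = buf, .iov_len = bufsz };
	struct msghdr msg;
	ssize_t r, off = 0;
	int argc = 0;

	memset(&msg, 0, sizeof(msg));
	msg.msg_iov = &iov;
	msg.msg_iovlen = 1;
	msg.msg_control = cbuf.raw;
	msg.msg_controllen = sizeof(cbuf.raw);
	*nfd = 0;
	r = recvmsg(sock, &msg, 0);
	if (r <= 0)
		return -1;
	for (struct cmsghdr *c = CMSG_FIRSTHDR(&msg); c;
	     c = CMSG_NXTHDR(&msg, c)) {
		if (c->cmsg_level == SOL_SOCKET && c->cmsg_type == SCM_RIGHTS) {
			*nfd = (c->cmsg_len - CMSG_LEN(0)) / sizeof(int);
			memcpy(fds, CMSG_DATA(c), *nfd * sizeof(int));
		}
	}
	if ((msg.msg_flags & MSG_TRUNC) || buf[r - 1] != '\0')
		return -1;
	while (off < r) { /* split the NUL-separated payload into argv */
		if (argc == max_argv)
			return -1;
		argv[argc++] = buf + off;
		off += (ssize_t)strlen(buf + off) + 1;
	}
	return argc;
}
```

Because SOCK_SEQPACKET preserves record boundaries, one recvmsg yields exactly one request. No length prefix is needed, only the trailing-NUL sanity check.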
diff --git a/lib/PublicInbox/XapHelper.pm b/lib/PublicInbox/XapHelper.pm
new file mode 100644
index 00000000..bf2f99a2
--- /dev/null
+++ b/lib/PublicInbox/XapHelper.pm
@@ -0,0 +1,144 @@
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# Perl + SWIG||XS implementation if XapHelperCxx / xap_helper.h isn't usable.
+package PublicInbox::XapHelper;
+use v5.12;
+use Getopt::Long (); # good API even if we only use short options
+our $GLP = Getopt::Long::Parser->new;
+$GLP->configure(qw(require_order bundling no_ignore_case no_auto_abbrev));
+use PublicInbox::Search qw(xap_terms);
+use PublicInbox::IPC;
+my $X = \%PublicInbox::Search::X;
+our (%SRCH, %PIDS, $parent_pid);
+our $stderr = \*STDERR;
+
+# only short options for portability in C++ implementation
+our @SPEC = (
+ 'a', # ascending sort
+ 'c', # code search
+ 'd=s@', # shard dirs
+ 'k=i', # sort column (like sort(1))
+ 'm=i', # maximum number of results
+ 'o=i', # offset
+ 'r', # 1=relevance then column
+ 't', # collapse threads
+ 'A=s@', # prefixes
+ 'O=s', # eidx_key
+ 'T=i', # timeout in seconds
+);
+
+sub cmd_test_inspect {
+ my ($req) = @_;
+ print { $req->{0} } "pid=$$ has_threadid=",
+ ($req->{srch}->has_threadid ? 1 : 0)
+}
+
+sub cmd_dump_ibx {
+ my ($req, $ibx_id, $qry_str) = @_;
+ $qry_str // return warn('usage: dump_ibx [OPTIONS] IBX_ID QRY_STR');
+ my @pfx = @{$req->{A} // []} or return warn('dump_ibx requires -A PREFIX');
+ my $max = $req->{srch}->{xdb}->get_doccount;
+ my $opt = { relevance => -1, limit => $max, offset => $req->{o} // 0 };
+ $opt->{eidx_key} = $req->{O} if defined $req->{O};
+ my $mset = $req->{srch}->mset($qry_str, $opt);
+ my $out = $req->{0};
+ $out->autoflush(1);
+ for my $it ($mset->items) {
+ my $doc = $it->get_document;
+ for my $p (@pfx) {
+ for (xap_terms($p, $doc)) {
+ print $out "$_ $ibx_id\n" or die "print: $!";
+ }
+ }
+ }
+ if (my $err = $req->{1}) { say $err 'mset.size=', $mset->size }
+}
+
+sub dispatch {
+ my ($req, $cmd, @argv) = @_;
+ my $fn = $req->can("cmd_$cmd") or return;
+ $GLP->getoptionsfromarray(\@argv, $req, @SPEC) or return;
+ my $dirs = delete $req->{d} or return warn 'no -d args';
+ my $key = join("\0", @$dirs);
+ $req->{srch} = $SRCH{$key} //= do {
+ my $new = { qp_flags => $PublicInbox::Search::QP_FLAGS };
+ my $first = shift @$dirs;
+ my $slow_phrase = -f "$first/iamchert";
+ $new->{xdb} = $X->{Database}->new($first);
+ for (@$dirs) {
+ $slow_phrase ||= -f "$_/iamchert";
+ $new->{xdb}->add_database($X->{Database}->new($_));
+ }
+ $slow_phrase or
+ $new->{qp_flags} |= PublicInbox::Search::FLAG_PHRASE();
+ bless $new, $req->{c} ? 'PublicInbox::CodeSearch' :
+ 'PublicInbox::Search';
+ $new->{qp} = $new->qparse_new;
+ $new;
+ };
+ eval { $fn->($req, @argv) };
+ warn "E: $@" if $@;
+}
+
+sub recv_loop {
+ local $SIG{__WARN__} = sub { print $stderr @_ };
+ my $rbuf;
+ while (!defined($parent_pid) || getppid != $parent_pid) {
+ my $req = bless {}, __PACKAGE__;
+ my @fds = PublicInbox::IPC::recv_cmd(\*STDIN, $rbuf, 4096*33);
+ scalar(@fds) or exit(66); # EX_NOINPUT
+ $fds[0] // die "recvmsg: $!";
+ my $i = 0;
+ for my $fd (@fds) {
+ open($req->{$i++}, '+<&=', $fd) and next;
+ warn("open(+<&=$fd) (FD=$i): $!");
+ undef $req;
+ last;
+ }
+ $req or next;
+ local $stderr = $req->{1} // \*STDERR;
+ if (chop($rbuf) ne "\0") {
+ warn "not NUL-terminated";
+ next;
+ }
+ my @argv = split(/\0/, $rbuf);
+ eval { $req->dispatch(@argv) } if @argv;
+ }
+}
+
+sub start_worker ($) {
+ my ($nr) = @_;
+ my $pid = fork // return warn("fork: $!");
+ if ($pid == 0) {
+ undef %PIDS;
+ recv_loop();
+ exit(0);
+ } else {
+ $PIDS{$pid} = $nr;
+ }
+}
+
+sub start (@) {
+ my (@argv) = @_;
+ local (%SRCH, %PIDS, $parent_pid);
+ PublicInbox::Search::load_xapian();
+ $GLP->getoptionsfromarray(\@argv, my $opt = { j => 1 }, 'j=i') or
+ die 'bad args';
+ return recv_loop() if !$opt->{j};
+ die '-j must be >= 0' if $opt->{j} < 0;
+ start_worker($_) for (1..($opt->{j}));
+
+ my $quit;
+ until ($quit) {
+ my $p = waitpid(-1, 0) or return;
+ if (defined(my $nr = delete $PIDS{$p})) {
+ $quit = 1 if ($? >> 8) == 66; # EX_NOINPUT
+ start_worker($nr) if !$quit;
+ } else {
+ warn "W: unknown pid=$p reaped\n";
+ }
+ }
+}
+
+1;
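XapHelper::start pre-forks -j workers and respawns any that exit. The exception is a worker exiting with EX_NOINPUT (66), which recv_loop uses once the request socket reaches EOF. That exit ends the supervision loop. The C sketch below shows this policy. supervise, spawn_worker, MAX_WORKERS and the gen respawn counter are hypothetical. The counter exists only so a test can make a respawned worker behave differently. Unlike the Perl loop, the sketch also reaps the remaining children before returning.

```c
#include <errno.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <sysexits.h>
#include <unistd.h>

#define MAX_WORKERS 64 /* assumed upper bound for -j */

/* worker body; its return value becomes the child's exit status */
typedef int (*worker_fn)(unsigned nr, unsigned gen);

struct slot {
	pid_t pid;
	unsigned gen; /* how many times this slot was respawned */
};

static pid_t spawn_worker(worker_fn fn, unsigned nr, unsigned gen)
{
	pid_t pid = fork();

	if (pid == 0)
		_exit(fn(nr, gen)); /* child: run the worker, never return */
	return pid; /* parent: child PID, or -1 if fork failed */
}

/* Keeps nr_workers children alive until one exits with EX_NOINPUT.
 * Returns the total number of fork()s, or -1 on error. */
static int supervise(unsigned nr_workers, worker_fn fn)
{
	struct slot slots[MAX_WORKERS];
	int spawned = 0, quit = 0;

	if (nr_workers == 0 || nr_workers > MAX_WORKERS)
		return -1;
	for (unsigned nr = 0; nr < nr_workers; nr++) {
		slots[nr].gen = 0;
		slots[nr].pid = spawn_worker(fn, nr, 0);
		if (slots[nr].pid < 0)
			return -1;
		spawned++;
	}
	while (!quit) {
		int status;
		pid_t pid = waitpid(-1, &status, 0);

		if (pid < 0) {
			if (errno == EINTR)
				continue;
			return -1; /* ECHILD: nothing left to wait for */
		}
		for (unsigned nr = 0; nr < nr_workers; nr++) {
			if (slots[nr].pid != pid)
				continue;
			if (WIFEXITED(status) &&
			    WEXITSTATUS(status) == EX_NOINPUT) {
				quit = 1; /* socket EOF: stop respawning */
			} else {
				slots[nr].gen++;
				slots[nr].pid = spawn_worker(fn, nr,
							     slots[nr].gen);
				if (slots[nr].pid < 0)
					return -1;
				spawned++;
			}
			break;
		}
	}
	while (wait(NULL) > 0)
		; /* reap the remaining workers once they exit too */
	return spawned;
}
```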
diff --git a/lib/PublicInbox/XapHelperCxx.pm b/lib/PublicInbox/XapHelperCxx.pm
new file mode 100644
index 00000000..4571676b
--- /dev/null
+++ b/lib/PublicInbox/XapHelperCxx.pm
@@ -0,0 +1,93 @@
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# Just-ahead-of-time builder for the lib/PublicInbox/xap_helper.h shim.
+# I never want users to be without source code for repairs, so this
+# aims to replicate the feel of a scripting language using C++.
+# The resulting executable is not linked to Perl in any way.
+package PublicInbox::XapHelperCxx;
+use v5.12;
+use PublicInbox::Spawn;
+use PublicInbox::Search;
+my $dir = ($ENV{PERL_INLINE_DIRECTORY} //
+ die('BUG: PERL_INLINE_DIRECTORY unset')) . '/cxx';
+my $bin = "$dir/xap_helper";
+my ($srcpfx) = (__FILE__ =~ m!\A(.+/)[^/]+\z!);
+my @srcs = map { $srcpfx.$_ } qw(xap_helper.h);
+my @pm_dep = map { $srcpfx.$_ } qw(Search.pm CodeSearch.pm);
+my $xflags = ($ENV{CXXFLAGS} // '-Wall -ggdb3 -O0') . ' ' .
+ ($ENV{LDFLAGS} // '-Wl,-O1 -Wl,--compress-debug-sections=zlib') .
+ qq{ -DTHREADID=}.PublicInbox::Search::THREADID;
+
+sub xflags_chg () {
+ open my $fh, '<', "$dir/XFLAGS" or return 1;
+ chomp(my $prev = <$fh>);
+ $prev ne $xflags;
+}
+
+sub build () {
+ if (!-d $dir) {
+ my $err;
+ mkdir($dir) or $err = $!;
+ die "mkdir($dir): $err" if !-d $dir;
+ }
+ use autodie;
+ require File::Temp;
+ require PublicInbox::CodeSearch;
+ my ($prog) = ($bin =~ m!/([^/]+)\z!);
+ my $pkg_config = $ENV{PKG_CONFIG} // 'pkg-config';
+ my $tmp = File::Temp->newdir(DIR => $dir) // die "newdir: $!";
+ my $src = "$tmp/$prog.cpp";
+ open my $fh, '>', $src;
+ for (@srcs) {
+ say $fh qq(# line 1 "$_");
+ open my $rfh, '<', $_;
+ local $/;
+ print $fh readline($rfh);
+ }
+ print $fh PublicInbox::Search::generate_cxx();
+ print $fh PublicInbox::CodeSearch::generate_cxx();
+ close $fh;
+
+ my $cmd = "$pkg_config --libs --cflags xapian-core";
+ chomp(my $fl = `$cmd`);
+ die "$cmd failed: \$?=$?" if $?;
+ my $cxx = $ENV{CXX} // 'c++';
+ $cmd = "$cxx $src $fl $xflags -o $tmp/$prog";
+ system($cmd) and die "$cmd failed: \$?=$?";
+ my $cf = "$tmp/XFLAGS";
+ open $fh, '>', $cf;
+ say $fh $xflags;
+ close $fh;
+ # not quite atomic, but close enough :P
+ rename("$tmp/$_", "$dir/$_") for ($prog, 'XFLAGS');
+}
+
+sub check_build () {
+ use Time::HiRes qw(stat);
+ my $ctime = 0;
+ my @bin = stat($bin) or return build();
+ for (@srcs, @pm_dep) {
+ my @st = stat($_) or die "stat $_: $!";
+ if ($st[10] > $ctime) {
+ $ctime = $st[10];
+ return build() if $ctime > $bin[10];
+ }
+ }
+ xflags_chg() ? build() : 0;
+}
+
+sub start (@) {
+ check_build();
+ my @cmd;
+ if (my $v = $ENV{VALGRIND}) {
+ $v = 'valgrind -v' if $v eq '1';
+ @cmd = split(/\s+/, $v);
+ }
+ push @cmd, $bin, @_;
+ my $prog = $cmd[0];
+ $cmd[0] =~ s!\A.*?/([^/]+)\z!$1!;
+ exec { $prog } @cmd;
+}
+
+1;
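check_build rebuilds the helper in three cases:
- the binary is missing;
- xap_helper.h, Search.pm or CodeSearch.pm has a newer ctime than the binary (Time::HiRes stat gives sub-second stamps);
- the flags recorded in XFLAGS differ from the current ones.

The C sketch below expresses the same decision. needs_rebuild and its 1/0/-1 return convention are hypothetical. It assumes POSIX.1-2008 st_ctim timestamps.

```c
#define _POSIX_C_SOURCE 200809L
#include <stddef.h>
#include <string.h>
#include <sys/stat.h>
#include <time.h>

/* Returns nonzero if timestamp a is strictly later than b */
static int ts_after(const struct timespec *a, const struct timespec *b)
{
	return a->tv_sec > b->tv_sec ||
		(a->tv_sec == b->tv_sec && a->tv_nsec > b->tv_nsec);
}

/* Returns 1 if the binary must be (re)built, 0 if it is current, or -1
 * if a source file cannot be stat(2)-ed (check_build dies there).
 * prev_flags is what was saved in XFLAGS last time, or NULL if unknown. */
static int needs_rebuild(const char *bin, const char *const *srcs,
			 size_t nsrc, const char *prev_flags,
			 const char *cur_flags)
{
	struct stat bst, sst;

	if (stat(bin, &bst) != 0)
		return 1; /* no binary yet */
	for (size_t i = 0; i < nsrc; i++) {
		if (stat(srcs[i], &sst) != 0)
			return -1;
		if (ts_after(&sst.st_ctim, &bst.st_ctim))
			return 1; /* a source changed after the last build */
	}
	/* like xflags_chg(): a missing or different XFLAGS forces a build */
	return prev_flags == NULL || strcmp(prev_flags, cur_flags) != 0;
}
```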
diff --git a/lib/PublicInbox/xap_helper.h b/lib/PublicInbox/xap_helper.h
new file mode 100644
index 00000000..52db92b7
--- /dev/null
+++ b/lib/PublicInbox/xap_helper.h
@@ -0,0 +1,654 @@
+/*
+ * Copyright (C) all contributors <meta@public-inbox.org>
+ * License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+ *
+ * Standalone helper process using C and minimal C++ for Xapian;
+ * it is not linked to Perl in any way.
+ * C (not C++) is used as much as possible to lower the contribution
+ * barrier for hackers who mainly know C (this includes the maintainer).
+ * Everything here is an unstable internal API of public-inbox and
+ * NOT intended for ordinary users, only for public-inbox hackers.
+ */
+#ifndef _ALL_SOURCE
+# define _ALL_SOURCE
+#endif
+#include <sys/resource.h>
+#include <sys/socket.h>
+#include <sys/stat.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <sys/uio.h>
+#include <sys/wait.h>
+
+#include <assert.h>
+#include <err.h> // BSD, glibc, and musl all have this
+#include <errno.h>
+#include <fcntl.h>
+#include <limits.h>
+#include <search.h>
+#include <stdio.h>
+#include <string.h>
+#include <sysexits.h>
+#include <unistd.h>
+#include <xapian.h> // our only reason for using C++
+
+#define MY_VER(maj,min,rev) ((maj) << 16 | (min) << 8 | (rev))
+#define XAP_VER \
+ MY_VER(XAPIAN_MAJOR_VERSION,XAPIAN_MINOR_VERSION,XAPIAN_REVISION)
+
+#if XAP_VER >= MY_VER(1,3,6)
+# define NRP Xapian::NumberRangeProcessor
+# define ADD_RP add_rangeprocessor
+# define SET_MAX_EXPANSION set_max_expansion // technically 1.3.3
+#else
+# define NRP Xapian::NumberValueRangeProcessor
+# define ADD_RP add_valuerangeprocessor
+# define SET_MAX_EXPANSION set_max_wildcard_expansion
+#endif
+
+static const int sock_fd = 0; // SOCK_SEQPACKET as stdin :P
+static pid_t parent_pid;
+static FILE *orig_err = stderr;
+static void *srch_tree; // tsearch + tdelete + twalk
+static pid_t *worker_pids; // nr => pid
+static unsigned long nworker;
+
+// PublicInbox::Search and PublicInbox::CodeSearch generate these:
+static void mail_nrp_init(void);
+static void code_nrp_init(void);
+static void qp_init_mail_search(Xapian::QueryParser *);
+static void qp_init_code_search(Xapian::QueryParser *);
+
+struct srch {
+ int paths_len; // int for comparisons
+ unsigned qp_flags;
+ Xapian::Database *db;
+ Xapian::QueryParser *qp;
+ char paths[]; // $shard_path0\0$shard_path1\0...
+};
+
+#define MY_ARG_MAX 256
+typedef bool (*cmd)(struct req *);
+
+// only one request per process since we have an RLIMIT_CPU timeout
+struct req { // argv and pfxv point into global rbuf
+ char *argv[MY_ARG_MAX];
+ char *pfxv[MY_ARG_MAX]; // -A <prefix>
+ struct srch *srch;
+ char *Oeidx_key;
+ cmd fn;
+ unsigned long long max;
+ unsigned long long off;
+ unsigned long timeout_sec;
+ long sort_col; // value column, negative means BoolWeight
+ int argc;
+ int pfxc;
+ FILE *fp[2]; // [0] response pipe or sock, [1] status/errors (optional)
+ bool has_input; // fp[0] is bidirectional
+ bool collapse_threads;
+ bool code_search;
+ bool relevance; // sort by relevance before column
+ bool asc; // ascending sort
+};
+
+struct worker {
+ pid_t pid;
+ unsigned nr;
+};
+
+static bool has_threadid(const struct srch *srch)
+{
+ return srch->db->get_metadata("has_threadid") == "1";
+}
+
+static Xapian::Enquire prep_enquire(const struct req *req)
+{
+ Xapian::Enquire enq(*req->srch->db);
+ if (req->sort_col < 0) {
+ enq.set_weighting_scheme(Xapian::BoolWeight());
+ enq.set_docid_order(req->asc ? Xapian::Enquire::ASCENDING
+ : Xapian::Enquire::DESCENDING);
+ } else if (req->relevance) {
+ enq.set_sort_by_relevance_then_value(req->sort_col, !req->asc);
+ } else {
+ enq.set_sort_by_value_then_relevance(req->sort_col, !req->asc);
+ }
+ return enq;
+}
+
+static Xapian::MSet enquire_mset(struct req *req, Xapian::Enquire *enq)
+{
+ if (!req->max)
+ req->max = 50;
+ for (int i = 0; i < 9; i++) {
+ try {
+ Xapian::MSet mset = enq->get_mset(req->off, req->max);
+ return mset;
+ } catch (const Xapian::DatabaseModifiedError & e) {
+ req->srch->db->reopen();
+ }
+ }
+ return enq->get_mset(req->off, req->max);
+}
+
+static Xapian::MSet mail_mset(struct req *req, const char *qry_str)
+{
+ struct srch *srch = req->srch;
+ Xapian::Query qry = srch->qp->parse_query(qry_str, srch->qp_flags);
+ if (req->Oeidx_key) {
+ req->Oeidx_key[0] = 'O'; // modifies static rbuf
+ // only match docs indexed with the O-prefixed eidx_key term
+ qry = Xapian::Query(Xapian::Query::OP_FILTER, qry,
+ Xapian::Query(req->Oeidx_key));
+ }
+ Xapian::Enquire enq = prep_enquire(req);
+ enq.set_query(qry);
+ // THREADID is a CPP macro defined on the compiler command line (see XapHelperCxx.pm)
+ if (req->collapse_threads && has_threadid(srch))
+ enq.set_collapse_key(THREADID);
+
+ return enquire_mset(req, &enq);
+}
+
+static bool starts_with(const std::string *s, const char *pfx, size_t pfx_len)
+{
+ return s->size() >= pfx_len && !memcmp(pfx, s->c_str(), pfx_len);
+}
+
+static void dump_ibx_term(struct req *req, const char *pfx,
+ Xapian::Document *doc, const char *ibx_id)
+{
+ Xapian::TermIterator cur = doc->termlist_begin();
+ Xapian::TermIterator end = doc->termlist_end();
+ size_t pfx_len = strlen(pfx);
+
+ for (cur.skip_to(pfx); cur != end; cur++) {
+ std::string tn = *cur;
+
+ if (starts_with(&tn, pfx, pfx_len))
+ fprintf(req->fp[0], "%s %s\n",
+ tn.c_str() + pfx_len, ibx_id);
+ }
+}
+
+static int my_setlinebuf(FILE *fp) // glibc setlinebuf(3) can't report errors
+{
+ return setvbuf(fp, NULL, _IOLBF, 0);
+}
+
+static bool cmd_dump_ibx(struct req *req)
+{
+ if ((optind + 1) >= req->argc) {
+ warnx("usage: dump_ibx [OPTIONS] IBX_ID QRY_STR");
+ return false; // need ibx_id + qry_str
+ }
+ if (!req->pfxc) {
+ warnx("dump_ibx requires -A PREFIX");
+ return false;
+ }
+
+ const char *ibx_id = req->argv[optind];
+ if (my_setlinebuf(req->fp[0])) { // for sort(1) pipe
+ perror("setlinebuf(fp[0])");
+ return false;
+ }
+ req->asc = true;
+ req->sort_col = -1;
+ req->max = (unsigned long long)req->srch->db->get_doccount();
+ Xapian::MSet mset = mail_mset(req, req->argv[optind + 1]);
+ for (Xapian::MSetIterator i = mset.begin(); i != mset.end(); i++) {
+ try {
+ Xapian::Document doc = i.get_document();
+ for (int p = 0; p < req->pfxc; p++)
+ dump_ibx_term(req, req->pfxv[p], &doc, ibx_id);
+ } catch (const Xapian::Error & e) {
+ fprintf(orig_err, "W: %s (#%ld)\n",
+ e.get_description().c_str(), (long)(*i));
+ continue;
+ }
+ }
+ if (req->fp[1])
+ fprintf(req->fp[1], "mset.size=%llu\n",
+ (unsigned long long)mset.size());
+ return true;
+}
+
+// internal usage only
+static bool cmd_test_inspect(struct req *req)
+{
+ fprintf(req->fp[0], "pid=%d has_threadid=%d",
+ (int)getpid(), has_threadid(req->srch) ? 1 : 0);
+ return true;
+}
+
+#define CMD(n) { .fn_len = sizeof(#n) - 1, .fn_name = #n, .fn = cmd_##n }
+static const struct cmd_entry {
+ size_t fn_len;
+ const char *fn_name;
+ cmd fn;
+} cmds[] = { // should be small enough to not need bsearch || gperf
+ // most common commands first
+ CMD(dump_ibx),
+ CMD(test_inspect), // least common commands last
+};
+
+#define MY_ARRAY_SIZE(x) (sizeof(x)/sizeof((x)[0]))
+#define RECV_FD_CAPA 2
+#define RECV_FD_SPACE (RECV_FD_CAPA * sizeof(int))
+union my_cmsg {
+ struct cmsghdr hdr;
+ char pad[sizeof(struct cmsghdr) + 16 + RECV_FD_SPACE];
+};
+
+static void xclose(int fd)
+{
+ if (close(fd) < 0 && errno != EINTR)
+ err(EXIT_FAILURE, "BUG: close");
+}
+
+static bool recv_req(struct req *req, char *rbuf, size_t *len)
+{
+ union my_cmsg cmsg = { 0 };
+ struct msghdr msg = { .msg_iovlen = 1 };
+ struct iovec iov;
+ iov.iov_base = rbuf;
+ iov.iov_len = *len;
+ msg.msg_iov = &iov;
+ msg.msg_control = &cmsg.hdr;
+ msg.msg_controllen = CMSG_SPACE(RECV_FD_SPACE);
+
+ ssize_t r = recvmsg(sock_fd, &msg, 0);
+ if (r < 0)
+ err(EXIT_FAILURE, "recvmsg");
+ if (r == 0)
+ exit(EX_NOINPUT); /* grandparent went away */
+ *len = r;
+ if (r > 0 && cmsg.hdr.cmsg_level == SOL_SOCKET &&
+ cmsg.hdr.cmsg_type == SCM_RIGHTS) {
+ size_t len = cmsg.hdr.cmsg_len;
+ int *fdp = (int *)CMSG_DATA(&cmsg.hdr);
+ size_t i;
+ bool fd_ok = true;
+ for (i = 0; CMSG_LEN((i + 1) * sizeof(int)) <= len; i++) {
+ int fd = *fdp++;
+ const char *mode = NULL;
+ int fl = fd_ok ? fcntl(fd, F_GETFL) : 0;
+ // mask off status flags (e.g. O_NONBLOCK, O_APPEND) with O_ACCMODE
+ if (!fd_ok) {
+ // hit previous error, fd is closed below
+ } else if (fl == -1) {
+ warnx("invalid fd=%d", fd);
+ fd_ok = false;
+ } else if ((fl & O_ACCMODE) == O_WRONLY) {
+ mode = "w";
+ } else if ((fl & O_ACCMODE) == O_RDWR) {
+ mode = "r+";
+ if (i == 0) req->has_input = true;
+ } else { // O_RDONLY is not usable as a response fd
+ warnx("invalid mode from F_GETFL: 0x%x", fl);
+ fd_ok = false;
+ }
+ if (!fd_ok) {
+ xclose(fd);
+ } else {
+ req->fp[i] = fdopen(fd, mode);
+ if (!req->fp[i]) {
+ warn("fdopen(fd=%d)", fd);
+ fd_ok = false;
+ }
+ }
+ }
+ for (i = 0; !fd_ok && i < MY_ARRAY_SIZE(req->fp); i++)
+ if (req->fp[i]) fclose(req->fp[i]);
+ return fd_ok;
+ }
+ warnx("no FD received in %zd-byte request", r);
+ return false;
+}
+
+#define SPLIT2ARGV(dst,buf,len) split2argv(dst,buf,len,MY_ARRAY_SIZE(dst))
+static int split2argv(char **dst, char *buf, size_t len, size_t limit)
+{
+ if (len == 0 || buf[0] == 0 || buf[len - 1] != 0) {
+ warnx("bogus argument given");
+ return 0;
+ }
+ size_t nr = 0;
+ char *c = buf;
+ for (size_t i = 1; i < len; i++) {
+ if (!buf[i]) {
+ dst[nr++] = c;
+ c = buf + i + 1;
+ }
+ if (nr == limit) {
+ warnx("too many args: %zu", nr);
+ return 0;
+ }
+ }
+ return (int)nr;
+}
+
+static int srch_cmp(const void *pa, const void *pb) // for tfind|tsearch
+{
+ const struct srch *a = (const struct srch *)pa;
+ const struct srch *b = (const struct srch *)pb;
+ int diff = a->paths_len - b->paths_len;
+
+ return diff ? diff : memcmp(a->paths, b->paths, (size_t)a->paths_len);
+}
+
+static bool is_chert(const char *dir)
+{
+ char iamchert[PATH_MAX];
+ struct stat sb;
+ int rc = snprintf(iamchert, sizeof(iamchert), "%s/iamchert", dir);
+
+ if (rc <= 0 || rc >= (int)sizeof(iamchert))
+ err(EXIT_FAILURE, "BUG: snprintf(%s/iamchert)", dir);
+ if (stat(iamchert, &sb) == 0 && S_ISREG(sb.st_mode))
+ return true;
+ return false;
+}
+
+static bool srch_init(struct req *req)
+{
+ char *dirv[MY_ARG_MAX];
+ int i;
+ struct srch *srch = req->srch;
+ int dirc = SPLIT2ARGV(dirv, srch->paths, (size_t)srch->paths_len);
+ if (dirc <= 0) return false; // split2argv already warned
+ const unsigned FLAG_PHRASE = Xapian::QueryParser::FLAG_PHRASE;
+ srch->qp_flags = FLAG_PHRASE |
+ Xapian::QueryParser::FLAG_BOOLEAN |
+ Xapian::QueryParser::FLAG_LOVEHATE |
+ Xapian::QueryParser::FLAG_WILDCARD;
+ if (is_chert(dirv[0])) srch->qp_flags &= ~FLAG_PHRASE;
+ try {
+ srch->db = new Xapian::Database(dirv[0]);
+ } catch (...) {
+ warn("E: Xapian::Database(%s)", dirv[0]);
+ return false;
+ }
+ try {
+ for (i = 1; i < dirc; i++) {
+ if (srch->qp_flags & FLAG_PHRASE && is_chert(dirv[i]))
+ srch->qp_flags &= ~FLAG_PHRASE;
+ srch->db->add_database(Xapian::Database(dirv[i]));
+ }
+ } catch (...) {
+ warn("E: add_database(%s)", dirv[i]);
+ return false;
+ }
+ try {
+ srch->qp = new Xapian::QueryParser;
+ } catch (...) {
+ perror("E: Xapian::QueryParser");
+ return false;
+ }
+ srch->qp->set_default_op(Xapian::Query::OP_AND);
+ srch->qp->set_database(*srch->db);
+ try {
+ srch->qp->set_stemmer(Xapian::Stem("english"));
+ } catch (...) {
+ perror("E: Xapian::Stem");
+ return false;
+ }
+ srch->qp->set_stemming_strategy(Xapian::QueryParser::STEM_SOME);
+ srch->qp->SET_MAX_EXPANSION(100);
+
+ if (req->code_search)
+ qp_init_code_search(srch->qp); // CodeSearch.pm
+ else
+ qp_init_mail_search(srch->qp); // Search.pm
+ return true;
+}
+
+static void free_srch(void *p) // tdestroy
+{
+ struct srch *srch = (struct srch *)p;
+ delete srch->qp;
+ delete srch->db;
+ free(srch);
+}
+
+static void dispatch(struct req *req)
+{
+ int c;
+ size_t size = strlen(req->argv[0]);
+ union {
+ struct srch *srch;
+ char *ptr;
+ } fbuf;
+ char *end;
+ FILE *kfp;
+ struct srch **s;
+ req->fn = NULL;
+ for (c = 0; c < (int)MY_ARRAY_SIZE(cmds); c++) {
+ if (cmds[c].fn_len == size &&
+ !memcmp(cmds[c].fn_name, req->argv[0], size)) {
+ req->fn = cmds[c].fn;
+ break;
+ }
+ }
+ if (!req->fn) goto cmd_err;
+
+ kfp = open_memstream(&fbuf.ptr, &size);
+ if (!kfp) err(EXIT_FAILURE, "open_memstream");
+ fwrite(&req->argv[0], offsetof(struct srch, paths), 1, kfp); // padding
+
+ // global getopt variables:
+ optind = 1;
+ opterr = optopt = 0;
+ optarg = NULL;
+
+ // keep sync with @PublicInbox::XapHelper::SPEC
+ while ((c = getopt(req->argc, req->argv, "acd:k:m:o:rtA:O:T:")) != -1) {
+ switch (c) {
+ case 'a': req->asc = true; break;
+ case 'c': req->code_search = true; break;
+ case 'd': fwrite(optarg, strlen(optarg) + 1, 1, kfp); break;
+ case 'k':
+ req->sort_col = strtol(optarg, &end, 10);
+ if (*end) goto cmd_err;
+ switch (req->sort_col) {
+ case LONG_MAX: case LONG_MIN: goto cmd_err;
+ }
+ break;
+ case 'm':
+ req->max = strtoull(optarg, &end, 10);
+ if (*end) goto cmd_err;
+ if (req->max == ULLONG_MAX) goto cmd_err;
+ break;
+ case 'o':
+ req->off = strtoull(optarg, &end, 10);
+ if (*end) goto cmd_err;
+ if (req->off == ULLONG_MAX) goto cmd_err;
+ break;
+ case 'r': req->relevance = true; break;
+ case 't': req->collapse_threads = true; break;
+ case 'A':
+ req->pfxv[req->pfxc++] = optarg;
+ if (MY_ARG_MAX == req->pfxc) goto cmd_err;
+ break;
+ case 'O': req->Oeidx_key = optarg - 1; break; // pad "O" prefix
+ case 'T':
+ req->timeout_sec = strtoul(optarg, &end, 10);
+ if (*end) goto cmd_err;
+ if (req->timeout_sec == ULONG_MAX) goto cmd_err;
+ break;
+ default: goto cmd_err;
+ }
+ }
+ if (ferror(kfp) | fclose(kfp)) {
+ perror("ferror|fclose");
+ goto cmd_err;
+ }
+ fbuf.srch->db = NULL;
+ fbuf.srch->qp = NULL;
+ fbuf.srch->paths_len = size - offsetof(struct srch, paths);
+ if (fbuf.srch->paths_len <= 0) {
+ free_srch(fbuf.srch);
+ warnx("no -d args");
+ goto cmd_err;
+ }
+ s = (struct srch **)tsearch(fbuf.srch, &srch_tree, srch_cmp);
+ if (!s) {
+ perror("tsearch");
+ goto cmd_err;
+ }
+ req->srch = *s;
+ if (req->srch != fbuf.srch) { // reuse existing
+ free_srch(fbuf.srch);
+ } else if (!srch_init(req)) {
+ assert(fbuf.srch == *((struct srch **)tfind(
+ fbuf.srch, &srch_tree, srch_cmp)));
+ void *del = tdelete(fbuf.srch, &srch_tree, srch_cmp);
+ assert(del);
+ free_srch(fbuf.srch);
+ goto cmd_err;
+ }
+ try {
+ if (!req->fn(req))
+ goto cmd_err;
+ } catch (const Xapian::Error & e) {
+ warnx("Xapian::Error: %s", e.get_description().c_str());
+ } catch (...) {
+ warn("unhandled exception");
+ }
+cmd_err:
+ return; // just be silent on errors, for now
+}
+
+static void cleanup_pids(void)
+{
+ free(worker_pids);
+ worker_pids = NULL;
+}
+
+static void recv_loop(void) // worker process loop
+{
+ static char rbuf[4096 * 33]; // per-process
+ while (!parent_pid || getppid() == parent_pid) {
+ size_t len = sizeof(rbuf);
+ struct req req = { 0 };
+ if (!recv_req(&req, rbuf, &len))
+ continue;
+ if (req.fp[1]) {
+ if (my_setlinebuf(req.fp[1]))
+ perror("W: setlinebuf(req.fp[1])");
+ stderr = req.fp[1];
+ }
+ req.argc = SPLIT2ARGV(req.argv, rbuf, len);
+ if (req.argc > 0)
+ dispatch(&req);
+ if (ferror(req.fp[0]) | fclose(req.fp[0]))
+ perror("ferror|fclose fp[0]");
+ if (req.fp[1]) {
+ stderr = orig_err;
+ if (ferror(req.fp[1]) | fclose(req.fp[1]))
+ perror("ferror|fclose fp[1]");
+ }
+ }
+}
+
+static void insert_pid(pid_t pid, unsigned nr)
+{
+ assert(!worker_pids[nr]);
+ worker_pids[nr] = pid;
+}
+
+static int delete_pid(pid_t pid)
+{
+ for (unsigned nr = 0; nr < nworker; nr++) {
+ if (worker_pids[nr] == pid) {
+ worker_pids[nr] = 0;
+ return nr;
+ }
+ }
+ warnx("W: unknown pid=%d reaped", (int)pid);
+ return -1;
+}
+
+static void start_worker(unsigned nr)
+{
+ pid_t pid = fork();
+ if (pid < 0) {
+ warn("E: fork(worker=%u)", nr);
+ } else if (pid > 0) {
+ insert_pid(pid, nr);
+ } else {
+ cleanup_pids();
+ recv_loop();
+ exit(0);
+ }
+}
+
+static void cleanup_all(void)
+{
+ cleanup_pids();
+#ifdef __GLIBC__
+ tdestroy(srch_tree, free_srch);
+ srch_tree = NULL;
+#endif
+}
+
+int main(int argc, char *argv[])
+{
+ int c;
+
+ mail_nrp_init();
+ code_nrp_init();
+ atexit(cleanup_all);
+
+ nworker = 0;
+#ifdef _SC_NPROCESSORS_ONLN
+ long j = sysconf(_SC_NPROCESSORS_ONLN);
+ if (j > 0)
+ nworker = j > UCHAR_MAX ? UCHAR_MAX : j;
+#endif // _SC_NPROCESSORS_ONLN
+
+ // make warn/warnx/err multi-process friendly:
+ if (my_setlinebuf(stderr))
+ err(EXIT_FAILURE, "setlinebuf(stderr)");
+ // not using -W<workers> like Daemon.pm, since -W is reserved (glibc)
+ while ((c = getopt(argc, argv, ":j:")) != -1) {
+ char *end;
+
+ switch (c) {
+ case 'j':
+ nworker = strtoul(optarg, &end, 10);
+ if (*end != 0 || nworker > USHRT_MAX)
+ errx(EXIT_FAILURE, "-j %s invalid", optarg);
+ break;
+ case ':':
+ errx(EXIT_FAILURE, "missing argument: `-%c'", optopt);
+ case '?':
+ errx(EXIT_FAILURE, "unrecognized: `-%c'", optopt);
+ default:
+ errx(EXIT_FAILURE, "BUG: `-%c'", c);
+ }
+ }
+ if (nworker == 0) {
+ recv_loop();
+ } else {
+ parent_pid = getpid();
+ worker_pids = (pid_t *)calloc(nworker, sizeof(pid_t));
+ if (!worker_pids)
+ err(EXIT_FAILURE, "calloc");
+ for (unsigned i = 0; i < nworker; i++)
+ start_worker(i);
+
+ int st;
+ pid_t pid;
+ bool quit = false;
+ while ((pid = wait(&st)) > 0) {
+ int nr = delete_pid(pid);
+ if (nr < 0) continue;
+ if (WIFEXITED(st) && WEXITSTATUS(st) == EX_NOINPUT)
+ quit = true;
+ if (!quit)
+ start_worker(nr);
+ }
+ }
+ return 0;
+}
diff --git a/t/xap_helper.t b/t/xap_helper.t
new file mode 100644
index 00000000..f00a845a
--- /dev/null
+++ b/t/xap_helper.t
@@ -0,0 +1,147 @@
+#!perl -w
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+use v5.12;
+use PublicInbox::TestCommon;
+require_mods(qw(DBD::SQLite Search::Xapian));
+use PublicInbox::Spawn qw(spawn);
+use Socket qw(AF_UNIX SOCK_SEQPACKET SOCK_STREAM MSG_EOR);
+require PublicInbox::AutoReap;
+require PublicInbox::IPC;
+require PublicInbox::XapClient;
+use autodie;
+my ($tmp, $for_destroy) = tmpdir();
+
+my $fi_data = './t/git.fast-import-data';
+open my $fi_fh, '<', $fi_data;
+open my $dh, '<', '.';
+my $crepo = create_coderepo 'for-cindex', sub {
+ my ($d) = @_;
+ xsys_e([qw(git init -q --bare)]);
+ xsys_e([qw(git fast-import --quiet)], undef, { 0 => $fi_fh });
+ chdir($dh);
+ run_script([qw(-cindex --dangerous -L medium --no-fsync -q -j1), $d])
+ or xbail '-cindex internal';
+ run_script([qw(-cindex --dangerous -L medium --no-fsync -q -j3 -d),
+ "$d/cidx-ext", $d]) or xbail '-cindex "external"';
+};
+$dh = $fi_fh = undef;
+
+my $v2 = create_inbox 'v2', indexlevel => 'medium', version => 2,
+ tmpdir => "$tmp/v2", sub {
+ my ($im) = @_;
+ for my $f (qw(t/data/0001.patch t/data/binary.patch
+ t/data/message_embed.eml
+ t/solve/0001-simple-mod.patch
+ t/solve/0002-rename-with-modifications.patch
+ t/solve/bare.patch)) {
+ $im->add(eml_load($f)) or BAIL_OUT;
+ }
+};
+
+my @ibx_idx = glob("$v2->{inboxdir}/xap*/?");
+my (@int) = glob("$crepo/public-inbox-cindex/cidx*/?");
+my (@ext) = glob("$crepo/cidx-ext/cidx*/?");
+is(scalar(@ext), 2, 'have 2 external shards') or diag explain(\@ext);
+is(scalar(@int), 1, 'have 1 internal shard') or diag explain(\@int);
+
+my $doreq = sub {
+ my ($s, @arg) = @_;
+ my $err = ref($arg[-1]) ? pop(@arg) : undef;
+ pipe(my $x, my $y);
+ my $buf = join("\0", @arg, '');
+ my @fds = fileno($y);
+ push @fds, fileno($err) if $err;
+ my $n = PublicInbox::IPC::send_cmd($s, \@fds, $buf, MSG_EOR);
+ $n // xbail "send: $!";
+ my $arg = "@arg";
+ $arg =~ s/\Q$tmp\E/\$TMP/gs;
+ is(length($buf), $n, "req $arg sent");
+ $x;
+};
+
+my $env = { PERL5LIB => join(':', @INC) };
+my $test = sub {
+ my (@arg) = @_;
+ socketpair(my $s, my $y, AF_UNIX, SOCK_SEQPACKET, 0);
+ my $pid = spawn([$^X, '-w', @arg], $env, { 0 => $y });
+ my $ar = PublicInbox::AutoReap->new($pid);
+ diag "$arg[-1] running pid=$pid";
+ close $y;
+ my $r = $doreq->($s, qw(test_inspect -d), $ibx_idx[0]);
+ my %info = map { split(/=/, $_, 2) } split(/ /, do { local $/; <$r> });
+ is($info{has_threadid}, '1', 'has_threadid true for inbox');
+ like($info{pid}, qr/\A\d+\z/, 'got PID from inbox inspect');
+
+ $r = $doreq->($s, qw(test_inspect -d), $int[0]);
+ my %cinfo = map { split(/=/, $_, 2) } split(/ /, do { local $/; <$r> });
+ is($cinfo{has_threadid}, '0', 'has_threadid false for cindex');
+ is($cinfo{pid}, $info{pid}, 'PID unchanged for cindex');
+
+ my @dump = (qw(dump_ibx -A XDFID), (map { ('-d', $_) } @ibx_idx),
+ qw(13 rt:0..));
+ $r = $doreq->($s, @dump);
+ my @res;
+ while (sysread($r, my $buf, 512) != 0) { push @res, $buf }
+ is(grep(/\n\z/s, @res), scalar(@res), 'line buffered');
+
+ pipe(my $err_rd, my $err_wr);
+ $r = $doreq->($s, @dump, $err_wr);
+ close $err_wr;
+ my $res = do { local $/; <$r> };
+ is(join('', @res), $res, 'got identical response w/ error pipe');
+ my $stats = do { local $/; <$err_rd> };
+ is($stats, "mset.size=6\n", 'mset.size reported');
+
+ if ($arg[-1] !~ /\('-j0'\)/) {
+ kill('KILL', $cinfo{pid});
+ $r = $doreq->($s, qw(test_inspect -d), $ibx_idx[0]);
+ %info = map {
+ split(/=/, $_, 2)
+ } split(/ /, do { local $/; <$r> });
+ isnt($info{pid}, $cinfo{pid}, 'spawned new worker');
+ }
+ $ar;
+};
+my $ar;
+
+$ar = $test->(qw[-MPublicInbox::XapHelper -e
+ PublicInbox::XapHelper::start('-j0')]);
+$ar = $test->(qw[-MPublicInbox::XapHelper -e
+ PublicInbox::XapHelper::start('-j1')]);
+
+my @NO_CXX = (0);
+SKIP: {
+ eval {
+ require PublicInbox::XapHelperCxx;
+ PublicInbox::XapHelperCxx::check_build();
+ };
+ skip "XapHelperCxx build: $@", 1 if $@;
+ push @NO_CXX, 1;
+
+ $ar = $test->(qw[-MPublicInbox::XapHelperCxx -e
+ PublicInbox::XapHelperCxx::start('-j0')]);
+ $ar = $test->(qw[-MPublicInbox::XapHelperCxx -e
+ PublicInbox::XapHelperCxx::start('-j1')]);
+};
+
+for my $n (@NO_CXX) {
+ local $ENV{PI_NO_CXX} = $n;
+ my ($xhc, $pid) = PublicInbox::XapClient::start_helper('-j0');
+ $ar = PublicInbox::AutoReap->new($pid);
+ pipe(my $err_r, my $err_w);
+
+ # git patch-id --stable <t/data/0001.patch | awk '{print $1}'
+ my $dfid = '91ee6b761fc7f47cad9f2b09b10489f313eb5b71';
+ my $mid = '20180720072141.GA15957@example';
+ my $r = $xhc->mkreq([ undef, $err_w ], qw(dump_ibx -A XDFID -A Q),
+ (map { ('-d', $_) } @ibx_idx),
+ 9, "mid:$mid");
+ close $err_w;
+ my $res = do { local $/; <$r> };
+ is($res, "$dfid 9\n$mid 9\n", "got expected result ($xhc->{impl})");
+ my $err = do { local $/; <$err_r> };
+ is($err, "mset.size=1\n", "got expected status ($xhc->{impl})");
+}
+
+done_testing;
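The request format that $doreq sends and recv_req()/split2argv() parse can be sketched in C++ as follows. This is a minimal, hypothetical example, not code from the patch: it assumes Linux (AF_UNIX with SOCK_SEQPACKET), and the names pack_args, send_req and recv_req_sketch are made up for illustration. Each argument is NUL-terminated, the whole argv travels as one message, and the response fd (plus an optional status/error fd) is attached via SCM_RIGHTS. The helper's stricter checks (F_GETFL access modes, MY_ARG_MAX, the rbuf size) are left out.

```cpp
// Hypothetical sketch (not part of public-inbox) of one xap_helper-style
// request: NUL-terminated args in ONE SOCK_SEQPACKET message, with the
// response fd (and optional status fd) passed via SCM_RIGHTS.
#include <sys/socket.h>
#include <sys/uio.h>
#include <unistd.h>
#include <string.h>
#include <cassert>
#include <string>
#include <vector>

// Builds "arg0\0arg1\0...\0", like join("\0", @arg, '') in Perl
static std::string pack_args(const std::vector<std::string> &args)
{
	std::string buf;
	for (const std::string &a : args) {
		buf += a;
		buf += '\0';
	}
	return buf;
}

// Sends 1 or 2 fds ([0] response, [1] status/errors) with the args.
// Returns the number of bytes sent, or -1 on error.
static ssize_t send_req(int sock, const std::vector<std::string> &args,
			const int *fds, size_t nfds)
{
	if (nfds == 0 || nfds > 2)
		return -1;
	std::string buf = pack_args(args);
	struct iovec iov;
	iov.iov_base = &buf[0];
	iov.iov_len = buf.size();
	union { // aligned control buffer with room for 2 fds
		struct cmsghdr hdr;
		char space[CMSG_SPACE(2 * sizeof(int))];
	} cmsg;
	memset(&cmsg, 0, sizeof(cmsg));
	struct msghdr msg;
	memset(&msg, 0, sizeof(msg));
	msg.msg_iov = &iov;
	msg.msg_iovlen = 1;
	msg.msg_control = cmsg.space;
	msg.msg_controllen = CMSG_SPACE(nfds * sizeof(int));

	struct cmsghdr *c = CMSG_FIRSTHDR(&msg);
	c->cmsg_level = SOL_SOCKET;
	c->cmsg_type = SCM_RIGHTS;
	c->cmsg_len = CMSG_LEN(nfds * sizeof(int));
	memcpy(CMSG_DATA(c), fds, nfds * sizeof(int));
	return sendmsg(sock, &msg, 0); // SEQPACKET keeps message boundaries
}

// Receives one request into args; returns the first fd, or -1.
// Extra fds are closed since this sketch only uses the first one.
static int recv_req_sketch(int sock, std::vector<std::string> &args)
{
	char buf[4096];
	union {
		struct cmsghdr hdr;
		char space[CMSG_SPACE(2 * sizeof(int))];
	} cmsg;
	memset(&cmsg, 0, sizeof(cmsg));
	struct iovec iov;
	iov.iov_base = buf;
	iov.iov_len = sizeof(buf);
	struct msghdr msg;
	memset(&msg, 0, sizeof(msg));
	msg.msg_iov = &iov;
	msg.msg_iovlen = 1;
	msg.msg_control = cmsg.space;
	msg.msg_controllen = sizeof(cmsg.space);

	ssize_t r = recvmsg(sock, &msg, 0);
	if (r <= 0)
		return -1; // error, or peer went away
	int fd = -1;
	struct cmsghdr *c = CMSG_FIRSTHDR(&msg);
	if (c && c->cmsg_level == SOL_SOCKET && c->cmsg_type == SCM_RIGHTS) {
		assert(c->cmsg_len >= CMSG_LEN(0));
		size_t n = (c->cmsg_len - CMSG_LEN(0)) / sizeof(int);
		for (size_t i = 0; i < n; i++) {
			int tmp;
			memcpy(&tmp, CMSG_DATA(c) + i * sizeof(int), sizeof(int));
			if (i == 0) fd = tmp;
			else close(tmp);
		}
	}
	if (buf[r - 1] != '\0') { // last arg must be NUL-terminated
		if (fd >= 0) close(fd);
		return -1;
	}
	args.clear();
	for (const char *p = buf; p < buf + r; p += strlen(p) + 1)
		args.push_back(p);
	return fd;
}
```

Usage: create sv[] with socketpair(AF_UNIX, SOCK_SEQPACKET, 0, sv) and a pipe pfd[], call send_req(sv[0], args, &pfd[1], 1), then recv_req_sketch(sv[1], got) on the other end; bytes written to the returned fd can be read back from pfd[0]. SOCK_SEQPACKET is what lets the receiver treat one recvmsg() as exactly one complete argv, since message boundaries are preserved.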
Thread overview: 11+ messages
2023-08-24 1:22 [PATCH 0/7] cindex: optional C++ Xapian helper Eric Wong
2023-08-24 1:22 ` [PATCH 1/7] search: hoist out shards_dir for future use Eric Wong
2023-08-24 1:22 ` [PATCH 2/7] cindex: read-only association dump Eric Wong
2023-08-24 1:22 ` [PATCH 3/7] cindex: add --show-roots switch Eric Wong
2023-08-24 1:22 ` Eric Wong [this message]
2023-08-24 11:23 ` [PATCH 4/7] introduce optional C++ xap_helper Štěpán Němec
2023-08-24 11:49 ` Eric Wong
2023-08-24 1:22 ` [PATCH 5/7] cindex: fix sorting and uniqueness Eric Wong
2023-08-24 1:22 ` [PATCH 6/7] cindex: implement dump_roots in C++ Eric Wong
2023-08-24 1:22 ` [PATCH 7/7] xap_helper: reopen+retry in MSetIterator loops Eric Wong
2023-08-24 12:30 ` [PATCH 8/7] drop unused CidxRecvIbx.pm Eric Wong
Code repositories for project(s) associated with this public inbox
https://80x24.org/public-inbox.git