From b18ecb7707e83cb8cb38c3736aecd984999ca0a7 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Thu, 24 Aug 2023 01:22:33 +0000 Subject: introduce optional C++ xap_helper This allows us to perform the expensive "dump_ibx" operations in native C++ code using the Xapian C++ library. This provides the majority of the speedup with the -cindex --associate switch. Eventually this may be expanded to cover all uses of Xapian within the project to ensure we have access to Xapian APIs which aren't available in XS|SWIG bindings; and also for ease-of-installation on systems which don't provide pre-packaged Perl Xapian bindings (e.g. OpenBSD 7.3) but do provide Xapian development libraries. Most of the C++ code is still C, as I'm not remotely familiar with C++ compared to C. I suspect many users and potential hackers being from git, Linux kernel, and glibc world are in the same boat. --- MANIFEST | 7 +- lib/PublicInbox/CidxDumpIbx.pm | 59 ---- lib/PublicInbox/CidxXapHelperAux.pm | 48 +++ lib/PublicInbox/CodeSearch.pm | 54 ++- lib/PublicInbox/CodeSearchIdx.pm | 78 ++--- lib/PublicInbox/Isearch.pm | 5 + lib/PublicInbox/Search.pm | 56 ++- lib/PublicInbox/XapClient.pm | 50 +++ lib/PublicInbox/XapHelper.pm | 144 ++++++++ lib/PublicInbox/XapHelperCxx.pm | 93 +++++ lib/PublicInbox/xap_helper.h | 654 ++++++++++++++++++++++++++++++++++++ t/xap_helper.t | 147 ++++++++ 12 files changed, 1278 insertions(+), 117 deletions(-) delete mode 100644 lib/PublicInbox/CidxDumpIbx.pm create mode 100644 lib/PublicInbox/CidxXapHelperAux.pm create mode 100644 lib/PublicInbox/XapClient.pm create mode 100644 lib/PublicInbox/XapHelper.pm create mode 100644 lib/PublicInbox/XapHelperCxx.pm create mode 100644 lib/PublicInbox/xap_helper.h create mode 100644 t/xap_helper.t diff --git a/MANIFEST b/MANIFEST index 162e3038..4f61af42 100644 --- a/MANIFEST +++ b/MANIFEST @@ -162,10 +162,10 @@ lib/PublicInbox/AltId.pm lib/PublicInbox/AutoReap.pm lib/PublicInbox/Cgit.pm lib/PublicInbox/CidxComm.pm -lib/PublicInbox/CidxDumpIbx.pm lib/PublicInbox/CidxDumpShardRoots.pm lib/PublicInbox/CidxLogP.pm lib/PublicInbox/CidxRecvIbx.pm +lib/PublicInbox/CidxXapHelperAux.pm lib/PublicInbox/CmdIPC4.pm lib/PublicInbox/CodeSearch.pm lib/PublicInbox/CodeSearchIdx.pm @@ -368,8 +368,12 @@ lib/PublicInbox/WwwListing.pm lib/PublicInbox/WwwStatic.pm lib/PublicInbox/WwwStream.pm lib/PublicInbox/WwwText.pm +lib/PublicInbox/XapClient.pm +lib/PublicInbox/XapHelper.pm +lib/PublicInbox/XapHelperCxx.pm lib/PublicInbox/Xapcmd.pm lib/PublicInbox/gcf2_libgit2.h +lib/PublicInbox/xap_helper.h sa_config/Makefile sa_config/README sa_config/root/etc/spamassassin/public-inbox.pre @@ -610,6 +614,7 @@ t/www_altid.t t/www_listing.t t/www_static.t t/x-unknown-alpine.eml +t/xap_helper.t t/xcpdb-reshard.t version-gen.perl xt/cmp-msgstr.t diff --git a/lib/PublicInbox/CidxDumpIbx.pm b/lib/PublicInbox/CidxDumpIbx.pm deleted file mode 100644 index e1bc273d..00000000 --- a/lib/PublicInbox/CidxDumpIbx.pm +++ /dev/null @@ -1,59 +0,0 @@ -# Copyright (C) all contributors -# License: AGPL-3.0+ - -# Intended for PublicInbox::DS::event_loop for -cindex --associate -# Iterating through mset->items is slow in Perl due to method dispatch -# and that loop may implemented in C++ using Xapian directly -package PublicInbox::CidxDumpIbx; -use v5.12; -use PublicInbox::Search qw(xap_terms); -use PublicInbox::DS; -use Socket qw(MSG_EOR); - -sub start { - my ($rcvibx, $ibx_id) = @_; - my $cidx = $rcvibx->{cidx}; - my $ibx = $cidx->{IBX}->[$ibx_id] // die "BUG: no IBX[$ibx_id]"; - my $self = bless { rcvibx => $rcvibx, ekey => $ibx->eidx_key, - ibx_id => $ibx_id }, __PACKAGE__; - $self->{srch} = $ibx->isrch // do { - warn("W: $self->{ekey} has no search index (ignoring)\n"); - return undef; - }; - my $opt = { limit => $cidx->assoc_max_init, relevance => -2 }; - $self->{mset} = $self->{srch}->mset($rcvibx->{qry_str}, $opt); - $self->{iter} = 0; - event_step($self); -} - -sub event_step { - my ($self) = @_; - my $rcvibx = $self->{rcvibx} // die 'BUG: no rcvibx'; - return if $rcvibx->{cidx}->do_quit; - my $last = $self->{mset}->size - 1; - my $cur = $self->{iter}; - my $end = $cur + 9999; - if ($end >= $last) { - send($rcvibx->{op_p}, 'index_next', MSG_EOR); - $end = $last; - } - $self->{iter} = $end + 1; - local $0 = "dumping $self->{ekey} $cur..$end"; - - my $sort_w = $rcvibx->{sort_w}; - my $ibx_id = $self->{ibx_id}; - local $0 = "dumping $self->{ekey} $cur..$end"; - $rcvibx->{cidx}->progress($0); - for my $x (($self->{mset}->items)[$cur..$end]) { # FIXME: slow loop - my $doc = $x->get_document; - for my $p (@{$rcvibx->{cidx}->{ASSOC_PFX}}) { - for (xap_terms($p, $doc)) { - print $sort_w "$_ $ibx_id\n" or die "print: $!"; - } - } - } - $end < $last && !$rcvibx->{cidx}->do_quit and - PublicInbox::DS::requeue($self); -} - -1; diff --git a/lib/PublicInbox/CidxXapHelperAux.pm b/lib/PublicInbox/CidxXapHelperAux.pm new file mode 100644 index 00000000..c9a5ddad --- /dev/null +++ b/lib/PublicInbox/CidxXapHelperAux.pm @@ -0,0 +1,48 @@ +# Copyright (C) all contributors +# License: AGPL-3.0+ + +# Intended for PublicInbox::DS::event_loop for -cindex --associate, +# this reports auxilliary status while dumping +package PublicInbox::CidxXapHelperAux; +use v5.12; +use parent qw(PublicInbox::DS); +use PublicInbox::Syscall qw(EPOLLIN); + +# rpipe connects to req->fp[1] in xap_helper.h +sub new { + my ($cls, $rpipe, $cidx, $pfx, $associate) = @_; + my $self = bless { + cidx => $cidx, + pfx => $pfx, + associate => $associate + }, $cls; + $rpipe->blocking(0); + $self->SUPER::new($rpipe, EPOLLIN); +} + +sub event_step { + my ($self) = @_; # xap_helper.h is line-buffered + my $buf = delete($self->{buf}) // ''; + my $n = sysread($self->{sock}, $buf, 65536, length($buf)); + if (!defined($n)) { + return if $!{EAGAIN}; + die "sysread: $!"; + } + my $pfx = $self->{pfx}; + if ($n == 0) { + $self->{cidx}->progress("$pfx $buf") if $buf ne ''; + return $self->close; + } + my @lines = split(/^/m, $buf); + $self->{buf} = pop @lines if substr($lines[-1], -1) ne "\n"; + for my $l (@lines) { + if ($l =~ /\Amset\.size=[0-9]+\n\z/) { + delete $self->{cidx}->{PENDING}->{$pfx}; + $self->{cidx}->index_next; + } + chomp $l; + $self->{cidx}->progress("$pfx $l"); + } +} + +1; diff --git a/lib/PublicInbox/CodeSearch.pm b/lib/PublicInbox/CodeSearch.pm index a5ccce03..6234e259 100644 --- a/lib/PublicInbox/CodeSearch.pm +++ b/lib/PublicInbox/CodeSearch.pm @@ -16,6 +16,12 @@ use constant { # in refs/{heads,tags}. AT(col=0) may be used to store disk usage # in the future, but disk usage calculation is espensive w/ alternates }; +our @CODE_NRP; +our @CODE_VMAP = ( + [ AT, 'd:' ], # mairix compat + [ AT, 'dt:' ], # mail compat + [ CT, 'ct:' ], +); # note: the non-X term prefix allocations are shared with Xapian omega, # see xapian-applications/omega/docs/termprefixes.rst @@ -45,15 +51,17 @@ sub new { bless { xpfx => "$dir/cidx".CIDX_SCHEMA_VER }, $cls; } -sub cqparse_new ($) { +sub qparse_new ($) { my ($self) = @_; my $qp = $self->qp_init_common; my $cb = $qp->can('add_valuerangeprocessor') // $qp->can('add_rangeprocessor'); # Xapian 1.5.0+ - $cb->($qp, $PublicInbox::Search::NVRP->new(AT, 'd:')); # mairix compat - $cb->($qp, $PublicInbox::Search::NVRP->new(AT, 'dt:')); # mail compat - $cb->($qp, $PublicInbox::Search::NVRP->new(CT, 'ct:')); - + if (!@CODE_NRP) { + @CODE_NRP = map { + $PublicInbox::Search::NVRP->new(@$_) + } @CODE_VMAP; + } + $cb->($qp, $_) for @CODE_NRP; while (my ($name, $pfx) = each %bool_pfx_external) { $qp->add_boolean_prefix($name, $_) for split(/ /, $pfx); } @@ -63,6 +71,40 @@ sub cqparse_new ($) { $qp; } +sub generate_cxx () { # generates snippet for xap_helper.h + my ($line, $file) = (__LINE__ + 2, __FILE__); + my $ret = <[0], "$x->[1]");\n} + } +$ret .= <ADD_RP(code_nrp[i]); +EOM + for my $name (sort keys %bool_pfx_external) { + for (split(/ /, $bool_pfx_external{$name})) { + $ret .= qq{\tqp->add_boolean_prefix("$name", "$_");\n} + } + } + for my $name (sort keys %prob_prefix) { + for (split(/ /, $prob_prefix{$name})) { + $ret .= qq{\tqp->add_prefix("$name", "$_");\n} + } + } + $ret .= "}\n"; +} + # returns a Xapian::Query to filter by roots sub roots_filter { # retry_reopen callback my ($self, $git_dir) = @_; @@ -89,7 +131,7 @@ sub roots_filter { # retry_reopen callback sub mset { my ($self, $qry_str, $opt) = @_; - my $qp = $self->{qp} //= cqparse_new($self); + my $qp = $self->{qp} //= qparse_new($self); my $qry = $qp->parse_query($qry_str, $self->{qp_flags}); # limit to commits with shared roots diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm index e795c2b3..b8afecd2 100644 --- a/lib/PublicInbox/CodeSearchIdx.pm +++ b/lib/PublicInbox/CodeSearchIdx.pm @@ -42,6 +42,7 @@ use PublicInbox::IPC qw(nproc_shards); use POSIX qw(WNOHANG SEEK_SET); use File::Path (); use File::Spec (); +use List::Util qw(max); use PublicInbox::SHA qw(sha256_hex); use PublicInbox::Search qw(xap_terms); use PublicInbox::SearchIdx qw(add_val); @@ -69,6 +70,9 @@ our ( @GIT_DIR_GONE, # [ git_dir1, git_dir2 ] $PRUNE_DONE, # marks off prune completions $NCHANGE, # current number of changes + $NPROC, + $XH_PID, # XapHelper PID + $XHC, # XapClient $REPO_CTX, # current repo being indexed in shards $IDX_TODO, # [ $git0, $root0, $git1, $root1, ...] $GIT_TODO, # [ GIT_DIR0, GIT_DIR1, ...] @@ -81,8 +85,8 @@ our ( @AWK, @COMM, @SORT, # awk(1), comm(1), sort(1) commands @ASSOC_PFX, # any combination of XDFID, XDFPRE, XDFPOST $QRY_STR, # common query string for both code and inbox associations - $IBXDIR_FEED, # SOCK_SEQPACKET - @DUMP_SHARD_ROOTS_OK, @RECV_IBX_OK, # for associate + @DUMP_SHARD_ROOTS_OK, # for associate + $DUMP_IBX_WPIPE, # goes to sort(1) @ID2ROOT, ); @@ -529,45 +533,29 @@ sub dump_roots_once { progress($self, 'waiting on dump_shard_roots sort'); } -sub recv_ibx_done { # via PktOp on recv_ibx completion - my ($self, $pid, $n) = @_; - return if $DO_QUIT; - progress($self, "recv_ibx [$n] done"); - $RECV_IBX_OK[$n] = 1; -} - -# causes a worker to become a dumper for inbox/extindex -sub recv_ibx { # wq_io_do - my ($self, $qry_str) = @_; - PublicInbox::CidxRecvIbx->new($self, $qry_str); -} - -sub dump_ibx { # sends to PublicInbox::CidxRecvIbx::event_step - my ($self, $id_dir) = @_; # id_dir: "$IBX_ID=$INBOXDIR" - my $n = length($id_dir); - my $w = send($IBXDIR_FEED, $id_dir, MSG_EOR) // die "send: $!"; - $n == $w or die "send($id_dir) $w != $n"; +sub dump_ibx { # sends to xap_helper.h + my ($self, $ibx_id) = @_; + my $ibx = $IBX[$ibx_id] // die "BUG: no IBX[$ibx_id]"; + my @cmd = ('dump_ibx', $ibx->isrch->xh_args, + (map { ('-A', $_) } @ASSOC_PFX), + $ibx_id, $QRY_STR); + pipe(my ($r, $w)) or die "pipe: $!"; + $XHC->mkreq([$DUMP_IBX_WPIPE, $w], @cmd); + my $ekey = $ibx->eidx_key; + $self->{PENDING}->{$ekey} = undef; + PublicInbox::CidxXapHelperAux->new($r, $self, $ekey, $TODO{associate}); } -# repurpose shard workers to dump inbox patchids with perfect balance sub dump_ibx_start { my ($self, $associate) = @_; - pipe(my ($sort_r, $sort_w)) or die "pipe: $!"; + pipe(my $sort_r, $DUMP_IBX_WPIPE) or die "pipe: $!"; my @sort = (@SORT, '-k1,1'); my $dst = "$TMPDIR/to_ibx_id"; open my $fh, '>', $dst or die "open($dst): $!"; my $sort_pid = spawn(\@sort, $CMD_ENV, { 0 => $sort_r, 1 => $fh }); close $sort_r or die "close: $!"; awaitpid($sort_pid, \&cmd_done, \@sort, $associate); - - my ($r, $w); - socketpair($r, $w, AF_UNIX, SOCK_SEQPACKET, 0) or die "socketpair: $!"; - my ($c, $p) = PublicInbox::PktOp->pair; - $c->{ops}->{recv_ibx_done} = [ $self, $associate ]; - $c->{ops}->{index_next} = [ $self ]; - my $io = [ $p->{op_p}, $r, $sort_w ]; - $_->wq_io_do('recv_ibx', $io, $QRY_STR) for @IDX_SHARDS; - $IBXDIR_FEED = $w; + ($XHC, $XH_PID) = PublicInbox::XapClient::start_helper("-j$NPROC"); } sub index_next ($) { @@ -584,8 +572,8 @@ sub index_next ($) { } elsif ($TMPDIR) { delete $TODO{dump_ibx_start}; # runs OnDestroy once return dump_ibx($self, shift @IBXQ) if @IBXQ; - progress($self, 'done dumping inboxes') if $IBXDIR_FEED; - undef $IBXDIR_FEED; # done dumping inboxes, dump roots + progress($self, 'done dumping inboxes') if $DUMP_IBX_WPIPE; + undef $DUMP_IBX_WPIPE; # done dumping inboxes, dump roots dump_roots_once($self, delete($TODO{associate}) // return); } # else: wait for shards_active (post_loop_do) callback @@ -795,7 +783,7 @@ sub kill_shards { $_->wq_kill(@_) for (@IDX_SHARDS) } sub parent_quit { $DO_QUIT = POSIX->can("SIG$_[0]")->(); - $IBXDIR_FEED = undef; + $XHC = undef; kill_shards(@_); warn "# SIG$_[0] received, quitting...\n"; } @@ -875,8 +863,8 @@ sub associate { @IDX_SHARDS or return warn("# aborting on no shards\n"); grep(defined, @DUMP_SHARD_ROOTS_OK) == @IDX_SHARDS or die "E: shards not dumped properly\n"; - grep(defined, @RECV_IBX_OK) == @IDX_SHARDS or - die "E: inboxes not dumped properly\n"; + my @pending = keys %{$self->{PENDING}}; + die "E: pending=@pending jobs not done\n" if @pending; progress($self, 'associating...'); my @join = ('time', @JOIN, 'to_ibx_id', 'to_root_id'); my $rd = popen_rd(\@join, $CMD_ENV, { -C => "$TMPDIR" }); @@ -1032,8 +1020,9 @@ sub cidx_read_comm { # via PublicInbox::CidxComm::event_step sub init_associate_prefork ($) { my ($self) = @_; return unless $self->{-opt}->{associate}; - require PublicInbox::CidxRecvIbx; require PublicInbox::CidxDumpShardRoots; + require PublicInbox::CidxXapHelperAux; + require PublicInbox::XapClient; $self->{-pi_cfg} = PublicInbox::Config->new; my @unknown; my @pfx = @{$self->{-opt}->{'associate-prefixes'} // [ 'patchid' ]}; @@ -1055,7 +1044,7 @@ EOM sub _prep_ibx { # each_inbox callback my ($ibx, $self, $incl) = @_; ($self->{-opt}->{all} || exists($incl->{$ibx->{inboxdir}})) and - push @{$self->{IBX}}, $ibx; + push @IBX, $ibx; } sub show_roots { # for diagnostics @@ -1102,13 +1091,13 @@ sub cidx_run { # main entry point local $GIT_TODO = []; local ($DO_QUIT, $REINDEX, $TXN_BYTES, @GIT_DIR_GONE, @PRUNE_QUEUE, $REPO_CTX, %ALT_FH, $TMPDIR, @AWK, @COMM, $CMD_ENV, - %TODO, @IBXQ, @IBX, @JOIN, @ASSOC_PFX, $IBXDIR_FEED, @ID2ROOT, - @DUMP_SHARD_ROOTS_OK, @RECV_IBX_OK); + %TODO, @IBXQ, @IBX, @JOIN, @ASSOC_PFX, $DUMP_IBX_WPIPE, + @ID2ROOT, @DUMP_SHARD_ROOTS_OK, $XH_PID, $XHC); local $BATCH_BYTES = $self->{-opt}->{batch_size} // $PublicInbox::SearchIdx::BATCH_BYTES; local @SORT = (undef, '-u'); - local $self->{IBX} = \@IBX; local $self->{ASSOC_PFX} = \@ASSOC_PFX; + local $self->{PENDING} = {}; local $self->{-pi_cfg}; if (grep { $_ } @{$self->{-opt}}{qw(prune associate)}) { require File::Temp; @@ -1165,13 +1154,14 @@ sub cidx_run { # main entry point @GIT_DIR_GONE = uniqstr @GIT_DIR_GONE, @excl; } local $NCHANGE = 0; - local $LIVE_JOBS = $self->{-opt}->{jobs} || - PublicInbox::IPC::detect_nproc() || 2; + local $NPROC = PublicInbox::IPC::detect_nproc(); + local $LIVE_JOBS = $self->{-opt}->{jobs} || $NPROC || 2; local @RDONLY_XDB = $self->xdb_shards_flat; init_prune($self); init_associate_postfork($self); scan_git_dirs($self) if $self->{-opt}->{scan} // 1; - index_next($self) for (1..$LIVE_JOBS); + my $max = $TODO{associate} ? max($LIVE_JOBS, $NPROC) : $LIVE_JOBS; + index_next($self) for (1..$max); # FreeBSD ignores/discards SIGCHLD while signals are blocked and # EVFILT_SIGNAL is inactive, so we pretend we have a SIGCHLD pending diff --git a/lib/PublicInbox/Isearch.pm b/lib/PublicInbox/Isearch.pm index 5cac08ba..62112171 100644 --- a/lib/PublicInbox/Isearch.pm +++ b/lib/PublicInbox/Isearch.pm @@ -123,4 +123,9 @@ sub has_threadid { 1 } sub help { $_[0]->{es}->help } +sub xh_args { # prep getopt args to feed to xap_helper.h socket + my ($self, $opt) = @_; # TODO uid_range + ($self->{es}->xh_args, '-O', $self->{eidx_key}); +} + 1; diff --git a/lib/PublicInbox/Search.pm b/lib/PublicInbox/Search.pm index d5b0bceb..2e784646 100644 --- a/lib/PublicInbox/Search.pm +++ b/lib/PublicInbox/Search.pm @@ -66,6 +66,15 @@ our $NVRP; # '$Xap::'.('NumberValueRangeProcessor' or 'NumberRangeProcessor') # let's hope the ABI is stable our $ENQ_DESCENDING = 0; our $ENQ_ASCENDING = 1; +our @MAIL_VMAP = ( + [ YYYYMMDD, 'd:'], + [ DT, 'dt:' ], + # these are undocumented for WWW, but lei and IMAP use them + [ BYTES, 'z:' ], + [ TS, 'rt:' ], + [ UID, 'uid:' ] +); +our @MAIL_NRP; sub load_xapian () { return 1 if defined $Xap; @@ -101,6 +110,7 @@ sub load_xapian () { # or make indexlevel=medium as default $QP_FLAGS = FLAG_PHRASE() | FLAG_BOOLEAN() | FLAG_LOVEHATE() | FLAG_WILDCARD(); + @MAIL_NRP = map { $NVRP->new(@$_) } @MAIL_VMAP; return 1; } undef; @@ -490,14 +500,8 @@ sub qparse_new { my $qp = qp_init_common($self); my $cb = $qp->can('add_valuerangeprocessor') // $qp->can('add_rangeprocessor'); # Xapian 1.5.0+ - $cb->($qp, $NVRP->new(YYYYMMDD, 'd:')); - $cb->($qp, $NVRP->new(DT, 'dt:')); - - # for IMAP, undocumented for WWW and may be split off go away - $cb->($qp, $NVRP->new(BYTES, 'z:')); - $cb->($qp, $NVRP->new(TS, 'rt:')); - $cb->($qp, $NVRP->new(UID, 'uid:')); + $cb->($qp, $_) for @MAIL_NRP; while (my ($name, $prefix) = each %bool_pfx_external) { $qp->add_boolean_prefix($name, $_) foreach split(/ /, $prefix); } @@ -527,6 +531,40 @@ EOF $qp; } +sub generate_cxx () { # generates snippet for xap_helper.h + my $ret = <[0], "$x->[1]");\n} + } +$ret .= <ADD_RP(mail_nrp[i]); +EOM + for my $name (sort keys %bool_pfx_external) { + for (split(/ /, $bool_pfx_external{$name})) { + $ret .= qq{\tqp->add_boolean_prefix("$name", "$_");\n} + } + } + # TODO: altid support + for my $name (sort keys %prob_prefix) { + for (split(/ /, $prob_prefix{$name})) { + $ret .= qq{\tqp->add_prefix("$name", "$_");\n} + } + } + $ret .= "}\n"; +} + sub help { my ($self) = @_; $self->{qp} //= $self->qparse_new; # parse altids @@ -585,4 +623,8 @@ sub all_terms { wantarray ? (sort keys %ret) : \%ret; } +sub xh_args { # prep getopt args to feed to xap_helper.h socket + map { ('-d', $_) } shard_dirs($_[0]); +} + 1; diff --git a/lib/PublicInbox/XapClient.pm b/lib/PublicInbox/XapClient.pm new file mode 100644 index 00000000..56e3c3b4 --- /dev/null +++ b/lib/PublicInbox/XapClient.pm @@ -0,0 +1,50 @@ +#!perl -w +# Copyright (C) all contributors +# License: AGPL-3.0+ + +# This talks to (XapHelperCxx.pm + xap_helper.h) or XapHelper.pm +# and will eventually allow users with neither XS nor SWIG Perl +# bindings to use Xapian as long as they have Xapian development +# headers/libs and a C++ compiler +package PublicInbox::XapClient; +use v5.12; +use PublicInbox::Spawn qw(spawn); +use Socket qw(AF_UNIX SOCK_SEQPACKET MSG_EOR); +use PublicInbox::IPC; + +sub mkreq { + my ($self, $ios, @arg) = @_; + my ($r, $w, $n); + if (!defined($ios->[0])) { + pipe($r, $w) or die "pipe: $!"; + $ios->[0] = $w; + } + my @fds = map fileno($_), @$ios; + my $buf = join("\0", @arg, ''); + $n = PublicInbox::IPC::send_cmd($self->{io}, \@fds, $buf, MSG_EOR) // + die "send_cmd: $!"; + $n == length($buf) or die "send_cmd: $n != ".length($buf); + $r; +} + +sub start_helper { + my @argv = @_; + socketpair(my $sock, my $in, AF_UNIX, SOCK_SEQPACKET, 0) or + die "socketpair: $!"; + my $cls = ($ENV{PI_NO_CXX} ? undef : eval { + require PublicInbox::XapHelperCxx; + PublicInbox::XapHelperCxx::check_build(); + 'PublicInbox::XapHelperCxx'; + }) // do { + require PublicInbox::XapHelper; + 'PublicInbox::XapHelper'; + }; + # ensure the child process has the same @INC we do: + my $env = { PERL5LIB => join(':', @INC) }; + my $pid = spawn([$^X, ($^W ? ('-w') : ()), "-M$cls", '-e', + $cls.'::start(@ARGV)', '--', @argv], + $env, { 0 => $in }); + ((bless { io => $sock, impl => $cls }, __PACKAGE__), $pid); +} + +1; diff --git a/lib/PublicInbox/XapHelper.pm b/lib/PublicInbox/XapHelper.pm new file mode 100644 index 00000000..bf2f99a2 --- /dev/null +++ b/lib/PublicInbox/XapHelper.pm @@ -0,0 +1,144 @@ +# Copyright (C) all contributors +# License: AGPL-3.0+ + +# Perl + SWIG||XS implementation if XapHelperCxx / xap_helper.h isn't usable. +package PublicInbox::XapHelper; +use v5.12; +use Getopt::Long (); # good API even if we only use short options +our $GLP = Getopt::Long::Parser->new; +$GLP->configure(qw(require_order bundling no_ignore_case no_auto_abbrev)); +use PublicInbox::Search qw(xap_terms); +use PublicInbox::IPC; +my $X = \%PublicInbox::Search::X; +our (%SRCH, %PIDS, $parent_pid); +our $stderr = \*STDERR; + +# only short options for portability in C++ implementation +our @SPEC = ( + 'a', # ascending sort + 'c', # code search + 'd=s@', # shard dirs + 'k=i', # sort column (like sort(1)) + 'm=i', # maximum number of results + 'o=i', # offset + 'r', # 1=relevance then column + 't', # collapse threads + 'A=s@', # prefixes + 'O=s', # eidx_key + 'T=i', # timeout in seconds +); + +sub cmd_test_inspect { + my ($req) = @_; + print { $req->{0} } "pid=$$ has_threadid=", + ($req->{srch}->has_threadid ? 1 : 0) +} + +sub cmd_dump_ibx { + my ($req, $ibx_id, $qry_str) = @_; + $qry_str // return warn('usage: dump_ibx [OPTIONS] IBX_ID QRY_STR'); + my @pfx = @{$req->{A}} or return warn('dump_ibx requires -A PREFIX'); + my $max = $req->{srch}->{xdb}->get_doccount; + my $opt = { relevance => -1, limit => $max, offset => $req->{o} // 0 }; + $opt->{eidx_key} = $req->{O} if defined $req->{O}; + my $mset = $req->{srch}->mset($qry_str, $opt); + my $out = $req->{0}; + $out->autoflush(1); + for my $it ($mset->items) { + my $doc = $it->get_document; + for my $p (@pfx) { + for (xap_terms($p, $doc)) { + print $out "$_ $ibx_id\n" or die "print: $!"; + } + } + } + if (my $err = $req->{1}) { say $err 'mset.size=', $mset->size } +} + +sub dispatch { + my ($req, $cmd, @argv) = @_; + my $fn = $req->can("cmd_$cmd") or return; + $GLP->getoptionsfromarray(\@argv, $req, @SPEC) or return; + my $dirs = delete $req->{d} or return warn 'no -d args'; + my $key = join("\0", @$dirs); + $req->{srch} = $SRCH{$key} //= do { + my $new = { qp_flags => $PublicInbox::Search::QP_FLAGS }; + my $first = shift @$dirs; + my $slow_phrase = -f "$first/iamchert"; + $new->{xdb} = $X->{Database}->new($first); + for (@$dirs) { + $slow_phrase ||= -f "$_/iamchert"; + $new->{xdb}->add_database($X->{Database}->new($_)); + } + $slow_phrase or + $new->{qp_flags} |= PublicInbox::Search::FLAG_PHRASE(); + bless $new, $req->{c} ? 'PublicInbox::CodeSearch' : + 'PublicInbox::Search'; + $new->{qp} = $new->qparse_new; + $new; + }; + eval { $fn->($req, @argv) }; + warn "E: $@" if $@; +} + +sub recv_loop { + local $SIG{__WARN__} = sub { print $stderr @_ }; + my $rbuf; + while (!defined($parent_pid) || getppid != $parent_pid) { + my $req = bless {}, __PACKAGE__; + my @fds = PublicInbox::IPC::recv_cmd(\*STDIN, $rbuf, 4096*33); + scalar(@fds) or exit(66); # EX_NOINPUT + $fds[0] // die "recvmsg: $!"; + my $i = 0; + for my $fd (@fds) { + open($req->{$i++}, '+<&=', $fd) and next; + warn("open(+<&=$fd) (FD=$i): $!"); + undef $req; + last; + } + $req or next; + local $stderr = $req->{1} // \*STDERR; + if (chop($rbuf) ne "\0") { + warn "not NUL-terminated"; + next; + } + my @argv = split(/\0/, $rbuf); + eval { $req->dispatch(@argv) } if @argv; + } +} + +sub start_worker ($) { + my ($nr) = @_; + my $pid = fork // return warn("fork: $!"); + if ($pid == 0) { + undef %PIDS; + recv_loop(); + exit(0); + } else { + $PIDS{$pid} = $nr; + } +} + +sub start (@) { + my (@argv) = @_; + local (%SRCH, %PIDS, $parent_pid); + PublicInbox::Search::load_xapian(); + $GLP->getoptionsfromarray(\@argv, my $opt = { j => 1 }, 'j=i') or + die 'bad args'; + return recv_loop() if !$opt->{j}; + die '-j must be >= 0' if $opt->{j} < 0; + start_worker($_) for (1..($opt->{j})); + + my $quit; + until ($quit) { + my $p = waitpid(-1, 0) or return; + if (defined(my $nr = delete $PIDS{$p})) { + $quit = 1 if ($? >> 8) == 66; # EX_NOINPUT + start_worker($nr) if !$quit; + } else { + warn "W: unknown pid=$p reaped\n"; + } + } +} + +1; diff --git a/lib/PublicInbox/XapHelperCxx.pm b/lib/PublicInbox/XapHelperCxx.pm new file mode 100644 index 00000000..4571676b --- /dev/null +++ b/lib/PublicInbox/XapHelperCxx.pm @@ -0,0 +1,93 @@ +# Copyright (C) all contributors +# License: AGPL-3.0+ + +# Just-ahead-of-time builder for the lib/PublicInbox/xap_helper.h shim. +# I never want users to be without source code for repairs, so this +# aims to replicate the feel of a scripting language using C++. +# The resulting executable is not linked to Perl in any way. +package PublicInbox::XapHelperCxx; +use v5.12; +use PublicInbox::Spawn; +use PublicInbox::Search; +my $dir = ($ENV{PERL_INLINE_DIRECTORY} // + die('BUG: PERL_INLINE_DIRECTORY unset')) . '/cxx'; +my $bin = "$dir/xap_helper"; +my ($srcpfx) = (__FILE__ =~ m!\A(.+/)[^/]+\z!); +my @srcs = map { $srcpfx.$_ } qw(xap_helper.h); +my @pm_dep = map { $srcpfx.$_ } qw(Search.pm CodeSearch.pm); +my $xflags = ($ENV{CXXFLAGS} // '-Wall -ggdb3 -O0') . ' ' . + ($ENV{LDFLAGS} // '-Wl,-O1 -Wl,--compress-debug-sections=zlib') . + qq{ -DTHREADID=}.PublicInbox::Search::THREADID; + +sub xflags_chg () { + open my $fh, '<', "$dir/XFLAGS" or return 1; + chomp(my $prev = <$fh>); + $prev ne $xflags; +} + +sub build () { + if (!-d $dir) { + my $err; + mkdir($dir) or $err = $!; + die "mkdir($dir): $err" if !-d $dir; + } + use autodie; + require File::Temp; + require PublicInbox::CodeSearch; + my ($prog) = ($bin =~ m!/([^/]+)\z!); + my $pkg_config = $ENV{PKG_CONFIG} // 'pkg-config'; + my $tmp = File::Temp->newdir(DIR => $dir) // die "newdir: $!"; + my $src = "$tmp/$prog.cpp"; + open my $fh, '>', $src; + for (@srcs) { + say $fh qq(# line 1 "$_"); + open my $rfh, '<', $_; + local $/; + print $fh readline($rfh); + } + print $fh PublicInbox::Search::generate_cxx(); + print $fh PublicInbox::CodeSearch::generate_cxx(); + close $fh; + + my $cmd = "$pkg_config --libs --cflags xapian-core"; + chomp(my $fl = `$cmd`); + die "$cmd failed: \$?=$?" if $?; + my $cxx = $ENV{CXX} // 'c++'; + $cmd = "$cxx $src $fl $xflags -o $tmp/$prog"; + system($cmd) and die "$cmd failed: \$?=$?"; + my $cf = "$tmp/XFLAGS"; + open $fh, '>', $cf; + say $fh $xflags; + close $fh; + # not quite atomic, but close enough :P + rename("$tmp/$_", "$dir/$_") for ($prog, 'XFLAGS'); +} + +sub check_build () { + use Time::HiRes qw(stat); + my $ctime = 0; + my @bin = stat($bin) or return build(); + for (@srcs, @pm_dep) { + my @st = stat($_) or die "stat $_: $!"; + if ($st[10] > $ctime) { + $ctime = $st[10]; + return build() if $ctime > $bin[10]; + } + } + xflags_chg() ? build() : 0; +} + +sub start (@) { + check_build(); + my @cmd; + if (my $v = $ENV{VALGRIND}) { + $v = 'valgrind -v' if $v eq '1'; + @cmd = split(/\s+/, $v); + } + push @cmd, $bin, @_; + my $prog = $cmd[0]; + $cmd[0] =~ s!\A.*?/([^/]+)\z!$1!; + exec { $prog } @cmd; +} + +1; diff --git a/lib/PublicInbox/xap_helper.h b/lib/PublicInbox/xap_helper.h new file mode 100644 index 00000000..52db92b7 --- /dev/null +++ b/lib/PublicInbox/xap_helper.h @@ -0,0 +1,654 @@ +/* + * Copyright (C) all contributors + * License: AGPL-3.0+ + * + * Standalone helper process using C and minimal C++ for Xapian, + * this is not linked to Perl in any way. + * C (not C++) is used as much as possible to lower the contribution + * barrier for hackers who mainly know C (this includes the maintainer). + * Everything here is an unstable internal API of public-inbox and + * NOT intended for ordinary users; only public-inbox hackers + */ +#ifndef _ALL_SOURCE +# define _ALL_SOURCE +#endif +#include +#include +#include +#include +#include +#include +#include + +#include +#include // BSD, glibc, and musl all have this +#include +#include +#include +#include +#include +#include +#include +#include +#include // our only reason for using C++ + +#define MY_VER(maj,min,rev) ((maj) << 16 | (min) << 8 | (rev)) +#define XAP_VER \ + MY_VER(XAPIAN_MAJOR_VERSION,XAPIAN_MINOR_VERSION,XAPIAN_REVISION) + +#if XAP_VER >= MY_VER(1,3,6) +# define NRP Xapian::NumberRangeProcessor +# define ADD_RP add_rangeprocessor +# define SET_MAX_EXPANSION set_max_expansion // technically 1.3.3 +#else +# define NRP Xapian::NumberValueRangeProcessor +# define ADD_RP add_valuerangeprocessor +# define SET_MAX_EXPANSION set_max_wildcard_expansion +#endif + +static const int sock_fd = 0; // SOCK_SEQPACKET as stdin :P +static pid_t parent_pid; +static FILE *orig_err = stderr; +static void *srch_tree; // tsearch + tdelete + twalk +static pid_t *worker_pids; // nr => pid +static unsigned long nworker; + +// PublicInbox::Search and PublicInbox::CodeSearch generate these: +static void mail_nrp_init(void); +static void code_nrp_init(void); +static void qp_init_mail_search(Xapian::QueryParser *); +static void qp_init_code_search(Xapian::QueryParser *); + +struct srch { + int paths_len; // int for comparisons + unsigned qp_flags; + Xapian::Database *db; + Xapian::QueryParser *qp; + char paths[]; // $shard_path0\0$shard_path1\0... +}; + +#define MY_ARG_MAX 256 +typedef bool (*cmd)(struct req *); + +// only one request per-process since we have RLIMIT_CPU timeout +struct req { // argv and pfxv point into global rbuf + char *argv[MY_ARG_MAX]; + char *pfxv[MY_ARG_MAX]; // -A + struct srch *srch; + char *Oeidx_key; + cmd fn; + unsigned long long max; + unsigned long long off; + unsigned long timeout_sec; + long sort_col; // value column, negative means BoolWeight + int argc; + int pfxc; + FILE *fp[2]; // [0] response pipe or sock, [1] status/errors (optional) + bool has_input; // fp[0] is bidirectional + bool collapse_threads; + bool code_search; + bool relevance; // sort by relevance before column + bool asc; // ascending sort +}; + +struct worker { + pid_t pid; + unsigned nr; +}; + +static bool has_threadid(const struct srch *srch) +{ + return srch->db->get_metadata("has_threadid") == "1"; +} + +static Xapian::Enquire prep_enquire(const struct req *req) +{ + Xapian::Enquire enq(*req->srch->db); + if (req->sort_col < 0) { + enq.set_weighting_scheme(Xapian::BoolWeight()); + enq.set_docid_order(req->asc ? Xapian::Enquire::ASCENDING + : Xapian::Enquire::DESCENDING); + } else if (req->relevance) { + enq.set_sort_by_relevance_then_value(req->sort_col, !req->asc); + } else { + enq.set_sort_by_value_then_relevance(req->sort_col, !req->asc); + } + return enq; +} + +static Xapian::MSet enquire_mset(struct req *req, Xapian::Enquire *enq) +{ + if (!req->max) + req->max = 50; + for (int i = 0; i < 9; i++) { + try { + Xapian::MSet mset = enq->get_mset(req->off, req->max); + return mset; + } catch (const Xapian::DatabaseModifiedError & e) { + req->srch->db->reopen(); + } + } + return enq->get_mset(req->off, req->max); +} + +static Xapian::MSet mail_mset(struct req *req, const char *qry_str) +{ + struct srch *srch = req->srch; + Xapian::Query qry = srch->qp->parse_query(qry_str, srch->qp_flags); + if (req->Oeidx_key) { + req->Oeidx_key[0] = 'O'; // modifies static rbuf + fprintf(stderr, "dbg eidxkey:%s>\n", req->Oeidx_key); + qry = Xapian::Query(Xapian::Query::OP_FILTER, qry, + Xapian::Query(req->Oeidx_key)); + } + Xapian::Enquire enq = prep_enquire(req); + enq.set_query(qry); + // THREADID is a CPP macro defined on CLI (see) XapHelperCxx.pm + if (req->collapse_threads && has_threadid(srch)) + enq.set_collapse_key(THREADID); + + return enquire_mset(req, &enq); +} + +static bool starts_with(const std::string *s, const char *pfx, size_t pfx_len) +{ + return s->size() >= pfx_len && !memcmp(pfx, s->c_str(), pfx_len); +} + +static void dump_ibx_term(struct req *req, const char *pfx, + Xapian::Document *doc, const char *ibx_id) +{ + Xapian::TermIterator cur = doc->termlist_begin(); + Xapian::TermIterator end = doc->termlist_end(); + size_t pfx_len = strlen(pfx); + + for (cur.skip_to(pfx); cur != end; cur++) { + std::string tn = *cur; + + if (starts_with(&tn, pfx, pfx_len)) + fprintf(req->fp[0], "%s %s\n", + tn.c_str() + pfx_len, ibx_id); + } +} + +static int my_setlinebuf(FILE *fp) // glibc setlinebuf(3) can't report errors +{ + return setvbuf(fp, NULL, _IOLBF, 0); +} + +static bool cmd_dump_ibx(struct req *req) +{ + if ((optind + 1) >= req->argc) { + warnx("usage: dump_ibx [OPTIONS] IBX_ID QRY_STR"); + return false; // need ibx_id + qry_str + } + if (!req->pfxc) { + warnx("dump_ibx requires -A PREFIX"); + return false; + } + + const char *ibx_id = req->argv[optind]; + if (my_setlinebuf(req->fp[0])) { // for sort(1) pipe + perror("setlinebuf(fp[0])"); + return false; + } + req->asc = true; + req->sort_col = -1; + req->max = (unsigned long long)req->srch->db->get_doccount(); + Xapian::MSet mset = mail_mset(req, req->argv[optind + 1]); + for (Xapian::MSetIterator i = mset.begin(); i != mset.end(); i++) { + try { + Xapian::Document doc = i.get_document(); + for (int p = 0; p < req->pfxc; p++) + dump_ibx_term(req, req->pfxv[p], &doc, ibx_id); + } catch (const Xapian::Error & e) { + fprintf(orig_err, "W: %s (#%ld)\n", + e.get_description().c_str(), (long)(*i)); + continue; + } + } + if (req->fp[1]) + fprintf(req->fp[1], "mset.size=%llu\n", + (unsigned long long)mset.size()); + return true; +} + +// internal usage only +static bool cmd_test_inspect(struct req *req) +{ + fprintf(req->fp[0], "pid=%d has_threadid=%d", + (int)getpid(), has_threadid(req->srch) ? 1 : 0); + return true; +} + +#define CMD(n) { .fn_len = sizeof(#n) - 1, .fn_name = #n, .fn = cmd_##n } +static const struct cmd_entry { + size_t fn_len; + const char *fn_name; + cmd fn; +} cmds[] = { // should be small enough to not need bsearch || gperf + // most common commands first + CMD(dump_ibx), + CMD(test_inspect), // least common commands last +}; + +#define MY_ARRAY_SIZE(x) (sizeof(x)/sizeof((x)[0])) +#define RECV_FD_CAPA 2 +#define RECV_FD_SPACE (RECV_FD_CAPA * sizeof(int)) +union my_cmsg { + struct cmsghdr hdr; + char pad[sizeof(struct cmsghdr) + 16 + RECV_FD_SPACE]; +}; + +static void xclose(int fd) +{ + if (close(fd) < 0 && errno != EINTR) + err(EXIT_FAILURE, "BUG: close"); +} + +static bool recv_req(struct req *req, char *rbuf, size_t *len) +{ + union my_cmsg cmsg = { 0 }; + struct msghdr msg = { .msg_iovlen = 1 }; + struct iovec iov; + iov.iov_base = rbuf; + iov.iov_len = *len; + msg.msg_iov = &iov; + msg.msg_control = &cmsg.hdr; + msg.msg_controllen = CMSG_SPACE(RECV_FD_SPACE); + + ssize_t r = recvmsg(sock_fd, &msg, 0); + if (r < 0) + err(EXIT_FAILURE, "recvmsg"); + if (r == 0) + exit(EX_NOINPUT); /* grandparent went away */ + *len = r; + if (r > 0 && cmsg.hdr.cmsg_level == SOL_SOCKET && + cmsg.hdr.cmsg_type == SCM_RIGHTS) { + size_t len = cmsg.hdr.cmsg_len; + int *fdp = (int *)CMSG_DATA(&cmsg.hdr); + size_t i; + bool fd_ok = true; + for (i = 0; CMSG_LEN((i + 1) * sizeof(int)) <= len; i++) { + int fd = *fdp++; + const char *mode = NULL; + int fl = fd_ok ? fcntl(fd, F_GETFL) : 0; + switch (fl) { + case 0: break; // hit previous error + case -1: + warnx("invalid fd=%d", fd); + fd_ok = false; + break; + case O_WRONLY: mode = "w"; break; + case O_RDWR: + mode = "r+"; + if (i == 0) req->has_input = true; + break; + default: + warnx("invalid mode from F_GETFL: 0x%x", fl); + fd_ok = false; + } + if (!fd_ok) { + xclose(fd); + } else { + req->fp[i] = fdopen(fd, mode); + if (!req->fp[i]) { + warn("fdopen(fd=%d)", fd); + fd_ok = false; + } + } + } + for (i = 0; !fd_ok && i < MY_ARRAY_SIZE(req->fp); i++) + if (req->fp[i]) fclose(req->fp[i]); + return fd_ok; + } + warnx("no FD received in %zd-byte request", r); + return false; +} + +#define SPLIT2ARGV(dst,buf,len) split2argv(dst,buf,len,MY_ARRAY_SIZE(dst)) +static int split2argv(char **dst, char *buf, size_t len, size_t limit) +{ + if (buf[0] == 0 || len == 0 || buf[len - 1] != 0) { + warnx("bogus argument given"); + return 0; + } + size_t nr = 0; + char *c = buf; + for (size_t i = 1; i < len; i++) { + if (!buf[i]) { + dst[nr++] = c; + c = buf + i + 1; + } + if (nr == limit) { + warnx("too many args: %zu", nr); + return 0; + } + } + return (int)nr; +} + +static int srch_cmp(const void *pa, const void *pb) // for tfind|tsearch +{ + const struct srch *a = (const struct srch *)pa; + const struct srch *b = (const struct srch *)pb; + int diff = a->paths_len - b->paths_len; + + return diff ? diff : memcmp(a->paths, b->paths, (size_t)a->paths_len); +} + +static bool is_chert(const char *dir) +{ + char iamchert[PATH_MAX]; + struct stat sb; + int rc = snprintf(iamchert, sizeof(iamchert), "%s/iamchert", dir); + + if (rc <= 0 || rc >= (int)sizeof(iamchert)) + err(EXIT_FAILURE, "BUG: snprintf(%s/iamchert)", dir); + if (stat(iamchert, &sb) == 0 && S_ISREG(sb.st_mode)) + return true; + return false; +} + +static bool srch_init(struct req *req) +{ + char *dirv[MY_ARG_MAX]; + int i; + struct srch *srch = req->srch; + int dirc = SPLIT2ARGV(dirv, srch->paths, (size_t)srch->paths_len); + const unsigned FLAG_PHRASE = Xapian::QueryParser::FLAG_PHRASE; + srch->qp_flags = FLAG_PHRASE | + Xapian::QueryParser::FLAG_BOOLEAN | + Xapian::QueryParser::FLAG_LOVEHATE | + Xapian::QueryParser::FLAG_WILDCARD; + if (is_chert(dirv[0])) + srch->qp_flags &= ~FLAG_PHRASE; + try { + srch->db = new Xapian::Database(dirv[0]); + } catch (...) { + warn("E: Xapian::Database(%s)", dirv[0]); + return false; + } + try { + for (i = 1; i < dirc; i++) { + if (srch->qp_flags & FLAG_PHRASE && is_chert(dirv[i])) + srch->qp_flags &= ~FLAG_PHRASE; + srch->db->add_database(Xapian::Database(dirv[i])); + } + } catch (...) { + warn("E: add_database(%s)", dirv[i]); + return false; + } + try { + srch->qp = new Xapian::QueryParser; + } catch (...) { + perror("E: Xapian::QueryParser"); + return false; + } + srch->qp->set_default_op(Xapian::Query::OP_AND); + srch->qp->set_database(*srch->db); + try { + srch->qp->set_stemmer(Xapian::Stem("english")); + } catch (...) { + perror("E: Xapian::Stem"); + return false; + } + srch->qp->set_stemming_strategy(Xapian::QueryParser::STEM_SOME); + srch->qp->SET_MAX_EXPANSION(100); + + if (req->code_search) + qp_init_code_search(srch->qp); // CodeSearch.pm + else + qp_init_mail_search(srch->qp); // Search.pm + return true; +} + +static void free_srch(void *p) // tdestroy +{ + struct srch *srch = (struct srch *)p; + delete srch->qp; + delete srch->db; + free(srch); +} + +static void dispatch(struct req *req) +{ + int c; + size_t size = strlen(req->argv[0]); + union { + struct srch *srch; + char *ptr; + } fbuf; + char *end; + FILE *kfp; + struct srch **s; + req->fn = NULL; + for (c = 0; c < (int)MY_ARRAY_SIZE(cmds); c++) { + if (cmds[c].fn_len == size && + !memcmp(cmds[c].fn_name, req->argv[0], size)) { + req->fn = cmds[c].fn; + break; + } + } + if (!req->fn) goto cmd_err; + + kfp = open_memstream(&fbuf.ptr, &size); + // write padding, first + fwrite(&req->argv[0], offsetof(struct srch, paths), 1, kfp); + + // global getopt variables: + optind = 1; + opterr = optopt = 0; + optarg = NULL; + + // keep sync with @PublicInbox::XapHelper::SPEC + while ((c = getopt(req->argc, req->argv, "acd:k:m:o:rtA:O:T:")) != -1) { + switch (c) { + case 'a': req->asc = true; break; + case 'c': req->code_search = true; break; + case 'd': fwrite(optarg, strlen(optarg) + 1, 1, kfp); break; + case 'k': + req->sort_col = strtol(optarg, &end, 10); + if (*end) goto cmd_err; + switch (req->sort_col) { + case LONG_MAX: case LONG_MIN: goto cmd_err; + } + break; + case 'm': + req->max = strtoull(optarg, &end, 10); + if (*end) goto cmd_err; + if (req->max == ULLONG_MAX) goto cmd_err; + break; + case 'o': + req->off = strtoull(optarg, &end, 10); + if (*end) goto cmd_err; + if (req->off == ULLONG_MAX) goto cmd_err; + break; + case 'r': req->relevance = true; break; + case 't': req->collapse_threads = true; break; + case 'A': + req->pfxv[req->pfxc++] = optarg; + if (MY_ARG_MAX == req->pfxc) goto cmd_err; + break; + case 'O': req->Oeidx_key = optarg - 1; break; // pad "O" prefix + case 'T': + req->timeout_sec = strtoul(optarg, &end, 10); + if (*end) goto cmd_err; + if (req->timeout_sec == ULONG_MAX) goto cmd_err; + break; + default: goto cmd_err; + } + } + if (ferror(kfp) | fclose(kfp)) { + perror("ferror|fclose"); + goto cmd_err; + } + fbuf.srch->db = NULL; + fbuf.srch->qp = NULL; + fbuf.srch->paths_len = size - offsetof(struct srch, paths); + if (fbuf.srch->paths_len <= 0) { + free_srch(fbuf.srch); + warnx("no -d args"); + goto cmd_err; + } + s = (struct srch **)tsearch(fbuf.srch, &srch_tree, srch_cmp); + if (!s) { + perror("tsearch"); + goto cmd_err; + } + req->srch = *s; + if (req->srch != fbuf.srch) { // reuse existing + free_srch(fbuf.srch); + } else if (!srch_init(req)) { + assert(fbuf.srch == *((struct srch **)tfind( + fbuf.srch, &srch_tree, srch_cmp))); + void *del = tdelete(fbuf.srch, &srch_tree, srch_cmp); + assert(del); + free_srch(fbuf.srch); + goto cmd_err; + } + try { + if (!req->fn(req)) + goto cmd_err; + } catch (const Xapian::Error & e) { + warnx("Xapian::Error: %s", e.get_description().c_str()); + } catch (...) { + warn("unhandled exception"); + } +cmd_err: + return; // just be silent on errors, for now +} + +static void cleanup_pids(void) +{ + free(worker_pids); + worker_pids = NULL; +} + +static void recv_loop(void) // worker process loop +{ + static char rbuf[4096 * 33]; // per-process + while (!parent_pid || getppid() == parent_pid) { + size_t len = sizeof(rbuf); + struct req req = { 0 }; + if (!recv_req(&req, rbuf, &len)) + continue; + if (req.fp[1]) { + if (my_setlinebuf(req.fp[1])) + perror("W: setlinebuf(req.fp[1])"); + stderr = req.fp[1]; + } + req.argc = SPLIT2ARGV(req.argv, rbuf, len); + if (req.argc > 0) + dispatch(&req); + if (ferror(req.fp[0]) | fclose(req.fp[0])) + perror("ferror|fclose fp[0]"); + if (req.fp[1]) { + stderr = orig_err; + if (ferror(req.fp[1]) | fclose(req.fp[1])) + perror("ferror|fclose fp[1]"); + } + } +} + +static void insert_pid(pid_t pid, unsigned nr) +{ + assert(!worker_pids[nr]); + worker_pids[nr] = pid; +} + +static int delete_pid(pid_t pid) +{ + for (unsigned nr = 0; nr < nworker; nr++) { + if (worker_pids[nr] == pid) { + worker_pids[nr] = 0; + return nr; + } + } + warnx("W: unknown pid=%d reaped", (int)pid); + return -1; +} + +static void start_worker(unsigned nr) +{ + pid_t pid = fork(); + if (pid < 0) { + warn("E: fork(worker=%u)", nr); + } else if (pid > 0) { + insert_pid(pid, nr); + } else { + cleanup_pids(); + recv_loop(); + exit(0); + } +} + +static void cleanup_all(void) +{ + cleanup_pids(); +#ifdef __GLIBC__ + tdestroy(srch_tree, free_srch); + srch_tree = NULL; +#endif +} + +int main(int argc, char *argv[]) +{ + int c; + + mail_nrp_init(); + code_nrp_init(); + atexit(cleanup_all); + + nworker = 0; +#ifdef _SC_NPROCESSORS_ONLN + long j = sysconf(_SC_NPROCESSORS_ONLN); + if (j > 0) + nworker = j > UCHAR_MAX ? UCHAR_MAX : j; +#endif // _SC_NPROCESSORS_ONLN + + // make warn/warnx/err multi-process friendly: + if (my_setlinebuf(stderr)) + err(EXIT_FAILURE, "setlinebuf(stderr)"); + // not using -W like Daemon.pm, since -W is reserved (glibc) + while ((c = getopt(argc, argv, "j:")) != -1) { + char *end; + + switch (c) { + case 'j': + nworker = strtoul(optarg, &end, 10); + if (*end != 0 || nworker > USHRT_MAX) + errx(EXIT_FAILURE, "-j %s invalid", optarg); + break; + case ':': + errx(EXIT_FAILURE, "missing argument: `-%c'", optopt); + case '?': + errx(EXIT_FAILURE, "unrecognized: `-%c'", optopt); + default: + errx(EXIT_FAILURE, "BUG: `-%c'", c); + } + } + if (nworker == 0) { + recv_loop(); + } else { + parent_pid = getpid(); + worker_pids = (pid_t *)calloc(nworker, sizeof(pid_t)); + if (!worker_pids) + err(EXIT_FAILURE, "calloc"); + for (unsigned i = 0; i < nworker; i++) + start_worker(i); + + int st; + pid_t pid; + bool quit = false; + while ((pid = wait(&st)) > 0) { + int nr = delete_pid(pid); + if (nr < 0) continue; + if (WIFEXITED(st) && WEXITSTATUS(st) == EX_NOINPUT) + quit = true; + if (!quit) + start_worker(nr); + } + } + return 0; +} diff --git a/t/xap_helper.t b/t/xap_helper.t new file mode 100644 index 00000000..f00a845a --- /dev/null +++ b/t/xap_helper.t @@ -0,0 +1,147 @@ +#!perl -w +# Copyright (C) all contributors +# License: AGPL-3.0+ +use v5.12; +use PublicInbox::TestCommon; +require_mods(qw(DBD::SQLite Search::Xapian)); +use PublicInbox::Spawn qw(spawn); +use Socket qw(AF_UNIX SOCK_SEQPACKET SOCK_STREAM MSG_EOR); +require PublicInbox::AutoReap; +require PublicInbox::IPC; +require PublicInbox::XapClient; +use autodie; +my ($tmp, $for_destroy) = tmpdir(); + +my $fi_data = './t/git.fast-import-data'; +open my $fi_fh, '<', $fi_data; +open my $dh, '<', '.'; +my $crepo = create_coderepo 'for-cindex', sub { + my ($d) = @_; + xsys_e([qw(git init -q --bare)]); + xsys_e([qw(git fast-import --quiet)], undef, { 0 => $fi_fh }); + chdir($dh); + run_script([qw(-cindex --dangerous -L medium --no-fsync -q -j1), $d]) + or xbail '-cindex internal'; + run_script([qw(-cindex --dangerous -L medium --no-fsync -q -j3 -d), + "$d/cidx-ext", $d]) or xbail '-cindex "external"'; +}; +$dh = $fi_fh = undef; + +my $v2 = create_inbox 'v2', indexlevel => 'medium', version => 2, + tmpdir => "$tmp/v2", sub { + my ($im) = @_; + for my $f (qw(t/data/0001.patch t/data/binary.patch + t/data/message_embed.eml + t/solve/0001-simple-mod.patch + t/solve/0002-rename-with-modifications.patch + t/solve/bare.patch)) { + $im->add(eml_load($f)) or BAIL_OUT; + } +}; + +my @ibx_idx = glob("$v2->{inboxdir}/xap*/?"); +my (@int) = glob("$crepo/public-inbox-cindex/cidx*/?"); +my (@ext) = glob("$crepo/cidx-ext/cidx*/?"); +is(scalar(@ext), 2, 'have 2 external shards') or diag explain(\@ext); +is(scalar(@int), 1, 'have 1 internal shard') or diag explain(\@int); + +my $doreq = sub { + my ($s, @arg) = @_; + my $err = pop @arg if ref($arg[-1]); + pipe(my $x, my $y); + my $buf = join("\0", @arg, ''); + my @fds = fileno($y); + push @fds, fileno($err) if $err; + my $n = PublicInbox::IPC::send_cmd($s, \@fds, $buf, MSG_EOR); + $n // xbail "send: $!"; + my $arg = "@arg"; + $arg =~ s/\Q$tmp\E/\$TMP/gs; + is(length($buf), $n, "req $arg sent"); + $x; +}; + +my $env = { PERL5LIB => join(':', @INC) }; +my $test = sub { + my (@arg) = @_; + socketpair(my $s, my $y, AF_UNIX, SOCK_SEQPACKET, 0); + my $pid = spawn([$^X, '-w', @arg], $env, { 0 => $y }); + my $ar = PublicInbox::AutoReap->new($pid); + diag "$arg[-1] running pid=$pid"; + close $y; + my $r = $doreq->($s, qw(test_inspect -d), $ibx_idx[0]); + my %info = map { split(/=/, $_, 2) } split(/ /, do { local $/; <$r> }); + is($info{has_threadid}, '1', 'has_threadid true for inbox'); + like($info{pid}, qr/\A\d+\z/, 'got PID from inbox inspect'); + + $r = $doreq->($s, qw(test_inspect -d), $int[0]); + my %cinfo = map { split(/=/, $_, 2) } split(/ /, do { local $/; <$r> }); + is($cinfo{has_threadid}, '0', 'has_threadid false for cindex'); + is($cinfo{pid}, $info{pid}, 'PID unchanged for cindex'); + + my @dump = (qw(dump_ibx -A XDFID), (map { ('-d', $_) } @ibx_idx), + qw(13 rt:0..)); + $r = $doreq->($s, @dump); + my @res; + while (sysread($r, my $buf, 512) != 0) { push @res, $buf } + is(grep(/\n\z/s, @res), scalar(@res), 'line buffered'); + + pipe(my $err_rd, my $err_wr); + $r = $doreq->($s, @dump, $err_wr); + close $err_wr; + my $res = do { local $/; <$r> }; + is(join('', @res), $res, 'got identical response w/ error pipe'); + my $stats = do { local $/; <$err_rd> }; + is($stats, "mset.size=6\n", 'mset.size reported'); + + if ($arg[-1] !~ /\('-j0'\)/) { + kill('KILL', $cinfo{pid}); + $r = $doreq->($s, qw(test_inspect -d), $ibx_idx[0]); + %info = map { + split(/=/, $_, 2) + } split(/ /, do { local $/; <$r> }); + isnt($info{pid}, $cinfo{pid}, 'spawned new worker'); + } + $ar; +}; +my $ar; + +$ar = $test->(qw[-MPublicInbox::XapHelper -e + PublicInbox::XapHelper::start('-j0')]); +$ar = $test->(qw[-MPublicInbox::XapHelper -e + PublicInbox::XapHelper::start('-j1')]); + +my @NO_CXX = (0); +SKIP: { + eval { + require PublicInbox::XapHelperCxx; + PublicInbox::XapHelperCxx::check_build(); + }; + skip "XapHelperCxx build: $@", 1 if $@; + push @NO_CXX, 1; + + $ar = $test->(qw[-MPublicInbox::XapHelperCxx -e + PublicInbox::XapHelperCxx::start('-j0')]); + $ar = $test->(qw[-MPublicInbox::XapHelperCxx -e + PublicInbox::XapHelperCxx::start('-j1')]); +}; + +for my $n (@NO_CXX) { + local $ENV{PI_NO_CXX} = $n; + my ($xhc, $pid) = PublicInbox::XapClient::start_helper('-j0'); + $ar = PublicInbox::AutoReap->new($pid); + pipe(my $err_r, my $err_w); + + # git patch-id --stable mkreq([ undef, $err_w ], qw(dump_ibx -A XDFID -A Q), + (map { ('-d', $_) } @ibx_idx), + 9, "mid:$mid"); + close $err_w; + my $res = do { local $/; <$r> }; + is($res, "$dfid 9\n$mid 9\n", "got expected result ($xhc->{impl})"); + my $err = do { local $/; <$err_r> }; + is($err, "mset.size=1\n", "got expected status ($xhc->{impl})"); +} + +done_testing; -- cgit v1.2.3-24-ge0c7