From fe3883762faf67fd6c4624ee721000e1f36bc59b Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Tue, 21 Nov 2023 12:43:15 +0000 Subject: cindex: rename --associate to --join, test w/ real repos The association data is just stored as deflated JSON in Xapian metadata keys of shard[0] for now. It should be reasonably compact and fit in memory for now since we'll assume sane, non-malicious git coderepo history, for now. The new cindex-join.t test requires TEST_REMOTE_JOIN=1 to be set in the environment and tests the joins against the inboxes and coderepos of two small projects with a common history. Internally, we'll use `ibx_off', `root_off' instead of `ibx_id' and `root_id' since `_id' may be mistaken for columns in an SQL database which they are not. --- lib/PublicInbox/CodeSearch.pm | 62 ++++++- lib/PublicInbox/CodeSearchIdx.pm | 370 ++++++++++++++++++++++++--------------- lib/PublicInbox/TestCommon.pm | 9 +- lib/PublicInbox/XapHelper.pm | 14 +- lib/PublicInbox/xap_helper.h | 59 ++++--- 5 files changed, 329 insertions(+), 185 deletions(-) (limited to 'lib/PublicInbox') diff --git a/lib/PublicInbox/CodeSearch.pm b/lib/PublicInbox/CodeSearch.pm index 6234e259..9051d85f 100644 --- a/lib/PublicInbox/CodeSearch.pm +++ b/lib/PublicInbox/CodeSearch.pm @@ -7,7 +7,9 @@ package PublicInbox::CodeSearch; use v5.12; use parent qw(PublicInbox::Search); +use PublicInbox::Config; use PublicInbox::Search qw(retry_reopen int_val xap_terms); +use Compress::Zlib qw(uncompress); use constant { AT => 0, # author time YYYYMMDDHHMMSS, dt: for mail) CT => 1, # commit time (Unix time stamp, like TS/rt: in mail) @@ -47,8 +49,21 @@ my %prob_prefix = ( # copied from PublicInbox::Search ); sub new { - my ($cls, $dir) = @_; - bless { xpfx => "$dir/cidx".CIDX_SCHEMA_VER }, $cls; + my ($cls, $dir, $cfg) = @_; + # can't have a PublicInbox::Config here due to circular refs + bless { xpfx => "$dir/cidx".CIDX_SCHEMA_VER, + -cfg_f => $cfg->{-f} }, $cls; +} + +sub join_data_key ($) { "join:$_[0]->{-cfg_f}" } + +sub join_data { + my ($self) = @_; + my $key = join_data_key($self); + my $cur = $self->xdb->get_metadata($key) or return; + $cur = eval { PublicInbox::Config::json()->decode(uncompress($cur)) }; + warn "E: $@ (corrupt metadata in `$key' key?)" if $@; + $cur; } sub qparse_new ($) { @@ -151,4 +166,47 @@ sub mset { $self->do_enquire($qry, $opt, CT); } +sub roots2paths { # for diagnostics + my ($self) = @_; + my $cur = $self->xdb->allterms_begin('G'); + my $end = $self->{xdb}->allterms_end('G'); + my $qrepo = $PublicInbox::Search::X{Query}->new('T'.'r'); + my $enq = $PublicInbox::Search::X{Enquire}->new($self->{xdb}); + $enq->set_weighting_scheme($PublicInbox::Search::X{BoolWeight}->new); + $enq->set_docid_order($PublicInbox::Search::ENQ_ASCENDING); + my %ret; + for (; $cur != $end; $cur++) { + my $G_oidhex = $cur->get_termname; + my $qry = $PublicInbox::Search::X{Query}->new( + PublicInbox::Search::OP_FILTER(), + $qrepo, $G_oidhex); + $enq->set_query($qry); + my ($size, $off, $lim) = (0, 0, 100000); + my $dirs = $ret{substr($G_oidhex, 1)} = []; + do { + my $mset = $enq->get_mset($off += $size, $lim); + for my $x ($mset->items) { + my $tmp = xap_terms('P', $x->get_document); + push @$dirs, keys %$tmp; + } + $size = $mset->size; + } while ($size); + substr($_, 0, 1, '/') for @$dirs; # s!^P!/! + @$dirs = sort @$dirs; + } + \%ret; +} + +sub paths2roots { # for diagnostics + my ($self) = @_; + my %ret; + my $tmp = roots2paths($self); + for my $root_oidhex (keys %$tmp) { + my $paths = delete $tmp->{$root_oidhex}; + push @{$ret{$_}}, $root_oidhex for @$paths; + } + @$_ = sort(@$_) for values %ret; + \%ret; +} + 1; diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm index bbccc0e3..3a551c84 100644 --- a/lib/PublicInbox/CodeSearchIdx.pm +++ b/lib/PublicInbox/CodeSearchIdx.pm @@ -4,6 +4,9 @@ # indexer for git coderepos, just commits and repo paths for now # this stores normalized absolute paths of indexed GIT_DIR inside # the DB itself and is designed to handle forks by designating roots +# At minimum, it needs to have the pathnames of all git repos in +# memory at runtime. --join also requires all inbox pathnames to +# be in memory (as it happens when loaded from ~/.public-inbox/config). # # Unlike mail search, docid isn't tied to NNTP artnum or IMAP UID, # there's no serial number dependency at all. The first 32-bits of @@ -11,28 +14,38 @@ # # We shard repos using the first 32-bits of sha256($ABS_GIT_DIR) # -# --associate joins root commits of coderepos to inboxes based on prefixes. +# --join associates root commits of coderepos to inboxes based on prefixes. # -# Internally, each inbox is assigned a non-negative integer index ($IBX_ID), +# Internally, each inbox is assigned a non-negative integer index ($IBX_OFF), # and each root commit object ID (SHA-1/SHA-256 hex) is also assigned # a non-negative integer index ($ROOT_COMMIT_OID_ID). # -# associate dumps to 2 intermediate files in $TMPDIR: +# join dumps to 2 intermediate files in $TMPDIR: # -# * to_root_id - each line is of the format: +# * to_root_off - each line is of the format: # -# $PFX $ROOT_COMMIT_OID_IDS +# $PFX @ROOT_COMMIT_OID_OFFS # -# * to_ibx_id - each line is of the format: +# * to_ibx_off - each line is of the format: # -# $PFX $IBX_IDS +# $PFX @IBX_OFFS # -# $IBX_IDS is a comma-delimited list of integers ($IBX_ID) -# $ROOT_COMMIT_OID_IDS is space-delimited +# $IBX_OFFS is a comma-delimited list of integers ($IBX_ID) +# The $IBX_OFF here is ephemeral (per-join_data) and NOT related to +# the `ibx_off' column of `over.sqlite3' for extindex. +# @ROOT_COMMIT_OID_OFFS is space-delimited # In both cases, $PFX is typically the value of the patchid (XDFID) but it # can be configured to use any combination of patchid, dfpre, dfpost or # dfblob. # +# WARNING: this is vulnerable to arbitrary memory usage attacks if we +# attempt to index or join against malicious coderepos with +# thousands/millions of root commits. Most coderepos have only one +# root commit, some have several: git.git currently has 7, +# torvalds/linux.git has 4. +# --max-size= is required to keep memory usage reasonable for gigantic +# commits. +# # See PublicInbox::CodeSearch (read-only API) for more package PublicInbox::CodeSearchIdx; use v5.12; @@ -41,7 +54,7 @@ use parent qw(PublicInbox::CodeSearch PublicInbox::IPC PublicInbox::SearchIdx); use PublicInbox::DS qw(awaitpid); use PublicInbox::PktOp; use PublicInbox::IPC qw(nproc_shards); -use POSIX qw(WNOHANG SEEK_SET); +use POSIX qw(WNOHANG SEEK_SET strftime); use File::Path (); use File::Spec (); use List::Util qw(max); @@ -56,7 +69,9 @@ use PublicInbox::CidxComm; use PublicInbox::Git qw(%OFMT2HEXLEN); use PublicInbox::Compat qw(uniqstr); use PublicInbox::Aspawn qw(run_await); +use Compress::Zlib qw(compress); use Carp (); +use Time::Local qw(timegm); use autodie qw(close pipe open sysread seek sysseek send); our $DO_QUIT = 15; # signal number our ( @@ -82,13 +97,15 @@ our ( $TMPDIR, # File::Temp->newdir object for prune @PRUNE_QUEUE, # GIT_DIRs to prepare for pruning %TODO, @IBXQ, @IBX, - @JOIN, # join(1) command for associate + @JOIN, # join(1) command for --join $CMD_ENV, # env for awk(1), comm(1), sort(1) commands during prune @AWK, @COMM, @SORT, # awk(1), comm(1), sort(1) commands - @ASSOC_PFX, # any combination of XDFID, XDFPRE, XDFPOST + %JOIN, # CLI --join= suboptions + @JOIN_PFX, # any combination of XDFID, XDFPRE, XDFPOST + @JOIN_DT, # YYYYmmddHHMMSS for dt: $QRY_STR, # common query string for both code and inbox associations $DUMP_IBX_WPIPE, # goes to sort(1) - @ID2ROOT, + @OFF2ROOT, ); # stop walking history if we see >$SEEN_MAX existing commits, this assumes @@ -97,7 +114,7 @@ our ( our $SEEN_MAX = 100000; # window for commits/emails to determine a inbox <-> coderepo association -my $ASSOC_WINDOW = 50000; +my $JOIN_WINDOW = 50000; our @PRUNE_BATCH = qw(git _ cat-file --batch-all-objects --batch-check); @@ -320,18 +337,18 @@ sub shard_done { # called via PktOp on shard_index completion } sub repo_stored { - my ($self, $repo_ctx, $did) = @_; + my ($self, $repo_ctx, $drs, $did) = @_; $did > 0 or die "BUG: $repo_ctx->{repo}->{git_dir}: docid=$did"; - my $next = PublicInbox::OnDestroy->new($$, \&next_repos, $repo_ctx); my ($c, $p) = PublicInbox::PktOp->pair; - $c->{ops}->{shard_done} = [ $self, $repo_ctx, $next ]; + $c->{ops}->{shard_done} = [ $self, $repo_ctx, + PublicInbox::OnDestroy->new($$, \&next_repos, $repo_ctx, $drs)]; # shard_done fires when all shards are committed my @active = keys %{$repo_ctx->{active}}; $IDX_SHARDS[$_]->wq_io_do('shard_commit', [ $p->{op_p} ]) for @active; } sub prune_done { # called via prune_do completion - my ($self, $n) = @_; + my ($self, $drs, $n) = @_; return if $DO_QUIT || !$PRUNE_DONE; die "BUG: \$PRUNE_DONE->[$n] already defined" if $PRUNE_DONE->[$n]; $PRUNE_DONE->[$n] = 1; @@ -499,12 +516,6 @@ sub shard_commit { # via wq_io_do send($op_p, "shard_done $self->{shard}", 0); } -sub assoc_window_args ($) { - my ($self) = @_; - my $n = $self->{-opt}->{'associate-window'} // $ASSOC_WINDOW; - $n <= 0 ? () : ('-m', $n); -} - sub start_xhc () { my ($xhc, $pid) = PublicInbox::XapClient::start_helper("-j$NPROC"); awaitpid($pid, \&cmd_done, ['xap_helper', "-j$NPROC"]); @@ -512,68 +523,69 @@ sub start_xhc () { } sub dump_roots_start { - my ($self, $associate) = @_; + my ($self, $do_join) = @_; $XHC //= start_xhc; - $associate // die 'BUG: no $associate'; - $TODO{associating} = 1; # keep shards_active() happy + $do_join // die 'BUG: no $do_join'; progress($self, 'dumping IDs from coderepos'); local $self->{xdb}; - @ID2ROOT = $self->all_terms('G'); + @OFF2ROOT = $self->all_terms('G'); my $root2id = "$TMPDIR/root2id"; open my $fh, '>', $root2id; my $nr = -1; - for (@ID2ROOT) { print $fh $_, "\0", ++$nr, "\0" } # mmap-friendly + for (@OFF2ROOT) { print $fh $_, "\0", ++$nr, "\0" } # mmap-friendly close $fh; - # dump_roots | sort -k1,1 | OFS=' ' uniq_fold >to_root_id + # dump_roots | sort -k1,1 | OFS=' ' uniq_fold >to_root_off my ($sort_opt, $fold_opt); pipe(local $sort_opt->{0}, my $sort_w); pipe(local $fold_opt->{0}, local $sort_opt->{1}); my @sort = (@SORT, '-k1,1'); - my $dst = "$TMPDIR/to_root_id"; + my $dst = "$TMPDIR/to_root_off"; open $fold_opt->{1}, '>', $dst; my $fold_env = { %$CMD_ENV, OFS => ' ' }; - run_await(\@sort, $CMD_ENV, $sort_opt, \&cmd_done, $associate); - run_await(\@UNIQ_FOLD, $fold_env, $fold_opt, \&cmd_done, $associate); - my @arg = ((map { ('-A', $_) } @ASSOC_PFX), '-c', - assoc_window_args($self), $root2id, $QRY_STR); + run_await(\@sort, $CMD_ENV, $sort_opt, \&cmd_done, $do_join); + run_await(\@UNIQ_FOLD, $fold_env, $fold_opt, \&cmd_done, $do_join); + my $window = $JOIN{window} // $JOIN_WINDOW; + my @m = $window <= 0 ? () : ('-m', $window); + my @arg = ((map { ('-A', $_) } @JOIN_PFX), '-c', + @m, $root2id, $QRY_STR); for my $d ($self->shard_dirs) { pipe(my $err_r, my $err_w); $XHC->mkreq([$sort_w, $err_w], qw(dump_roots -d), $d, @arg); my $desc = "dump_roots $d"; - $self->{PENDING}->{$desc} = $associate; + $self->{PENDING}->{$desc} = $do_join; PublicInbox::CidxXapHelperAux->new($err_r, $self, $desc); } progress($self, 'waiting on dump_roots sort'); } sub dump_ibx { # sends to xap_helper.h - my ($self, $ibx_id) = @_; - my $ibx = $IBX[$ibx_id] // die "BUG: no IBX[$ibx_id]"; + my ($self, $ibx_off) = @_; + my $ibx = $IBX[$ibx_off] // die "BUG: no IBX[$ibx_off]"; my $ekey = $ibx->eidx_key; my $srch = $ibx->isrch or return warn <xh_args, - (map { ('-A', $_) } @ASSOC_PFX), $ibx_id, $QRY_STR); + (map { ('-A', $_) } @JOIN_PFX), $ibx_off, $QRY_STR); pipe(my $r, my $w); $XHC->mkreq([$DUMP_IBX_WPIPE, $w], @cmd); - $self->{PENDING}->{$ekey} = $TODO{associate}; + $self->{PENDING}->{$ekey} = $TODO{do_join}; PublicInbox::CidxXapHelperAux->new($r, $self, $ekey); } sub dump_ibx_start { - my ($self, $associate) = @_; + my ($self, $do_join) = @_; $XHC //= start_xhc; my ($sort_opt, $fold_opt); pipe(local $sort_opt->{0}, $DUMP_IBX_WPIPE); pipe(local $fold_opt->{0}, local $sort_opt->{1}); - my @sort = (@SORT, '-k1,1'); # sort only on ASSOC_PFX - # pipeline: dump_ibx | sort -k1,1 | uniq_fold >to_ibx_id - open $fold_opt->{1}, '>', "$TMPDIR/to_ibx_id"; - run_await(\@sort, $CMD_ENV, $sort_opt, \&cmd_done, $associate); - run_await(\@UNIQ_FOLD, $CMD_ENV, $fold_opt, \&cmd_done, $associate); + my @sort = (@SORT, '-k1,1'); # sort only on JOIN_PFX + # pipeline: dump_ibx | sort -k1,1 | uniq_fold >to_ibx_off + open $fold_opt->{1}, '>', "$TMPDIR/to_ibx_off"; + run_await(\@sort, $CMD_ENV, $sort_opt, \&cmd_done, $do_join); + run_await(\@UNIQ_FOLD, $CMD_ENV, $fold_opt, \&cmd_done, $do_join); } sub index_next ($) { @@ -592,13 +604,13 @@ sub index_next ($) { delete $TODO{dump_ibx_start}; # runs OnDestroy once return dump_ibx($self, shift @IBXQ) if @IBXQ; undef $DUMP_IBX_WPIPE; # done dumping inboxes - delete $TODO{associate}; + delete $TODO{do_join}; } # else: wait for shards_active (post_loop_do) callback } sub next_repos { # OnDestroy cb - my ($repo_ctx) = @_; + my ($repo_ctx, $drs) = @_; my ($self, $repo, $active) = @$repo_ctx{qw(self repo active)}; progress($self, "$repo->{git_dir}: done"); return if $DO_QUIT || !$REPO_CTX; @@ -620,8 +632,7 @@ sub index_done { # OnDestroy cb called when done indexing each code repo $n = $repo->{shard_n}; $active->{$n} = undef; my ($c, $p) = PublicInbox::PktOp->pair; - $c->{ops}->{repo_stored} = [ $self, $repo_ctx ]; - $c->{-cidx_dump_roots_start} = $drs if $drs; + $c->{ops}->{repo_stored} = [ $self, $repo_ctx, $drs ]; $IDX_SHARDS[$n]->wq_io_do('store_repo', [ $p->{op_p} ], $repo); # repo_stored will fire once store_repo is done } @@ -875,46 +886,99 @@ sub prep_alternate_start { sub cmd_done { # run_await cb for sort, xapian-delve, sed failures my ($pid, $cmd, undef, undef, $run_on_destroy) = @_; $? and die "fatal: @$cmd (\$?=$?)\n"; - # $run_on_destroy calls associate() or run_prune() + # $run_on_destroy calls do_join() or run_prune() +} + +sub current_join_data ($) { + my ($self) = @_; + local $self->{xdb} = $RDONLY_XDB[0] // die 'BUG: shard[0] undef'; + # we support multiple PI_CONFIG files for a cindex: + $self->join_data; +} + +# combined previously stored stats with new +sub score_old_join_data ($$$) { + my ($self, $score, $ekeys_new) = @_; + my $old = ($JOIN{reset} ? undef : current_join_data($self)) or return; + my @old = @$old{qw(ekeys roots ibx2root)}; + @old == 3 or return warn "W: ekeys/roots missing from old JOIN data\n"; + progress($self, 'merging old join data...'); + my ($ekeys_old, $roots_old, $ibx2root_old) = @old; + # score: "ibx_off root_off" => nr + my $i = -1; + my %root2id_new = map { $_ => ++$i } @OFF2ROOT; + $i = -1; + my %ekey2id_new = map { $_ => ++$i } @$ekeys_new; + for my $ibx_off_old (0..$#$ibx2root_old) { + my $root_offs_old = $ibx2root_old->[$ibx_off_old]; + my $ekey = $ekeys_old->[$ibx_off_old] // + warn "W: no ibx #$ibx_off_old in old JOIN data\n"; + my $ibx_off_new = $ekey2id_new{$ekey // next} // + warn "W: `$ekey' no longer exists\n"; + for (@$root_offs_old) { + my ($nr, $rid_old) = @$_; + my $root_old = $roots_old->[$rid_old] // + warn "W: no root #$rid_old in old JOIN data\n"; + my $rid_new = $root2id_new{$root_old // next} // + warn "W: root `$root_old' no longer exists\n"; + $score->{"$ibx_off_new $rid_new"} += $nr; + } + } +} + +sub metadata_set { # via wq_do + my ($self, $key, $val, $commit) = @_; + $self->begin_txn_lazy; + $self->{xdb}->set_metadata($key, $val); + $self->commit_txn_lazy if $commit || defined(wantarray); } # runs once all inboxes and shards are dumped via OnDestroy -sub associate { +sub do_join { my ($self) = @_; return if $DO_QUIT; $XHC = 0; # should not be recreated again @IDX_SHARDS or return warn("# aborting on no shards\n"); unlink("$TMPDIR/root2id"); my @pending = keys %{$self->{PENDING}}; - die "E: pending=@pending jobs not done\n" if @pending; - progress($self, 'associating...'); - my @join = (@JOIN, 'to_ibx_id', 'to_root_id'); + die "BUG: pending=@pending jobs not done\n" if @pending; + progress($self, 'joining...'); + my @join = (@JOIN, 'to_ibx_off', 'to_root_off'); + if (my $time = which('time')) { unshift @join, $time }; my $rd = popen_rd(\@join, $CMD_ENV, { -C => "$TMPDIR" }); my %score; - while (<$rd>) { # PFX ibx_ids root_id - my (undef, $ibx_ids, @root_ids) = split(/ /, $_); - for my $ibx_id (split(/,/, $ibx_ids)) { - ++$score{"$ibx_id $_"} for @root_ids; + while (<$rd>) { # PFX ibx_offs root_off + chop eq "\n" or die "no newline from @join: <$_>"; + my (undef, $ibx_offs, @root_offs) = split / /, $_; + for my $ibx_off (split(/,/, $ibx_offs)) { + ++$score{"$ibx_off $_"} for @root_offs; } } $rd->close or die "fatal: @join failed: \$?=$?"; - my $min = $self->{-opt}->{'assoc-min'} // 10; - progress($self, scalar(keys %score).' potential pairings...'); - for my $k (keys %score) { - my $nr = $score{$k}; - my ($ibx_id, $root) = split(/ /, $k); - my $ekey = $IBX[$ibx_id]->eidx_key; - $root = $ID2ROOT[$root]; + my $nr = scalar(keys %score) or do { + delete $TODO{joining}; + return progress($self, 'no potential new pairings'); + }; + progress($self, "$nr potential new pairings..."); + my @ekeys = map { $_->eidx_key } @IBX; + score_old_join_data($self, \%score, \@ekeys); + my $new; + while (my ($k, $nr) = each %score) { + my ($ibx_off, $root_off) = split(/ /, $k); + my ($ekey, $root) = ($ekeys[$ibx_off], $OFF2ROOT[$root_off]); progress($self, "$ekey => $root has $nr matches"); + push @{$new->{ibx2root}->[$ibx_off]}, [ $nr, $root_off ]; + } + for my $ary (values %$new) { # sort by nr + for (@$ary) { @$_ = sort { $b->[0] <=> $a->[0] } @$_ } } - delete $TODO{associating}; # break out of shards_active() - # TODO - warn "# Waiting for $TMPDIR/cont @JOIN"; - system "ls -Rl $TMPDIR >&2"; - system "wc -l $TMPDIR/to_*_id >&2"; - #sleep(1) until -f "$TMPDIR/cont"; - # warn "# Waiting for $TMPDIR/cont"; - # sleep(1) until -f "$TMPDIR/cont"; + $new->{ekeys} = \@ekeys; + $new->{roots} = \@OFF2ROOT; + $new->{dt} = \@JOIN_DT; + $new = compress(PublicInbox::Config::json()->encode($new)); + my $key = $self->join_data_key; + my $wait = $IDX_SHARDS[0]->wq_do('metadata_set', $key, $new); + delete $TODO{joining}; } sub require_progs { @@ -927,21 +991,48 @@ sub require_progs { } } -sub init_associate_postfork ($) { +sub init_join_postfork ($) { my ($self) = @_; - return unless $self->{-opt}->{associate}; - require_progs('associate', join => \@JOIN); - $QRY_STR = $self->{-opt}->{'associate-date-range'} // '1.year.ago..'; - substr($QRY_STR, 0, 0) = 'dt:'; - @{$self->{git_dirs} // []} or die "E: no coderepos to associate\n"; - @IBX or die "E: no inboxes to associate\n"; + return unless $self->{-opt}->{join}; + require_progs('join', join => \@JOIN); + my $d2 = '([0-9]{2})'; + my $dt_re = qr!([0-9]{4})$d2$d2$d2$d2$d2!; + if (my $cur = $JOIN{reset} ? current_join_data($self) : undef) { + if (($cur->{dt}->[1] // '') =~ m!\A$dt_re\z!o) { + my ($Y, $m, $d, $H, $M, $S) = ($1, $2, $3, $4, $5, $6); + my $t = timegm($S, $M, $H, $d, $m - 1, $Y); + $t = strftime('%Y%m%d%H%M%S', gmtime($t + 1)); + $JOIN{dt} //= "$t.."; + } else { + warn <= 0 or die "E: dt:$QRY_STR is not a range\n"; + # Account for send->apply delay (torvalds/linux.git mean is ~20 days + # from Author to CommitDate in cases where CommitDate > AuthorDate + $QRY_STR .= '1.month.ago' if $QRY_STR =~ /\.\.\z/; + @{$self->{git_dirs} // []} or die "E: no coderepos to join\n"; + @IBX or die "E: no inboxes to join\n"; my $approx_git = PublicInbox::Git->new($self->{git_dirs}->[0]); # ugh + substr($QRY_STR, 0, 0) = 'dt:'; $self->query_approxidate($approx_git, $QRY_STR); # in-place - $TODO{associate} = PublicInbox::OnDestroy->new($$, \&associate, $self); + ($JOIN_DT[1]) = ($QRY_STR =~ /\.\.([0-9]{14})\z/); # YYYYmmddHHMMSS + ($JOIN_DT[0]) = ($QRY_STR =~ /\Adt:([0-9]{14})/); # YYYYmmddHHMMSS + $JOIN_DT[0] //= '19700101'.'000000'; # git uses unsigned times + $TODO{do_join} = PublicInbox::OnDestroy->new($$, \&do_join, $self); + $TODO{joining} = 1; # keep shards_active() happy $TODO{dump_ibx_start} = PublicInbox::OnDestroy->new($$, - \&dump_ibx_start, $self, $TODO{associate}); + \&dump_ibx_start, $self, $TODO{do_join}); $TODO{dump_roots_start} = PublicInbox::OnDestroy->new($$, - \&dump_roots_start, $self, $TODO{associate}); + \&dump_roots_start, $self, $TODO{do_join}); + progress($self, "will join in $QRY_STR date range..."); my $id = -1; @IBXQ = map { ++$id } @IBX; } @@ -1035,31 +1126,39 @@ sub cidx_read_comm { # via PublicInbox::CidxComm::event_step $IDX_SHARDS[$n]->wq_do('prune_one', 'P'.$git_dir); } my ($c, $p) = PublicInbox::PktOp->pair; - $c->{ops}->{prune_done} = [ $self ]; - $c->{-cidx_dump_roots_start} = $drs; + $c->{ops}->{prune_done} = [ $self, $drs ]; $_->wq_io_do('prune_commit', [ $p->{op_p} ]) for @IDX_SHARDS; } -sub init_associate_prefork ($) { +sub init_join_prefork ($) { my ($self) = @_; - return unless $self->{-opt}->{associate}; + my $subopt = $self->{-opt}->{join} // return; + %JOIN = map { + my ($k, $v) = split /:/, $_, 2; + $k => $v // 1; + } split(/,/, join(',', @$subopt)); require PublicInbox::CidxXapHelperAux; require PublicInbox::XapClient; - $self->{-pi_cfg} = PublicInbox::Config->new; + my $cfg = $self->{-opt}->{-pi_cfg} // die 'BUG: -pi_cfg unset'; + $self->{-cfg_f} = $cfg->{-f} = rel2abs_collapsed($cfg->{-f}); my @unknown; - my @pfx = @{$self->{-opt}->{'associate-prefixes'} // [ 'patchid' ]}; - @pfx = map { split(/\s*,\s*/) } @pfx; - for (@pfx) { + my $pfx = $JOIN{prefixes} // 'patchid'; + for (split /\+/, $pfx) { my $v = $PublicInbox::Search::PATCH_BOOL_COMMON{$_} // push(@unknown, $_); - push(@ASSOC_PFX, split(/ /, $v)); + push(@JOIN_PFX, split(/ /, $v)); } - die < undef; + if (-f "$_/inbox.lock" || -d "$_/public-inbox") { + rel2abs_collapsed($_) => undef; + } else { + warn "W: `$_' is not a public inbox, skipping\n"; + (); + } } (@{$self->{-opt}->{include} // []}); my $all = $self->{-opt}->{all}; if (my $only = $self->{-opt}->{only}) { @@ -1074,10 +1173,10 @@ E: --all is incompatible with --only # --all implied since no inboxes were specified with --only or --include EOM } - $self->{-pi_cfg}->each_inbox(\&_prep_ibx, $self, \%incl, $all); - my $nr = scalar(@IBX) or die "E: no inboxes to associate\n"; - progress($self, "will associate $nr inboxes in ", - $self->{-pi_cfg}->{-f}, " using: @pfx"); + $self->{-opt}->{-pi_cfg}->each_inbox(\&_prep_ibx, $self, \%incl, $all); + my $nr = scalar(@IBX) or die "E: no inboxes to join with\n"; + progress($self, "will join with $nr inboxes in ", + $self->{-opt}->{-pi_cfg}->{-f}, " using: $pfx"); } sub _prep_ibx { # each_inbox callback @@ -1085,43 +1184,32 @@ sub _prep_ibx { # each_inbox callback ($all || exists($incl->{$ibx->{inboxdir}})) and push @IBX, $ibx; } -sub show_roots { # for diagnostics +sub show_json { # for diagnostics (unstable output) my ($self) = @_; + my $s = $self->{-opt}->{show} or return; # for diagnostics local $self->{xdb}; - my $cur = $self->xdb->allterms_begin('G'); - my $end = $self->{xdb}->allterms_end('G'); - my $qrepo = $PublicInbox::Search::X{Query}->new('T'.'r'); - my $enq = $PublicInbox::Search::X{Enquire}->new($self->{xdb}); - $enq->set_weighting_scheme($PublicInbox::Search::X{BoolWeight}->new); - $enq->set_docid_order($PublicInbox::Search::ENQ_ASCENDING); - for (; $cur != $end; $cur++) { - my $G_oidhex = $cur->get_termname; - my $qry = $PublicInbox::Search::X{Query}->new( - PublicInbox::Search::OP_FILTER(), - $qrepo, $G_oidhex); - $enq->set_query($qry); - my ($off, $lim) = (0, 10000); - say 'commit ',substr($G_oidhex, 1), ' appears in:'; - while (1) { - my $mset = $enq->get_mset($off, $lim); - my $size = $mset->size or last; - for my $x ($mset->items) { - my $doc = $x->get_document; - for (xap_terms('P', $x->get_document)) { - say '- /', substr($_, 1); - } - } - $off += $size; + my %ret; + my @todo = @$s; + while (defined(my $f = shift @todo)) { + if ($f =~ /\A(?:roots2paths|paths2roots|join_data)\z/) { + $ret{$f} = $self->$f; + } elsif ($f eq '') { # default --show (no args) + push @todo, qw(roots2paths join_data); + } else { + warn "E: cannot show `$f'\n"; } } + my $json = ref(PublicInbox::Config::json())->new; + $json->utf8->canonical->pretty; # n.b. FS pathnames may not be UTF-8... + say $json->encode(\%ret); } sub do_inits { # called via PublicInbox::DS::add_timer my ($self) = @_; - init_associate_postfork($self); + init_join_postfork($self); init_prune($self); scan_git_dirs($self) if $self->{-opt}->{scan} // 1; - my $max = $TODO{associate} ? max($LIVE_JOBS, $NPROC) : $LIVE_JOBS; + my $max = $TODO{do_join} ? max($LIVE_JOBS, $NPROC) : $LIVE_JOBS; index_next($self) for (1..$max); } @@ -1137,29 +1225,23 @@ sub cidx_run { # main entry point local $GIT_TODO = []; local ($DO_QUIT, $REINDEX, $TXN_BYTES, @GIT_DIR_GONE, @PRUNE_QUEUE, $REPO_CTX, %ALT_FH, $TMPDIR, @AWK, @COMM, $CMD_ENV, - %TODO, @IBXQ, @IBX, @JOIN, @ASSOC_PFX, $DUMP_IBX_WPIPE, - @ID2ROOT, $XHC, @SORT, $GITS_NR); + %TODO, @IBXQ, @IBX, @JOIN, %JOIN, @JOIN_PFX, + @JOIN_DT, $DUMP_IBX_WPIPE, @OFF2ROOT, $XHC, @SORT, $GITS_NR); local $BATCH_BYTES = $self->{-opt}->{batch_size} // $PublicInbox::SearchIdx::BATCH_BYTES; local $MAX_SIZE = $self->{-opt}->{max_size}; - local $self->{ASSOC_PFX} = \@ASSOC_PFX; - local $self->{PENDING} = {}; - local $self->{-pi_cfg}; - if ($self->{-opt}->{'associate-aggressive'}) { # shortcut - $self->{-opt}->{'associate-date-range'} //= '19700101000000..'; - $self->{-opt}->{'associate-window'} //= -1; - $self->{-opt}->{associate} //= 1; - } - if (grep { $_ } @{$self->{-opt}}{qw(prune associate)}) { + local $self->{PENDING} = {}; # used by PublicInbox::CidxXapHelperAux + local $self->{-cfg_f}; + if (grep { $_ } @{$self->{-opt}}{qw(prune join)}) { require File::Temp; $TMPDIR = File::Temp->newdir('cidx-all-git-XXXX', TMPDIR => 1); $CMD_ENV = { TMPDIR => "$TMPDIR", LC_ALL => 'C', LANG => 'C' }; - require_progs('(prune|associate)', sort => \@SORT); + require_progs('(prune|join)', sort => \@SORT); for (qw(parallel compress-program buffer-size)) { # GNU sort my $v = $self->{-opt}->{"sort-$_"}; push @SORT, "--$_=$v" if defined $v; } - init_associate_prefork($self) + init_join_prefork($self) } local @IDX_SHARDS = cidx_init($self); # forks workers local $self->{current_info} = ''; @@ -1218,7 +1300,7 @@ sub cidx_run { # main entry point PublicInbox::DS::event_loop($MY_SIG, $SIGSET); PublicInbox::DS->Reset; $self->lock_release(!!$NCHANGE); - show_roots($self) if $self->{-opt}->{'show-roots'} # for diagnostics + show_json($self); } sub ipc_atfork_child { # @IDX_SHARDS diff --git a/lib/PublicInbox/TestCommon.pm b/lib/PublicInbox/TestCommon.pm index 0d17e6e7..361a2356 100644 --- a/lib/PublicInbox/TestCommon.pm +++ b/lib/PublicInbox/TestCommon.pm @@ -14,7 +14,7 @@ use Scalar::Util qw(isvstring); use Carp (); our @EXPORT; my $lei_loud = $ENV{TEST_LEI_ERR_LOUD}; -my $tail_cmd = $ENV{TAIL}; +our $tail_cmd = $ENV{TAIL}; our ($lei_opt, $lei_out, $lei_err); use autodie qw(chdir close fcntl mkdir open opendir seek unlink); @@ -364,12 +364,15 @@ sub run_script ($;$$) { my $fhref = []; my $spawn_opt = {}; my @tail_paths; + local $tail_cmd = $tail_cmd; for my $fd (0..2) { my $redir = $opt->{$fd}; my $ref = ref($redir); if ($ref eq 'SCALAR') { my $fh; - if ($tail_cmd && $ENV{TAIL_ALL} && $fd > 0) { + if ($ENV{TAIL_ALL} && $fd > 0) { + # tail -F is better, but not portable :< + $tail_cmd //= 'tail -f'; require File::Temp; $fh = File::Temp->new("fd.$fd-XXXX", TMPDIR=>1); push @tail_paths, $fh->filename; @@ -820,7 +823,7 @@ sub create_coderepo ($$;@) { $tmpdir; } -sub create_inbox ($$;@) { +sub create_inbox ($;@) { my $ident = shift; my $cb = pop; my %opt = @_; diff --git a/lib/PublicInbox/XapHelper.pm b/lib/PublicInbox/XapHelper.pm index 428b732e..fe831b8f 100644 --- a/lib/PublicInbox/XapHelper.pm +++ b/lib/PublicInbox/XapHelper.pm @@ -93,10 +93,10 @@ sub cmd_dump_ibx { } sub dump_roots_iter ($$$) { - my ($req, $root2id, $it) = @_; + my ($req, $root2off, $it) = @_; eval { my $doc = $it->get_document; - my $G = join(' ', map { $root2id->{$_} } xap_terms('G', $doc)); + my $G = join(' ', map { $root2off->{$_} } xap_terms('G', $doc)); for my $p (@{$req->{A}}) { for (xap_terms($p, $doc)) { $req->{wbuf} .= "$_ $G\n"; @@ -118,14 +118,14 @@ sub dump_roots_flush ($$) { } sub cmd_dump_roots { - my ($req, $root2id_file, $qry_str) = @_; + my ($req, $root2off_file, $qry_str) = @_; $qry_str // die 'usage: dump_roots [OPTIONS] ROOT2ID_FILE QRY_STR'; $req->{A} or die 'dump_roots requires -A PREFIX'; - open my $fh, '<', $root2id_file; - my $root2id; # record format: $OIDHEX "\0" uint32_t + open my $fh, '<', $root2off_file; + my $root2off; # record format: $OIDHEX "\0" uint32_t my @x = split(/\0/, read_all $fh); while (defined(my $oidhex = shift @x)) { - $root2id->{$oidhex} = shift @x; + $root2off->{$oidhex} = shift @x; } my $opt = { relevance => -1, limit => $req->{'m'}, offset => $req->{o} // 0 }; @@ -134,7 +134,7 @@ sub cmd_dump_roots { $req->{wbuf} = ''; for my $it ($mset->items) { for (my $t = 10; $t > 0; --$t) { - $t = dump_roots_iter($req, $root2id, $it) // $t; + $t = dump_roots_iter($req, $root2off, $it) // $t; } if (!($req->{nr_out} & 0x3fff)) { dump_roots_flush($req, $fh); diff --git a/lib/PublicInbox/xap_helper.h b/lib/PublicInbox/xap_helper.h index 0a652abd..b6b517d5 100644 --- a/lib/PublicInbox/xap_helper.h +++ b/lib/PublicInbox/xap_helper.h @@ -331,7 +331,7 @@ struct dump_roots_tmp { void *mm_ptr; char **entries; struct fbuf wbuf; - int root2id_fd; + int root2off_fd; }; // n.b. __cleanup__ works fine with C++ exceptions, but not longjmp @@ -364,8 +364,8 @@ static void xclose(int fd) static void dump_roots_ensure(void *ptr) { struct dump_roots_tmp *drt = (struct dump_roots_tmp *)ptr; - if (drt->root2id_fd >= 0) - xclose(drt->root2id_fd); + if (drt->root2off_fd >= 0) + xclose(drt->root2off_fd); hdestroy(); // idempotent if (drt->mm_ptr && munmap(drt->mm_ptr, drt->sb.st_size)) EABORT("BUG: munmap(%p, %zu)", drt->mm_ptr, drt->sb.st_size); @@ -373,12 +373,12 @@ static void dump_roots_ensure(void *ptr) fbuf_ensure(&drt->wbuf); } -static bool root2ids_str(struct fbuf *root_ids, Xapian::Document *doc) +static bool root2offs_str(struct fbuf *root_offs, Xapian::Document *doc) { Xapian::TermIterator cur = doc->termlist_begin(); Xapian::TermIterator end = doc->termlist_end(); ENTRY e, *ep; - fbuf_init(root_ids); + fbuf_init(root_offs); for (cur.skip_to("G"); cur != end; cur++) { std::string tn = *cur; if (!starts_with(&tn, "G", 1)) @@ -389,21 +389,21 @@ static bool root2ids_str(struct fbuf *root_ids, Xapian::Document *doc) ep = hsearch(e, FIND); if (!ep) ABORT("hsearch miss `%s'", e.key); // ep->data is a NUL-terminated string matching /[0-9]+/ - fputc(' ', root_ids->fp); - fputs((const char *)ep->data, root_ids->fp); + fputc(' ', root_offs->fp); + fputs((const char *)ep->data, root_offs->fp); } - fputc('\n', root_ids->fp); - if (ferror(root_ids->fp) | fclose(root_ids->fp)) - err(EXIT_FAILURE, "ferror|fclose(root_ids)"); // ENOMEM - root_ids->fp = NULL; + fputc('\n', root_offs->fp); + if (ferror(root_offs->fp) | fclose(root_offs->fp)) + err(EXIT_FAILURE, "ferror|fclose(root_offs)"); // ENOMEM + root_offs->fp = NULL; return true; } // writes term values matching @pfx for a given @doc, ending the line -// with the contents of @root_ids +// with the contents of @root_offs static void dump_roots_term(struct req *req, const char *pfx, struct dump_roots_tmp *drt, - struct fbuf *root_ids, + struct fbuf *root_offs, Xapian::Document *doc) { Xapian::TermIterator cur = doc->termlist_begin(); @@ -415,7 +415,7 @@ static void dump_roots_term(struct req *req, const char *pfx, if (!starts_with(&tn, pfx, pfx_len)) continue; fputs(tn.c_str() + pfx_len, drt->wbuf.fp); - fwrite(root_ids->ptr, root_ids->len, 1, drt->wbuf.fp); + fwrite(root_offs->ptr, root_offs->len, 1, drt->wbuf.fp); ++req->nr_out; } } @@ -434,7 +434,7 @@ static bool dump_roots_flush(struct req *req, struct dump_roots_tmp *drt) err(EXIT_FAILURE, "ferror|fclose(drt->wbuf.fp)"); drt->wbuf.fp = NULL; if (!drt->wbuf.len) goto done_free; - while (flock(drt->root2id_fd, LOCK_EX)) { + while (flock(drt->root2off_fd, LOCK_EX)) { if (errno == EINTR) continue; err(EXIT_FAILURE, "LOCK_EX"); // ENOLCK? } @@ -449,7 +449,7 @@ static bool dump_roots_flush(struct req *req, struct dump_roots_tmp *drt) return false; } } while (drt->wbuf.len); - while (flock(drt->root2id_fd, LOCK_UN)) { + while (flock(drt->root2off_fd, LOCK_UN)) { if (errno == EINTR) continue; err(EXIT_FAILURE, "LOCK_UN"); // ENOLCK? } @@ -463,14 +463,14 @@ static enum exc_iter dump_roots_iter(struct req *req, struct dump_roots_tmp *drt, Xapian::MSetIterator *i) { - CLEANUP_FBUF struct fbuf root_ids = {}; // " $ID0 $ID1 $IDx..\n" + CLEANUP_FBUF struct fbuf root_offs = {}; // " $ID0 $ID1 $IDx..\n" try { Xapian::Document doc = i->get_document(); - if (!root2ids_str(&root_ids, &doc)) + if (!root2offs_str(&root_offs, &doc)) return ITER_ABORT; // bad request, abort for (int p = 0; p < req->pfxc; p++) dump_roots_term(req, req->pfxv[p], drt, - &root_ids, &doc); + &root_offs, &doc); } catch (const Xapian::DatabaseModifiedError & e) { req->srch->db->reopen(); return ITER_RETRY; @@ -502,28 +502,29 @@ static char *hsearch_enter_key(char *s) static bool cmd_dump_roots(struct req *req) { CLEANUP_DUMP_ROOTS struct dump_roots_tmp drt = {}; - drt.root2id_fd = -1; + drt.root2off_fd = -1; if ((optind + 1) >= req->argc) ABORT("usage: dump_roots [OPTIONS] ROOT2ID_FILE QRY_STR"); if (!req->pfxc) ABORT("dump_roots requires -A PREFIX"); - const char *root2id_file = req->argv[optind]; - drt.root2id_fd = open(root2id_file, O_RDONLY); - if (drt.root2id_fd < 0) - EABORT("open(%s)", root2id_file); - if (fstat(drt.root2id_fd, &drt.sb)) // ENOMEM? - err(EXIT_FAILURE, "fstat(%s)", root2id_file); + const char *root2off_file = req->argv[optind]; + drt.root2off_fd = open(root2off_file, O_RDONLY); + if (drt.root2off_fd < 0) + EABORT("open(%s)", root2off_file); + if (fstat(drt.root2off_fd, &drt.sb)) // ENOMEM? + err(EXIT_FAILURE, "fstat(%s)", root2off_file); // each entry is at least 43 bytes ({OIDHEX}\0{INT}\0), // so /32 overestimates the number of expected entries by // ~%25 (as recommended by Linux hcreate(3) manpage) size_t est = (drt.sb.st_size / 32) + 1; //+1 for "\0" termination if ((uint64_t)drt.sb.st_size > (uint64_t)SIZE_MAX) err(EXIT_FAILURE, "%s size too big (%lld bytes > %zu)", - root2id_file, (long long)drt.sb.st_size, SIZE_MAX); + root2off_file, (long long)drt.sb.st_size, SIZE_MAX); drt.mm_ptr = mmap(NULL, drt.sb.st_size, PROT_READ, - MAP_PRIVATE, drt.root2id_fd, 0); + MAP_PRIVATE, drt.root2off_fd, 0); if (drt.mm_ptr == MAP_FAILED) - err(EXIT_FAILURE, "mmap(%zu, %s)", drt.sb.st_size,root2id_file); + err(EXIT_FAILURE, "mmap(%zu, %s)", + drt.sb.st_size, root2off_file); drt.entries = (char **)calloc(est * 2, sizeof(char *)); if (!drt.entries) err(EXIT_FAILURE, "calloc(%zu * 2, %zu)", est, sizeof(char *)); -- cgit v1.2.3-24-ge0c7