diff options
Diffstat (limited to 'lib/PublicInbox')
-rw-r--r-- | lib/PublicInbox/SearchIdxShard.pm | 10 | ||||
-rw-r--r-- | lib/PublicInbox/V2Writable.pm | 51 |
2 files changed, 45 insertions, 16 deletions
diff --git a/lib/PublicInbox/SearchIdxShard.pm b/lib/PublicInbox/SearchIdxShard.pm index cb79f3dc..59b36087 100644 --- a/lib/PublicInbox/SearchIdxShard.pm +++ b/lib/PublicInbox/SearchIdxShard.pm @@ -89,16 +89,20 @@ sub shard_worker_loop ($$$$$) { # called by V2Writable sub index_raw { - my ($self, $msgref, $mime, $smsg) = @_; + my ($self, $msgref, $eml, $smsg) = @_; if (my $w = $self->{w}) { # mid must be last, it can contain spaces (but not LF) print $w join(' ', @$smsg{qw(raw_bytes bytes num blob ds ts mid)}), "\n", $$msgref or die "failed to write shard $!\n"; } else { - $$msgref = undef; + if ($eml) { + $$msgref = undef; + } else { # --xapian-only + --sequential-shard: + $eml = PublicInbox::Eml->new($msgref); + } $self->begin_txn_lazy; - $self->add_message($mime, $smsg); + $self->add_message($eml, $smsg); } } diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm index 7bc24592..6b1effe5 100644 --- a/lib/PublicInbox/V2Writable.pm +++ b/lib/PublicInbox/V2Writable.pm @@ -1185,22 +1185,24 @@ sub index_xap_only { # git->cat_async callback my ($bref, $oid, $type, $size, $smsg) = @_; my $self = $smsg->{v2w}; my $idx = idx_shard($self, $smsg->{num} % $self->{shards}); - $idx->begin_txn_lazy; - $idx->add_message(PublicInbox::Eml->new($bref), $smsg); + $smsg->{raw_bytes} = $size; + $idx->index_raw($bref, undef, $smsg); $self->{transact_bytes} += $size; } -sub index_seq_shard ($$$) { - my ($self, $sync, $off) = @_; +sub index_xap_step ($$$;$) { + my ($self, $sync, $beg, $step) = @_; my $ibx = $self->{ibx}; - my $max = $ibx->mm->max or return; my $all = $ibx->git; my $over = $ibx->over; my $batch_bytes = $PublicInbox::SearchIdx::BATCH_BYTES; + $step //= $self->{shards}; + my $end = $sync->{art_end}; if (my $pr = $sync->{-opt}->{-progress}) { - $pr->("Xapian indexlevel=$ibx->{indexlevel} % $off\n"); + $pr->("Xapian indexlevel=$ibx->{indexlevel} ". + "$beg..$end (% $step)\n"); } - for (my $num = $off; $num <= $max; $num += $self->{shards}) { + for (my $num = $beg; $num <= $end; $num += $step) { my $smsg = $over->get_art($num) or next; $smsg->{v2w} = $self; $all->cat_async($smsg->{blob}, \&index_xap_only, $smsg); @@ -1244,10 +1246,37 @@ sub index_epoch ($$$) { update_last_commit($self, $git, $i, $stk->{latest_cmt}); } +sub xapian_only { + my ($self, $opt, $sync) = @_; + my $seq = $opt->{sequentialshard}; + local $self->{parallel} = 0 if $seq; + $self->idx_init($opt); # acquire lock + if (my $art_end = $self->{ibx}->mm->max) { + $sync //= { + need_checkpoint => \(my $bool = 0), + -opt => $opt, + v2w => $self, + nr => \(my $nr = 0), + -regen_fmt => "%u/?\n", + }; + $sync->{art_end} = $art_end; + if ($seq || !$self->{parallel}) { + my $shard_end = $self->{shards} - 1; + index_xap_step($self, $sync, $_) for (0..$shard_end); + } else { # parallel (maybe) + index_xap_step($self, $sync, 0, 1); + } + } + $self->{ibx}->git->cat_async_wait; + $self->done; +} + # public, called by public-inbox-index sub index_sync { my ($self, $opt) = @_; - $opt ||= {}; + $opt //= $_[1] //= {}; + goto \&xapian_only if $opt->{xapianonly}; + my $pr = $opt->{-progress}; my $epoch_max; my $latest = git_dir_latest($self, \$epoch_max); @@ -1292,13 +1321,9 @@ sub index_sync { } if ($seq) { # deal with Xapian shards sequentially - my $end = $self->{shards} - 1; $self->{ibx}->{indexlevel} = $idxlevel; delete $sync->{mm_tmp}; - $self->idx_init($opt); # re-acquire lock - index_seq_shard($self, $sync, $_) for (0..$end); - $self->{ibx}->git->cat_async_wait; - $self->done; + xapian_only($self, $opt, $sync); } # reindex does not pick up new changes, so we rerun w/o it: |