From f5462c8b125be3881c28d77617c26ed79fdaa58f Mon Sep 17 00:00:00 2001
From: Eric Wong
Date: Fri, 24 Jul 2020 05:56:05 +0000
Subject: searchidx: $batch_cb => v1_checkpoint

Another closure gone, and we may be able to share more code with v2 in upcoming commits.
---
 lib/PublicInbox/SearchIdx.pm | 90 ++++++++++++++++++++++----------------------
 1 file changed, 45 insertions(+), 45 deletions(-)

(limited to 'lib')

diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm
index 39dc1f87..fe089c8e 100644
--- a/lib/PublicInbox/SearchIdx.pm
+++ b/lib/PublicInbox/SearchIdx.pm
@@ -573,9 +573,48 @@ sub ck_size { # check_async cb for -index --max-size=...
 	}
 }
 
+sub v1_checkpoint ($$;$) {
+	my ($self, $sync, $stk) = @_;
+	$self->{ibx}->git->check_async_wait;
+	$self->{ibx}->git->cat_async_wait;
+
+	# latest_cmt may be undef
+	my $newest = $stk ? $stk->{latest_cmt} : undef;
+	if ($newest) {
+		my $cur = $self->{mm}->last_commit || '';
+		if (need_update($self, $cur, $newest)) {
+			$self->{mm}->last_commit($newest);
+		}
+	} else {
+		${$sync->{max}} = $BATCH_BYTES;
+	}
+
+	$self->{mm}->{dbh}->commit;
+	if ($newest && need_xapian($self)) {
+		my $cur = $self->{xdb}->get_metadata('last_commit');
+		if (need_update($self, $cur, $newest)) {
+			$self->{xdb}->set_metadata('last_commit', $newest);
+		}
+	}
+
+	$self->{over}->rethread_done($sync->{-opt}) if $newest; # all done
+	commit_txn_lazy($self);
+	$self->{ibx}->git->cleanup;
+	my $nr = ${$sync->{nr}};
+	idx_release($self, $nr);
+	# let another process do some work...
+	if (my $pr = $sync->{-opt}->{-progress}) {
+		$pr->("indexed $nr/$sync->{ntodo}\n") if $nr;
+	}
+	if (!$stk) { # more to come
+		begin_txn_lazy($self);
+		$self->{mm}->{dbh}->begin_work;
+	}
+}
+
 # only for v1
 sub process_stack {
-	my ($self, $stk, $sync, $batch_cb) = @_;
+	my ($self, $sync, $stk) = @_;
 	my $git = $self->{ibx}->git;
 	my $max = $BATCH_BYTES;
 	my $nr = 0;
@@ -583,6 +622,7 @@ sub process_stack {
 	$sync->{max} = \$max;
 	$sync->{sidx} = $self;
 
+	$self->{mm}->{dbh}->begin_work;
 	if (my @leftovers = keys %{delete($sync->{D}) // {}}) {
 		warn('W: unindexing '.scalar(@leftovers)." leftovers\n");
 		for my $oid (@leftovers) {
@@ -599,19 +639,12 @@ sub process_stack {
 			} else {
 				$git->cat_async($oid, \&index_both, $arg);
 			}
-			if ($max <= 0) {
-				$git->check_async_wait;
-				$git->cat_async_wait;
-				$max = $BATCH_BYTES;
-				$batch_cb->($nr);
-			}
+			v1_checkpoint($self, $sync) if $max <= 0;
 		} elsif ($f eq 'd') {
 			$git->cat_async($oid, \&unindex_both, $self);
 		}
 	}
-	$git->check_async_wait;
-	$git->cat_async_wait;
-	$batch_cb->($nr, $stk);
+	v1_checkpoint($self, $sync, $stk);
 }
 
 sub log2stack ($$$$) {
@@ -729,7 +762,7 @@ sub _index_sync {
 	my $git = $self->{ibx}->git;
 	$git->batch_prepare;
 	my $pr = $opts->{-progress};
-	my $sync = { reindex => $opts->{reindex} };
+	my $sync = { reindex => $opts->{reindex}, -opt => $opts };
 	my $xdb = $self->begin_txn_lazy;
 	$self->{over}->rethread_prepare($opts);
 	my $mm = _msgmap_init($self);
@@ -750,40 +783,7 @@ sub _index_sync {
 	my $stk = prepare_stack($self, $sync, $range);
 	$sync->{ntodo} = $stk ? $stk->num_records : 0;
 	$pr->("$sync->{ntodo}\n") if $pr; # continue previous line
-
-	my $dbh = $mm->{dbh};
-	my $batch_cb = sub {
-		my ($nr, $stk) = @_;
-		# latest_cmt may be undef
-		my $newest = $stk ? $stk->{latest_cmt} : undef;
-		if ($newest) {
-			my $cur = $mm->last_commit || '';
-			if (need_update($self, $cur, $newest)) {
-				$mm->last_commit($newest);
-			}
-		}
-		$dbh->commit;
-		if ($newest && need_xapian($self)) {
-			my $cur = $xdb->get_metadata('last_commit');
-			if (need_update($self, $cur, $newest)) {
-				$xdb->set_metadata('last_commit', $newest);
-			}
-		}
-
-		$self->{over}->rethread_done($opts) if $newest; # all done
-		$self->commit_txn_lazy;
-		$git->cleanup;
-		$xdb = idx_release($self, $nr);
-		# let another process do some work...
-		$pr->("indexed $nr/$sync->{ntodo}\n") if $pr && $nr;
-		if (!$stk) { # more to come
-			$xdb = $self->begin_txn_lazy;
-			$dbh->begin_work;
-		}
-	};
-
-	$dbh->begin_work;
-	process_stack($self, $stk, $sync, $batch_cb);
+	process_stack($self, $sync, $stk);
 }
 
 sub DESTROY {
-- 
cgit v1.2.3-24-ge0c7