about summary refs log tree commit homepage
path: root/lib/PublicInbox/ExtSearchIdx.pm
diff options
context:
space:
mode:
Diffstat (limited to 'lib/PublicInbox/ExtSearchIdx.pm')
-rw-r--r--lib/PublicInbox/ExtSearchIdx.pm58
1 files changed, 34 insertions, 24 deletions
diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm
index 3b021a1b..d5295735 100644
--- a/lib/PublicInbox/ExtSearchIdx.pm
+++ b/lib/PublicInbox/ExtSearchIdx.pm
@@ -149,7 +149,7 @@ sub index_unseen ($) {
         my $oid = $new_smsg->{blob};
         my $ibx = delete $req->{ibx} or die 'BUG: {ibx} unset';
         $self->{oidx}->add_xref3($docid, $req->{xnum}, $oid, $ibx->eidx_key);
-        $idx->index_raw(undef, $eml, $new_smsg, $ibx);
+        $idx->index_raw(undef, $eml, $new_smsg, $ibx->eidx_key);
         check_batch_limit($req);
 }
 
@@ -395,23 +395,39 @@ DELETE FROM xref3 WHERE docid NOT IN (SELECT num FROM over)
         done($self);
 }
 
+sub _ibx_for ($$$) {
+        my ($self, $sync, $smsg) = @_;
+        my $ibx_id = delete($smsg->{ibx_id}) // die '{ibx_id} unset';
+        my $pos = $sync->{id2pos}->{$ibx_id} // die "$ibx_id no pos";
+        $self->{ibx_list}->[$pos] // die "BUG: ibx for $smsg->{blob} not mapped"
+}
+
 sub _reindex_finalize ($$$) {
         my ($req, $smsg, $eml) = @_;
         my $sync = $req->{sync};
         my $self = $sync->{self};
-        my $by_chash = $req->{by_chash};
+        my $by_chash = delete $req->{by_chash} or die 'BUG: no {by_chash}';
         my $nr = scalar(keys(%$by_chash)) or die 'BUG: no content hashes';
         my $orig_smsg = $req->{orig_smsg} // die 'BUG: no {orig_smsg}';
         my $docid = $smsg->{num} = $orig_smsg->{num};
         $self->{oidx}->add_overview($eml, $smsg); # may rethread
         check_batch_limit({ %$sync, new_smsg => $smsg });
-        if ($nr == 1) { # likely, all good
-                $self->idx_shard($docid)->shard_reindex_docid($docid);
-                return;
+        my $chash0 = $smsg->{chash} // die "BUG: $smsg->{blob} no {chash}";
+        my $stable = delete($by_chash->{$chash0}) //
+                                die "BUG: $smsg->{blob} chash missing";
+        my $idx = $self->idx_shard($docid);
+        my $top_smsg = pop @$stable;
+        $top_smsg == $smsg or die 'BUG: top_smsg != smsg';
+        my $ibx = _ibx_for($self, $sync, $smsg);
+        $idx->index_raw(undef, $eml, $smsg, $ibx->eidx_key);
+        for my $x (reverse @$stable) {
+                $ibx = _ibx_for($self, $sync, $x);
+                my $hdr = delete $x->{hdr} // die 'BUG: no {hdr}';
+                $idx->shard_add_eidx_info($docid, $ibx->eidx_key, $hdr);
         }
+        return if $nr == 1; # likely, all good
+
         warn "W: #$docid split into $nr due to deduplication change\n";
-        my $chash0 = $smsg->{chash} // die "BUG: $smsg->{blob} no {chash}";
-        delete($by_chash->{$chash0}) // die "BUG: $smsg->{blob} chash missing";
         my @todo;
         for my $ary (values %$by_chash) {
                 for my $x (reverse @$ary) {
@@ -419,21 +435,16 @@ sub _reindex_finalize ($$$) {
                         my $n = $self->{oidx}->remove_xref3($docid, $x->{blob});
                         die "BUG: $x->{blob} invalidated #$docid" if $n == 0;
                 }
-                my $x = $ary->[-1] // die "BUG: #$docid {by_chash} empty";
+                my $x = pop(@$ary) // die "BUG: #$docid {by_chash} empty";
                 $x->{num} = delete($x->{xnum}) // die '{xnum} unset';
-                my $ibx_id = delete($x->{ibx_id}) // die '{ibx_id} unset';
-                my $pos = $sync->{id2pos}->{$ibx_id} // die "$ibx_id no pos";
-                my $ibx = $self->{ibx_list}->[$pos] //
-                        die "BUG: ibx for $x->{blob} not mapped";
+                $ibx = _ibx_for($self, $sync, $x);
                 my $e = $ibx->over->get_art($x->{num});
                 $e->{blob} eq $x->{blob} or die <<EOF;
 $x->{blob} != $e->{blob} (${\$ibx->eidx_key}:$e->{num});
 EOF
                 push @todo, $ibx, $e;
         }
-        $self->{oidx}->commit_lazy; # ensure shard workers can see xref removals
-        $self->{oidx}->begin_lazy;
-        $self->idx_shard($docid)->shard_reindex_docid($docid);
+        undef $by_chash;
         while (my ($ibx, $e) = splice(@todo, 0, 2)) {
                 reindex_unseen($self, $sync, $ibx, $e);
         }
@@ -444,14 +455,14 @@ sub _reindex_oid { # git->cat_async callback
         my $sync = $req->{sync};
         my $self = $sync->{self};
         my $orig_smsg = $req->{orig_smsg} // die 'BUG: no {orig_smsg}';
-        my $expect_oid = $req->{xr3}->[$req->{ix}]->[2];
+        my $expect_oid = $req->{xr3r}->[$req->{ix}]->[2];
         my $docid = $orig_smsg->{num};
         if (is_bad_blob($oid, $type, $size, $expect_oid)) {
                 my $remain = $self->{oidx}->remove_xref3($docid, $expect_oid);
                 if ($remain == 0) {
                         warn "W: #$docid gone or corrupted\n";
                         $self->idx_shard($docid)->shard_remove($docid);
-                } elsif (my $next_oid = $req->{xr3}->[++$req->{ix}]->[2]) {
+                } elsif (my $next_oid = $req->{xr3r}->[++$req->{ix}]->[2]) {
                         $self->git->cat_async($next_oid, \&_reindex_oid, $req);
                 } else {
                         warn "BUG: #$docid gone (UNEXPECTED)\n";
@@ -468,10 +479,11 @@ sub _reindex_oid { # git->cat_async callback
                                 cotime => $orig_smsg->{ts} });
         my $chash = content_hash($eml);
         $re_smsg->{chash} = $chash;
-        $re_smsg->{xnum} = $req->{xr3}->[$req->{ix}]->[1];
-        $re_smsg->{ibx_id} = $req->{xr3}->[$req->{ix}]->[0];
+        $re_smsg->{xnum} = $req->{xr3r}->[$req->{ix}]->[1];
+        $re_smsg->{ibx_id} = $req->{xr3r}->[$req->{ix}]->[0];
+        $re_smsg->{hdr} = $eml->header_obj;
         push @{$req->{by_chash}->{$chash}}, $re_smsg;
-        if (my $next_oid = $req->{xr3}->[++$req->{ix}]->[2]) {
+        if (my $next_oid = $req->{xr3r}->[++$req->{ix}]->[2]) {
                 $self->git->cat_async($next_oid, \&_reindex_oid, $req);
         } else { # last $re_smsg is the highest priority xref3
                 local $self->{current_info} = "$ci #$docid";
@@ -492,7 +504,7 @@ BUG? #$docid $smsg->{blob} is not referenced by inboxes during reindex
                 return;
         }
 
-        # we sort {xr3} in the reverse order of {ibx_list} so we can
+        # we sort {xr3r} in the reverse order of {ibx_list} so we can
         # hit the common case in _reindex_finalize without rereading
         # from git (or holding multiple messages in memory).
         my $id2pos = $sync->{id2pos}; # index in {ibx_list}
@@ -502,15 +514,13 @@ BUG? #$docid $smsg->{blob} is not referenced by inboxes during reindex
                 $b->[1] <=> $a->[1] # break ties with {xnum}
         } @$xr3;
         @$xr3 = map { [ $_->[0], $_->[1], unpack('H*', $_->[2]) ] } @$xr3;
-        my $req = { orig_smsg => $smsg, sync => $sync, xr3 => $xr3, ix => 0 };
+        my $req = { orig_smsg => $smsg, sync => $sync, xr3r => $xr3, ix => 0 };
         $self->git->cat_async($xr3->[$req->{ix}]->[2], \&_reindex_oid, $req);
 }
 
 sub eidxq_process ($$) { # for reindexing
         my ($self, $sync) = @_;
 
-        $self->{oidx}->commit_lazy; # ensure shard workers can see it
-        $self->{oidx}->begin_lazy;
         my $dbh = $self->{oidx}->dbh;
         my $tot = $dbh->selectrow_array('SELECT COUNT(*) FROM eidxq') or return;
         ${$sync->{nr}} = 0;