about summary refs log tree commit homepage
path: root/lib/PublicInbox/ExtSearchIdx.pm
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2020-12-15 02:02:22 +0000
committerEric Wong <e@80x24.org>2020-12-17 19:13:14 +0000
commit75ffc6a266699e465471adf5992d36a1db8dc1ae (patch)
tree874275015e175ebf12c9fad5858228021bba4ce5 /lib/PublicInbox/ExtSearchIdx.pm
parentc014cd93de1f2c73348db0e6531f93cf0f1be60f (diff)
downloadpublic-inbox-75ffc6a266699e465471adf5992d36a1db8dc1ae.tar.gz
Since we're inside a Xapian transaction, calling ->index_raw
followed by ->shard_add_eidx_info calls on the same docid
doesn't seem to hurt indexing performance.  It definitely
reduces FS read traffic and IPC from git at the cost of some
more IPC between the parent and workers.  Nevertheless, the code
and FD reductions seem worth it.
Diffstat (limited to 'lib/PublicInbox/ExtSearchIdx.pm')
-rw-r--r--lib/PublicInbox/ExtSearchIdx.pm58
1 files changed, 34 insertions, 24 deletions
diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm
index 3b021a1b..d5295735 100644
--- a/lib/PublicInbox/ExtSearchIdx.pm
+++ b/lib/PublicInbox/ExtSearchIdx.pm
@@ -149,7 +149,7 @@ sub index_unseen ($) {
         my $oid = $new_smsg->{blob};
         my $ibx = delete $req->{ibx} or die 'BUG: {ibx} unset';
         $self->{oidx}->add_xref3($docid, $req->{xnum}, $oid, $ibx->eidx_key);
-        $idx->index_raw(undef, $eml, $new_smsg, $ibx);
+        $idx->index_raw(undef, $eml, $new_smsg, $ibx->eidx_key);
         check_batch_limit($req);
 }
 
@@ -395,23 +395,39 @@ DELETE FROM xref3 WHERE docid NOT IN (SELECT num FROM over)
         done($self);
 }
 
+sub _ibx_for ($$$) {
+        my ($self, $sync, $smsg) = @_;
+        my $ibx_id = delete($smsg->{ibx_id}) // die '{ibx_id} unset';
+        my $pos = $sync->{id2pos}->{$ibx_id} // die "$ibx_id no pos";
+        $self->{ibx_list}->[$pos] // die "BUG: ibx for $smsg->{blob} not mapped"
+}
+
 sub _reindex_finalize ($$$) {
         my ($req, $smsg, $eml) = @_;
         my $sync = $req->{sync};
         my $self = $sync->{self};
-        my $by_chash = $req->{by_chash};
+        my $by_chash = delete $req->{by_chash} or die 'BUG: no {by_chash}';
         my $nr = scalar(keys(%$by_chash)) or die 'BUG: no content hashes';
         my $orig_smsg = $req->{orig_smsg} // die 'BUG: no {orig_smsg}';
         my $docid = $smsg->{num} = $orig_smsg->{num};
         $self->{oidx}->add_overview($eml, $smsg); # may rethread
         check_batch_limit({ %$sync, new_smsg => $smsg });
-        if ($nr == 1) { # likely, all good
-                $self->idx_shard($docid)->shard_reindex_docid($docid);
-                return;
+        my $chash0 = $smsg->{chash} // die "BUG: $smsg->{blob} no {chash}";
+        my $stable = delete($by_chash->{$chash0}) //
+                                die "BUG: $smsg->{blob} chash missing";
+        my $idx = $self->idx_shard($docid);
+        my $top_smsg = pop @$stable;
+        $top_smsg == $smsg or die 'BUG: top_smsg != smsg';
+        my $ibx = _ibx_for($self, $sync, $smsg);
+        $idx->index_raw(undef, $eml, $smsg, $ibx->eidx_key);
+        for my $x (reverse @$stable) {
+                $ibx = _ibx_for($self, $sync, $x);
+                my $hdr = delete $x->{hdr} // die 'BUG: no {hdr}';
+                $idx->shard_add_eidx_info($docid, $ibx->eidx_key, $hdr);
         }
+        return if $nr == 1; # likely, all good
+
         warn "W: #$docid split into $nr due to deduplication change\n";
-        my $chash0 = $smsg->{chash} // die "BUG: $smsg->{blob} no {chash}";
-        delete($by_chash->{$chash0}) // die "BUG: $smsg->{blob} chash missing";
         my @todo;
         for my $ary (values %$by_chash) {
                 for my $x (reverse @$ary) {
@@ -419,21 +435,16 @@ sub _reindex_finalize ($$$) {
                         my $n = $self->{oidx}->remove_xref3($docid, $x->{blob});
                         die "BUG: $x->{blob} invalidated #$docid" if $n == 0;
                 }
-                my $x = $ary->[-1] // die "BUG: #$docid {by_chash} empty";
+                my $x = pop(@$ary) // die "BUG: #$docid {by_chash} empty";
                 $x->{num} = delete($x->{xnum}) // die '{xnum} unset';
-                my $ibx_id = delete($x->{ibx_id}) // die '{ibx_id} unset';
-                my $pos = $sync->{id2pos}->{$ibx_id} // die "$ibx_id no pos";
-                my $ibx = $self->{ibx_list}->[$pos] //
-                        die "BUG: ibx for $x->{blob} not mapped";
+                $ibx = _ibx_for($self, $sync, $x);
                 my $e = $ibx->over->get_art($x->{num});
                 $e->{blob} eq $x->{blob} or die <<EOF;
 $x->{blob} != $e->{blob} (${\$ibx->eidx_key}:$e->{num});
 EOF
                 push @todo, $ibx, $e;
         }
-        $self->{oidx}->commit_lazy; # ensure shard workers can see xref removals
-        $self->{oidx}->begin_lazy;
-        $self->idx_shard($docid)->shard_reindex_docid($docid);
+        undef $by_chash;
         while (my ($ibx, $e) = splice(@todo, 0, 2)) {
                 reindex_unseen($self, $sync, $ibx, $e);
         }
@@ -444,14 +455,14 @@ sub _reindex_oid { # git->cat_async callback
         my $sync = $req->{sync};
         my $self = $sync->{self};
         my $orig_smsg = $req->{orig_smsg} // die 'BUG: no {orig_smsg}';
-        my $expect_oid = $req->{xr3}->[$req->{ix}]->[2];
+        my $expect_oid = $req->{xr3r}->[$req->{ix}]->[2];
         my $docid = $orig_smsg->{num};
         if (is_bad_blob($oid, $type, $size, $expect_oid)) {
                 my $remain = $self->{oidx}->remove_xref3($docid, $expect_oid);
                 if ($remain == 0) {
                         warn "W: #$docid gone or corrupted\n";
                         $self->idx_shard($docid)->shard_remove($docid);
-                } elsif (my $next_oid = $req->{xr3}->[++$req->{ix}]->[2]) {
+                } elsif (my $next_oid = $req->{xr3r}->[++$req->{ix}]->[2]) {
                         $self->git->cat_async($next_oid, \&_reindex_oid, $req);
                 } else {
                         warn "BUG: #$docid gone (UNEXPECTED)\n";
@@ -468,10 +479,11 @@ sub _reindex_oid { # git->cat_async callback
                                 cotime => $orig_smsg->{ts} });
         my $chash = content_hash($eml);
         $re_smsg->{chash} = $chash;
-        $re_smsg->{xnum} = $req->{xr3}->[$req->{ix}]->[1];
-        $re_smsg->{ibx_id} = $req->{xr3}->[$req->{ix}]->[0];
+        $re_smsg->{xnum} = $req->{xr3r}->[$req->{ix}]->[1];
+        $re_smsg->{ibx_id} = $req->{xr3r}->[$req->{ix}]->[0];
+        $re_smsg->{hdr} = $eml->header_obj;
         push @{$req->{by_chash}->{$chash}}, $re_smsg;
-        if (my $next_oid = $req->{xr3}->[++$req->{ix}]->[2]) {
+        if (my $next_oid = $req->{xr3r}->[++$req->{ix}]->[2]) {
                 $self->git->cat_async($next_oid, \&_reindex_oid, $req);
         } else { # last $re_smsg is the highest priority xref3
                 local $self->{current_info} = "$ci #$docid";
@@ -492,7 +504,7 @@ BUG? #$docid $smsg->{blob} is not referenced by inboxes during reindex
                 return;
         }
 
-        # we sort {xr3} in the reverse order of {ibx_list} so we can
+        # we sort {xr3r} in the reverse order of {ibx_list} so we can
         # hit the common case in _reindex_finalize without rereading
         # from git (or holding multiple messages in memory).
         my $id2pos = $sync->{id2pos}; # index in {ibx_list}
@@ -502,15 +514,13 @@ BUG? #$docid $smsg->{blob} is not referenced by inboxes during reindex
                 $b->[1] <=> $a->[1] # break ties with {xnum}
         } @$xr3;
         @$xr3 = map { [ $_->[0], $_->[1], unpack('H*', $_->[2]) ] } @$xr3;
-        my $req = { orig_smsg => $smsg, sync => $sync, xr3 => $xr3, ix => 0 };
+        my $req = { orig_smsg => $smsg, sync => $sync, xr3r => $xr3, ix => 0 };
         $self->git->cat_async($xr3->[$req->{ix}]->[2], \&_reindex_oid, $req);
 }
 
 sub eidxq_process ($$) { # for reindexing
         my ($self, $sync) = @_;
 
-        $self->{oidx}->commit_lazy; # ensure shard workers can see it
-        $self->{oidx}->begin_lazy;
         my $dbh = $self->{oidx}->dbh;
         my $tot = $dbh->selectrow_array('SELECT COUNT(*) FROM eidxq') or return;
         ${$sync->{nr}} = 0;