From 2e1a7378395af3c1db61f26b106befbc42876622 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Wed, 4 Aug 2021 10:02:48 +0000 Subject: extindex: fix boost with partial runs Boost relies on knowledge of all inboxes in a given config file to work properly. So while we support indexing a subset of inboxes, we must still account for boost in inboxes we're not indexing. So split internal inbox groups into "known" and "active", where previously we only cared for inboxes which were being actively indexed. Furthermore, boost checks need to be applied when a message arrives in different inboxes across multiple invocations. Reported-by: Konstantin Ryabitsev Link: https://public-inbox.org/meta/20210802204058.vscbxs5q7xyolyu2@nitro.local/ --- lib/PublicInbox/ExtSearchIdx.pm | 112 +++++++++++++++++++++++++++++----------- 1 file changed, 83 insertions(+), 29 deletions(-) (limited to 'lib/PublicInbox/ExtSearchIdx.pm') diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm index 7b7dfb53..cf61237c 100644 --- a/lib/PublicInbox/ExtSearchIdx.pm +++ b/lib/PublicInbox/ExtSearchIdx.pm @@ -44,7 +44,8 @@ sub new { topdir => $dir, creat => $opt->{creat}, ibx_map => {}, # (newsgroup//inboxdir) => $ibx - ibx_cfg => [], # by config section order + ibx_active => [], # by config section order + ibx_known => [], # by config section order indexlevel => $l, transact_bytes => 0, total_bytes => 0, @@ -61,23 +62,41 @@ sub new { } sub attach_inbox { - my ($self, $ibx) = @_; + my ($self, $ibx, $types) = @_; $self->{ibx_map}->{$ibx->eidx_key} //= do { - delete $self->{-ibx_ary}; # invalidate cache - push @{$self->{ibx_cfg}}, $ibx; + delete $self->{-ibx_ary_known}; # invalidate cache + delete $self->{-ibx_ary_active}; # invalidate cache + $types //= [ qw(active known) ]; + for my $t (@$types) { + push @{$self->{"ibx_$t"}}, $ibx; + } $ibx; } } sub _ibx_attach { # each_inbox callback - my ($ibx, $self) = @_; - attach_inbox($self, $ibx); + my ($ibx, $self, $types) = @_; + attach_inbox($self, $ibx, $types); } sub attach_config { - my ($self, $cfg) = @_; + my ($self, $cfg, $ibxs) = @_; $self->{cfg} = $cfg; - $cfg->each_inbox(\&_ibx_attach, $self); + my $types; + if ($ibxs) { + for my $ibx (@$ibxs) { + $self->{ibx_map}->{$ibx->eidx_key} //= do { + push @{$self->{ibx_active}}, $ibx; + push @{$self->{ibx_known}}, $ibx; + } + } + # invalidate cache + delete $self->{-ibx_ary_known}; + delete $self->{-ibx_ary_active}; + $types = [ 'known' ]; + } + $types //= [ qw(known active) ]; + $cfg->each_inbox(\&_ibx_attach, $self, $types); } sub check_batch_limit ($) { @@ -90,6 +109,25 @@ sub check_batch_limit ($) { ${$req->{need_checkpoint}} = 1 if $n >= $self->{batch_bytes}; } +sub apply_boost ($$) { + my ($req, $smsg) = @_; + my $id2pos = $req->{id2pos}; # index in ibx_sorted + my $xr3 = $req->{self}->{oidx}->get_xref3($smsg->{num}, 1); + @$xr3 = sort { + $id2pos->{$a->[0]} <=> $id2pos->{$b->[0]} + || + $a->[1] <=> $b->[1] # break ties with {xnum} + } @$xr3; + my $top_blob = unpack('H*', $xr3->[0]->[2]); + my $new_smsg = $req->{new_smsg}; + return if $top_blob ne $new_smsg->{blob}; # loser + + # replace the old smsg with the more boosted one + $new_smsg->{num} = $smsg->{num}; + $new_smsg->populate($req->{eml}, $req); + $req->{self}->{oidx}->add_overview($req->{eml}, $new_smsg); +} + sub do_xpost ($$) { my ($req, $smsg) = @_; my $self = $req->{self}; @@ -103,6 +141,7 @@ sub do_xpost ($$) { my $xnum = $req->{xnum}; $self->{oidx}->add_xref3($docid, $xnum, $oid, $eidx_key); $idx->ipc_do('add_eidx_info', $docid, $eidx_key, $eml); + apply_boost($req, $smsg) if $req->{boost_in_use}; } else { # 'd' my $rm_eidx_info; my $nr = $self->{oidx}->remove_xref3($docid, $oid, $eidx_key, @@ -389,7 +428,8 @@ sub _ibx_for ($$$) { my ($self, $sync, $smsg) = @_; my $ibx_id = delete($smsg->{ibx_id}) // die '{ibx_id} unset'; my $pos = $sync->{id2pos}->{$ibx_id} // die "$ibx_id no pos"; - $self->{-ibx_ary}->[$pos] // die "BUG: ibx for $smsg->{blob} not mapped" + $self->{-ibx_ary_known}->[$pos] // + die "BUG: ibx for $smsg->{blob} not mapped" } sub _fd_constrained ($) { @@ -403,7 +443,8 @@ sub _fd_constrained ($) { chomp($soft = `sh -c 'ulimit -n'`); } if (defined($soft)) { - my $want = scalar(@{$self->{-ibx_ary}}) + 64; # estimate + # $want is an estimate + my $want = scalar(@{$self->{ibx_active}}) + 64; my $ret = $want > $soft; if ($ret) { warn <{-ibx_ary} //= do { +sub ibx_sorted ($$) { + my ($self, $type) = @_; + $self->{"-ibx_ary_$type"} //= do { # highest boost first, stable for config-ordering tiebreaker use sort 'stable'; [ sort { ($b->{boost} // 0) <=> ($a->{boost} // 0) - } @{$self->{ibx_cfg}} ]; + } @{$self->{'ibx_'.$type} // die "BUG: $type unknown"} ]; } } +sub prep_id2pos ($) { + my ($self) = @_; + my %id2pos; + my $pos = 0; + $id2pos{$_->{-ibx_id}} = $pos++ for (@{ibx_sorted($self, 'known')}); + \%id2pos; +} + sub eidxq_process ($$) { # for reindexing my ($self, $sync) = @_; @@ -647,12 +696,7 @@ sub eidxq_process ($$) { # for reindexing my $max = $dbh->selectrow_array('SELECT MAX(docid) FROM eidxq'); $pr->("Xapian indexing $min..$max (total=$tot)\n"); } - $sync->{id2pos} //= do { - my %id2pos; - my $pos = 0; - $id2pos{$_->{-ibx_id}} = $pos++ for (@{ibx_sorted($self)}); - \%id2pos; - }; + $sync->{id2pos} //= prep_id2pos($self); my ($del, $iter); restart: $del = $dbh->prepare('DELETE FROM eidxq WHERE docid = ?'); @@ -841,7 +885,7 @@ sub eidx_reindex { warn "E: aborting --reindex\n"; return; } - for my $ibx (@{ibx_sorted($self)}) { + for my $ibx (@{ibx_sorted($self, 'active')}) { _reindex_inbox($self, $sync, $ibx); last if $sync->{quit}; } @@ -987,9 +1031,15 @@ sub eidx_sync { # main entry point local $SIG{QUIT} = $quit; local $SIG{INT} = $quit; local $SIG{TERM} = $quit; - for my $ibx (@{ibx_sorted($self)}) { + for my $ibx (@{ibx_sorted($self, 'known')}) { $ibx->{-ibx_id} //= $self->{oidx}->ibx_id($ibx->eidx_key); } + + if (scalar(grep { defined($_->{boost}) } @{$self->{ibx_known}})) { + $sync->{id2pos} //= prep_id2pos($self); + $sync->{boost_in_use} = 1; + } + if (my $msgids = delete($opt->{dedupe})) { local $sync->{checkpoint_unlocks} = 1; eidx_dedupe($self, $sync, $msgids); @@ -1001,7 +1051,7 @@ sub eidx_sync { # main entry point # don't use $_ here, it'll get clobbered by reindex_checkpoint if ($opt->{scan} // 1) { - for my $ibx (@{ibx_sorted($self)}) { + for my $ibx (@{ibx_sorted($self, 'active')}) { last if $sync->{quit}; sync_inbox($self, $sync, $ibx); } @@ -1143,7 +1193,7 @@ sub idx_init { # similar to V2Writable } undef $dh; } - for my $ibx (@{ibx_sorted($self)}) { + for my $ibx (@{ibx_sorted($self, 'active')}) { # create symlinks for multi-pack-index $git_midx += symlink_packs($ibx, $pd); # add new lines to our alternates file @@ -1208,8 +1258,10 @@ sub eidx_reload { # -extindex --watch SIGHUP handler my $pr = $self->{-watch_sync}->{-opt}->{-progress}; $pr->('reloading ...') if $pr; delete $self->{-resync_queue}; - delete $self->{-ibx_ary}; - $self->{ibx_cfg} = []; + delete $self->{-ibx_ary_known}; + delete $self->{-ibx_ary_active}; + $self->{ibx_known} = []; + $self->{ibx_active} = []; %{$self->{ibx_map}} = (); delete $self->{-watch_sync}->{id2pos}; my $cfg = PublicInbox::Config->new; @@ -1223,7 +1275,7 @@ sub eidx_reload { # -extindex --watch SIGHUP handler sub eidx_resync_start ($) { # -extindex --watch SIGUSR1 handler my ($self) = @_; - $self->{-resync_queue} //= [ @{ibx_sorted($self)} ]; + $self->{-resync_queue} //= [ @{ibx_sorted($self, 'active')} ]; PublicInbox::DS::requeue($self); # trigger our ->event_step } @@ -1254,9 +1306,11 @@ sub eidx_watch { # public-inbox-extindex --watch main loop require PublicInbox::Sigfd; my $idler = PublicInbox::InboxIdle->new($self->{cfg}); if (!$self->{cfg}) { - $idler->watch_inbox($_) for (@{ibx_sorted($self)}); + $idler->watch_inbox($_) for (@{ibx_sorted($self, 'active')}); + } + for my $ibx (@{ibx_sorted($self, 'active')}) { + $ibx->subscribe_unlock(__PACKAGE__, $self) } - $_->subscribe_unlock(__PACKAGE__, $self) for (@{ibx_sorted($self)}); my $pr = $opt->{-progress}; $pr->("performing initial scan ...\n") if $pr; my $sync = eidx_sync($self, $opt); # initial sync -- cgit v1.2.3-24-ge0c7