diff options
Diffstat (limited to 'lib/PublicInbox/ExtSearchIdx.pm')
-rw-r--r-- | lib/PublicInbox/ExtSearchIdx.pm | 112 |
1 files changed, 83 insertions, 29 deletions
diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm index 7b7dfb53..cf61237c 100644 --- a/lib/PublicInbox/ExtSearchIdx.pm +++ b/lib/PublicInbox/ExtSearchIdx.pm @@ -44,7 +44,8 @@ sub new { topdir => $dir, creat => $opt->{creat}, ibx_map => {}, # (newsgroup//inboxdir) => $ibx - ibx_cfg => [], # by config section order + ibx_active => [], # by config section order + ibx_known => [], # by config section order indexlevel => $l, transact_bytes => 0, total_bytes => 0, @@ -61,23 +62,41 @@ sub new { } sub attach_inbox { - my ($self, $ibx) = @_; + my ($self, $ibx, $types) = @_; $self->{ibx_map}->{$ibx->eidx_key} //= do { - delete $self->{-ibx_ary}; # invalidate cache - push @{$self->{ibx_cfg}}, $ibx; + delete $self->{-ibx_ary_known}; # invalidate cache + delete $self->{-ibx_ary_active}; # invalidate cache + $types //= [ qw(active known) ]; + for my $t (@$types) { + push @{$self->{"ibx_$t"}}, $ibx; + } $ibx; } } sub _ibx_attach { # each_inbox callback - my ($ibx, $self) = @_; - attach_inbox($self, $ibx); + my ($ibx, $self, $types) = @_; + attach_inbox($self, $ibx, $types); } sub attach_config { - my ($self, $cfg) = @_; + my ($self, $cfg, $ibxs) = @_; $self->{cfg} = $cfg; - $cfg->each_inbox(\&_ibx_attach, $self); + my $types; + if ($ibxs) { + for my $ibx (@$ibxs) { + $self->{ibx_map}->{$ibx->eidx_key} //= do { + push @{$self->{ibx_active}}, $ibx; + push @{$self->{ibx_known}}, $ibx; + } + } + # invalidate cache + delete $self->{-ibx_ary_known}; + delete $self->{-ibx_ary_active}; + $types = [ 'known' ]; + } + $types //= [ qw(known active) ]; + $cfg->each_inbox(\&_ibx_attach, $self, $types); } sub check_batch_limit ($) { @@ -90,6 +109,25 @@ sub check_batch_limit ($) { ${$req->{need_checkpoint}} = 1 if $n >= $self->{batch_bytes}; } +sub apply_boost ($$) { + my ($req, $smsg) = @_; + my $id2pos = $req->{id2pos}; # index in ibx_sorted + my $xr3 = $req->{self}->{oidx}->get_xref3($smsg->{num}, 1); + @$xr3 = sort { + $id2pos->{$a->[0]} <=> $id2pos->{$b->[0]} + || + $a->[1] <=> $b->[1] # break ties with {xnum} + } @$xr3; + my $top_blob = unpack('H*', $xr3->[0]->[2]); + my $new_smsg = $req->{new_smsg}; + return if $top_blob ne $new_smsg->{blob}; # loser + + # replace the old smsg with the more boosted one + $new_smsg->{num} = $smsg->{num}; + $new_smsg->populate($req->{eml}, $req); + $req->{self}->{oidx}->add_overview($req->{eml}, $new_smsg); +} + sub do_xpost ($$) { my ($req, $smsg) = @_; my $self = $req->{self}; @@ -103,6 +141,7 @@ sub do_xpost ($$) { my $xnum = $req->{xnum}; $self->{oidx}->add_xref3($docid, $xnum, $oid, $eidx_key); $idx->ipc_do('add_eidx_info', $docid, $eidx_key, $eml); + apply_boost($req, $smsg) if $req->{boost_in_use}; } else { # 'd' my $rm_eidx_info; my $nr = $self->{oidx}->remove_xref3($docid, $oid, $eidx_key, @@ -389,7 +428,8 @@ sub _ibx_for ($$$) { my ($self, $sync, $smsg) = @_; my $ibx_id = delete($smsg->{ibx_id}) // die '{ibx_id} unset'; my $pos = $sync->{id2pos}->{$ibx_id} // die "$ibx_id no pos"; - $self->{-ibx_ary}->[$pos] // die "BUG: ibx for $smsg->{blob} not mapped" + $self->{-ibx_ary_known}->[$pos] // + die "BUG: ibx for $smsg->{blob} not mapped" } sub _fd_constrained ($) { @@ -403,7 +443,8 @@ sub _fd_constrained ($) { chomp($soft = `sh -c 'ulimit -n'`); } if (defined($soft)) { - my $want = scalar(@{$self->{-ibx_ary}}) + 64; # estimate + # $want is an estimate + my $want = scalar(@{$self->{ibx_active}}) + 64; my $ret = $want > $soft; if ($ret) { warn <<EOF; @@ -622,17 +663,25 @@ EOF undef; } -sub ibx_sorted ($) { - my ($self) = @_; - $self->{-ibx_ary} //= do { +sub ibx_sorted ($$) { + my ($self, $type) = @_; + $self->{"-ibx_ary_$type"} //= do { # highest boost first, stable for config-ordering tiebreaker use sort 'stable'; [ sort { ($b->{boost} // 0) <=> ($a->{boost} // 0) - } @{$self->{ibx_cfg}} ]; + } @{$self->{'ibx_'.$type} // die "BUG: $type unknown"} ]; } } +sub prep_id2pos ($) { + my ($self) = @_; + my %id2pos; + my $pos = 0; + $id2pos{$_->{-ibx_id}} = $pos++ for (@{ibx_sorted($self, 'known')}); + \%id2pos; +} + sub eidxq_process ($$) { # for reindexing my ($self, $sync) = @_; @@ -647,12 +696,7 @@ sub eidxq_process ($$) { # for reindexing my $max = $dbh->selectrow_array('SELECT MAX(docid) FROM eidxq'); $pr->("Xapian indexing $min..$max (total=$tot)\n"); } - $sync->{id2pos} //= do { - my %id2pos; - my $pos = 0; - $id2pos{$_->{-ibx_id}} = $pos++ for (@{ibx_sorted($self)}); - \%id2pos; - }; + $sync->{id2pos} //= prep_id2pos($self); my ($del, $iter); restart: $del = $dbh->prepare('DELETE FROM eidxq WHERE docid = ?'); @@ -841,7 +885,7 @@ sub eidx_reindex { warn "E: aborting --reindex\n"; return; } - for my $ibx (@{ibx_sorted($self)}) { + for my $ibx (@{ibx_sorted($self, 'active')}) { _reindex_inbox($self, $sync, $ibx); last if $sync->{quit}; } @@ -987,9 +1031,15 @@ sub eidx_sync { # main entry point local $SIG{QUIT} = $quit; local $SIG{INT} = $quit; local $SIG{TERM} = $quit; - for my $ibx (@{ibx_sorted($self)}) { + for my $ibx (@{ibx_sorted($self, 'known')}) { $ibx->{-ibx_id} //= $self->{oidx}->ibx_id($ibx->eidx_key); } + + if (scalar(grep { defined($_->{boost}) } @{$self->{ibx_known}})) { + $sync->{id2pos} //= prep_id2pos($self); + $sync->{boost_in_use} = 1; + } + if (my $msgids = delete($opt->{dedupe})) { local $sync->{checkpoint_unlocks} = 1; eidx_dedupe($self, $sync, $msgids); @@ -1001,7 +1051,7 @@ sub eidx_sync { # main entry point # don't use $_ here, it'll get clobbered by reindex_checkpoint if ($opt->{scan} // 1) { - for my $ibx (@{ibx_sorted($self)}) { + for my $ibx (@{ibx_sorted($self, 'active')}) { last if $sync->{quit}; sync_inbox($self, $sync, $ibx); } @@ -1143,7 +1193,7 @@ sub idx_init { # similar to V2Writable } undef $dh; } - for my $ibx (@{ibx_sorted($self)}) { + for my $ibx (@{ibx_sorted($self, 'active')}) { # create symlinks for multi-pack-index $git_midx += symlink_packs($ibx, $pd); # add new lines to our alternates file @@ -1208,8 +1258,10 @@ sub eidx_reload { # -extindex --watch SIGHUP handler my $pr = $self->{-watch_sync}->{-opt}->{-progress}; $pr->('reloading ...') if $pr; delete $self->{-resync_queue}; - delete $self->{-ibx_ary}; - $self->{ibx_cfg} = []; + delete $self->{-ibx_ary_known}; + delete $self->{-ibx_ary_active}; + $self->{ibx_known} = []; + $self->{ibx_active} = []; %{$self->{ibx_map}} = (); delete $self->{-watch_sync}->{id2pos}; my $cfg = PublicInbox::Config->new; @@ -1223,7 +1275,7 @@ sub eidx_reload { # -extindex --watch SIGHUP handler sub eidx_resync_start ($) { # -extindex --watch SIGUSR1 handler my ($self) = @_; - $self->{-resync_queue} //= [ @{ibx_sorted($self)} ]; + $self->{-resync_queue} //= [ @{ibx_sorted($self, 'active')} ]; PublicInbox::DS::requeue($self); # trigger our ->event_step } @@ -1254,9 +1306,11 @@ sub eidx_watch { # public-inbox-extindex --watch main loop require PublicInbox::Sigfd; my $idler = PublicInbox::InboxIdle->new($self->{cfg}); if (!$self->{cfg}) { - $idler->watch_inbox($_) for (@{ibx_sorted($self)}); + $idler->watch_inbox($_) for (@{ibx_sorted($self, 'active')}); + } + for my $ibx (@{ibx_sorted($self, 'active')}) { + $ibx->subscribe_unlock(__PACKAGE__, $self) } - $_->subscribe_unlock(__PACKAGE__, $self) for (@{ibx_sorted($self)}); my $pr = $opt->{-progress}; $pr->("performing initial scan ...\n") if $pr; my $sync = eidx_sync($self, $opt); # initial sync |