about summary refs log tree commit homepage
path: root/lib/PublicInbox/ExtSearchIdx.pm
diff options
context:
space:
mode:
Diffstat (limited to 'lib/PublicInbox/ExtSearchIdx.pm')
-rw-r--r--lib/PublicInbox/ExtSearchIdx.pm112
1 files changed, 83 insertions, 29 deletions
diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm
index 7b7dfb53..cf61237c 100644
--- a/lib/PublicInbox/ExtSearchIdx.pm
+++ b/lib/PublicInbox/ExtSearchIdx.pm
@@ -44,7 +44,8 @@ sub new {
                 topdir => $dir,
                 creat => $opt->{creat},
                 ibx_map => {}, # (newsgroup//inboxdir) => $ibx
-                ibx_cfg => [], # by config section order
+                ibx_active => [], # by config section order
+                ibx_known => [], # by config section order
                 indexlevel => $l,
                 transact_bytes => 0,
                 total_bytes => 0,
@@ -61,23 +62,41 @@ sub new {
 }
 
 sub attach_inbox {
-        my ($self, $ibx) = @_;
+        my ($self, $ibx, $types) = @_;
         $self->{ibx_map}->{$ibx->eidx_key} //= do {
-                delete $self->{-ibx_ary}; # invalidate cache
-                push @{$self->{ibx_cfg}}, $ibx;
+                delete $self->{-ibx_ary_known}; # invalidate cache
+                delete $self->{-ibx_ary_active}; # invalidate cache
+                $types //= [ qw(active known) ];
+                for my $t (@$types) {
+                        push @{$self->{"ibx_$t"}}, $ibx;
+                }
                 $ibx;
         }
 }
 
 sub _ibx_attach { # each_inbox callback
-        my ($ibx, $self) = @_;
-        attach_inbox($self, $ibx);
+        my ($ibx, $self, $types) = @_;
+        attach_inbox($self, $ibx, $types);
 }
 
 sub attach_config {
-        my ($self, $cfg) = @_;
+        my ($self, $cfg, $ibxs) = @_;
         $self->{cfg} = $cfg;
-        $cfg->each_inbox(\&_ibx_attach, $self);
+        my $types;
+        if ($ibxs) {
+                for my $ibx (@$ibxs) {
+                        $self->{ibx_map}->{$ibx->eidx_key} //= do {
+                                push @{$self->{ibx_active}}, $ibx;
+                                push @{$self->{ibx_known}}, $ibx;
+                        }
+                }
+                # invalidate cache
+                delete $self->{-ibx_ary_known};
+                delete $self->{-ibx_ary_active};
+                $types = [ 'known' ];
+        }
+        $types //= [ qw(known active) ];
+        $cfg->each_inbox(\&_ibx_attach, $self, $types);
 }
 
 sub check_batch_limit ($) {
@@ -90,6 +109,25 @@ sub check_batch_limit ($) {
         ${$req->{need_checkpoint}} = 1 if $n >= $self->{batch_bytes};
 }
 
+sub apply_boost ($$) {
+        my ($req, $smsg) = @_;
+        my $id2pos = $req->{id2pos}; # index in ibx_sorted
+        my $xr3 = $req->{self}->{oidx}->get_xref3($smsg->{num}, 1);
+        @$xr3 = sort {
+                $id2pos->{$a->[0]} <=> $id2pos->{$b->[0]}
+                                ||
+                $a->[1] <=> $b->[1] # break ties with {xnum}
+        } @$xr3;
+        my $top_blob = unpack('H*', $xr3->[0]->[2]);
+        my $new_smsg = $req->{new_smsg};
+        return if $top_blob ne $new_smsg->{blob}; # loser
+
+        # replace the old smsg with the more boosted one
+        $new_smsg->{num} = $smsg->{num};
+        $new_smsg->populate($req->{eml}, $req);
+        $req->{self}->{oidx}->add_overview($req->{eml}, $new_smsg);
+}
+
 sub do_xpost ($$) {
         my ($req, $smsg) = @_;
         my $self = $req->{self};
@@ -103,6 +141,7 @@ sub do_xpost ($$) {
                 my $xnum = $req->{xnum};
                 $self->{oidx}->add_xref3($docid, $xnum, $oid, $eidx_key);
                 $idx->ipc_do('add_eidx_info', $docid, $eidx_key, $eml);
+                apply_boost($req, $smsg) if $req->{boost_in_use};
         } else { # 'd'
                 my $rm_eidx_info;
                 my $nr = $self->{oidx}->remove_xref3($docid, $oid, $eidx_key,
@@ -389,7 +428,8 @@ sub _ibx_for ($$$) {
         my ($self, $sync, $smsg) = @_;
         my $ibx_id = delete($smsg->{ibx_id}) // die '{ibx_id} unset';
         my $pos = $sync->{id2pos}->{$ibx_id} // die "$ibx_id no pos";
-        $self->{-ibx_ary}->[$pos] // die "BUG: ibx for $smsg->{blob} not mapped"
+        $self->{-ibx_ary_known}->[$pos] //
+                die "BUG: ibx for $smsg->{blob} not mapped"
 }
 
 sub _fd_constrained ($) {
@@ -403,7 +443,8 @@ sub _fd_constrained ($) {
                         chomp($soft = `sh -c 'ulimit -n'`);
                 }
                 if (defined($soft)) {
-                        my $want = scalar(@{$self->{-ibx_ary}}) + 64; # estimate
+                        # $want is an estimate
+                        my $want = scalar(@{$self->{ibx_active}}) + 64;
                         my $ret = $want > $soft;
                         if ($ret) {
                                 warn <<EOF;
@@ -622,17 +663,25 @@ EOF
         undef;
 }
 
-sub ibx_sorted ($) {
-        my ($self) = @_;
-        $self->{-ibx_ary} //= do {
+sub ibx_sorted ($$) {
+        my ($self, $type) = @_;
+        $self->{"-ibx_ary_$type"} //= do {
                 # highest boost first, stable for config-ordering tiebreaker
                 use sort 'stable';
                 [ sort {
                         ($b->{boost} // 0) <=> ($a->{boost} // 0)
-                  } @{$self->{ibx_cfg}} ];
+                  } @{$self->{'ibx_'.$type} // die "BUG: $type unknown"} ];
         }
 }
 
+sub prep_id2pos ($) {
+        my ($self) = @_;
+        my %id2pos;
+        my $pos = 0;
+        $id2pos{$_->{-ibx_id}} = $pos++ for (@{ibx_sorted($self, 'known')});
+        \%id2pos;
+}
+
 sub eidxq_process ($$) { # for reindexing
         my ($self, $sync) = @_;
 
@@ -647,12 +696,7 @@ sub eidxq_process ($$) { # for reindexing
                 my $max = $dbh->selectrow_array('SELECT MAX(docid) FROM eidxq');
                 $pr->("Xapian indexing $min..$max (total=$tot)\n");
         }
-        $sync->{id2pos} //= do {
-                my %id2pos;
-                my $pos = 0;
-                $id2pos{$_->{-ibx_id}} = $pos++ for (@{ibx_sorted($self)});
-                \%id2pos;
-        };
+        $sync->{id2pos} //= prep_id2pos($self);
         my ($del, $iter);
 restart:
         $del = $dbh->prepare('DELETE FROM eidxq WHERE docid = ?');
@@ -841,7 +885,7 @@ sub eidx_reindex {
                 warn "E: aborting --reindex\n";
                 return;
         }
-        for my $ibx (@{ibx_sorted($self)}) {
+        for my $ibx (@{ibx_sorted($self, 'active')}) {
                 _reindex_inbox($self, $sync, $ibx);
                 last if $sync->{quit};
         }
@@ -987,9 +1031,15 @@ sub eidx_sync { # main entry point
         local $SIG{QUIT} = $quit;
         local $SIG{INT} = $quit;
         local $SIG{TERM} = $quit;
-        for my $ibx (@{ibx_sorted($self)}) {
+        for my $ibx (@{ibx_sorted($self, 'known')}) {
                 $ibx->{-ibx_id} //= $self->{oidx}->ibx_id($ibx->eidx_key);
         }
+
+        if (scalar(grep { defined($_->{boost}) } @{$self->{ibx_known}})) {
+                $sync->{id2pos} //= prep_id2pos($self);
+                $sync->{boost_in_use} = 1;
+        }
+
         if (my $msgids = delete($opt->{dedupe})) {
                 local $sync->{checkpoint_unlocks} = 1;
                 eidx_dedupe($self, $sync, $msgids);
@@ -1001,7 +1051,7 @@ sub eidx_sync { # main entry point
 
         # don't use $_ here, it'll get clobbered by reindex_checkpoint
         if ($opt->{scan} // 1) {
-                for my $ibx (@{ibx_sorted($self)}) {
+                for my $ibx (@{ibx_sorted($self, 'active')}) {
                         last if $sync->{quit};
                         sync_inbox($self, $sync, $ibx);
                 }
@@ -1143,7 +1193,7 @@ sub idx_init { # similar to V2Writable
                 }
                 undef $dh;
         }
-        for my $ibx (@{ibx_sorted($self)}) {
+        for my $ibx (@{ibx_sorted($self, 'active')}) {
                 # create symlinks for multi-pack-index
                 $git_midx += symlink_packs($ibx, $pd);
                 # add new lines to our alternates file
@@ -1208,8 +1258,10 @@ sub eidx_reload { # -extindex --watch SIGHUP handler
                 my $pr = $self->{-watch_sync}->{-opt}->{-progress};
                 $pr->('reloading ...') if $pr;
                 delete $self->{-resync_queue};
-                delete $self->{-ibx_ary};
-                $self->{ibx_cfg} = [];
+                delete $self->{-ibx_ary_known};
+                delete $self->{-ibx_ary_active};
+                $self->{ibx_known} = [];
+                $self->{ibx_active} = [];
                 %{$self->{ibx_map}} = ();
                 delete $self->{-watch_sync}->{id2pos};
                 my $cfg = PublicInbox::Config->new;
@@ -1223,7 +1275,7 @@ sub eidx_reload { # -extindex --watch SIGHUP handler
 
 sub eidx_resync_start ($) { # -extindex --watch SIGUSR1 handler
         my ($self) = @_;
-        $self->{-resync_queue} //= [ @{ibx_sorted($self)} ];
+        $self->{-resync_queue} //= [ @{ibx_sorted($self, 'active')} ];
         PublicInbox::DS::requeue($self); # trigger our ->event_step
 }
 
@@ -1254,9 +1306,11 @@ sub eidx_watch { # public-inbox-extindex --watch main loop
         require PublicInbox::Sigfd;
         my $idler = PublicInbox::InboxIdle->new($self->{cfg});
         if (!$self->{cfg}) {
-                $idler->watch_inbox($_) for (@{ibx_sorted($self)});
+                $idler->watch_inbox($_) for (@{ibx_sorted($self, 'active')});
+        }
+        for my $ibx (@{ibx_sorted($self, 'active')}) {
+                $ibx->subscribe_unlock(__PACKAGE__, $self)
         }
-        $_->subscribe_unlock(__PACKAGE__, $self) for (@{ibx_sorted($self)});
         my $pr = $opt->{-progress};
         $pr->("performing initial scan ...\n") if $pr;
         my $sync = eidx_sync($self, $opt); # initial sync