about summary refs log tree commit homepage
path: root/lib
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2021-03-20 19:04:04 +0900
committerEric Wong <e@80x24.org>2021-03-21 09:45:47 +0000
commitb6829bbfd86f5d22a8ffb80fd7bfe59299fe6b55 (patch)
tree97a6e0a9bb519c24fdbef52776312861990af9b5 /lib
parent7d2e572aca7297ea2015d2b6e7c71b672521ec82 (diff)
downloadpublic-inbox-b6829bbfd86f5d22a8ffb80fd7bfe59299fe6b55.tar.gz
"lei q" now preserves changes to per-message keywords across
invocations when its --output (Maildir or mbox) is reused
(with or without --augment).

In the future, these changes will be monitored via inotify,
EVFILT_VNODE or IMAP IDLE, too.

Unfortunately, this currently prevents "lei import" from ever
importing a message that's in an external.  That will be fixed
in a future change.
Diffstat (limited to 'lib')
-rw-r--r--lib/PublicInbox/LeiOverview.pm4
-rw-r--r--lib/PublicInbox/LeiSearch.pm37
-rw-r--r--lib/PublicInbox/LeiStore.pm83
-rw-r--r--lib/PublicInbox/LeiToMail.pm16
-rw-r--r--lib/PublicInbox/LeiXSearch.pm17
-rw-r--r--lib/PublicInbox/Over.pm22
-rw-r--r--lib/PublicInbox/OverIdx.pm10
-rw-r--r--lib/PublicInbox/SearchIdx.pm3
8 files changed, 123 insertions, 69 deletions
diff --git a/lib/PublicInbox/LeiOverview.pm b/lib/PublicInbox/LeiOverview.pm
index 1036f465..48237f8a 100644
--- a/lib/PublicInbox/LeiOverview.pm
+++ b/lib/PublicInbox/LeiOverview.pm
@@ -216,9 +216,11 @@ sub ovv_each_smsg_cb { # runs in wq worker usually
                 }
         } elsif ($self->{fmt} =~ /\A(concat)?json\z/ && $lei->{opt}->{pretty}) {
                 my $EOR = ($1//'') eq 'concat' ? "\n}" : "\n},";
+                my $lse = $lei->{sto}->search;
                 sub { # DIY prettiness :P
                         my ($smsg, $mitem) = @_;
                         return if $dedupe->is_smsg_dup($smsg);
+                        $lse->xsmsg_vmd($smsg);
                         $smsg = _unbless_smsg($smsg, $mitem);
                         $buf .= "{\n";
                         $buf .= join(",\n", map {
@@ -238,9 +240,11 @@ sub ovv_each_smsg_cb { # runs in wq worker usually
                 }
         } elsif ($json) {
                 my $ORS = $self->{fmt} eq 'json' ? ",\n" : "\n"; # JSONL
+                my $lse = $lei->{sto}->search;
                 sub {
                         my ($smsg, $mitem) = @_;
                         return if $dedupe->is_smsg_dup($smsg);
+                        $lse->xsmsg_vmd($smsg);
                         $buf .= $json->encode(_unbless_smsg(@_)) . $ORS;
                         return if length($buf) < 65536;
                         my $lk = $self->lock_for_scope;
diff --git a/lib/PublicInbox/LeiSearch.pm b/lib/PublicInbox/LeiSearch.pm
index 2e3f10fd..360a37e5 100644
--- a/lib/PublicInbox/LeiSearch.pm
+++ b/lib/PublicInbox/LeiSearch.pm
@@ -27,6 +27,20 @@ sub msg_keywords {
         wantarray ? sort(keys(%$kw)) : $kw;
 }
 
+sub xsmsg_vmd {
+        my ($self, $smsg) = @_;
+        return if $smsg->{kw};
+        my $xdb = $self->xdb; # set {nshard};
+        my %kw;
+        $kw{flagged} = 1 if delete($smsg->{lei_q_tt_flagged});
+        my @num = $self->over->blob_exists($smsg->{blob});
+        for my $num (@num) { # there should only be one...
+                my $kw = xap_terms('K', $xdb, num2docid($self, $num));
+                %kw = (%kw, %$kw);
+        }
+        $smsg->{kw} = [ sort keys %kw ] if scalar(keys(%kw));
+}
+
 # when a message has no Message-IDs at all, this is needed for
 # unsent Draft messages, at least
 sub content_key ($) {
@@ -43,41 +57,42 @@ sub content_key ($) {
 }
 
 sub _cmp_1st { # git->cat_async callback
-        my ($bref, $oid, $type, $size, $cmp) = @_; # cmp: [chash, found, smsg]
-        if (content_hash(PublicInbox::Eml->new($bref)) eq $cmp->[0]) {
+        my ($bref, $oid, $type, $size, $cmp) = @_; # cmp: [chash, xoids, smsg]
+        if ($bref && content_hash(PublicInbox::Eml->new($bref)) eq $cmp->[0]) {
                 $cmp->[1]->{$oid} = $cmp->[2]->{num};
         }
 }
 
-sub xids_for { # returns { OID => docid } mapping for $eml matches
+sub xoids_for { # returns { OID => docid } mapping for $eml matches
         my ($self, $eml, $min) = @_;
         my ($chash, $mids) = content_key($eml);
         my @overs = ($self->over // $self->overs_all);
         my $git = $self->git;
-        my $found = {};
+        my $xoids = {};
         for my $mid (@$mids) {
                 for my $o (@overs) {
                         my ($id, $prev);
                         while (my $cur = $o->next_by_mid($mid, \$id, \$prev)) {
-                                next if $found->{$cur->{blob}};
+                                next if $cur->{bytes} == 0 ||
+                                        $xoids->{$cur->{blob}};
                                 $git->cat_async($cur->{blob}, \&_cmp_1st,
-                                                [ $chash, $found, $cur ]);
-                                if ($min && scalar(keys %$found) >= $min) {
+                                                [ $chash, $xoids, $cur ]);
+                                if ($min && scalar(keys %$xoids) >= $min) {
                                         $git->cat_async_wait;
-                                        return $found;
+                                        return $xoids;
                                 }
                         }
                 }
         }
         $git->cat_async_wait;
-        scalar(keys %$found) ? $found : undef;
+        scalar(keys %$xoids) ? $xoids : undef;
 }
 
 # returns true if $eml is indexed by lei/store and keywords don't match
 sub kw_changed {
         my ($self, $eml, $new_kw_sorted) = @_;
-        my $found = xids_for($self, $eml, 1) // return;
-        my ($num) = values %$found;
+        my $xoids = xoids_for($self, $eml, 1) // return;
+        my ($num) = values %$xoids;
         my @cur_kw = msg_keywords($self, $num);
         join("\0", @$new_kw_sorted) eq join("\0", @cur_kw) ? 0 : 1;
 }
diff --git a/lib/PublicInbox/LeiStore.pm b/lib/PublicInbox/LeiStore.pm
index c1abc288..c66d3dc2 100644
--- a/lib/PublicInbox/LeiStore.pm
+++ b/lib/PublicInbox/LeiStore.pm
@@ -114,6 +114,7 @@ sub _docids_for ($$) {
         for my $mid (@$mids) {
                 my ($id, $prev);
                 while (my $cur = $oidx->next_by_mid($mid, \$id, \$prev)) {
+                        next if $cur->{bytes} == 0; # external-only message
                         my $oid = $cur->{blob};
                         my $docid = $cur->{num};
                         my $bref = $im ? $im->cat_blob($oid) : undef;
@@ -163,7 +164,7 @@ sub add_eml {
         my ($self, $eml, $vmd) = @_;
         my $im = $self->importer; # may create new epoch
         my $eidx = eidx_init($self); # writes ALL.git/objects/info/alternates
-        my $oidx = $eidx->{oidx};
+        my $oidx = $eidx->{oidx}; # PublicInbox::Import::add checks this
         my $smsg = bless { -oidx => $oidx }, 'PublicInbox::Smsg';
         $im->add($eml, undef, $smsg) or return; # duplicate returns undef
 
@@ -193,22 +194,54 @@ sub set_eml {
         add_eml($self, $eml, $vmd) // set_eml_vmd($self, $eml, $vmd);
 }
 
-sub add_eml_maybe {
-        my ($self, $eml) = @_;
-        my $lxs = $self->{lxs_all_local} // die 'BUG: no {lxs_all_local}';
-        return if $lxs->xids_for($eml, 1);
-        add_eml($self, $eml);
-}
-
 # set or update keywords for external message, called via ipc_do
-sub set_xkw {
-        my ($self, $eml, $kw) = @_;
-        my $lxs = $self->{lxs_all_local} // die 'BUG: no {lxs_all_local}';
-        if ($lxs->xids_for($eml, 1)) { # is it in a local external?
-                # TODO: index keywords only
-        } else {
-                set_eml($self, $eml, { kw => $kw });
+sub set_xvmd {
+        my ($self, $xoids, $eml, $vmd) = @_;
+
+        my $eidx = eidx_init($self);
+        my $oidx = $eidx->{oidx};
+
+        # see if we can just update existing docs
+        for my $oid (keys %$xoids) {
+                my @docids = $oidx->blob_exists($oid) or next;
+                scalar(@docids) > 1 and
+                        warn "W: $oid indexed as multiple docids: @docids\n";
+                for my $docid (@docids) {
+                        my $idx = $eidx->idx_shard($docid);
+                        $idx->ipc_do('set_vmd', $docid, $vmd);
+                }
+                delete $xoids->{$oid}; # all done with this oid
         }
+        return unless scalar(keys(%$xoids));
+
+        # see if it was indexed, but with different OID(s)
+        if (my @docids = _docids_for($self, $eml)) {
+                for my $docid (@docids) {
+                        for my $oid (keys %$xoids) {
+                                $oidx->add_xref3($docid, -1, $oid, '.');
+                        }
+                        my $idx = $eidx->idx_shard($docid);
+                        $idx->ipc_do('set_vmd', $docid, $vmd);
+                }
+                return;
+        }
+        # totally unseen
+        my $smsg = bless { blob => '' }, 'PublicInbox::Smsg';
+        $smsg->{num} = $oidx->adj_counter('eidx_docid', '+');
+        # save space for an externals-only message
+        my $hdr = $eml->header_obj;
+        $smsg->populate($hdr); # sets lines == 0
+        $smsg->{bytes} = 0;
+        delete @$smsg{qw(From Subject)};
+        $smsg->{to} = $smsg->{cc} = $smsg->{from} = '';
+        $oidx->add_overview($hdr, $smsg); # subject+references for threading
+        $smsg->{subject} = '';
+        for my $oid (keys %$xoids) {
+                $oidx->add_xref3($smsg->{num}, -1, $oid, '.');
+        }
+        my $idx = $eidx->idx_shard($smsg->{num});
+        $idx->index_eml(PublicInbox::Eml->new("\n\n"), $smsg);
+        $idx->ipc_do('add_vmd', $smsg->{num}, $vmd);
 }
 
 sub checkpoint {
@@ -240,28 +273,9 @@ sub ipc_atfork_child {
         $self->SUPER::ipc_atfork_child;
 }
 
-sub refresh_local_externals {
-        my ($self) = @_;
-        my $cfg = $self->{lei}->_lei_cfg or return;
-        my $cur_cfg = $self->{cur_cfg} // -1;
-        my $lxs = $self->{lxs_all_local};
-        if ($cfg != $cur_cfg || !$lxs) {
-                $lxs = PublicInbox::LeiXSearch->new;
-                my @loc = $self->{lei}->externals_each;
-                for my $loc (@loc) { # locals only
-                        $lxs->prepare_external($loc) if -d $loc;
-                }
-                $self->{lei}->ale->refresh_externals($lxs);
-                $lxs->{git} = $self->{lei}->ale->git;
-                $self->{lxs_all_local} = $lxs;
-                $self->{cur_cfg} = $cfg;
-        }
-}
-
 sub write_prepare {
         my ($self, $lei) = @_;
         unless ($self->{-ipc_req}) {
-                require PublicInbox::LeiXSearch;
                 $self->ipc_lock_init($lei->store_path . '/ipc.lock');
                 # Mail we import into lei are private, so headers filtered out
                 # by -mda for public mail are not appropriate
@@ -269,7 +283,6 @@ sub write_prepare {
                 $self->ipc_worker_spawn('lei_store', $lei->oldset,
                                         { lei => $lei });
         }
-        my $wait = $self->ipc_do('refresh_local_externals');
         $lei->{sto} = $self;
 }
 
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index 7e821646..3e6cf00c 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -11,6 +11,7 @@ use PublicInbox::Lock;
 use PublicInbox::ProcessPipe;
 use PublicInbox::Spawn qw(which spawn popen_rd);
 use PublicInbox::LeiDedupe;
+use PublicInbox::Git;
 use PublicInbox::GitAsyncCat;
 use PublicInbox::PktOp qw(pkt_do);
 use Symbol qw(gensym);
@@ -260,10 +261,12 @@ sub _mbox_write_cb ($$) {
         my $atomic_append = !defined($ovv->{lock_path});
         my $dedupe = $lei->{dedupe};
         $dedupe->prepare_dedupe;
+        my $lse = $lei->{sto} ? $lei->{sto}->search : undef;
         sub { # for git_to_mail
                 my ($buf, $smsg, $eml) = @_;
                 $eml //= PublicInbox::Eml->new($buf);
                 return if $dedupe->is_dup($eml, $smsg->{blob});
+                $lse->xsmsg_vmd($smsg) if $lse;
                 $buf = $eml2mbox->($eml, $smsg);
                 return atomic_append($lei, $buf) if $atomic_append;
                 my $lk = $ovv->lock_for_scope;
@@ -275,10 +278,15 @@ sub update_kw_maybe ($$$$) {
         my ($lei, $lse, $eml, $kw) = @_;
         return unless $lse;
         my $x = $lse->kw_changed($eml, $kw);
+        my $vmd = { kw => $kw };
         if ($x) {
-                $lei->{sto}->ipc_do('set_eml', $eml, { kw => $kw });
+                $lei->{sto}->ipc_do('set_eml', $eml, $vmd);
         } elsif (!defined($x)) {
-                $lei->{sto}->ipc_do('set_xkw', $eml, $kw);
+                if (my $xoids = $lei->{ale}->xoids_for($eml)) {
+                        $lei->{sto}->ipc_do('set_xvmd', $xoids, $eml, $vmd);
+                } else {
+                        $lei->{sto}->ipc_do('set_eml', $eml, $vmd);
+                }
         }
 }
 
@@ -342,10 +350,12 @@ sub _maildir_write_cb ($$) {
         my $dedupe = $lei->{dedupe};
         $dedupe->prepare_dedupe if $dedupe;
         my $dst = $lei->{ovv}->{dst};
+        my $lse = $lei->{sto} ? $lei->{sto}->search : undef;
         sub { # for git_to_mail
                 my ($buf, $smsg, $eml) = @_;
                 $dst // return $lei->fail; # dst may be undef-ed in last run
                 $buf //= \($eml->as_string);
+                $lse->xsmsg_vmd($smsg) if $lse;
                 return _buf2maildir($dst, $buf, $smsg) if !$dedupe;
                 $eml //= PublicInbox::Eml->new($$buf); # copy buf
                 return if $dedupe->is_dup($eml, $smsg->{blob});
@@ -361,6 +371,7 @@ sub _imap_write_cb ($$) {
         my $imap_append = $lei->{net}->can('imap_append');
         my $mic = $lei->{net}->mic_get($self->{uri});
         my $folder = $self->{uri}->mailbox;
+        my $lse = $lei->{sto} ? $lei->{sto}->search : undef;
         sub { # for git_to_mail
                 my ($bref, $smsg, $eml) = @_;
                 $mic // return $lei->fail; # dst may be undef-ed in last run
@@ -368,6 +379,7 @@ sub _imap_write_cb ($$) {
                         $eml //= PublicInbox::Eml->new($$bref); # copy bref
                         return if $dedupe->is_dup($eml, $smsg->{blob});
                 }
+                $lse->xsmsg_vmd($smsg) if $lse;
                 eval { $imap_append->($mic, $folder, $bref, $smsg, $eml) };
                 if (my $err = $@) {
                         undef $mic;
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index 1266b3b3..57717b87 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -83,6 +83,7 @@ sub smsg_for {
         my $num = int(($docid - 1) / $nshard) + 1;
         my $ibx = $self->{shard2ibx}->[$shard];
         my $smsg = $ibx->over->get_art($num);
+        return if $smsg->{bytes} == 0;
         mitem_kw($smsg, $mitem) if $ibx->can('msg_keywords');
         $smsg->{docid} = $docid;
         $smsg;
@@ -97,11 +98,6 @@ sub recent {
 
 sub over {}
 
-sub overs_all { # for xids_for
-        my ($self) = @_;
-        grep(defined, map { $_->over } locals($self))
-}
-
 sub _mset_more ($$) {
         my ($mset, $mo) = @_;
         my $size = $mset->size;
@@ -153,7 +149,7 @@ sub query_thread_mset { # for --threads
         my $mset;
         my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei, $ibxish);
         my $can_kw = !!$ibxish->can('msg_keywords');
-        my $fl = $lei->{opt}->{threads} > 1 ? [ 'flagged' ] : undef;
+        my $fl = $lei->{opt}->{threads} > 1 ? 1 : undef;
         do {
                 $mset = $srch->mset($mo->{qstr}, $mo);
                 mset_progress($lei, $desc, $mset->size,
@@ -165,13 +161,14 @@ sub query_thread_mset { # for --threads
                 while ($over->expand_thread($ctx)) {
                         for my $n (@{$ctx->{xids}}) {
                                 my $smsg = $over->get_art($n) or next;
-                                wait_startq($lei);
                                 my $mitem = delete $n2item{$smsg->{num}};
+                                next if $smsg->{bytes} == 0;
+                                wait_startq($lei); # wait for keyword updates
                                 if ($mitem) {
                                         if ($can_kw) {
                                                 mitem_kw($smsg, $mitem, $fl);
                                         } elsif ($fl) {
-                                                $smsg->{kw} = $fl;
+                                                $smsg->{lei_q_tt_flagged} = 1;
                                         }
                                 }
                                 $each_smsg->($smsg, $mitem);
@@ -209,8 +206,8 @@ sub query_mset { # non-parallel for non-"--threads" users
 
 sub each_remote_eml { # callback for MboxReader->mboxrd
         my ($eml, $self, $lei, $each_smsg) = @_;
-        if (my $sto = $self->{import_sto}) {
-                $sto->ipc_do('add_eml_maybe', $eml);
+        if ($self->{import_sto} && !$lei->{ale}->xoids_for($eml, 1)) {
+                $self->{import_sto}->ipc_do('add_eml', $eml);
         }
         my $smsg = bless {}, 'PublicInbox::Smsg';
         $smsg->populate($eml);
diff --git a/lib/PublicInbox/Over.pm b/lib/PublicInbox/Over.pm
index 06ea439d..587e0516 100644
--- a/lib/PublicInbox/Over.pm
+++ b/lib/PublicInbox/Over.pm
@@ -7,7 +7,7 @@
 package PublicInbox::Over;
 use strict;
 use v5.10.1;
-use DBI;
+use DBI qw(:sql_types); # SQL_BLOB
 use DBD::SQLite;
 use PublicInbox::Smsg;
 use Compress::Zlib qw(uncompress);
@@ -349,4 +349,24 @@ sub check_inodes {
         }
 }
 
+sub blob_exists {
+        my ($self, $oidhex) = @_;
+        if (wantarray) {
+                my $sth = $self->dbh->prepare_cached(<<'', undef, 1);
+SELECT docid FROM xref3 WHERE oidbin = ?
+
+                $sth->bind_param(1, pack('H*', $oidhex), SQL_BLOB);
+                $sth->execute;
+                my $tmp = $sth->fetchall_arrayref;
+                map { $_->[0] } @$tmp;
+        } else {
+                my $sth = $self->dbh->prepare_cached(<<'', undef, 1);
+SELECT COUNT(*) FROM xref3 WHERE oidbin = ?
+
+                $sth->bind_param(1, pack('H*', $oidhex), SQL_BLOB);
+                $sth->execute;
+                $sth->fetchrow_array;
+        }
+}
+
 1;
diff --git a/lib/PublicInbox/OverIdx.pm b/lib/PublicInbox/OverIdx.pm
index 9013ae23..e1cd31b9 100644
--- a/lib/PublicInbox/OverIdx.pm
+++ b/lib/PublicInbox/OverIdx.pm
@@ -668,14 +668,4 @@ DELETE FROM eidxq WHERE docid = ?
 
 }
 
-sub blob_exists {
-        my ($self, $oidhex) = @_;
-        my $sth = $self->dbh->prepare_cached(<<'', undef, 1);
-SELECT COUNT(*) FROM xref3 WHERE oidbin = ?
-
-        $sth->bind_param(1, pack('H*', $oidhex), SQL_BLOB);
-        $sth->execute;
-        $sth->fetchrow_array;
-}
-
 1;
diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm
index e2a1a678..3237aadc 100644
--- a/lib/PublicInbox/SearchIdx.pm
+++ b/lib/PublicInbox/SearchIdx.pm
@@ -494,7 +494,10 @@ sub add_eidx_info {
         begin_txn_lazy($self);
         my $doc = _get_doc($self, $docid) or return;
         term_generator($self)->set_document($doc);
+
+        # '.' is special for lei_store
         $doc->add_boolean_term('O'.$eidx_key) if $eidx_key ne '.';
+
         index_list_id($self, $doc, $eml);
         $self->{xdb}->replace_document($docid, $doc);
 }