about summary refs log tree commit homepage
diff options
context:
space:
mode:
-rw-r--r--lib/PublicInbox/Msgmap.pm40
-rw-r--r--lib/PublicInbox/SearchIdx.pm14
-rw-r--r--lib/PublicInbox/SearchIdxSkeleton.pm1
-rw-r--r--lib/PublicInbox/V2Writable.pm134
-rwxr-xr-xscript/public-inbox-index25
5 files changed, 189 insertions, 25 deletions
diff --git a/lib/PublicInbox/Msgmap.pm b/lib/PublicInbox/Msgmap.pm
index 78922d36..12833050 100644
--- a/lib/PublicInbox/Msgmap.pm
+++ b/lib/PublicInbox/Msgmap.pm
@@ -24,9 +24,8 @@ sub new {
         new_file($class, "$d/msgmap.sqlite3", $writable);
 }
 
-sub new_file {
-        my ($class, $f, $writable) = @_;
-
+sub dbh_new {
+        my ($f, $writable) = @_;
         my $dbh = DBI->connect("dbi:SQLite:dbname=$f",'','', {
                 AutoCommit => 1,
                 RaiseError => 1,
@@ -35,6 +34,13 @@ sub new_file {
                 sqlite_use_immediate_transaction => 1,
         });
         $dbh->do('PRAGMA case_sensitive_like = ON');
+        $dbh;
+}
+
+sub new_file {
+        my ($class, $f, $writable) = @_;
+
+        my $dbh = dbh_new($f, $writable);
         my $self = bless { dbh => $dbh }, $class;
 
         if ($writable) {
@@ -49,12 +55,13 @@ sub new_file {
 # used to keep track of used numeric mappings for v2 reindex
 sub tmp_clone {
         my ($self) = @_;
-        my ($fh, $fn) = tempfile(EXLOCK => 0);
+        my ($fh, $fn) = tempfile('msgmap-XXXXXXXX', EXLOCK => 0, TMPDIR => 1);
         $self->{dbh}->sqlite_backup_to_file($fn);
         my $tmp = ref($self)->new_file($fn, 1);
         $tmp->{dbh}->do('PRAGMA synchronous = OFF');
         $tmp->{tmp_name} = $fn; # SQLite won't work if unlinked, apparently
-        $fh = undef;
+        $tmp->{pid} = $$;
+        close $fh or die "failed to close $fn: $!";
         $tmp;
 }
 
@@ -205,7 +212,28 @@ sub mid_set {
 sub DESTROY {
         my ($self) = @_;
         delete $self->{dbh};
-        unlink $self->{tmp_name} if defined $self->{tmp_name};
+        my $f = delete $self->{tmp_name};
+        if (defined $f && $self->{pid} == $$) {
+                unlink $f or warn "failed to unlink $f: $!\n";
+        }
+}
+
+sub atfork_parent {
+        my ($self) = @_;
+        my $f = $self->{tmp_name} or die "not a temporary clone\n";
+        delete $self->{dbh} and die "tmp_clone dbh not prepared for parent";
+        my $dbh = $self->{dbh} = dbh_new($f, 1);
+        $dbh->do('PRAGMA synchronous = OFF');
+}
+
+sub atfork_prepare {
+        my ($self) = @_;
+        my $f = $self->{tmp_name} or die "not a temporary clone\n";
+        $self->{pid} == $$ or
+                die "BUG: atfork_prepare not called from $self->{pid}\n";
+        $self->{dbh} or die "temporary clone not open\n";
+        # must clobber prepared statements
+        %$self = (tmp_name => $f, pid => $$);
 }
 
 1;
diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm
index ef723a4b..7ac16ec2 100644
--- a/lib/PublicInbox/SearchIdx.pm
+++ b/lib/PublicInbox/SearchIdx.pm
@@ -352,7 +352,7 @@ sub add_message {
 
                 # populates smsg->references for smsg->to_doc_data
                 my $refs = parse_references($smsg);
-                $mid0 = $mids->[0] unless defined $mid0;
+                $mid0 = $mids->[0] unless defined $mid0; # v1 compatibility
                 my $data = $smsg->to_doc_data($oid, $mid0);
                 foreach my $mid (@$mids) {
                         $tg->index_text($mid, 1, 'XM');
@@ -369,10 +369,12 @@ sub add_message {
                         }
                 }
 
+                $self->delete_article($num) if defined $num; # for reindexing
                 if ($skel) {
                         push @values, $mids, $xpath, $data;
                         $skel->index_skeleton(\@values);
                         $doc->add_boolean_term('Q' . $_) foreach @$mids;
+                        $doc->add_boolean_term('XNUM' . $num) if defined $num;
                         $doc_id = $self->{xdb}->add_document($doc);
                 } else {
                         $doc_id = link_and_save($self, $doc, $mids, $refs,
@@ -421,6 +423,16 @@ sub remove_message {
         }
 }
 
+sub delete_article {
+        my ($self, $num) = @_;
+        my $ndel = 0;
+        batch_do($self, 'XNUM' . $num, sub {
+                my ($ids) = @_;
+                $ndel += scalar @$ids;
+                $self->{xdb}->delete_document($_) for @$ids;
+        });
+}
+
 # MID is a hint in V2
 sub remove_by_oid {
         my ($self, $oid, $mid) = @_;
diff --git a/lib/PublicInbox/SearchIdxSkeleton.pm b/lib/PublicInbox/SearchIdxSkeleton.pm
index 78a17303..4f158169 100644
--- a/lib/PublicInbox/SearchIdxSkeleton.pm
+++ b/lib/PublicInbox/SearchIdxSkeleton.pm
@@ -134,6 +134,7 @@ sub index_skeleton_real ($$) {
         $smsg->load_from_data($doc_data);
         my $num = $values->[PublicInbox::Search::NUM];
         my @refs = ($smsg->references =~ /<([^>]+)>/g);
+        $self->delete_article($num) if defined $num; # for reindexing
         $self->link_and_save($doc, $mids, \@refs, $num, $xpath);
 }
 
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index 46bfebbd..550a74d4 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -359,6 +359,23 @@ sub git_init {
         $git_dir
 }
 
+sub git_dir_latest {
+        my ($self, $max) = @_;
+        $$max = -1;
+        my $pfx = "$self->{-inbox}->{mainrepo}/git";
+        return unless -d $pfx;
+        my $latest;
+        opendir my $dh, $pfx or die "opendir $pfx: $!\n";
+        while (defined(my $git_dir = readdir($dh))) {
+                $git_dir =~ m!\A(\d+)\.git\z! or next;
+                if ($1 > $$max) {
+                        $$max = $1;
+                        $latest = "$pfx/$git_dir";
+                }
+        }
+        $latest;
+}
+
 sub importer {
         my ($self) = @_;
         my $im = $self->{im};
@@ -375,20 +392,9 @@ sub importer {
                         return $self->import_init($git, 0);
                 }
         }
-        my $latest;
-        my $max = -1;
         my $new = 0;
-        my $pfx = "$self->{-inbox}->{mainrepo}/git";
-        if (-d $pfx) {
-                foreach my $git_dir (glob("$pfx/*.git")) {
-                        $git_dir =~ m!/(\d+)\.git\z! or next;
-                        my $n = $1;
-                        if ($n > $max) {
-                                $max = $n;
-                                $latest = $git_dir;
-                        }
-                }
-        }
+        my $max;
+        my $latest = git_dir_latest($self, \$max);
         if (defined $latest) {
                 my $git = PublicInbox::Git->new($latest);
                 my $packed_bytes = $git->packed_bytes;
@@ -466,6 +472,8 @@ sub lookup_content {
 
 sub atfork_child {
         my ($self) = @_;
+        my $fh = delete $self->{reindex_pipe};
+        close $fh if $fh;
         if (my $parts = $self->{idx_parts}) {
                 $_->atfork_child foreach @$parts;
         }
@@ -474,4 +482,104 @@ sub atfork_child {
         }
 }
 
+sub mark_deleted {
+        my ($self, $D, $git, $oid) = @_;
+        my $msgref = $git->cat_file($oid);
+        my $mime = PublicInbox::MIME->new($$msgref);
+        my $mids = mids($mime->header_obj);
+        my $cid = content_id($mime);
+        foreach my $mid (@$mids) {
+                $D->{$mid.$cid} = 1;
+        }
+}
+
+sub reindex_oid {
+        my ($self, $mm_tmp, $D, $git, $oid) = @_;
+        my $len;
+        my $msgref = $git->cat_file($oid, \$len);
+        my $mime = PublicInbox::MIME->new($$msgref);
+        my $mids = mids($mime->header_obj);
+        my $cid = content_id($mime);
+
+        # get the NNTP article number we used before, highest number wins
+        # and gets deleted from mm_tmp;
+        my $mid0;
+        my $num = -1;
+        my $del = 0;
+        foreach my $mid (@$mids) {
+                $del += (delete $D->{$mid.$cid} || 0);
+                my $n = $mm_tmp->num_for($mid);
+                if (defined $n && $n > $num) {
+                        $mid0 = $mid;
+                        $num = $n;
+                }
+        }
+        if (!defined($mid0) || $del) {
+                return if (!defined($mid0) && $del); # expected for deletes
+
+                my $id = '<' . join('> <', @$mids) . '>';
+                defined($mid0) or
+                        warn "Skipping $id, no article number found\n";
+                if ($del && defined($mid0)) {
+                        warn "$id was deleted $del " .
+                                "time(s) but mapped to article #$num\n";
+                }
+                return;
+
+        }
+        $mm_tmp->mid_delete($mid0) or
+                die "failed to delete <$mid0> for article #$num\n";
+
+        my $nparts = $self->{partitions};
+        my $part = $num % $nparts;
+        my $idx = $self->idx_part($part);
+        $idx->index_raw($len, $msgref, $num, $oid, $mid0, $mime);
+        my $n = $self->{transact_bytes} += $len;
+        if ($n > (PublicInbox::SearchIdx::BATCH_BYTES * $nparts)) {
+                $git->cleanup;
+                $mm_tmp->atfork_prepare;
+                $self->done; # release lock
+                # allow -watch or -mda to write...
+                $self->idx_init; # reacquire lock
+                $mm_tmp->atfork_parent;
+        }
+}
+
+sub reindex {
+        my ($self) = @_;
+        my $ibx = $self->{-inbox};
+        my $pfx = "$ibx->{mainrepo}/git";
+        my $max_git;
+        my $latest = git_dir_latest($self, \$max_git);
+        return unless defined $latest;
+        my @cmd = qw(log --raw -r --pretty=tformat:%h
+                        --no-notes --no-color --no-abbrev);
+        my $head = $ibx->{ref_head} || 'refs/heads/master';
+        $self->idx_init; # acquire lock
+        my $x40 = qr/[a-f0-9]{40}/;
+        my $mm_tmp = $self->{skel}->{mm}->tmp_clone;
+        my $D = {};
+
+        # work backwards through history
+        for (my $cur = $max_git; $cur >= 0; $cur--) {
+                die "already reindexing!\n" if delete $self->{reindex_pipe};
+                my $cmt;
+                my $git_dir = "$pfx/$cur.git";
+                my $git = PublicInbox::Git->new($git_dir);
+                my $fh = $self->{reindex_pipe} = $git->popen(@cmd, $head);
+                while (<$fh>) {
+                        if (/\A$x40$/o) {
+                                chomp($cmt = $_);
+                        } elsif (/\A:\d{6} 100644 $x40 ($x40) [AM]\tm$/o) {
+                                $self->reindex_oid($mm_tmp, $D, $git, $1);
+                        } elsif (m!\A:\d{6} 100644 $x40 ($x40) [AM]\t_/D$!o) {
+                                $self->mark_deleted($D, $git, $1);
+                        }
+                }
+                delete $self->{reindex_pipe};
+        }
+        my ($min, $max) = $mm_tmp->minmax;
+        defined $max and die "leftover article numbers at $min..$max\n";
+}
+
 1;
diff --git a/script/public-inbox-index b/script/public-inbox-index
index 1debbaac..cea35738 100755
--- a/script/public-inbox-index
+++ b/script/public-inbox-index
@@ -31,6 +31,9 @@ my @dirs;
 sub resolve_repo_dir {
         my ($cd) = @_;
         my $prefix = defined $cd ? $cd : './';
+        if (-d $prefix && -f "$prefix/inbox.lock") { # v2
+                return abs_path($prefix);
+        }
 
         my @cmd = qw(git rev-parse --git-dir);
         my $cmd = join(' ', @cmd);
@@ -75,14 +78,26 @@ foreach my $k (keys %$config) {
 }
 
 foreach my $dir (@dirs) {
+        if (!ref($dir) && -f "$dir/inbox.lock") { # v2
+                my $ibx = { mainrepo => $dir, name => 'unnamed' };
+                $dir = PublicInbox::Inbox->new($ibx);
+        }
         index_dir($dir);
 }
 
 sub index_dir {
-        my ($git_dir) = @_;
-        if (!ref $git_dir && ! -d $git_dir) {
-                die "$git_dir does not appear to be a git repository\n";
+        my ($repo) = @_;
+        if (!ref $repo && ! -d $repo) {
+                die "$repo does not appear to be an inbox repository\n";
+        }
+        if (ref($repo) && ($repo->{version} || 1) == 2) {
+                eval { require PublicInbox::V2Writable };
+                die "v2 requirements not met: $@\n" if $@;
+                my $v2w = PublicInbox::V2Writable->new($repo);
+                $v2w->reindex;
+                $v2w->done;
+        } else {
+                my $s = PublicInbox::SearchIdx->new($repo, 1);
+                $s->index_sync({ reindex => $reindex });
         }
-        my $s = PublicInbox::SearchIdx->new($git_dir, 1);
-        $s->index_sync({ reindex => $reindex });
 }