Diffstat (limited to 'lib/PublicInbox/V2Writable.pm')
-rw-r--r--  lib/PublicInbox/V2Writable.pm  118
1 file changed, 38 insertions(+), 80 deletions(-)
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index 971b007b..43f37f60 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -1,4 +1,4 @@
-# Copyright (C) 2018-2021 all contributors <meta@public-inbox.org>
+# Copyright (C) all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
 
 # This interface wraps and mimics PublicInbox::Import
@@ -8,7 +8,7 @@ use strict;
 use v5.10.1;
 use parent qw(PublicInbox::Lock PublicInbox::IPC);
 use PublicInbox::SearchIdxShard;
-use PublicInbox::IPC;
+use PublicInbox::IPC qw(nproc_shards);
 use PublicInbox::Eml;
 use PublicInbox::Git;
 use PublicInbox::Import;
@@ -22,37 +22,12 @@ use PublicInbox::Spawn qw(spawn popen_rd run_die);
 use PublicInbox::Search;
 use PublicInbox::SearchIdx qw(log2stack is_ancestor check_size is_bad_blob);
 use IO::Handle; # ->autoflush
-use File::Temp ();
 use POSIX ();
 
 my $OID = qr/[a-f0-9]{40,}/;
 # an estimate of the post-packed size to the raw uncompressed size
 our $PACKING_FACTOR = 0.4;
 
-# SATA storage lags behind what CPUs are capable of, so relying on
-# nproc(1) can be misleading and having extra Xapian shards is a
-# waste of FDs and space.  It can also lead to excessive IO latency
-# and slow things down.  Users on NVME or other fast storage can
-# use the NPROC env or switches in our script/public-inbox-* programs
-# to increase Xapian shards
-our $NPROC_MAX_DEFAULT = 4;
-
-sub nproc_shards ($) {
-        my ($creat_opt) = @_;
-        my $n = $creat_opt->{nproc} if ref($creat_opt) eq 'HASH';
-        $n //= $ENV{NPROC};
-        if (!$n) {
-                # assume 2 cores if not detectable or zero
-                state $NPROC_DETECTED = PublicInbox::IPC::detect_nproc() || 2;
-                $n = $NPROC_DETECTED;
-                $n = $NPROC_MAX_DEFAULT if $n > $NPROC_MAX_DEFAULT;
-        }
-
-        # subtract for the main process and git-fast-import
-        $n -= 1;
-        $n < 1 ? 1 : $n;
-}
-
 sub count_shards ($) {
         my ($self) = @_;
         # always load existing shards in case core count changes:
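
The shard-count heuristic is not gone: the new qw(nproc_shards) import near the top of this diff pulls it in from PublicInbox::IPC instead. A minimal sketch of the calling convention documented by the removed body, assuming the moved routine behaves the same way ($creat_opt is a hypothetical options hashref supplied by a caller):

    use PublicInbox::IPC qw(nproc_shards);

    # a {nproc} key (or the NPROC env) overrides detection; otherwise the
    # removed logic above detected the core count, capped it at 4 to spare
    # slow storage, and reserved one slot for the main process and
    # git-fast-import:
    my $creat_opt = { nproc => 4 };   # hypothetical caller option
    my $nshards = nproc_shards($creat_opt);
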
@@ -113,13 +88,6 @@ sub init_inbox {
         $self->done;
 }
 
-# returns undef on duplicate or spam
-# mimics Import::add and wraps it for v2
-sub add {
-        my ($self, $eml, $check_cb) = @_;
-        $self->{ibx}->with_umask(\&_add, $self, $eml, $check_cb);
-}
-
 sub idx_shard ($$) {
         my ($self, $num) = @_;
         $self->{idx_shards}->[$num % scalar(@{$self->{idx_shards}})];
@@ -137,8 +105,11 @@ sub do_idx ($$$) {
         $n >= $self->{batch_bytes};
 }
 
-sub _add {
+# returns undef on duplicate or spam
+# mimics Import::add and wraps it for v2
+sub add {
         my ($self, $mime, $check_cb) = @_;
+        my $restore = $self->{ibx}->with_umask;
 
         # spam check:
         if ($check_cb) {
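
Instead of passing a callback into with_umask(), add() now holds the return value in $restore, which presumably acts as a scope guard: the previous umask comes back once $restore goes out of scope (remove() and _replace() below follow the same pattern). A standalone sketch of that guard idiom, not the actual PublicInbox implementation:

    package UmaskGuard;                      # illustrative name only
    sub new {
            my ($class, $new_umask) = @_;
            my $old = umask($new_umask);     # set temporary umask, remember old
            bless \$old, $class;
    }
    sub DESTROY { umask(${$_[0]}) }          # restore when the guard is released

    package main;
    {
            my $restore = UmaskGuard->new(022);
            # ... create files under the temporary umask ...
    }       # $restore destroyed here: previous umask is back
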
@@ -164,7 +135,6 @@ sub _add {
         if (do_idx($self, $mime, $smsg)) {
                 $self->checkpoint;
         }
-
         $cmt;
 }
 
@@ -267,9 +237,7 @@ sub _idx_init { # with_umask callback
 
         # Now that all subprocesses are up, we can open the FDs
         # for SQLite:
-        my $mm = $self->{mm} = PublicInbox::Msgmap->new_file(
-                                "$ibx->{inboxdir}/msgmap.sqlite3",
-                                $ibx->{-no_fsync} ? 2 : 1);
+        my $mm = $self->{mm} = PublicInbox::Msgmap->new_file($ibx, 1);
         $mm->{dbh}->begin_work;
 }
 
@@ -417,17 +385,16 @@ sub rewrite_internal ($$;$$$) {
 # (retval[2]) is not part of the stable API shared with Import->remove
 sub remove {
         my ($self, $eml, $cmt_msg) = @_;
-        my $r = $self->{ibx}->with_umask(\&rewrite_internal,
-                                                $self, $eml, $cmt_msg);
+        my $restore = $self->{ibx}->with_umask;
+        my $r = rewrite_internal($self, $eml, $cmt_msg);
         defined($r) && defined($r->[0]) ? @$r: undef;
 }
 
 sub _replace ($$;$$) {
         my ($self, $old_eml, $new_eml, $sref) = @_;
-        my $arg = [ $self, $old_eml, undef, $new_eml, $sref ];
-        my $rewritten = $self->{ibx}->with_umask(\&rewrite_internal,
-                        $self, $old_eml, undef, $new_eml, $sref) or return;
-
+        my $restore = $self->{ibx}->with_umask;
+        my $rewritten = rewrite_internal($self, $old_eml, undef,
+                                        $new_eml, $sref) or return;
         my $rewrites = $rewritten->{rewrites};
         # ->done is called if there are rewrites since we gc+prune from git
         $self->idx_init if @$rewrites;
@@ -540,20 +507,14 @@ sub set_last_commits ($) { # this is NOT for ExtSearchIdx
 sub checkpoint ($;$) {
         my ($self, $wait) = @_;
 
-        if (my $im = $self->{im}) {
-                if ($wait) {
-                        $im->barrier;
-                } else {
-                        $im->checkpoint;
-                }
-        }
+        $self->{im}->barrier if $self->{im};
         my $shards = $self->{idx_shards};
         if ($shards) {
-                my $mm = $self->{mm};
-                my $dbh = $mm->{dbh} if $mm;
+                my $dbh = $self->{mm}->{dbh} if $self->{mm};
 
                 # SQLite msgmap data is second in importance
                 $dbh->commit if $dbh;
+                eval { $dbh->do('PRAGMA optimize') };
 
                 # SQLite overview is third
                 $self->{oidx}->commit_lazy;
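
The added PRAGMA optimize lets SQLite refresh its query-planner statistics right after the msgmap commit; SQLite 3.18+ recognizes the pragma while older versions silently ignore unknown pragmas, so the eval mainly guards against an undefined $dbh or a driver-level error. A standalone illustration of the same commit-then-optimize ordering (the database path and table are hypothetical, not part of this codebase):

    use DBI;
    my $dbh = DBI->connect('dbi:SQLite:dbname=/tmp/example.sqlite3', '', '',
                            { RaiseError => 1, AutoCommit => 1 });
    $dbh->do('CREATE TABLE IF NOT EXISTS msgmap (num INTEGER PRIMARY KEY)');
    $dbh->begin_work;
    $dbh->do(q{INSERT INTO msgmap (num) VALUES (1)});
    $dbh->commit;                            # data first, like checkpoint() above
    eval { $dbh->do('PRAGMA optimize') };    # then refresh planner statistics
    $dbh->disconnect;
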
@@ -623,6 +584,11 @@ sub done {
                 eval { $mm->{dbh}->$m };
                 $err .= "msgmap $m: $@\n" if $@;
         }
+        if ($self->{oidx} && $self->{oidx}->{dbh} && $err) {
+                eval { $self->{oidx}->rollback_lazy };
+                $err .= "overview rollback: $@\n" if $@;
+        }
+
         my $shards = delete $self->{idx_shards};
         if ($shards) {
                 for (@$shards) {
@@ -686,23 +652,6 @@ sub import_init {
         $im;
 }
 
-# XXX experimental
-sub diff ($$$) {
-        my ($mid, $cur, $new) = @_;
-
-        my $ah = File::Temp->new(TEMPLATE => 'email-cur-XXXX', TMPDIR => 1);
-        print $ah $cur->as_string or die "print: $!";
-        $ah->flush or die "flush: $!";
-        PublicInbox::Import::drop_unwanted_headers($new);
-        my $bh = File::Temp->new(TEMPLATE => 'email-new-XXXX', TMPDIR => 1);
-        print $bh $new->as_string or die "print: $!";
-        $bh->flush or die "flush: $!";
-        my $cmd = [ qw(diff -u), $ah->filename, $bh->filename ];
-        print STDERR "# MID conflict <$mid>\n";
-        my $pid = spawn($cmd, undef, { 1 => 2 });
-        waitpid($pid, 0) == $pid or die "diff did not finish";
-}
-
 sub get_blob ($$) {
         my ($self, $smsg) = @_;
         if (my $im = $self->{im}) {
@@ -726,9 +675,6 @@ sub content_exists ($$$) {
                 }
                 my $cur = PublicInbox::Eml->new($msg);
                 return 1 if content_matches($chashes, $cur);
-
-                # XXX DEBUG_DIFF is experimental and may be removed
-                diff($mid, $cur, $mime) if $ENV{DEBUG_DIFF};
         }
         undef;
 }
@@ -810,8 +756,8 @@ sub index_oid { # cat_async callback
                         }
                 }
         }
+        my $oidx = $self->{oidx};
         if (!defined($num)) { # reuse if reindexing (or duplicates)
-                my $oidx = $self->{oidx};
                 for my $mid (@$mids) {
                         ($num, $mid0) = $oidx->num_mid0_for_oid($oid, $mid);
                         last if defined $num;
@@ -819,6 +765,11 @@ sub index_oid { # cat_async callback
         }
         $mid0 //= do { # is this a number we got before?
                 $num = $arg->{mm_tmp}->num_for($mids->[0]);
+
+                # don't clobber existing if Message-ID is reused:
+                if (my $x = defined($num) ? $oidx->get_art($num) : undef) {
+                        undef($num) if $x->{blob} ne $oid;
+                }
                 defined($num) ? $mids->[0] : undef;
         };
         if (!defined($num)) {
@@ -876,6 +827,11 @@ sub update_last_commit {
                 chomp(my $n = $unit->{git}->qx(@cmd));
                 return if $n ne '' && $n == 0;
         }
+        # don't rewind if --{since,until,before,after} are in use
+        return if (defined($last) &&
+                        grep(defined, @{$sync->{-opt}}{qw(since until)}) &&
+                        is_ancestor($self->git, $latest_cmt, $last));
+
         last_epoch_commit($self, $unit->{epoch}, $latest_cmt);
 }
 
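This guard keeps a time-limited pass from moving the per-epoch bookmark backwards: when --since/--until restricted what was indexed and is_ancestor() indicates the recorded tip $last already contains $latest_cmt, last_epoch_commit() is skipped. The same condition, unrolled purely for readability (names as in the hunk above; a sketch, not a behavior change):

    my $windowed = grep(defined, @{$sync->{-opt}}{qw(since until)});
    if (defined($last) && $windowed &&
                    is_ancestor($self->git, $latest_cmt, $last)) {
            return;  # $last already reaches $latest_cmt: nothing to advance
    }
    last_epoch_commit($self, $unit->{epoch}, $latest_cmt);
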
@@ -1026,7 +982,7 @@ sub sync_prepare ($$) {
                         my $req = { %$sync, oid => $oid };
                         $self->git->cat_async($oid, $unindex_oid, $req);
                 }
-                $self->git->cat_async_wait;
+                $self->git->async_wait_all;
         }
         return 0 if $sync->{quit};
         if (!$regen_max) {
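
cat_async_wait() is renamed to async_wait_all() here and in the hunks below; the usage pattern is unchanged: queue one cat_async() request per blob, then drain them all with a single wait. A minimal sketch of that pattern as it appears throughout this file (@oids and $arg are illustrative; the callback signature matches the one used by the indexing callbacks):

    my $git = $self->git;
    for my $oid (@oids) {                    # blobs queued for (un)indexing
            $git->cat_async($oid, sub {
                    my ($bref, $oid, $type, $size, $arg) = @_;
                    # handle the blob contents in $$bref here
            }, $arg);
    }
    $git->async_wait_all;                    # block until every callback ran
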
@@ -1107,8 +1063,8 @@ sub unindex_todo ($$$) {
                 /\A:\d{6} 100644 $OID ($OID) [AM]\tm$/o or next;
                 $self->git->cat_async($1, $unindex_oid, { %$sync, oid => $1 });
         }
-        close $fh or die "git log failed: \$?=$?";
-        $self->git->cat_async_wait;
+        $fh->close or die "git log failed: \$?=$?";
+        $self->git->async_wait_all;
 
         return unless $sync->{-opt}->{prune};
         my $after = scalar keys %$unindexed;
@@ -1197,6 +1153,7 @@ sub index_todo ($$$) {
                 };
                 if ($f eq 'm') {
                         if ($sync->{max_size}) {
+                                $req->{git} = $all;
                                 $all->check_async($oid, \&check_size, $req);
                         } else {
                                 $all->cat_async($oid, $index_oid, $req);
@@ -1240,7 +1197,7 @@ sub xapian_only {
                         index_xap_step($self, $sync, $art_beg, 1);
                 }
         }
-        $self->git->cat_async_wait;
+        $self->git->async_wait_all;
         $self->{ibx}->cleanup;
         $self->done;
 }
@@ -1337,7 +1294,8 @@ sub index_sync {
         }
 
         # reindex does not pick up new changes, so we rerun w/o it:
-        if ($opt->{reindex} && !$sync->{quit}) {
+        if ($opt->{reindex} && !$sync->{quit} &&
+                        !grep(defined, @$opt{qw(since until)})) {
                 my %again = %$opt;
                 $sync = undef;
                 delete @again{qw(rethread reindex -skip_lock)};