diff options
Diffstat (limited to 'lib/PublicInbox/V2Writable.pm')
-rw-r--r-- | lib/PublicInbox/V2Writable.pm | 118 |
1 files changed, 38 insertions, 80 deletions
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm index 971b007b..43f37f60 100644 --- a/lib/PublicInbox/V2Writable.pm +++ b/lib/PublicInbox/V2Writable.pm @@ -1,4 +1,4 @@ -# Copyright (C) 2018-2021 all contributors <meta@public-inbox.org> +# Copyright (C) all contributors <meta@public-inbox.org> # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> # This interface wraps and mimics PublicInbox::Import @@ -8,7 +8,7 @@ use strict; use v5.10.1; use parent qw(PublicInbox::Lock PublicInbox::IPC); use PublicInbox::SearchIdxShard; -use PublicInbox::IPC; +use PublicInbox::IPC qw(nproc_shards); use PublicInbox::Eml; use PublicInbox::Git; use PublicInbox::Import; @@ -22,37 +22,12 @@ use PublicInbox::Spawn qw(spawn popen_rd run_die); use PublicInbox::Search; use PublicInbox::SearchIdx qw(log2stack is_ancestor check_size is_bad_blob); use IO::Handle; # ->autoflush -use File::Temp (); use POSIX (); my $OID = qr/[a-f0-9]{40,}/; # an estimate of the post-packed size to the raw uncompressed size our $PACKING_FACTOR = 0.4; -# SATA storage lags behind what CPUs are capable of, so relying on -# nproc(1) can be misleading and having extra Xapian shards is a -# waste of FDs and space. It can also lead to excessive IO latency -# and slow things down. Users on NVME or other fast storage can -# use the NPROC env or switches in our script/public-inbox-* programs -# to increase Xapian shards -our $NPROC_MAX_DEFAULT = 4; - -sub nproc_shards ($) { - my ($creat_opt) = @_; - my $n = $creat_opt->{nproc} if ref($creat_opt) eq 'HASH'; - $n //= $ENV{NPROC}; - if (!$n) { - # assume 2 cores if not detectable or zero - state $NPROC_DETECTED = PublicInbox::IPC::detect_nproc() || 2; - $n = $NPROC_DETECTED; - $n = $NPROC_MAX_DEFAULT if $n > $NPROC_MAX_DEFAULT; - } - - # subtract for the main process and git-fast-import - $n -= 1; - $n < 1 ? 1 : $n; -} - sub count_shards ($) { my ($self) = @_; # always load existing shards in case core count changes: @@ -113,13 +88,6 @@ sub init_inbox { $self->done; } -# returns undef on duplicate or spam -# mimics Import::add and wraps it for v2 -sub add { - my ($self, $eml, $check_cb) = @_; - $self->{ibx}->with_umask(\&_add, $self, $eml, $check_cb); -} - sub idx_shard ($$) { my ($self, $num) = @_; $self->{idx_shards}->[$num % scalar(@{$self->{idx_shards}})]; @@ -137,8 +105,11 @@ sub do_idx ($$$) { $n >= $self->{batch_bytes}; } -sub _add { +# returns undef on duplicate or spam +# mimics Import::add and wraps it for v2 +sub add { my ($self, $mime, $check_cb) = @_; + my $restore = $self->{ibx}->with_umask; # spam check: if ($check_cb) { @@ -164,7 +135,6 @@ sub _add { if (do_idx($self, $mime, $smsg)) { $self->checkpoint; } - $cmt; } @@ -267,9 +237,7 @@ sub _idx_init { # with_umask callback # Now that all subprocesses are up, we can open the FDs # for SQLite: - my $mm = $self->{mm} = PublicInbox::Msgmap->new_file( - "$ibx->{inboxdir}/msgmap.sqlite3", - $ibx->{-no_fsync} ? 2 : 1); + my $mm = $self->{mm} = PublicInbox::Msgmap->new_file($ibx, 1); $mm->{dbh}->begin_work; } @@ -417,17 +385,16 @@ sub rewrite_internal ($$;$$$) { # (retval[2]) is not part of the stable API shared with Import->remove sub remove { my ($self, $eml, $cmt_msg) = @_; - my $r = $self->{ibx}->with_umask(\&rewrite_internal, - $self, $eml, $cmt_msg); + my $restore = $self->{ibx}->with_umask; + my $r = rewrite_internal($self, $eml, $cmt_msg); defined($r) && defined($r->[0]) ? @$r: undef; } sub _replace ($$;$$) { my ($self, $old_eml, $new_eml, $sref) = @_; - my $arg = [ $self, $old_eml, undef, $new_eml, $sref ]; - my $rewritten = $self->{ibx}->with_umask(\&rewrite_internal, - $self, $old_eml, undef, $new_eml, $sref) or return; - + my $restore = $self->{ibx}->with_umask; + my $rewritten = rewrite_internal($self, $old_eml, undef, + $new_eml, $sref) or return; my $rewrites = $rewritten->{rewrites}; # ->done is called if there are rewrites since we gc+prune from git $self->idx_init if @$rewrites; @@ -540,20 +507,14 @@ sub set_last_commits ($) { # this is NOT for ExtSearchIdx sub checkpoint ($;$) { my ($self, $wait) = @_; - if (my $im = $self->{im}) { - if ($wait) { - $im->barrier; - } else { - $im->checkpoint; - } - } + $self->{im}->barrier if $self->{im}; my $shards = $self->{idx_shards}; if ($shards) { - my $mm = $self->{mm}; - my $dbh = $mm->{dbh} if $mm; + my $dbh = $self->{mm}->{dbh} if $self->{mm}; # SQLite msgmap data is second in importance $dbh->commit if $dbh; + eval { $dbh->do('PRAGMA optimize') }; # SQLite overview is third $self->{oidx}->commit_lazy; @@ -623,6 +584,11 @@ sub done { eval { $mm->{dbh}->$m }; $err .= "msgmap $m: $@\n" if $@; } + if ($self->{oidx} && $self->{oidx}->{dbh} && $err) { + eval { $self->{oidx}->rollback_lazy }; + $err .= "overview rollback: $@\n" if $@; + } + my $shards = delete $self->{idx_shards}; if ($shards) { for (@$shards) { @@ -686,23 +652,6 @@ sub import_init { $im; } -# XXX experimental -sub diff ($$$) { - my ($mid, $cur, $new) = @_; - - my $ah = File::Temp->new(TEMPLATE => 'email-cur-XXXX', TMPDIR => 1); - print $ah $cur->as_string or die "print: $!"; - $ah->flush or die "flush: $!"; - PublicInbox::Import::drop_unwanted_headers($new); - my $bh = File::Temp->new(TEMPLATE => 'email-new-XXXX', TMPDIR => 1); - print $bh $new->as_string or die "print: $!"; - $bh->flush or die "flush: $!"; - my $cmd = [ qw(diff -u), $ah->filename, $bh->filename ]; - print STDERR "# MID conflict <$mid>\n"; - my $pid = spawn($cmd, undef, { 1 => 2 }); - waitpid($pid, 0) == $pid or die "diff did not finish"; -} - sub get_blob ($$) { my ($self, $smsg) = @_; if (my $im = $self->{im}) { @@ -726,9 +675,6 @@ sub content_exists ($$$) { } my $cur = PublicInbox::Eml->new($msg); return 1 if content_matches($chashes, $cur); - - # XXX DEBUG_DIFF is experimental and may be removed - diff($mid, $cur, $mime) if $ENV{DEBUG_DIFF}; } undef; } @@ -810,8 +756,8 @@ sub index_oid { # cat_async callback } } } + my $oidx = $self->{oidx}; if (!defined($num)) { # reuse if reindexing (or duplicates) - my $oidx = $self->{oidx}; for my $mid (@$mids) { ($num, $mid0) = $oidx->num_mid0_for_oid($oid, $mid); last if defined $num; @@ -819,6 +765,11 @@ sub index_oid { # cat_async callback } $mid0 //= do { # is this a number we got before? $num = $arg->{mm_tmp}->num_for($mids->[0]); + + # don't clobber existing if Message-ID is reused: + if (my $x = defined($num) ? $oidx->get_art($num) : undef) { + undef($num) if $x->{blob} ne $oid; + } defined($num) ? $mids->[0] : undef; }; if (!defined($num)) { @@ -876,6 +827,11 @@ sub update_last_commit { chomp(my $n = $unit->{git}->qx(@cmd)); return if $n ne '' && $n == 0; } + # don't rewind if --{since,until,before,after} are in use + return if (defined($last) && + grep(defined, @{$sync->{-opt}}{qw(since until)}) && + is_ancestor($self->git, $latest_cmt, $last)); + last_epoch_commit($self, $unit->{epoch}, $latest_cmt); } @@ -1026,7 +982,7 @@ sub sync_prepare ($$) { my $req = { %$sync, oid => $oid }; $self->git->cat_async($oid, $unindex_oid, $req); } - $self->git->cat_async_wait; + $self->git->async_wait_all; } return 0 if $sync->{quit}; if (!$regen_max) { @@ -1107,8 +1063,8 @@ sub unindex_todo ($$$) { /\A:\d{6} 100644 $OID ($OID) [AM]\tm$/o or next; $self->git->cat_async($1, $unindex_oid, { %$sync, oid => $1 }); } - close $fh or die "git log failed: \$?=$?"; - $self->git->cat_async_wait; + $fh->close or die "git log failed: \$?=$?"; + $self->git->async_wait_all; return unless $sync->{-opt}->{prune}; my $after = scalar keys %$unindexed; @@ -1197,6 +1153,7 @@ sub index_todo ($$$) { }; if ($f eq 'm') { if ($sync->{max_size}) { + $req->{git} = $all; $all->check_async($oid, \&check_size, $req); } else { $all->cat_async($oid, $index_oid, $req); @@ -1240,7 +1197,7 @@ sub xapian_only { index_xap_step($self, $sync, $art_beg, 1); } } - $self->git->cat_async_wait; + $self->git->async_wait_all; $self->{ibx}->cleanup; $self->done; } @@ -1337,7 +1294,8 @@ sub index_sync { } # reindex does not pick up new changes, so we rerun w/o it: - if ($opt->{reindex} && !$sync->{quit}) { + if ($opt->{reindex} && !$sync->{quit} && + !grep(defined, @$opt{qw(since until)})) { my %again = %$opt; $sync = undef; delete @again{qw(rethread reindex -skip_lock)}; |