diff options
Diffstat (limited to 'lib/PublicInbox/Import.pm')
-rw-r--r-- | lib/PublicInbox/Import.pm | 307 |
1 files changed, 163 insertions, 144 deletions
diff --git a/lib/PublicInbox/Import.pm b/lib/PublicInbox/Import.pm index 1a226cc7..ed34d548 100644 --- a/lib/PublicInbox/Import.pm +++ b/lib/PublicInbox/Import.pm @@ -1,4 +1,4 @@ -# Copyright (C) 2016-2020 all contributors <meta@public-inbox.org> +# Copyright (C) all contributors <meta@public-inbox.org> # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> # # git fast-import-based ssoma-mda MDA replacement @@ -6,10 +6,9 @@ # and public-inbox-watch. Not the WWW or NNTP code which only # requires read-only access. package PublicInbox::Import; -use strict; +use v5.12; use parent qw(PublicInbox::Lock); -use v5.10.1; -use PublicInbox::Spawn qw(spawn popen_rd); +use PublicInbox::Spawn qw(run_die run_qx spawn); use PublicInbox::MID qw(mids mid2path); use PublicInbox::Address; use PublicInbox::Smsg; @@ -18,14 +17,27 @@ use PublicInbox::ContentHash qw(content_digest); use PublicInbox::MDA; use PublicInbox::Eml; use POSIX qw(strftime); +use autodie qw(socketpair); +use Carp qw(croak); +use Socket qw(AF_UNIX SOCK_STREAM); +use PublicInbox::IO qw(read_all); + +sub default_branch () { + state $default_branch = do { + my $h = run_qx([qw(git config --global init.defaultBranch)], + { GIT_CONFIG => undef }); + chomp $h; + $h eq '' ? 'refs/heads/master' : "refs/heads/$h"; + } +} sub new { # we can't change arg order, this is documented in POD # and external projects may rely on it: my ($class, $git, $name, $email, $ibx) = @_; - my $ref = 'refs/heads/master'; + my $ref; if ($ibx) { - $ref = $ibx->{ref_head} // 'refs/heads/master'; + $ref = $ibx->{ref_head}; $name //= $ibx->{name}; $email //= $ibx->{-primary_address}; $git //= $ibx->git; @@ -34,7 +46,7 @@ sub new { git => $git, ident => "$name <$email>", mark => 1, - ref => $ref, + ref => $ref // default_branch, ibx => $ibx, path_type => '2/38', # or 'v2' lock_path => "$git->{git_dir}/ssoma.lock", # v2 changes this @@ -45,41 +57,36 @@ sub new { # idempotent start function sub gfi_start { my ($self) = @_; - - return ($self->{in}, $self->{out}) if $self->{pid}; - - my (@ret, $out_r, $out_w); - pipe($out_r, $out_w) or die "pipe failed: $!"; + my $io = $self->{io}; + return $io if $io; + socketpair($io, my $s2, AF_UNIX, SOCK_STREAM, 0); + $io->autoflush(1); $self->lock_acquire; eval { my ($git, $ref) = @$self{qw(git ref)}; local $/ = "\n"; chomp($self->{tip} = $git->qx(qw(rev-parse --revs-only), $ref)); + die "fatal: rev-parse --revs-only $ref: \$?=$?" if $?; if ($self->{path_type} ne '2/38' && $self->{tip}) { - local $/ = "\0"; - my @t = $git->qx(qw(ls-tree -r -z --name-only), $ref); - chomp @t; - $self->{-tree} = { map { $_ => 1 } @t }; + my $t = $git->qx(qw(ls-tree -r -z --name-only), $ref); + die "fatal: ls-tree -r -z --name-only $ref: \$?=$?" if $?; + $self->{-tree} = { map { $_ => 1 } split(/\0/, $t) }; } - my @cmd = ('git', "--git-dir=$git->{git_dir}", - qw(fast-import --quiet --done --date-format=raw)); - my ($in_r, $pid) = popen_rd(\@cmd, undef, { 0 => $out_r }); - $out_w->autoflush(1); - $self->{in} = $in_r; - $self->{out} = $out_w; - $self->{pid} = $pid; + my $gfi = [ 'git', "--git-dir=$git->{git_dir}", qw(fast-import + --quiet --done --date-format=raw) ]; + my $pid = spawn($gfi, undef, { 0 => $s2, 1 => $s2 }); $self->{nchg} = 0; - @ret = ($in_r, $out_w); + $self->{io} = PublicInbox::IO::attach_pid($io, $pid); }; if ($@) { $self->lock_release; die $@; } - @ret; + $self->{io}; } -sub wfail () { die "write to fast-import failed: $!" } +sub wfail () { croak "write to fast-import failed: $!" } sub now_raw () { time . ' +0000' } @@ -91,60 +98,43 @@ sub norm_body ($) { } # only used for v1 (ssoma) inboxes -sub _check_path ($$$$) { - my ($r, $w, $tip, $path) = @_; +sub _check_path ($$$) { + my ($io, $tip, $path) = @_; return if $tip eq ''; - print $w "ls $tip $path\n" or wfail; + print $io "ls $tip $path\n" or wfail; local $/ = "\n"; - defined(my $info = <$r>) or die "EOF from fast-import: $!"; + my $info = <$io> // die "EOF from fast-import: $!"; $info =~ /\Amissing / ? undef : $info; } -sub _cat_blob ($$$) { - my ($r, $w, $oid) = @_; - print $w "cat-blob $oid\n" or wfail; +sub _cat_blob ($$) { + my ($io, $oid) = @_; + print $io "cat-blob $oid\n" or wfail; local $/ = "\n"; - my $info = <$r>; - defined $info or die "EOF from fast-import / cat-blob: $!"; + my $info = <$io> // die "EOF from fast-import / cat-blob: $!"; $info =~ /\A[a-f0-9]{40,} blob ([0-9]+)\n\z/ or return; - my $left = $1; - my $offset = 0; - my $buf = ''; - my $n; - while ($left > 0) { - $n = read($r, $buf, $left, $offset); - defined($n) or die "read cat-blob failed: $!"; - $n == 0 and die 'fast-export (cat-blob) died'; - $left -= $n; - $offset += $n; - } - $n = read($r, my $lf, 1); - defined($n) or die "read final byte of cat-blob failed: $!"; - die "bad read on final byte: <$lf>" if $lf ne "\n"; - - # fixup some bugginess in old versions: - $buf =~ s/\A[\r\n]*From [^\r\n]*\r?\n//s; + my $buf = read_all($io, my $len = $1 + 1); + my $lf = chop $buf; + croak "bad read on final byte: <$lf>" if $lf ne "\n"; \$buf; } sub cat_blob { my ($self, $oid) = @_; - my ($r, $w) = $self->gfi_start; - _cat_blob($r, $w, $oid); + _cat_blob($self->{io} // return, $oid); } sub check_remove_v1 { - my ($r, $w, $tip, $path, $mime) = @_; + my ($io, $tip, $path, $mime) = @_; - my $info = _check_path($r, $w, $tip, $path) or return ('MISSING',undef); + my $info = _check_path($io, $tip, $path) or return ('MISSING',undef); $info =~ m!\A100644 blob ([a-f0-9]{40,})\t!s or die "not blob: $info"; my $oid = $1; - my $msg = _cat_blob($r, $w, $oid) or die "BUG: cat-blob $1 failed"; - my $cur = PublicInbox::Eml->new($msg); - my $cur_s = $cur->header('Subject'); - $cur_s = '' unless defined $cur_s; - my $cur_m = $mime->header('Subject'); - $cur_m = '' unless defined $cur_m; + my $bref = _cat_blob($io, $oid) or die "BUG: cat-blob $1 failed"; + PublicInbox::Eml::strip_from($$bref); + my $cur = PublicInbox::Eml->new($bref); + my $cur_s = $cur->header('Subject') // ''; + my $cur_m = $mime->header('Subject') // ''; if ($cur_s ne $cur_m || norm_body($cur) ne norm_body($mime)) { return ('MISMATCH', $cur); } @@ -153,16 +143,15 @@ sub check_remove_v1 { sub checkpoint { my ($self) = @_; - return unless $self->{pid}; - print { $self->{out} } "checkpoint\n" or wfail; + print { $self->{io} // return } "checkpoint\n" or wfail; undef; } sub progress { my ($self, $msg) = @_; - return unless $self->{pid}; - print { $self->{out} } "progress $msg\n" or wfail; - readline($self->{in}) eq "progress $msg\n" or die + my $io = $self->{io} or return; + print $io "progress $msg\n" or wfail; + readline($io) eq "progress $msg\n" or die "progress $msg not received\n"; undef; } @@ -178,8 +167,8 @@ sub _update_git_info ($$) { my $env = { GIT_INDEX_FILE => $index }; run_die([@cmd, qw(read-tree -m -v -i), $self->{ref}], $env); } - eval { run_die([@cmd, 'update-server-info']) }; my $ibx = $self->{ibx}; + eval { run_die([@cmd, 'update-server-info']) } if $ibx; if ($ibx && $ibx->version == 1 && -d "$ibx->{inboxdir}/public-inbox" && eval { require PublicInbox::SearchIdx }) { eval { @@ -188,7 +177,10 @@ sub _update_git_info ($$) { }; warn "$ibx->{inboxdir} index failed: $@\n" if $@; } - eval { run_die([@cmd, qw(gc --auto)]) } if $do_gc; + if ($do_gc) { + my @quiet = (-t STDERR ? () : '-q'); + eval { run_die([@cmd, qw(gc --auto), @quiet]) } + } } sub barrier { @@ -209,10 +201,9 @@ sub barrier { # used for v2 sub get_mark { my ($self, $mark) = @_; - die "not active\n" unless $self->{pid}; - my ($r, $w) = $self->gfi_start; - print $w "get-mark $mark\n" or wfail; - defined(my $oid = <$r>) or die "get-mark failed, need git 2.6.0+\n"; + my $io = $self->{io} or croak "not active\n"; + print $io "get-mark $mark\n" or wfail; + my $oid = <$io> // die "get-mark failed, need git 2.6.0+\n"; chomp($oid); $oid; } @@ -229,11 +220,11 @@ sub remove { my $path_type = $self->{path_type}; my ($path, $err, $cur, $blob); - my ($r, $w) = $self->gfi_start; + my $io = gfi_start($self); my $tip = $self->{tip}; if ($path_type eq '2/38') { $path = mid2path(v1_mid0($mime)); - ($err, $cur) = check_remove_v1($r, $w, $tip, $path, $mime); + ($err, $cur) = check_remove_v1($io, $tip, $path, $mime); return ($err, $cur) if $err; } else { my $sref; @@ -245,7 +236,7 @@ sub remove { } my $len = length($$sref); $blob = $self->{mark}++; - print $w "blob\nmark :$blob\ndata $len\n", + print $io "blob\nmark :$blob\ndata $len\n", $$sref, "\n" or wfail; } @@ -253,22 +244,22 @@ sub remove { my $commit = $self->{mark}++; my $parent = $tip =~ /\A:/ ? $tip : undef; unless ($parent) { - print $w "reset $ref\n" or wfail; + print $io "reset $ref\n" or wfail; } my $ident = $self->{ident}; my $now = now_raw(); $msg //= 'rm'; my $len = length($msg) + 1; - print $w "commit $ref\nmark :$commit\n", + print $io "commit $ref\nmark :$commit\n", "author $ident $now\n", "committer $ident $now\n", "data $len\n$msg\n\n", 'from ', ($parent ? $parent : $tip), "\n" or wfail; if (defined $path) { - print $w "D $path\n\n" or wfail; + print $io "D $path\n\n" or wfail; } else { - clean_tree_v2($self, $w, 'd'); - print $w "M 100644 :$blob d\n\n" or wfail; + clean_tree_v2($self, $io, 'd'); + print $io "M 100644 :$blob d\n\n" or wfail; } $self->{nchg}++; (($self->{tip} = ":$commit"), $cur); @@ -328,11 +319,40 @@ sub extract_cmt_info ($;$) { } # kill potentially confusing/misleading headers +our @UNWANTED_HEADERS = (qw(Bytes Lines Content-Length), + qw(Status X-Status)); +our $DROP_UNIQUE_UNSUB; sub drop_unwanted_headers ($) { - my ($mime) = @_; + my ($eml) = @_; + for (@UNWANTED_HEADERS, @PublicInbox::MDA::BAD_HEADERS) { + $eml->header_set($_); + } - $mime->header_set($_) for qw(Bytes Lines Content-Length Status); - $mime->header_set($_) for @PublicInbox::MDA::BAD_HEADERS; + # We don't want public-inbox readers to be able to unsubcribe the + # address which does archiving. WARNING: this breaks DKIM if the + # mailing list sender follows RFC 8058, section 4; but breaking DKIM + # (or have senders ignore RFC 8058 sec. 4) is preferable to having + # saboteurs unsubscribing independent archivists: + if ($DROP_UNIQUE_UNSUB && grep(/\AList-Unsubscribe=One-Click\z/, + $eml->header_raw('List-Unsubscribe-Post'))) { + for (qw(List-Unsubscribe-Post List-Unsubscribe)) { + $eml->header_set($_) + } + } +} + +sub load_config ($;$) { + my ($cfg, $do_exit) = @_; + my $v = $cfg->{lc 'publicinboxImport.dropUniqueUnsubscribe'}; + if (defined $v) { + $DROP_UNIQUE_UNSUB = $cfg->git_bool($v) // do { + warn <<EOM; +E: publicinboxImport.dropUniqueUnsubscribe=$v in $cfg->{-f} is not boolean +EOM + $do_exit //= \&CORE::exit; + $do_exit->(78); # EX_CONFIG + }; + } } # used by V2Writable, too @@ -356,11 +376,11 @@ sub v1_mid0 ($) { $mids->[0]; } sub clean_tree_v2 ($$$) { - my ($self, $w, $keep) = @_; + my ($self, $io, $keep) = @_; my $tree = $self->{-tree} or return; #v2 only delete $tree->{$keep}; foreach (keys %$tree) { - print $w "D $_\n" or wfail; + print $io "D $_\n" or wfail; } %$tree = ($keep => 1); } @@ -379,10 +399,10 @@ sub add { $path = 'm'; } - my ($r, $w) = $self->gfi_start; + my $io = gfi_start($self); my $tip = $self->{tip}; if ($path_type eq '2/38') { - _check_path($r, $w, $tip, $path) and return; + _check_path($io, $tip, $path) and return; } drop_unwanted_headers($mime); @@ -396,48 +416,51 @@ sub add { my $raw_email = $mime->{-public_inbox_raw} // $mime->as_string; my $n = length($raw_email); $self->{bytes_added} += $n; - print $w "blob\nmark :$blob\ndata ", $n, "\n" or wfail; - print $w $raw_email, "\n" or wfail; + print $io "blob\nmark :$blob\ndata $n\n", $raw_email, "\n" or wfail; # v2: we need this for Xapian if ($smsg) { $smsg->{blob} = $self->get_mark(":$blob"); - $smsg->{raw_bytes} = $n; - $smsg->{-raw_email} = \$raw_email; + $smsg->set_bytes($raw_email, $n); + if (my $oidx = delete $smsg->{-oidx}) { # used by LeiStore + my $eidx_git = delete $smsg->{-eidx_git}; + + # we need this sharedkv to dedupe blobs added in the + # same fast-import transaction + my $u = $self->{uniq_skv} //= do { + require PublicInbox::SharedKV; + my $x = PublicInbox::SharedKV->new; + $x->dbh; + $x; + }; + return if !$u->set_maybe($smsg->oidbin, 1); + return if (!$oidx->vivify_xvmd($smsg) && + $eidx_git->check($smsg->{blob})); + } } my $ref = $self->{ref}; my $commit = $self->{mark}++; my $parent = $tip =~ /\A:/ ? $tip : undef; unless ($parent) { - print $w "reset $ref\n" or wfail; + print $io "reset $ref\n" or wfail; } - print $w "commit $ref\nmark :$commit\n", + print $io "commit $ref\nmark :$commit\n", "author $author $at\n", - "committer $self->{ident} $ct\n" or wfail; - print $w "data ", (length($subject) + 1), "\n", + "committer $self->{ident} $ct\n", + "data ", (length($subject) + 1), "\n", $subject, "\n\n" or wfail; if ($tip ne '') { - print $w 'from ', ($parent ? $parent : $tip), "\n" or wfail; + print $io 'from ', ($parent ? $parent : $tip), "\n" or wfail; } - clean_tree_v2($self, $w, $path); - print $w "M 100644 :$blob $path\n\n" or wfail; + clean_tree_v2($self, $io, $path); + print $io "M 100644 :$blob $path\n\n" or wfail; $self->{nchg}++; $self->{tip} = ":$commit"; } -sub run_die ($;$$) { - my ($cmd, $env, $rdr) = @_; - my $pid = spawn($cmd, $env, $rdr); - waitpid($pid, 0) == $pid or die join(' ', @$cmd) .' did not finish'; - $? == 0 or die join(' ', @$cmd) . " failed: $?\n"; -} - -my @INIT_FILES = ('HEAD' => "ref: refs/heads/master\n", - 'description' => <<EOD, -Unnamed repository; edit this file 'description' to name the repository. -EOD +my @INIT_FILES = ('HEAD' => undef, # filled in at runtime 'config' => <<EOC); [core] repositoryFormatVersion = 0 @@ -448,32 +471,37 @@ EOD EOC sub init_bare { - my ($dir) = @_; # or self + my ($dir, $head, $fmt) = @_; # or self $dir = $dir->{git}->{git_dir} if ref($dir); require File::Path; - File::Path::mkpath([ map { "$dir/$_" } qw(objects/info refs/heads) ]); - for (my $i = 0; $i < @INIT_FILES; $i++) { - my $f = $dir.'/'.$INIT_FILES[$i++]; + File::Path::make_path(map { $dir.$_ } qw(/objects/info /refs/heads)); + $INIT_FILES[1] //= 'ref: '.default_branch."\n"; + my @fn_contents = @INIT_FILES; + $fn_contents[1] = "ref: refs/heads/$head\n" if defined $head; + $fn_contents[3] = <<EOM if defined($fmt) && $fmt ne 'sha1'; +[core] + repositoryFormatVersion = 1 + filemode = true + bare = true +[extensions] + objectFormat = $fmt +EOM + while (my ($fn, $contents) = splice(@fn_contents, 0, 2)) { + my $f = $dir.'/'.$fn; next if -f $f; - open my $fh, '>', $f or die "open $f: $!"; - print $fh $INIT_FILES[$i] or die "print $f: $!"; - close $fh or die "close $f: $!"; + PublicInbox::IO::write_file '>', $f, $contents; } } # true if locked and active -sub active { !!$_[0]->{out} } +sub active { !!$_[0]->{io} } sub done { my ($self) = @_; - my $w = delete $self->{out} or return; + my $io = delete $self->{io} or return; eval { - my $r = delete $self->{in} or die 'BUG: missing {in} when done'; - print $w "done\n" or wfail; - my $pid = delete $self->{pid} or - die 'BUG: missing {pid} when done'; - waitpid($pid, 0) == $pid or die 'fast-import did not finish'; - $? == 0 or die "fast-import failed: $?"; + print $io "done\n" or wfail; + $io->close or croak "close fast-import \$?=$?"; # reaps }; my $wait_err = $@; my $nchg = delete $self->{nchg}; @@ -486,16 +514,10 @@ sub done { die $wait_err if $wait_err; } -sub atfork_child { - my ($self) = @_; - foreach my $f (qw(in out)) { - next unless defined($self->{$f}); - close $self->{$f} or die "failed to close import[$f]: $!\n"; - } -} +sub atfork_child { (delete($_[0]->{io}) // return)->close } -sub digest2mid ($$) { - my ($dig, $hdr) = @_; +sub digest2mid ($$;$) { + my ($dig, $hdr, $fallback_time) = @_; my $b64 = $dig->clone->b64digest; # Make our own URLs nicer: # See "Base 64 Encoding with URL and Filename Safe Alphabet" in RFC4648 @@ -504,7 +526,7 @@ sub digest2mid ($$) { # Add a date prefix to prevent a leading '-' in case that trips # up some tools (e.g. if a Message-ID were a expected as a # command-line arg) - my $dt = msg_datestamp($hdr); + my $dt = msg_datestamp($hdr, $fallback_time); $dt = POSIX::strftime('%Y%m%d%H%M%S', gmtime($dt)); "$dt.$b64" . '@z'; } @@ -545,7 +567,7 @@ sub replace_oids { my $git = $self->{git}; my @export = (qw(fast-export --no-data --use-done-feature), $old); my $rd = $git->popen(@export); - my ($r, $w) = $self->gfi_start; + my $io = gfi_start($self); my @buf; my $nreplace = 0; my @oids; @@ -556,17 +578,14 @@ sub replace_oids { push @buf, "reset $tmp\n"; } elsif (/^commit (?:.+)/) { if (@buf) { - print $w @buf or wfail; + print $io @buf or wfail; @buf = (); } push @buf, "commit $tmp\n"; } elsif (/^data ([0-9]+)/) { # only commit message, so $len is small: - my $len = $1; # + 1 for trailing "\n" push @buf, $_; - my $n = read($rd, my $buf, $len) or die "read: $!"; - $len == $n or die "short read ($n < $len)"; - push @buf, $buf; + push @buf, read_all($rd, my $len = $1); } elsif (/^M 100644 ([a-f0-9]+) (\w+)/) { my ($oid, $path) = ($1, $2); $tree->{$path} = 1; @@ -593,7 +612,7 @@ sub replace_oids { rewrite_commit($self, \@oids, \@buf, $mime); $nreplace++; } - print $w @buf, "\n" or wfail; + print $io @buf, "\n" or wfail; @buf = (); } elsif ($_ eq "done\n") { $done = 1; @@ -604,9 +623,9 @@ sub replace_oids { push @buf, $_; } } - close $rd or die "close fast-export failed: $?"; + $rd->close or die "E: git @export (\$?=$?)"; if (@buf) { - print $w @buf or wfail; + print $io @buf or wfail; } die 'done\n not seen from fast-export' unless $done; chomp(my $cmt = $self->get_mark(":$mark")) if $nreplace; |