diff options
Diffstat (limited to 'lib')
36 files changed, 3887 insertions, 1229 deletions
diff --git a/lib/PublicInbox/Address.pm b/lib/PublicInbox/Address.pm index f334adea..548f417c 100644 --- a/lib/PublicInbox/Address.pm +++ b/lib/PublicInbox/Address.pm @@ -8,7 +8,8 @@ use warnings; # just enough to make thing sanely displayable and pass to git sub emails { - ($_[0] =~ /([\w\.\+=\-]+\@[\w\.\-]+)>?\s*(?:\(.*?\))?(?:,\s*|\z)/g) + ($_[0] =~ /([\w\.\+=\?"\(\)\-!#\$%&'\*\/\^\`\|\{\}~]+\@[\w\.\-\(\)]+) + (?:\s[^>]*)?>?\s*(?:\(.*?\))?(?:,\s*|\z)/gx) } sub names { diff --git a/lib/PublicInbox/AltId.pm b/lib/PublicInbox/AltId.pm index d1b2dc24..4a6ff97c 100644 --- a/lib/PublicInbox/AltId.pm +++ b/lib/PublicInbox/AltId.pm @@ -22,17 +22,31 @@ sub new { } split(/[&;]/, $query); my $f = $params{file} or die "file: required for $type spec $spec\n"; unless (index($f, '/') == 0) { - $f = "$inbox->{mainrepo}/public-inbox/$f"; + if (($inbox->{version} || 1) == 1) { + $f = "$inbox->{mainrepo}/public-inbox/$f"; + } else { + $f = "$inbox->{mainrepo}/$f"; + } } bless { - mm_alt => PublicInbox::Msgmap->new_file($f, $writable), + filename => $f, + writable => $writable, xprefix => 'X'.uc($prefix), }, $class; } +sub mm_alt { + my ($self) = @_; + $self->{mm_alt} ||= eval { + my $f = $self->{filename}; + my $writable = $self->{writable}; + PublicInbox::Msgmap->new_file($f, $writable); + }; +} + sub mid2alt { my ($self, $mid) = @_; - $self->{mm_alt}->num_for($mid); + $self->mm_alt->num_for($mid); } 1; diff --git a/lib/PublicInbox/ContentId.pm b/lib/PublicInbox/ContentId.pm new file mode 100644 index 00000000..b1d27eb8 --- /dev/null +++ b/lib/PublicInbox/ContentId.pm @@ -0,0 +1,102 @@ +# Copyright (C) 2018 all contributors <meta@public-inbox.org> +# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> + +package PublicInbox::ContentId; +use strict; +use warnings; +use base qw/Exporter/; +our @EXPORT_OK = qw/content_id content_digest/; +use PublicInbox::MID qw(mids references); +use PublicInbox::MsgIter; + +# not sure if less-widely supported hash families are worth bothering with +use Digest::SHA; + +sub digest_addr ($$$) { + my ($dig, $h, $v) = @_; + $v =~ tr/"//d; + $v =~ s/@([a-z0-9\_\.\-\(\)]*([A-Z])\S*)/'@'.lc($1)/ge; + utf8::encode($v); + $dig->add("$h\0$v\0"); +} + +sub content_digest ($) { + my ($mime) = @_; + my $dig = Digest::SHA->new(256); + my $hdr = $mime->header_obj; + + # References: and In-Reply-To: get used interchangeably + # in some "duplicates" in LKML. We treat them the same + # in SearchIdx, so treat them the same for this: + my %seen; + foreach my $mid (@{mids($hdr)}) { + # do NOT consider the Message-ID as part of the content_id + # if we got here, we've already got Message-ID reuse + $seen{$mid} = 1; + } + foreach my $mid (@{references($hdr)}) { + next if $seen{$mid}; + $dig->add("ref\0$mid\0"); + } + + # Only use Sender: if From is not present + foreach my $h (qw(From Sender)) { + my @v = $hdr->header($h); + if (@v) { + digest_addr($dig, $h, $_) foreach @v; + } + } + foreach my $h (qw(Subject Date)) { + my @v = $hdr->header($h); + foreach my $v (@v) { + utf8::encode($v); + $dig->add("$h\0$v\0"); + } + } + # Some mail processors will add " to unquoted names that were + # not in the original message. For the purposes of deduplication, + # do not take it into account: + foreach my $h (qw(To Cc)) { + my @v = $hdr->header($h); + digest_addr($dig, $h, $_) foreach @v; + } + msg_iter($mime, sub { + my ($part, $depth, @idx) = @{$_[0]}; + $dig->add("\0$depth:".join('.', @idx)."\0"); + my $fn = $part->filename; + if (defined $fn) { + utf8::encode($fn); + $dig->add("fn\0$fn\0"); + } + my @d = $part->header('Content-Description'); + foreach my $d (@d) { + utf8::encode($d); + $dig->add("d\0$d\0"); + } + $dig->add("b\0"); + my $ct = $part->content_type || 'text/plain'; + my $s = eval { $part->body_str }; + if ($@ && $ct =~ m!\btext/plain\b!i) { + # Try to assume UTF-8 because Alpine + # seems to do wacky things and set + # charset=X-UNKNOWN + $part->charset_set('UTF-8'); + $s = eval { $part->body_str }; + } + if (defined $s) { + $s =~ s/\r\n/\n/gs; + $s =~ s/\s*\z//s; + utf8::encode($s); + } else { + $s = $part->body; + } + $dig->add($s); + }); + $dig; +} + +sub content_id ($) { + content_digest($_[0])->digest; +} + +1; diff --git a/lib/PublicInbox/Daemon.pm b/lib/PublicInbox/Daemon.pm index 0329bd34..4629aadb 100644 --- a/lib/PublicInbox/Daemon.pm +++ b/lib/PublicInbox/Daemon.pm @@ -460,6 +460,7 @@ sub daemon_loop ($$) { @listeners = map { PublicInbox::Listener->new($_, $post_accept) } @listeners; + PublicInbox::EvCleanup::enable(); Danga::Socket->EventLoop; $parent_pipe = undef; } diff --git a/lib/PublicInbox/Emergency.pm b/lib/PublicInbox/Emergency.pm index 231b4197..66adc631 100644 --- a/lib/PublicInbox/Emergency.pm +++ b/lib/PublicInbox/Emergency.pm @@ -18,7 +18,7 @@ sub new { next if -d $d; -d $d or mkdir($d) or die "failed to mkdir($d): $!\n"; } - bless { dir => $dir, files => {}, t => 0, cnt => 0 }, $class; + bless { dir => $dir, files => {}, t => 0, cnt => 0, pid => $$ }, $class; } sub _fn_in { @@ -75,6 +75,7 @@ sub fh { sub commit { my ($self) = @_; + $$ == $self->{pid} or return; # no-op in forked child delete $self->{fh}; my $tmp = delete $self->{tmp} or return; diff --git a/lib/PublicInbox/EvCleanup.pm b/lib/PublicInbox/EvCleanup.pm index 559730e9..1a3a3d5e 100644 --- a/lib/PublicInbox/EvCleanup.pm +++ b/lib/PublicInbox/EvCleanup.pm @@ -7,6 +7,10 @@ use strict; use warnings; use base qw(Danga::Socket); use fields qw(rd); + +my $ENABLED; +sub enabled { $ENABLED } +sub enable { $ENABLED = 1 } my $singleton; my $asapq = [ [], undef ]; my $nextq = [ [], undef ]; @@ -77,8 +81,8 @@ sub later ($) { END { _run_asap(); - _run_next(); - _run_later(); + _run_all($nextq); + _run_all($laterq); } 1; diff --git a/lib/PublicInbox/ExtMsg.pm b/lib/PublicInbox/ExtMsg.pm index 760614df..04cb4062 100644 --- a/lib/PublicInbox/ExtMsg.pm +++ b/lib/PublicInbox/ExtMsg.pm @@ -31,30 +31,19 @@ sub ext_msg { my $cur = $ctx->{-inbox}; my $mid = $ctx->{mid}; - eval { require PublicInbox::Search }; - my $have_xap = $@ ? 0 : 1; - my (@nox, @ibx, @found); + eval { require PublicInbox::Msgmap }; + my $have_mm = $@ ? 0 : 1; + my (@ibx, @found); $ctx->{www}->{pi_config}->each_inbox(sub { my ($other) = @_; return if $other->{name} eq $cur->{name} || !$other->base_url; - my $s = $other->search; - if (!$s) { - push @nox, $other; - return; - } - - # try to find the URL with Xapian to avoid forking - my $doc_id = eval { $s->find_unique_doc_id('mid', $mid) }; - if ($@) { - # xapian not configured properly for this repo - push @nox, $other; - return; - } + my $mm = $other->mm or return; - # maybe we found it! - if (defined $doc_id) { + # try to find the URL with Msgmap to avoid forking + my $num = $mm->num_for($mid); + if (defined $num) { push @found, $other; } else { # no point in trying the fork fallback if we @@ -66,20 +55,6 @@ sub ext_msg { return exact($ctx, \@found, $mid) if @found; - # Xapian not installed or configured for some repos, - # do a full MID check (this is expensive...): - if (@nox) { - my $path = mid2path($mid); - foreach my $other (@nox) { - my (undef, $type, undef) = $other->path_check($path); - - if ($type && $type eq 'blob') { - push @found, $other; - } - } - } - return exact($ctx, \@found, $mid) if @found; - # fall back to partial MID matching my $n_partial = 0; my @partial; diff --git a/lib/PublicInbox/Feed.pm b/lib/PublicInbox/Feed.pm index c32e7bde..b373a1eb 100644 --- a/lib/PublicInbox/Feed.pm +++ b/lib/PublicInbox/Feed.pm @@ -8,19 +8,18 @@ use warnings; use PublicInbox::MIME; use PublicInbox::View; use PublicInbox::WwwAtomStream; +use PublicInbox::SearchMsg; # this loads w/o Search::Xapian # main function sub generate { my ($ctx) = @_; - my @paths; - each_recent_blob($ctx, sub { push @paths, $_[0] }); - return _no_thread() unless @paths; + my $msgs = recent_msgs($ctx); + return _no_thread() unless @$msgs; my $ibx = $ctx->{-inbox}; PublicInbox::WwwAtomStream->response($ctx, 200, sub { - while (my $path = shift @paths) { - my $mime = do_cat_mail($ibx, $path) or next; - return $mime; + while (my $smsg = shift @$msgs) { + $ibx->smsg_mime($smsg) and return $smsg; } }); } @@ -28,18 +27,16 @@ sub generate { sub generate_thread_atom { my ($ctx) = @_; my $mid = $ctx->{mid}; - my $res = $ctx->{srch}->get_thread($mid); - return _no_thread() unless $res->{total}; + my $msgs = $ctx->{srch}->get_thread($mid); + return _no_thread() unless @$msgs; my $ibx = $ctx->{-inbox}; my $html_url = $ibx->base_url($ctx->{env}); $html_url .= PublicInbox::Hval->new_msgid($mid)->{href}; $ctx->{-html_url} = $html_url; - my $msgs = $res->{msgs}; PublicInbox::WwwAtomStream->response($ctx, 200, sub { - while (my $msg = shift @$msgs) { - $msg = $ibx->msg_by_smsg($msg) and - return PublicInbox::MIME->new($msg); + while (my $smsg = shift @$msgs) { + $ibx->smsg_mime($smsg) and return $smsg; } }); } @@ -63,27 +60,22 @@ sub generate_html_index { sub new_html { my ($ctx) = @_; - my @paths; - my (undef, $last) = each_recent_blob($ctx, sub { - my ($path, $commit, $ts, $u, $subj) = @_; - $ctx->{first} ||= $commit; - push @paths, $path; - }); - if (!@paths) { + my $msgs = recent_msgs($ctx); + if (!@$msgs) { return [404, ['Content-Type', 'text/plain'], ["No messages, yet\n"] ]; } $ctx->{-html_tip} = '<pre>'; $ctx->{-upfx} = ''; $ctx->{-hr} = 1; + my $ibx = $ctx->{-inbox}; PublicInbox::WwwStream->response($ctx, 200, sub { - while (my $path = shift @paths) { - my $m = do_cat_mail($ctx->{-inbox}, $path) or next; - my $more = scalar @paths; - my $s = PublicInbox::View::index_entry($m, $ctx, $more); - return $s; + while (my $smsg = shift @$msgs) { + my $m = $ibx->smsg_mime($smsg) or next; + my $more = scalar @$msgs; + return PublicInbox::View::index_entry($m, $ctx, $more); } - new_html_footer($ctx, $last); + PublicInbox::View::pagination_footer($ctx, './new.html'); }); } @@ -93,30 +85,23 @@ sub _no_thread () { [404, ['Content-Type', 'text/plain'], ["No feed found for thread\n"]]; } -sub new_html_footer { - my ($ctx, $last) = @_; - my $qp = delete $ctx->{qp} or return; - my $old_r = $qp->{r}; - my $latest = ''; - my $next = ' '; - - if ($last) { - $next = qq!<a\nhref="?r=$last"\nrel=next>next</a>!; +sub recent_msgs { + my ($ctx) = @_; + my $ibx = $ctx->{-inbox}; + my $max = $ibx->{feedmax}; + my $qp = $ctx->{qp}; + my $v = $ibx->{version} || 1; + if ($v > 2) { + die "BUG: unsupported inbox version: $v\n"; } - if ($old_r) { - $latest = qq! <a\nhref='./new.html'>latest</a>!; + if (my $srch = $ibx->search) { + return PublicInbox::View::paginate_recent($ctx, $max); } - "<hr><pre>page: $next$latest</pre>"; -} -sub each_recent_blob { - my ($ctx, $cb) = @_; - my $max = $ctx->{-inbox}->{feedmax}; my $hex = '[a-f0-9]'; - my $addmsg = qr!^:000000 100644 \S+ \S+ A\t(${hex}{2}/${hex}{38})$!; - my $delmsg = qr!^:100644 000000 \S+ \S+ D\t(${hex}{2}/${hex}{38})$!; + my $addmsg = qr!^:000000 100644 \S+ (\S+) A\t${hex}{2}/${hex}{38}$!; + my $delmsg = qr!^:100644 000000 (\S+) \S+ D\t(${hex}{2}/${hex}{38})$!; my $refhex = qr/(?:HEAD|${hex}{4,40})(?:~\d+)?/; - my $qp = $ctx->{qp}; # revision ranges may be specified my $range = 'HEAD'; @@ -128,54 +113,41 @@ sub each_recent_blob { # get recent messages # we could use git log -z, but, we already know ssoma will not # leave us with filenames with spaces in them.. - my $log = $ctx->{-inbox}->git->popen(qw/log + my $log = $ibx->git->popen(qw/log --no-notes --no-color --raw -r - --abbrev=16 --abbrev-commit/, - "--format=%h%x00%ct%x00%an%x00%s%x00", - $range); + --no-abbrev --abbrev-commit/, + "--format=%H", $range); my %deleted; # only an optimization at this point my $last; - my $nr = 0; - my ($cur_commit, $first_commit, $last_commit); - my ($ts, $subj, $u); + my $last_commit; local $/ = "\n"; + my @oids; while (defined(my $line = <$log>)) { if ($line =~ /$addmsg/o) { my $add = $1; next if $deleted{$add}; # optimization-only - $cb->($add, $cur_commit, $ts, $u, $subj) and $nr++; - if ($nr >= $max) { + push @oids, $add; + if (scalar(@oids) >= $max) { $last = 1; last; } } elsif ($line =~ /$delmsg/o) { $deleted{$1} = 1; - } elsif ($line =~ /^${hex}{7,40}/o) { - ($cur_commit, $ts, $u, $subj) = split("\0", $line); - unless (defined $first_commit) { - $first_commit = $cur_commit; - } } } if ($last) { local $/ = "\n"; while (my $line = <$log>) { - if ($line =~ /^(${hex}{7,40})/o) { + if ($line =~ /^(${hex}{7,40})/) { $last_commit = $1; last; } } } - # for pagination - ($first_commit, $last_commit); -} - -sub do_cat_mail { - my ($ibx, $path) = @_; - my $mime = eval { $ibx->msg_by_path($path) } or return; - PublicInbox::MIME->new($mime); + $ctx->{next_page} = "r=$last_commit" if $last_commit; + [ map { bless {blob => $_ }, 'PublicInbox::SearchMsg' } @oids ]; } 1; diff --git a/lib/PublicInbox/Filter/RubyLang.pm b/lib/PublicInbox/Filter/RubyLang.pm index 63e8d422..cb69e38a 100644 --- a/lib/PublicInbox/Filter/RubyLang.pm +++ b/lib/PublicInbox/Filter/RubyLang.pm @@ -6,6 +6,7 @@ package PublicInbox::Filter::RubyLang; use base qw(PublicInbox::Filter::Base); use strict; use warnings; +use PublicInbox::MID qw(mids); my $l1 = qr/Unsubscribe:\s <mailto:ruby-\w+-request\@ruby-lang\.org\?subject=unsubscribe>/x; @@ -44,16 +45,23 @@ sub scrub { my $altid = $self->{-altid}; if ($altid) { my $hdr = $mime->header_obj; - my $mid = $hdr->header_raw('Message-ID'); - unless (defined $mid) { - return $self->REJECT('Message-Id missing'); + my $mids = mids($hdr); + return $self->REJECT('Message-ID missing') unless (@$mids); + my @v = $hdr->header_raw('X-Mail-Count'); + my $n; + foreach (@v) { + /\A\s*(\d+)\s*\z/ or next; + $n = $1; + last; } - my $n = $hdr->header_raw('X-Mail-Count'); - if (!defined($n) || $n !~ /\A\s*\d+\s*\z/) { + unless (defined $n) { return $self->REJECT('X-Mail-Count not numeric'); } - $mid = PublicInbox::MID::mid_clean($mid); - $altid->{mm_alt}->mid_set($n, $mid); + foreach my $mid (@$mids) { + my $r = $altid->mm_alt->mid_set($n, $mid); + next if $r == 0; + last; + } } $self->ACCEPT($mime); } diff --git a/lib/PublicInbox/Git.pm b/lib/PublicInbox/Git.pm index ea2b814e..95df52ed 100644 --- a/lib/PublicInbox/Git.pm +++ b/lib/PublicInbox/Git.pm @@ -15,7 +15,19 @@ use PublicInbox::Spawn qw(spawn popen_rd); sub new { my ($class, $git_dir) = @_; - bless { git_dir => $git_dir }, $class + my @st; + $st[7] = $st[10] = 0; + bless { git_dir => $git_dir, st => \@st }, $class +} + +sub alternates_changed { + my ($self) = @_; + my $alt = "$self->{git_dir}/objects/info/alternates"; + my @st = stat($alt) or return 0; + my $old_st = $self->{st}; + # 10 - ctime, 7 - size + return 0 if ($st[10] == $old_st->[10] && $st[7] == $old_st->[7]); + $self->{st} = \@st; } sub _bidi_pipe { @@ -38,14 +50,23 @@ sub _bidi_pipe { sub cat_file { my ($self, $obj, $ref) = @_; + my ($retried, $in, $head); +again: batch_prepare($self); $self->{out}->print($obj, "\n") or fail($self, "write error: $!"); - my $in = $self->{in}; + $in = $self->{in}; local $/ = "\n"; - my $head = $in->getline; - $head =~ / missing$/ and return undef; + $head = $in->getline; + if ($head =~ / missing$/) { + if (!$retried && alternates_changed($self)) { + $retried = 1; + cleanup($self); + goto again; + } + return; + } $head =~ /^[0-9a-f]{40} \S+ (\d+)$/ or fail($self, "Unexpected result from git cat-file: $head"); @@ -139,6 +160,18 @@ sub cleanup { _destroy($self, qw(in_c out_c pid_c)); } +# assuming a well-maintained repo, this should be a somewhat +# accurate estimation of its size +# TODO: show this in the WWW UI as a hint to potential cloners +sub packed_bytes { + my ($self) = @_; + my $n = 0; + foreach my $p (glob("$self->{git_dir}/objects/pack/*.pack")) { + $n += -s $p; + } + $n +} + sub DESTROY { cleanup(@_) } 1; diff --git a/lib/PublicInbox/Import.pm b/lib/PublicInbox/Import.pm index 8eec17eb..b25427ee 100644 --- a/lib/PublicInbox/Import.pm +++ b/lib/PublicInbox/Import.pm @@ -7,19 +7,32 @@ package PublicInbox::Import; use strict; use warnings; -use Fcntl qw(:flock :DEFAULT); +use base qw(PublicInbox::Lock); use PublicInbox::Spawn qw(spawn); -use PublicInbox::MID qw(mid_mime mid2path); +use PublicInbox::MID qw(mids mid_mime mid2path); use PublicInbox::Address; +use PublicInbox::MsgTime qw(msg_timestamp msg_datestamp); +use PublicInbox::ContentId qw(content_digest); +use PublicInbox::MDA; +use POSIX qw(strftime); sub new { - my ($class, $git, $name, $email, $inbox) = @_; + my ($class, $git, $name, $email, $ibx) = @_; + my $ref = 'refs/heads/master'; + if ($ibx) { + $ref = $ibx->{ref_head} || 'refs/heads/master'; + $name ||= $ibx->{name}; + $email ||= $ibx->{-primary_address}; + } bless { git => $git, ident => "$name <$email>", mark => 1, - ref => 'refs/heads/master', - inbox => $inbox, + ref => $ref, + inbox => $ibx, + path_type => '2/38', # or 'v2' + lock_path => "$git->{git_dir}/ssoma.lock", # v2 changes this + bytes_added => 0, }, $class } @@ -33,25 +46,28 @@ sub gfi_start { pipe($in_r, $in_w) or die "pipe failed: $!"; pipe($out_r, $out_w) or die "pipe failed: $!"; my $git = $self->{git}; - my $git_dir = $git->{git_dir}; - my $lockpath = "$git_dir/ssoma.lock"; - sysopen(my $lockfh, $lockpath, O_WRONLY|O_CREAT) or - die "failed to open lock $lockpath: $!"; - # wait for other processes to be done - flock($lockfh, LOCK_EX) or die "lock failed: $!\n"; + $self->lock_acquire; + local $/ = "\n"; - chomp($self->{tip} = $git->qx(qw(rev-parse --revs-only), $self->{ref})); + my $ref = $self->{ref}; + chomp($self->{tip} = $git->qx(qw(rev-parse --revs-only), $ref)); + if ($self->{path_type} ne '2/38' && $self->{tip}) { + local $/ = "\0"; + my @tree = $git->qx(qw(ls-tree -r -z --name-only), $ref); + chomp @tree; + $self->{-tree} = { map { $_ => 1 } @tree }; + } + my $git_dir = $git->{git_dir}; my @cmd = ('git', "--git-dir=$git_dir", qw(fast-import - --quiet --done --date-format=rfc2822)); + --quiet --done --date-format=raw)); my $rdr = { 0 => fileno($out_r), 1 => fileno($in_w) }; my $pid = spawn(\@cmd, undef, $rdr); die "spawn fast-import failed: $!" unless defined $pid; $out_w->autoflush(1); $self->{in} = $in_r; $self->{out} = $out_w; - $self->{lockfh} = $lockfh; $self->{pid} = $pid; $self->{nchg} = 0; binmode $out_w, ':raw' or die "binmode :raw failed: $!"; @@ -61,14 +77,7 @@ sub gfi_start { sub wfail () { die "write to fast-import failed: $!" } -sub now2822 () { - my @t = gmtime(time); - my $day = qw(Sun Mon Tue Wed Thu Fri Sat)[$t[6]]; - my $mon = qw(Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec)[$t[4]]; - - sprintf('%s, %2d %s %d %02d:%02d:%02d +0000', - $day, $t[3], $mon, $t[5] + 1900, $t[2], $t[1], $t[0]); -} +sub now_raw () { time . ' +0000' } sub norm_body ($) { my ($mime) = @_; @@ -77,6 +86,7 @@ sub norm_body ($) { $b } +# only used for v1 (ssoma) inboxes sub _check_path ($$$$) { my ($r, $w, $tip, $path) = @_; return if $tip eq ''; @@ -86,27 +96,13 @@ sub _check_path ($$$$) { $info =~ /\Amissing / ? undef : $info; } -# returns undef on non-existent -# ('MISMATCH', msg) on mismatch -# (:MARK, msg) on success -sub remove { - my ($self, $mime, $msg) = @_; # mime = Email::MIME - - my $mid = mid_mime($mime); - my $path = mid2path($mid); - - my ($r, $w) = $self->gfi_start; - my $tip = $self->{tip}; - my $info = _check_path($r, $w, $tip, $path) or return ('MISSING',undef); - $info =~ m!\A100644 blob ([a-f0-9]{40})\t!s or die "not blob: $info"; - my $blob = $1; - - print $w "cat-blob $blob\n" or wfail; +sub _cat_blob ($$$) { + my ($r, $w, $oid) = @_; + print $w "cat-blob $oid\n" or wfail; local $/ = "\n"; - $info = <$r>; + my $info = <$r>; defined $info or die "EOF from fast-import / cat-blob: $!"; - $info =~ /\A[a-f0-9]{40} blob (\d+)\n\z/ or - die "unexpected cat-blob response: $info"; + $info =~ /\A[a-f0-9]{40} blob (\d+)\n\z/ or return; my $left = $1; my $offset = 0; my $buf = ''; @@ -121,7 +117,26 @@ sub remove { $n = read($r, my $lf, 1); defined($n) or die "read final byte of cat-blob failed: $!"; die "bad read on final byte: <$lf>" if $lf ne "\n"; - my $cur = PublicInbox::MIME->new($buf); + + # fixup some bugginess in old versions: + $buf =~ s/\A[\r\n]*From [^\r\n]*\r?\n//s; + \$buf; +} + +sub cat_blob { + my ($self, $oid) = @_; + my ($r, $w) = $self->gfi_start; + _cat_blob($r, $w, $oid); +} + +sub check_remove_v1 { + my ($r, $w, $tip, $path, $mime) = @_; + + my $info = _check_path($r, $w, $tip, $path) or return ('MISSING',undef); + $info =~ m!\A100644 blob ([a-f0-9]{40})\t!s or die "not blob: $info"; + my $oid = $1; + my $msg = _cat_blob($r, $w, $oid) or die "BUG: cat-blob $1 failed"; + my $cur = PublicInbox::MIME->new($msg); my $cur_s = $cur->header('Subject'); $cur_s = '' unless defined $cur_s; my $cur_m = $mime->header('Subject'); @@ -129,6 +144,103 @@ sub remove { if ($cur_s ne $cur_m || norm_body($cur) ne norm_body($mime)) { return ('MISMATCH', $cur); } + (undef, $cur); +} + +sub checkpoint { + my ($self) = @_; + return unless $self->{pid}; + print { $self->{out} } "checkpoint\n" or wfail; + undef; +} + +sub progress { + my ($self, $msg) = @_; + return unless $self->{pid}; + print { $self->{out} } "progress $msg\n" or wfail; + $self->{in}->getline eq "progress $msg\n" or die + "progress $msg not received\n"; + undef; +} + +sub _update_git_info ($$) { + my ($self, $do_gc) = @_; + # for compatibility with existing ssoma installations + # we can probably remove this entirely by 2020 + my $git_dir = $self->{git}->{git_dir}; + my @cmd = ('git', "--git-dir=$git_dir"); + my $index = "$git_dir/ssoma.index"; + if (-e $index && !$ENV{FAST}) { + my $env = { GIT_INDEX_FILE => $index }; + run_die([@cmd, qw(read-tree -m -v -i), $self->{ref}], $env); + } + run_die([@cmd, 'update-server-info'], undef); + ($self->{path_type} eq '2/38') and eval { + require PublicInbox::SearchIdx; + my $inbox = $self->{inbox} || $git_dir; + my $s = PublicInbox::SearchIdx->new($inbox); + $s->index_sync({ ref => $self->{ref} }); + }; + eval { run_die([@cmd, qw(gc --auto)], undef) } if $do_gc; +} + +sub barrier { + my ($self) = @_; + + # For safety, we ensure git checkpoint is complete before because + # the data in git is still more important than what is in Xapian + # in v2. Performance may be gained by delaying the ->progress + # call but we lose safety + if ($self->{nchg}) { + $self->checkpoint; + $self->progress('checkpoint'); + _update_git_info($self, 0); + $self->{nchg} = 0; + } +} + +# used for v2 +sub get_mark { + my ($self, $mark) = @_; + die "not active\n" unless $self->{pid}; + my ($r, $w) = $self->gfi_start; + print $w "get-mark $mark\n" or wfail; + defined(my $oid = <$r>) or die "get-mark failed, need git 2.6.0+\n"; + chomp($oid); + $oid; +} + +# returns undef on non-existent +# ('MISMATCH', Email::MIME) on mismatch +# (:MARK, Email::MIME) on success +# +# v2 callers should check with Xapian before calling this as +# it is not idempotent. +sub remove { + my ($self, $mime, $msg) = @_; # mime = Email::MIME + + my $path_type = $self->{path_type}; + my ($path, $err, $cur, $blob); + + my ($r, $w) = $self->gfi_start; + my $tip = $self->{tip}; + if ($path_type eq '2/38') { + $path = mid2path(v1_mid0($mime)); + ($err, $cur) = check_remove_v1($r, $w, $tip, $path, $mime); + return ($err, $cur) if $err; + } else { + my $sref; + if (ref($mime) eq 'SCALAR') { # optimization used by V2Writable + $sref = $mime; + } else { # XXX should not be necessary: + my $str = $mime->as_string; + $sref = \$str; + } + my $len = length($$sref); + $blob = $self->{mark}++; + print $w "blob\nmark :$blob\ndata $len\n", + $$sref, "\n" or wfail; + } my $ref = $self->{ref}; my $commit = $self->{mark}++; @@ -137,7 +249,7 @@ sub remove { print $w "reset $ref\n" or wfail; } my $ident = $self->{ident}; - my $now = now2822(); + my $now = now_raw(); $msg ||= 'rm'; my $len = length($msg) + 1; print $w "commit $ref\nmark :$commit\n", @@ -145,44 +257,146 @@ sub remove { "committer $ident $now\n", "data $len\n$msg\n\n", 'from ', ($parent ? $parent : $tip), "\n" or wfail; - print $w "D $path\n\n" or wfail; + if (defined $path) { + print $w "D $path\n\n" or wfail; + } else { + clean_tree_v2($self, $w, 'd'); + print $w "M 100644 :$blob d\n\n" or wfail; + } $self->{nchg}++; (($self->{tip} = ":$commit"), $cur); } -# returns undef on duplicate -sub add { - my ($self, $mime, $check_cb) = @_; # mime = Email::MIME +sub git_timestamp { + my ($ts, $zone) = @_; + $ts = 0 if $ts < 0; # git uses unsigned times + "$ts $zone"; +} +sub extract_author_info ($) { + my ($mime) = @_; + + my $sender = ''; my $from = $mime->header('From'); my ($email) = PublicInbox::Address::emails($from); my ($name) = PublicInbox::Address::names($from); + if (!defined($name) || !defined($email)) { + $sender = $mime->header('Sender'); + if (!defined($name)) { + ($name) = PublicInbox::Address::names($sender); + } + if (!defined($email)) { + ($email) = PublicInbox::Address::emails($sender); + } + } + if (defined $email) { + # quiet down wide character warnings with utf8::encode + utf8::encode($email); + } else { + $email = ''; + warn "no email in From: $from or Sender: $sender\n"; + } + # git gets confused with: # "'A U Thor <u@example.com>' via foo" <foo@example.com> # ref: # <CAD0k6qSUYANxbjjbE4jTW4EeVwOYgBD=bXkSu=akiYC_CB7Ffw@mail.gmail.com> - $name =~ tr/<>//d; + if (defined $name) { + $name =~ tr/<>//d; + utf8::encode($name); + } else { + $name = ''; + warn "no name in From: $from or Sender: $sender\n"; + } + ($name, $email); +} + +# kill potentially confusing/misleading headers +sub drop_unwanted_headers ($) { + my ($mime) = @_; + + $mime->header_set($_) for qw(bytes lines content-length status); + $mime->header_set($_) for @PublicInbox::MDA::BAD_HEADERS; +} + +# used by V2Writable, too +sub append_mid ($$) { + my ($hdr, $mid0) = @_; + # @cur is likely empty if we need to call this sub, but it could + # have random unparseable crap which we'll preserve, too. + my @cur = $hdr->header_raw('Message-ID'); + $hdr->header_set('Message-ID', @cur, "<$mid0>"); +} + +sub v1_mid0 ($) { + my ($mime) = @_; + my $hdr = $mime->header_obj; + my $mids = mids($hdr); + + if (!scalar(@$mids)) { # spam often has no Message-Id + my $mid0 = digest2mid(content_digest($mime), $hdr); + append_mid($hdr, $mid0); + return $mid0; + } + $mids->[0]; +} +sub clean_tree_v2 ($$$) { + my ($self, $w, $keep) = @_; + my $tree = $self->{-tree} or return; #v2 only + delete $tree->{$keep}; + foreach (keys %$tree) { + print $w "D $_\n" or wfail; + } + %$tree = ($keep => 1); +} - my $date = $mime->header('Date'); +# returns undef on duplicate +# returns the :MARK of the most recent commit +sub add { + my ($self, $mime, $check_cb) = @_; # mime = Email::MIME + + my ($name, $email) = extract_author_info($mime); + my $hdr = $mime->header_obj; + my @at = msg_datestamp($hdr); + my @ct = msg_timestamp($hdr); + my $author_time_raw = git_timestamp(@at); + my $commit_time_raw = git_timestamp(@ct); my $subject = $mime->header('Subject'); $subject = '(no subject)' unless defined $subject; - my $mid = mid_mime($mime); - my $path = mid2path($mid); + my $path_type = $self->{path_type}; + + my $path; + if ($path_type eq '2/38') { + $path = mid2path(v1_mid0($mime)); + } else { # v2 layout, one file: + $path = 'm'; + } my ($r, $w) = $self->gfi_start; my $tip = $self->{tip}; - _check_path($r, $w, $tip, $path) and return; + if ($path_type eq '2/38') { + _check_path($r, $w, $tip, $path) and return; + } - # kill potentially confusing/misleading headers - $mime->header_set($_) for qw(bytes lines content-length status); + drop_unwanted_headers($mime); + + # spam check: if ($check_cb) { $mime = $check_cb->($mime) or return; } - $mime = $mime->as_string; my $blob = $self->{mark}++; - print $w "blob\nmark :$blob\ndata ", length($mime), "\n" or wfail; - print $w $mime, "\n" or wfail; + my $str = $mime->as_string; + my $n = length($str); + $self->{bytes_added} += $n; + print $w "blob\nmark :$blob\ndata ", $n, "\n" or wfail; + print $w $str, "\n" or wfail; + + # v2: we need this for Xapian + if ($self->{want_object_info}) { + my $oid = $self->get_mark(":$blob"); + $self->{last_object} = [ $oid, $n, \$str ]; + } my $ref = $self->{ref}; my $commit = $self->{mark}++; my $parent = $tip =~ /\A:/ ? $tip : undef; @@ -191,26 +405,24 @@ sub add { print $w "reset $ref\n" or wfail; } - utf8::encode($email); - utf8::encode($name); utf8::encode($subject); - # quiet down wide character warnings: print $w "commit $ref\nmark :$commit\n", - "author $name <$email> $date\n", - "committer $self->{ident} ", now2822(), "\n" or wfail; + "author $name <$email> $author_time_raw\n", + "committer $self->{ident} $commit_time_raw\n" or wfail; print $w "data ", (length($subject) + 1), "\n", $subject, "\n\n" or wfail; if ($tip ne '') { print $w 'from ', ($parent ? $parent : $tip), "\n" or wfail; } + clean_tree_v2($self, $w, $path); print $w "M 100644 :$blob $path\n\n" or wfail; $self->{nchg}++; $self->{tip} = ":$commit"; } -sub run_die ($$) { - my ($cmd, $env) = @_; - my $pid = spawn($cmd, $env, undef); +sub run_die ($;$$) { + my ($cmd, $env, $rdr) = @_; + my $pid = spawn($cmd, $env, $rdr); defined $pid or die "spawning ".join(' ', @$cmd)." failed: $!"; waitpid($pid, 0) == $pid or die join(' ', @$cmd) .' did not finish'; $? == 0 or die join(' ', @$cmd) . " failed: $?\n"; @@ -224,33 +436,138 @@ sub done { my $pid = delete $self->{pid} or die 'BUG: missing {pid} when done'; waitpid($pid, 0) == $pid or die 'fast-import did not finish'; $? == 0 or die "fast-import failed: $?"; - my $nchg = delete $self->{nchg}; - # for compatibility with existing ssoma installations - # we can probably remove this entirely by 2020 - my $git_dir = $self->{git}->{git_dir}; - # XXX: change the following scope to: if (-e $index) # in 2018 or so.. - my @cmd = ('git', "--git-dir=$git_dir"); - if ($nchg && !$ENV{FAST}) { - my $index = "$git_dir/ssoma.index"; - my $env = { GIT_INDEX_FILE => $index }; - run_die([@cmd, qw(read-tree -m -v -i), $self->{ref}], $env); + _update_git_info($self, 1) if delete $self->{nchg}; + + $self->lock_release; +} + +sub atfork_child { + my ($self) = @_; + foreach my $f (qw(in out)) { + close $self->{$f} or die "failed to close import[$f]: $!\n"; } - if ($nchg) { - run_die([@cmd, 'update-server-info'], undef); - eval { - require PublicInbox::SearchIdx; - my $inbox = $self->{inbox} || $git_dir; - my $s = PublicInbox::SearchIdx->new($inbox); - $s->index_sync({ ref => $self->{ref} }); - }; - - eval { run_die([@cmd, qw(gc --auto)], undef) }; +} + +sub digest2mid ($$) { + my ($dig, $hdr) = @_; + my $b64 = $dig->clone->b64digest; + # Make our own URLs nicer: + # See "Base 64 Encoding with URL and Filename Safe Alphabet" in RFC4648 + $b64 =~ tr!+/=!-_!d; + + # Add a date prefix to prevent a leading '-' in case that trips + # up some tools (e.g. if a Message-ID were a expected as a + # command-line arg) + my $dt = msg_datestamp($hdr); + $dt = POSIX::strftime('%Y%m%d%H%M%S', gmtime($dt)); + "$dt.$b64" . '@z'; +} + +sub clean_purge_buffer { + my ($oids, $buf) = @_; + my $cmt_msg = 'purged '.join(' ',@$oids)."\n"; + @$oids = (); + + foreach my $i (0..$#$buf) { + my $l = $buf->[$i]; + if ($l =~ /^author .* (\d+ [\+-]?\d+)$/) { + $buf->[$i] = "author <> $1\n"; + } elsif ($l =~ /^data (\d+)/) { + $buf->[$i++] = "data " . length($cmt_msg) . "\n"; + $buf->[$i] = $cmt_msg; + last; + } } +} - my $lockfh = delete $self->{lockfh} or die "BUG: not locked: $!"; - flock($lockfh, LOCK_UN) or die "unlock failed: $!"; - close $lockfh or die "close lock failed: $!"; +sub purge_oids { + my ($self, $purge) = @_; + my $tmp = "refs/heads/purge-".((keys %$purge)[0]); + my $old = $self->{'ref'}; + my $git = $self->{git}; + my @export = (qw(fast-export --no-data --use-done-feature), $old); + my ($rd, $pid) = $git->popen(@export); + my ($r, $w) = $self->gfi_start; + my @buf; + my $npurge = 0; + my @oids; + my ($done, $mark); + my $tree = $self->{-tree}; + while (<$rd>) { + if (/^reset (?:.+)/) { + push @buf, "reset $tmp\n"; + } elsif (/^commit (?:.+)/) { + if (@buf) { + $w->print(@buf) or wfail; + @buf = (); + } + push @buf, "commit $tmp\n"; + } elsif (/^data (\d+)/) { + # only commit message, so $len is small: + my $len = $1; # + 1 for trailing "\n" + push @buf, $_; + my $n = read($rd, my $buf, $len) or die "read: $!"; + $len == $n or die "short read ($n < $len)"; + push @buf, $buf; + } elsif (/^M 100644 ([a-f0-9]+) (\w+)/) { + my ($oid, $path) = ($1, $2); + if ($purge->{$oid}) { + push @oids, $oid; + delete $tree->{$path}; + } else { + $tree->{$path} = 1; + push @buf, $_; + } + } elsif (/^D (\w+)/) { + my $path = $1; + push @buf, $_ if $tree->{$path}; + } elsif ($_ eq "\n") { + if (@oids) { + my $out = join('', @buf); + $out =~ s/^/# /sgm; + warn "purge rewriting\n", $out, "\n"; + clean_purge_buffer(\@oids, \@buf); + $npurge++; + } + $w->print(@buf, "\n") or wfail; + @buf = (); + } elsif ($_ eq "done\n") { + $done = 1; + } elsif (/^mark :(\d+)$/) { + push @buf, $_; + $mark = $1; + } else { + push @buf, $_; + } + } + if (@buf) { + $w->print(@buf) or wfail; + } + die 'done\n not seen from fast-export' unless $done; + chomp(my $cmt = $self->get_mark(":$mark")) if $npurge; + $self->{nchg} = 0; # prevent _update_git_info until update-ref: + $self->done; + my @git = ('git', "--git-dir=$git->{git_dir}"); + + run_die([@git, qw(update-ref), $old, $tmp]) if $npurge; + + run_die([@git, qw(update-ref -d), $tmp]); + + return if $npurge == 0; + + run_die([@git, qw(-c gc.reflogExpire=now gc --prune=all)]); + my $err = 0; + foreach my $oid (keys %$purge) { + my @info = $git->check($oid); + if (@info) { + warn "$oid not purged\n"; + $err++; + } + } + _update_git_info($self, 0); + die "Failed to purge $err object(s)\n" if $err; + $cmt; } 1; diff --git a/lib/PublicInbox/Inbox.pm b/lib/PublicInbox/Inbox.pm index 2ec2be69..706089ca 100644 --- a/lib/PublicInbox/Inbox.pm +++ b/lib/PublicInbox/Inbox.pm @@ -8,6 +8,8 @@ use warnings; use PublicInbox::Git; use PublicInbox::MID qw(mid2path); use Devel::Peek qw(SvREFCNT); +use PublicInbox::MIME; +use POSIX qw(strftime); my $cleanup_timer; eval { @@ -29,6 +31,7 @@ sub cleanup_task () { sub _cleanup_later ($) { my ($self) = @_; + return unless PublicInbox::EvCleanup::enabled(); $cleanup_timer ||= PublicInbox::EvCleanup::later(*cleanup_task); $CLEANUP->{"$self"} = $self; } @@ -73,24 +76,71 @@ sub new { _set_limiter($opts, $pi_config, 'httpbackend'); _set_uint($opts, 'feedmax', 25); $opts->{nntpserver} ||= $pi_config->{'publicinbox.nntpserver'}; + my $dir = $opts->{mainrepo}; + if (defined $dir && -f "$dir/inbox.lock") { + $opts->{version} = 2; + } bless $opts, $class; } +sub git_part { + my ($self, $part) = @_; + ($self->{version} || 1) == 2 or return; + $self->{"$part.git"} ||= eval { + my $git_dir = "$self->{mainrepo}/git/$part.git"; + my $g = PublicInbox::Git->new($git_dir); + $g->{-httpbackend_limiter} = $self->{-httpbackend_limiter}; + # no cleanup needed, we never cat-file off this, only clone + $g; + }; +} + sub git { my ($self) = @_; $self->{git} ||= eval { - my $g = PublicInbox::Git->new($self->{mainrepo}); + my $git_dir = $self->{mainrepo}; + $git_dir .= '/all.git' if (($self->{version} || 1) == 2); + my $g = PublicInbox::Git->new($git_dir); $g->{-httpbackend_limiter} = $self->{-httpbackend_limiter}; _cleanup_later($self); $g; }; } +sub max_git_part { + my ($self) = @_; + my $v = $self->{version}; + return unless defined($v) && $v == 2; + my $part = $self->{-max_git_part}; + my $changed = git($self)->alternates_changed; + if (!defined($part) || $changed) { + $self->git->cleanup if $changed; + my $gits = "$self->{mainrepo}/git"; + if (opendir my $dh, $gits) { + my $max = -1; + while (defined(my $git_dir = readdir($dh))) { + $git_dir =~ m!\A(\d+)\.git\z! or next; + $max = $1 if $1 > $max; + } + $part = $self->{-max_git_part} = $max if $max >= 0; + } else { + warn "opendir $gits failed: $!\n"; + } + } + $part; +} + sub mm { my ($self) = @_; $self->{mm} ||= eval { + require PublicInbox::Msgmap; _cleanup_later($self); - PublicInbox::Msgmap->new($self->{mainrepo}); + my $dir = $self->{mainrepo}; + if (($self->{version} || 1) >= 2) { + PublicInbox::Msgmap->new_file("$dir/msgmap.sqlite3"); + } else { + PublicInbox::Msgmap->new($dir); + } }; } @@ -98,7 +148,7 @@ sub search { my ($self) = @_; $self->{search} ||= eval { _cleanup_later($self); - PublicInbox::Search->new($self->{mainrepo}, $self->{altid}); + PublicInbox::Search->new($self, $self->{altid}); }; } @@ -120,7 +170,7 @@ sub description { local $/ = "\n"; chomp $desc; $desc =~ s/\s+/ /smg; - $desc = '($GIT_DIR/description missing)' if $desc eq ''; + $desc = '($REPO_DIR/description missing)' if $desc eq ''; $self->{description} = $desc; } @@ -222,26 +272,49 @@ sub msg_by_path ($$;$) { sub msg_by_smsg ($$;$) { my ($self, $smsg, $ref) = @_; - return unless defined $smsg; # ghost - - # backwards compat to fallback to msg_by_mid - # TODO: remove if we bump SCHEMA_VERSION in Search.pm: - defined(my $blob = $smsg->{blob}) or - return msg_by_mid($self, $smsg->mid); + # ghosts may have undef smsg (from SearchThread.node) or + # no {blob} field + return unless defined $smsg; + defined(my $blob = $smsg->{blob}) or return; my $str = git($self)->cat_file($blob, $ref); $$str =~ s/\A[\r\n]*From [^\r\n]*\r?\n//s if $str; $str; } -sub path_check { - my ($self, $path) = @_; - git($self)->check('HEAD:'.$path); +sub smsg_mime { + my ($self, $smsg, $ref) = @_; + if (my $s = msg_by_smsg($self, $smsg, $ref)) { + $smsg->{mime} = PublicInbox::MIME->new($s); + return $smsg; + } +} + +sub mid2num($$) { + my ($self, $mid) = @_; + my $mm = mm($self) or return; + $mm->num_for($mid); +} + +sub smsg_by_mid ($$) { + my ($self, $mid) = @_; + my $srch = search($self) or return; + # favor the Message-ID we used for the NNTP article number: + my $num = mid2num($self, $mid); + defined $num ? $srch->lookup_article($num) : undef; } sub msg_by_mid ($$;$) { my ($self, $mid, $ref) = @_; - msg_by_path($self, mid2path($mid), $ref); + my $srch = search($self) or + return msg_by_path($self, mid2path($mid), $ref); + my $smsg = smsg_by_mid($self, $mid); + $smsg ? msg_by_smsg($self, $smsg, $ref) : undef; +} + +sub recent { + my ($self, $opts, $after, $before) = @_; + search($self)->{over_ro}->recent($opts, $after, $before); } 1; diff --git a/lib/PublicInbox/InboxWritable.pm b/lib/PublicInbox/InboxWritable.pm new file mode 100644 index 00000000..5c11a36c --- /dev/null +++ b/lib/PublicInbox/InboxWritable.pm @@ -0,0 +1,228 @@ +# Copyright (C) 2018 all contributors <meta@public-inbox.org> +# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> + +# Extends read-only Inbox for writing +package PublicInbox::InboxWritable; +use strict; +use warnings; +use base qw(PublicInbox::Inbox); +use PublicInbox::Import; +use PublicInbox::Filter::Base; +*REJECT = *PublicInbox::Filter::Base::REJECT; + +use constant { + PERM_UMASK => 0, + OLD_PERM_GROUP => 1, + OLD_PERM_EVERYBODY => 2, + PERM_GROUP => 0660, + PERM_EVERYBODY => 0664, +}; + +sub new { + my ($class, $ibx) = @_; + bless $ibx, $class; +} + +sub importer { + my ($self, $parallel) = @_; + $self->{-importer} ||= eval { + my $v = $self->{version} || 1; + if ($v == 2) { + eval { require PublicInbox::V2Writable }; + die "v2 not supported: $@\n" if $@; + my $v2w = PublicInbox::V2Writable->new($self); + $v2w->{parallel} = $parallel; + $v2w; + } elsif ($v == 1) { + my $git = $self->git; + my $name = $self->{name}; + my $addr = $self->{-primary_address}; + PublicInbox::Import->new($git, $name, $addr, $self); + } else { + die "unsupported inbox version: $v\n"; + } + } +} + +sub filter { + my ($self) = @_; + my $f = $self->{filter}; + if ($f && $f =~ /::/) { + my @args = (-inbox => $self); + # basic line splitting, only + # Perhaps we can have proper quote splitting one day... + ($f, @args) = split(/\s+/, $f) if $f =~ /\s+/; + + eval "require $f"; + if ($@) { + warn $@; + } else { + # e.g: PublicInbox::Filter::Vger->new(@args) + return $f->new(@args); + } + } + undef; +} + +sub is_maildir_basename ($) { + my ($bn) = @_; + return 0 if $bn !~ /\A[a-zA-Z0-9][\-\w:,=\.]+\z/; + if ($bn =~ /:2,([A-Z]+)\z/i) { + my $flags = $1; + return 0 if $flags =~ /[DT]/; # no [D]rafts or [T]rashed mail + } + 1; +} + +sub is_maildir_path ($) { + my ($path) = @_; + my @p = split(m!/+!, $path); + (is_maildir_basename($p[-1]) && -f $path) ? 1 : 0; +} + +sub maildir_path_load ($) { + my ($path) = @_; + if (open my $fh, '<', $path) { + local $/; + my $str = <$fh>; + $str or return; + return PublicInbox::MIME->new(\$str); + } elsif ($!{ENOENT}) { + # common with Maildir + return; + } else { + warn "failed to open $path: $!\n"; + return; + } +} + +sub import_maildir { + my ($self, $dir) = @_; + my $im = $self->importer(1); + my $filter = $self->filter; + foreach my $sub (qw(cur new tmp)) { + -d "$dir/$sub" or die "$dir is not a Maildir (missing $sub)\n"; + } + foreach my $sub (qw(cur new)) { + opendir my $dh, "$dir/$sub" or die "opendir $dir/$sub: $!\n"; + while (defined(my $fn = readdir($dh))) { + next unless is_maildir_basename($fn); + my $mime = maildir_file_load("$dir/$fn") or next; + if ($filter) { + my $ret = $filter->scrub($mime) or return; + return if $ret == REJECT(); + $mime = $ret; + } + $im->add($mime); + } + } + $im->done; +} + +# asctime: From example@example.com Fri Jun 23 02:56:55 2000 +my $from_strict = qr/^From \S+ +\S+ \S+ +\S+ [^:]+:[^:]+:[^:]+ [^:]+/; + +sub mb_add ($$$$) { + my ($im, $variant, $filter, $msg) = @_; + $$msg =~ s/(\r?\n)+\z/$1/s; + my $mime = PublicInbox::MIME->new($msg); + if ($variant eq 'mboxrd') { + $$msg =~ s/^>(>*From )/$1/sm; + } elsif ($variant eq 'mboxo') { + $$msg =~ s/^>From /From /sm; + } + if ($filter) { + my $ret = $filter->scrub($mime) or return; + return if $ret == REJECT(); + $mime = $ret; + } + $im->add($mime) +} + +sub import_mbox { + my ($self, $fh, $variant) = @_; + if ($variant !~ /\A(?:mboxrd|mboxo)\z/) { + die "variant must be 'mboxrd' or 'mboxo'\n"; + } + my $im = $self->importer(1); + my $prev = undef; + my $msg = ''; + my $filter = $self->filter; + while (defined(my $l = <$fh>)) { + if ($l =~ /$from_strict/o) { + if (!defined($prev) || $prev =~ /^\r?$/) { + mb_add($im, $variant, $filter, \$msg) if $msg; + $msg = ''; + $prev = $l; + next; + } + warn "W[$.] $l\n"; + } + $prev = $l; + $msg .= $l; + } + mb_add($im, $variant, $filter, \$msg) if $msg; + $im->done; +} + +sub _read_git_config_perm { + my ($self) = @_; + my @cmd = qw(config); + chomp(my $perm = $self->git->qx('config', 'core.sharedRepository')); + $perm; +} + +sub _git_config_perm { + my $self = shift; + my $perm = scalar @_ ? $_[0] : _read_git_config_perm($self); + return PERM_GROUP if (!defined($perm) || $perm eq ''); + return PERM_UMASK if ($perm eq 'umask'); + return PERM_GROUP if ($perm eq 'group'); + if ($perm =~ /\A(?:all|world|everybody)\z/) { + return PERM_EVERYBODY; + } + return PERM_GROUP if ($perm =~ /\A(?:true|yes|on|1)\z/); + return PERM_UMASK if ($perm =~ /\A(?:false|no|off|0)\z/); + + my $i = oct($perm); + return PERM_UMASK if ($i == PERM_UMASK); + return PERM_GROUP if ($i == OLD_PERM_GROUP); + return PERM_EVERYBODY if ($i == OLD_PERM_EVERYBODY); + + if (($i & 0600) != 0600) { + die "core.sharedRepository mode invalid: ". + sprintf('%.3o', $i) . "\nOwner must have permissions\n"; + } + ($i & 0666); +} + +sub _umask_for { + my ($perm) = @_; # _git_config_perm return value + my $rv = $perm; + return umask if $rv == 0; + + # set +x bit if +r or +w were set + $rv |= 0100 if ($rv & 0600); + $rv |= 0010 if ($rv & 0060); + $rv |= 0001 if ($rv & 0006); + (~$rv & 0777); +} + +sub with_umask { + my ($self, $cb) = @_; + my $old = umask $self->{umask}; + my $rv = eval { $cb->() }; + my $err = $@; + umask $old; + die $err if $err; + $rv; +} + +sub umask_prepare { + my ($self) = @_; + my $perm = _git_config_perm($self); + my $umask = _umask_for($perm); + $self->{umask} = $umask; +} + +1; diff --git a/lib/PublicInbox/Lock.pm b/lib/PublicInbox/Lock.pm new file mode 100644 index 00000000..ca6b33f2 --- /dev/null +++ b/lib/PublicInbox/Lock.pm @@ -0,0 +1,31 @@ +# Copyright (C) 2018 all contributors <meta@public-inbox.org> +# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> + +# Base class for per-inbox locking +package PublicInbox::Lock; +use strict; +use warnings; +use Fcntl qw(:flock :DEFAULT); +use Carp qw(croak); + +# we only acquire the flock if creating or reindexing; +# PublicInbox::Import already has the lock on its own. +sub lock_acquire { + my ($self) = @_; + croak 'already locked' if $self->{lockfh}; + my $lock_path = $self->{lock_path} or return; + sysopen(my $lockfh, $lock_path, O_WRONLY|O_CREAT) or + die "failed to open lock $lock_path: $!\n"; + flock($lockfh, LOCK_EX) or die "lock failed: $!\n"; + $self->{lockfh} = $lockfh; +} + +sub lock_release { + my ($self) = @_; + return unless $self->{lock_path}; + my $lockfh = delete $self->{lockfh} or croak 'not locked'; + flock($lockfh, LOCK_UN) or die "unlock failed: $!\n"; + close $lockfh or die "close failed: $!\n"; +} + +1; diff --git a/lib/PublicInbox/MDA.pm b/lib/PublicInbox/MDA.pm index d5af8f94..637404eb 100644 --- a/lib/PublicInbox/MDA.pm +++ b/lib/PublicInbox/MDA.pm @@ -81,8 +81,6 @@ sub set_list_headers { $pa =~ tr/@/./; # RFC2919 $simple->header_set("List-Id", "<$pa>"); } - - $simple->header_set($_) foreach @BAD_HEADERS; } 1; diff --git a/lib/PublicInbox/MID.pm b/lib/PublicInbox/MID.pm index 2c9822f4..c82e8401 100644 --- a/lib/PublicInbox/MID.pm +++ b/lib/PublicInbox/MID.pm @@ -6,10 +6,14 @@ package PublicInbox::MID; use strict; use warnings; use base qw/Exporter/; -our @EXPORT_OK = qw/mid_clean id_compress mid2path mid_mime mid_escape MID_ESC/; +our @EXPORT_OK = qw/mid_clean id_compress mid2path mid_mime mid_escape MID_ESC + mids references/; use URI::Escape qw(uri_escape_utf8); use Digest::SHA qw/sha1_hex/; -use constant MID_MAX => 40; # SHA-1 hex length +use constant { + MID_MAX => 40, # SHA-1 hex length # TODO: get rid of this + MAX_MID_SIZE => 244, # max term size (Xapian limitation) - length('Q') +}; sub mid_clean { my ($mid) = @_; @@ -46,7 +50,53 @@ sub mid2path { "$x2/$x38"; } -sub mid_mime ($) { $_[0]->header_obj->header_raw('Message-ID') } +# Only for v1 code paths: +sub mid_mime ($) { mids($_[0]->header_obj)->[0] } + +sub mids ($) { + my ($hdr) = @_; + my @mids; + my @v = $hdr->header_raw('Message-Id'); + foreach my $v (@v) { + my @cur = ($v =~ /<([^>]+)>/sg); + if (@cur) { + push(@mids, @cur); + } else { + push(@mids, $v); + } + } + uniq_mids(\@mids); +} + +# last References should be IRT, but some mail clients do things +# out of order, so trust IRT over References iff IRT exists +sub references ($) { + my ($hdr) = @_; + my @mids; + foreach my $f (qw(References In-Reply-To)) { + my @v = $hdr->header_raw($f); + foreach my $v (@v) { + push(@mids, ($v =~ /<([^>]+)>/sg)); + } + } + uniq_mids(\@mids); +} + +sub uniq_mids ($) { + my ($mids) = @_; + my @ret; + my %seen; + foreach my $mid (@$mids) { + if (length($mid) > MAX_MID_SIZE) { + warn "Message-ID: <$mid> too long, truncating\n"; + $mid = substr($mid, 0, MAX_MID_SIZE); + } + next if $seen{$mid}; + push @ret, $mid; + $seen{$mid} = 1; + } + \@ret; +} # RFC3986, section 3.3: sub MID_ESC () { '^A-Za-z0-9\-\._~!\$\&\';\(\)\*\+,;=:@' } diff --git a/lib/PublicInbox/MIME.pm b/lib/PublicInbox/MIME.pm index 54925a85..456eed64 100644 --- a/lib/PublicInbox/MIME.pm +++ b/lib/PublicInbox/MIME.pm @@ -23,6 +23,8 @@ package PublicInbox::MIME; use strict; use warnings; use base qw(Email::MIME); +use Email::MIME::ContentType; +$Email::MIME::ContentType::STRICT_PARAMS = 0; if ($Email::MIME::VERSION <= 1.937) { sub parts_multipart { diff --git a/lib/PublicInbox/Mbox.pm b/lib/PublicInbox/Mbox.pm index 04c86cc1..11b23022 100644 --- a/lib/PublicInbox/Mbox.pm +++ b/lib/PublicInbox/Mbox.pm @@ -26,12 +26,51 @@ sub subject_fn ($) { $fn eq '' ? 'no-subject' : $fn; } -sub emit1 { - my ($ctx, $msg) = @_; - $msg = Email::Simple->new($msg); - my $fn = subject_fn($msg); +sub mb_stream { + my ($more) = @_; + bless $more, 'PublicInbox::Mbox'; +} + +# called by PSGI server as body response +sub getline { + my ($more) = @_; # self + my ($ctx, $id, $prev, $next, $cur) = @$more; + if ($cur) { # first + pop @$more; + return msg_str($ctx, $cur); + } + $cur = $next or return; + my $ibx = $ctx->{-inbox}; + $next = $ibx->search->next_by_mid($ctx->{mid}, \$id, \$prev); + @$more = ($ctx, $id, $prev, $next); # $next may be undef, here + my $mref = $ibx->msg_by_smsg($cur) or return; + msg_str($ctx, Email::Simple->new($mref)); +} + +sub close {} # noop + +sub emit_raw { + my ($ctx) = @_; + my $mid = $ctx->{mid}; + my $ibx = $ctx->{-inbox}; + my $first; + my $more; + if (my $srch = $ibx->search) { + my ($id, $prev); + my $smsg = $srch->next_by_mid($mid, \$id, \$prev) or return; + my $mref = $ibx->msg_by_smsg($smsg) or return; + $first = Email::Simple->new($mref); + my $next = $srch->next_by_mid($mid, \$id, \$prev); + # $more is for ->getline + $more = [ $ctx, $id, $prev, $next, $first ] if $next; + } else { + my $mref = $ibx->msg_by_mid($mid) or return; + $first = Email::Simple->new($mref); + } + return unless defined $first; + my $fn = subject_fn($first); my @hdr = ('Content-Type'); - if ($ctx->{-inbox}->{obfuscate}) { + if ($ibx->{obfuscate}) { # obfuscation is stupid, but maybe scrapers are, too... push @hdr, 'application/mbox'; $fn .= '.mbox'; @@ -40,14 +79,11 @@ sub emit1 { $fn .= '.txt'; } push @hdr, 'Content-Disposition', "inline; filename=$fn"; - - # single message should be easily renderable in browsers, - # unless obfuscation is enabled :< - [ 200, \@hdr, [ msg_str($ctx, $msg) ] ] + [ 200, \@hdr, $more ? mb_stream($more) : [ msg_str($ctx, $first) ] ]; } sub msg_str { - my ($ctx, $simple) = @_; # Email::Simple object + my ($ctx, $simple, $mid) = @_; # Email::Simple object my $header_obj = $simple->header_obj; # drop potentially confusing headers, ssoma already should've dropped @@ -57,7 +93,7 @@ sub msg_str { } my $ibx = $ctx->{-inbox}; my $base = $ibx->base_url($ctx->{env}); - my $mid = mid_clean($header_obj->header('Message-ID')); + $mid = $ctx->{mid} unless defined $mid; $mid = mid_escape($mid); my @append = ( 'Archived-At', "<$base$mid/>", @@ -93,9 +129,24 @@ sub thread_mbox { my ($ctx, $srch, $sfx) = @_; eval { require IO::Compress::Gzip }; return sub { need_gzip(@_) } if $@; - - my $cb = sub { $srch->get_thread($ctx->{mid}, @_) }; - PublicInbox::MboxGz->response($ctx, $cb); + my $mid = $ctx->{mid}; + my $msgs = $srch->get_thread($mid, {}); + return [404, [qw(Content-Type text/plain)], []] if !@$msgs; + my $prev = $msgs->[-1]; + my $i = 0; + my $cb = sub { + while (1) { + if (my $smsg = $msgs->[$i++]) { + return $smsg; + } + # refill result set + $msgs = $srch->get_thread($mid, $prev); + return unless @$msgs; + $prev = $msgs->[-1]; + $i = 0; + } + }; + PublicInbox::MboxGz->response($ctx, $cb, $msgs->[0]->subject); } sub emit_range { @@ -110,12 +161,56 @@ sub emit_range { mbox_all($ctx, $query); } +sub mbox_all_ids { + my ($ctx) = @_; + my $prev = 0; + my $ids = $ctx->{-inbox}->mm->ids_after(\$prev) or return + [404, [qw(Content-Type text/plain)], ["No results found\n"]]; + my $i = 0; + my $over = $ctx->{srch}->{over_ro}; + my $cb = sub { + do { + while ((my $num = $ids->[$i++])) { + my $smsg = $over->get_art($num) or next; + return $smsg; + } + $ids = $ctx->{-inbox}->mm->ids_after(\$prev); + $i = 0; + } while (@$ids); + undef; + }; + return PublicInbox::MboxGz->response($ctx, $cb, 'all'); +} + sub mbox_all { my ($ctx, $query) = @_; eval { require IO::Compress::Gzip }; return sub { need_gzip(@_) } if $@; - my $cb = sub { $ctx->{srch}->query($query, @_) }; + return mbox_all_ids($ctx) if $query eq ''; + my $opts = { mset => 2 }; + my $srch = $ctx->{srch}; + my $mset = $srch->query($query, $opts); + $opts->{offset} = $mset->size or + return [404, [qw(Content-Type text/plain)], + ["No results found\n"]]; + my $i = 0; + my $cb = sub { # called by MboxGz->getline + while (1) { + while (my $mi = (($mset->items)[$i++])) { + my $doc = $mi->get_document; + my $smsg = $srch->retry_reopen(sub { + PublicInbox::SearchMsg->load_doc($doc); + }) or next; + return $smsg; + } + # refill result set + $mset = $srch->query($query, $opts); + my $size = $mset->size or return; + $opts->{offset} += $size; + $i = 0; + } + }; PublicInbox::MboxGz->response($ctx, $cb, 'results-'.$query); } @@ -146,8 +241,6 @@ sub new { gz => IO::Compress::Gzip->new(\$buf, Time => 0), cb => $cb, ctx => $ctx, - msgs => [], - opts => { offset => 0 }, }, $class; } @@ -155,63 +248,35 @@ sub response { my ($class, $ctx, $cb, $fn) = @_; my $body = $class->new($ctx, $cb); # http://www.iana.org/assignments/media-types/application/gzip - $body->{hdr} = [ 'Content-Type', 'application/gzip' ]; - $body->{fn} = $fn; - # fill in Content-Disposition filename - my $hdr = $body->getline; - if ($body->{hdr}) { - return [ 404, ['Content-Type','text/plain'], - [ "No results found\n" ] ]; + my @h = qw(Content-Type application/gzip); + if ($fn) { + $fn = to_filename($fn); + push @h, 'Content-Disposition', "inline; filename=$fn.mbox.gz"; } - [ 200, $hdr, $body ]; -} - -sub set_filename ($$) { - my ($fn, $msg) = @_; - return to_filename($fn) if defined($fn); - - PublicInbox::Mbox::subject_fn($msg); + [ 200, \@h, $body ]; } # called by Plack::Util::foreach or similar sub getline { my ($self) = @_; my $ctx = $self->{ctx} or return; - my $res; - my $ibx = $ctx->{-inbox}; - my $gz = $self->{gz}; - do { - # work on existing result set - while (defined(my $smsg = shift @{$self->{msgs}})) { - my $msg = eval { $ibx->msg_by_smsg($smsg) } or next; - $msg = Email::Simple->new($msg); - $gz->write(PublicInbox::Mbox::msg_str($ctx, $msg)); - - # use subject of first message as subject - if (my $hdr = delete $self->{hdr}) { - my $fn = set_filename($self->{fn}, $msg); - push @$hdr, 'Content-Disposition', - "inline; filename=$fn.mbox.gz"; - return $hdr; - } - my $bref = $self->{buf}; - if (length($$bref) >= 8192) { - my $ret = $$bref; # copy :< - ${$self->{buf}} = ''; - return $ret; - } - - # be fair to other clients on public-inbox-httpd: - return ''; + while (my $smsg = $self->{cb}->()) { + my $msg = $ctx->{-inbox}->msg_by_smsg($smsg) or next; + $msg = Email::Simple->new($msg); + $self->{gz}->write(PublicInbox::Mbox::msg_str($ctx, $msg, + $smsg->{mid})); + my $bref = $self->{buf}; + if (length($$bref) >= 8192) { + my $ret = $$bref; # copy :< + ${$self->{buf}} = ''; + return $ret; } - # refill result set - $res = $self->{cb}->($self->{opts}); - $self->{msgs} = $res->{msgs}; - $res = scalar @{$self->{msgs}}; - $self->{opts}->{offset} += $res; - } while ($res); - $gz->close; + # be fair to other clients on public-inbox-httpd: + return ''; + } + delete($self->{gz})->close; + # signal that we're done and can return undef next call: delete $self->{ctx}; ${delete $self->{buf}}; } diff --git a/lib/PublicInbox/MsgTime.pm b/lib/PublicInbox/MsgTime.pm new file mode 100644 index 00000000..c67a41ff --- /dev/null +++ b/lib/PublicInbox/MsgTime.pm @@ -0,0 +1,82 @@ +# Copyright (C) 2018 all contributors <meta@public-inbox.org> +# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> +package PublicInbox::MsgTime; +use strict; +use warnings; +use base qw(Exporter); +our @EXPORT_OK = qw(msg_timestamp msg_datestamp); +use Date::Parse qw(str2time); +use Time::Zone qw(tz_offset); + +sub zone_clamp ($) { + my ($zone) = @_; + $zone ||= '+0000'; + # "-1200" is the furthest westermost zone offset, + # but git fast-import is liberal so we use "-1400" + if ($zone >= 1400 || $zone <= -1400) { + warn "bogus TZ offset: $zone, ignoring and assuming +0000\n"; + $zone = '+0000'; + } + $zone; +} + +sub time_response ($) { + my ($ret) = @_; + wantarray ? @$ret : $ret->[0]; +} + +sub msg_received_at ($) { + my ($hdr) = @_; # Email::MIME::Header + my @recvd = $hdr->header_raw('Received'); + my ($ts, $zone); + foreach my $r (@recvd) { + $zone = undef; + $r =~ /\s*(\d+\s+[[:alpha:]]+\s+\d{2,4}\s+ + \d+\D\d+(?:\D\d+)\s+([\+\-]\d+))/sx or next; + $zone = $2; + $ts = eval { str2time($1) } and last; + my $mid = $hdr->header_raw('Message-ID'); + warn "no date in $mid Received: $r\n"; + } + defined $ts ? [ $ts, zone_clamp($zone) ] : undef; +} + +sub msg_date_only ($) { + my ($hdr) = @_; # Email::MIME::Header + my @date = $hdr->header_raw('Date'); + my ($ts, $zone); + foreach my $d (@date) { + $zone = undef; + # Y2K problems: 3-digit years + $d =~ s!([A-Za-z]{3}) (\d{3}) (\d\d:\d\d:\d\d)! + my $yyyy = $2 + 1900; "$1 $yyyy $3"!e; + $ts = eval { str2time($d) }; + if ($@) { + my $mid = $hdr->header_raw('Message-ID'); + warn "bad Date: $d in $mid: $@\n"; + } elsif ($d =~ /\s+([\+\-]\d+)\s*\z/) { + $zone = $1; + } + } + defined $ts ? [ $ts, zone_clamp($zone) ] : undef; +} + +# Favors Received header for sorting globally +sub msg_timestamp ($) { + my ($hdr) = @_; # Email::MIME::Header + my $ret; + $ret = msg_received_at($hdr) and return time_response($ret); + $ret = msg_date_only($hdr) and return time_response($ret); + wantarray ? (time, '+0000') : time; +} + +# Favors the Date: header for display and sorting within a thread +sub msg_datestamp ($) { + my ($hdr) = @_; # Email::MIME::Header + my $ret; + $ret = msg_date_only($hdr) and return time_response($ret); + $ret = msg_received_at($hdr) and return time_response($ret); + wantarray ? (time, '+0000') : time; +} + +1; diff --git a/lib/PublicInbox/Msgmap.pm b/lib/PublicInbox/Msgmap.pm index 6b6d1c6e..ec3d4f9d 100644 --- a/lib/PublicInbox/Msgmap.pm +++ b/lib/PublicInbox/Msgmap.pm @@ -12,6 +12,7 @@ use strict; use warnings; use DBI; use DBD::SQLite; +use File::Temp qw(tempfile); sub new { my ($class, $git_dir, $writable) = @_; @@ -23,9 +24,8 @@ sub new { new_file($class, "$d/msgmap.sqlite3", $writable); } -sub new_file { - my ($class, $f, $writable) = @_; - +sub dbh_new { + my ($f, $writable) = @_; my $dbh = DBI->connect("dbi:SQLite:dbname=$f",'','', { AutoCommit => 1, RaiseError => 1, @@ -34,6 +34,14 @@ sub new_file { sqlite_use_immediate_transaction => 1, }); $dbh->do('PRAGMA case_sensitive_like = ON'); + $dbh; +} + +sub new_file { + my ($class, $f, $writable) = @_; + return if !$writable && !-r $f; + + my $dbh = dbh_new($f, $writable); my $self = bless { dbh => $dbh }, $class; if ($writable) { @@ -45,6 +53,19 @@ sub new_file { $self; } +# used to keep track of used numeric mappings for v2 reindex +sub tmp_clone { + my ($self) = @_; + my ($fh, $fn) = tempfile('msgmap-XXXXXXXX', EXLOCK => 0, TMPDIR => 1); + $self->{dbh}->sqlite_backup_to_file($fn); + my $tmp = ref($self)->new_file($fn, 1); + $tmp->{dbh}->do('PRAGMA synchronous = OFF'); + $tmp->{tmp_name} = $fn; # SQLite won't work if unlinked, apparently + $tmp->{pid} = $$; + close $fh or die "failed to close $fn: $!"; + $tmp; +} + # n.b. invoked directly by scripts/xhdr-num2mid sub meta_accessor { my ($self, $key, $value) = @_; @@ -71,6 +92,14 @@ sub last_commit { $self->meta_accessor('last_commit', $commit); } +# v2 uses this to keep track of how up-to-date Xapian is +# old versions may be automatically GC'ed away in the future, +# but it's a trivial amount of storage. +sub last_commit_xap { + my ($self, $version, $i, $commit) = @_; + $self->meta_accessor("last_xap$version-$i", $commit); +} + sub created_at { my ($self, $second) = @_; $self->meta_accessor('created_at', $second); @@ -79,10 +108,10 @@ sub created_at { sub mid_insert { my ($self, $mid) = @_; my $dbh = $self->{dbh}; - my $sql = 'INSERT OR IGNORE INTO msgmap (mid) VALUES (?)'; - my $sth = $self->{mid_insert} ||= $dbh->prepare($sql); - $sth->bind_param(1, $mid); - return if $sth->execute == 0; + my $sth = $dbh->prepare_cached(<<''); +INSERT OR IGNORE INTO msgmap (mid) VALUES (?) + + return if $sth->execute($mid) == 0; $dbh->last_insert_id(undef, undef, 'msgmap', 'num'); } @@ -109,10 +138,14 @@ sub num_for { sub minmax { my ($self) = @_; my $dbh = $self->{dbh}; - my $sth = $self->{num_minmax} ||= - $dbh->prepare('SELECT MIN(num),MAX(num) FROM msgmap'); + # breaking MIN and MAX into separate queries speeds up from 250ms + # to around 700us with 2.7million messages. + my $sth = $dbh->prepare_cached('SELECT MIN(num) FROM msgmap', undef, 1); $sth->execute; - $sth->fetchrow_array; + my $min = $sth->fetchrow_array; + $sth = $dbh->prepare_cached('SELECT MAX(num) FROM msgmap', undef, 1); + $sth->execute; + ($min, $sth->fetchrow_array); } sub mid_prefixes { @@ -140,6 +173,14 @@ sub mid_delete { $sth->execute; } +sub num_delete { + my ($self, $num) = @_; + my $dbh = $self->{dbh}; + my $sth = $dbh->prepare('DELETE FROM msgmap WHERE num = ?'); + $sth->bind_param(1, $num); + $sth->execute; +} + sub create_tables { my ($dbh) = @_; my $e; @@ -157,17 +198,26 @@ sub create_tables { } # used by NNTP.pm -sub id_batch { - my ($self, $num, $cb) = @_; +sub ids_after { + my ($self, $num) = @_; + my $ids = $self->{dbh}->selectcol_arrayref(<<'', undef, $$num); +SELECT num FROM msgmap WHERE num > ? +ORDER BY num ASC LIMIT 1000 + + $$num = $ids->[-1] if @$ids; + $ids; +} + +sub msg_range { + my ($self, $beg, $end) = @_; my $dbh = $self->{dbh}; - my $sth = $dbh->prepare('SELECT num FROM msgmap WHERE num > ? '. - 'ORDER BY num ASC LIMIT 1000'); - $sth->execute($num); - my $ary = $sth->fetchall_arrayref; - @$ary = map { $_->[0] } @$ary; - my $nr = scalar @$ary; - $cb->($ary) if $nr; - $nr; + my $attr = { Columns => [] }; + my $mids = $dbh->selectall_arrayref(<<'', $attr, $$beg, $end); +SELECT num,mid FROM msgmap WHERE num >= ? AND num <= ? +ORDER BY num ASC + + $$beg = $mids->[-1]->[0] + 1 if @$mids; + $mids } # only used for mapping external serial numbers (e.g. articles from gmane) @@ -181,4 +231,31 @@ sub mid_set { $sth->execute($num, $mid); } +sub DESTROY { + my ($self) = @_; + delete $self->{dbh}; + my $f = delete $self->{tmp_name}; + if (defined $f && $self->{pid} == $$) { + unlink $f or warn "failed to unlink $f: $!\n"; + } +} + +sub atfork_parent { + my ($self) = @_; + my $f = $self->{tmp_name} or die "not a temporary clone\n"; + delete $self->{dbh} and die "tmp_clone dbh not prepared for parent"; + my $dbh = $self->{dbh} = dbh_new($f, 1); + $dbh->do('PRAGMA synchronous = OFF'); +} + +sub atfork_prepare { + my ($self) = @_; + my $f = $self->{tmp_name} or die "not a temporary clone\n"; + $self->{pid} == $$ or + die "BUG: atfork_prepare not called from $self->{pid}\n"; + $self->{dbh} or die "temporary clone not open\n"; + # must clobber prepared statements + %$self = (tmp_name => $f, pid => $$); +} + 1; diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm index c574c9e6..cdbd8e98 100644 --- a/lib/PublicInbox/NNTP.pm +++ b/lib/PublicInbox/NNTP.pm @@ -34,7 +34,6 @@ my $LIST_HEADERS = join("\r\n", @OVERVIEW, qw(:bytes :lines Xref To Cc)) . "\r\n"; # disable commands with easy DoS potential: -# LISTGROUP could get pretty bad, too... my %DISABLED; # = map { $_ => 1 } qw(xover list_overview_fmt newnews xhdr); my $EXPMAP; # fd -> [ idle_time, $self ] @@ -226,15 +225,12 @@ sub cmd_listgroup ($;$) { } $self->{ng} or return '412 no newsgroup selected'; - long_response($self, 0, long_response_limit, sub { - my ($i) = @_; - my $nr = $self->{ng}->mm->id_batch($$i, sub { - my ($ary) = @_; - more($self, join("\r\n", @$ary)); - }); - - # -1 to adjust for implicit increment in long_response - $$i = $nr ? $$i + $nr - 1 : long_response_limit; + my $n = 0; + long_response($self, sub { + my $ary = $self->{ng}->mm->ids_after(\$n); + scalar @$ary or return; + more($self, join("\r\n", @$ary)); + 1; }); } @@ -332,24 +328,22 @@ sub cmd_newnews ($$$$;$$) { }; return '.' unless @srch; - $ts .= '..'; - my $opts = { asc => 1, limit => 1000, offset => 0 }; - long_response($self, 0, long_response_limit, sub { - my ($i) = @_; + my $prev = 0; + long_response($self, sub { my $srch = $srch[0]; - my $res = $srch->query_ts($ts, $opts); - my $msgs = $res->{msgs}; - if (my $nr = scalar @$msgs) { + my $msgs = $srch->query_ts($ts, $prev); + if (scalar @$msgs) { more($self, '<' . join(">\r\n<", map { $_->mid } @$msgs ). '>'); - $opts->{offset} += $nr; + $prev = $msgs->[-1]->{num}; } else { shift @srch; if (@srch) { # continue onto next newsgroup - $opts->{offset} = 0; + $prev = 0; + return 1; } else { # break out of the long response. - $$i = long_response_limit; + return; } } }); @@ -414,12 +408,30 @@ sub header_append ($$$) { $hdr->header_set($k, @v, $v); } -sub set_nntp_headers { - my ($hdr, $ng, $n, $mid) = @_; +sub xref ($$$$) { + my ($self, $ng, $n, $mid) = @_; + my $ret = "$ng->{domain} $ng->{newsgroup}:$n"; + + # num_for is pretty cheap and sometimes we'll lookup the existence + # of an article without getting even the OVER info. In other words, + # I'm not sure if its worth optimizing by scanning To:/Cc: and + # PublicInbox::ExtMsg on the PSGI end is just as expensive + foreach my $other (@{$self->{nntpd}->{grouplist}}) { + next if $ng eq $other; + my $num = eval { $other->mm->num_for($mid) } or next; + $ret .= " $other->{newsgroup}:$num"; + } + $ret; +} + +sub set_nntp_headers ($$$$$) { + my ($self, $hdr, $ng, $n, $mid) = @_; # clobber some - $hdr->header_set('Newsgroups', $ng->{newsgroup}); - $hdr->header_set('Xref', xref($ng, $n)); + my $xref = xref($self, $ng, $n, $mid); + $hdr->header_set('Xref', $xref); + $xref =~ s/:\d+//g; + $hdr->header_set('Newsgroups', (split(/ /, $xref, 2))[1]); header_append($hdr, 'List-Post', "<mailto:$ng->{-primary_address}>"); if (my $url = $ng->base_url) { $mid = mid_escape($mid); @@ -464,18 +476,16 @@ find_mid: defined $mid or return $err; } found: - my $bytes; - my $s = eval { $ng->msg_by_mid($mid, \$bytes) } or return $err; - $s = Email::Simple->new($s); - my $lines; + my $smsg = $ng->search->{over_ro}->get_art($n) or return $err; + my $msg = $ng->msg_by_smsg($smsg) or return $err; + my $s = Email::Simple->new($msg); if ($set_headers) { - set_nntp_headers($s->header_obj, $ng, $n, $mid); - $lines = $s->body =~ tr!\n!\n!; + set_nntp_headers($self, $s->header_obj, $ng, $n, $mid); # must be last $s->body_set('') if ($set_headers == 2); } - [ $n, $mid, $s, $bytes, $lines, $ng ]; + [ $n, $mid, $s, $smsg->bytes, $smsg->lines, $ng ]; } sub simple_body_write ($$) { @@ -573,8 +583,8 @@ sub get_range ($$) { [ $beg, $end ]; } -sub long_response ($$$$) { - my ($self, $beg, $end, $cb) = @_; +sub long_response ($$) { + my ($self, $cb) = @_; die "BUG: nested long response" if $self->{long_res}; my $fd = $self->{fd}; @@ -585,24 +595,14 @@ sub long_response ($$$$) { $self->watch_read(0); my $t0 = now(); $self->{long_res} = sub { - # limit our own running time for fairness with other - # clients and to avoid buffering too much: - my $lim = 100; - - my $err; - do { - eval { $cb->(\$beg, \$lim) }; - } until (($err = $@) || $self->{closed} || - ++$beg > $end || --$lim < 0 || - $self->{write_buf_size}); - - if ($err || $self->{closed}) { + my $more = eval { $cb->() }; + if ($@ || $self->{closed}) { $self->{long_res} = undef; - if ($err) { + if ($@) { err($self, "%s during long response[$fd] - %0.6f", - $err, now() - $t0); + $@, now() - $t0); } if ($self->{closed}) { out($self, " deferred[$fd] aborted - %0.6f", @@ -611,7 +611,7 @@ sub long_response ($$$$) { update_idle_time($self); $self->watch_read(1); } - } elsif ($lim < 0 || $self->{write_buf_size}) { + } elsif ($more) { # $self->{write_buf_size}: # no recursion, schedule another call ASAP # but only after all pending writes are done update_idle_time($self); @@ -643,19 +643,17 @@ sub hdr_message_id ($$$) { # optimize XHDR Message-ID [range] for slrnpull. my $mm = $self->{ng}->mm; my ($beg, $end) = @$r; more($self, $xhdr ? r221 : r225); - long_response($self, $beg, $end, sub { - my ($i) = @_; - my $mid = $mm->mid_for($$i); - more($self, "$$i <$mid>") if defined $mid; + long_response($self, sub { + my $r = $mm->msg_range(\$beg, $end); + @$r or return; + more($self, join("\r\n", map { + "$_->[0] <$_->[1]>" + } @$r)); + 1; }); } } -sub xref ($$) { - my ($ng, $n) = @_; - "$ng->{domain} $ng->{newsgroup}:$n" -} - sub mid_lookup ($$) { my ($self, $mid) = @_; my $self_ng = $self->{ng}; @@ -675,9 +673,11 @@ sub hdr_xref ($$$) { # optimize XHDR Xref [range] for rtin my ($self, $xhdr, $range) = @_; if (defined $range && $range =~ /\A<(.+)>\z/) { # Message-ID - my ($ng, $n) = mid_lookup($self, $1); + my $mid = $1; + my ($ng, $n) = mid_lookup($self, $mid); return r430 unless $n; - hdr_mid_response($self, $xhdr, $ng, $n, $range, xref($ng, $n)); + hdr_mid_response($self, $xhdr, $ng, $n, $range, + xref($self, $ng, $n, $mid)); } else { # numeric range $range = $self->{article} unless defined $range; my $r = get_range($self, $range); @@ -686,26 +686,31 @@ sub hdr_xref ($$$) { # optimize XHDR Xref [range] for rtin my $mm = $ng->mm; my ($beg, $end) = @$r; more($self, $xhdr ? r221 : r225); - long_response($self, $beg, $end, sub { - my ($i) = @_; - my $mid = $mm->mid_for($$i); - more($self, "$$i ".xref($ng, $$i)) if defined $mid; + long_response($self, sub { + my $r = $mm->msg_range(\$beg, $end); + @$r or return; + more($self, join("\r\n", map { + my $num = $_->[0]; + "$num ".xref($self, $ng, $num, $_->[1]); + } @$r)); + 1; }); } } sub search_header_for { - my ($srch, $mid, $field) = @_; - my $smsg = $srch->lookup_mail($mid) or return; - $smsg->$field; + my ($srch, $num, $field) = @_; + my $smsg = $srch->{over_ro}->get_art($num) or return; + return PublicInbox::SearchMsg::date($smsg) if $field eq 'date'; + $smsg->{$field}; } sub hdr_searchmsg ($$$$) { my ($self, $xhdr, $field, $range) = @_; if (defined $range && $range =~ /\A<(.+)>\z/) { # Message-ID my ($ng, $n) = mid_lookup($self, $1); - return r430 unless $n; - my $v = search_header_for($ng->search, $range, $field); + return r430 unless defined $n; + my $v = search_header_for($ng->search, $n, $field); hdr_mid_response($self, $xhdr, $ng, $n, $range, $v); } else { # numeric range $range = $self->{article} unless defined $range; @@ -715,22 +720,17 @@ sub hdr_searchmsg ($$$$) { return $r unless ref $r; my ($beg, $end) = @$r; more($self, $xhdr ? r221 : r225); - my $off = 0; - long_response($self, $beg, $end, sub { - my ($i, $lim) = @_; - my $res = $srch->query_xover($beg, $end, $off); - my $msgs = $res->{msgs}; + my $cur = $beg; + long_response($self, sub { + my $msgs = $srch->query_xover($cur, $end); my $nr = scalar @$msgs or return; - $off += $nr; - $$lim -= $nr; my $tmp = ''; foreach my $s (@$msgs) { - $tmp .= $s->num . ' ' . $s->$field . "\r\n"; + $tmp .= $s->{num} . ' ' . $s->$field . "\r\n"; } utf8::encode($tmp); do_more($self, $tmp); - # -1 to adjust for implicit increment in long_response - $$i = $nr ? $$i + $nr - 1 : long_response_limit; + $cur = $msgs->[-1]->{num} + 1; }); } } @@ -804,11 +804,11 @@ sub cmd_xrover ($;$) { my $mm = $ng->mm; my $srch = $ng->search; more($self, '224 Overview information follows'); - long_response($self, $beg, $end, sub { - my ($i) = @_; - my $mid = $mm->mid_for($$i) or return; - my $h = search_header_for($srch, $mid, 'references'); - more($self, "$$i $h"); + + long_response($self, sub { + my $h = search_header_for($srch, $beg, 'references'); + more($self, "$beg $h") if defined($h); + $beg++ < $end; }); } @@ -820,10 +820,10 @@ sub over_line ($$) { $smsg->{subject}, $smsg->{from}, PublicInbox::SearchMsg::date($smsg), - '<'.PublicInbox::SearchMsg::mid($smsg).'>', + "<$smsg->{mid}>", $smsg->{references}, - PublicInbox::SearchMsg::bytes($smsg), - PublicInbox::SearchMsg::lines($smsg)); + $smsg->{bytes}, + $smsg->{lines}); utf8::encode($s); $s } @@ -832,8 +832,8 @@ sub cmd_over ($;$) { my ($self, $range) = @_; if ($range && $range =~ /\A<(.+)>\z/) { my ($ng, $n) = mid_lookup($self, $1); - my $smsg = $ng->search->lookup_mail($range) or - return '430 No article with that message-id'; + defined $n or return r430; + my $smsg = $ng->search->{over_ro}->get_art($n) or return r430; more($self, '224 Overview information follows (multi-line)'); # Only set article number column if it's the current group @@ -854,22 +854,16 @@ sub cmd_xover ($;$) { my ($beg, $end) = @$r; more($self, "224 Overview information follows for $beg to $end"); my $srch = $self->{ng}->search; - my $off = 0; - long_response($self, $beg, $end, sub { - my ($i, $lim) = @_; - my $res = $srch->query_xover($beg, $end, $off); - my $msgs = $res->{msgs}; + my $cur = $beg; + long_response($self, sub { + my $msgs = $srch->query_xover($cur, $end); my $nr = scalar @$msgs or return; - $off += $nr; - $$lim -= $nr; # OVERVIEW.FMT more($self, join("\r\n", map { - over_line(PublicInbox::SearchMsg::num($_), $_); + over_line($_->{num}, $_); } @$msgs)); - - # -1 to adjust for implicit increment in long_response - $$i = $nr ? $$i + $nr - 1 : long_response_limit; + $cur = $msgs->[-1]->{num} + 1; }); } diff --git a/lib/PublicInbox/Over.pm b/lib/PublicInbox/Over.pm new file mode 100644 index 00000000..07e54b64 --- /dev/null +++ b/lib/PublicInbox/Over.pm @@ -0,0 +1,183 @@ +# Copyright (C) 2018 all contributors <meta@public-inbox.org> +# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> + +# for XOVER, OVER in NNTP, and feeds/homepage/threads in PSGI +# Unlike Msgmap, this is an _UNSTABLE_ database which can be +# tweaked/updated over time and rebuilt. +package PublicInbox::Over; +use strict; +use warnings; +use DBI; +use DBD::SQLite; +use PublicInbox::SearchMsg; +use Compress::Zlib qw(uncompress); + +sub dbh_new { + my ($self) = @_; + my $ro = ref($self) eq 'PublicInbox::Over'; + my $dbh = DBI->connect("dbi:SQLite:dbname=$self->{filename}",'','', { + AutoCommit => 1, + RaiseError => 1, + PrintError => 0, + ReadOnly => $ro, + sqlite_use_immediate_transaction => 1, + }); + $dbh->{sqlite_unicode} = 1; + $dbh; +} + +sub new { + my ($class, $f) = @_; + bless { filename => $f }, $class; +} + +sub disconnect { $_[0]->{dbh} = undef } + +sub connect { $_[0]->{dbh} ||= $_[0]->dbh_new } + +sub load_from_row { + my ($smsg) = @_; + bless $smsg, 'PublicInbox::SearchMsg'; + if (defined(my $data = delete $smsg->{ddd})) { + $data = uncompress($data); + utf8::decode($data); + $smsg->load_from_data($data); + } + $smsg +} + +sub do_get { + my ($self, $sql, $opts, @args) = @_; + my $dbh = $self->connect; + my $lim = (($opts->{limit} || 0) + 0) || 1000; + $sql .= "LIMIT $lim"; + my $msgs = $dbh->selectall_arrayref($sql, { Slice => {} }, @args); + load_from_row($_) for @$msgs; + $msgs +} + +sub query_xover { + my ($self, $beg, $end) = @_; + do_get($self, <<'', {}, $beg, $end); +SELECT num,ts,ds,ddd FROM over WHERE num >= ? AND num <= ? +ORDER BY num ASC + +} + +sub query_ts { + my ($self, $ts, $prev) = @_; + do_get($self, <<'', {}, $ts, $prev); +SELECT num,ddd FROM over WHERE ts >= ? AND num > ? +ORDER BY num ASC + +} + +sub nothing () { wantarray ? (0, []) : [] }; + +sub get_thread { + my ($self, $mid, $prev) = @_; + my $dbh = $self->connect; + + my $id = $dbh->selectrow_array(<<'', undef, $mid); +SELECT id FROM msgid WHERE mid = ? LIMIT 1 + + defined $id or return nothing; + + my $num = $dbh->selectrow_array(<<'', undef, $id); +SELECT num FROM id2num WHERE id = ? AND num > 0 +ORDER BY num ASC LIMIT 1 + + defined $num or return nothing; + + my ($tid, $sid) = $dbh->selectrow_array(<<'', undef, $num); +SELECT tid,sid FROM over WHERE num = ? LIMIT 1 + + defined $tid or return nothing; # $sid may be undef + my $sort_col = 'ds'; + $num = 0; + if ($prev) { + $num = $prev->{num} || 0; + $sort_col = 'num'; + } + my $cond = '(tid = ? OR sid = ?) AND num > ?'; + my $msgs = do_get($self, <<"", {}, $tid, $sid, $num); +SELECT num,ts,ds,ddd FROM over WHERE $cond ORDER BY $sort_col ASC + + return $msgs unless wantarray; + + my $nr = $dbh->selectrow_array(<<"", undef, $tid, $sid, $num); +SELECT COUNT(num) FROM over WHERE $cond + + ($nr, $msgs); +} + +sub recent { + my ($self, $opts, $after, $before) = @_; + my ($s, @v); + if (defined($before)) { + if (defined($after)) { + $s = 'num > 0 AND ts >= ? AND ts <= ? ORDER BY ts DESC'; + @v = ($after, $before); + } else { + $s = 'num > 0 AND ts <= ? ORDER BY ts DESC'; + @v = ($before); + } + } else { + if (defined($after)) { + $s = 'num > 0 AND ts >= ? ORDER BY ts ASC'; + @v = ($after); + } else { + $s = 'num > 0 ORDER BY ts DESC'; + } + } + my $msgs = do_get($self, <<"", $opts, @v); +SELECT ts,ds,ddd FROM over WHERE $s + + return $msgs unless wantarray; + + my $nr = $self->{dbh}->selectrow_array(<<''); +SELECT COUNT(num) FROM over WHERE num > 0 + + ($nr, $msgs); +} + +sub get_art { + my ($self, $num) = @_; + my $dbh = $self->connect; + my $smsg = $dbh->selectrow_hashref(<<'', undef, $num); +SELECT num,ds,ts,ddd FROM over WHERE num = ? LIMIT 1 + + return load_from_row($smsg) if $smsg; + undef; +} + +sub next_by_mid { + my ($self, $mid, $id, $prev) = @_; + my $dbh = $self->connect; + + unless (defined $$id) { + my $sth = $dbh->prepare_cached(<<'', undef, 1); + SELECT id FROM msgid WHERE mid = ? LIMIT 1 + + $sth->execute($mid); + $$id = $sth->fetchrow_array; + defined $$id or return; + } + my $sth = $dbh->prepare_cached(<<"", undef, 1); +SELECT num FROM id2num WHERE id = ? AND num > ? +ORDER BY num ASC LIMIT 1 + + $$prev ||= 0; + $sth->execute($$id, $$prev); + my $num = $sth->fetchrow_array or return; + $$prev = $num; + + $sth = $dbh->prepare_cached(<<"", undef, 1); +SELECT num,ts,ds,ddd FROM over WHERE num = ? LIMIT 1 + + $sth->execute($num); + my $smsg = $sth->fetchrow_hashref or return; + load_from_row($smsg); +} + +1; diff --git a/lib/PublicInbox/OverIdx.pm b/lib/PublicInbox/OverIdx.pm new file mode 100644 index 00000000..62fec0da --- /dev/null +++ b/lib/PublicInbox/OverIdx.pm @@ -0,0 +1,418 @@ +# Copyright (C) 2018 all contributors <meta@public-inbox.org> +# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> + +# for XOVER, OVER in NNTP, and feeds/homepage/threads in PSGI +# Unlike Msgmap, this is an _UNSTABLE_ cache which can be +# tweaked/updated over time and rebuilt. +# +# Ghost messages (messages which are only referenced in References/In-Reply-To) +# are denoted by a negative NNTP article number. +package PublicInbox::OverIdx; +use strict; +use warnings; +use base qw(PublicInbox::Over); +use IO::Handle; +use DBI qw(:sql_types); # SQL_BLOB +use PublicInbox::MID qw/id_compress mids references/; +use PublicInbox::SearchMsg; +use Compress::Zlib qw(compress); +use PublicInbox::Search; + +sub dbh_new { + my ($self) = @_; + my $dbh = $self->SUPER::dbh_new; + $dbh->do('PRAGMA journal_mode = TRUNCATE'); + $dbh->do('PRAGMA cache_size = 80000'); + create_tables($dbh); + $dbh; +} + +sub get_counter ($$) { + my ($dbh, $key) = @_; + my $sth = $dbh->prepare_cached(<<'', undef, 1); +SELECT val FROM counter WHERE key = ? LIMIT 1 + + $sth->execute($key); + $sth->fetchrow_array; +} + +sub adj_counter ($$$) { + my ($self, $key, $op) = @_; + my $dbh = $self->{dbh}; + my $sth = $dbh->prepare_cached(<<""); +UPDATE counter SET val = val $op 1 WHERE key = ? + + $sth->execute($key); + + get_counter($dbh, $key); +} + +sub next_tid { adj_counter($_[0], 'thread', '+') } +sub next_ghost_num { adj_counter($_[0], 'ghost', '-') } + +sub id_for ($$$$$) { + my ($self, $tbl, $id_col, $val_col, $val) = @_; + my $dbh = $self->{dbh}; + my $in = $dbh->prepare_cached(<<"")->execute($val); +INSERT OR IGNORE INTO $tbl ($val_col) VALUES (?) + + if ($in == 0) { + my $sth = $dbh->prepare_cached(<<"", undef, 1); +SELECT $id_col FROM $tbl WHERE $val_col = ? LIMIT 1 + + $sth->execute($val); + $sth->fetchrow_array; + } else { + $dbh->last_insert_id(undef, undef, $tbl, $id_col); + } +} + +sub sid { + my ($self, $path) = @_; + return unless defined $path && $path ne ''; + id_for($self, 'subject', 'sid', 'path' => $path); +} + +sub mid2id { + my ($self, $mid) = @_; + id_for($self, 'msgid', 'id', 'mid' => $mid); +} + +sub delete_by_num { + my ($self, $num) = @_; + my $dbh = $self->{dbh}; + foreach (qw(over id2num)) { + $dbh->prepare_cached(<<"")->execute($num); +DELETE FROM $_ WHERE num = ? + + } +} + +# this includes ghosts +sub each_by_mid { + my ($self, $mid, $cols, $cb) = @_; + my $dbh = $self->{dbh}; + +=over + I originally wanted to stuff everything into a single query: + + SELECT over.* FROM over + LEFT JOIN id2num ON over.num = id2num.num + LEFT JOIN msgid ON msgid.id = id2num.id + WHERE msgid.mid = ? AND over.num >= ? + ORDER BY over.num ASC + LIMIT 1000 + + But it's faster broken out (and we're always in a + transaction for subroutines in this file) +=cut + + my $sth = $dbh->prepare_cached(<<'', undef, 1); +SELECT id FROM msgid WHERE mid = ? LIMIT 1 + + $sth->execute($mid); + my $id = $sth->fetchrow_array; + defined $id or return; + + push(@$cols, 'num'); + $cols = join(',', map { $_ } @$cols); + my $lim = 10; + my $prev = get_counter($dbh, 'ghost'); + while (1) { + $sth = $dbh->prepare_cached(<<"", undef, 1); +SELECT num FROM id2num WHERE id = ? AND num >= ? +ORDER BY num ASC +LIMIT $lim + + $sth->execute($id, $prev); + my $nums = $sth->fetchall_arrayref; + my $nr = scalar(@$nums) or return; + $prev = $nums->[-1]->[0]; + + $sth = $dbh->prepare_cached(<<"", undef, 1); +SELECT $cols FROM over WHERE over.num = ? LIMIT 1 + + foreach (@$nums) { + $sth->execute($_->[0]); + my $smsg = $sth->fetchrow_hashref; + $cb->(PublicInbox::Over::load_from_row($smsg)) or + return; + } + return if $nr != $lim; + } +} + +# this will create a ghost as necessary +sub resolve_mid_to_tid { + my ($self, $mid) = @_; + my $tid; + each_by_mid($self, $mid, ['tid'], sub { + my ($smsg) = @_; + my $cur_tid = $smsg->{tid}; + if (defined $tid) { + merge_threads($self, $tid, $cur_tid); + } else { + $tid = $cur_tid; + } + 1; + }); + defined $tid ? $tid : create_ghost($self, $mid); +} + +sub create_ghost { + my ($self, $mid) = @_; + my $id = $self->mid2id($mid); + my $num = $self->next_ghost_num; + $num < 0 or die "ghost num is non-negative: $num\n"; + my $tid = $self->next_tid; + my $dbh = $self->{dbh}; + $dbh->prepare_cached(<<'')->execute($num, $tid); +INSERT INTO over (num, tid) VALUES (?,?) + + $dbh->prepare_cached(<<'')->execute($id, $num); +INSERT INTO id2num (id, num) VALUES (?,?) + + $tid; +} + +sub merge_threads { + my ($self, $winner_tid, $loser_tid) = @_; + return if $winner_tid == $loser_tid; + my $dbh = $self->{dbh}; + $dbh->prepare_cached(<<'')->execute($winner_tid, $loser_tid); +UPDATE over SET tid = ? WHERE tid = ? + +} + +sub link_refs { + my ($self, $refs, $old_tid) = @_; + my $tid; + + if (@$refs) { + # first ref *should* be the thread root, + # but we can never trust clients to do the right thing + my $ref = $refs->[0]; + $tid = resolve_mid_to_tid($self, $ref); + merge_threads($self, $tid, $old_tid) if defined $old_tid; + + # the rest of the refs should point to this tid: + foreach my $i (1..$#$refs) { + $ref = $refs->[$i]; + my $ptid = resolve_mid_to_tid($self, $ref); + merge_threads($self, $tid, $ptid); + } + } else { + $tid = defined $old_tid ? $old_tid : $self->next_tid; + } + $tid; +} + +sub parse_references ($$$) { + my ($smsg, $mid0, $mids) = @_; + my $mime = $smsg->{mime}; + my $hdr = $mime->header_obj; + my $refs = references($hdr); + push(@$refs, @$mids) if scalar(@$mids) > 1; + return $refs if scalar(@$refs) == 0; + + # prevent circular references here: + my %seen = ( $mid0 => 1 ); + my @keep; + foreach my $ref (@$refs) { + if (length($ref) > PublicInbox::MID::MAX_MID_SIZE) { + warn "References: <$ref> too long, ignoring\n"; + next; + } + next if $seen{$ref}++; + push @keep, $ref; + } + $smsg->{references} = '<'.join('> <', @keep).'>' if @keep; + \@keep; +} + +sub add_overview { + my ($self, $mime, $bytes, $num, $oid, $mid0) = @_; + my $lines = $mime->body_raw =~ tr!\n!\n!; + my $smsg = bless { + mime => $mime, + mid => $mid0, + bytes => $bytes, + lines => $lines, + blob => $oid, + }, 'PublicInbox::SearchMsg'; + my $mids = mids($mime->header_obj); + my $refs = parse_references($smsg, $mid0, $mids); + my $subj = $smsg->subject; + my $xpath; + if ($subj ne '') { + $xpath = PublicInbox::Search::subject_path($subj); + $xpath = id_compress($xpath); + } + my $dd = $smsg->to_doc_data($oid, $mid0); + utf8::encode($dd); + $dd = compress($dd); + my $values = [ $smsg->ts, $smsg->ds, $num, $mids, $refs, $xpath, $dd ]; + add_over($self, $values); +} + +sub add_over { + my ($self, $values) = @_; + my ($ts, $ds, $num, $mids, $refs, $xpath, $ddd) = @$values; + my $old_tid; + my $vivified = 0; + + $self->begin_lazy; + $self->delete_by_num($num); + foreach my $mid (@$mids) { + my $v = 0; + each_by_mid($self, $mid, ['tid'], sub { + my ($cur) = @_; + my $cur_tid = $cur->{tid}; + my $n = $cur->{num}; + die "num must not be zero for $mid" if !$n; + $old_tid = $cur_tid unless defined $old_tid; + if ($n > 0) { # regular mail + merge_threads($self, $old_tid, $cur_tid); + } elsif ($n < 0) { # ghost + link_refs($self, $refs, $old_tid); + $self->delete_by_num($n); + $v++; + } + 1; + }); + $v > 1 and warn "BUG: vivified multiple ($v) ghosts for $mid\n"; + $vivified += $v; + } + my $tid = $vivified ? $old_tid : link_refs($self, $refs, $old_tid); + my $sid = $self->sid($xpath); + my $dbh = $self->{dbh}; + my $sth = $dbh->prepare_cached(<<''); +INSERT INTO over (num, tid, sid, ts, ds, ddd) +VALUES (?,?,?,?,?,?) + + my $n = 0; + my @v = ($num, $tid, $sid, $ts, $ds); + foreach (@v) { $sth->bind_param(++$n, $_) } + $sth->bind_param(++$n, $ddd, SQL_BLOB); + $sth->execute; + $sth = $dbh->prepare_cached(<<''); +INSERT INTO id2num (id, num) VALUES (?,?) + + foreach my $mid (@$mids) { + my $id = $self->mid2id($mid); + $sth->execute($id, $num); + } +} + +sub delete_articles { + my ($self, $nums) = @_; + my $dbh = $self->connect; + $self->delete_by_num($_) foreach @$nums; +} + +sub remove_oid { + my ($self, $oid, $mid) = @_; + $self->begin_lazy; + each_by_mid($self, $mid, ['ddd'], sub { + my ($smsg) = @_; + $self->delete_by_num($smsg->{num}) if $smsg->{blob} eq $oid; + 1; + }); +} + +sub create_tables { + my ($dbh) = @_; + + $dbh->do(<<''); +CREATE TABLE IF NOT EXISTS over ( + num INTEGER NOT NULL, + tid INTEGER NOT NULL, + sid INTEGER, + ts INTEGER, + ds INTEGER, + ddd VARBINARY, /* doc-data-deflated */ + UNIQUE (num) +) + + $dbh->do('CREATE INDEX IF NOT EXISTS idx_tid ON over (tid)'); + $dbh->do('CREATE INDEX IF NOT EXISTS idx_sid ON over (sid)'); + $dbh->do('CREATE INDEX IF NOT EXISTS idx_ts ON over (ts)'); + $dbh->do('CREATE INDEX IF NOT EXISTS idx_ds ON over (ds)'); + + $dbh->do(<<''); +CREATE TABLE IF NOT EXISTS counter ( + key VARCHAR(8) PRIMARY KEY NOT NULL, + val INTEGER DEFAULT 0, + UNIQUE (key) +) + + $dbh->do("INSERT OR IGNORE INTO counter (key) VALUES ('thread')"); + $dbh->do("INSERT OR IGNORE INTO counter (key) VALUES ('ghost')"); + + $dbh->do(<<''); +CREATE TABLE IF NOT EXISTS subject ( + sid INTEGER PRIMARY KEY AUTOINCREMENT, + path VARCHAR(40) NOT NULL, + UNIQUE (path) +) + + $dbh->do(<<''); +CREATE TABLE IF NOT EXISTS id2num ( + id INTEGER NOT NULL, + num INTEGER NOT NULL, + UNIQUE (id, num) +) + + # performance critical: + $dbh->do('CREATE INDEX IF NOT EXISTS idx_inum ON id2num (num)'); + $dbh->do('CREATE INDEX IF NOT EXISTS idx_id ON id2num (id)'); + + $dbh->do(<<''); +CREATE TABLE IF NOT EXISTS msgid ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + mid VARCHAR(244) NOT NULL, + UNIQUE (mid) +) + +} + +sub commit_lazy { + my ($self) = @_; + delete $self->{txn} or return; + $self->{dbh}->commit; +} + +sub begin_lazy { + my ($self) = @_; + return if $self->{txn}; + my $dbh = $self->connect or return; + $dbh->begin_work; + # $dbh->{Profile} = 2; + $self->{txn} = 1; +} + +sub rollback_lazy { + my ($self) = @_; + delete $self->{txn} or return; + $self->{dbh}->rollback; +} + +sub disconnect { + my ($self) = @_; + die "in transaction" if $self->{txn}; + $self->{dbh} = undef; +} + +sub create { + my ($self) = @_; + unless (-r $self->{filename}) { + require File::Path; + require File::Basename; + File::Path::mkpath(File::Basename::dirname($self->{filename})); + } + # create the DB: + PublicInbox::Over::connect($self); + $self->disconnect; +} + +1; diff --git a/lib/PublicInbox/Search.pm b/lib/PublicInbox/Search.pm index df02e0b5..7175ddc5 100644 --- a/lib/PublicInbox/Search.pm +++ b/lib/PublicInbox/Search.pm @@ -8,16 +8,15 @@ use strict; use warnings; # values for searching -use constant TS => 0; # timestamp -use constant NUM => 1; # NNTP article number -use constant BYTES => 2; # :bytes as defined in RFC 3977 -use constant LINES => 3; # :lines as defined in RFC 3977 -use constant YYYYMMDD => 4; # for searching in the WWW UI +use constant TS => 0; # Received: header in Unix time +use constant YYYYMMDD => 1; # Date: header for searching in the WWW UI +use constant DT => 2; # Date: YYYYMMDDHHMMSS use Search::Xapian qw/:standard/; use PublicInbox::SearchMsg; use PublicInbox::MIME; -use PublicInbox::MID qw/mid_clean id_compress/; +use PublicInbox::MID qw/id_compress/; +use PublicInbox::Over; # This is English-only, everything else is non-standard and may be confused as # a prefix common in patch emails @@ -42,39 +41,34 @@ use constant { # 13 - fix threading for empty References/In-Reply-To # (commit 83425ef12e4b65cdcecd11ddcb38175d4a91d5a0) # 14 - fix ghost root vivification - SCHEMA_VERSION => 14, + SCHEMA_VERSION => 15, # n.b. FLAG_PURE_NOT is expensive not suitable for a public website # as it could become a denial-of-service vector QP_FLAGS => FLAG_PHRASE|FLAG_BOOLEAN|FLAG_LOVEHATE|FLAG_WILDCARD, }; -# setup prefixes -my %bool_pfx_internal = ( - type => 'T', # "mail" or "ghost" - thread => 'G', # newsGroup (or similar entity - e.g. a web forum name) -); - my %bool_pfx_external = ( - mid => 'Q', # uniQue id (Message-ID) + mid => 'Q', # Message-ID (full/exact), this is mostly uniQue ); +my $non_quoted_body = 'XNQ XDFN XDFA XDFB XDFHH XDFCTX XDFPRE XDFPOST'; my %prob_prefix = ( # for mairix compatibility s => 'S', - m => 'XMID', # 'mid:' (bool) is exact, 'm:' (prob) can do partial + m => 'XM', # 'mid:' (bool) is exact, 'm:' (prob) can do partial f => 'A', t => 'XTO', tc => 'XTO XCC', c => 'XCC', tcf => 'XTO XCC A', a => 'XTO XCC A', - b => 'XNQ XQUOT', - bs => 'XNQ XQUOT S', + b => $non_quoted_body . ' XQUOT', + bs => $non_quoted_body . ' XQUOT S', n => 'XFN', q => 'XQUOT', - nq => 'XNQ', + nq => $non_quoted_body, dfn => 'XDFN', dfa => 'XDFA', dfb => 'XDFB', @@ -85,7 +79,7 @@ my %prob_prefix = ( dfblob => 'XDFPRE XDFPOST', # default: - '' => 'XMID S A XNQ XQUOT XFN', + '' => 'XM S A XQUOT XFN ' . $non_quoted_body, ); # not documenting m: and mid: for now, the using the URLs works w/o Xapian @@ -96,6 +90,9 @@ date range as YYYYMMDD e.g. d:19931002..20101002 Open-ended ranges such as d:19931002.. and d:..20101002 are also supported EOF + 'dt:' => <<EOF, +date-time range as YYYYMMDDhhmmss (e.g. dt:19931002011000..19931002011200) +EOF 'b:' => 'match within message body, including text attachments', 'nq:' => 'match non-quoted text within message body', 'q:' => 'match quoted text within message body', @@ -117,72 +114,102 @@ EOF ); chomp @HELP; -my $mail_query = Search::Xapian::Query->new('T' . 'mail'); - sub xdir { - my (undef, $git_dir) = @_; - "$git_dir/public-inbox/xapian" . SCHEMA_VERSION; + my ($self) = @_; + if ($self->{version} == 1) { + "$self->{mainrepo}/public-inbox/xapian" . SCHEMA_VERSION; + } else { + my $dir = "$self->{mainrepo}/xap" . SCHEMA_VERSION; + my $part = $self->{partition}; + defined $part or die "partition not given"; + $dir .= "/$part"; + } } sub new { - my ($class, $git_dir, $altid) = @_; - my $dir = $class->xdir($git_dir); - my $db = Search::Xapian::Database->new($dir); - bless { xdb => $db, git_dir => $git_dir, altid => $altid }, $class; + my ($class, $mainrepo, $altid) = @_; + my $version = 1; + my $ibx = $mainrepo; + if (ref $ibx) { + $version = $ibx->{version} || 1; + $mainrepo = $ibx->{mainrepo}; + } + my $self = bless { + mainrepo => $mainrepo, + altid => $altid, + version => $version, + }, $class; + my $dir; + if ($version >= 2) { + $dir = "$self->{mainrepo}/xap" . SCHEMA_VERSION; + my $xdb; + my $parts = 0; + foreach my $part (<$dir/*>) { + -d $part && $part =~ m!/\d+\z! or next; + $parts++; + my $sub = Search::Xapian::Database->new($part); + if ($xdb) { + $xdb->add_database($sub); + } else { + $xdb = $sub; + } + } + $self->{xdb} = $xdb; + } else { + $dir = $self->xdir; + $self->{xdb} = Search::Xapian::Database->new($dir); + } + $self->{over_ro} = PublicInbox::Over->new("$dir/over.sqlite3"); + $self; } -sub reopen { $_[0]->{xdb}->reopen } +sub reopen { + my ($self) = @_; + $self->{xdb}->reopen; + $self; # make chaining easier +} # read-only sub query { my ($self, $query_string, $opts) = @_; - my $query; - $opts ||= {}; - unless ($query_string eq '') { - $query = $self->qp->parse_query($query_string, QP_FLAGS); + if ($query_string eq '' && !$opts->{mset}) { + $self->{over_ro}->recent($opts); + } else { + my $query = $self->qp->parse_query($query_string, QP_FLAGS); $opts->{relevance} = 1 unless exists $opts->{relevance}; + _do_enquire($self, $query, $opts); } - - _do_enquire($self, $query, $opts); } sub get_thread { - my ($self, $mid, $opts) = @_; - my $smsg = eval { $self->lookup_message($mid) }; - - return { total => 0, msgs => [] } unless $smsg; - my $qtid = Search::Xapian::Query->new('G' . $smsg->thread_id); - my $path = $smsg->path; - if (defined $path && $path ne '') { - my $path = id_compress($smsg->path); - my $qsub = Search::Xapian::Query->new('XPATH' . $path); - $qtid = Search::Xapian::Query->new(OP_OR, $qtid, $qsub); - } - $opts ||= {}; - $opts->{limit} ||= 1000; - - # always sort threads by timestamp, this makes life easier - # for the threading algorithm (in SearchThread.pm) - $opts->{asc} = 1; - - _do_enquire($self, $qtid, $opts); + my ($self, $mid, $prev) = @_; + $self->{over_ro}->get_thread($mid, $prev); } sub retry_reopen { my ($self, $cb) = @_; - my $ret; - for (1..10) { - eval { $ret = $cb->() }; - return $ret unless $@; + for my $i (1..10) { + if (wantarray) { + my @ret; + eval { @ret = $cb->() }; + return @ret unless $@; + } else { + my $ret; + eval { $ret = $cb->() }; + return $ret unless $@; + } # Exception: The revision being read has been discarded - # you should call Xapian::Database::reopen() if (ref($@) eq 'Search::Xapian::DatabaseModifiedError') { + warn "reopen try #$i on $@\n"; reopen($self); } else { + warn "ref: ", ref($@), "\n"; die; } } + die "Too many Xapian database modifications in progress\n"; } sub _do_enquire { @@ -192,19 +219,16 @@ sub _do_enquire { sub _enquire_once { my ($self, $query, $opts) = @_; - my $enquire = $self->enquire; - if (defined $query) { - $query = Search::Xapian::Query->new(OP_AND,$query,$mail_query); - } else { - $query = $mail_query; - } + my $enquire = enquire($self); $enquire->set_query($query); $opts ||= {}; my $desc = !$opts->{asc}; - if ($opts->{relevance}) { + if (($opts->{mset} || 0) == 2) { + $enquire->set_docid_order(Search::Xapian::ENQ_ASCENDING()); + $enquire->set_weighting_scheme(Search::Xapian::BoolWeight->new); + delete $self->{enquire}; + } elsif ($opts->{relevance}) { $enquire->set_sort_by_relevance_then_value(TS, $desc); - } elsif ($opts->{num}) { - $enquire->set_sort_by_value(NUM, 0); } else { $enquire->set_sort_by_value_then_relevance(TS, $desc); } @@ -215,8 +239,9 @@ sub _enquire_once { my @msgs = map { PublicInbox::SearchMsg->load_doc($_->get_document); } $mset->items; + return \@msgs unless wantarray; - { total => $mset->get_matches_estimated, msgs => \@msgs } + ($mset->get_matches_estimated, \@msgs) } # read-write @@ -237,6 +262,8 @@ sub qp { $qp->set_stemming_strategy(STEM_SOME); $qp->add_valuerangeprocessor( Search::Xapian::NumberValueRangeProcessor->new(YYYYMMDD, 'd:')); + $qp->add_valuerangeprocessor( + Search::Xapian::NumberValueRangeProcessor->new(DT, 'dt:')); while (my ($name, $prefix) = each %bool_pfx_external) { $qp->add_boolean_prefix($name, $prefix); @@ -266,78 +293,25 @@ EOF $self->{query_parser} = $qp; } -sub num_range_processor { - $_[0]->{nrp} ||= Search::Xapian::NumberValueRangeProcessor->new(NUM); -} - # only used for NNTP server sub query_xover { my ($self, $beg, $end, $offset) = @_; - my $qp = Search::Xapian::QueryParser->new; - $qp->set_database($self->{xdb}); - $qp->add_valuerangeprocessor($self->num_range_processor); - my $query = $qp->parse_query("$beg..$end", QP_FLAGS); - - _do_enquire($self, $query, {num => 1, limit => 200, offset => $offset}); + $self->{over_ro}->query_xover($beg, $end, $offset); } sub query_ts { - my ($self, $ts, $opts) = @_; - my $qp = $self->{qp_ts} ||= eval { - my $q = Search::Xapian::QueryParser->new; - $q->set_database($self->{xdb}); - $q->add_valuerangeprocessor( - Search::Xapian::NumberValueRangeProcessor->new(TS)); - $q - }; - my $query = $qp->parse_query($ts, QP_FLAGS); - _do_enquire($self, $query, $opts); + my ($self, $ts, $prev) = @_; + $self->{over_ro}->query_ts($ts, $prev); } -sub lookup_message { - my ($self, $mid) = @_; - $mid = mid_clean($mid); - - my $doc_id = $self->find_unique_doc_id('Q' . $mid); - my $smsg; - if (defined $doc_id) { - # raises on error: - my $doc = $self->{xdb}->get_document($doc_id); - $smsg = PublicInbox::SearchMsg->wrap($doc, $mid); - $smsg->{doc_id} = $doc_id; - } - $smsg; +sub lookup_article { + my ($self, $num) = @_; + $self->{over_ro}->get_art($num); } -sub lookup_mail { # no ghosts! - my ($self, $mid) = @_; - retry_reopen($self, sub { - my $smsg = lookup_message($self, $mid) or return; - $smsg->load_expand; - }); -} - -sub find_unique_doc_id { - my ($self, $termval) = @_; - - my ($begin, $end) = $self->find_doc_ids($termval); - - return undef if $begin->equal($end); # not found - - my $rv = $begin->get_docid; - - # sanity check - $begin->inc; - $begin->equal($end) or die "Term '$termval' is not unique\n"; - $rv; -} - -# returns begin and end PostingIterator -sub find_doc_ids { - my ($self, $termval) = @_; - my $db = $self->{xdb}; - - ($db->postlist_begin($termval), $db->postlist_end($termval)); +sub next_by_mid { + my ($self, $mid, $id, $prev) = @_; + $self->{over_ro}->next_by_mid($mid, $id, $prev); } # normalize subjects so they are suitable as pathnames for URLs diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm index 5559b39d..7026fc4c 100644 --- a/lib/PublicInbox/SearchIdx.pm +++ b/lib/PublicInbox/SearchIdx.pm @@ -9,24 +9,19 @@ package PublicInbox::SearchIdx; use strict; use warnings; -use Fcntl qw(:flock :DEFAULT); +use base qw(PublicInbox::Search PublicInbox::Lock); use PublicInbox::MIME; -use Email::MIME::ContentType; -$Email::MIME::ContentType::STRICT_PARAMS = 0; -use base qw(PublicInbox::Search); -use PublicInbox::MID qw/mid_clean id_compress mid_mime/; +use PublicInbox::InboxWritable; +use PublicInbox::MID qw/mid_clean id_compress mid_mime mids/; use PublicInbox::MsgIter; use Carp qw(croak); use POSIX qw(strftime); +use PublicInbox::OverIdx; +use PublicInbox::Spawn qw(spawn); require PublicInbox::Git; +use Compress::Zlib qw(compress); use constant { - MAX_MID_SIZE => 244, # max term size - 1 in Xapian - PERM_UMASK => 0, - OLD_PERM_GROUP => 1, - OLD_PERM_EVERYBODY => 2, - PERM_GROUP => 0660, - PERM_EVERYBODY => 0664, BATCH_BYTES => 1_000_000, DEBUG => !!$ENV{DEBUG}, }; @@ -51,26 +46,46 @@ sub git_unquote ($) { } sub new { - my ($class, $inbox, $creat) = @_; - my $git_dir = $inbox; - my $altid; - if (ref $inbox) { - $git_dir = $inbox->{mainrepo}; - $altid = $inbox->{altid}; + my ($class, $ibx, $creat, $part) = @_; + my $mainrepo = $ibx; # for "public-inbox-index" w/o entry in config + my $git_dir = $mainrepo; + my ($altid, $git); + my $version = 1; + if (ref $ibx) { + $mainrepo = $ibx->{mainrepo}; + $altid = $ibx->{altid}; + $version = $ibx->{version} || 1; if ($altid) { require PublicInbox::AltId; $altid = [ map { - PublicInbox::AltId->new($inbox, $_); + PublicInbox::AltId->new($ibx, $_); } @$altid ]; } + } else { # v1 + $ibx = { mainrepo => $git_dir, version => 1 }; } + $ibx = PublicInbox::InboxWritable->new($ibx); require Search::Xapian::WritableDatabase; - my $self = bless { git_dir => $git_dir, -altid => $altid }, $class; - my $perm = $self->_git_config_perm; - my $umask = _umask_for($perm); - $self->{umask} = $umask; - $self->{lock_path} = "$git_dir/ssoma.lock"; - $self->{git} = PublicInbox::Git->new($git_dir); + my $self = bless { + mainrepo => $mainrepo, + -inbox => $ibx, + git => $ibx->git, + -altid => $altid, + version => $version, + }, $class; + $ibx->umask_prepare; + if ($version == 1) { + $self->{lock_path} = "$mainrepo/ssoma.lock"; + my $dir = $self->xdir; + $self->{over} = PublicInbox::OverIdx->new("$dir/over.sqlite3"); + } elsif ($version == 2) { + defined $part or die "partition is required for v2\n"; + # partition is a number + $self->{partition} = $part; + $self->{lock_path} = undef; + } else { + die "unsupported inbox version=$version\n"; + } $self->{creat} = ($creat || 0) == 1; $self; } @@ -79,66 +94,30 @@ sub _xdb_release { my ($self) = @_; my $xdb = delete $self->{xdb} or croak 'not acquired'; $xdb->close; - _lock_release($self) if $self->{creat}; + $self->lock_release if $self->{creat}; undef; } sub _xdb_acquire { my ($self) = @_; croak 'already acquired' if $self->{xdb}; - my $dir = PublicInbox::Search->xdir($self->{git_dir}); + my $dir = $self->xdir; my $flag = Search::Xapian::DB_OPEN; if ($self->{creat}) { require File::Path; - _lock_acquire($self); + $self->lock_acquire; File::Path::mkpath($dir); $flag = Search::Xapian::DB_CREATE_OR_OPEN; } $self->{xdb} = Search::Xapian::WritableDatabase->new($dir, $flag); } -# we only acquire the flock if creating or reindexing; -# PublicInbox::Import already has the lock on its own. -sub _lock_acquire { - my ($self) = @_; - croak 'already locked' if $self->{lockfh}; - sysopen(my $lockfh, $self->{lock_path}, O_WRONLY|O_CREAT) or - die "failed to open lock $self->{lock_path}: $!\n"; - flock($lockfh, LOCK_EX) or die "lock failed: $!\n"; - $self->{lockfh} = $lockfh; -} - -sub _lock_release { - my ($self) = @_; - my $lockfh = delete $self->{lockfh} or croak 'not locked'; - flock($lockfh, LOCK_UN) or die "unlock failed: $!\n"; - close $lockfh or die "close failed: $!\n"; -} - sub add_val ($$$) { my ($doc, $col, $num) = @_; $num = Search::Xapian::sortable_serialise($num); $doc->add_value($col, $num); } -sub add_values ($$$) { - my ($smsg, $bytes, $num) = @_; - - my $ts = $smsg->ts; - my $doc = $smsg->{doc}; - add_val($doc, &PublicInbox::Search::TS, $ts); - - defined($num) and add_val($doc, &PublicInbox::Search::NUM, $num); - - defined($bytes) and add_val($doc, &PublicInbox::Search::BYTES, $bytes); - - add_val($doc, &PublicInbox::Search::LINES, - $smsg->{mime}->body_raw =~ tr!\n!\n!); - - my $yyyymmdd = strftime('%Y%m%d', gmtime($ts)); - add_val($doc, PublicInbox::Search::YYYYMMDD, $yyyymmdd); -} - sub index_users ($$) { my ($tg, $smsg) = @_; @@ -154,14 +133,19 @@ sub index_users ($$) { $tg->increase_termpos; } -sub index_text_inc ($$$) { - my ($tg, $text, $pfx) = @_; +sub index_diff_inc ($$$$) { + my ($tg, $text, $pfx, $xnq) = @_; + if (@$xnq) { + $tg->index_text(join("\n", @$xnq), 1, 'XNQ'); + $tg->increase_termpos; + @$xnq = (); + } $tg->index_text($text, 1, $pfx); $tg->increase_termpos; } sub index_old_diff_fn { - my ($tg, $seen, $fa, $fb) = @_; + my ($tg, $seen, $fa, $fb, $xnq) = @_; # no renames or space support for traditional diffs, # find the number of leading common paths to strip: @@ -171,7 +155,9 @@ sub index_old_diff_fn { $fa = join('/', @fa); $fb = join('/', @fb); if ($fa eq $fb) { - index_text_inc($tg, $fa,'XDFN') unless $seen->{$fa}++; + unless ($seen->{$fa}++) { + index_diff_inc($tg, $fa, 'XDFN', $xnq); + } return 1; } shift @fa; @@ -184,40 +170,46 @@ sub index_diff ($$$) { my ($tg, $lines, $doc) = @_; my %seen; my $in_diff; + my @xnq; + my $xnq = \@xnq; foreach (@$lines) { if ($in_diff && s/^ //) { # diff context - index_text_inc($tg, $_, 'XDFCTX'); + index_diff_inc($tg, $_, 'XDFCTX', $xnq); } elsif (/^-- $/) { # email signature begins $in_diff = undef; } elsif (m!^diff --git ("?a/.+) ("?b/.+)\z!) { my ($fa, $fb) = ($1, $2); my $fn = (split('/', git_unquote($fa), 2))[1]; - index_text_inc($tg, $fn, 'XDFN') unless $seen{$fn}++; + $seen{$fn}++ or index_diff_inc($tg, $fn, 'XDFN', $xnq); $fn = (split('/', git_unquote($fb), 2))[1]; - index_text_inc($tg, $fn, 'XDFN') unless $seen{$fn}++; + $seen{$fn}++ or index_diff_inc($tg, $fn, 'XDFN', $xnq); $in_diff = 1; # traditional diff: } elsif (m/^diff -(.+) (\S+) (\S+)$/) { my ($opt, $fa, $fb) = ($1, $2, $3); + push @xnq, $_; # only support unified: next unless $opt =~ /[uU]/; - $in_diff = index_old_diff_fn($tg, \%seen, $fa, $fb); + $in_diff = index_old_diff_fn($tg, \%seen, $fa, $fb, + $xnq); } elsif (m!^--- ("?a/.+)!) { my $fn = (split('/', git_unquote($1), 2))[1]; - index_text_inc($tg, $fn, 'XDFN') unless $seen{$fn}++; + $seen{$fn}++ or index_diff_inc($tg, $fn, 'XDFN', $xnq); $in_diff = 1; } elsif (m!^\+\+\+ ("?b/.+)!) { my $fn = (split('/', git_unquote($1), 2))[1]; - index_text_inc($tg, $fn, 'XDFN') unless $seen{$fn}++; + $seen{$fn}++ or index_diff_inc($tg, $fn, 'XDFN', $xnq); $in_diff = 1; } elsif (/^--- (\S+)/) { $in_diff = $1; + push @xnq, $_; } elsif (defined $in_diff && /^\+\+\+ (\S+)/) { - $in_diff = index_old_diff_fn($tg, \%seen, $in_diff, $1); + $in_diff = index_old_diff_fn($tg, \%seen, $in_diff, $1, + $xnq); } elsif ($in_diff && s/^\+//) { # diff added - index_text_inc($tg, $_, 'XDFB'); + index_diff_inc($tg, $_, 'XDFB', $xnq); } elsif ($in_diff && s/^-//) { # diff removed - index_text_inc($tg, $_, 'XDFA'); + index_diff_inc($tg, $_, 'XDFA', $xnq); } elsif (m!^index ([a-f0-9]+)\.\.([a-f0-9]+)!) { my ($ba, $bb) = ($1, $2); index_git_blob_id($doc, 'XDFPRE', $ba); @@ -227,64 +219,67 @@ sub index_diff ($$$) { # traditional diff w/o -p } elsif (/^@@ (?:\S+) (?:\S+) @@\s*(\S+.*)$/) { # hunk header context - index_text_inc($tg, $1, 'XDFHH'); + index_diff_inc($tg, $1, 'XDFHH', $xnq); # ignore the following lines: - } elsif (/^(?:dis)similarity index/) { - } elsif (/^(?:old|new) mode/) { - } elsif (/^(?:deleted|new) file mode/) { - } elsif (/^(?:copy|rename) (?:from|to) /) { - } elsif (/^(?:dis)?similarity index /) { - } elsif (/^\\ No newline at end of file/) { - } elsif (/^Binary files .* differ/) { + } elsif (/^(?:dis)similarity index/ || + /^(?:old|new) mode/ || + /^(?:deleted|new) file mode/ || + /^(?:copy|rename) (?:from|to) / || + /^(?:dis)?similarity index / || + /^\\ No newline at end of file/ || + /^Binary files .* differ/) { + push @xnq, $_; } elsif ($_ eq '') { $in_diff = undef; } else { + push @xnq, $_; warn "non-diff line: $_\n" if DEBUG && $_ ne ''; $in_diff = undef; } } + + $tg->index_text(join("\n", @xnq), 1, 'XNQ'); + $tg->increase_termpos; } sub index_body ($$$) { my ($tg, $lines, $doc) = @_; my $txt = join("\n", @$lines); - $tg->index_text($txt, !!$doc, $doc ? 'XNQ' : 'XQUOT'); - $tg->increase_termpos; - # does it look like a diff? - if ($doc && $txt =~ /^(?:diff|---|\+\+\+) /ms) { - $txt = undef; - index_diff($tg, $lines, $doc); + if ($doc) { + # does it look like a diff? + if ($txt =~ /^(?:diff|---|\+\+\+) /ms) { + $txt = undef; + index_diff($tg, $lines, $doc); + } else { + $tg->index_text($txt, 1, 'XNQ'); + } + } else { + $tg->index_text($txt, 0, 'XQUOT'); } + $tg->increase_termpos; @$lines = (); } sub add_message { - my ($self, $mime, $bytes, $num, $blob) = @_; # mime = Email::MIME object - my $db = $self->{xdb}; - - my ($doc_id, $old_tid); - my $mid = mid_clean(mid_mime($mime)); - + # mime = Email::MIME object + my ($self, $mime, $bytes, $num, $oid, $mid0) = @_; + my $doc_id; + my $mids = mids($mime->header_obj); + $mid0 = $mids->[0] unless defined $mid0; # v1 compatibility + unless (defined $num) { # v1 + $self->_msgmap_init; + $num = index_mm($self, $mime); + } eval { - die 'Message-ID too long' if length($mid) > MAX_MID_SIZE; - my $smsg = $self->lookup_message($mid); - if ($smsg) { - # convert a ghost to a regular message - # it will also clobber any existing regular message - $doc_id = $smsg->{doc_id}; - $old_tid = $smsg->thread_id; - } - $smsg = PublicInbox::SearchMsg->new($mime); + my $smsg = PublicInbox::SearchMsg->new($mime); my $doc = $smsg->{doc}; - $doc->add_term('Q' . $mid); - my $subj = $smsg->subject; - if ($subj ne '') { - my $path = $self->subject_path($subj); - $doc->add_term('XPATH' . id_compress($path)); - } - - add_values($smsg, $bytes, $num); + add_val($doc, PublicInbox::Search::TS(), $smsg->ts); + my @ds = gmtime($smsg->ds); + my $yyyymmdd = strftime('%Y%m%d', @ds); + add_val($doc, PublicInbox::Search::YYYYMMDD(), $yyyymmdd); + my $dt = strftime('%Y%m%d%H%M%S', @ds); + add_val($doc, PublicInbox::Search::DT(), $dt); my $tg = $self->term_generator; @@ -300,6 +295,7 @@ sub add_message { my $fn = $part->filename; if (defined $fn && $fn ne '') { $tg->index_text($fn, 1, 'XFN'); + $tg->increase_termpos; } return if $ct =~ m!\btext/x?html\b!i; @@ -333,52 +329,110 @@ sub add_message { index_body($tg, \@orig, $doc) if @orig; }); - link_message($self, $smsg, $old_tid); - $tg->index_text($mid, 1, 'XMID'); - $doc->set_data($smsg->to_doc_data($blob)); - + foreach my $mid (@$mids) { + $tg->index_text($mid, 1, 'XM'); + $tg->increase_termpos; + } + $smsg->{to} = $smsg->{cc} = ''; + PublicInbox::OverIdx::parse_references($smsg, $mid0, $mids); + my $data = $smsg->to_doc_data($oid, $mid0); + $doc->set_data($data); if (my $altid = $self->{-altid}) { foreach my $alt (@$altid) { - my $id = $alt->mid2alt($mid); - next unless defined $id; - $doc->add_term($alt->{xprefix} . $id); + my $pfx = $alt->{xprefix}; + foreach my $mid (@$mids) { + my $id = $alt->mid2alt($mid); + next unless defined $id; + $doc->add_boolean_term($pfx . $id); + } } } - if (defined $doc_id) { - $db->replace_document($doc_id, $doc); - } else { - $doc_id = $db->add_document($doc); + + if (my $over = $self->{over}) { + $over->add_overview($mime, $bytes, $num, $oid, $mid0); } + $doc->add_boolean_term('Q' . $_) foreach @$mids; + $self->{xdb}->replace_document($doc_id = $num, $doc); }; if ($@) { - warn "failed to index message <$mid>: $@\n"; + warn "failed to index message <".join('> <',@$mids).">: $@\n"; return undef; } $doc_id; } -# returns deleted doc_id on success, undef on missing +# returns begin and end PostingIterator +sub find_doc_ids { + my ($self, $termval) = @_; + my $db = $self->{xdb}; + + ($db->postlist_begin($termval), $db->postlist_end($termval)); +} + +sub batch_do { + my ($self, $termval, $cb) = @_; + my $batch_size = 1000; # don't let @ids grow too large to avoid OOM + while (1) { + my ($head, $tail) = $self->find_doc_ids($termval); + return if $head == $tail; + my @ids; + for (; $head != $tail && @ids < $batch_size; $head->inc) { + push @ids, $head->get_docid; + } + $cb->(\@ids); + } +} + sub remove_message { my ($self, $mid) = @_; my $db = $self->{xdb}; - my $doc_id; + my $called; $mid = mid_clean($mid); + my $over = $self->{over}; eval { - $doc_id = $self->find_unique_doc_id('Q' . $mid); - if (defined $doc_id) { - $db->delete_document($doc_id); - } else { - warn "cannot remove non-existent <$mid>\n"; - } + batch_do($self, 'Q' . $mid, sub { + my ($ids) = @_; + $db->delete_document($_) for @$ids; + $over->delete_articles($ids) if $over; + $called = 1; + }); }; - if ($@) { warn "failed to remove message <$mid>: $@\n"; - return undef; + } elsif (!$called) { + warn "cannot remove non-existent <$mid>\n"; } - $doc_id; +} + +# MID is a hint in V2 +sub remove_by_oid { + my ($self, $oid, $mid) = @_; + my $db = $self->{xdb}; + + $self->{over}->remove_oid($oid, $mid) if $self->{over}; + + # XXX careful, we cannot use batch_do here since we conditionally + # delete documents based on other factors, so we cannot call + # find_doc_ids twice. + my ($head, $tail) = $self->find_doc_ids('Q' . $mid); + return if $head == $tail; + + # there is only ONE element in @delete unless we + # have bugs in our v2writable deduplication check + my @delete; + for (; $head != $tail; $head->inc) { + my $docid = $head->get_docid; + my $doc = $db->get_document($docid); + my $smsg = PublicInbox::SearchMsg->wrap($doc, $mid); + $smsg->load_expand; + if ($smsg->{blob} eq $oid) { + push(@delete, $docid); + } + } + $db->delete_document($_) foreach @delete; + scalar(@delete); } sub term_generator { # write-only @@ -393,72 +447,6 @@ sub term_generator { # write-only $self->{term_generator} = $tg; } -# increments last_thread_id counter -# returns a 64-bit integer represented as a hex string -sub next_thread_id { - my ($self) = @_; - my $db = $self->{xdb}; - my $last_thread_id = int($db->get_metadata('last_thread_id') || 0); - - $db->set_metadata('last_thread_id', ++$last_thread_id); - - $last_thread_id; -} - -sub link_message { - my ($self, $smsg, $old_tid) = @_; - my $doc = $smsg->{doc}; - my $mid = $smsg->mid; - my $mime = $smsg->{mime}; - my $hdr = $mime->header_obj; - - # last References should be IRT, but some mail clients do things - # out of order, so trust IRT over References iff IRT exists - my @refs = (($hdr->header_raw('References') || '') =~ /<([^>]+)>/g); - push(@refs, (($hdr->header_raw('In-Reply-To') || '') =~ /<([^>]+)>/g)); - - my $tid; - if (@refs) { - my %uniq = ($mid => 1); - my @orig_refs = @refs; - @refs = (); - - # prevent circular references via References: here: - foreach my $ref (@orig_refs) { - if (length($ref) > MAX_MID_SIZE) { - warn "References: <$ref> too long, ignoring\n"; - } - next if $uniq{$ref}; - $uniq{$ref} = 1; - push @refs, $ref; - } - } - - if (@refs) { - $smsg->{references} = '<'.join('> <', @refs).'>'; - - # first ref *should* be the thread root, - # but we can never trust clients to do the right thing - my $ref = shift @refs; - $tid = $self->_resolve_mid_to_tid($ref); - $self->merge_threads($tid, $old_tid) if defined $old_tid; - - # the rest of the refs should point to this tid: - foreach $ref (@refs) { - my $ptid = $self->_resolve_mid_to_tid($ref); - merge_threads($self, $tid, $ptid); - } - } else { - $tid = defined $old_tid ? $old_tid : $self->next_thread_id; - } - $doc->add_term('G' . $tid); -} - -sub index_blob { - my ($self, $mime, $bytes, $num, $blob) = @_; - $self->add_message($mime, $bytes, $num, $blob); -} - sub index_git_blob_id { my ($doc, $pfx, $objid) = @_; @@ -479,33 +467,42 @@ sub index_mm { my ($self, $mime) = @_; my $mid = mid_clean(mid_mime($mime)); my $mm = $self->{mm}; - my $num = $mm->mid_insert($mid); + my $num; - # fallback to num_for since filters like RubyLang set the number - defined $num ? $num : $mm->num_for($mid); -} + if (defined $self->{regen_down}) { + $num = $mm->num_for($mid) and return $num; -sub unindex_mm { - my ($self, $mime) = @_; - $self->{mm}->mid_delete(mid_clean(mid_mime($mime))); -} + while (($num = $self->{regen_down}--) > 0) { + if ($mm->mid_set($num, $mid) != 0) { + return $num; + } + } + } elsif (defined $self->{regen_up}) { + $num = $mm->num_for($mid) and return $num; -sub index_mm2 { - my ($self, $mime, $bytes, $blob) = @_; - my $num = $self->{mm}->num_for(mid_clean(mid_mime($mime))); - index_blob($self, $mime, $bytes, $num, $blob); + # this is to fixup old bugs due to add-remove-add + while (($num = ++$self->{regen_up})) { + if ($mm->mid_set($num, $mid) != 0) { + return $num; + } + } + } + + $num = $mm->mid_insert($mid) and return $num; + + # fallback to num_for since filters like RubyLang set the number + $mm->num_for($mid); } -sub unindex_mm2 { +sub unindex_mm { my ($self, $mime) = @_; $self->{mm}->mid_delete(mid_clean(mid_mime($mime))); - unindex_blob($self, $mime); } sub index_both { my ($self, $mime, $bytes, $blob) = @_; my $num = index_mm($self, $mime); - index_blob($self, $mime, $bytes, $num, $blob); + add_message($self, $mime, $bytes, $num, $blob); } sub unindex_both { @@ -527,7 +524,7 @@ sub do_cat_mail { sub index_sync { my ($self, $opts) = @_; - with_umask($self, sub { $self->_index_sync($opts) }); + $self->{-inbox}->with_umask(sub { $self->_index_sync($opts) }) } sub batch_adjust ($$$$) { @@ -535,11 +532,12 @@ sub batch_adjust ($$$$) { $$max -= $bytes; if ($$max <= 0) { $$max = BATCH_BYTES; - $batch_cb->($latest, 1); + $batch_cb->($latest); } } -sub rlog { +# only for v1 +sub read_log { my ($self, $log, $add_cb, $del_cb, $batch_cb) = @_; my $hex = '[a-f0-9]'; my $h40 = $hex .'{40}'; @@ -550,48 +548,108 @@ sub rlog { my $bytes; my $max = BATCH_BYTES; local $/ = "\n"; + my %D; my $line; + my $newest; + my $mid = '20170114215743.5igbjup6qpsh3jfg@genre.crustytoothpaste.net'; while (defined($line = <$log>)) { if ($line =~ /$addmsg/o) { my $blob = $1; + delete $D{$blob} and next; my $mime = do_cat_mail($git, $blob, \$bytes) or next; + my $mids = mids($mime->header_obj); + foreach (@$mids) { + warn "ADD $mid\n" if ($_ eq $mid); + } batch_adjust(\$max, $bytes, $batch_cb, $latest); $add_cb->($self, $mime, $bytes, $blob); } elsif ($line =~ /$delmsg/o) { my $blob = $1; - my $mime = do_cat_mail($git, $blob, \$bytes) or next; - batch_adjust(\$max, $bytes, $batch_cb, $latest); - $del_cb->($self, $mime); + $D{$blob} = 1; } elsif ($line =~ /^commit ($h40)/o) { $latest = $1; + $newest ||= $latest; } } - $batch_cb->($latest, 0); + # get the leftovers + foreach my $blob (keys %D) { + my $mime = do_cat_mail($git, $blob, \$bytes) or next; + my $mids = mids($mime->header_obj); + foreach (@$mids) { + warn "DEL $mid\n" if ($_ eq $mid); + } + $del_cb->($self, $mime); + } + $batch_cb->($latest, $newest); } sub _msgmap_init { my ($self) = @_; - $self->{mm} = eval { + die "BUG: _msgmap_init is only for v1\n" if $self->{version} != 1; + $self->{mm} ||= eval { require PublicInbox::Msgmap; - PublicInbox::Msgmap->new($self->{git_dir}, 1); + PublicInbox::Msgmap->new($self->{mainrepo}, 1); }; } sub _git_log { my ($self, $range) = @_; - $self->{git}->popen(qw/log --reverse --no-notes --no-color + my $git = $self->{git}; + + if (index($range, '..') < 0) { + my $regen_max = 0; + # can't use 'rev-list --count' if we use --diff-filter + my $fh = $git->popen(qw(log --pretty=tformat:%h + --no-notes --no-color --no-renames + --diff-filter=AM), $range); + ++$regen_max while <$fh>; + my (undef, $max) = $self->{mm}->minmax; + + if ($max && $max == $regen_max) { + # fix up old bugs in full indexes which caused messages to + # not appear in Msgmap + $self->{regen_up} = $max; + } else { + # normal regen is for for fresh data + $self->{regen_down} = $regen_max; + } + } + + $git->popen(qw/log --no-notes --no-color --no-renames --raw -r --no-abbrev/, $range); } -# indexes all unindexed messages +sub is_ancestor ($$$) { + my ($git, $cur, $tip) = @_; + return 0 unless $git->check($cur); + my $cmd = [ 'git', "--git-dir=$git->{git_dir}", + qw(merge-base --is-ancestor), $cur, $tip ]; + my $pid = spawn($cmd); + defined $pid or die "spawning ".join(' ', @$cmd)." failed: $!"; + waitpid($pid, 0) == $pid or die join(' ', @$cmd) .' did not finish'; + $? == 0; +} + +sub need_update ($$$) { + my ($self, $cur, $new) = @_; + my $git = $self->{git}; + return 1 if $cur && !is_ancestor($git, $cur, $new); + my $range = $cur eq '' ? $new : "$cur..$new"; + chomp(my $n = $git->qx(qw(rev-list --count), $range)); + ($n eq '' || $n > 0); +} + +# indexes all unindexed messages (v1 only) sub _index_sync { my ($self, $opts) = @_; my $tip = $opts->{ref} || 'HEAD'; my $reindex = $opts->{reindex}; my ($mkey, $last_commit, $lx, $xlog); - $self->{git}->batch_prepare; - my $xdb = _xdb_acquire($self); - $xdb->begin_transaction; + my $git = $self->{git}; + $git->batch_prepare; + + my $xdb = $self->begin_txn_lazy; + my $mm = _msgmap_init($self); do { $xlog = undef; $mkey = 'last_commit'; @@ -601,180 +659,118 @@ sub _index_sync { $lx = ''; $mkey = undef if $last_commit ne ''; } + + # use last_commit from msgmap if it is older or unset + my $lm = $mm->last_commit || ''; + if (!$lm || ($lm && $lx && is_ancestor($git, $lm, $lx))) { + $lx = $lm; + } + + $self->{over}->rollback_lazy; + $self->{over}->disconnect; + delete $self->{txn}; $xdb->cancel_transaction; $xdb = _xdb_release($self); - # ensure we leak no FDs to "git log" + # ensure we leak no FDs to "git log" with Xapian <= 1.2 my $range = $lx eq '' ? $tip : "$lx..$tip"; $xlog = _git_log($self, $range); - $xdb = _xdb_acquire($self); - $xdb->begin_transaction; + $xdb = $self->begin_txn_lazy; } while ($xdb->get_metadata('last_commit') ne $last_commit); - my $mm = _msgmap_init($self); my $dbh = $mm->{dbh} if $mm; - my $mm_only; my $cb = sub { - my ($commit, $more) = @_; + my ($commit, $newest) = @_; if ($dbh) { - $mm->last_commit($commit) if $commit; + if ($newest) { + my $cur = $mm->last_commit || ''; + if (need_update($self, $cur, $newest)) { + $mm->last_commit($newest); + } + } $dbh->commit; } - if (!$mm_only) { - $xdb->set_metadata($mkey, $commit) if $mkey && $commit; - $xdb->commit_transaction; - $xdb = _xdb_release($self); + if ($mkey && $newest) { + my $cur = $xdb->get_metadata($mkey); + if (need_update($self, $cur, $newest)) { + $xdb->set_metadata($mkey, $newest); + } } + $self->commit_txn_lazy; # let another process do some work... < - if ($more) { - if (!$mm_only) { - $xdb = _xdb_acquire($self); - $xdb->begin_transaction; - } + if (!$newest) { + $xdb = $self->begin_txn_lazy; $dbh->begin_work if $dbh; } }; - if ($mm) { - $dbh->begin_work; - my $lm = $mm->last_commit || ''; - if ($lm eq $lx) { - # Common case is the indexes are synced, - # we only need to run git-log once: - rlog($self, $xlog, *index_both, *unindex_both, $cb); - } else { - # Uncommon case, msgmap and xapian are out-of-sync - # do not care for performance (but git is fast :>) - # This happens if we have to reindex Xapian since - # msgmap is a frozen format and our Xapian format - # is evolving. - my $r = $lm eq '' ? $tip : "$lm..$tip"; - - # first, ensure msgmap is up-to-date: - my $mkey_prev = $mkey; - $mkey = undef; # ignore xapian, for now - my $mlog = _git_log($self, $r); - $mm_only = 1; - rlog($self, $mlog, *index_mm, *unindex_mm, $cb); - $mm_only = $mlog = undef; - - # now deal with Xapian - $mkey = $mkey_prev; - $dbh = undef; - rlog($self, $xlog, *index_mm2, *unindex_mm2, $cb); - } - } else { - # user didn't install DBD::SQLite and DBI - rlog($self, $xlog, *index_blob, *unindex_blob, $cb); - } + $dbh->begin_work; + read_log($self, $xlog, *index_both, *unindex_both, $cb); } -# this will create a ghost as necessary -sub _resolve_mid_to_tid { - my ($self, $mid) = @_; - - my $smsg = $self->lookup_message($mid) || $self->create_ghost($mid); - $smsg->thread_id; -} - -sub create_ghost { - my ($self, $mid) = @_; - - my $tid = $self->next_thread_id; - my $doc = Search::Xapian::Document->new; - $doc->add_term('Q' . $mid); - $doc->add_term('G' . $tid); - $doc->add_term('T' . 'ghost'); - - my $smsg = PublicInbox::SearchMsg->wrap($doc, $mid); - $self->{xdb}->add_document($doc); - - $smsg; +sub DESTROY { + # order matters for unlocking + $_[0]->{xdb} = undef; + $_[0]->{lockfh} = undef; } -sub merge_threads { - my ($self, $winner_tid, $loser_tid) = @_; - return if $winner_tid == $loser_tid; - my $db = $self->{xdb}; - - my $batch_size = 1000; # don't let @ids grow too large to avoid OOM - while (1) { - my ($head, $tail) = $self->find_doc_ids('G' . $loser_tid); - return if $head == $tail; - my @ids; - for (; $head != $tail && @ids < $batch_size; $head->inc) { - push @ids, $head->get_docid; - } - foreach my $docid (@ids) { - my $doc = $db->get_document($docid); - $doc->remove_term('G' . $loser_tid); - $doc->add_term('G' . $winner_tid); - $db->replace_document($docid, $doc); - } +# remote_* subs are only used by SearchIdxPart +sub remote_commit { + my ($self) = @_; + if (my $w = $self->{w}) { + print $w "commit\n" or die "failed to write commit: $!"; + } else { + $self->commit_txn_lazy; } } -sub _read_git_config_perm { +sub remote_close { my ($self) = @_; - my @cmd = qw(config core.sharedRepository); - my $fh = PublicInbox::Git->new($self->{git_dir})->popen(@cmd); - local $/ = "\n"; - my $perm = <$fh>; - chomp $perm if defined $perm; - $perm; -} - -sub _git_config_perm { - my $self = shift; - my $perm = scalar @_ ? $_[0] : _read_git_config_perm($self); - return PERM_GROUP if (!defined($perm) || $perm eq ''); - return PERM_UMASK if ($perm eq 'umask'); - return PERM_GROUP if ($perm eq 'group'); - if ($perm =~ /\A(?:all|world|everybody)\z/) { - return PERM_EVERYBODY; + if (my $w = delete $self->{w}) { + my $pid = delete $self->{pid} or die "no process to wait on\n"; + print $w "close\n" or die "failed to write to pid:$pid: $!\n"; + close $w or die "failed to close pipe for pid:$pid: $!\n"; + waitpid($pid, 0) == $pid or die "remote process did not finish"; + $? == 0 or die ref($self)." pid:$pid exited with: $?"; + } else { + die "transaction in progress $self\n" if $self->{txn}; + $self->_xdb_release if $self->{xdb}; } - return PERM_GROUP if ($perm =~ /\A(?:true|yes|on|1)\z/); - return PERM_UMASK if ($perm =~ /\A(?:false|no|off|0)\z/); - - my $i = oct($perm); - return PERM_UMASK if ($i == PERM_UMASK); - return PERM_GROUP if ($i == OLD_PERM_GROUP); - return PERM_EVERYBODY if ($i == OLD_PERM_EVERYBODY); +} - if (($i & 0600) != 0600) { - die "core.sharedRepository mode invalid: ". - sprintf('%.3o', $i) . "\nOwner must have permissions\n"; +sub remote_remove { + my ($self, $oid, $mid) = @_; + if (my $w = $self->{w}) { + # triggers remove_by_oid in a partition + print $w "D $oid $mid\n" or die "failed to write remove $!"; + } else { + $self->begin_txn_lazy; + $self->remove_by_oid($oid, $mid); } - ($i & 0666); } -sub _umask_for { - my ($perm) = @_; # _git_config_perm return value - my $rv = $perm; - return umask if $rv == 0; - - # set +x bit if +r or +w were set - $rv |= 0100 if ($rv & 0600); - $rv |= 0010 if ($rv & 0060); - $rv |= 0001 if ($rv & 0006); - (~$rv & 0777); +sub begin_txn_lazy { + my ($self) = @_; + return if $self->{txn}; + my $xdb = $self->{xdb} || $self->_xdb_acquire; + $self->{over}->begin_lazy if $self->{over}; + $xdb->begin_transaction; + $self->{txn} = 1; + $xdb; } -sub with_umask { - my ($self, $cb) = @_; - my $old = umask $self->{umask}; - my $rv = eval { $cb->() }; - my $err = $@; - umask $old; - die $err if $err; - $rv; +sub commit_txn_lazy { + my ($self) = @_; + delete $self->{txn} or return; + $self->{xdb}->commit_transaction; + $self->{over}->commit_lazy if $self->{over}; } -sub DESTROY { - # order matters for unlocking - $_[0]->{xdb} = undef; - $_[0]->{lockfh} = undef; +sub worker_done { + my ($self) = @_; + die "$$ $0 xdb not released\n" if $self->{xdb}; + die "$$ $0 still in transaction\n" if $self->{txn}; } 1; diff --git a/lib/PublicInbox/SearchIdxPart.pm b/lib/PublicInbox/SearchIdxPart.pm new file mode 100644 index 00000000..078d2df1 --- /dev/null +++ b/lib/PublicInbox/SearchIdxPart.pm @@ -0,0 +1,106 @@ +# Copyright (C) 2018 all contributors <meta@public-inbox.org> +# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> +package PublicInbox::SearchIdxPart; +use strict; +use warnings; +use base qw(PublicInbox::SearchIdx); + +sub new { + my ($class, $v2writable, $part) = @_; + my $self = $class->SUPER::new($v2writable->{-inbox}, 1, $part); + # create the DB before forking: + $self->_xdb_acquire; + $self->_xdb_release; + $self->spawn_worker($v2writable, $part) if $v2writable->{parallel}; + $self; +} + +sub spawn_worker { + my ($self, $v2writable, $part) = @_; + my ($r, $w); + pipe($r, $w) or die "pipe failed: $!\n"; + binmode $r, ':raw'; + binmode $w, ':raw'; + my $pid = fork; + defined $pid or die "fork failed: $!\n"; + if ($pid == 0) { + my $bnote = $v2writable->atfork_child; + $v2writable = undef; + close $w or die "failed to close: $!"; + + # F_SETPIPE_SZ = 1031 on Linux; increasing the pipe size here + # speeds V2Writable batch imports across 8 cores by nearly 20% + fcntl($r, 1031, 1048576) if $^O eq 'linux'; + + eval { partition_worker_loop($self, $r, $part, $bnote) }; + die "worker $part died: $@\n" if $@; + die "unexpected MM $self->{mm}" if $self->{mm}; + exit; + } + $self->{pid} = $pid; + $self->{w} = $w; + close $r or die "failed to close: $!"; +} + +sub partition_worker_loop ($$$$) { + my ($self, $r, $part, $bnote) = @_; + $0 = "pi-v2-partition[$part]"; + $self->begin_txn_lazy; + while (my $line = $r->getline) { + if ($line eq "commit\n") { + $self->commit_txn_lazy; + } elsif ($line eq "close\n") { + $self->_xdb_release; + } elsif ($line eq "barrier\n") { + $self->commit_txn_lazy; + # no need to lock < 512 bytes is atomic under POSIX + print $bnote "barrier $part\n" or + die "write failed for barrier $!\n"; + } elsif ($line =~ /\AD ([a-f0-9]{40,}) (.+)\n\z/s) { + my ($oid, $mid) = ($1, $2); + $self->begin_txn_lazy; + $self->remove_by_oid($oid, $mid); + } else { + chomp $line; + my ($len, $artnum, $oid, $mid0) = split(/ /, $line); + $self->begin_txn_lazy; + my $n = read($r, my $msg, $len) or die "read: $!\n"; + $n == $len or die "short read: $n != $len\n"; + my $mime = PublicInbox::MIME->new(\$msg); + $artnum = int($artnum); + $self->add_message($mime, $n, $artnum, $oid, $mid0); + } + } + $self->worker_done; +} + +# called by V2Writable +sub index_raw { + my ($self, $bytes, $msgref, $artnum, $oid, $mid0, $mime) = @_; + if (my $w = $self->{w}) { + print $w "$bytes $artnum $oid $mid0\n", $$msgref or die + "failed to write partition $!\n"; + $w->flush or die "failed to flush: $!\n"; + } else { + $$msgref = undef; + $self->begin_txn_lazy; + $self->add_message($mime, $bytes, $artnum, $oid, $mid0); + } +} + +sub atfork_child { + close $_[0]->{w} or die "failed to close write pipe: $!\n"; +} + +# called by V2Writable: +sub remote_barrier { + my ($self) = @_; + if (my $w = $self->{w}) { + print $w "barrier\n" or die "failed to print: $!"; + $w->flush or die "failed to flush: $!"; + } else { + $self->commit_txn_lazy; + } +} + +1; diff --git a/lib/PublicInbox/SearchMsg.pm b/lib/PublicInbox/SearchMsg.pm index afba8b1a..ab971e00 100644 --- a/lib/PublicInbox/SearchMsg.pm +++ b/lib/PublicInbox/SearchMsg.pm @@ -6,17 +6,15 @@ package PublicInbox::SearchMsg; use strict; use warnings; -use Search::Xapian; -use Date::Parse qw/str2time/; -use PublicInbox::MID qw/mid_clean/; +use PublicInbox::MID qw/mid_clean mid_mime/; use PublicInbox::Address; +use PublicInbox::MsgTime qw(msg_timestamp msg_datestamp); +use Time::Local qw(timegm); sub new { my ($class, $mime) = @_; my $doc = Search::Xapian::Document->new; - $doc->add_term('T' . 'mail'); - - bless { type => 'mail', doc => $doc, mime => $mime }, $class; + bless { doc => $doc, mime => $mime }, $class; } sub wrap { @@ -24,49 +22,75 @@ sub wrap { bless { doc => $doc, mime => undef, mid => $mid }, $class; } +sub get { + my ($class, $head, $db, $mid) = @_; + my $doc_id = $head->get_docid; + my $doc = $db->get_document($doc_id); + load_expand(wrap($class, $doc, $mid)) +} + sub get_val ($$) { my ($doc, $col) = @_; Search::Xapian::sortable_unserialise($doc->get_value($col)); } +sub to_doc_data { + my ($self, $oid, $mid0) = @_; + $oid = '' unless defined $oid; + join("\n", + $self->subject, + $self->from, + $self->references, + $self->to, + $self->cc, + $oid, + $mid0, + $self->{bytes} || '', + $self->{lines} || '' + ); +} + +sub load_from_data ($$) { + my ($self) = $_[0]; # data = $_[1] + ( + $self->{subject}, + $self->{from}, + $self->{references}, + + # To: and Cc: are stored to optimize HDR/XHDR in NNTP since + # some NNTP clients will use that for message displays. + $self->{to}, + $self->{cc}, + + $self->{blob}, + $self->{mid}, + $self->{bytes}, + $self->{lines} + ) = split(/\n/, $_[1]); +} + sub load_expand { my ($self) = @_; my $doc = $self->{doc}; my $data = $doc->get_data or return; - $self->{ts} = get_val($doc, &PublicInbox::Search::TS); + $self->{ts} = get_val($doc, PublicInbox::Search::TS()); + my $dt = get_val($doc, PublicInbox::Search::DT()); + my ($yyyy, $mon, $dd, $hh, $mm, $ss) = unpack('A4A2A2A2A2A2', $dt); + $self->{ds} = timegm($ss, $mm, $hh, $dd, $mon - 1, $yyyy); utf8::decode($data); - my ($subj, $from, $refs, $to, $cc, $blob) = split(/\n/, $data); - $self->{subject} = $subj; - $self->{from} = $from; - $self->{references} = $refs; - $self->{to} = $to; - $self->{cc} = $cc; - $self->{blob} = $blob; + load_from_data($self, $data); $self; } sub load_doc { my ($class, $doc) = @_; - my $data = $doc->get_data or return; - my $ts = get_val($doc, &PublicInbox::Search::TS); - utf8::decode($data); - my ($subj, $from, $refs, $to, $cc, $blob) = split(/\n/, $data); - bless { - doc => $doc, - subject => $subj, - ts => $ts, - from => $from, - references => $refs, - to => $to, - cc => $cc, - blob => $blob, - }, $class; + my $self = bless { doc => $doc }, $class; + $self->load_expand; } # :bytes and :lines metadata in RFC 3977 -sub bytes ($) { get_val($_[0]->{doc}, &PublicInbox::Search::BYTES) } -sub lines ($) { get_val($_[0]->{doc}, &PublicInbox::Search::LINES) } -sub num ($) { get_val($_[0]->{doc}, &PublicInbox::Search::NUM) } +sub bytes ($) { $_[0]->{bytes} } +sub lines ($) { $_[0]->{lines} } sub __hdr ($$) { my ($self, $field) = @_; @@ -91,9 +115,9 @@ my @MoY = qw(Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec); sub date ($) { my ($self) = @_; - my $ts = $self->{ts}; - return unless defined $ts; - my ($sec, $min, $hour, $mday, $mon, $year, $wday) = gmtime($ts); + my $ds = $self->{ds}; + return unless defined $ds; + my ($sec, $min, $hour, $mday, $mon, $year, $wday) = gmtime($ds); "$DoW[$wday], " . sprintf("%02d $MoY[$mon] %04d %02d:%02d:%02d +0000", $mday, $year+1900, $hour, $min, $sec); @@ -119,15 +143,12 @@ sub from_name { sub ts { my ($self) = @_; - $self->{ts} ||= eval { str2time($self->{mime}->header('Date')) } || 0; + $self->{ts} ||= eval { msg_timestamp($self->{mime}->header_obj) } || 0; } -sub to_doc_data { - my ($self, $blob) = @_; - my @rows = ($self->subject, $self->from, $self->references, - $self->to, $self->cc); - push @rows, $blob if defined $blob; - join("\n", @rows); +sub ds { + my ($self) = @_; + $self->{ds} ||= eval { msg_datestamp($self->{mime}->header_obj); } || 0; } sub references { @@ -156,27 +177,13 @@ sub mid ($;$) { $self->{mid} = $mid; } elsif (my $rv = $self->{mid}) { $rv; + } elsif ($self->{doc}) { + $self->{mid} = _get_term_val($self, 'Q', qr/\AQ/); } else { - $self->{mid} = _get_term_val($self, 'Q', qr/\AQ/) || - $self->_extract_mid; + $self->_extract_mid; # v1 w/o Xapian } } sub _extract_mid { mid_clean(mid_mime($_[0]->{mime})) } -sub thread_id { - my ($self) = @_; - my $tid = $self->{thread}; - return $tid if defined $tid; - $self->{thread} = _get_term_val($self, 'G', qr/\AG/); # *G*roup -} - -# XXX: consider removing this, we can phrase match subject -sub path { - my ($self) = @_; - my $path = $self->{path}; - return $path if defined $path; - $self->{path} = _get_term_val($self, 'XPATH', qr/\AXPATH/); # path -} - 1; diff --git a/lib/PublicInbox/SearchThread.pm b/lib/PublicInbox/SearchThread.pm index 6fbce15c..1d250b46 100644 --- a/lib/PublicInbox/SearchThread.pm +++ b/lib/PublicInbox/SearchThread.pm @@ -22,15 +22,15 @@ use strict; use warnings; sub thread { - my ($messages, $ordersub, $srch) = @_; + my ($messages, $ordersub, $ibx) = @_; my $id_table = {}; _add_message($id_table, $_) foreach @$messages; my $rootset = [ grep { - !delete($_->{parent}) && $_->visible($srch) + !delete($_->{parent}) && $_->visible($ibx) } values %$id_table ]; $id_table = undef; $rootset = $ordersub->($rootset); - $_->order_children($ordersub, $srch) for @$rootset; + $_->order_children($ordersub, $ibx) for @$rootset; $rootset; } @@ -131,20 +131,20 @@ sub has_descendent { # a ghost Message-ID is the result of a long header line # being folded/mangled by a MUA, and not a missing message. sub visible ($$) { - my ($self, $srch) = @_; - ($self->{smsg} ||= eval { $srch->lookup_mail($self->{id}) }) || + my ($self, $ibx) = @_; + ($self->{smsg} ||= eval { $ibx->smsg_by_mid($self->{id}) }) || (scalar values %{$self->{children}}); } sub order_children { - my ($cur, $ordersub, $srch) = @_; + my ($cur, $ordersub, $ibx) = @_; my %seen = ($cur => 1); # self-referential loop prevention my @q = ($cur); while (defined($cur = shift @q)) { my $c = $cur->{children}; # The hashref here... - $c = [ grep { !$seen{$_}++ && visible($_, $srch) } values %$c ]; + $c = [ grep { !$seen{$_}++ && visible($_, $ibx) } values %$c ]; $c = $ordersub->($c) if scalar @$c > 1; $cur->{children} = $c; # ...becomes an arrayref push @q, @$c; diff --git a/lib/PublicInbox/SearchView.pm b/lib/PublicInbox/SearchView.pm index 1c4442e4..5d500c1b 100644 --- a/lib/PublicInbox/SearchView.pm +++ b/lib/PublicInbox/SearchView.pm @@ -10,7 +10,7 @@ use PublicInbox::SearchMsg; use PublicInbox::Hval qw/ascii_html obfuscate_addrs/; use PublicInbox::View; use PublicInbox::WwwAtomStream; -use PublicInbox::MID qw(mid2path mid_mime mid_clean mid_escape MID_ESC); +use PublicInbox::MID qw(MID_ESC); use PublicInbox::MIME; require PublicInbox::Git; require PublicInbox::SearchThread; @@ -118,11 +118,11 @@ sub mset_summary { obfuscate_addrs($obfs_ibx, $s); obfuscate_addrs($obfs_ibx, $f); } - my $ts = PublicInbox::View::fmt_ts($smsg->ts); + my $date = PublicInbox::View::fmt_ts($smsg->ds); my $mid = PublicInbox::Hval->new_msgid($smsg->mid)->{href}; $$res .= qq{$rank. <b><a\nhref="$mid/">}. $s . "</a></b>\n"; - $$res .= "$pfx - by $f @ $ts UTC [$pct%]\n\n"; + $$res .= "$pfx - by $f @ $date UTC [$pct%]\n\n"; } $$res .= search_nav_bot($mset, $q); *noop; @@ -181,10 +181,9 @@ sub search_nav_top { sub search_nav_bot { my ($mset, $q) = @_; my $total = $mset->get_matches_estimated; - my $nr = scalar $mset->items; my $o = $q->{o}; my $l = $q->{l}; - my $end = $o + $nr; + my $end = $o + $mset->size; my $beg = $o + 1; my $rv = '</pre><hr><pre id=t>'; if ($beg <= $end) { @@ -229,8 +228,8 @@ sub mset_thread { } ($mset->items) ]}); my $r = $q->{r}; my $rootset = PublicInbox::SearchThread::thread($msgs, - $r ? sort_relevance(\%pct) : *PublicInbox::View::sort_ts, - $srch); + $r ? sort_relevance(\%pct) : *PublicInbox::View::sort_ds, + $ctx); my $skel = search_nav_bot($mset, $q). "<pre>"; my $inbox = $ctx->{-inbox}; $ctx->{-upfx} = ''; @@ -250,15 +249,14 @@ sub mset_thread { *PublicInbox::View::pre_thread); @$msgs = reverse @$msgs if $r; - my $mime; sub { return unless $msgs; - while ($mime = pop @$msgs) { - $mime = $inbox->msg_by_smsg($mime) and last; + my $smsg; + while (my $m = pop @$msgs) { + $smsg = $inbox->smsg_mime($m) and last; } - if ($mime) { - $mime = PublicInbox::MIME->new($mime); - return PublicInbox::View::index_entry($mime, $ctx, + if ($smsg) { + return PublicInbox::View::index_entry($smsg, $ctx, scalar @$msgs); } $msgs = undef; @@ -292,8 +290,7 @@ sub adump { PublicInbox::WwwAtomStream->response($ctx, 200, sub { while (my $x = shift @items) { $x = load_doc_retry($srch, $x); - $x = $ibx->msg_by_smsg($x) and - return PublicInbox::MIME->new($x); + $x = $ibx->smsg_mime($x) and return $x; } return undef; }); diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm new file mode 100644 index 00000000..1316d628 --- /dev/null +++ b/lib/PublicInbox/V2Writable.pm @@ -0,0 +1,908 @@ +# Copyright (C) 2018 all contributors <meta@public-inbox.org> +# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> + +# This interface wraps and mimics PublicInbox::Import +package PublicInbox::V2Writable; +use strict; +use warnings; +use base qw(PublicInbox::Lock); +use PublicInbox::SearchIdxPart; +use PublicInbox::MIME; +use PublicInbox::Git; +use PublicInbox::Import; +use PublicInbox::MID qw(mids); +use PublicInbox::ContentId qw(content_id content_digest); +use PublicInbox::Inbox; +use PublicInbox::OverIdx; +use PublicInbox::Msgmap; +use PublicInbox::Spawn qw(spawn); +use PublicInbox::SearchIdx; +use IO::Handle; + +# an estimate of the post-packed size to the raw uncompressed size +my $PACKING_FACTOR = 0.4; + +# assume 2 cores if GNU nproc(1) is not available +sub nproc_parts () { + my $n = int($ENV{NPROC} || `nproc 2>/dev/null` || 2); + # subtract for the main process and git-fast-import + $n -= 1; + $n < 1 ? 1 : $n; +} + +sub count_partitions ($) { + my ($self) = @_; + my $nparts = 0; + my $xpfx = $self->{xpfx}; + + # always load existing partitions in case core count changes: + # Also, partition count may change while -watch is running + # due to -compact + if (-d $xpfx) { + foreach my $part (<$xpfx/*>) { + -d $part && $part =~ m!/\d+\z! or next; + eval { + Search::Xapian::Database->new($part)->close; + $nparts++; + }; + } + } + $nparts; +} + +sub new { + my ($class, $v2ibx, $creat) = @_; + my $dir = $v2ibx->{mainrepo} or die "no mainrepo in inbox\n"; + unless (-d $dir) { + if ($creat) { + require File::Path; + File::Path::mkpath($dir); + } else { + die "$dir does not exist\n"; + } + } + + $v2ibx = PublicInbox::InboxWritable->new($v2ibx); + + my $xpfx = "$dir/xap" . PublicInbox::Search::SCHEMA_VERSION; + my $self = { + -inbox => $v2ibx, + im => undef, # PublicInbox::Import + parallel => 1, + transact_bytes => 0, + xpfx => $xpfx, + over => PublicInbox::OverIdx->new("$xpfx/over.sqlite3", 1), + lock_path => "$dir/inbox.lock", + # limit each git repo (epoch) to 1GB or so + rotate_bytes => int((1024 * 1024 * 1024) / $PACKING_FACTOR), + last_commit => [], # git repo -> commit + }; + $self->{partitions} = count_partitions($self) || nproc_parts(); + bless $self, $class; +} + +sub init_inbox { + my ($self, $parallel) = @_; + $self->{parallel} = $parallel; + $self->idx_init; + my $epoch_max = -1; + git_dir_latest($self, \$epoch_max); + $self->git_init($epoch_max >= 0 ? $epoch_max : 0); + $self->done; +} + +# returns undef on duplicate or spam +# mimics Import::add and wraps it for v2 +sub add { + my ($self, $mime, $check_cb) = @_; + + # spam check: + if ($check_cb) { + $mime = $check_cb->($mime) or return; + } + + # All pipes (> $^F) known to Perl 5.6+ have FD_CLOEXEC set, + # as does SQLite 3.4.1+ (released in 2007-07-20), and + # Xapian 1.3.2+ (released 2015-03-15). + # For the most part, we can spawn git-fast-import without + # leaking FDs to it... + $self->idx_init; + + my $mid0; + my $num = num_for($self, $mime, \$mid0); + defined $num or return; # duplicate + defined $mid0 or die "BUG: $mid0 undefined\n"; + my $im = $self->importer; + my $cmt = $im->add($mime); + $cmt = $im->get_mark($cmt); + $self->{last_commit}->[$self->{epoch_max}] = $cmt; + + my ($oid, $len, $msgref) = @{$im->{last_object}}; + $self->{over}->add_overview($mime, $len, $num, $oid, $mid0); + my $nparts = $self->{partitions}; + my $part = $num % $nparts; + my $idx = $self->idx_part($part); + $idx->index_raw($len, $msgref, $num, $oid, $mid0, $mime); + my $n = $self->{transact_bytes} += $len; + if ($n > (PublicInbox::SearchIdx::BATCH_BYTES * $nparts)) { + $self->checkpoint; + } + + $cmt; +} + +sub num_for { + my ($self, $mime, $mid0) = @_; + my $mids = mids($mime->header_obj); + if (@$mids) { + my $mid = $mids->[0]; + my $num = $self->{mm}->mid_insert($mid); + if (defined $num) { # common case + $$mid0 = $mid; + return $num; + }; + + # crap, Message-ID is already known, hope somebody just resent: + foreach my $m (@$mids) { + # read-only lookup now safe to do after above barrier + my $existing = $self->lookup_content($mime, $m); + # easy, don't store duplicates + # note: do not add more diagnostic info here since + # it gets noisy on public-inbox-watch restarts + return if $existing; + } + + # very unlikely: + warn "<$mid> reused for mismatched content\n"; + + # try the rest of the mids + for(my $i = $#$mids; $i >= 1; $i--) { + my $m = $mids->[$i]; + $num = $self->{mm}->mid_insert($m); + if (defined $num) { + warn "alternative <$m> for <$mid> found\n"; + $$mid0 = $m; + return $num; + } + } + } + # none of the existing Message-IDs are good, generate a new one: + num_for_harder($self, $mime, $mid0); +} + +sub num_for_harder { + my ($self, $mime, $mid0) = @_; + + my $hdr = $mime->header_obj; + my $dig = content_digest($mime); + $$mid0 = PublicInbox::Import::digest2mid($dig, $hdr); + my $num = $self->{mm}->mid_insert($$mid0); + unless (defined $num) { + # it's hard to spoof the last Received: header + my @recvd = $hdr->header_raw('Received'); + $dig->add("Received: $_") foreach (@recvd); + $$mid0 = PublicInbox::Import::digest2mid($dig, $hdr); + $num = $self->{mm}->mid_insert($$mid0); + + # fall back to a random Message-ID and give up determinism: + until (defined($num)) { + $dig->add(rand); + $$mid0 = PublicInbox::Import::digest2mid($dig, $hdr); + warn "using random Message-ID <$$mid0> as fallback\n"; + $num = $self->{mm}->mid_insert($$mid0); + } + } + PublicInbox::Import::append_mid($hdr, $$mid0); + $num; +} + +sub idx_part { + my ($self, $part) = @_; + $self->{idx_parts}->[$part]; +} + +# idempotent +sub idx_init { + my ($self) = @_; + return if $self->{idx_parts}; + my $ibx = $self->{-inbox}; + + # do not leak read-only FDs to child processes, we only have these + # FDs for duplicate detection so they should not be + # frequently activated. + delete $ibx->{$_} foreach (qw(git mm search)); + + if ($self->{parallel}) { + pipe(my ($r, $w)) or die "pipe failed: $!"; + $self->{bnote} = [ $r, $w ]; + $w->autoflush(1); + } + + my $over = $self->{over}; + $ibx->umask_prepare; + $ibx->with_umask(sub { + $self->lock_acquire; + $over->create; + + # -compact can change partition count while -watch is idle + my $nparts = count_partitions($self); + if ($nparts && $nparts != $self->{partitions}) { + $self->{partitions} = $nparts; + } + + # need to create all parts before initializing msgmap FD + my $max = $self->{partitions} - 1; + + # idx_parts must be visible to all forked processes + my $idx = $self->{idx_parts} = []; + for my $i (0..$max) { + push @$idx, PublicInbox::SearchIdxPart->new($self, $i); + } + + # Now that all subprocesses are up, we can open the FDs + # for SQLite: + my $mm = $self->{mm} = PublicInbox::Msgmap->new_file( + "$self->{-inbox}->{mainrepo}/msgmap.sqlite3", 1); + $mm->{dbh}->begin_work; + }); +} + +sub purge_oids { + my ($self, $purge) = @_; # $purge = { $object_id => 1, ... } + $self->done; + my $pfx = "$self->{-inbox}->{mainrepo}/git"; + my $purges = []; + foreach my $i (0..$self->{epoch_max}) { + my $git = PublicInbox::Git->new("$pfx/$i.git"); + my $im = $self->import_init($git, 0, 1); + $purges->[$i] = $im->purge_oids($purge); + } + $purges; +} + +sub content_ids ($) { + my ($mime) = @_; + my @cids = ( content_id($mime) ); + + # Email::MIME->as_string doesn't always round-trip, so we may + # use a second content_id + my $rt = content_id(PublicInbox::MIME->new(\($mime->as_string))); + push @cids, $rt if $cids[0] ne $rt; + \@cids; +} + +sub content_matches ($$) { + my ($cids, $existing) = @_; + my $cid = content_id($existing); + foreach (@$cids) { + return 1 if $_ eq $cid + } + 0 +} + +sub remove_internal { + my ($self, $mime, $cmt_msg, $purge) = @_; + $self->idx_init; + my $im = $self->importer unless $purge; + my $over = $self->{over}; + my $cids = content_ids($mime); + my $parts = $self->{idx_parts}; + my $mm = $self->{mm}; + my $removed; + my $mids = mids($mime->header_obj); + + # We avoid introducing new blobs into git since the raw content + # can be slightly different, so we do not need the user-supplied + # message now that we have the mids and content_id + $mime = undef; + my $mark; + + foreach my $mid (@$mids) { + my %gone; + my ($id, $prev); + while (my $smsg = $over->next_by_mid($mid, \$id, \$prev)) { + my $msg = get_blob($self, $smsg); + if (!defined($msg)) { + warn "broken smsg for $mid\n"; + next; # continue + } + my $orig = $$msg; + my $cur = PublicInbox::MIME->new($msg); + if (content_matches($cids, $cur)) { + $smsg->{mime} = $cur; + $gone{$smsg->{num}} = [ $smsg, \$orig ]; + } + } + my $n = scalar keys %gone; + next unless $n; + if ($n > 1) { + warn "BUG: multiple articles linked to <$mid>\n", + join(',', sort keys %gone), "\n"; + } + foreach my $num (keys %gone) { + my ($smsg, $orig) = @{$gone{$num}}; + $mm->num_delete($num); + # $removed should only be set once assuming + # no bugs in our deduplication code: + $removed = $smsg; + my $oid = $smsg->{blob}; + if ($purge) { + $purge->{$oid} = 1; + } else { + ($mark, undef) = $im->remove($orig, $cmt_msg); + } + $orig = undef; + $self->unindex_oid_remote($oid, $mid); + } + } + + if (defined $mark) { + my $cmt = $im->get_mark($mark); + $self->{last_commit}->[$self->{epoch_max}] = $cmt; + } + if ($purge && scalar keys %$purge) { + return purge_oids($self, $purge); + } + $removed; +} + +sub remove { + my ($self, $mime, $cmt_msg) = @_; + remove_internal($self, $mime, $cmt_msg); +} + +sub purge { + my ($self, $mime) = @_; + my $purges = remove_internal($self, $mime, undef, {}); + $self->idx_init if @$purges; # ->done is called on purges + for my $i (0..$#$purges) { + defined(my $cmt = $purges->[$i]) or next; + $self->{last_commit}->[$i] = $cmt; + } + $purges; +} + +sub last_commit_part ($$;$) { + my ($self, $i, $cmt) = @_; + my $v = PublicInbox::Search::SCHEMA_VERSION(); + $self->{mm}->last_commit_xap($v, $i, $cmt); +} + +sub set_last_commits ($) { + my ($self) = @_; + defined(my $epoch_max = $self->{epoch_max}) or return; + my $last_commit = $self->{last_commit}; + foreach my $i (0..$epoch_max) { + defined(my $cmt = $last_commit->[$i]) or next; + $last_commit->[$i] = undef; + last_commit_part($self, $i, $cmt); + } +} + +sub barrier_init { + my ($self, $n) = @_; + $self->{bnote} or return; + --$n; + my $barrier = { map { $_ => 1 } (0..$n) }; +} + +sub barrier_wait { + my ($self, $barrier) = @_; + my $bnote = $self->{bnote} or return; + my $r = $bnote->[0]; + while (scalar keys %$barrier) { + defined(my $l = $r->getline) or die "EOF on barrier_wait: $!"; + $l =~ /\Abarrier (\d+)/ or die "bad line on barrier_wait: $l"; + delete $barrier->{$1} or die "bad part[$1] on barrier wait"; + } +} + +sub checkpoint ($;$) { + my ($self, $wait) = @_; + + if (my $im = $self->{im}) { + if ($wait) { + $im->barrier; + } else { + $im->checkpoint; + } + } + my $parts = $self->{idx_parts}; + if ($parts) { + my $dbh = $self->{mm}->{dbh}; + + # SQLite msgmap data is second in importance + $dbh->commit; + + # SQLite overview is third + $self->{over}->commit_lazy; + + # Now deal with Xapian + if ($wait) { + my $barrier = $self->barrier_init(scalar @$parts); + + # each partition needs to issue a barrier command + $_->remote_barrier for @$parts; + + # wait for each Xapian partition + $self->barrier_wait($barrier); + } else { + $_->remote_commit for @$parts; + } + + # last_commit is special, don't commit these until + # remote partitions are done: + $dbh->begin_work; + set_last_commits($self); + $dbh->commit; + + $dbh->begin_work; + } + $self->{transact_bytes} = 0; +} + +# issue a write barrier to ensure all data is visible to other processes +# and read-only ops. Order of data importance is: git > SQLite > Xapian +sub barrier { checkpoint($_[0], 1) }; + +sub done { + my ($self) = @_; + my $im = delete $self->{im}; + $im->done if $im; # PublicInbox::Import::done + checkpoint($self); + my $mm = delete $self->{mm}; + $mm->{dbh}->commit if $mm; + my $parts = delete $self->{idx_parts}; + if ($parts) { + $_->remote_close for @$parts; + } + $self->{over}->disconnect; + delete $self->{bnote}; + $self->{transact_bytes} = 0; + $self->lock_release if $parts; +} + +sub git_init { + my ($self, $epoch) = @_; + my $pfx = "$self->{-inbox}->{mainrepo}/git"; + my $git_dir = "$pfx/$epoch.git"; + my @cmd = (qw(git init --bare -q), $git_dir); + PublicInbox::Import::run_die(\@cmd); + + my $all = "$self->{-inbox}->{mainrepo}/all.git"; + unless (-d $all) { + @cmd = (qw(git init --bare -q), $all); + PublicInbox::Import::run_die(\@cmd); + @cmd = (qw/git config/, "--file=$all/config", + 'repack.writeBitmaps', 'true'); + PublicInbox::Import::run_die(\@cmd); + } + + @cmd = (qw/git config/, "--file=$git_dir/config", + 'include.path', '../../all.git/config'); + PublicInbox::Import::run_die(\@cmd); + + my $alt = "$all/objects/info/alternates"; + my $new_obj_dir = "../../git/$epoch.git/objects"; + my %alts; + if (-e $alt) { + open(my $fh, '<', $alt) or die "open < $alt: $!\n"; + %alts = map { chomp; $_ => 1 } (<$fh>); + } + return $git_dir if $alts{$new_obj_dir}; + open my $fh, '>>', $alt or die "open >> $alt: $!\n"; + print $fh "$new_obj_dir\n" or die "print >> $alt: $!\n"; + close $fh or die "close $alt: $!\n"; + $git_dir +} + +sub git_dir_latest { + my ($self, $max) = @_; + $$max = -1; + my $pfx = "$self->{-inbox}->{mainrepo}/git"; + return unless -d $pfx; + my $latest; + opendir my $dh, $pfx or die "opendir $pfx: $!\n"; + while (defined(my $git_dir = readdir($dh))) { + $git_dir =~ m!\A(\d+)\.git\z! or next; + if ($1 > $$max) { + $$max = $1; + $latest = "$pfx/$git_dir"; + } + } + $latest; +} + +sub importer { + my ($self) = @_; + my $im = $self->{im}; + if ($im) { + if ($im->{bytes_added} < $self->{rotate_bytes}) { + return $im; + } else { + $self->{im} = undef; + $im->done; + $im = undef; + $self->checkpoint; + my $git_dir = $self->git_init(++$self->{epoch_max}); + my $git = PublicInbox::Git->new($git_dir); + return $self->import_init($git, 0); + } + } + my $epoch = 0; + my $max; + my $latest = git_dir_latest($self, \$max); + if (defined $latest) { + my $git = PublicInbox::Git->new($latest); + my $packed_bytes = $git->packed_bytes; + if ($packed_bytes >= $self->{rotate_bytes}) { + $epoch = $max + 1; + } else { + $self->{epoch_max} = $max; + return $self->import_init($git, $packed_bytes); + } + } + $self->{epoch_max} = $epoch; + $latest = $self->git_init($epoch); + $self->import_init(PublicInbox::Git->new($latest), 0); +} + +sub import_init { + my ($self, $git, $packed_bytes, $tmp) = @_; + my $im = PublicInbox::Import->new($git, undef, undef, $self->{-inbox}); + $im->{bytes_added} = int($packed_bytes / $PACKING_FACTOR); + $im->{want_object_info} = 1; + $im->{lock_path} = undef; + $im->{path_type} = 'v2'; + $self->{im} = $im unless $tmp; + $im; +} + +# XXX experimental +sub diff ($$$) { + my ($mid, $cur, $new) = @_; + use File::Temp qw(tempfile); + + my ($ah, $an) = tempfile('email-cur-XXXXXXXX', TMPDIR => 1); + print $ah $cur->as_string or die "print: $!"; + close $ah or die "close: $!"; + my ($bh, $bn) = tempfile('email-new-XXXXXXXX', TMPDIR => 1); + PublicInbox::Import::drop_unwanted_headers($new); + print $bh $new->as_string or die "print: $!"; + close $bh or die "close: $!"; + my $cmd = [ qw(diff -u), $an, $bn ]; + print STDERR "# MID conflict <$mid>\n"; + my $pid = spawn($cmd, undef, { 1 => 2 }); + defined $pid or die "diff failed to spawn $!"; + waitpid($pid, 0) == $pid or die "diff did not finish"; + unlink($an, $bn); +} + +sub get_blob ($$) { + my ($self, $smsg) = @_; + if (my $im = $self->{im}) { + my $msg = $im->cat_blob($smsg->{blob}); + return $msg if $msg; + } + # older message, should be in alternates + my $ibx = $self->{-inbox}; + $ibx->msg_by_smsg($smsg); +} + +sub lookup_content { + my ($self, $mime, $mid) = @_; + my $over = $self->{over}; + my $cids = content_ids($mime); + my ($id, $prev); + while (my $smsg = $over->next_by_mid($mid, \$id, \$prev)) { + my $msg = get_blob($self, $smsg); + if (!defined($msg)) { + warn "broken smsg for $mid\n"; + next; + } + my $cur = PublicInbox::MIME->new($msg); + if (content_matches($cids, $cur)) { + $smsg->{mime} = $cur; + return $smsg; + } + + + # XXX DEBUG_DIFF is experimental and may be removed + diff($mid, $cur, $mime) if $ENV{DEBUG_DIFF}; + } + undef; +} + +sub atfork_child { + my ($self) = @_; + my $fh = delete $self->{reindex_pipe}; + close $fh if $fh; + if (my $parts = $self->{idx_parts}) { + $_->atfork_child foreach @$parts; + } + if (my $im = $self->{im}) { + $im->atfork_child; + } + die "unexpected mm" if $self->{mm}; + close $self->{bnote}->[0] or die "close bnote[0]: $!\n"; + $self->{bnote}->[1]; +} + +sub mark_deleted { + my ($self, $D, $git, $oid) = @_; + my $msgref = $git->cat_file($oid); + my $mime = PublicInbox::MIME->new($$msgref); + my $mids = mids($mime->header_obj); + my $cid = content_id($mime); + foreach my $mid (@$mids) { + $D->{"$mid\0$cid"} = 1; + } +} + +sub reindex_oid { + my ($self, $mm_tmp, $D, $git, $oid, $regen) = @_; + my $len; + my $msgref = $git->cat_file($oid, \$len); + my $mime = PublicInbox::MIME->new($$msgref); + my $mids = mids($mime->header_obj); + my $cid = content_id($mime); + + # get the NNTP article number we used before, highest number wins + # and gets deleted from mm_tmp; + my $mid0; + my $num = -1; + my $del = 0; + foreach my $mid (@$mids) { + $del += (delete $D->{"$mid\0$cid"} || 0); + my $n = $mm_tmp->num_for($mid); + if (defined $n && $n > $num) { + $mid0 = $mid; + $num = $n; + } + } + if (!defined($mid0) && $regen && !$del) { + $num = $$regen--; + die "BUG: ran out of article numbers\n" if $num <= 0; + my $mm = $self->{mm}; + foreach my $mid (reverse @$mids) { + if ($mm->mid_set($num, $mid) == 1) { + $mid0 = $mid; + last; + } + } + if (!defined($mid0)) { + my $id = '<' . join('> <', @$mids) . '>'; + warn "Message-ID $id unusable for $num\n"; + foreach my $mid (@$mids) { + defined(my $n = $mm->num_for($mid)) or next; + warn "#$n previously mapped for <$mid>\n"; + } + } + } + + if (!defined($mid0) || $del) { + if (!defined($mid0) && $del) { # expected for deletes + $$regen--; + return + } + + my $id = '<' . join('> <', @$mids) . '>'; + defined($mid0) or + warn "Skipping $id, no article number found\n"; + if ($del && defined($mid0)) { + warn "$id was deleted $del " . + "time(s) but mapped to article #$num\n"; + } + return; + + } + $mm_tmp->mid_delete($mid0) or + die "failed to delete <$mid0> for article #$num\n"; + + $self->{over}->add_overview($mime, $len, $num, $oid, $mid0); + my $nparts = $self->{partitions}; + my $part = $num % $nparts; + my $idx = $self->idx_part($part); + $idx->index_raw($len, $msgref, $num, $oid, $mid0, $mime); + my $n = $self->{transact_bytes} += $len; + if ($n > (PublicInbox::SearchIdx::BATCH_BYTES * $nparts)) { + $git->cleanup; + $mm_tmp->atfork_prepare; + $self->done; # release lock + # allow -watch or -mda to write... + $self->idx_init; # reacquire lock + $mm_tmp->atfork_parent; + } +} + +# only update last_commit for $i on reindex iff newer than current +sub update_last_commit { + my ($self, $git, $i, $cmt) = @_; + my $last = last_commit_part($self, $i); + if (defined $last && is_ancestor($git, $last, $cmt)) { + my @cmd = (qw(rev-list --count), "$last..$cmt"); + chomp(my $n = $git->qx(@cmd)); + return if $n ne '' && $n == 0; + } + last_commit_part($self, $i, $cmt); +} + +sub git_dir_n ($$) { "$_[0]->{-inbox}->{mainrepo}/git/$_[1].git" } + +sub last_commits { + my ($self, $epoch_max) = @_; + my $heads = []; + for (my $i = $epoch_max; $i >= 0; $i--) { + $heads->[$i] = last_commit_part($self, $i); + } + $heads; +} + +*is_ancestor = *PublicInbox::SearchIdx::is_ancestor; + +sub index_prepare { + my ($self, $opts, $epoch_max, $ranges) = @_; + my $regen_max = 0; + my $head = $self->{-inbox}->{ref_head} || 'refs/heads/master'; + for (my $i = $epoch_max; $i >= 0; $i--) { + die "already indexing!\n" if $self->{index_pipe}; + my $git_dir = git_dir_n($self, $i); + -d $git_dir or next; # missing parts are fine + my $git = PublicInbox::Git->new($git_dir); + chomp(my $tip = $git->qx('rev-parse', $head)); + my $range; + if (defined(my $cur = $ranges->[$i])) { + $range = "$cur..$tip"; + if (is_ancestor($git, $cur, $tip)) { # common case + my $n = $git->qx(qw(rev-list --count), $range); + chomp($n); + if ($n == 0) { + $ranges->[$i] = undef; + next; + } + } else { + warn <<""; +discontiguous range: $range +Rewritten history? (in $git_dir) + + my $base = $git->qx('merge-base', $tip, $cur); + chomp $base; + if ($base) { + $range = "$base..$tip"; + warn "found merge-base: $base\n" + } else { + $range = $tip; + warn <<""; +discarding history at $cur + + } + warn <<""; +reindexing $git_dir starting at +$range + + $self->{"unindex-range.$i"} = "$base..$cur"; + } + } else { + $range = $tip; # all of it + } + $ranges->[$i] = $range; + + # can't use 'rev-list --count' if we use --diff-filter + my $fh = $git->popen(qw(log --pretty=tformat:%H + --no-notes --no-color --no-renames + --diff-filter=AM), $range, '--', 'm'); + ++$regen_max while <$fh>; + } + \$regen_max; +} + +sub unindex_oid_remote { + my ($self, $oid, $mid) = @_; + $_->remote_remove($oid, $mid) foreach @{$self->{idx_parts}}; + $self->{over}->remove_oid($oid, $mid); +} + +sub unindex_oid { + my ($self, $git, $oid) = @_; + my $msgref = $git->cat_file($oid); + my $mime = PublicInbox::MIME->new($msgref); + my $mids = mids($mime->header_obj); + $mime = $msgref = undef; + my $over = $self->{over}; + foreach my $mid (@$mids) { + my %gone; + my ($id, $prev); + while (my $smsg = $over->next_by_mid($mid, \$id, \$prev)) { + $gone{$smsg->{num}} = 1 if $oid eq $smsg->{blob}; + 1; # continue + } + my $n = scalar keys %gone; + next unless $n; + if ($n > 1) { + warn "BUG: multiple articles linked to $oid\n", + join(',',sort keys %gone), "\n"; + } + $self->{unindexed}->{$_}++ foreach keys %gone; + $self->unindex_oid_remote($oid, $mid); + } +} + +my $x40 = qr/[a-f0-9]{40}/; +sub unindex { + my ($self, $opts, $git, $unindex_range) = @_; + my $un = $self->{unindexed} ||= {}; # num => removal count + my $before = scalar keys %$un; + my @cmd = qw(log --raw -r + --no-notes --no-color --no-abbrev --no-renames); + my $fh = $self->{reindex_pipe} = $git->popen(@cmd, $unindex_range); + while (<$fh>) { + /\A:\d{6} 100644 $x40 ($x40) [AM]\tm$/o or next; + $self->unindex_oid($git, $1); + } + delete $self->{reindex_pipe}; + $fh = undef; + + return unless $opts->{prune}; + my $after = scalar keys %$un; + return if $before == $after; + + # ensure any blob can not longer be accessed via dumb HTTP + PublicInbox::Import::run_die(['git', "--git-dir=$git->{git_dir}", + qw(-c gc.reflogExpire=now gc --prune=all)]); +} + +sub index_sync { + my ($self, $opts) = @_; + $opts ||= {}; + my $epoch_max; + my $latest = git_dir_latest($self, \$epoch_max); + return unless defined $latest; + $self->idx_init; # acquire lock + my $mm_tmp = $self->{mm}->tmp_clone; + my $ranges = $opts->{reindex} ? [] : $self->last_commits($epoch_max); + + my ($min, $max) = $mm_tmp->minmax; + my $regen = $self->index_prepare($opts, $epoch_max, $ranges); + $$regen += $max if $max; + my $D = {}; + my @cmd = qw(log --raw -r --pretty=tformat:%H + --no-notes --no-color --no-abbrev --no-renames); + + # work backwards through history + my $last_commit = []; + for (my $i = $epoch_max; $i >= 0; $i--) { + my $git_dir = git_dir_n($self, $i); + die "already reindexing!\n" if delete $self->{reindex_pipe}; + -d $git_dir or next; # missing parts are fine + my $git = PublicInbox::Git->new($git_dir); + my $unindex = delete $self->{"unindex-range.$i"}; + $self->unindex($opts, $git, $unindex) if $unindex; + defined(my $range = $ranges->[$i]) or next; + my $fh = $self->{reindex_pipe} = $git->popen(@cmd, $range); + my $cmt; + while (<$fh>) { + if (/\A$x40$/o && !defined($cmt)) { + chomp($cmt = $_); + } elsif (/\A:\d{6} 100644 $x40 ($x40) [AM]\tm$/o) { + $self->reindex_oid($mm_tmp, $D, $git, $1, + $regen); + } elsif (/\A:\d{6} 100644 $x40 ($x40) [AM]\td$/o) { + $self->mark_deleted($D, $git, $1); + } + } + $fh = undef; + delete $self->{reindex_pipe}; + $self->update_last_commit($git, $i, $cmt) if defined $cmt; + } + my @d = sort keys %$D; + if (@d) { + warn "BUG: ", scalar(@d)," unseen deleted messages marked\n"; + foreach (@d) { + my ($mid, undef) = split(/\0/, $_, 2); + warn "<$mid>\n"; + } + } + $self->done; +} + +1; diff --git a/lib/PublicInbox/View.pm b/lib/PublicInbox/View.pm index 590a76a0..af287b96 100644 --- a/lib/PublicInbox/View.pm +++ b/lib/PublicInbox/View.pm @@ -6,33 +6,41 @@ package PublicInbox::View; use strict; use warnings; -use Date::Parse qw/str2time/; +use PublicInbox::MsgTime qw(msg_datestamp); use PublicInbox::Hval qw/ascii_html obfuscate_addrs/; use PublicInbox::Linkify; -use PublicInbox::MID qw/mid_clean id_compress mid_mime mid_escape/; +use PublicInbox::MID qw/id_compress mid_escape mids references/; use PublicInbox::MsgIter; use PublicInbox::Address; use PublicInbox::WwwStream; use PublicInbox::Reply; require POSIX; +use Time::Local qw(timegm); use constant INDENT => ' '; use constant TCHILD => '` '; sub th_pfx ($) { $_[0] == 0 ? '' : TCHILD }; # public functions: (unstable) + sub msg_html { - my ($ctx, $mime) = @_; + my ($ctx, $mime, $more, $smsg) = @_; my $hdr = $mime->header_obj; my $ibx = $ctx->{-inbox}; - my $obfs_ibx = $ibx->{obfuscate} ? $ibx : undef; - my $tip = _msg_html_prepare($hdr, $ctx, $obfs_ibx); + my $obfs_ibx = $ctx->{-obfs_ibx} = $ibx->{obfuscate} ? $ibx : undef; + my $tip = _msg_html_prepare($hdr, $ctx, $more, 0); + my $end = 2; PublicInbox::WwwStream->response($ctx, 200, sub { my ($nr, undef) = @_; if ($nr == 1) { - $tip . multipart_text_as_html($mime, '', $obfs_ibx) . + # $more cannot be true w/o $smsg being defined: + my $upfx = $more ? '../'.mid_escape($smsg->mid).'/' : ''; + $tip . multipart_text_as_html($mime, $upfx, $obfs_ibx) . '</pre><hr>' - } elsif ($nr == 2) { + } elsif ($more && @$more) { + ++$end; + msg_html_more($ctx, $more, $nr); + } elsif ($nr == $end) { # fake an EOF if generating the footer fails; # we want to at least show the message if something # here crashes: @@ -46,6 +54,53 @@ sub msg_html { }); } +sub msg_page { + my ($ctx) = @_; + my $mid = $ctx->{mid}; + my $ibx = $ctx->{-inbox}; + my ($first, $more); + my $smsg; + if (my $srch = $ibx->search) { + my ($id, $prev); + $smsg = $srch->next_by_mid($mid, \$id, \$prev); + $first = $ibx->msg_by_smsg($smsg) if $smsg; + if ($first) { + my $next = $srch->next_by_mid($mid, \$id, \$prev); + $more = [ $id, $prev, $next ] if $next; + } + return unless $first; + } else { + $first = $ibx->msg_by_mid($mid) or return; + } + msg_html($ctx, PublicInbox::MIME->new($first), $more, $smsg); +} + +sub msg_html_more { + my ($ctx, $more, $nr) = @_; + my $str = eval { + my ($id, $prev, $smsg) = @$more; + my $mid = $ctx->{mid}; + $smsg = $ctx->{-inbox}->smsg_mime($smsg); + my $next = $ctx->{srch}->next_by_mid($mid, \$id, \$prev); + @$more = $next ? ($id, $prev, $next) : (); + if ($smsg) { + my $mime = $smsg->{mime}; + my $upfx = '../' . mid_escape($smsg->mid) . '/'; + _msg_html_prepare($mime->header_obj, $ctx, $more, $nr) . + multipart_text_as_html($mime, $upfx, + $ctx->{-obfs_ibx}) . + '</pre><hr>' + } else { + ''; + } + }; + if ($@) { + warn "Error lookup up additional messages: $@\n"; + $str = '<pre>Error looking up additional messages</pre>'; + } + $str; +} + # /$INBOX/$MESSAGE_ID/#R sub msg_reply { my ($ctx, $hdr) = @_; @@ -105,14 +160,8 @@ EOF sub in_reply_to { my ($hdr) = @_; - my %mid = map { $_ => 1 } $hdr->header_raw('Message-ID'); - my @refs = (($hdr->header_raw('References') || '') =~ /<([^>]+)>/g); - push(@refs, (($hdr->header_raw('In-Reply-To') || '') =~ /<([^>]+)>/g)); - while (defined(my $irt = pop @refs)) { - next if $mid{"<$irt>"}; - return $irt; - } - undef; + my $refs = references($hdr); + $refs->[-1]; } sub _hdr_names_html ($$) { @@ -129,12 +178,10 @@ sub nr_to_s ($$$) { # this is already inside a <pre> sub index_entry { - my ($mime, $ctx, $more) = @_; + my ($smsg, $ctx, $more) = @_; my $srch = $ctx->{srch}; - my $hdr = $mime->header_obj; - my $subj = $hdr->header('Subject'); - - my $mid_raw = mid_clean(mid_mime($mime)); + my $subj = $smsg->subject; + my $mid_raw = $smsg->mid; my $id = id_compress($mid_raw, 1); my $id_m = 'm'.$id; @@ -149,6 +196,8 @@ sub index_entry { $rv .= $subj . "\n"; $rv .= _th_index_lite($mid_raw, \$irt, $id, $ctx); my @tocc; + my $mime = $smsg->{mime}; + my $hdr = $mime->header_obj; foreach my $f (qw(To Cc)) { my $dst = _hdr_names_html($hdr, $f); if ($dst ne '') { @@ -158,7 +207,7 @@ sub index_entry { } my $from = _hdr_names_html($hdr, 'From'); obfuscate_addrs($obfs_ibx, $from) if $obfs_ibx; - $rv .= "From: $from @ "._msg_date($hdr)." UTC"; + $rv .= "From: $from @ ".fmt_ts($smsg->ds)." UTC"; my $upfx = $ctx->{-upfx}; my $mhref = $upfx . mid_escape($mid_raw) . '/'; $rv .= qq{ (<a\nhref="$mhref">permalink</a> / }; @@ -301,30 +350,30 @@ sub pre_thread { } sub thread_index_entry { - my ($ctx, $level, $mime) = @_; + my ($ctx, $level, $smsg) = @_; my ($beg, $end) = thread_adj_level($ctx, $level); - $beg . '<pre>' . index_entry($mime, $ctx, 0) . '</pre>' . $end; + $beg . '<pre>' . index_entry($smsg, $ctx, 0) . '</pre>' . $end; } sub stream_thread ($$) { my ($rootset, $ctx) = @_; my $inbox = $ctx->{-inbox}; - my $mime; my @q = map { (0, $_) } @$rootset; my $level; + my $smsg; while (@q) { $level = shift @q; my $node = shift @q or next; my $cl = $level + 1; unshift @q, map { ($cl, $_) } @{$node->{children}}; - $mime = $inbox->msg_by_smsg($node->{smsg}) and last; + $smsg = $inbox->smsg_mime($node->{smsg}) and last; } - return missing_thread($ctx) unless $mime; + return missing_thread($ctx) unless $smsg; $ctx->{-obfs_ibx} = $inbox->{obfuscate} ? $inbox : undef; - $mime = PublicInbox::MIME->new($mime); - $ctx->{-title_html} = ascii_html($mime->header('Subject')); - $ctx->{-html_tip} = thread_index_entry($ctx, $level, $mime); + $ctx->{-title_html} = ascii_html($smsg->subject); + $ctx->{-html_tip} = thread_index_entry($ctx, $level, $smsg); + $smsg = undef; PublicInbox::WwwStream->response($ctx, 200, sub { return unless $ctx; while (@q) { @@ -332,10 +381,8 @@ sub stream_thread ($$) { my $node = shift @q or next; my $cl = $level + 1; unshift @q, map { ($cl, $_) } @{$node->{children}}; - my $mid = $node->{id}; - if ($mime = $inbox->msg_by_smsg($node->{smsg})) { - $mime = PublicInbox::MIME->new($mime); - return thread_index_entry($ctx, $level, $mime); + if ($smsg = $inbox->smsg_mime($node->{smsg})) { + return thread_index_entry($ctx, $level, $smsg); } else { return ghost_index_entry($ctx, $level, $node); } @@ -351,9 +398,7 @@ sub thread_html { my ($ctx) = @_; my $mid = $ctx->{mid}; my $srch = $ctx->{srch}; - my $sres = $srch->get_thread($mid); - my $msgs = load_results($srch, $sres); - my $nr = $sres->{total}; + my ($nr, $msgs) = $srch->get_thread($mid); return missing_thread($ctx) if $nr == 0; my $skel = '<hr><pre>'; $skel .= $nr == 1 ? 'only message in thread' : 'end of thread'; @@ -372,7 +417,7 @@ sub thread_html { $ctx->{mapping} = {}; $ctx->{s_nr} = "$nr+ messages in thread"; - my $rootset = thread_results($msgs, $srch); + my $rootset = thread_results($ctx, $msgs); # reduce hash lookups in pre_thread->skel_dump my $inbox = $ctx->{-inbox}; @@ -383,24 +428,21 @@ sub thread_html { return stream_thread($rootset, $ctx) unless $ctx->{flat}; # flat display: lazy load the full message from smsg - my $mime; - while ($mime = shift @$msgs) { - $mime = $inbox->msg_by_smsg($mime) and last; + my $smsg; + while (my $m = shift @$msgs) { + $smsg = $inbox->smsg_mime($m) and last; } - return missing_thread($ctx) unless $mime; - $mime = PublicInbox::MIME->new($mime); - $ctx->{-title_html} = ascii_html($mime->header('Subject')); - $ctx->{-html_tip} = '<pre>'.index_entry($mime, $ctx, scalar @$msgs); - $mime = undef; + return missing_thread($ctx) unless $smsg; + $ctx->{-title_html} = ascii_html($smsg->subject); + $ctx->{-html_tip} = '<pre>'.index_entry($smsg, $ctx, scalar @$msgs); + $smsg = undef; PublicInbox::WwwStream->response($ctx, 200, sub { return unless $msgs; - while ($mime = shift @$msgs) { - $mime = $inbox->msg_by_smsg($mime) and last; - } - if ($mime) { - $mime = PublicInbox::MIME->new($mime); - return index_entry($mime, $ctx, scalar @$msgs); + $smsg = undef; + while (my $m = shift @$msgs) { + $smsg = $inbox->smsg_mime($m) and last; } + return index_entry($smsg, $ctx, scalar @$msgs) if $smsg; $msgs = undef; $skel; }); @@ -529,17 +571,26 @@ sub add_text_body { } sub _msg_html_prepare { - my ($hdr, $ctx, $obfs_ibx) = @_; + my ($hdr, $ctx, $more, $nr) = @_; my $srch = $ctx->{srch} if $ctx; my $atom = ''; - my $rv = "<pre\nid=b>"; # anchor for body start - + my $obfs_ibx = $ctx->{-obfs_ibx}; + my $rv = ''; + my $mids = mids($hdr); + my $multiple = scalar(@$mids) > 1; # zero, one, infinity + if ($nr == 0) { + if ($more) { + $rv .= +"<pre>WARNING: multiple messages refer to this Message-ID\n</pre>"; + } + $rv .= "<pre\nid=b>"; # anchor for body start + } else { + $rv .= '<pre>'; + } if ($srch) { $ctx->{-upfx} = '../'; } my @title; - my $mid = mid_clean($hdr->header_raw('Message-ID')); - $mid = PublicInbox::Hval->new_msgid($mid); foreach my $h (qw(From To Cc Subject Date)) { my $v = $hdr->header($h); defined($v) && ($v ne '') or next; @@ -564,8 +615,20 @@ sub _msg_html_prepare { } $title[0] ||= '(no subject)'; $ctx->{-title_html} = join(' - ', @title); - $rv .= 'Message-ID: <' . $mid->as_html . '> '; - $rv .= "(<a\nhref=\"raw\">raw</a>)\n"; + foreach (@$mids) { + my $mid = PublicInbox::Hval->new_msgid($_) ; + my $mhtml = $mid->as_html; + if ($multiple) { + my $href = $mid->{href}; + $rv .= "Message-ID: "; + $rv .= "<a\nhref=\"../$href/\">"; + $rv .= "<$mhtml></a> "; + $rv .= "(<a\nhref=\"../$href/raw\">raw</a>)\n"; + } else { + $rv .= "Message-ID: <$mhtml> "; + $rv .= "(<a\nhref=\"raw\">raw</a>)\n"; + } + } $rv .= _parent_headers($hdr, $srch); $rv .= "\n"; } @@ -573,9 +636,8 @@ sub _msg_html_prepare { sub thread_skel { my ($dst, $ctx, $hdr, $tpfx) = @_; my $srch = $ctx->{srch}; - my $mid = mid_clean($hdr->header_raw('Message-ID')); - my $sres = $srch->get_thread($mid); - my $nr = $sres->{total}; + my $mid = mids($hdr)->[0]; + my ($nr, $msgs) = $srch->get_thread($mid); my $expand = qq(expand[<a\nhref="${tpfx}T/#u">flat</a>) . qq(|<a\nhref="${tpfx}t/#u">nested</a>] ) . qq(<a\nhref="${tpfx}t.mbox.gz">mbox.gz</a> ) . @@ -605,12 +667,11 @@ sub thread_skel { $ctx->{prev_attr} = ''; $ctx->{prev_level} = 0; $ctx->{dst} = $dst; - $sres = load_results($srch, $sres); # reduce hash lookups in skel_dump my $ibx = $ctx->{-inbox}; $ctx->{-obfs_ibx} = $ibx->{obfuscate} ? $ibx : undef; - walk_thread(thread_results($sres, $srch), $ctx, *skel_dump); + walk_thread(thread_results($ctx, $msgs), $ctx, *skel_dump); $ctx->{parent_msg} = $parent; } @@ -726,22 +787,10 @@ sub indent_for { $level ? INDENT x ($level - 1) : ''; } -sub load_results { - my ($srch, $sres) = @_; - my $msgs = delete $sres->{msgs}; - $srch->retry_reopen(sub { [ map { $_->mid; $_ } @$msgs ] }); -} - -sub msg_timestamp { - my ($hdr) = @_; - my $ts = eval { str2time($hdr->header('Date')) }; - defined($ts) ? $ts : 0; -} - sub thread_results { - my ($msgs, $srch) = @_; + my ($ctx, $msgs) = @_; require PublicInbox::SearchThread; - PublicInbox::SearchThread::thread($msgs, *sort_ts, $srch); + PublicInbox::SearchThread::thread($msgs, *sort_ds, $ctx->{-inbox}); } sub missing_thread { @@ -752,8 +801,7 @@ sub missing_thread { sub _msg_date { my ($hdr) = @_; - my $ts = $hdr->header('X-PI-TS') || msg_timestamp($hdr); - fmt_ts($ts); + fmt_ts(msg_datestamp($hdr)); } sub fmt_ts { POSIX::strftime('%Y-%m-%d %k:%M', gmtime($_[0])) } @@ -789,7 +837,7 @@ sub skel_dump { my $obfs_ibx = $ctx->{-obfs_ibx}; obfuscate_addrs($obfs_ibx, $f) if $obfs_ibx; - my $d = fmt_ts($smsg->{ts}) . ' ' . indent_for($level) . th_pfx($level); + my $d = fmt_ts($smsg->{ds}) . ' ' . indent_for($level) . th_pfx($level); my $attr = $f; $ctx->{first_level} ||= $level; @@ -870,10 +918,10 @@ sub _skel_ghost { $$dst .= $d; } -sub sort_ts { +sub sort_ds { [ sort { - (eval { $a->topmost->{smsg}->ts } || 0) <=> - (eval { $b->topmost->{smsg}->ts } || 0) + (eval { $a->topmost->{smsg}->ds } || 0) <=> + (eval { $b->topmost->{smsg}->ds } || 0) } @{$_[0]} ]; } @@ -883,22 +931,22 @@ sub acc_topic { my ($ctx, $level, $node) = @_; my $srch = $ctx->{srch}; my $mid = $node->{id}; - my $x = $node->{smsg} || $srch->lookup_mail($mid); - my ($subj, $ts); + my $x = $node->{smsg} || $ctx->{-inbox}->smsg_by_mid($mid); + my ($subj, $ds); my $topic; if ($x) { $subj = $x->subject; $subj = $srch->subject_normalized($subj); - $ts = $x->ts; + $ds = $x->ds; if ($level == 0) { - $topic = [ $ts, 1, { $subj => $mid }, $subj ]; + $topic = [ $ds, 1, { $subj => $mid }, $subj ]; $ctx->{-cur_topic} = $topic; push @{$ctx->{order}}, $topic; return; } $topic = $ctx->{-cur_topic}; # should never be undef - $topic->[0] = $ts if $ts > $topic->[0]; + $topic->[0] = $ds if $ds > $topic->[0]; $topic->[1]++; my $seen = $topic->[2]; if (scalar(@$topic) == 3) { # parent was a ghost @@ -917,7 +965,7 @@ sub acc_topic { sub dump_topics { my ($ctx) = @_; - my $order = delete $ctx->{order}; # [ ts, subj1, subj2, subj3, ... ] + my $order = delete $ctx->{order}; # [ ds, subj1, subj2, subj3, ... ] if (!@$order) { $ctx->{-html_tip} = '<pre>[No topics in range]</pre>'; return 404; @@ -930,14 +978,14 @@ sub dump_topics { # sort by recency, this allows new posts to "bump" old topics... foreach my $topic (sort { $b->[0] <=> $a->[0] } @$order) { - my ($ts, $n, $seen, $top, @ex) = @$topic; + my ($ds, $n, $seen, $top, @ex) = @$topic; @$topic = (); next unless defined $top; # ghost topic my $mid = delete $seen->{$top}; my $href = mid_escape($mid); my $prev_subj = [ split(/ /, $top) ]; $top = PublicInbox::Hval->new($top)->as_html; - $ts = fmt_ts($ts); + $ds = fmt_ts($ds); # $n isn't the total number of posts on the topic, # just the number of posts in the current results window @@ -953,7 +1001,7 @@ sub dump_topics { my $mbox = qq(<a\nhref="$href/t.mbox.gz">mbox.gz</a>); my $atom = qq(<a\nhref="$href/t.atom">Atom</a>); my $s = "<a\nhref=\"$href/T/$anchor\"><b>$top</b></a>\n" . - " $ts UTC $n - $mbox / $atom\n"; + " $ds UTC $n - $mbox / $atom\n"; for (my $i = 0; $i < scalar(@ex); $i += 2) { my $level = $ex[$i]; my $subj = $ex[$i + 1]; @@ -974,45 +1022,81 @@ sub dump_topics { 200; } +sub ts2str ($) { + my ($ts) = @_; + POSIX::strftime('%Y%m%d%H%M%S', gmtime($ts)); +} + +sub str2ts ($) { + my ($yyyy, $mon, $dd, $hh, $mm, $ss) = unpack('A4A2A2A2A2A2', $_[0]); + timegm($ss, $mm, $hh, $dd, $mon - 1, $yyyy); +} + +sub pagination_footer ($$) { + my ($ctx, $latest) = @_; + delete $ctx->{qp} or return; + my $next = $ctx->{next_page} || ''; + my $prev = $ctx->{prev_page} || ''; + if ($prev) { + $next = $next ? "$next " : ' '; + $prev .= qq! <a\nhref='$latest'>latest</a>!; + } + "<hr><pre>page: $next$prev</pre>"; +} + sub index_nav { # callback for WwwStream my (undef, $ctx) = @_; - delete $ctx->{qp} or return; - my ($next, $prev); - $next = $prev = ' '; - my $latest = ''; + pagination_footer($ctx, '.') +} + +sub paginate_recent ($$) { + my ($ctx, $lim) = @_; + my $t = $ctx->{qp}->{t} || ''; + my $opts = { limit => $lim }; + my ($after, $before); + + # Xapian uses '..' but '-' is perhaps friendier to URL linkifiers + # if only $after exists "YYYYMMDD.." because "." could be skipped + # if interpreted as an end-of-sentence + $t =~ s/\A(\d{8,14})-// and $after = str2ts($1); + $t =~ /\A(\d{8,14})\z/ and $before = str2ts($1); - my $next_o = $ctx->{-next_o}; - if ($next_o) { - $next = qq!<a\nhref="?o=$next_o"\nrel=next>next</a>!; + my $ibx = $ctx->{-inbox}; + my $msgs = $ibx->recent($opts, $after, $before); + my $nr = scalar @$msgs; + if ($nr < $lim && defined($after)) { + $after = $before = undef; + $msgs = $ibx->recent($opts); + $nr = scalar @$msgs; } - if (my $cur_o = $ctx->{-cur_o}) { - $latest = qq! <a\nhref=.>latest</a>!; - - my $o = $cur_o - ($next_o - $cur_o); - if ($o > 0) { - $prev = qq!<a\nhref="?o=$o"\nrel=prev>prev</a>!; - } elsif ($o == 0) { - $prev = qq!<a\nhref=.\nrel=prev>prev</a>!; + my $more = $nr == $lim; + my ($newest, $oldest); + if ($nr) { + $newest = $msgs->[0]->{ts}; + $oldest = $msgs->[-1]->{ts}; + # if we only had $after, our SQL query in ->recent ordered + if ($newest < $oldest) { + ($oldest, $newest) = ($newest, $oldest); + $more = 0 if defined($after) && $after < $oldest; } } - "<hr><pre>page: $next $prev$latest</pre>"; + if (defined($oldest) && $more) { + my $s = ts2str($oldest); + $ctx->{next_page} = qq!<a\nhref="?t=$s"\nrel=next>next</a>!; + } + if (defined($newest) && (defined($before) || defined($after))) { + my $s = ts2str($newest); + $ctx->{prev_page} = qq!<a\nhref="?t=$s-"\nrel=prev>prev</a>!; + } + $msgs; } sub index_topics { my ($ctx) = @_; - my ($off) = (($ctx->{qp}->{o} || '0') =~ /(\d+)/); - my $opts = { offset => $off, limit => 200 }; - - $ctx->{order} = []; - my $srch = $ctx->{srch}; - my $sres = $srch->query('', $opts); - my $nr = scalar @{$sres->{msgs}}; - if ($nr) { - $sres = load_results($srch, $sres); - walk_thread(thread_results($sres, $srch), $ctx, *acc_topic); + my $msgs = paginate_recent($ctx, 200); # 200 is our window + if (@$msgs) { + walk_thread(thread_results($ctx, $msgs), $ctx, *acc_topic); } - $ctx->{-next_o} = $off+ $nr; - $ctx->{-cur_o} = $off; PublicInbox::WwwStream->response($ctx, dump_topics($ctx), *index_nav); } diff --git a/lib/PublicInbox/WWW.pm b/lib/PublicInbox/WWW.pm index 4ddc187b..24e24f1e 100644 --- a/lib/PublicInbox/WWW.pm +++ b/lib/PublicInbox/WWW.pm @@ -54,10 +54,10 @@ sub call { my $method = $env->{REQUEST_METHOD}; if ($method eq 'POST') { - if ($path_info =~ m!$INBOX_RE/(git-upload-pack)\z!) { - my $path = $2; + if ($path_info =~ m!$INBOX_RE/(?:(\d+)/)?(git-upload-pack)\z!) { + my ($part, $path) = ($2, $3); return invalid_inbox($ctx, $1) || - serve_git($ctx, $path); + serve_git($ctx, $part, $path); } elsif ($path_info =~ m!$INBOX_RE/!o) { return invalid_inbox($ctx, $1) || mbox_results($ctx); } @@ -77,10 +77,10 @@ sub call { invalid_inbox($ctx, $1) || get_atom($ctx); } elsif ($path_info =~ m!$INBOX_RE/new\.html\z!o) { invalid_inbox($ctx, $1) || get_new($ctx); - } elsif ($path_info =~ m!$INBOX_RE/ + } elsif ($path_info =~ m!$INBOX_RE/(?:(\d+)/)? ($PublicInbox::GitHTTPBackend::ANY)\z!ox) { - my $path = $2; - invalid_inbox($ctx, $1) || serve_git($ctx, $path); + my ($part, $path) = ($2, $3); + invalid_inbox($ctx, $1) || serve_git($ctx, $part, $path); } elsif ($path_info =~ m!$INBOX_RE/([\w-]+).mbox\.gz\z!o) { serve_mbox_range($ctx, $1, $2); } elsif ($path_info =~ m!$INBOX_RE/$MID_RE/$END_RE\z!o) { @@ -150,10 +150,8 @@ sub invalid_inbox ($$) { my $www = $ctx->{www}; my $obj = $www->{pi_config}->lookup_name($inbox); if (defined $obj) { - $ctx->{git_dir} = $obj->{mainrepo}; $ctx->{git} = $obj->git; $ctx->{-inbox} = $obj; - $ctx->{inbox} = $inbox; return; } @@ -171,14 +169,15 @@ sub invalid_inbox_mid { return $ret if $ret; $ctx->{mid} = $mid; - if ($mid =~ /\A[a-f0-9]{40}\z/) { - # this is horiffically wasteful for legacy URLs: - if ($mid = mid2blob($ctx)) { - require Email::Simple; - use PublicInbox::MID qw/mid_clean/; - my $s = Email::Simple->new($mid); - $ctx->{mid} = mid_clean($s->header('Message-ID')); - } + my $ibx = $ctx->{-inbox}; + if ($mid =~ m!\A([a-f0-9]{2})([a-f0-9]{38})\z!) { + my ($x2, $x38) = ($1, $2); + # this is horrifically wasteful for legacy URLs: + my $str = $ctx->{-inbox}->msg_by_path("$x2/$x38") or return; + require Email::Simple; + my $s = Email::Simple->new($str); + $mid = PublicInbox::MID::mid_clean($s->header('Message-ID')); + return r301($ctx, $inbox, $mid); } undef; } @@ -210,30 +209,19 @@ sub get_index { } } -# just returns a string ref for the blob in the current ctx -sub mid2blob { - my ($ctx) = @_; - $ctx->{-inbox}->msg_by_mid($ctx->{mid}); -} - # /$INBOX/$MESSAGE_ID/raw -> raw mbox sub get_mid_txt { my ($ctx) = @_; - my $x = mid2blob($ctx) or return r404($ctx); require PublicInbox::Mbox; - PublicInbox::Mbox::emit1($ctx, $x); + PublicInbox::Mbox::emit_raw($ctx) || r404($ctx); } # /$INBOX/$MESSAGE_ID/ -> HTML content (short quotes) sub get_mid_html { my ($ctx) = @_; - my $x = mid2blob($ctx) or return r404($ctx); - require PublicInbox::View; - require PublicInbox::MIME; - my $mime = PublicInbox::MIME->new($x); searcher($ctx); - PublicInbox::View::msg_html($ctx, $mime); + PublicInbox::View::msg_page($ctx) || r404($ctx); } # /$INBOX/$MESSAGE_ID/t/ @@ -400,8 +388,11 @@ sub msg_page { } sub serve_git { - my ($ctx, $path) = @_; - PublicInbox::GitHTTPBackend::serve($ctx->{env}, $ctx->{git}, $path); + my ($ctx, $part, $path) = @_; + my $env = $ctx->{env}; + my $ibx = $ctx->{-inbox}; + my $git = defined $part ? $ibx->git_part($part) : $ibx->git; + $git ? PublicInbox::GitHTTPBackend::serve($env, $git, $path) : r404(); } sub mbox_results { diff --git a/lib/PublicInbox/WatchMaildir.pm b/lib/PublicInbox/WatchMaildir.pm index a3fab428..7ee29da5 100644 --- a/lib/PublicInbox/WatchMaildir.pm +++ b/lib/PublicInbox/WatchMaildir.pm @@ -7,13 +7,14 @@ package PublicInbox::WatchMaildir; use strict; use warnings; use PublicInbox::MIME; -use Email::MIME::ContentType; -$Email::MIME::ContentType::STRICT_PARAMS = 0; # user input is imperfect use PublicInbox::Git; use PublicInbox::Import; use PublicInbox::MDA; use PublicInbox::Spawn qw(spawn); +use PublicInbox::InboxWritable; use File::Temp qw//; +use PublicInbox::Filter::Base; +*REJECT = *PublicInbox::Filter::Base::REJECT; sub new { my ($class, $config) = @_; @@ -52,6 +53,10 @@ sub new { $spamcheck = undef; } } + + # need to make all inboxes writable for spam removal: + $config->each_inbox(sub { PublicInbox::InboxWritable->new($_[0]) }); + foreach $k (keys %$config) { $k =~ /\Apublicinbox\.([^\.]+)\.watch\z/ or next; my $name = $1; @@ -93,18 +98,6 @@ sub _done_for_now { my ($self) = @_; my $importers = $self->{importers}; foreach my $im (values %$importers) { - $im->done if $im->{nchg}; - } - - my $opendirs = $self->{opendirs}; - - # spamdir scanning means every importer remains open - my $spamdir = $self->{spamdir}; - return if defined($spamdir) && $opendirs->{$spamdir}; - - foreach my $im (values %$importers) { - # not done if we're scanning - next if $opendirs->{$im->{git}->{git_dir}}; $im->done; } } @@ -127,15 +120,14 @@ sub _remove_spam { # path must be marked as (S)een $path =~ /:2,[A-R]*S[T-Za-z]*\z/ or return; my $mime = _path_to_mime($path) or return; - _force_mid($mime); $self->{config}->each_inbox(sub { my ($ibx) = @_; eval { my $im = _importer_for($self, $ibx); $im->remove($mime, 'spam'); - if (my $scrub = _scrubber_for($ibx)) { + if (my $scrub = $ibx->filter) { my $scrubbed = $scrub->scrub($mime) or return; - $scrubbed == 100 and return; + $scrubbed == REJECT() and return; $im->remove($scrubbed, 'spam'); } }; @@ -146,36 +138,9 @@ sub _remove_spam { }) } -# used to hash the relevant portions of a message when there are conflicts -sub _hash_mime2 { - my ($mime) = @_; - require Digest::SHA; - my $dig = Digest::SHA->new('SHA-1'); - $dig->add($mime->header_obj->header_raw('Subject')); - $dig->add($mime->body_raw); - $dig->hexdigest; -} - -sub _force_mid { - my ($mime) = @_; - # probably a bad idea, but we inject a Message-Id if - # one is missing, here.. - my $mid = $mime->header_obj->header_raw('Message-Id'); - if (!defined $mid || $mid =~ /\A\s*\z/) { - $mid = '<' . _hash_mime2($mime) . '@generated>'; - $mime->header_set('Message-Id', $mid); - } -} - sub _try_path { my ($self, $path) = @_; - my @p = split(m!/+!, $path); - return if $p[-1] !~ /\A[a-zA-Z0-9][\w:,=\.]+\z/; - if ($p[-1] =~ /:2,([A-Z]+)\z/i) { - my $flags = $1; - return if $flags =~ /[DT]/; # no [D]rafts or [T]rashed mail - } - return unless -f $path; + return unless PublicInbox::InboxWritable::is_maildir_path($path); if ($path !~ $self->{mdre}) { warn "unrecognized path: $path\n"; return; @@ -190,19 +155,17 @@ sub _try_path { } my $im = _importer_for($self, $inbox); my $mime = _path_to_mime($path) or return; - $mime->header_set($_) foreach @PublicInbox::MDA::BAD_HEADERS; my $wm = $inbox->{-watchheader}; if ($wm) { my $v = $mime->header_obj->header_raw($wm->[0]); return unless ($v && $v =~ $wm->[1]); } - if (my $scrub = _scrubber_for($inbox)) { + if (my $scrub = $inbox->filter) { my $ret = $scrub->scrub($mime) or return; - $ret == 100 and return; + $ret == REJECT() and return; $mime = $ret; } - _force_mid($mime); $im->add($mime, $self->{spamcheck}); } @@ -290,41 +253,15 @@ sub _path_to_mime { } sub _importer_for { - my ($self, $inbox) = @_; - my $im = $inbox->{-import} ||= eval { - my $git = $inbox->git; - my $name = $inbox->{name}; - my $addr = $inbox->{-primary_address}; - PublicInbox::Import->new($git, $name, $addr, $inbox); - }; - + my ($self, $ibx) = @_; my $importers = $self->{importers}; + my $im = $importers->{"$ibx"} ||= $ibx->importer(0); if (scalar(keys(%$importers)) > 2) { - delete $importers->{"$im"}; + delete $importers->{"$ibx"}; _done_for_now($self); } - $importers->{"$im"} = $im; -} - -sub _scrubber_for { - my ($inbox) = @_; - my $f = $inbox->{filter}; - if ($f && $f =~ /::/) { - my @args = (-inbox => $inbox); - # basic line splitting, only - # Perhaps we can have proper quote splitting one day... - ($f, @args) = split(/\s+/, $f) if $f =~ /\s+/; - - eval "require $f"; - if ($@) { - warn $@; - } else { - # e.g: PublicInbox::Filter::Vger->new(@args) - return $f->new(@args); - } - } - undef; + $importers->{"$ibx"} = $im; } sub _spamcheck_cb { diff --git a/lib/PublicInbox/WwwAtomStream.pm b/lib/PublicInbox/WwwAtomStream.pm index b69de856..38eba2a0 100644 --- a/lib/PublicInbox/WwwAtomStream.pm +++ b/lib/PublicInbox/WwwAtomStream.pm @@ -7,11 +7,11 @@ use strict; use warnings; use POSIX qw(strftime); -use Date::Parse qw(str2time); use Digest::SHA qw(sha1_hex); use PublicInbox::Address; use PublicInbox::Hval qw(ascii_html); use PublicInbox::MID qw/mid_clean mid_escape/; +use PublicInbox::MsgTime qw(msg_timestamp); # called by PSGI server after getline: sub close {} @@ -33,8 +33,8 @@ sub response { sub getline { my ($self) = @_; if (my $middle = $self->{cb}) { - my $mime = $middle->(); - return feed_entry($self, $mime) if $mime; + my $smsg = $middle->(); + return feed_entry($self, $smsg) if $smsg; } delete $self->{cb} ? '</feed>' : undef; } @@ -92,10 +92,11 @@ sub mid2uuid ($) { # returns undef or string sub feed_entry { - my ($self, $mime) = @_; + my ($self, $smsg) = @_; my $ctx = $self->{ctx}; + my $mime = $smsg->{mime}; my $hdr = $mime->header_obj; - my $mid = mid_clean($hdr->header_raw('Message-ID')); + my $mid = $smsg->mid; my $irt = PublicInbox::View::in_reply_to($hdr); my $uuid = mid2uuid($mid); my $base = $ctx->{feed_base_url}; @@ -108,8 +109,7 @@ sub feed_entry { $irt = ''; } my $href = $base . mid_escape($mid) . '/'; - my $date = $hdr->header('Date'); - my $t = eval { str2time($date) } if defined $date; + my $t = msg_timestamp($hdr); my @t = gmtime(defined $t ? $t : time); my $updated = feed_updated(@t); diff --git a/lib/PublicInbox/WwwAttach.pm b/lib/PublicInbox/WwwAttach.pm index 98cf9f70..b1504f52 100644 --- a/lib/PublicInbox/WwwAttach.pm +++ b/lib/PublicInbox/WwwAttach.pm @@ -5,9 +5,8 @@ package PublicInbox::WwwAttach; # internal package use strict; use warnings; -use PublicInbox::MIME; use Email::MIME::ContentType qw(parse_content_type); -$Email::MIME::ContentType::STRICT_PARAMS = 0; +use PublicInbox::MIME; use PublicInbox::MsgIter; # /$LISTNAME/$MESSAGE_ID/$IDX-$FILENAME diff --git a/lib/PublicInbox/WwwStream.pm b/lib/PublicInbox/WwwStream.pm index 05519984..ec75f16c 100644 --- a/lib/PublicInbox/WwwStream.pm +++ b/lib/PublicInbox/WwwStream.pm @@ -72,22 +72,52 @@ sub _html_end { my $obj = $ctx->{-inbox}; my $desc = ascii_html($obj->description); + my (%seen, @urls); my $http = $obj->base_url($ctx->{env}); - chop $http; - my %seen = ( $http => 1 ); - my @urls = ($http); + chop $http; # no trailing slash for clone + my $part = $obj->max_git_part; + my $dir = (split(m!/!, $http))[-1]; + if (defined($part)) { # v2 + $seen{$http} = 1; + for my $i (0..$part) { + # old parts my be deleted: + -d "$obj->{mainrepo}/git/$i.git" or next; + my $url = "$http/$i"; + $seen{$url} = 1; + push @urls, "$url $dir/git/$i.git"; + } + } else { # v1 + $seen{$http} = 1; + push @urls, $http; + } + + # FIXME: partitioning in can be different in other repositories, + # use the "cloneurl" file as-is for now: foreach my $u (@{$obj->cloneurl}) { next if $seen{$u}; $seen{$u} = 1; push @urls, $u =~ /\Ahttps?:/ ? qq(<a\nhref="$u">$u</a>) : $u; } + if (scalar(@urls) == 1) { - $urls .= " git clone --mirror $http"; + $urls .= " git clone --mirror $urls[0]"; } else { $urls .= "\n" . join("\n", map { "\tgit clone --mirror $_" } @urls); } + if (defined $part) { + my $addrs = $obj->{address}; + $addrs = join(' ', @$addrs) if ref($addrs) eq 'ARRAY'; + $urls .= <<EOF + + # If you have public-inbox 1.1+ installed, you may + # initialize and index your mirror using the following commands: + public-inbox-init -V2 $obj->{name} $dir/ $http \\ + $addrs + public-inbox-index $dir +EOF + } my @nntp = map { qq(<a\nhref="$_">$_</a>) } @{$obj->nntp_url}; if (@nntp) { $urls .= "\n\n"; |