From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 92E1E1F5AE for ; Tue, 4 May 2021 09:49:12 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH] lei index: new command to index mail w/o git storage Date: Tue, 4 May 2021 09:49:12 +0000 Message-Id: <20210504094912.28642-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: Since completely purging blobs from git is slow, users may wish to index messages in Maildirs (and eventually other local storage) without storing data in git. Much code from LeiImport and LeiInput is reused, and a new dummy FakeImport class supplies a non-storing $im->add and minimize changes to LeiStore. The tricky part of this command is to support "lei import" after a message has gone through "lei index". Relying on $smsg->{bytes} == 0 (as we do for external-only vmd storage) does not work here, since it would break searching for "z:" byte-ranges when not using externals. This eventually required PublicInbox::Import::add to use a SharedKV to keep track of imported blobs and prevent duplication. --- MANIFEST | 3 ++ lib/PublicInbox/FakeImport.pm | 23 ++++++++++++++ lib/PublicInbox/Import.pm | 26 +++++++-------- lib/PublicInbox/LeiBlob.pm | 38 +++++++++++++++------- lib/PublicInbox/LeiImport.pm | 15 ++++++--- lib/PublicInbox/LeiIndex.pm | 48 ++++++++++++++++++++++++++++ lib/PublicInbox/LeiInput.pm | 30 ++++++------------ lib/PublicInbox/LeiMailSync.pm | 27 ++++++++++++++++ lib/PublicInbox/LeiStore.pm | 10 +++++- lib/PublicInbox/LeiToMail.pm | 10 +++++- lib/PublicInbox/OverIdx.pm | 18 +++++++++++ t/lei-index.t | 58 ++++++++++++++++++++++++++++++++++ 12 files changed, 253 insertions(+), 53 deletions(-) create mode 100644 lib/PublicInbox/FakeImport.pm create mode 100644 lib/PublicInbox/LeiIndex.pm create mode 100644 t/lei-index.t diff --git a/MANIFEST b/MANIFEST index e23297fa..42729b9c 100644 --- a/MANIFEST +++ b/MANIFEST @@ -148,6 +148,7 @@ lib/PublicInbox/EmlContentFoo.pm lib/PublicInbox/ExtMsg.pm lib/PublicInbox/ExtSearch.pm lib/PublicInbox/ExtSearchIdx.pm +lib/PublicInbox/FakeImport.pm lib/PublicInbox/FakeInotify.pm lib/PublicInbox/Feed.pm lib/PublicInbox/Filter/Base.pm @@ -198,6 +199,7 @@ lib/PublicInbox/LeiExternal.pm lib/PublicInbox/LeiForgetSearch.pm lib/PublicInbox/LeiHelp.pm lib/PublicInbox/LeiImport.pm +lib/PublicInbox/LeiIndex.pm lib/PublicInbox/LeiInit.pm lib/PublicInbox/LeiInput.pm lib/PublicInbox/LeiInspect.pm @@ -404,6 +406,7 @@ t/lei-import-imap.t t/lei-import-maildir.t t/lei-import-nntp.t t/lei-import.t +t/lei-index.t t/lei-lcat.t t/lei-mirror.t t/lei-p2q.t diff --git a/lib/PublicInbox/FakeImport.pm b/lib/PublicInbox/FakeImport.pm new file mode 100644 index 00000000..dea25cbe --- /dev/null +++ b/lib/PublicInbox/FakeImport.pm @@ -0,0 +1,23 @@ +# Copyright (C) 2021 all contributors +# License: AGPL-3.0+ + +# pretend to do PublicInbox::Import::add for "lei index" +package PublicInbox::FakeImport; +use strict; +use PublicInbox::ContentHash qw(git_sha); + +sub new { bless { bytes_added => 0 }, __PACKAGE__ } + +sub add { + my ($self, $eml, $check_cb, $smsg) = @_; + $smsg->populate($eml); + my $raw = $eml->as_string; + $smsg->{blob} = git_sha(1, \$raw)->hexdigest; + $smsg->set_bytes($raw, length($raw)); + if (my $oidx = delete $smsg->{-oidx}) { # used by LeiStore + $oidx->vivify_xvmd($smsg) or return; + } + 1; +} + +1; diff --git a/lib/PublicInbox/Import.pm b/lib/PublicInbox/Import.pm index 3adf9dec..362cdc47 100644 --- a/lib/PublicInbox/Import.pm +++ b/lib/PublicInbox/Import.pm @@ -413,19 +413,19 @@ sub add { $smsg->{blob} = $self->get_mark(":$blob"); $smsg->set_bytes($raw_email, $n); if (my $oidx = delete $smsg->{-oidx}) { # used by LeiStore - my @docids = $oidx->blob_exists($smsg->{blob}); - my @vivify_xvmd; - for my $id (@docids) { - if (my $cur = $oidx->get_art($id)) { - # already imported if bytes > 0 - return if $cur->{bytes} > 0; - push @vivify_xvmd, $id; - } else { - warn "W: $smsg->{blob} ", - "#$id gone (bug?)\n"; - } - } - $smsg->{-vivify_xvmd} = \@vivify_xvmd; + my $eidx_git = delete $smsg->{-eidx_git}; + + # we need this sharedkv to dedupe blobs added in the + # same fast-import transaction + my $u = $self->{uniq_skv} //= do { + require PublicInbox::SharedKV; + my $x = PublicInbox::SharedKV->new; + $x->dbh; + $x; + }; + return if !$u->set_maybe(pack('H*', $smsg->{blob}), 1); + return if (!$oidx->vivify_xvmd($smsg) && + $eidx_git->check($smsg->{blob})); } } my $ref = $self->{ref}; diff --git a/lib/PublicInbox/LeiBlob.pm b/lib/PublicInbox/LeiBlob.pm index 710430a2..8de86565 100644 --- a/lib/PublicInbox/LeiBlob.pm +++ b/lib/PublicInbox/LeiBlob.pm @@ -87,6 +87,16 @@ sub cat_attach_i { # Eml->each_part callback $lei->out($part->body); } +sub extract_attach ($$$) { + my ($lei, $blob, $bref) = @_; + my $eml = PublicInbox::Eml->new($bref); + $eml->each_part(\&cat_attach_i, $lei, 1); + my $idx = delete $lei->{-attach_idx}; + defined($idx) and return $lei->fail(<start_pager if -t $lei->{1}; @@ -106,7 +116,7 @@ sub lei_blob { } my $rdr = {}; if ($opt->{mail}) { - $rdr->{2} = $lei->{2}; + open $rdr->{2}, '+>', undef or die "open: $!"; } else { open $rdr->{2}, '>', '/dev/null' or die "open: $!"; } @@ -115,21 +125,25 @@ sub lei_blob { if (defined $lei->{-attach_idx}) { my $fh = popen_rd($cmd, $lei->{env}, $rdr); require PublicInbox::Eml; - my $str = do { local $/; <$fh> }; - if (close $fh) { - my $eml = PublicInbox::Eml->new(\$str); - $eml->each_part(\&cat_attach_i, $lei, 1); - my $idx = delete $lei->{-attach_idx}; - defined($idx) and return $lei->fail(< }; + return extract_attach($lei, $blob, \$buf) if close($fh); } else { $rdr->{1} = $lei->{1}; waitpid(spawn($cmd, $lei->{env}, $rdr), 0); } - return if $? == 0; - return $lei->child_error($?) if $opt->{mail}; + my $ce = $?; + return if $ce == 0; + my $sto = $lei->_lei_store; + my $lms = $sto ? $sto->search->lms : undef; + if (my $bref = $lms ? $lms->local_blob($blob, 1) : undef) { + defined($lei->{-attach_idx}) and + return extract_attach($lei, $blob, $bref); + return $lei->out($$bref); + } elsif ($opt->{mail}) { + my $eh = $rdr->{2}; + seek($eh, 0, 0); + return $lei->child_error($ce, do { local $/; <$eh> }); + } # else: fall through to solver below } # maybe it's a non-email (code) blob from a coderepo diff --git a/lib/PublicInbox/LeiImport.pm b/lib/PublicInbox/LeiImport.pm index 6a57df47..55925cc5 100644 --- a/lib/PublicInbox/LeiImport.pm +++ b/lib/PublicInbox/LeiImport.pm @@ -38,21 +38,20 @@ sub input_maildir_cb { # maildir_each_eml cb warn "E: $f was not from a Maildir?\n"; } } - input_eml_cb($self, $eml, $vmd); + $self->input_eml_cb($eml, $vmd); } sub input_net_cb { # imap_each / nntp_each my ($url, $uid, $kw, $eml, $self) = @_; my $vmd = $self->{-import_kw} ? { kw => $kw } : undef; $vmd->{sync_info} = [ $url, $uid ] if $self->{-mail_sync}; - input_eml_cb($self, $eml, $vmd); + $self->input_eml_cb($eml, $vmd); } -sub lei_import { # the main "lei import" method - my ($lei, @inputs) = @_; +sub do_import_index ($$@) { + my ($self, $lei, @inputs) = @_; my $sto = $lei->_lei_store(1); $sto->write_prepare($lei); - my $self = bless {}, __PACKAGE__; $self->{-import_kw} = $lei->{opt}->{kw} // 1; my $vmd_mod = $self->vmd_mod_extract(\@inputs); return $lei->fail(join("\n", @{$vmd_mod->{err}})) if $vmd_mod->{err}; @@ -83,6 +82,12 @@ sub lei_import { # the main "lei import" method $op_c->op_wait_event($ops); } +sub lei_import { # the main "lei import" method + my ($lei, @inputs) = @_; + my $self = bless {}, __PACKAGE__; + do_import_index($self, $lei, @inputs); +} + sub _complete_import { my ($lei, @argv) = @_; my $sto = $lei->_lei_store or return; diff --git a/lib/PublicInbox/LeiIndex.pm b/lib/PublicInbox/LeiIndex.pm new file mode 100644 index 00000000..cc3e83e7 --- /dev/null +++ b/lib/PublicInbox/LeiIndex.pm @@ -0,0 +1,48 @@ +# Copyright (C) 2021 all contributors +# License: AGPL-3.0+ + +# front-end for the "lei index" sub-command, this is similar to +# "lei import" but doesn't put a git blob into ~/.local/share/lei/store +package PublicInbox::LeiIndex; +use strict; +use v5.10.1; +use parent qw(PublicInbox::IPC PublicInbox::LeiInput); +use PublicInbox::LeiImport; + +# /^input_/ subs are used by (or override) PublicInbox::LeiInput superclass +sub input_eml_cb { # used by input_maildir_cb and input_net_cb + my ($self, $eml, $vmd) = @_; + my $xoids = $self->{lei}->{ale}->xoids_for($eml); + if (my $all_vmd = $self->{all_vmd}) { + @$vmd{keys %$all_vmd} = values %$all_vmd; + } + $self->{lei}->{sto}->ipc_do('index_eml_only', $eml, $vmd, $xoids); +} + +sub input_fh { # overrides PublicInbox::LeiInput::input_fh + my ($self, $ifmt, $fh, $input, @args) = @_; + $self->{lei}->child_error(1<<8, <{opt}->{'mail-sync'} = 1; + my $self = bless {}, __PACKAGE__; + PublicInbox::LeiImport::do_import_index($self, $lei, @argv); +} + +no warnings 'once'; +no strict 'refs'; +for my $m (qw(input_maildir_cb input_net_cb)) { + *$m = PublicInbox::LeiImport->can($m); +} + +*_complete_import = \&PublicInbox::LeiImport::_complete_import; +*ipc_atfork_child = \&PublicInbox::LeiInput::input_only_atfork_child; +*net_merge_all_done = \&PublicInbox::LeiInput::input_only_net_merge_all_done; + +# the following works even when LeiAuth is lazy-loaded +*net_merge_all = \&PublicInbox::LeiAuth::net_merge_all; +1; diff --git a/lib/PublicInbox/LeiInput.pm b/lib/PublicInbox/LeiInput.pm index 85caac35..46eea111 100644 --- a/lib/PublicInbox/LeiInput.pm +++ b/lib/PublicInbox/LeiInput.pm @@ -1,7 +1,7 @@ # Copyright (C) 2021 all contributors # License: AGPL-3.0+ -# parent class for LeiImport, LeiConvert +# parent class for LeiImport, LeiConvert, LeiIndex package PublicInbox::LeiInput; use strict; use v5.10.1; @@ -93,11 +93,7 @@ sub handle_http_input ($$@) { my ($fh, $pid) = popen_rd($cmd, undef, $rdr); grep(/\A--compressed\z/, @$curl) or $fh = IO::Uncompress::Gunzip->new($fh, MultiStream => 1); - eval { - PublicInbox::MboxReader->mboxrd($fh, - $self->can('input_mbox_cb'), - $self, @args); - }; + eval { $self->input_fh('mboxrd', $fh, $url, @args) }; my $err = $@; waitpid($pid, 0); $? || $err and @@ -221,14 +217,8 @@ sub prepare_inputs { # returns undef on error require PublicInbox::NetReader; $net //= PublicInbox::NetReader->new; $net->add_url($input); - if ($sync) { - if ($input =~ m!\Aimaps?://!) { - push @{$sync->{ok}}, $input; - } else { - push @{$sync->{no}}, $input; - } - } - } elsif ($input_path =~ m!\Ahttps?://!i) { + push @{$sync->{ok}}, $input if $sync; + } elsif ($input_path =~ m!\Ahttps?://!i) { # mboxrd.gz # TODO: how would we detect r/w JMAP? push @{$sync->{no}}, $input if $sync; prepare_http_input($self, $lei, $input_path) or return; @@ -239,12 +229,10 @@ sub prepare_inputs { # returns undef on error --in-format=$in_fmt and `$ifmt:' conflict } - if ($sync) { - if ($ifmt =~ /\A(?:maildir|mh)\z/i) { - push @{$sync->{ok}}, $input; - } else { - push @{$sync->{no}}, $input; - } + if ($ifmt =~ /\A(?:maildir|mh)\z/i) { + push @{$sync->{ok}}, $input if $sync; + } else { + push @{$sync->{no}}, $input if $sync; } my $devfd = $lei->path_to_fd($input_path) // return; if ($devfd >= 0 || (-f $input_path || -p _)) { @@ -260,7 +248,7 @@ sub prepare_inputs { # returns undef on error } else { return $lei->fail("Unable to handle $input"); } - } elsif ($input =~ /\.(eml|patch)\z/i && -f $input) { + } elsif ($input =~ /\.(?:eml|patch)\z/i && -f $input) { lc($in_fmt//'eml') eq 'eml' or return $lei->fail(<<""); $input is `eml', not --in-format=$in_fmt diff --git a/lib/PublicInbox/LeiMailSync.pm b/lib/PublicInbox/LeiMailSync.pm index 2ce189fa..2e74e433 100644 --- a/lib/PublicInbox/LeiMailSync.pm +++ b/lib/PublicInbox/LeiMailSync.pm @@ -6,6 +6,7 @@ package PublicInbox::LeiMailSync; use strict; use v5.10.1; use DBI; +use PublicInbox::ContentHash qw(git_sha); sub dbh_new { my ($self, $rw) = @_; @@ -208,4 +209,30 @@ sub folders { map { $_->[0] } @{$dbh->selectall_arrayref($sql, undef, @pfx)}; } +sub local_blob { + my ($self, $oidhex, $vrfy) = @_; + my $dbh = $self->{dbh} //= dbh_new($self); + my $b2n = $dbh->prepare(<<''); +SELECT f.loc,b.name FROM blob2name b +LEFT JOIN folders f ON b.fid = f.fid +WHERE b.oidbin = ? + + $b2n->execute(pack('H*', $oidhex)); + while (my ($d, $n) = $b2n->fetchrow_array) { + substr($d, 0, length('maildir:')) = ''; + my $f = "$d/" . ($n =~ /:2,[a-zA-Z]*\z/ ? "cur/$n" : "new/$n"); + open my $fh, '<', $f or next; + if (-s $fh) { + local $/; + my $raw = <$fh>; + if ($vrfy && git_sha(1, \$raw)->hexdigest ne $oidhex) { + warn "$f changed $oidhex\n"; + next; + } + return \$raw; + } + } + undef; +} + 1; diff --git a/lib/PublicInbox/LeiStore.pm b/lib/PublicInbox/LeiStore.pm index 29362b2e..a7a0ebef 100644 --- a/lib/PublicInbox/LeiStore.pm +++ b/lib/PublicInbox/LeiStore.pm @@ -206,10 +206,11 @@ sub set_sync_info { sub add_eml { my ($self, $eml, $vmd, $xoids) = @_; - my $im = $self->importer; # may create new epoch + my $im = $self->{-fake_im} // $self->importer; # may create new epoch my ($eidx, $tl) = eidx_init($self); my $oidx = $eidx->{oidx}; # PublicInbox::Import::add checks this my $smsg = bless { -oidx => $oidx }, 'PublicInbox::Smsg'; + $smsg->{-eidx_git} = $eidx->git if !$self->{-fake_im}; my $im_mark = $im->add($eml, undef, $smsg); if ($vmd && $vmd->{sync_info}) { set_sync_info($self, $smsg->{blob}, @{$vmd->{sync_info}}); @@ -276,6 +277,13 @@ sub set_eml { set_eml_vmd($self, $eml, $vmd); } +sub index_eml_only { + my ($self, $eml, $vmd, $xoids) = @_; + require PublicInbox::FakeImport; + local $self->{-fake_im} = PublicInbox::FakeImport->new; + set_eml($self, $eml, $vmd, $xoids); +} + sub _external_only ($$$) { my ($self, $xoids, $eml) = @_; my $eidx = $self->{priv_eidx}; diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm index 64061788..da3a95d2 100644 --- a/lib/PublicInbox/LeiToMail.pm +++ b/lib/PublicInbox/LeiToMail.pm @@ -137,9 +137,15 @@ sub eml2mboxcl2 { sub git_to_mail { # git->cat_async callback my ($bref, $oid, $type, $size, $arg) = @_; + my ($write_cb, $smsg) = @$arg; + if ($type eq 'missing' && $smsg->{-lms_ro}) { + if ($bref = $smsg->{-lms_ro}->local_blob($oid, 1)) { + $type = 'blob'; + $size = length($$bref); + } + } return warn("W: $oid is $type (!= blob)\n") if $type ne 'blob'; return warn("E: $oid is empty\n") unless $size; - my ($write_cb, $smsg) = @$arg; die "BUG: expected=$smsg->{blob} got=$oid" if $smsg->{blob} ne $oid; $write_cb->($bref, $smsg); } @@ -644,6 +650,7 @@ sub ipc_atfork_child { my ($self) = @_; my $lei = $self->{lei}; $lei->_lei_atfork_child; + $self->{-lms_ro} = $lei->{lse}->lms if $lei->{lse}; $lei->{auth}->do_auth_atfork($self) if $lei->{auth}; $SIG{__WARN__} = PublicInbox::Eml::warn_ignore_cb(); $self->SUPER::ipc_atfork_child; @@ -665,6 +672,7 @@ sub poke_dst { sub write_mail { # via ->wq_io_do my ($self, $smsg, $eml) = @_; return $self->{wcb}->(undef, $smsg, $eml) if $eml; + $smsg->{-lms_ro} = $self->{-lms_ro}; $self->{lei}->{ale}->git->cat_async($smsg->{blob}, \&git_to_mail, [$self->{wcb}, $smsg]); } diff --git a/lib/PublicInbox/OverIdx.pm b/lib/PublicInbox/OverIdx.pm index 66dec099..5f96a5b0 100644 --- a/lib/PublicInbox/OverIdx.pm +++ b/lib/PublicInbox/OverIdx.pm @@ -670,4 +670,22 @@ DELETE FROM eidxq WHERE docid = ? } +# returns true if we're vivifying a message for lei/store that was +# previously external-metadata only +sub vivify_xvmd { + my ($self, $smsg) = @_; + my @docids = $self->blob_exists($smsg->{blob}); + my @vivify_xvmd; + for my $id (@docids) { + if (my $cur = $self->get_art($id)) { + # already indexed if bytes > 0 + return if $cur->{bytes} > 0; + push @vivify_xvmd, $id; + } else { + warn "W: $smsg->{blob} #$id gone (bug?)\n"; + } + } + $smsg->{-vivify_xvmd} = \@vivify_xvmd; +} + 1; diff --git a/t/lei-index.t b/t/lei-index.t new file mode 100644 index 00000000..3382d42b --- /dev/null +++ b/t/lei-index.t @@ -0,0 +1,58 @@ +#!perl -w +# Copyright (C) 2021 all contributors +# License: AGPL-3.0+ +use strict; use v5.10.1; use PublicInbox::TestCommon; +use File::Spec; +require_mods(qw(lei -nntpd)); +my ($ro_home, $cfg_path) = setup_public_inboxes; +my ($tmpdir, $for_destroy) = tmpdir; +my $env = { PI_CONFIG => $cfg_path }; + +my $sock = tcp_server; +my $cmd = [ '-nntpd', '-W0', "--stdout=$tmpdir/n1", "--stderr=$tmpdir/n2" ]; +my $nntpd = start_script($cmd, $env, { 3 => $sock }) or BAIL_OUT("-nntpd $?"); +my $nntp_host_port = tcp_host_port($sock); + +$sock = tcp_server; +$cmd = [ '-imapd', '-W0', "--stdout=$tmpdir/i1", "--stderr=$tmpdir/i2" ]; +my $imapd = start_script($cmd, $env, { 3 => $sock }) or BAIL_OUT("-imapd $?"); +my $imap_host_port = tcp_host_port($sock); +undef $sock; +for ('', qw(cur new)) { + mkdir "$tmpdir/md/$_" or xbail "mkdir: $!"; +} +symlink(File::Spec->rel2abs('t/plack-qp.eml'), "$tmpdir/md/cur/x:2,"); +my $expect = do { + open my $fh, '<', 't/plack-qp.eml' or xbail $!; + local $/; + <$fh>; +}; +test_lei({ tmpdir => $tmpdir }, sub { + my $store_path = "$ENV{HOME}/.local/share/lei/store/"; + + lei_ok('index', "$tmpdir/md"); + lei_ok(qw(q mid:qp@example.com)); + my $res_a = json_utf8->decode($lei_out); + my $blob = $res_a->[0]->{'blob'}; + like($blob, qr/\A[0-9a-f]{40,}\z/, 'got blob from qp@example'); + lei_ok('blob', $blob); + is($lei_out, $expect, 'got expected blob via Maildir'); + lei_ok(qw(q mid:qp@example.com -f text)); + like($lei_out, qr/^hi = bye/sm, 'lei2mail fallback'); + + my $all_obj = ['git', "--git-dir=$store_path/ALL.git", + qw(cat-file --batch-check --batch-all-objects)]; + is_deeply([xqx($all_obj)], [], 'no git objects'); + lei_ok('import', 't/plack-qp.eml'); + ok(grep(/\A$blob blob /, my @objs = xqx($all_obj)), + 'imported blob'); + lei_ok(qw(q z:0.. --dedupe=none)); + my $res_b = json_utf8->decode($lei_out); + is_deeply($res_b, $res_a, 'no extra DB entries'); + + lei_ok('index', "nntp://$nntp_host_port/t.v2"); + lei_ok('index', "imap://$imap_host_port/t.v2.0"); + is_deeply([xqx($all_obj)], \@objs, 'no new objects from NNTP+IMAP'); +}); + +done_testing;