diff options
-rw-r--r-- | lib/PublicInbox/LeiDedupe.pm | 6 | ||||
-rw-r--r-- | lib/PublicInbox/LeiSavedSearch.pm | 7 | ||||
-rw-r--r-- | lib/PublicInbox/LeiToMail.pm | 42 | ||||
-rw-r--r-- | lib/PublicInbox/LeiXSearch.pm | 15 | ||||
-rw-r--r-- | t/lei-q-kw.t | 13 | ||||
-rw-r--r-- | t/lei-q-save.t | 4 | ||||
-rw-r--r-- | t/lei_to_mail.t | 16 |
7 files changed, 75 insertions, 28 deletions
diff --git a/lib/PublicInbox/LeiDedupe.pm b/lib/PublicInbox/LeiDedupe.pm index 378f748e..ed52e417 100644 --- a/lib/PublicInbox/LeiDedupe.pm +++ b/lib/PublicInbox/LeiDedupe.pm @@ -127,4 +127,10 @@ sub pause_dedupe { delete($skv->{dbh}) if $skv; } +sub dedupe_nr { + my $skv = $_[0]->[0] or return undef; + my @n = $skv->count; + $n[0]; +} + 1; diff --git a/lib/PublicInbox/LeiSavedSearch.pm b/lib/PublicInbox/LeiSavedSearch.pm index 01b987d1..48d252f1 100644 --- a/lib/PublicInbox/LeiSavedSearch.pm +++ b/lib/PublicInbox/LeiSavedSearch.pm @@ -309,6 +309,13 @@ E: rename($dir_old, $dir_new) error: $! EOM } +# cf. LeiDedupe->dedupe_nr +sub dedupe_nr { + my $oidx = $_[0]->{oidx} // die 'BUG: no {oidx}'; + my @n = $oidx->{dbh}->selectrow_array('SELECT COUNT(*) FROM over'); + $n[0]; +} + no warnings 'once'; *nntp_url = \&cloneurl; *base_url = \&PublicInbox::Inbox::base_url; diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm index f3c03969..ad6b9439 100644 --- a/lib/PublicInbox/LeiToMail.pm +++ b/lib/PublicInbox/LeiToMail.pm @@ -41,11 +41,14 @@ sub _mbox_hdr_buf ($$$) { warn "# keyword `$k' not supported for mbox\n"; } } - # Messages are always 'O' (non-\Recent in IMAP), it saves - # MUAs the trouble of rewriting the mbox if no other - # changes are made. We put 'O' at the end (e.g. "Status: RO") - # to match mutt(1) output. - $eml->header_set('Status', join('', sort(@{$hdr{Status}})). 'O'); + # When writing to empty mboxes, messages are always 'O' + # (not-\Recent in IMAP), it saves MUAs the trouble of + # rewriting the mbox if no other changes are made. + # We put 'O' at the end (e.g. "Status: RO") to match mutt(1) output. + # We only set smsg->{-recent} if augmenting existing stores. + my $status = join('', sort(@{$hdr{Status}})); + $status .= 'O' unless $smsg->{-recent}; + $eml->header_set('Status', $status) if $status; if (my $chars = delete $hdr{'X-Status'}) { $eml->header_set('X-Status', join('', sort(@$chars))); } @@ -196,11 +199,13 @@ sub _mbox_write_cb ($$) { my $dedupe = $lei->{dedupe}; $dedupe->prepare_dedupe; my $lse = $lei->{lse}; # may be undef + my $set_recent = $dedupe->dedupe_nr; sub { # for git_to_mail my ($buf, $smsg, $eml) = @_; $eml //= PublicInbox::Eml->new($buf); return if $dedupe->is_dup($eml, $smsg); $lse->xsmsg_vmd($smsg) if $lse; + $smsg->{-recent} = 1 if $set_recent; $buf = $eml2mbox->($eml, $smsg); if ($atomic_append) { atomic_append($lei, $buf); @@ -248,8 +253,8 @@ sub kw2suffix ($;@) { join('', sort(map { $kw2char{$_} // () } @$kw, @_)); } -sub _buf2maildir { - my ($dst, $buf, $smsg) = @_; +sub _buf2maildir ($$$$) { + my ($dst, $buf, $smsg, $dir) = @_; my $kw = $smsg->{kw} // []; my $rand = ''; # chosen by die roll :P my ($tmp, $fh, $base, $ok); @@ -260,11 +265,7 @@ sub _buf2maildir { } while (!($ok = sysopen($fh, $tmp, O_CREAT|O_EXCL|O_WRONLY)) && $!{EEXIST} && ($rand = _rand.',')); if ($ok && print $fh $$buf and close($fh)) { - # ignore new/ and write only to cur/, otherwise MUAs - # with R/W access to the Maildir will end up doing - # a mass rename which can take a while with thousands - # of messages. - $dst .= 'cur/'; + $dst .= $dir; # 'new/' or 'cur/' $rand = ''; do { $base = $rand.$common.':2,'.kw2suffix($kw); @@ -289,6 +290,11 @@ sub _maildir_write_cb ($$) { my $lse = $lei->{lse}; # may be undef my $sto = $lei->{opt}->{'mail-sync'} ? $lei->{sto} : undef; my $out = $sto ? 'maildir:'.$lei->rel2abs($dst) : undef; + + # Favor cur/ and only write to new/ when augmenting. This + # saves MUAs from having to do a mass rename when the initial + # search result set is huge. + my $dir = $dedupe && $dedupe->dedupe_nr ? 'new/' : 'cur/'; sub { # for git_to_mail my ($bref, $smsg, $eml) = @_; $dst // return $lei->fail; # dst may be undef-ed in last run @@ -296,7 +302,8 @@ sub _maildir_write_cb ($$) { PublicInbox::Eml->new($$bref), $smsg); $lse->xsmsg_vmd($smsg) if $lse; - my $n = _buf2maildir($dst, $bref // \($eml->as_string), $smsg); + my $n = _buf2maildir($dst, $bref // \($eml->as_string), + $smsg, $dir); $sto->ipc_do('set_sync_info', $smsg->{blob}, $out, $n) if $sto; ++$lei->{-nr_write}; } @@ -648,7 +655,16 @@ sub do_post_auth { $lei->{1} = $zpipe->[1]; close $zpipe->[0]; } + my $au_peers = delete $self->{au_peers}; + if ($au_peers) { # wait for peer l2m to finish augmenting: + $au_peers->[1] = undef; + sysread($au_peers->[0], my $barrier1, 1); + } $self->{wcb} = $self->write_cb($lei); + if ($au_peers) { # wait for peer l2m to set write_cb + $au_peers->[3] = undef; + sysread($au_peers->[2], my $barrier2, 1); + } } sub ipc_atfork_child { diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index e69a4edd..3482082d 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -482,11 +482,22 @@ sub do_query { if ($lei->{opt}->{augment} && delete $lei->{early_mua}) { $lei->start_mua; } + my $F_SETPIPE_SZ = $^O eq 'linux' ? 1031 : undef; + if ($l2m->{-wq_nr_workers} > 1 && + $l2m->{base_type} =~ /\A(?:maildir|mbox)\z/) { + # setup two barriers to coordinate dedupe_nr + # between l2m workers + pipe(my ($a_r, $a_w)) or die "pipe: $!"; + fcntl($a_r, $F_SETPIPE_SZ, 4096) if $F_SETPIPE_SZ; + pipe(my ($b_r, $b_w)) or die "pipe: $!"; + fcntl($b_r, $F_SETPIPE_SZ, 4096) if $F_SETPIPE_SZ; + $l2m->{au_peers} = [ $a_r, $a_w, $b_r, $b_w ]; + } $l2m->wq_workers_start('lei2mail', undef, $lei->oldset, { lei => $lei }); pipe($lei->{startq}, $lei->{au_done}) or die "pipe: $!"; - # 1031: F_SETPIPE_SZ - fcntl($lei->{startq}, 1031, 4096) if $^O eq 'linux'; + fcntl($lei->{startq}, $F_SETPIPE_SZ, 4096) if $F_SETPIPE_SZ; + delete $l2m->{au_peers}; } $self->wq_workers_start('lei_xsearch', undef, $lei->oldset, { lei => $lei }); diff --git a/t/lei-q-kw.t b/t/lei-q-kw.t index c00a0a43..074c573d 100644 --- a/t/lei-q-kw.t +++ b/t/lei-q-kw.t @@ -14,7 +14,6 @@ my $exp = { '<testmessage@example.com>' => eml_load('t/utf8.eml'), }; $exp->{'<qp@example.com>'}->header_set('Status', 'RO'); -$exp->{'<testmessage@example.com>'}->header_set('Status', 'O'); test_lei(sub { lei_ok(qw(import -F eml t/plack-qp.eml)); @@ -105,7 +104,17 @@ for my $sfx ('', '.gz') { my %res; PublicInbox::MboxReader->mboxrd($fh, sub { my ($eml) = @_; - $res{$eml->header_raw('Message-ID')} = $eml; + my $mid = $eml->header_raw('Message-ID'); + if ($mid eq '<testmessage@example.com>') { + is_deeply([$eml->header('Status')], [], + "no status $sfx"); + $eml->header_set('Status'); + } elsif ($mid eq '<qp@example.com>') { + is($eml->header('Status'), 'RO', 'status preserved'); + } else { + fail("unknown mid $mid"); + } + $res{$mid} = $eml; }); is_deeply(\%res, $exp, '--augment worked'); diff --git a/t/lei-q-save.t b/t/lei-q-save.t index 753d5b20..aed38a51 100644 --- a/t/lei-q-save.t +++ b/t/lei-q-save.t @@ -42,7 +42,7 @@ test_lei(sub { lei_ok qw(up -q md -C), $home; lei_ok qw(up -q . -C), "$home/md"; lei_ok qw(up -q), "/$home/md"; - my %after = map { $_ => 1 } glob("$home/md/cur/*"); + my %after = map { $_ => 1 } glob("$home/md/{new,cur}/*"); is(delete $after{(keys(%before))[0]}, 1, 'original message kept'); is(scalar(keys %after), 1, 'one new message added'); is_deeply(eml_load((keys %after)[0]), $doc2, 'doc2 matches'); @@ -155,7 +155,7 @@ test_lei(sub { $im->add(PublicInbox::Eml->new($diff)); $im->done; lei_ok('up', $o); - @m = glob("$o/cur/*"); + @m = glob("$o/{new,cur}/*"); is(scalar(@m), 2, 'got 2nd result due to different OID'); SKIP: { diff --git a/t/lei_to_mail.t b/t/lei_to_mail.t index 32532a98..35904706 100644 --- a/t/lei_to_mail.t +++ b/t/lei_to_mail.t @@ -90,7 +90,7 @@ my $fn = "$tmpdir/x.mbox"; my ($mbox) = shuffle(@MBOX); # pick one, shouldn't matter my $wcb_get = sub { my ($fmt, $dst) = @_; - delete $lei->{dedupe}; + delete $lei->{dedupe}; # to be recreated $lei->{ovv} = bless { fmt => $fmt, dst => $dst @@ -119,13 +119,12 @@ my $orig = do { like($raw, qr/^blah\n/sm, 'wrote content'); unlink $fn or BAIL_OUT $!; - local $lei->{opt} = { jobs => 2 }; $wcb = $wcb_get->($mbox, $fn); ok(-f $fn && !-s _, 'truncated mbox destination'); $wcb->(\($dup = $buf), $deadbeef); $commit->($wcb); open $fh, '<', $fn or BAIL_OUT $!; - is(do { local $/; <$fh> }, $raw, 'jobs > 1'); + is(do { local $/; <$fh> }, $raw, 'wrote identical content'); $raw; }; @@ -158,21 +157,20 @@ for my $zsfx (qw(gz bz2 xz)) { ok($dc_cmd, "decompressor for .$zsfx"); my $f = "$fn.$zsfx"; my $wcb = $wcb_get->($mbox, $f); - $wcb->(\(my $dup = $buf), $deadbeef); + $wcb->(\(my $dup = $buf), { %$deadbeef }); $commit->($wcb); my $uncompressed = xqx([@$dc_cmd, $f]); is($uncompressed, $orig, "$zsfx works unlocked"); - local $lei->{opt} = { jobs => 2 }; # for atomic writes unlink $f or BAIL_OUT "unlink $!"; $wcb = $wcb_get->($mbox, $f); - $wcb->(\($dup = $buf), $deadbeef); + $wcb->(\($dup = $buf), { %$deadbeef }); $commit->($wcb); is(xqx([@$dc_cmd, $f]), $orig, "$zsfx matches with lock"); local $lei->{opt} = { augment => 1 }; $wcb = $wcb_get->($mbox, $f); - $wcb->(\($dup = $buf . "\nx\n"), $deadbeef); + $wcb->(\($dup = $buf . "\nx\n"), { %$deadbeef }); $commit->($wcb); my $cat = popen_rd([@$dc_cmd, $f]); @@ -182,9 +180,9 @@ for my $zsfx (qw(gz bz2 xz)) { like($raw[1], qr/\nblah\n\nx\n\z/s, "augmented $zsfx"); like($raw[0], qr/\nblah\n\z/s, "original preserved $zsfx"); - local $lei->{opt} = { augment => 1, jobs => 2 }; + local $lei->{opt} = { augment => 1 }; $wcb = $wcb_get->($mbox, $f); - $wcb->(\($dup = $buf . "\ny\n"), $deadbeef); + $wcb->(\($dup = $buf . "\ny\n"), { %$deadbeef }); $commit->($wcb); my @raw3; |