about summary refs log tree commit homepage
diff options
context:
space:
mode:
-rw-r--r-- lib/PublicInbox/LeiDedupe.pm      |  6
-rw-r--r-- lib/PublicInbox/LeiSavedSearch.pm |  7
-rw-r--r-- lib/PublicInbox/LeiToMail.pm      | 42
-rw-r--r-- lib/PublicInbox/LeiXSearch.pm     | 15
-rw-r--r-- t/lei-q-kw.t                      | 13
-rw-r--r-- t/lei-q-save.t                    |  4
-rw-r--r-- t/lei_to_mail.t                   | 16
7 files changed, 75 insertions, 28 deletions
diff --git a/lib/PublicInbox/LeiDedupe.pm b/lib/PublicInbox/LeiDedupe.pm
index 378f748e..ed52e417 100644
--- a/lib/PublicInbox/LeiDedupe.pm
+++ b/lib/PublicInbox/LeiDedupe.pm
@@ -127,4 +127,10 @@ sub pause_dedupe {
         delete($skv->{dbh}) if $skv;
 }
 
+sub dedupe_nr { # first value of the store's ->count, or undef without a store
+        my $skv = $_[0]->[0] or return undef; # slot 0 of self; any false value => undef
+        my @n = $skv->count; # list-context call; presumably an entry count -- TODO confirm
+        $n[0]; # implicit return value: first element of that list
+}
+
 1;
diff --git a/lib/PublicInbox/LeiSavedSearch.pm b/lib/PublicInbox/LeiSavedSearch.pm
index 01b987d1..48d252f1 100644
--- a/lib/PublicInbox/LeiSavedSearch.pm
+++ b/lib/PublicInbox/LeiSavedSearch.pm
@@ -309,6 +309,13 @@ E: rename($dir_old, $dir_new) error: $!
 EOM
 }
 
+# Returns COUNT(*) of the `over' table via {oidx}->{dbh}; cf. LeiDedupe->dedupe_nr
+sub dedupe_nr {
+        my $oidx = $_[0]->{oidx} // die 'BUG: no {oidx}'; # dies if {oidx} is undef
+        my @n = $oidx->{dbh}->selectrow_array('SELECT COUNT(*) FROM over'); # {dbh} is presumably a DBI handle -- TODO confirm
+        $n[0]; # implicit return value: first column of the fetched row
+}
+
 no warnings 'once';
 *nntp_url = \&cloneurl;
 *base_url = \&PublicInbox::Inbox::base_url;
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index f3c03969..ad6b9439 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -41,11 +41,14 @@ sub _mbox_hdr_buf ($$$) {
                         warn "# keyword `$k' not supported for mbox\n";
                 }
         }
-        # Messages are always 'O' (non-\Recent in IMAP), it saves
-        # MUAs the trouble of rewriting the mbox if no other
-        # changes are made.  We put 'O' at the end (e.g. "Status: RO")
-        # to match mutt(1) output.
-        $eml->header_set('Status', join('', sort(@{$hdr{Status}})). 'O');
+        # When writing to empty mboxes, messages are always 'O'
+        # (not-\Recent in IMAP), it saves MUAs the trouble of
+        # rewriting the mbox if no other changes are made.
+        # We put 'O' at the end (e.g. "Status: RO") to match mutt(1) output.
+        # We only set smsg->{-recent} if augmenting existing stores.
+        my $status = join('', sort(@{$hdr{Status}}));
+        $status .= 'O' unless $smsg->{-recent};
+        $eml->header_set('Status', $status) if $status;
         if (my $chars = delete $hdr{'X-Status'}) {
                 $eml->header_set('X-Status', join('', sort(@$chars)));
         }
@@ -196,11 +199,13 @@ sub _mbox_write_cb ($$) {
         my $dedupe = $lei->{dedupe};
         $dedupe->prepare_dedupe;
         my $lse = $lei->{lse}; # may be undef
+        my $set_recent = $dedupe->dedupe_nr;
         sub { # for git_to_mail
                 my ($buf, $smsg, $eml) = @_;
                 $eml //= PublicInbox::Eml->new($buf);
                 return if $dedupe->is_dup($eml, $smsg);
                 $lse->xsmsg_vmd($smsg) if $lse;
+                $smsg->{-recent} = 1 if $set_recent;
                 $buf = $eml2mbox->($eml, $smsg);
                 if ($atomic_append) {
                         atomic_append($lei, $buf);
@@ -248,8 +253,8 @@ sub kw2suffix ($;@) {
         join('', sort(map { $kw2char{$_} // () } @$kw, @_));
 }
 
-sub _buf2maildir {
-        my ($dst, $buf, $smsg) = @_;
+sub _buf2maildir ($$$$) {
+        my ($dst, $buf, $smsg, $dir) = @_;
         my $kw = $smsg->{kw} // [];
         my $rand = ''; # chosen by die roll :P
         my ($tmp, $fh, $base, $ok);
@@ -260,11 +265,7 @@ sub _buf2maildir {
         } while (!($ok = sysopen($fh, $tmp, O_CREAT|O_EXCL|O_WRONLY)) &&
                 $!{EEXIST} && ($rand = _rand.','));
         if ($ok && print $fh $$buf and close($fh)) {
-                # ignore new/ and write only to cur/, otherwise MUAs
-                # with R/W access to the Maildir will end up doing
-                # a mass rename which can take a while with thousands
-                # of messages.
-                $dst .= 'cur/';
+                $dst .= $dir; # 'new/' or 'cur/'
                 $rand = '';
                 do {
                         $base = $rand.$common.':2,'.kw2suffix($kw);
@@ -289,6 +290,11 @@ sub _maildir_write_cb ($$) {
         my $lse = $lei->{lse}; # may be undef
         my $sto = $lei->{opt}->{'mail-sync'} ? $lei->{sto} : undef;
         my $out = $sto ? 'maildir:'.$lei->rel2abs($dst) : undef;
+
+        # Favor cur/ and only write to new/ when augmenting.  This
+        # saves MUAs from having to do a mass rename when the initial
+        # search result set is huge.
+        my $dir = $dedupe && $dedupe->dedupe_nr ? 'new/' : 'cur/';
         sub { # for git_to_mail
                 my ($bref, $smsg, $eml) = @_;
                 $dst // return $lei->fail; # dst may be undef-ed in last run
@@ -296,7 +302,8 @@ sub _maildir_write_cb ($$) {
                                                 PublicInbox::Eml->new($$bref),
                                                 $smsg);
                 $lse->xsmsg_vmd($smsg) if $lse;
-                my $n = _buf2maildir($dst, $bref // \($eml->as_string), $smsg);
+                my $n = _buf2maildir($dst, $bref // \($eml->as_string),
+                                        $smsg, $dir);
                 $sto->ipc_do('set_sync_info', $smsg->{blob}, $out, $n) if $sto;
                 ++$lei->{-nr_write};
         }
@@ -648,7 +655,16 @@ sub do_post_auth {
                 $lei->{1} = $zpipe->[1];
                 close $zpipe->[0];
         }
+        my $au_peers = delete $self->{au_peers};
+        if ($au_peers) { # wait for peer l2m to finish augmenting:
+                $au_peers->[1] = undef;
+                sysread($au_peers->[0], my $barrier1, 1);
+        }
         $self->{wcb} = $self->write_cb($lei);
+        if ($au_peers) { # wait for peer l2m to set write_cb
+                $au_peers->[3] = undef;
+                sysread($au_peers->[2], my $barrier2, 1);
+        }
 }
 
 sub ipc_atfork_child {
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index e69a4edd..3482082d 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -482,11 +482,22 @@ sub do_query {
                 if ($lei->{opt}->{augment} && delete $lei->{early_mua}) {
                         $lei->start_mua;
                 }
+                my $F_SETPIPE_SZ = $^O eq 'linux' ? 1031 : undef;
+                if ($l2m->{-wq_nr_workers} > 1 &&
+                                $l2m->{base_type} =~ /\A(?:maildir|mbox)\z/) {
+                        # setup two barriers to coordinate dedupe_nr
+                        # between l2m workers
+                        pipe(my ($a_r, $a_w)) or die "pipe: $!";
+                        fcntl($a_r, $F_SETPIPE_SZ, 4096) if $F_SETPIPE_SZ;
+                        pipe(my ($b_r, $b_w)) or die "pipe: $!";
+                        fcntl($b_r, $F_SETPIPE_SZ, 4096) if $F_SETPIPE_SZ;
+                        $l2m->{au_peers} = [ $a_r, $a_w, $b_r, $b_w ];
+                }
                 $l2m->wq_workers_start('lei2mail', undef,
                                         $lei->oldset, { lei => $lei });
                 pipe($lei->{startq}, $lei->{au_done}) or die "pipe: $!";
-                # 1031: F_SETPIPE_SZ
-                fcntl($lei->{startq}, 1031, 4096) if $^O eq 'linux';
+                fcntl($lei->{startq}, $F_SETPIPE_SZ, 4096) if $F_SETPIPE_SZ;
+                delete $l2m->{au_peers};
         }
         $self->wq_workers_start('lei_xsearch', undef,
                                 $lei->oldset, { lei => $lei });
diff --git a/t/lei-q-kw.t b/t/lei-q-kw.t
index c00a0a43..074c573d 100644
--- a/t/lei-q-kw.t
+++ b/t/lei-q-kw.t
@@ -14,7 +14,6 @@ my $exp = {
         '<testmessage@example.com>' => eml_load('t/utf8.eml'),
 };
 $exp->{'<qp@example.com>'}->header_set('Status', 'RO');
-$exp->{'<testmessage@example.com>'}->header_set('Status', 'O');
 
 test_lei(sub {
 lei_ok(qw(import -F eml t/plack-qp.eml));
@@ -105,7 +104,17 @@ for my $sfx ('', '.gz') {
         my %res;
         PublicInbox::MboxReader->mboxrd($fh, sub {
                 my ($eml) = @_;
-                $res{$eml->header_raw('Message-ID')} = $eml;
+                my $mid = $eml->header_raw('Message-ID');
+                if ($mid eq '<testmessage@example.com>') {
+                        is_deeply([$eml->header('Status')], [],
+                                "no status $sfx");
+                        $eml->header_set('Status');
+                } elsif ($mid eq '<qp@example.com>') {
+                        is($eml->header('Status'), 'RO', 'status preserved');
+                } else {
+                        fail("unknown mid $mid");
+                }
+                $res{$mid} = $eml;
         });
         is_deeply(\%res, $exp, '--augment worked');
 
diff --git a/t/lei-q-save.t b/t/lei-q-save.t
index 753d5b20..aed38a51 100644
--- a/t/lei-q-save.t
+++ b/t/lei-q-save.t
@@ -42,7 +42,7 @@ test_lei(sub {
         lei_ok qw(up -q md -C), $home;
         lei_ok qw(up -q . -C), "$home/md";
         lei_ok qw(up -q), "/$home/md";
-        my %after = map { $_ => 1 } glob("$home/md/cur/*");
+        my %after = map { $_ => 1 } glob("$home/md/{new,cur}/*");
         is(delete $after{(keys(%before))[0]}, 1, 'original message kept');
         is(scalar(keys %after), 1, 'one new message added');
         is_deeply(eml_load((keys %after)[0]), $doc2, 'doc2 matches');
@@ -155,7 +155,7 @@ test_lei(sub {
         $im->add(PublicInbox::Eml->new($diff));
         $im->done;
         lei_ok('up', $o);
-        @m = glob("$o/cur/*");
+        @m = glob("$o/{new,cur}/*");
         is(scalar(@m), 2, 'got 2nd result due to different OID');
 
         SKIP: {
diff --git a/t/lei_to_mail.t b/t/lei_to_mail.t
index 32532a98..35904706 100644
--- a/t/lei_to_mail.t
+++ b/t/lei_to_mail.t
@@ -90,7 +90,7 @@ my $fn = "$tmpdir/x.mbox";
 my ($mbox) = shuffle(@MBOX); # pick one, shouldn't matter
 my $wcb_get = sub {
         my ($fmt, $dst) = @_;
-        delete $lei->{dedupe};
+        delete $lei->{dedupe}; # to be recreated
         $lei->{ovv} = bless {
                 fmt => $fmt,
                 dst => $dst
@@ -119,13 +119,12 @@ my $orig = do {
         like($raw, qr/^blah\n/sm, 'wrote content');
         unlink $fn or BAIL_OUT $!;
 
-        local $lei->{opt} = { jobs => 2 };
         $wcb = $wcb_get->($mbox, $fn);
         ok(-f $fn && !-s _, 'truncated mbox destination');
         $wcb->(\($dup = $buf), $deadbeef);
         $commit->($wcb);
         open $fh, '<', $fn or BAIL_OUT $!;
-        is(do { local $/; <$fh> }, $raw, 'jobs > 1');
+        is(do { local $/; <$fh> }, $raw, 'wrote identical content');
         $raw;
 };
 
@@ -158,21 +157,20 @@ for my $zsfx (qw(gz bz2 xz)) {
                 ok($dc_cmd, "decompressor for .$zsfx");
                 my $f = "$fn.$zsfx";
                 my $wcb = $wcb_get->($mbox, $f);
-                $wcb->(\(my $dup = $buf), $deadbeef);
+                $wcb->(\(my $dup = $buf), { %$deadbeef });
                 $commit->($wcb);
                 my $uncompressed = xqx([@$dc_cmd, $f]);
                 is($uncompressed, $orig, "$zsfx works unlocked");
 
-                local $lei->{opt} = { jobs => 2 }; # for atomic writes
                 unlink $f or BAIL_OUT "unlink $!";
                 $wcb = $wcb_get->($mbox, $f);
-                $wcb->(\($dup = $buf), $deadbeef);
+                $wcb->(\($dup = $buf), { %$deadbeef });
                 $commit->($wcb);
                 is(xqx([@$dc_cmd, $f]), $orig, "$zsfx matches with lock");
 
                 local $lei->{opt} = { augment => 1 };
                 $wcb = $wcb_get->($mbox, $f);
-                $wcb->(\($dup = $buf . "\nx\n"), $deadbeef);
+                $wcb->(\($dup = $buf . "\nx\n"), { %$deadbeef });
                 $commit->($wcb);
 
                 my $cat = popen_rd([@$dc_cmd, $f]);
@@ -182,9 +180,9 @@ for my $zsfx (qw(gz bz2 xz)) {
                 like($raw[1], qr/\nblah\n\nx\n\z/s, "augmented $zsfx");
                 like($raw[0], qr/\nblah\n\z/s, "original preserved $zsfx");
 
-                local $lei->{opt} = { augment => 1, jobs => 2 };
+                local $lei->{opt} = { augment => 1 };
                 $wcb = $wcb_get->($mbox, $f);
-                $wcb->(\($dup = $buf . "\ny\n"), $deadbeef);
+                $wcb->(\($dup = $buf . "\ny\n"), { %$deadbeef });
                 $commit->($wcb);
 
                 my @raw3;