about summary refs log tree commit homepage
path: root/lib/PublicInbox
diff options
context:
space:
mode:
Diffstat (limited to 'lib/PublicInbox')
-rw-r--r-- lib/PublicInbox/LeiDedupe.pm | 6
-rw-r--r-- lib/PublicInbox/LeiSavedSearch.pm | 7
-rw-r--r-- lib/PublicInbox/LeiToMail.pm | 42
-rw-r--r-- lib/PublicInbox/LeiXSearch.pm | 15
4 files changed, 55 insertions, 15 deletions
diff --git a/lib/PublicInbox/LeiDedupe.pm b/lib/PublicInbox/LeiDedupe.pm
index 378f748e..ed52e417 100644
--- a/lib/PublicInbox/LeiDedupe.pm
+++ b/lib/PublicInbox/LeiDedupe.pm
@@ -127,4 +127,10 @@ sub pause_dedupe {
         delete($skv->{dbh}) if $skv;
 }
 
+sub dedupe_nr { # first value of $skv->count, or undef when there is no $skv
+        my $skv = $_[0]->[0] or return undef; # slot 0 of the invocant; false => undef
+        my @n = $skv->count; # ->count is called in list context
+        $n[0]; # only the first returned value is handed back
+}
+
 1;
diff --git a/lib/PublicInbox/LeiSavedSearch.pm b/lib/PublicInbox/LeiSavedSearch.pm
index 01b987d1..48d252f1 100644
--- a/lib/PublicInbox/LeiSavedSearch.pm
+++ b/lib/PublicInbox/LeiSavedSearch.pm
@@ -309,6 +309,13 @@ E: rename($dir_old, $dir_new) error: $!
 EOM
 }
 
+# cf. LeiDedupe->dedupe_nr
+sub dedupe_nr { # number of rows in the `over' table, read through {oidx}
+        my $oidx = $_[0]->{oidx} // die 'BUG: no {oidx}'; # an undefined {oidx} is fatal
+        my @n = $oidx->{dbh}->selectrow_array('SELECT COUNT(*) FROM over');
+        $n[0]; # first column of the fetched row, i.e. the COUNT(*)
+}
+
 no warnings 'once';
 *nntp_url = \&cloneurl;
 *base_url = \&PublicInbox::Inbox::base_url;
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index f3c03969..ad6b9439 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -41,11 +41,14 @@ sub _mbox_hdr_buf ($$$) {
                         warn "# keyword `$k' not supported for mbox\n";
                 }
         }
-        # Messages are always 'O' (non-\Recent in IMAP), it saves
-        # MUAs the trouble of rewriting the mbox if no other
-        # changes are made.  We put 'O' at the end (e.g. "Status: RO")
-        # to match mutt(1) output.
-        $eml->header_set('Status', join('', sort(@{$hdr{Status}})). 'O');
+        # When writing to empty mboxes, messages are always 'O'
+        # (not-\Recent in IMAP), it saves MUAs the trouble of
+        # rewriting the mbox if no other changes are made.
+        # We put 'O' at the end (e.g. "Status: RO") to match mutt(1) output.
+        # We only set smsg->{-recent} if augmenting existing stores.
+        my $status = join('', sort(@{$hdr{Status}}));
+        $status .= 'O' unless $smsg->{-recent};
+        $eml->header_set('Status', $status) if $status;
         if (my $chars = delete $hdr{'X-Status'}) {
                 $eml->header_set('X-Status', join('', sort(@$chars)));
         }
@@ -196,11 +199,13 @@ sub _mbox_write_cb ($$) {
         my $dedupe = $lei->{dedupe};
         $dedupe->prepare_dedupe;
         my $lse = $lei->{lse}; # may be undef
+        my $set_recent = $dedupe->dedupe_nr;
         sub { # for git_to_mail
                 my ($buf, $smsg, $eml) = @_;
                 $eml //= PublicInbox::Eml->new($buf);
                 return if $dedupe->is_dup($eml, $smsg);
                 $lse->xsmsg_vmd($smsg) if $lse;
+                $smsg->{-recent} = 1 if $set_recent;
                 $buf = $eml2mbox->($eml, $smsg);
                 if ($atomic_append) {
                         atomic_append($lei, $buf);
@@ -248,8 +253,8 @@ sub kw2suffix ($;@) {
         join('', sort(map { $kw2char{$_} // () } @$kw, @_));
 }
 
-sub _buf2maildir {
-        my ($dst, $buf, $smsg) = @_;
+sub _buf2maildir ($$$$) {
+        my ($dst, $buf, $smsg, $dir) = @_;
         my $kw = $smsg->{kw} // [];
         my $rand = ''; # chosen by die roll :P
         my ($tmp, $fh, $base, $ok);
@@ -260,11 +265,7 @@ sub _buf2maildir {
         } while (!($ok = sysopen($fh, $tmp, O_CREAT|O_EXCL|O_WRONLY)) &&
                 $!{EEXIST} && ($rand = _rand.','));
         if ($ok && print $fh $$buf and close($fh)) {
-                # ignore new/ and write only to cur/, otherwise MUAs
-                # with R/W access to the Maildir will end up doing
-                # a mass rename which can take a while with thousands
-                # of messages.
-                $dst .= 'cur/';
+                $dst .= $dir; # 'new/' or 'cur/'
                 $rand = '';
                 do {
                         $base = $rand.$common.':2,'.kw2suffix($kw);
@@ -289,6 +290,11 @@ sub _maildir_write_cb ($$) {
         my $lse = $lei->{lse}; # may be undef
         my $sto = $lei->{opt}->{'mail-sync'} ? $lei->{sto} : undef;
         my $out = $sto ? 'maildir:'.$lei->rel2abs($dst) : undef;
+
+        # Favor cur/ and only write to new/ when augmenting.  This
+        # saves MUAs from having to do a mass rename when the initial
+        # search result set is huge.
+        my $dir = $dedupe && $dedupe->dedupe_nr ? 'new/' : 'cur/';
         sub { # for git_to_mail
                 my ($bref, $smsg, $eml) = @_;
                 $dst // return $lei->fail; # dst may be undef-ed in last run
@@ -296,7 +302,8 @@ sub _maildir_write_cb ($$) {
                                                 PublicInbox::Eml->new($$bref),
                                                 $smsg);
                 $lse->xsmsg_vmd($smsg) if $lse;
-                my $n = _buf2maildir($dst, $bref // \($eml->as_string), $smsg);
+                my $n = _buf2maildir($dst, $bref // \($eml->as_string),
+                                        $smsg, $dir);
                 $sto->ipc_do('set_sync_info', $smsg->{blob}, $out, $n) if $sto;
                 ++$lei->{-nr_write};
         }
@@ -648,7 +655,16 @@ sub do_post_auth {
                 $lei->{1} = $zpipe->[1];
                 close $zpipe->[0];
         }
+        my $au_peers = delete $self->{au_peers};
+        if ($au_peers) { # wait for peer l2m to finish augmenting:
+                $au_peers->[1] = undef;
+                sysread($au_peers->[0], my $barrier1, 1);
+        }
         $self->{wcb} = $self->write_cb($lei);
+        if ($au_peers) { # wait for peer l2m to set write_cb
+                $au_peers->[3] = undef;
+                sysread($au_peers->[2], my $barrier2, 1);
+        }
 }
 
 sub ipc_atfork_child {
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index e69a4edd..3482082d 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -482,11 +482,22 @@ sub do_query {
                 if ($lei->{opt}->{augment} && delete $lei->{early_mua}) {
                         $lei->start_mua;
                 }
+                my $F_SETPIPE_SZ = $^O eq 'linux' ? 1031 : undef;
+                if ($l2m->{-wq_nr_workers} > 1 &&
+                                $l2m->{base_type} =~ /\A(?:maildir|mbox)\z/) {
+                        # setup two barriers to coordinate dedupe_nr
+                        # between l2m workers
+                        pipe(my ($a_r, $a_w)) or die "pipe: $!";
+                        fcntl($a_r, $F_SETPIPE_SZ, 4096) if $F_SETPIPE_SZ;
+                        pipe(my ($b_r, $b_w)) or die "pipe: $!";
+                        fcntl($b_r, $F_SETPIPE_SZ, 4096) if $F_SETPIPE_SZ;
+                        $l2m->{au_peers} = [ $a_r, $a_w, $b_r, $b_w ];
+                }
                 $l2m->wq_workers_start('lei2mail', undef,
                                         $lei->oldset, { lei => $lei });
                 pipe($lei->{startq}, $lei->{au_done}) or die "pipe: $!";
-                # 1031: F_SETPIPE_SZ
-                fcntl($lei->{startq}, 1031, 4096) if $^O eq 'linux';
+                fcntl($lei->{startq}, $F_SETPIPE_SZ, 4096) if $F_SETPIPE_SZ;
+                delete $l2m->{au_peers};
         }
         $self->wq_workers_start('lei_xsearch', undef,
                                 $lei->oldset, { lei => $lei });