diff options
Diffstat (limited to 'lib/PublicInbox')
-rw-r--r-- | lib/PublicInbox/LeiDedupe.pm | 6 | ||||
-rw-r--r-- | lib/PublicInbox/LeiSavedSearch.pm | 7 | ||||
-rw-r--r-- | lib/PublicInbox/LeiToMail.pm | 42 | ||||
-rw-r--r-- | lib/PublicInbox/LeiXSearch.pm | 15 |
4 files changed, 55 insertions, 15 deletions
diff --git a/lib/PublicInbox/LeiDedupe.pm b/lib/PublicInbox/LeiDedupe.pm index 378f748e..ed52e417 100644 --- a/lib/PublicInbox/LeiDedupe.pm +++ b/lib/PublicInbox/LeiDedupe.pm @@ -127,4 +127,10 @@ sub pause_dedupe { delete($skv->{dbh}) if $skv; } +sub dedupe_nr { + my $skv = $_[0]->[0] or return undef; + my @n = $skv->count; + $n[0]; +} + 1; diff --git a/lib/PublicInbox/LeiSavedSearch.pm b/lib/PublicInbox/LeiSavedSearch.pm index 01b987d1..48d252f1 100644 --- a/lib/PublicInbox/LeiSavedSearch.pm +++ b/lib/PublicInbox/LeiSavedSearch.pm @@ -309,6 +309,13 @@ E: rename($dir_old, $dir_new) error: $! EOM } +# cf. LeiDedupe->dedupe_nr +sub dedupe_nr { + my $oidx = $_[0]->{oidx} // die 'BUG: no {oidx}'; + my @n = $oidx->{dbh}->selectrow_array('SELECT COUNT(*) FROM over'); + $n[0]; +} + no warnings 'once'; *nntp_url = \&cloneurl; *base_url = \&PublicInbox::Inbox::base_url; diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm index f3c03969..ad6b9439 100644 --- a/lib/PublicInbox/LeiToMail.pm +++ b/lib/PublicInbox/LeiToMail.pm @@ -41,11 +41,14 @@ sub _mbox_hdr_buf ($$$) { warn "# keyword `$k' not supported for mbox\n"; } } - # Messages are always 'O' (non-\Recent in IMAP), it saves - # MUAs the trouble of rewriting the mbox if no other - # changes are made. We put 'O' at the end (e.g. "Status: RO") - # to match mutt(1) output. - $eml->header_set('Status', join('', sort(@{$hdr{Status}})). 'O'); + # When writing to empty mboxes, messages are always 'O' + # (not-\Recent in IMAP), it saves MUAs the trouble of + # rewriting the mbox if no other changes are made. + # We put 'O' at the end (e.g. "Status: RO") to match mutt(1) output. + # We only set smsg->{-recent} if augmenting existing stores. + my $status = join('', sort(@{$hdr{Status}})); + $status .= 'O' unless $smsg->{-recent}; + $eml->header_set('Status', $status) if $status; if (my $chars = delete $hdr{'X-Status'}) { $eml->header_set('X-Status', join('', sort(@$chars))); } @@ -196,11 +199,13 @@ sub _mbox_write_cb ($$) { my $dedupe = $lei->{dedupe}; $dedupe->prepare_dedupe; my $lse = $lei->{lse}; # may be undef + my $set_recent = $dedupe->dedupe_nr; sub { # for git_to_mail my ($buf, $smsg, $eml) = @_; $eml //= PublicInbox::Eml->new($buf); return if $dedupe->is_dup($eml, $smsg); $lse->xsmsg_vmd($smsg) if $lse; + $smsg->{-recent} = 1 if $set_recent; $buf = $eml2mbox->($eml, $smsg); if ($atomic_append) { atomic_append($lei, $buf); @@ -248,8 +253,8 @@ sub kw2suffix ($;@) { join('', sort(map { $kw2char{$_} // () } @$kw, @_)); } -sub _buf2maildir { - my ($dst, $buf, $smsg) = @_; +sub _buf2maildir ($$$$) { + my ($dst, $buf, $smsg, $dir) = @_; my $kw = $smsg->{kw} // []; my $rand = ''; # chosen by die roll :P my ($tmp, $fh, $base, $ok); @@ -260,11 +265,7 @@ sub _buf2maildir { } while (!($ok = sysopen($fh, $tmp, O_CREAT|O_EXCL|O_WRONLY)) && $!{EEXIST} && ($rand = _rand.',')); if ($ok && print $fh $$buf and close($fh)) { - # ignore new/ and write only to cur/, otherwise MUAs - # with R/W access to the Maildir will end up doing - # a mass rename which can take a while with thousands - # of messages. - $dst .= 'cur/'; + $dst .= $dir; # 'new/' or 'cur/' $rand = ''; do { $base = $rand.$common.':2,'.kw2suffix($kw); @@ -289,6 +290,11 @@ sub _maildir_write_cb ($$) { my $lse = $lei->{lse}; # may be undef my $sto = $lei->{opt}->{'mail-sync'} ? $lei->{sto} : undef; my $out = $sto ? 'maildir:'.$lei->rel2abs($dst) : undef; + + # Favor cur/ and only write to new/ when augmenting. This + # saves MUAs from having to do a mass rename when the initial + # search result set is huge. + my $dir = $dedupe && $dedupe->dedupe_nr ? 'new/' : 'cur/'; sub { # for git_to_mail my ($bref, $smsg, $eml) = @_; $dst // return $lei->fail; # dst may be undef-ed in last run @@ -296,7 +302,8 @@ sub _maildir_write_cb ($$) { PublicInbox::Eml->new($$bref), $smsg); $lse->xsmsg_vmd($smsg) if $lse; - my $n = _buf2maildir($dst, $bref // \($eml->as_string), $smsg); + my $n = _buf2maildir($dst, $bref // \($eml->as_string), + $smsg, $dir); $sto->ipc_do('set_sync_info', $smsg->{blob}, $out, $n) if $sto; ++$lei->{-nr_write}; } @@ -648,7 +655,16 @@ sub do_post_auth { $lei->{1} = $zpipe->[1]; close $zpipe->[0]; } + my $au_peers = delete $self->{au_peers}; + if ($au_peers) { # wait for peer l2m to finish augmenting: + $au_peers->[1] = undef; + sysread($au_peers->[0], my $barrier1, 1); + } $self->{wcb} = $self->write_cb($lei); + if ($au_peers) { # wait for peer l2m to set write_cb + $au_peers->[3] = undef; + sysread($au_peers->[2], my $barrier2, 1); + } } sub ipc_atfork_child { diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index e69a4edd..3482082d 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -482,11 +482,22 @@ sub do_query { if ($lei->{opt}->{augment} && delete $lei->{early_mua}) { $lei->start_mua; } + my $F_SETPIPE_SZ = $^O eq 'linux' ? 1031 : undef; + if ($l2m->{-wq_nr_workers} > 1 && + $l2m->{base_type} =~ /\A(?:maildir|mbox)\z/) { + # setup two barriers to coordinate dedupe_nr + # between l2m workers + pipe(my ($a_r, $a_w)) or die "pipe: $!"; + fcntl($a_r, $F_SETPIPE_SZ, 4096) if $F_SETPIPE_SZ; + pipe(my ($b_r, $b_w)) or die "pipe: $!"; + fcntl($b_r, $F_SETPIPE_SZ, 4096) if $F_SETPIPE_SZ; + $l2m->{au_peers} = [ $a_r, $a_w, $b_r, $b_w ]; + } $l2m->wq_workers_start('lei2mail', undef, $lei->oldset, { lei => $lei }); pipe($lei->{startq}, $lei->{au_done}) or die "pipe: $!"; - # 1031: F_SETPIPE_SZ - fcntl($lei->{startq}, 1031, 4096) if $^O eq 'linux'; + fcntl($lei->{startq}, $F_SETPIPE_SZ, 4096) if $F_SETPIPE_SZ; + delete $l2m->{au_peers}; } $self->wq_workers_start('lei_xsearch', undef, $lei->oldset, { lei => $lei }); |