about summary refs log tree commit homepage
path: root/lib/PublicInbox/LeiXSearch.pm
diff options
context:
space:
mode:
Diffstat (limited to 'lib/PublicInbox/LeiXSearch.pm')
-rw-r--r--lib/PublicInbox/LeiXSearch.pm15
1 files changed, 13 insertions, 2 deletions
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index e69a4edd..3482082d 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -482,11 +482,22 @@ sub do_query {
                 if ($lei->{opt}->{augment} && delete $lei->{early_mua}) {
                         $lei->start_mua;
                 }
+                my $F_SETPIPE_SZ = $^O eq 'linux' ? 1031 : undef;
+                if ($l2m->{-wq_nr_workers} > 1 &&
+                                $l2m->{base_type} =~ /\A(?:maildir|mbox)\z/) {
+                        # setup two barriers to coordinate dedupe_nr
+                        # between l2m workers
+                        pipe(my ($a_r, $a_w)) or die "pipe: $!";
+                        fcntl($a_r, $F_SETPIPE_SZ, 4096) if $F_SETPIPE_SZ;
+                        pipe(my ($b_r, $b_w)) or die "pipe: $!";
+                        fcntl($b_r, $F_SETPIPE_SZ, 4096) if $F_SETPIPE_SZ;
+                        $l2m->{au_peers} = [ $a_r, $a_w, $b_r, $b_w ];
+                }
                 $l2m->wq_workers_start('lei2mail', undef,
                                         $lei->oldset, { lei => $lei });
                 pipe($lei->{startq}, $lei->{au_done}) or die "pipe: $!";
-                # 1031: F_SETPIPE_SZ
-                fcntl($lei->{startq}, 1031, 4096) if $^O eq 'linux';
+                fcntl($lei->{startq}, $F_SETPIPE_SZ, 4096) if $F_SETPIPE_SZ;
+                delete $l2m->{au_peers};
         }
         $self->wq_workers_start('lei_xsearch', undef,
                                 $lei->oldset, { lei => $lei });