about summary refs log tree commit homepage
path: root/lib/PublicInbox/LeiOverview.pm
diff options
context:
space:
mode:
author: Eric Wong <e@80x24.org> 2021-01-18 04:30:31 -0600
committer: Eric Wong <e@80x24.org> 2021-01-18 21:20:25 +0000
commit: 51191d611e918ff3ef6e9ce8ee52ba7b2cd2144c (patch)
tree: c787b82356d3f7e42d003a478d8c3a3ce0ee956e /lib/PublicInbox/LeiOverview.pm
parent: 21671ed82f8d1a7b6de593e073079e29c5675aa8 (diff)
download: public-inbox-51191d611e918ff3ef6e9ce8ee52ba7b2cd2144c.tar.gz
With 4 dedicated workers, this seems to provide a 100-120%
speedup on a 4-core machine when writing thousands of search
results to a Maildir or mbox.  This also sets us up for
high-latency IMAP destinations in the future.

This opens the door to more speedup opportunities such
as optimizing dedupe locking and other ways to reduce
contention.

This change is fairly complex and convoluted, unfortunately.
Further work may allow us to simplify it and even improve
performance.
Diffstat (limited to 'lib/PublicInbox/LeiOverview.pm')
-rw-r--r-- lib/PublicInbox/LeiOverview.pm 36
1 files changed, 33 insertions, 3 deletions
diff --git a/lib/PublicInbox/LeiOverview.pm b/lib/PublicInbox/LeiOverview.pm
index c0b423f6..538d6bd5 100644
--- a/lib/PublicInbox/LeiOverview.pm
+++ b/lib/PublicInbox/LeiOverview.pm
@@ -140,6 +140,16 @@ sub _unbless_smsg {
 
 sub ovv_atexit_child {
         my ($self, $lei) = @_;
+        if (my $l2m = delete $lei->{l2m}) {
+                # gracefully stop lei2mail processes after all
+                # ->write_mail work is complete
+                delete $l2m->{-wq_s1};
+                if (my $rd = delete $l2m->{each_smsg_done}) {
+                        read($rd, my $buf, 1); # wait for EOF
+                }
+        }
+        # order matters, git->{-tmp}->DESTROY must not fire until
+        # {each_smsg_done} hits EOF above
         if (my $git = delete $self->{git}) {
                 $git->async_wait_all;
         }
@@ -178,8 +188,6 @@ sub _json_pretty {
 
 sub ovv_each_smsg_cb { # runs in wq worker usually
         my ($self, $lei, $ibxish) = @_;
-        $lei->{ovv_buf} = \(my $buf = '');
-        delete(@$self{qw(lock_path tmp_lk_id)}) unless $lei->{-parallel};
         my $json;
         $lei->{1}->autoflush(1);
         if (my $pkg = $self->{json}) {
@@ -187,7 +195,27 @@ sub ovv_each_smsg_cb { # runs in wq worker usually
                 $json->utf8->canonical;
                 $json->ascii(1) if $lei->{opt}->{ascii};
         }
-        if (my $l2m = $lei->{l2m}) {
+        my $l2m = $lei->{l2m};
+        if ($l2m && $l2m->{-wq_s1}) {
+                my ($lei_ipc, @io) = $lei->atfork_parent_wq($l2m);
+                # n.b. $io[0] = qry_status_wr, $io[1] = mbox|stdout,
+                # $io[4] becomes a notification pipe that triggers EOF
+                # in this wq worker when all outstanding ->write_mail
+                # calls are complete
+                die "BUG: \$io[4] $io[4] unexpected" if $io[4];
+                pipe($l2m->{each_smsg_done}, $io[4]) or die "pipe: $!";
+                fcntl($io[4], 1031, 4096) if $^O eq 'linux';
+                delete @$lei_ipc{qw(l2m opt mset_opt cmd)};
+                my $git = $ibxish->git; # (LeiXSearch|Inbox|ExtSearch)->git
+                $self->{git} = $git;
+                my $git_dir = $git->{git_dir};
+                sub {
+                        my ($smsg, $mitem) = @_;
+                        my $kw = []; # TODO get from mitem
+                        $l2m->wq_do('write_mail', \@io, $git_dir,
+                                        $smsg->{blob}, $lei_ipc, $kw)
+                }
+        } elsif ($l2m) {
                 my $wcb = $l2m->write_cb($lei);
                 my $git = $ibxish->git; # (LeiXSearch|Inbox|ExtSearch)->git
                 $self->{git} = $git; # for ovv_atexit_child
@@ -199,6 +227,7 @@ sub ovv_each_smsg_cb { # runs in wq worker usually
                 };
         } elsif ($self->{fmt} =~ /\A(concat)?json\z/ && $lei->{opt}->{pretty}) {
                 my $EOR = ($1//'') eq 'concat' ? "\n}" : "\n},";
+                $lei->{ovv_buf} = \(my $buf = '');
                 sub { # DIY prettiness :P
                         my ($smsg, $mitem) = @_;
                         $smsg = _unbless_smsg($smsg, $mitem);
@@ -221,6 +250,7 @@ sub ovv_each_smsg_cb { # runs in wq worker usually
                 }
         } elsif ($json) {
                 my $ORS = $self->{fmt} eq 'json' ? ",\n" : "\n"; # JSONL
+                $lei->{ovv_buf} = \(my $buf = '');
                 sub {
                         my ($smsg, $mitem) = @_;
                         delete @$smsg{qw(tid num)};