From 51191d611e918ff3ef6e9ce8ee52ba7b2cd2144c Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 18 Jan 2021 04:30:31 -0600 Subject: lei q: parallelize Maildir and mbox writing With 4 dedicated workers, this seems to provide a 100-120% speedup on a 4 core machine when writing thousands of search results to a Maildir or mbox. This also sets us up for high-latency IMAP destinations in the future. This opens the door to more speedup opportunities such as optimizing dedupe locking and other ways to reduce contention. This change is fairly complex and convoluted, unfortunately. Further work may allow us to simplify it and even improve performance. --- lib/PublicInbox/LeiOverview.pm | 36 +++++++++++++++++++++++++++++++++--- 1 file changed, 33 insertions(+), 3 deletions(-) (limited to 'lib/PublicInbox/LeiOverview.pm') diff --git a/lib/PublicInbox/LeiOverview.pm b/lib/PublicInbox/LeiOverview.pm index c0b423f6..538d6bd5 100644 --- a/lib/PublicInbox/LeiOverview.pm +++ b/lib/PublicInbox/LeiOverview.pm @@ -140,6 +140,16 @@ sub _unbless_smsg { sub ovv_atexit_child { my ($self, $lei) = @_; + if (my $l2m = delete $lei->{l2m}) { + # gracefully stop lei2mail processes after all + # ->write_mail work is complete + delete $l2m->{-wq_s1}; + if (my $rd = delete $l2m->{each_smsg_done}) { + read($rd, my $buf, 1); # wait for EOF + } + } + # order matters, git->{-tmp}->DESTROY must not fire until + # {each_smsg_done} hits EOF above if (my $git = delete $self->{git}) { $git->async_wait_all; } @@ -178,8 +188,6 @@ sub _json_pretty { sub ovv_each_smsg_cb { # runs in wq worker usually my ($self, $lei, $ibxish) = @_; - $lei->{ovv_buf} = \(my $buf = ''); - delete(@$self{qw(lock_path tmp_lk_id)}) unless $lei->{-parallel}; my $json; $lei->{1}->autoflush(1); if (my $pkg = $self->{json}) { @@ -187,7 +195,27 @@ sub ovv_each_smsg_cb { # runs in wq worker usually $json->utf8->canonical; $json->ascii(1) if $lei->{opt}->{ascii}; } - if (my $l2m = $lei->{l2m}) { + my $l2m = $lei->{l2m}; + if ($l2m && $l2m->{-wq_s1}) { + my ($lei_ipc, @io) = $lei->atfork_parent_wq($l2m); + # n.b. $io[0] = qry_status_wr, $io[1] = mbox|stdout, + # $io[4] becomes a notification pipe that triggers EOF + # in this wq worker when all outstanding ->write_mail + # calls are complete + die "BUG: \$io[4] $io[4] unexpected" if $io[4]; + pipe($l2m->{each_smsg_done}, $io[4]) or die "pipe: $!"; + fcntl($io[4], 1031, 4096) if $^O eq 'linux'; + delete @$lei_ipc{qw(l2m opt mset_opt cmd)}; + my $git = $ibxish->git; # (LeiXSearch|Inbox|ExtSearch)->git + $self->{git} = $git; + my $git_dir = $git->{git_dir}; + sub { + my ($smsg, $mitem) = @_; + my $kw = []; # TODO get from mitem + $l2m->wq_do('write_mail', \@io, $git_dir, + $smsg->{blob}, $lei_ipc, $kw) + } + } elsif ($l2m) { my $wcb = $l2m->write_cb($lei); my $git = $ibxish->git; # (LeiXSearch|Inbox|ExtSearch)->git $self->{git} = $git; # for ovv_atexit_child @@ -199,6 +227,7 @@ sub ovv_each_smsg_cb { # runs in wq worker usually }; } elsif ($self->{fmt} =~ /\A(concat)?json\z/ && $lei->{opt}->{pretty}) { my $EOR = ($1//'') eq 'concat' ? "\n}" : "\n},"; + $lei->{ovv_buf} = \(my $buf = ''); sub { # DIY prettiness :P my ($smsg, $mitem) = @_; $smsg = _unbless_smsg($smsg, $mitem); @@ -221,6 +250,7 @@ sub ovv_each_smsg_cb { # runs in wq worker usually } } elsif ($json) { my $ORS = $self->{fmt} eq 'json' ? ",\n" : "\n"; # JSONL + $lei->{ovv_buf} = \(my $buf = ''); sub { my ($smsg, $mitem) = @_; delete @$smsg{qw(tid num)}; -- cgit v1.2.3-24-ge0c7