diff options
author | Eric Wong <e@80x24.org> | 2021-01-18 04:30:31 -0600 |
---|---|---|
committer | Eric Wong <e@80x24.org> | 2021-01-18 21:20:25 +0000 |
commit | 51191d611e918ff3ef6e9ce8ee52ba7b2cd2144c (patch) | |
tree | c787b82356d3f7e42d003a478d8c3a3ce0ee956e /lib/PublicInbox/LeiXSearch.pm | |
parent | 21671ed82f8d1a7b6de593e073079e29c5675aa8 (diff) | |
download | public-inbox-51191d611e918ff3ef6e9ce8ee52ba7b2cd2144c.tar.gz |
With 4 dedicated workers, this seems to provide a 100-120% speedup on a 4-core machine when writing thousands of search results to a Maildir or mbox. This also sets us up for high-latency IMAP destinations in the future. This opens the door to more speedup opportunities, such as optimizing dedupe locking and other ways to reduce contention. This change is fairly complex and convoluted, unfortunately. Further work may allow us to simplify it and even improve performance.
Diffstat (limited to 'lib/PublicInbox/LeiXSearch.pm')
-rw-r--r-- | lib/PublicInbox/LeiXSearch.pm | 27 |
1 files changed, 17 insertions, 10 deletions
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index 91864cd0..dc5cf3b6 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -126,6 +126,7 @@ sub query_thread_mset { # for --thread @{$ctx->{xids}} = (); } } while (_mset_more($mset, $mo)); + undef $each_smsg; # drops @io for l2m->{each_smsg_done} $lei->{ovv}->ovv_atexit_child($lei); } @@ -147,6 +148,7 @@ sub query_mset { # non-parallel for non-"--thread" users $each_smsg->($smsg, $it); } } while (_mset_more($mset, $mo)); + undef $each_smsg; # drops @io for l2m->{each_smsg_done} $lei->{ovv}->ovv_atexit_child($lei); } @@ -170,11 +172,14 @@ sub git { } sub query_done { # EOF callback - my ($lei) = @_; - $lei->{ovv}->ovv_end($lei); - if (my $l2m = $lei->{l2m}) { - $lei->start_mua unless $l2m->lock_free; + my ($self, $lei) = @_; + my $l2m = delete $lei->{l2m}; + if (my $pids = delete $self->{l2m_pids}) { + my $ipc_worker_reap = $self->can('ipc_worker_reap'); + dwaitpid($_, $ipc_worker_reap, $l2m) for @$pids; } + $lei->{ovv}->ovv_end($lei); + $lei->start_mua if $l2m && !$l2m->lock_free; $lei->dclose; } @@ -188,12 +193,10 @@ sub start_query { # always runs in main (lei-daemon) process } my $remotes = $self->{remotes} // []; if ($lei->{opt}->{thread}) { - $lei->{-parallel} = scalar(@$remotes) + scalar(@$srcs) - 1; for my $ibxish (@$srcs) { $self->wq_do('query_thread_mset', $io, $lei, $ibxish); } } else { - $lei->{-parallel} = scalar(@$remotes); $self->wq_do('query_mset', $io, $lei, $srcs); } # TODO @@ -226,12 +229,12 @@ sub do_query { $io[0] = undef; pipe(my $qry_status_rd, $io[0]) or die "pipe $!"; - $lei_orig->{lxs} = $self; $lei_orig->event_step_init; # wait for shutdowns - my $op_map = { '' => [ \&query_done, $lei_orig ] }; + my $op_map = { '' => [ \&query_done, $self, $lei_orig ] }; my $in_loop = exists $lei_orig->{sock}; my $opp = PublicInbox::OpPipe->new($qry_status_rd, $op_map, $in_loop); - if (my $l2m = $lei->{l2m}) { + my $l2m = $lei->{l2m}; + if ($l2m) { 
$l2m->pre_augment($lei_orig); # may redirect $lei->{1} for mbox $io[1] = $lei_orig->{1}; $op_map->{'.'} = [ \&start_query, $self, \@io, $lei, $srcs ]; @@ -246,13 +249,17 @@ sub do_query { $op_map->{'!'} = [ \&CORE::kill, 'TERM', @pids ]; $opp->event_step; my $ipc_worker_reap = $self->can('ipc_worker_reap'); + if (my $l2m_pids = delete $self->{l2m_pids}) { + dwaitpid($_, $ipc_worker_reap, $l2m) for @$l2m_pids; + } dwaitpid($_, $ipc_worker_reap, $self) for @pids; } } sub ipc_atfork_prepare { my ($self) = @_; - $self->wq_set_recv_modes(qw[+<&= >&= >&= +<&=]); + # (qry_status_wr, stdout|mbox, stderr, 3: sock, 4: $l2m->{-wq_s1}) + $self->wq_set_recv_modes(qw[+<&= >&= >&= +<&= +<&=]); $self->SUPER::ipc_atfork_prepare; # PublicInbox::IPC } |