diff options
Diffstat (limited to 'lib/PublicInbox/LeiXSearch.pm')
-rw-r--r-- | lib/PublicInbox/LeiXSearch.pm | 97 |
1 files changed, 46 insertions, 51 deletions
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index ab66717c..e41d899e 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -110,8 +110,8 @@ sub wait_startq ($) { sub mset_progress { my $lei = shift; return unless $lei->{-progress}; - if ($lei->{pkt_op}) { # called via pkt_op/pkt_do from workers - pkt_do($lei->{pkt_op}, 'mset_progress', @_); + if ($lei->{pkt_op_p}) { + pkt_do($lei->{pkt_op_p}, 'mset_progress', @_); } else { # single lei-daemon consumer my ($desc, $mset_size, $mset_total_est) = @_; $lei->{-mset_total} += $mset_size; @@ -120,11 +120,10 @@ sub mset_progress { } sub query_thread_mset { # for --thread - my ($self, $lei, $ibxish) = @_; + my ($self, $ibxish) = @_; local $0 = "$0 query_thread_mset"; - $lei->atfork_child_wq($self); + my $lei = $self->{lei}; my $startq = delete $lei->{startq}; - my ($srch, $over) = ($ibxish->search, $ibxish->over); my $desc = $ibxish->{inboxdir} // $ibxish->{topdir}; return warn("$desc not indexed by Xapian\n") unless ($srch && $over); @@ -154,9 +153,9 @@ sub query_thread_mset { # for --thread } sub query_mset { # non-parallel for non-"--thread" users - my ($self, $lei) = @_; + my ($self) = @_; local $0 = "$0 query_mset"; - $lei->atfork_child_wq($self); + my $lei = $self->{lei}; my $startq = delete $lei->{startq}; my $mo = { %{$lei->{mset_opt}} }; my $mset; @@ -207,10 +206,10 @@ sub kill_reap { } sub query_remote_mboxrd { - my ($self, $lei, $uris) = @_; + my ($self, $uris) = @_; local $0 = "$0 query_remote_mboxrd"; - $lei->atfork_child_wq($self); local $SIG{TERM} = sub { exit(0) }; # for DESTROY (File::Temp, $reap) + my $lei = $self->{lei}; my ($opt, $env) = @$lei{qw(opt env)}; my @qform = (q => $lei->{mset_opt}->{qstr}, x => 'm'); push(@qform, t => 1) if $opt->{thread}; @@ -307,7 +306,7 @@ sub git { $git; } -sub query_done { # EOF callback +sub query_done { # EOF callback for main daemon my ($lei) = @_; my $has_l2m = exists $lei->{l2m}; for my $f (qw(lxs l2m)) { @@ -332,9 +331,8 @@ Error closing $lei->{ovv}->{dst}: $! } sub do_post_augment { - my ($lei, $zpipe, $au_done) = @_; - my $l2m = $lei->{l2m} or die 'BUG: no {l2m}'; - eval { $l2m->post_augment($lei, $zpipe) }; + my ($lei) = @_; + eval { $lei->{l2m}->post_augment($lei) }; if (my $err = $@) { if (my $lxs = delete $lei->{lxs}) { $lxs->wq_kill; @@ -342,7 +340,7 @@ sub do_post_augment { } $lei->fail("$err"); } - close $au_done; # triggers wait_startq + close(delete $lei->{au_done}); # triggers wait_startq } my $MAX_PER_HOST = 4; @@ -356,13 +354,13 @@ sub concurrency { } sub start_query { # always runs in main (lei-daemon) process - my ($self, $io, $lei) = @_; + my ($self, $lei) = @_; if ($lei->{opt}->{thread}) { for my $ibxish (locals($self)) { - $self->wq_do('query_thread_mset', $io, $lei, $ibxish); + $self->wq_do('query_thread_mset', [], $ibxish); } } elsif (locals($self)) { - $self->wq_do('query_mset', $io, $lei); + $self->wq_do('query_mset', []); } my $i = 0; my $q = []; @@ -370,19 +368,23 @@ sub start_query { # always runs in main (lei-daemon) process push @{$q->[$i++ % $MAX_PER_HOST]}, $uri; } for my $uris (@$q) { - $self->wq_do('query_remote_mboxrd', $io, $lei, $uris); + $self->wq_do('query_remote_mboxrd', [], $uris); } - @$io = (); +} + +sub ipc_atfork_child { + my ($self) = @_; + $self->{lei}->lei_atfork_child; + $self->SUPER::ipc_atfork_child; } sub query_prepare { # called by wq_do - my ($self, $lei) = @_; + my ($self) = @_; local $0 = "$0 query_prepare"; - $lei->atfork_child_wq($self); - delete $lei->{l2m}->{-wq_s1}; + my $lei = $self->{lei}; eval { $lei->{l2m}->do_augment($lei) }; $lei->fail($@) if $@; - pkt_do($lei->{pkt_op}, '.') == 1 or die "do_post_augment trigger: $!" + pkt_do($lei->{pkt_op_p}, '.') == 1 or die "do_post_augment trigger: $!" } sub fail_handler ($;$$) { @@ -401,45 +403,38 @@ sub sigpipe_handler { # handles SIGPIPE from l2m/lxs workers sub do_query { my ($self, $lei) = @_; - $lei->{1}->autoflush(1); - $lei->start_pager if -t $lei->{1}; - $lei->{ovv}->ovv_begin($lei); - my ($au_done, $zpipe); - my $l2m = $lei->{l2m}; - $lei->atfork_prepare_wq($self); - $self->wq_workers_start('lei_xsearch', $self->{jobs}, $lei->oldset); - delete $self->{-ipc_atfork_child_close}; - if ($l2m) { - $lei->atfork_prepare_wq($l2m); - $l2m->wq_workers_start('lei2mail', $l2m->{jobs}, $lei->oldset); - delete $l2m->{-ipc_atfork_child_close}; - pipe($lei->{startq}, $au_done) or die "pipe: $!"; - # 1031: F_SETPIPE_SZ - fcntl($lei->{startq}, 1031, 4096) if $^O eq 'linux'; - $zpipe = $l2m->pre_augment($lei); - } my $ops = { '|' => [ \&sigpipe_handler, $lei ], '!' => [ \&fail_handler, $lei ], - '.' => [ \&do_post_augment, $lei, $zpipe, $au_done ], + '.' => [ \&do_post_augment, $lei ], '' => [ \&query_done, $lei ], 'mset_progress' => [ \&mset_progress, $lei ], 'x_it' => [ $lei->can('x_it'), $lei ], 'child_error' => [ $lei->can('child_error'), $lei ], }; - (my $op, $lei->{pkt_op}) = PublicInbox::PktOp->pair($ops); - my ($lei_ipc, @io) = $lei->atfork_parent_wq($self); - delete($lei->{pkt_op}); - - $lei->event_step_init; # wait for shutdowns + ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops); + $lei->{1}->autoflush(1); + $lei->start_pager if -t $lei->{1}; + $lei->{ovv}->ovv_begin($lei); + my $l2m = $lei->{l2m}; if ($l2m) { - $self->wq_do('query_prepare', \@io, $lei_ipc); - $io[1] = $zpipe->[1] if $zpipe; + $l2m->pre_augment($lei); + $l2m->wq_workers_start('lei2mail', $l2m->{jobs}, + $lei->oldset, { lei => $lei }); + pipe($lei->{startq}, $lei->{au_done}) or die "pipe: $!"; + # 1031: F_SETPIPE_SZ + fcntl($lei->{startq}, 1031, 4096) if $^O eq 'linux'; } - start_query($self, \@io, $lei_ipc); - $self->wq_close(1); + $self->wq_workers_start('lei_xsearch', $self->{jobs}, + $lei->oldset, { lei => $lei }); + my $op = delete $lei->{pkt_op_c}; + delete $lei->{pkt_op_p}; + $l2m->wq_close(1) if $l2m; + $lei->event_step_init; # wait for shutdowns + $self->wq_do('query_prepare', []) if $l2m; + start_query($self, $lei); + $self->wq_close(1); # lei_xsearch workers stop when done if ($lei->{oneshot}) { - # for the $lei_ipc->atfork_child_wq PIPE handler: while ($op->{sock}) { $op->event_step } } } |