about summary refs log tree commit homepage
path: root/lib/PublicInbox/LeiXSearch.pm
diff options
context:
space:
mode:
Diffstat (limited to 'lib/PublicInbox/LeiXSearch.pm')
-rw-r--r--lib/PublicInbox/LeiXSearch.pm97
1 files changed, 46 insertions, 51 deletions
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index ab66717c..e41d899e 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -110,8 +110,8 @@ sub wait_startq ($) {
 sub mset_progress {
         my $lei = shift;
         return unless $lei->{-progress};
-        if ($lei->{pkt_op}) { # called via pkt_op/pkt_do from workers
-                pkt_do($lei->{pkt_op}, 'mset_progress', @_);
+        if ($lei->{pkt_op_p}) {
+                pkt_do($lei->{pkt_op_p}, 'mset_progress', @_);
         } else { # single lei-daemon consumer
                 my ($desc, $mset_size, $mset_total_est) = @_;
                 $lei->{-mset_total} += $mset_size;
@@ -120,11 +120,10 @@ sub mset_progress {
 }
 
 sub query_thread_mset { # for --thread
-        my ($self, $lei, $ibxish) = @_;
+        my ($self, $ibxish) = @_;
         local $0 = "$0 query_thread_mset";
-        $lei->atfork_child_wq($self);
+        my $lei = $self->{lei};
         my $startq = delete $lei->{startq};
-
         my ($srch, $over) = ($ibxish->search, $ibxish->over);
         my $desc = $ibxish->{inboxdir} // $ibxish->{topdir};
         return warn("$desc not indexed by Xapian\n") unless ($srch && $over);
@@ -154,9 +153,9 @@ sub query_thread_mset { # for --thread
 }
 
 sub query_mset { # non-parallel for non-"--thread" users
-        my ($self, $lei) = @_;
+        my ($self) = @_;
         local $0 = "$0 query_mset";
-        $lei->atfork_child_wq($self);
+        my $lei = $self->{lei};
         my $startq = delete $lei->{startq};
         my $mo = { %{$lei->{mset_opt}} };
         my $mset;
@@ -207,10 +206,10 @@ sub kill_reap {
 }
 
 sub query_remote_mboxrd {
-        my ($self, $lei, $uris) = @_;
+        my ($self, $uris) = @_;
         local $0 = "$0 query_remote_mboxrd";
-        $lei->atfork_child_wq($self);
         local $SIG{TERM} = sub { exit(0) }; # for DESTROY (File::Temp, $reap)
+        my $lei = $self->{lei};
         my ($opt, $env) = @$lei{qw(opt env)};
         my @qform = (q => $lei->{mset_opt}->{qstr}, x => 'm');
         push(@qform, t => 1) if $opt->{thread};
@@ -307,7 +306,7 @@ sub git {
         $git;
 }
 
-sub query_done { # EOF callback
+sub query_done { # EOF callback for main daemon
         my ($lei) = @_;
         my $has_l2m = exists $lei->{l2m};
         for my $f (qw(lxs l2m)) {
@@ -332,9 +331,8 @@ Error closing $lei->{ovv}->{dst}: $!
 }
 
 sub do_post_augment {
-        my ($lei, $zpipe, $au_done) = @_;
-        my $l2m = $lei->{l2m} or die 'BUG: no {l2m}';
-        eval { $l2m->post_augment($lei, $zpipe) };
+        my ($lei) = @_;
+        eval { $lei->{l2m}->post_augment($lei) };
         if (my $err = $@) {
                 if (my $lxs = delete $lei->{lxs}) {
                         $lxs->wq_kill;
@@ -342,7 +340,7 @@ sub do_post_augment {
                 }
                 $lei->fail("$err");
         }
-        close $au_done; # triggers wait_startq
+        close(delete $lei->{au_done}); # triggers wait_startq
 }
 
 my $MAX_PER_HOST = 4;
@@ -356,13 +354,13 @@ sub concurrency {
 }
 
 sub start_query { # always runs in main (lei-daemon) process
-        my ($self, $io, $lei) = @_;
+        my ($self, $lei) = @_;
         if ($lei->{opt}->{thread}) {
                 for my $ibxish (locals($self)) {
-                        $self->wq_do('query_thread_mset', $io, $lei, $ibxish);
+                        $self->wq_do('query_thread_mset', [], $ibxish);
                 }
         } elsif (locals($self)) {
-                $self->wq_do('query_mset', $io, $lei);
+                $self->wq_do('query_mset', []);
         }
         my $i = 0;
         my $q = [];
@@ -370,19 +368,23 @@ sub start_query { # always runs in main (lei-daemon) process
                 push @{$q->[$i++ % $MAX_PER_HOST]}, $uri;
         }
         for my $uris (@$q) {
-                $self->wq_do('query_remote_mboxrd', $io, $lei, $uris);
+                $self->wq_do('query_remote_mboxrd', [], $uris);
         }
-        @$io = ();
+}
+
+sub ipc_atfork_child {
+        my ($self) = @_;
+        $self->{lei}->lei_atfork_child;
+        $self->SUPER::ipc_atfork_child;
 }
 
 sub query_prepare { # called by wq_do
-        my ($self, $lei) = @_;
+        my ($self) = @_;
         local $0 = "$0 query_prepare";
-        $lei->atfork_child_wq($self);
-        delete $lei->{l2m}->{-wq_s1};
+        my $lei = $self->{lei};
         eval { $lei->{l2m}->do_augment($lei) };
         $lei->fail($@) if $@;
-        pkt_do($lei->{pkt_op}, '.') == 1 or die "do_post_augment trigger: $!"
+        pkt_do($lei->{pkt_op_p}, '.') == 1 or die "do_post_augment trigger: $!"
 }
 
 sub fail_handler ($;$$) {
@@ -401,45 +403,38 @@ sub sigpipe_handler { # handles SIGPIPE from l2m/lxs workers
 
 sub do_query {
         my ($self, $lei) = @_;
-        $lei->{1}->autoflush(1);
-        $lei->start_pager if -t $lei->{1};
-        $lei->{ovv}->ovv_begin($lei);
-        my ($au_done, $zpipe);
-        my $l2m = $lei->{l2m};
-        $lei->atfork_prepare_wq($self);
-        $self->wq_workers_start('lei_xsearch', $self->{jobs}, $lei->oldset);
-        delete $self->{-ipc_atfork_child_close};
-        if ($l2m) {
-                $lei->atfork_prepare_wq($l2m);
-                $l2m->wq_workers_start('lei2mail', $l2m->{jobs}, $lei->oldset);
-                delete $l2m->{-ipc_atfork_child_close};
-                pipe($lei->{startq}, $au_done) or die "pipe: $!";
-                # 1031: F_SETPIPE_SZ
-                fcntl($lei->{startq}, 1031, 4096) if $^O eq 'linux';
-                $zpipe = $l2m->pre_augment($lei);
-        }
         my $ops = {
                 '|' => [ \&sigpipe_handler, $lei ],
                 '!' => [ \&fail_handler, $lei ],
-                '.' => [ \&do_post_augment, $lei, $zpipe, $au_done ],
+                '.' => [ \&do_post_augment, $lei ],
                 '' => [ \&query_done, $lei ],
                 'mset_progress' => [ \&mset_progress, $lei ],
                 'x_it' => [ $lei->can('x_it'), $lei ],
                 'child_error' => [ $lei->can('child_error'), $lei ],
         };
-        (my $op, $lei->{pkt_op}) = PublicInbox::PktOp->pair($ops);
-        my ($lei_ipc, @io) = $lei->atfork_parent_wq($self);
-        delete($lei->{pkt_op});
-
-        $lei->event_step_init; # wait for shutdowns
+        ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops);
+        $lei->{1}->autoflush(1);
+        $lei->start_pager if -t $lei->{1};
+        $lei->{ovv}->ovv_begin($lei);
+        my $l2m = $lei->{l2m};
         if ($l2m) {
-                $self->wq_do('query_prepare', \@io, $lei_ipc);
-                $io[1] = $zpipe->[1] if $zpipe;
+                $l2m->pre_augment($lei);
+                $l2m->wq_workers_start('lei2mail', $l2m->{jobs},
+                                        $lei->oldset, { lei => $lei });
+                pipe($lei->{startq}, $lei->{au_done}) or die "pipe: $!";
+                # 1031: F_SETPIPE_SZ
+                fcntl($lei->{startq}, 1031, 4096) if $^O eq 'linux';
         }
-        start_query($self, \@io, $lei_ipc);
-        $self->wq_close(1);
+        $self->wq_workers_start('lei_xsearch', $self->{jobs},
+                                $lei->oldset, { lei => $lei });
+        my $op = delete $lei->{pkt_op_c};
+        delete $lei->{pkt_op_p};
+        $l2m->wq_close(1) if $l2m;
+        $lei->event_step_init; # wait for shutdowns
+        $self->wq_do('query_prepare', []) if $l2m;
+        start_query($self, $lei);
+        $self->wq_close(1); # lei_xsearch workers stop when done
         if ($lei->{oneshot}) {
-                # for the $lei_ipc->atfork_child_wq PIPE handler:
                 while ($op->{sock}) { $op->event_step }
         }
 }