diff options
-rw-r--r-- | lib/PublicInbox/LEI.pm | 56 | ||||
-rw-r--r-- | lib/PublicInbox/LeiOverview.pm | 9 | ||||
-rw-r--r-- | lib/PublicInbox/LeiToMail.pm | 8 | ||||
-rw-r--r-- | lib/PublicInbox/LeiXSearch.pm | 68 | ||||
-rw-r--r-- | lib/PublicInbox/Spawn.pm | 2 |
5 files changed, 77 insertions, 66 deletions
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index f5413aab..3ed330f9 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -335,14 +335,27 @@ sub atfork_prepare_wq { } } +sub io_restore ($$) { + my ($dst, $src) = @_; + for my $i (0..2) { # standard FDs + my $io = delete $src->{$i} or next; + $dst->{$i} = $io; + } + for my $i (3..9) { # named (non-standard) FDs + my $io = $src->{$i} or next; + my @st = stat($io) or die "stat $src.$i ($io): $!"; + my $f = delete $dst->{"dev=$st[0],ino=$st[1]"} // next; + $dst->{$f} = $io; + delete $src->{$i}; + } +} + # usage: my %sig = $lei->atfork_child_wq($wq); # local @SIG{keys %sig} = values %sig; sub atfork_child_wq { my ($self, $wq) = @_; - my ($sock, $l2m_wq_s1); - (@$self{qw(0 1 2)}, $sock, $l2m_wq_s1) = delete(@$wq{0..4}); - $self->{sock} = $sock if -S $sock; - $self->{l2m}->{-wq_s1} = $l2m_wq_s1 if $l2m_wq_s1 && -S $l2m_wq_s1; + io_restore($self, $wq); + io_restore($self->{l2m}, $wq); %PATH2CFG = (); undef $errors_log; $quit = \&CORE::exit; @@ -355,30 +368,45 @@ sub atfork_child_wq { close(delete $self->{$i}); } # trigger the LeiXSearch $done OpPipe: - syswrite($self->{0}, '!') if $self->{0} && -p $self->{0}; + syswrite($self->{op_pipe}, '!') if $self->{op_pipe}; $SIG{PIPE} = 'DEFAULT'; die bless(\"$_[0]", 'PublicInbox::SIGPIPE'), }); } +sub io_extract ($;@) { + my ($obj, @fields) = @_; + my @io; + for my $f (@fields) { + my $io = delete $obj->{$f} or next; + my @st = stat($io) or die "W: stat $obj.$f ($io): $!"; + $obj->{"dev=$st[0],ino=$st[1]"} = $f; + push @io, $io; + } + @io +} + # usage: ($lei, @io) = $lei->atfork_parent_wq($wq); sub atfork_parent_wq { my ($self, $wq) = @_; my $env = delete $self->{env}; # env is inherited at fork - my $ret = bless { %$self }, ref($self); - if (my $dedupe = delete $ret->{dedupe}) { - $ret->{dedupe} = $wq->deep_clone($dedupe); + my $lei = bless { %$self }, ref($self); + if (my $dedupe = delete $lei->{dedupe}) { + $lei->{dedupe} = $wq->deep_clone($dedupe); } $self->{env} = $env; - delete @$ret{qw(3 -lei_store cfg old_1 pgr lxs)}; # keep l2m - my @io = delete @$ret{0..2}; - $io[3] = delete($ret->{sock}) // $io[2]; - my $l2m = $ret->{l2m}; + delete @$lei{qw(3 -lei_store cfg old_1 pgr lxs)}; # keep l2m + my @io = (delete(@$lei{qw(0 1 2)}), + io_extract($lei, qw(sock op_pipe startq))); + my $l2m = $lei->{l2m}; if ($l2m && $l2m != $wq) { # $wq == lxs - $io[4] = $l2m->{-wq_s1} if $l2m->{-wq_s1}; + if (my $wq_s1 = $l2m->{-wq_s1}) { + push @io, io_extract($l2m, '-wq_s1'); + $l2m->{-wq_s1} = $wq_s1; + } $l2m->wq_close(1); } - ($ret, @io); + ($lei, @io); } sub _help ($;$) { diff --git a/lib/PublicInbox/LeiOverview.pm b/lib/PublicInbox/LeiOverview.pm index f9a28138..c67e2747 100644 --- a/lib/PublicInbox/LeiOverview.pm +++ b/lib/PublicInbox/LeiOverview.pm @@ -220,14 +220,13 @@ sub ovv_each_smsg_cb { # runs in wq worker usually }; } elsif ($l2m && $l2m->{-wq_s1}) { my ($lei_ipc, @io) = $lei->atfork_parent_wq($l2m); - # n.b. $io[0] = qry_status_wr, $io[1] = mbox|stdout, - # $io[4] becomes a notification pipe that triggers EOF + # $io[-1] becomes a notification pipe that triggers EOF # in this wq worker when all outstanding ->write_mail # calls are complete - die "BUG: \$io[4] $io[4] unexpected" if $io[4]; - pipe($l2m->{each_smsg_done}, $io[4]) or die "pipe: $!"; - fcntl($io[4], 1031, 4096) if $^O eq 'linux'; + pipe($l2m->{each_smsg_done}, $io[$#io + 1]) or die "pipe: $!"; + fcntl($io[-1], 1031, 4096) if $^O eq 'linux'; # F_SETPIPE_SZ delete @$lei_ipc{qw(l2m opt mset_opt cmd)}; + $lei_ipc->{each_smsg_not_done} = $#io; my $git = $ibxish->git; # (LeiXSearch|Inbox|ExtSearch)->git $self->{git} = $git; my $git_dir = $git->{git_dir}; diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm index 08a1570d..61b546b5 100644 --- a/lib/PublicInbox/LeiToMail.pm +++ b/lib/PublicInbox/LeiToMail.pm @@ -460,7 +460,7 @@ sub post_augment { # fast (spawn compressor or mkdir), runs in main daemon sub write_mail { # via ->wq_do my ($self, $git_dir, $smsg, $lei) = @_; - my $not_done = delete $self->{4}; # write end of {each_smsg_done} + my $not_done = delete $self->{$lei->{each_smsg_not_done}}; my $wcb = $self->{wcb} //= do { # first message my %sig = $lei->atfork_child_wq($self); @SIG{keys %sig} = values %sig; # not local @@ -471,12 +471,6 @@ sub write_mail { # via ->wq_do $git->cat_async($smsg->{blob}, \&git_to_mail, [$wcb, $smsg, $not_done]); } -sub ipc_atfork_prepare { - my ($self) = @_; - # FDs: (done_wr, stdout|mbox, stderr, 3: sock, 4: each_smsg_done_wr) - $self->SUPER::ipc_atfork_prepare; # PublicInbox::IPC -} - # We rely on OnDestroy to run this before ->DESTROY, since ->DESTROY # ordering is unstable at worker exit and may cause segfaults sub reap_gits { diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index 9ea2b5f3..e69b637c 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -109,9 +109,9 @@ sub wait_startq ($) { sub query_thread_mset { # for --thread my ($self, $lei, $ibxish) = @_; local $0 = "$0 query_thread_mset"; - my $startq = delete $self->{5}; my %sig = $lei->atfork_child_wq($self); local @SIG{keys %sig} = values %sig; + my $startq = delete $lei->{startq}; my ($srch, $over) = ($ibxish->search, $ibxish->over); unless ($srch && $over) { @@ -145,9 +145,9 @@ sub query_thread_mset { # for --thread sub query_mset { # non-parallel for non-"--thread" users my ($self, $lei) = @_; local $0 = "$0 query_mset"; - my $startq = delete $self->{5}; my %sig = $lei->atfork_child_wq($self); local @SIG{keys %sig} = values %sig; + my $startq = delete $lei->{startq}; my $mo = { %{$lei->{mset_opt}} }; my $mset; for my $loc (locals($self)) { @@ -173,7 +173,7 @@ sub each_eml { # callback for MboxReader->mboxrd $smsg->parse_references($eml, mids($eml)); $smsg->{$_} //= '' for qw(from to cc ds subject references mid); delete @$smsg{qw(From Subject -ds -ts)}; - if (my $startq = delete($self->{5})) { wait_startq($startq) } + if (my $startq = delete($lei->{startq})) { wait_startq($startq) } $each_smsg->($smsg, undef, $eml); } @@ -352,11 +352,12 @@ sub query_prepare { # called by wq_do my ($self, $lei) = @_; local $0 = "$0 query_prepare"; my %sig = $lei->atfork_child_wq($self); - -p $lei->{0} or die "BUG: \$done pipe expected"; + -p $lei->{op_pipe} or die "BUG: \$done pipe expected"; local @SIG{keys %sig} = values %sig; + delete $lei->{l2m}->{-wq_s1}; eval { $lei->{l2m}->do_augment($lei) }; $lei->fail($@) if $@; - syswrite($lei->{0}, '.') == 1 or die "do_post_augment trigger: $!"; + syswrite($lei->{op_pipe}, '.') == 1 or die "do_post_augment trigger: $!" } sub sigpipe_handler { # handles SIGPIPE from l2m/lxs workers @@ -370,56 +371,45 @@ sub sigpipe_handler { # handles SIGPIPE from l2m/lxs workers } sub do_query { - my ($self, $lei_orig) = @_; - my ($lei, @io) = $lei_orig->atfork_parent_wq($self); - $io[0] = undef; - pipe(my $done, $io[0]) or die "pipe $!"; - $lei_orig->{1}->autoflush(1); + my ($self, $lei) = @_; + $lei->{1}->autoflush(1); + my ($au_done, $zpipe); + my $l2m = $lei->{l2m}; + if ($l2m) { + pipe($lei->{startq}, $au_done) or die "pipe: $!"; + # 1031: F_SETPIPE_SZ + fcntl($lei->{startq}, 1031, 4096) if $^O eq 'linux'; + $zpipe = $l2m->pre_augment($lei); + } + pipe(my $done, $lei->{op_pipe}) or die "pipe $!"; + my ($lei_ipc, @io) = $lei->atfork_parent_wq($self); + delete($lei->{op_pipe}); - $lei_orig->event_step_init; # wait for shutdowns + $lei->event_step_init; # wait for shutdowns my $done_op = { - '' => [ \&query_done, $lei_orig ], - '!' => [ \&sigpipe_handler, $lei_orig ] + '' => [ \&query_done, $lei ], + '!' => [ \&sigpipe_handler, $lei ] }; - my $in_loop = exists $lei_orig->{sock}; + my $in_loop = exists $lei->{sock}; $done = PublicInbox::OpPipe->new($done, $done_op, $in_loop); - my $l2m = $lei->{l2m}; if ($l2m) { - # may redirect $lei->{1} for mbox - my $zpipe = $l2m->pre_augment($lei_orig); - $io[1] = $lei_orig->{1}; - pipe(my ($startq, $au_done)) or die "pipe: $!"; - $done_op->{'.'} = [ \&do_post_augment, $lei_orig, - $zpipe, $au_done ]; - local $io[4] = *STDERR{GLOB}; # don't send l2m->{-wq_s1} - die "BUG: unexpected \$io[5]: $io[5]" if $io[5]; - $self->wq_do('query_prepare', \@io, $lei); - fcntl($startq, 1031, 4096) if $^O eq 'linux'; # F_SETPIPE_SZ - $io[5] = $startq; + $done_op->{'.'} = [ \&do_post_augment, $lei, $zpipe, $au_done ]; + $self->wq_do('query_prepare', \@io, $lei_ipc); $io[1] = $zpipe->[1] if $zpipe; } - start_query($self, \@io, $lei); + start_query($self, \@io, $lei_ipc); $self->wq_close(1); unless ($in_loop) { - # for the $lei->atfork_child_wq PIPE handler: + # for the $lei_ipc->atfork_child_wq PIPE handler: while ($done->{sock}) { $done->event_step } } } -sub ipc_atfork_prepare { - my ($self) = @_; - if (exists $self->{remotes}) { - require PublicInbox::MboxReader; - require IO::Uncompress::Gunzip; - } - # FDS: (0: done_wr, 1: stdout|mbox, 2: stderr, - # 3: sock, 4: $l2m->{-wq_s1}, 5: $startq) - $self->SUPER::ipc_atfork_prepare; # PublicInbox::IPC -} - sub add_uri { my ($self, $uri) = @_; if (my $curl = $self->{curl} //= which('curl') // 0) { + require PublicInbox::MboxReader; + require IO::Uncompress::Gunzip; push @{$self->{remotes}}, $uri; } else { warn "curl missing, ignoring $uri\n"; diff --git a/lib/PublicInbox/Spawn.pm b/lib/PublicInbox/Spawn.pm index ef4885c1..1842899c 100644 --- a/lib/PublicInbox/Spawn.pm +++ b/lib/PublicInbox/Spawn.pm @@ -209,7 +209,7 @@ my $fdpass = <<'FDPASS'; #include <sys/socket.h> #if defined(CMSG_SPACE) && defined(CMSG_LEN) -#define SEND_FD_CAPA 6 +#define SEND_FD_CAPA 10 #define SEND_FD_SPACE (SEND_FD_CAPA * sizeof(int)) union my_cmsg { struct cmsghdr hdr; |