diff options
-rw-r--r-- | lib/PublicInbox/IPC.pm | 3 | ||||
-rw-r--r-- | lib/PublicInbox/LEI.pm | 36 | ||||
-rw-r--r-- | lib/PublicInbox/LeiOverview.pm | 36 | ||||
-rw-r--r-- | lib/PublicInbox/LeiQuery.pm | 12 | ||||
-rw-r--r-- | lib/PublicInbox/LeiToMail.pm | 28 | ||||
-rw-r--r-- | lib/PublicInbox/LeiXSearch.pm | 27 | ||||
-rw-r--r-- | lib/PublicInbox/Spawn.pm | 2 |
7 files changed, 117 insertions, 27 deletions
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm index 78cb8400..8fec2e62 100644 --- a/lib/PublicInbox/IPC.pm +++ b/lib/PublicInbox/IPC.pm @@ -397,4 +397,7 @@ sub DESTROY { ipc_worker_stop($self); } +# Sereal doesn't have dclone +sub deep_clone { thaw(freeze($_[-1])) } + 1; diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index 61f2a65b..6b6ee0f5 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -279,13 +279,21 @@ sub atfork_prepare_wq { if (my $sock = $self->{sock}) { push @$tcafc, @$self{qw(0 1 2)}, $sock; } + for my $f (qw(lxs l2m)) { + my $ipc = $self->{$f} or next; + push @$tcafc, grep { defined } + @$ipc{qw(-wq_s1 -wq_s2 -ipc_req -ipc_res)}; + } } # usage: my %sig = $lei->atfork_child_wq($wq); # local @SIG{keys %sig} = values %sig; sub atfork_child_wq { my ($self, $wq) = @_; - @$self{qw(0 1 2 sock)} = delete(@$wq{0..3}); + my ($sock, $l2m_wq_s1); + (@$self{qw(0 1 2)}, $sock, $l2m_wq_s1) = delete(@$wq{0..4}); + $self->{sock} = $sock if -S $sock; + $self->{l2m}->{-wq_s1} = $l2m_wq_s1 if $l2m_wq_s1; %PATH2CFG = (); $quit = \&CORE::exit; @TO_CLOSE_ATFORK_CHILD = (); @@ -304,15 +312,23 @@ sub atfork_child_wq { # usage: ($lei, @io) = $lei->atfork_parent_wq($wq); sub atfork_parent_wq { my ($self, $wq) = @_; - if ($wq->wq_workers) { - my $env = delete $self->{env}; # env is inherited at fork - my $ret = bless { %$self }, ref($self); - $self->{env} = $env; - delete @$ret{qw(-lei_store cfg pgr)}; - ($ret, delete @$ret{0..2}, delete($ret->{sock}) // ()); - } else { - ($self, @$self{0..2}, $self->{sock} // ()); + my $env = delete $self->{env}; # env is inherited at fork + my $ret = bless { %$self }, ref($self); + if (my $dedupe = delete $ret->{dedupe}) { + $ret->{dedupe} = $wq->deep_clone($dedupe); + } + $self->{env} = $env; + delete @$ret{qw(-lei_store cfg pgr lxs)}; # keep l2m + my @io = delete @$ret{0..2}; + $io[3] = delete($ret->{sock}) // *STDERR{GLOB}; + my $l2m = $ret->{l2m}; + if ($l2m && $l2m != $wq) { + $io[4] = $l2m->{-wq_s1} if $l2m->{-wq_s1}; + if (my @pids = $l2m->wq_close) { + $wq->{l2m_pids} = \@pids; + } } + ($ret, @io); } sub _help ($;$) { @@ -656,7 +672,7 @@ sub start_mua { @cmd = map { $_ eq '%f' ? ($replaced = $mfolder) : $_ } @cmd; push @cmd, $mfolder unless defined($replaced); $sock //= $self->{sock}; - if ($sock) { # lei(1) client process runs it + if ($PublicInbox::DS::in_loop) { # lei(1) client process runs it send($sock, exec_buf(\@cmd, {}), MSG_EOR); } else { # oneshot $self->{"mua.pid.$self.$$"} = spawn(\@cmd); diff --git a/lib/PublicInbox/LeiOverview.pm b/lib/PublicInbox/LeiOverview.pm index c0b423f6..538d6bd5 100644 --- a/lib/PublicInbox/LeiOverview.pm +++ b/lib/PublicInbox/LeiOverview.pm @@ -140,6 +140,16 @@ sub _unbless_smsg { sub ovv_atexit_child { my ($self, $lei) = @_; + if (my $l2m = delete $lei->{l2m}) { + # gracefully stop lei2mail processes after all + # ->write_mail work is complete + delete $l2m->{-wq_s1}; + if (my $rd = delete $l2m->{each_smsg_done}) { + read($rd, my $buf, 1); # wait for EOF + } + } + # order matters, git->{-tmp}->DESTROY must not fire until + # {each_smsg_done} hits EOF above if (my $git = delete $self->{git}) { $git->async_wait_all; } @@ -178,8 +188,6 @@ sub _json_pretty { sub ovv_each_smsg_cb { # runs in wq worker usually my ($self, $lei, $ibxish) = @_; - $lei->{ovv_buf} = \(my $buf = ''); - delete(@$self{qw(lock_path tmp_lk_id)}) unless $lei->{-parallel}; my $json; $lei->{1}->autoflush(1); if (my $pkg = $self->{json}) { @@ -187,7 +195,27 @@ sub ovv_each_smsg_cb { # runs in wq worker usually $json->utf8->canonical; $json->ascii(1) if $lei->{opt}->{ascii}; } - if (my $l2m = $lei->{l2m}) { + my $l2m = $lei->{l2m}; + if ($l2m && $l2m->{-wq_s1}) { + my ($lei_ipc, @io) = $lei->atfork_parent_wq($l2m); + # n.b. $io[0] = qry_status_wr, $io[1] = mbox|stdout, + # $io[4] becomes a notification pipe that triggers EOF + # in this wq worker when all outstanding ->write_mail + # calls are complete + die "BUG: \$io[4] $io[4] unexpected" if $io[4]; + pipe($l2m->{each_smsg_done}, $io[4]) or die "pipe: $!"; + fcntl($io[4], 1031, 4096) if $^O eq 'linux'; + delete @$lei_ipc{qw(l2m opt mset_opt cmd)}; + my $git = $ibxish->git; # (LeiXSearch|Inbox|ExtSearch)->git + $self->{git} = $git; + my $git_dir = $git->{git_dir}; + sub { + my ($smsg, $mitem) = @_; + my $kw = []; # TODO get from mitem + $l2m->wq_do('write_mail', \@io, $git_dir, + $smsg->{blob}, $lei_ipc, $kw) + } + } elsif ($l2m) { my $wcb = $l2m->write_cb($lei); my $git = $ibxish->git; # (LeiXSearch|Inbox|ExtSearch)->git $self->{git} = $git; # for ovv_atexit_child @@ -199,6 +227,7 @@ sub ovv_each_smsg_cb { # runs in wq worker usually }; } elsif ($self->{fmt} =~ /\A(concat)?json\z/ && $lei->{opt}->{pretty}) { my $EOR = ($1//'') eq 'concat' ? "\n}" : "\n},"; + $lei->{ovv_buf} = \(my $buf = ''); sub { # DIY prettiness :P my ($smsg, $mitem) = @_; $smsg = _unbless_smsg($smsg, $mitem); @@ -221,6 +250,7 @@ sub ovv_each_smsg_cb { # runs in wq worker usually } } elsif ($json) { my $ORS = $self->{fmt} eq 'json' ? ",\n" : "\n"; # JSONL + $lei->{ovv_buf} = \(my $buf = ''); sub { my ($smsg, $mitem) = @_; delete @$smsg{qw(tid num)}; diff --git a/lib/PublicInbox/LeiQuery.pm b/lib/PublicInbox/LeiQuery.pm index a80d5887..d6e801e3 100644 --- a/lib/PublicInbox/LeiQuery.pm +++ b/lib/PublicInbox/LeiQuery.pm @@ -41,11 +41,17 @@ sub lei_q { $j = 1 if !$opt->{thread}; $j++ if $opt->{'local'}; # for sto->search below $self->atfork_prepare_wq($lxs); - $lxs->wq_workers_start('lei_xsearch', $j, $self->oldset) - // $lxs->wq_workers($j); + $lxs->wq_workers_start('lei_xsearch', $j, $self->oldset); + $self->{lxs} = $lxs; - # no forking workers after this my $ovv = PublicInbox::LeiOverview->new($self) or return; + if (my $l2m = $self->{l2m}) { + $j = 4 if $j <= 4; # TODO configurable + $self->atfork_prepare_wq($l2m); + $l2m->wq_workers_start('lei2mail', $j, $self->oldset); + } + + # no forking workers after this my $sto = $self->_lei_store(1); unshift(@srcs, $sto->search) if $opt->{'local'}; my %mset_opt = map { $_ => $opt->{$_} } qw(thread limit offset); diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm index 0e23b8da..8d030227 100644 --- a/lib/PublicInbox/LeiToMail.pm +++ b/lib/PublicInbox/LeiToMail.pm @@ -5,6 +5,7 @@ package PublicInbox::LeiToMail; use strict; use v5.10.1; +use parent qw(PublicInbox::IPC); use PublicInbox::Eml; use PublicInbox::Lock; use PublicInbox::ProcessPipe; @@ -14,6 +15,7 @@ use Symbol qw(gensym); use IO::Handle; # ->autoflush use Fcntl qw(SEEK_SET SEEK_END O_CREAT O_EXCL O_WRONLY); use Errno qw(EEXIST ESPIPE ENOENT); +use PublicInbox::Git; my %kw2char = ( # Maildir characters draft => 'D', @@ -422,4 +424,30 @@ sub lock_free { $_[0]->{base_type} =~ /\A(?:maildir|mh|imap|jmap)\z/ ? 1 : 0; } +sub write_mail { # via ->wq_do + my ($self, $git_dir, $oid, $lei, $kw) = @_; + my $wcb = $self->{wcb} //= do { # first message + my %sig = $lei->atfork_child_wq($self); + @SIG{keys %sig} = values %sig; # not local + $lei->{dedupe}->prepare_dedupe; + $self->write_cb($lei); + }; + my $git = $self->{"$$\0$git_dir"} //= PublicInbox::Git->new($git_dir); + $git->cat_async($oid, \&git_to_mail, [ $wcb, $kw ]); +} + +sub ipc_atfork_prepare { + my ($self) = @_; + # (qry_status_wr, stdout|mbox, stderr, 3: sock, 4: each_smsg_done_wr) + $self->wq_set_recv_modes(qw[+<&= >&= >&= +<&= >&=]); + $self->SUPER::ipc_atfork_prepare; # PublicInbox::IPC +} + +sub DESTROY { + my ($self) = @_; + for my $pid_git (grep(/\A$$\0/, keys %$self)) { + $self->{$pid_git}->async_wait_all; + } +} + 1; diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index 91864cd0..dc5cf3b6 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -126,6 +126,7 @@ sub query_thread_mset { # for --thread @{$ctx->{xids}} = (); } } while (_mset_more($mset, $mo)); + undef $each_smsg; # drops @io for l2m->{each_smsg_done} $lei->{ovv}->ovv_atexit_child($lei); } @@ -147,6 +148,7 @@ sub query_mset { # non-parallel for non-"--thread" users $each_smsg->($smsg, $it); } } while (_mset_more($mset, $mo)); + undef $each_smsg; # drops @io for l2m->{each_smsg_done} $lei->{ovv}->ovv_atexit_child($lei); } @@ -170,11 +172,14 @@ sub git { } sub query_done { # EOF callback - my ($lei) = @_; - $lei->{ovv}->ovv_end($lei); - if (my $l2m = $lei->{l2m}) { - $lei->start_mua unless $l2m->lock_free; + my ($self, $lei) = @_; + my $l2m = delete $lei->{l2m}; + if (my $pids = delete $self->{l2m_pids}) { + my $ipc_worker_reap = $self->can('ipc_worker_reap'); + dwaitpid($_, $ipc_worker_reap, $l2m) for @$pids; } + $lei->{ovv}->ovv_end($lei); + $lei->start_mua if $l2m && !$l2m->lock_free; $lei->dclose; } @@ -188,12 +193,10 @@ sub start_query { # always runs in main (lei-daemon) process } my $remotes = $self->{remotes} // []; if ($lei->{opt}->{thread}) { - $lei->{-parallel} = scalar(@$remotes) + scalar(@$srcs) - 1; for my $ibxish (@$srcs) { $self->wq_do('query_thread_mset', $io, $lei, $ibxish); } } else { - $lei->{-parallel} = scalar(@$remotes); $self->wq_do('query_mset', $io, $lei, $srcs); } # TODO @@ -226,12 +229,12 @@ sub do_query { $io[0] = undef; pipe(my $qry_status_rd, $io[0]) or die "pipe $!"; - $lei_orig->{lxs} = $self; $lei_orig->event_step_init; # wait for shutdowns - my $op_map = { '' => [ \&query_done, $lei_orig ] }; + my $op_map = { '' => [ \&query_done, $self, $lei_orig ] }; my $in_loop = exists $lei_orig->{sock}; my $opp = PublicInbox::OpPipe->new($qry_status_rd, $op_map, $in_loop); - if (my $l2m = $lei->{l2m}) { + my $l2m = $lei->{l2m}; + if ($l2m) { $l2m->pre_augment($lei_orig); # may redirect $lei->{1} for mbox $io[1] = $lei_orig->{1}; $op_map->{'.'} = [ \&start_query, $self, \@io, $lei, $srcs ]; @@ -246,13 +249,17 @@ sub do_query { $op_map->{'!'} = [ \&CORE::kill, 'TERM', @pids ]; $opp->event_step; my $ipc_worker_reap = $self->can('ipc_worker_reap'); + if (my $l2m_pids = delete $self->{l2m_pids}) { + dwaitpid($_, $ipc_worker_reap, $l2m) for @$l2m_pids; + } dwaitpid($_, $ipc_worker_reap, $self) for @pids; } } sub ipc_atfork_prepare { my ($self) = @_; - $self->wq_set_recv_modes(qw[+<&= >&= >&= +<&=]); + # (qry_status_wr, stdout|mbox, stderr, 3: sock, 4: $l2m->{-wq_s1}) + $self->wq_set_recv_modes(qw[+<&= >&= >&= +<&= +<&=]); $self->SUPER::ipc_atfork_prepare; # PublicInbox::IPC } diff --git a/lib/PublicInbox/Spawn.pm b/lib/PublicInbox/Spawn.pm index e5c0b1e9..b03f2d59 100644 --- a/lib/PublicInbox/Spawn.pm +++ b/lib/PublicInbox/Spawn.pm @@ -209,7 +209,7 @@ my $fdpass = <<'FDPASS'; #include <sys/socket.h> #if defined(CMSG_SPACE) && defined(CMSG_LEN) -#define SEND_FD_CAPA 4 +#define SEND_FD_CAPA 5 #define SEND_FD_SPACE (SEND_FD_CAPA * sizeof(int)) union my_cmsg { struct cmsghdr hdr; |