about summary refs log tree commit homepage
diff options
context:
space:
mode:
-rw-r--r--lib/PublicInbox/IPC.pm3
-rw-r--r--lib/PublicInbox/LEI.pm36
-rw-r--r--lib/PublicInbox/LeiOverview.pm36
-rw-r--r--lib/PublicInbox/LeiQuery.pm12
-rw-r--r--lib/PublicInbox/LeiToMail.pm28
-rw-r--r--lib/PublicInbox/LeiXSearch.pm27
-rw-r--r--lib/PublicInbox/Spawn.pm2
7 files changed, 117 insertions, 27 deletions
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 78cb8400..8fec2e62 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -397,4 +397,7 @@ sub DESTROY {
         ipc_worker_stop($self);
 }
 
+# Sereal doesn't have dclone
+sub deep_clone { thaw(freeze($_[-1])) }
+
 1;
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 61f2a65b..6b6ee0f5 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -279,13 +279,21 @@ sub atfork_prepare_wq {
         if (my $sock = $self->{sock}) {
                 push @$tcafc, @$self{qw(0 1 2)}, $sock;
         }
+        for my $f (qw(lxs l2m)) {
+                my $ipc = $self->{$f} or next;
+                push @$tcafc, grep { defined }
+                                @$ipc{qw(-wq_s1 -wq_s2 -ipc_req -ipc_res)};
+        }
 }
 
 # usage: my %sig = $lei->atfork_child_wq($wq);
 #         local @SIG{keys %sig} = values %sig;
 sub atfork_child_wq {
         my ($self, $wq) = @_;
-        @$self{qw(0 1 2 sock)} = delete(@$wq{0..3});
+        my ($sock, $l2m_wq_s1);
+        (@$self{qw(0 1 2)}, $sock, $l2m_wq_s1) = delete(@$wq{0..4});
+        $self->{sock} = $sock if -S $sock;
+        $self->{l2m}->{-wq_s1} = $l2m_wq_s1 if $l2m_wq_s1;
         %PATH2CFG = ();
         $quit = \&CORE::exit;
         @TO_CLOSE_ATFORK_CHILD = ();
@@ -304,15 +312,23 @@ sub atfork_child_wq {
 # usage: ($lei, @io) = $lei->atfork_parent_wq($wq);
 sub atfork_parent_wq {
         my ($self, $wq) = @_;
-        if ($wq->wq_workers) {
-                my $env = delete $self->{env}; # env is inherited at fork
-                my $ret = bless { %$self }, ref($self);
-                $self->{env} = $env;
-                delete @$ret{qw(-lei_store cfg pgr)};
-                ($ret, delete @$ret{0..2}, delete($ret->{sock}) // ());
-        } else {
-                ($self, @$self{0..2}, $self->{sock} // ());
+        my $env = delete $self->{env}; # env is inherited at fork
+        my $ret = bless { %$self }, ref($self);
+        if (my $dedupe = delete $ret->{dedupe}) {
+                $ret->{dedupe} = $wq->deep_clone($dedupe);
+        }
+        $self->{env} = $env;
+        delete @$ret{qw(-lei_store cfg pgr lxs)}; # keep l2m
+        my @io = delete @$ret{0..2};
+        $io[3] = delete($ret->{sock}) // *STDERR{GLOB};
+        my $l2m = $ret->{l2m};
+        if ($l2m && $l2m != $wq) {
+                $io[4] = $l2m->{-wq_s1} if $l2m->{-wq_s1};
+                if (my @pids = $l2m->wq_close) {
+                        $wq->{l2m_pids} = \@pids;
+                }
         }
+        ($ret, @io);
 }
 
 sub _help ($;$) {
@@ -656,7 +672,7 @@ sub start_mua {
         @cmd = map { $_ eq '%f' ? ($replaced = $mfolder) : $_ } @cmd;
         push @cmd, $mfolder unless defined($replaced);
         $sock //= $self->{sock};
-        if ($sock) { # lei(1) client process runs it
+        if ($PublicInbox::DS::in_loop) { # lei(1) client process runs it
                 send($sock, exec_buf(\@cmd, {}), MSG_EOR);
         } else { # oneshot
                 $self->{"mua.pid.$self.$$"} = spawn(\@cmd);
diff --git a/lib/PublicInbox/LeiOverview.pm b/lib/PublicInbox/LeiOverview.pm
index c0b423f6..538d6bd5 100644
--- a/lib/PublicInbox/LeiOverview.pm
+++ b/lib/PublicInbox/LeiOverview.pm
@@ -140,6 +140,16 @@ sub _unbless_smsg {
 
 sub ovv_atexit_child {
         my ($self, $lei) = @_;
+        if (my $l2m = delete $lei->{l2m}) {
+                # gracefully stop lei2mail processes after all
+                # ->write_mail work is complete
+                delete $l2m->{-wq_s1};
+                if (my $rd = delete $l2m->{each_smsg_done}) {
+                        read($rd, my $buf, 1); # wait for EOF
+                }
+        }
+        # order matters, git->{-tmp}->DESTROY must not fire until
+        # {each_smsg_done} hits EOF above
         if (my $git = delete $self->{git}) {
                 $git->async_wait_all;
         }
@@ -178,8 +188,6 @@ sub _json_pretty {
 
 sub ovv_each_smsg_cb { # runs in wq worker usually
         my ($self, $lei, $ibxish) = @_;
-        $lei->{ovv_buf} = \(my $buf = '');
-        delete(@$self{qw(lock_path tmp_lk_id)}) unless $lei->{-parallel};
         my $json;
         $lei->{1}->autoflush(1);
         if (my $pkg = $self->{json}) {
@@ -187,7 +195,27 @@ sub ovv_each_smsg_cb { # runs in wq worker usually
                 $json->utf8->canonical;
                 $json->ascii(1) if $lei->{opt}->{ascii};
         }
-        if (my $l2m = $lei->{l2m}) {
+        my $l2m = $lei->{l2m};
+        if ($l2m && $l2m->{-wq_s1}) {
+                my ($lei_ipc, @io) = $lei->atfork_parent_wq($l2m);
+                # n.b. $io[0] = qry_status_wr, $io[1] = mbox|stdout,
+                # $io[4] becomes a notification pipe that triggers EOF
+                # in this wq worker when all outstanding ->write_mail
+                # calls are complete
+                die "BUG: \$io[4] $io[4] unexpected" if $io[4];
+                pipe($l2m->{each_smsg_done}, $io[4]) or die "pipe: $!";
+                fcntl($io[4], 1031, 4096) if $^O eq 'linux';
+                delete @$lei_ipc{qw(l2m opt mset_opt cmd)};
+                my $git = $ibxish->git; # (LeiXSearch|Inbox|ExtSearch)->git
+                $self->{git} = $git;
+                my $git_dir = $git->{git_dir};
+                sub {
+                        my ($smsg, $mitem) = @_;
+                        my $kw = []; # TODO get from mitem
+                        $l2m->wq_do('write_mail', \@io, $git_dir,
+                                        $smsg->{blob}, $lei_ipc, $kw)
+                }
+        } elsif ($l2m) {
                 my $wcb = $l2m->write_cb($lei);
                 my $git = $ibxish->git; # (LeiXSearch|Inbox|ExtSearch)->git
                 $self->{git} = $git; # for ovv_atexit_child
@@ -199,6 +227,7 @@ sub ovv_each_smsg_cb { # runs in wq worker usually
                 };
         } elsif ($self->{fmt} =~ /\A(concat)?json\z/ && $lei->{opt}->{pretty}) {
                 my $EOR = ($1//'') eq 'concat' ? "\n}" : "\n},";
+                $lei->{ovv_buf} = \(my $buf = '');
                 sub { # DIY prettiness :P
                         my ($smsg, $mitem) = @_;
                         $smsg = _unbless_smsg($smsg, $mitem);
@@ -221,6 +250,7 @@ sub ovv_each_smsg_cb { # runs in wq worker usually
                 }
         } elsif ($json) {
                 my $ORS = $self->{fmt} eq 'json' ? ",\n" : "\n"; # JSONL
+                $lei->{ovv_buf} = \(my $buf = '');
                 sub {
                         my ($smsg, $mitem) = @_;
                         delete @$smsg{qw(tid num)};
diff --git a/lib/PublicInbox/LeiQuery.pm b/lib/PublicInbox/LeiQuery.pm
index a80d5887..d6e801e3 100644
--- a/lib/PublicInbox/LeiQuery.pm
+++ b/lib/PublicInbox/LeiQuery.pm
@@ -41,11 +41,17 @@ sub lei_q {
         $j = 1 if !$opt->{thread};
         $j++ if $opt->{'local'}; # for sto->search below
         $self->atfork_prepare_wq($lxs);
-        $lxs->wq_workers_start('lei_xsearch', $j, $self->oldset)
-                // $lxs->wq_workers($j);
+        $lxs->wq_workers_start('lei_xsearch', $j, $self->oldset);
+        $self->{lxs} = $lxs;
 
-        # no forking workers after this
         my $ovv = PublicInbox::LeiOverview->new($self) or return;
+        if (my $l2m = $self->{l2m}) {
+                $j = 4 if $j <= 4; # TODO configurable
+                $self->atfork_prepare_wq($l2m);
+                $l2m->wq_workers_start('lei2mail', $j, $self->oldset);
+        }
+
+        # no forking workers after this
         my $sto = $self->_lei_store(1);
         unshift(@srcs, $sto->search) if $opt->{'local'};
         my %mset_opt = map { $_ => $opt->{$_} } qw(thread limit offset);
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index 0e23b8da..8d030227 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -5,6 +5,7 @@
 package PublicInbox::LeiToMail;
 use strict;
 use v5.10.1;
+use parent qw(PublicInbox::IPC);
 use PublicInbox::Eml;
 use PublicInbox::Lock;
 use PublicInbox::ProcessPipe;
@@ -14,6 +15,7 @@ use Symbol qw(gensym);
 use IO::Handle; # ->autoflush
 use Fcntl qw(SEEK_SET SEEK_END O_CREAT O_EXCL O_WRONLY);
 use Errno qw(EEXIST ESPIPE ENOENT);
+use PublicInbox::Git;
 
 my %kw2char = ( # Maildir characters
         draft => 'D',
@@ -422,4 +424,30 @@ sub lock_free {
         $_[0]->{base_type} =~ /\A(?:maildir|mh|imap|jmap)\z/ ? 1 : 0;
 }
 
+sub write_mail { # via ->wq_do
+        my ($self, $git_dir, $oid, $lei, $kw) = @_;
+        my $wcb = $self->{wcb} //= do { # first message
+                my %sig = $lei->atfork_child_wq($self);
+                @SIG{keys %sig} = values %sig; # not local
+                $lei->{dedupe}->prepare_dedupe;
+                $self->write_cb($lei);
+        };
+        my $git = $self->{"$$\0$git_dir"} //= PublicInbox::Git->new($git_dir);
+        $git->cat_async($oid, \&git_to_mail, [ $wcb, $kw ]);
+}
+
+sub ipc_atfork_prepare {
+        my ($self) = @_;
+        # (qry_status_wr, stdout|mbox, stderr, 3: sock, 4: each_smsg_done_wr)
+        $self->wq_set_recv_modes(qw[+<&= >&= >&= +<&= >&=]);
+        $self->SUPER::ipc_atfork_prepare; # PublicInbox::IPC
+}
+
+sub DESTROY {
+        my ($self) = @_;
+        for my $pid_git (grep(/\A$$\0/, keys %$self)) {
+                $self->{$pid_git}->async_wait_all;
+        }
+}
+
 1;
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index 91864cd0..dc5cf3b6 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -126,6 +126,7 @@ sub query_thread_mset { # for --thread
                         @{$ctx->{xids}} = ();
                 }
         } while (_mset_more($mset, $mo));
+        undef $each_smsg; # drops @io for l2m->{each_smsg_done}
         $lei->{ovv}->ovv_atexit_child($lei);
 }
 
@@ -147,6 +148,7 @@ sub query_mset { # non-parallel for non-"--thread" users
                         $each_smsg->($smsg, $it);
                 }
         } while (_mset_more($mset, $mo));
+        undef $each_smsg; # drops @io for l2m->{each_smsg_done}
         $lei->{ovv}->ovv_atexit_child($lei);
 }
 
@@ -170,11 +172,14 @@ sub git {
 }
 
 sub query_done { # EOF callback
-        my ($lei) = @_;
-        $lei->{ovv}->ovv_end($lei);
-        if (my $l2m = $lei->{l2m}) {
-                $lei->start_mua unless $l2m->lock_free;
+        my ($self, $lei) = @_;
+        my $l2m = delete $lei->{l2m};
+        if (my $pids = delete $self->{l2m_pids}) {
+                my $ipc_worker_reap = $self->can('ipc_worker_reap');
+                dwaitpid($_, $ipc_worker_reap, $l2m) for @$pids;
         }
+        $lei->{ovv}->ovv_end($lei);
+        $lei->start_mua if $l2m && !$l2m->lock_free;
         $lei->dclose;
 }
 
@@ -188,12 +193,10 @@ sub start_query { # always runs in main (lei-daemon) process
         }
         my $remotes = $self->{remotes} // [];
         if ($lei->{opt}->{thread}) {
-                $lei->{-parallel} = scalar(@$remotes) + scalar(@$srcs) - 1;
                 for my $ibxish (@$srcs) {
                         $self->wq_do('query_thread_mset', $io, $lei, $ibxish);
                 }
         } else {
-                $lei->{-parallel} = scalar(@$remotes);
                 $self->wq_do('query_mset', $io, $lei, $srcs);
         }
         # TODO
@@ -226,12 +229,12 @@ sub do_query {
         $io[0] = undef;
         pipe(my $qry_status_rd, $io[0]) or die "pipe $!";
 
-        $lei_orig->{lxs} = $self;
         $lei_orig->event_step_init; # wait for shutdowns
-        my $op_map = { '' => [ \&query_done, $lei_orig ] };
+        my $op_map = { '' => [ \&query_done, $self, $lei_orig ] };
         my $in_loop = exists $lei_orig->{sock};
         my $opp = PublicInbox::OpPipe->new($qry_status_rd, $op_map, $in_loop);
-        if (my $l2m = $lei->{l2m}) {
+        my $l2m = $lei->{l2m};
+        if ($l2m) {
                 $l2m->pre_augment($lei_orig); # may redirect $lei->{1} for mbox
                 $io[1] = $lei_orig->{1};
                 $op_map->{'.'} = [ \&start_query, $self, \@io, $lei, $srcs ];
@@ -246,13 +249,17 @@ sub do_query {
                 $op_map->{'!'} = [ \&CORE::kill, 'TERM', @pids ];
                 $opp->event_step;
                 my $ipc_worker_reap = $self->can('ipc_worker_reap');
+                if (my $l2m_pids = delete $self->{l2m_pids}) {
+                        dwaitpid($_, $ipc_worker_reap, $l2m) for @$l2m_pids;
+                }
                 dwaitpid($_, $ipc_worker_reap, $self) for @pids;
         }
 }
 
 sub ipc_atfork_prepare {
         my ($self) = @_;
-        $self->wq_set_recv_modes(qw[+<&= >&= >&= +<&=]);
+        # (qry_status_wr, stdout|mbox, stderr, 3: sock, 4: $l2m->{-wq_s1})
+        $self->wq_set_recv_modes(qw[+<&= >&= >&= +<&= +<&=]);
         $self->SUPER::ipc_atfork_prepare; # PublicInbox::IPC
 }
 
diff --git a/lib/PublicInbox/Spawn.pm b/lib/PublicInbox/Spawn.pm
index e5c0b1e9..b03f2d59 100644
--- a/lib/PublicInbox/Spawn.pm
+++ b/lib/PublicInbox/Spawn.pm
@@ -209,7 +209,7 @@ my $fdpass = <<'FDPASS';
 #include <sys/socket.h>
 
 #if defined(CMSG_SPACE) && defined(CMSG_LEN)
-#define SEND_FD_CAPA 4
+#define SEND_FD_CAPA 5
 #define SEND_FD_SPACE (SEND_FD_CAPA * sizeof(int))
 union my_cmsg {
         struct cmsghdr hdr;