about summary refs log tree commit homepage
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/PublicInbox/LEI.pm19
-rw-r--r--lib/PublicInbox/LeiForgetMailSync.pm4
-rw-r--r--lib/PublicInbox/LeiImportKw.pm3
-rw-r--r--lib/PublicInbox/LeiNoteEvent.pm4
-rw-r--r--lib/PublicInbox/LeiPmdir.pm5
-rw-r--r--lib/PublicInbox/LeiStore.pm35
-rw-r--r--lib/PublicInbox/LeiXSearch.pm4
-rw-r--r--lib/PublicInbox/PktOp.pm9
8 files changed, 59 insertions, 24 deletions
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index ea3ec0fe..5694e92c 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -37,6 +37,7 @@ $GLP_PASS->configure(qw(gnu_getopt no_ignore_case auto_abbrev pass_through));
 
 our %PATH2CFG; # persistent for socket daemon
 our $MDIR2CFGPATH; # /path/to/maildir => { /path/to/config => [ ino watches ] }
+our %LIVE_SOCK; # "GLOB(0x....)" => $lei->{sock}
 
 # TBD: this is a documentation mechanism to show a subcommand
 # (may) pass options through to another command:
@@ -565,6 +566,7 @@ sub _lei_atfork_child {
         $dir_idle->force_close if $dir_idle;
         %PATH2CFG = ();
         $MDIR2CFGPATH = {};
+        %LIVE_SOCK = ();
         eval 'no warnings; undef $PublicInbox::LeiNoteEvent::to_flush';
         undef $errors_log;
         $quit = \&CORE::exit;
@@ -1429,7 +1431,7 @@ sub refresh_watches {
                         add_maildir_watch($cd, $cfg_f);
                 }
         }
-        my $wait = $renames ? $sto->ipc_do('done') : undef;
+        $lei->sto_done_request if $renames;
         if ($old) { # cull old non-existent entries
                 for my $url (keys %$old) {
                         next if exists $seen{$url};
@@ -1463,4 +1465,19 @@ sub lms { # read-only LeiMailSync
         $lse ? $lse->lms : undef;
 }
 
+sub sto_done_request { # only call this from lei-daemon process (not workers)
+        my ($lei, $sock) = @_;
+        if ($sock //= $lei->{sock}) {
+                $LIVE_SOCK{"$sock"} = $sock;
+                $lei->{sto}->ipc_do('done', "$sock"); # issue, async wait
+        } else { # forcibly wait
+                my $wait = $lei->{sto}->ipc_do('done');
+        }
+}
+
+sub sto_done_complete { # called in lei-daemon when LeiStore->done is complete
+        my ($sock_str) = @_;
+        delete $LIVE_SOCK{$sock_str}; # frees {sock} for waiting lei clients
+}
+
 1;
diff --git a/lib/PublicInbox/LeiForgetMailSync.pm b/lib/PublicInbox/LeiForgetMailSync.pm
index 940ca1b6..2b4e58a9 100644
--- a/lib/PublicInbox/LeiForgetMailSync.pm
+++ b/lib/PublicInbox/LeiForgetMailSync.pm
@@ -16,12 +16,12 @@ sub lei_forget_mail_sync {
         my ($lei, @folders) = @_;
         my $lms = $lei->lms or return;
         my $sto = $lei->_lei_store or return; # may disappear due to race
-        $sto->write_prepare;
+        $sto->write_prepare($lei);
         my $err = $lms->arg2folder($lei, \@folders);
         $lei->qerr(@{$err->{qerr}}) if $err->{qerr};
         return $lei->fail($err->{fail}) if $err->{fail};
         $sto->ipc_do('lms_forget_folders', @folders);
-        my $wait = $sto->ipc_do('done');
+        $lei->sto_done_request;
 }
 
 *_complete_forget_mail_sync = \&PublicInbox::LeiExportKw::_complete_export_kw;
diff --git a/lib/PublicInbox/LeiImportKw.pm b/lib/PublicInbox/LeiImportKw.pm
index 2878cbdf..402125cf 100644
--- a/lib/PublicInbox/LeiImportKw.pm
+++ b/lib/PublicInbox/LeiImportKw.pm
@@ -13,6 +13,7 @@ sub new {
         my $self = bless { -wq_ident => 'lei import_kw worker' }, $cls;
         my ($op_c, $ops) = $lei->workers_start($self, $self->detect_nproc);
         $op_c->{ops} = $ops; # for PktOp->event_step
+        $self->{lei_sock} = $lei->{sock};
         $lei->{ikw} = $self;
 }
 
@@ -42,13 +43,13 @@ sub ck_update_kw { # via wq_io_do
 sub ikw_done_wait {
         my ($arg, $pid) = @_;
         my ($self, $lei) = @$arg;
-        my $wait = $lei->{sto}->ipc_do('done');
         $lei->can('wq_done_wait')->($arg, $pid);
 }
 
 sub _lei_wq_eof { # EOF callback for main lei daemon
         my ($lei) = @_;
         my $ikw = delete $lei->{ikw} or return $lei->fail;
+        $lei->sto_done_request($ikw->{lei_sock});
         $ikw->wq_wait_old(\&ikw_done_wait, $lei);
 }
 
diff --git a/lib/PublicInbox/LeiNoteEvent.pm b/lib/PublicInbox/LeiNoteEvent.pm
index 1cd15296..6a40ba39 100644
--- a/lib/PublicInbox/LeiNoteEvent.pm
+++ b/lib/PublicInbox/LeiNoteEvent.pm
@@ -15,7 +15,7 @@ sub flush_lei ($) {
         if (my $lne = delete $lei->{cfg}->{-lei_note_event}) {
                 $lne->wq_close(1, undef, $lei); # runs _lei_wq_eof;
         } elsif ($lei->{sto}) { # lms_clear_src calls only:
-                my $wait = $lei->{sto}->ipc_do('done');
+                $lei->sto_done_request;
         }
 }
 
@@ -117,7 +117,7 @@ sub lne_done_wait {
 sub _lei_wq_eof { # EOF callback for main lei daemon
         my ($lei) = @_;
         my $lne = delete $lei->{lne} or return $lei->fail;
-        my $wait = $lei->{sto}->ipc_do('done');
+        $lei->sto_done_request;
         $lne->wq_wait_old(\&lne_done_wait, $lei);
 }
 
diff --git a/lib/PublicInbox/LeiPmdir.pm b/lib/PublicInbox/LeiPmdir.pm
index 760f276c..59cf886e 100644
--- a/lib/PublicInbox/LeiPmdir.pm
+++ b/lib/PublicInbox/LeiPmdir.pm
@@ -25,6 +25,7 @@ sub new {
         my ($op_c, $ops) = $lei->workers_start($self, $nproc,
                 undef, { ipt => $ipt }); # LeiInput subclass
         $op_c->{ops} = $ops; # for PktOp->event_step
+        $self->{lei_sock} = $lei->{sock}; # keep client for pmd_done_wait
         $lei->{pmd} = $self;
 }
 
@@ -32,7 +33,7 @@ sub ipc_atfork_child {
         my ($self) = @_;
         my $ipt = $self->{ipt} // die 'BUG: no self->{ipt}';
         $ipt->{lei} = $self->{lei};
-        $ipt->ipc_atfork_child;
+        $ipt->ipc_atfork_child; # calls _lei_atfork_child;
 }
 
 sub each_mdir_fn { # maildir_each_file callback
@@ -48,13 +49,13 @@ sub mdir_iter { # via wq_io_do
 sub pmd_done_wait {
         my ($arg, $pid) = @_;
         my ($self, $lei) = @$arg;
-        my $wait = $lei->{sto}->ipc_do('done');
         $lei->can('wq_done_wait')->($arg, $pid);
 }
 
 sub _lei_wq_eof { # EOF callback for main lei daemon
         my ($lei) = @_;
         my $pmd = delete $lei->{pmd} or return $lei->fail;
+        $lei->sto_done_request($pmd->{lei_sock});
         $pmd->wq_wait_old(\&pmd_done_wait, $lei);
 }
 
diff --git a/lib/PublicInbox/LeiStore.pm b/lib/PublicInbox/LeiStore.pm
index bbd853e5..28e36e89 100644
--- a/lib/PublicInbox/LeiStore.pm
+++ b/lib/PublicInbox/LeiStore.pm
@@ -471,7 +471,7 @@ sub xchg_stderr {
 }
 
 sub done {
-        my ($self) = @_;
+        my ($self, $sock_ref) = @_;
         my $err = '';
         if (my $im = delete($self->{im})) {
                 eval { $im->done };
@@ -486,6 +486,10 @@ sub done {
         $self->{priv_eidx}->done; # V2Writable::done
         xchg_stderr($self);
         die $err if $err;
+
+        # notify clients ->done has been issued
+        defined($sock_ref) and
+                $self->{s2d_op_p}->pkt_do('sto_done_complete', $sock_ref);
 }
 
 sub ipc_atfork_child {
@@ -493,28 +497,37 @@ sub ipc_atfork_child {
         my $lei = $self->{lei};
         $lei->_lei_atfork_child(1) if $lei;
         xchg_stderr($self);
-        if (my $err = delete($self->{err_pipe})) {
-                close $err->[0];
-                $self->{-err_wr} = $err->[1];
+        if (my $to_close = delete($self->{to_close})) {
+                close($_) for @$to_close;
         }
         $self->SUPER::ipc_atfork_child;
 }
 
 sub write_prepare {
         my ($self, $lei) = @_;
+        $lei // die 'BUG: $lei not passed';
         unless ($self->{-ipc_req}) {
-                my $d = $lei->store_path;
-                $self->ipc_lock_init("$d/ipc.lock");
-                substr($d, -length('/lei/store'), 10, '');
+                # s2d => store-to-daemon messages
+                require PublicInbox::PktOp;
+                my ($s2d_op_c, $s2d_op_p) = PublicInbox::PktOp->pair;
+                my $dir = $lei->store_path;
+                $self->ipc_lock_init("$dir/ipc.lock");
+                substr($dir, -length('/lei/store'), 10, '');
                 pipe(my ($r, $w)) or die "pipe: $!";
-                my $err_pipe = [ $r, $w ];
                 # Mail we import into lei are private, so headers filtered out
                 # by -mda for public mail are not appropriate
                 local @PublicInbox::MDA::BAD_HEADERS = ();
-                $self->ipc_worker_spawn("lei/store $d", $lei->oldset,
-                                        { lei => $lei, err_pipe => $err_pipe });
+                $self->ipc_worker_spawn("lei/store $dir", $lei->oldset, {
+                                        lei => $lei,
+                                        -err_wr => $w,
+                                        to_close => [ $r, $s2d_op_c->{sock} ],
+                                        s2d_op_p => $s2d_op_p,
+                                });
                 require PublicInbox::LeiStoreErr;
-                PublicInbox::LeiStoreErr->new($err_pipe->[0], $lei);
+                PublicInbox::LeiStoreErr->new($r, $lei);
+                $s2d_op_c->{ops} = {
+                        sto_done_complete => [ $lei->can('sto_done_complete') ]
+                };
         }
         $lei->{sto} = $self;
 }
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index 5e34d864..1f83e582 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -374,8 +374,8 @@ sub query_done { # EOF callback for main daemon
         if ($lei->{opt}->{'mail-sync'} && !$lei->{sto}) {
                 warn "BUG: {sto} missing with --mail-sync";
         }
-        my $wait = $lei->{sto} ? $lei->{sto}->ipc_do('done') : undef;
-        $wait = $lei->{v2w} ? $lei->{v2w}->ipc_do('done') : undef;
+        $lei->sto_done_request if $lei->{sto};
+        my $wait = $lei->{v2w} ? $lei->{v2w}->ipc_do('done') : undef;
         $lei->{ovv}->ovv_end($lei);
         my $start_mua;
         if ($l2m) { # close() calls LeiToMail reap_compress
diff --git a/lib/PublicInbox/PktOp.pm b/lib/PublicInbox/PktOp.pm
index 92e150a4..10942dd1 100644
--- a/lib/PublicInbox/PktOp.pm
+++ b/lib/PublicInbox/PktOp.pm
@@ -56,9 +56,12 @@ sub event_step {
                         ($cmd, @pargs) = split(/ /, $msg);
                 }
                 my $op = $self->{ops}->{$cmd //= $msg};
-                die "BUG: unknown message: `$cmd'" unless $op;
-                my ($sub, @args) = @$op;
-                $sub->(@args, @pargs);
+                if ($op) {
+                        my ($sub, @args) = @$op;
+                        $sub->(@args, @pargs);
+                } elsif ($msg ne '') {
+                        die "BUG: unknown message: `$cmd'";
+                }
                 return $self->close if $msg eq ''; # close on EOF
         }
 }