diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/PublicInbox/LEI.pm | 19 | ||||
-rw-r--r-- | lib/PublicInbox/LeiForgetMailSync.pm | 4 | ||||
-rw-r--r-- | lib/PublicInbox/LeiImportKw.pm | 3 | ||||
-rw-r--r-- | lib/PublicInbox/LeiNoteEvent.pm | 4 | ||||
-rw-r--r-- | lib/PublicInbox/LeiPmdir.pm | 5 | ||||
-rw-r--r-- | lib/PublicInbox/LeiStore.pm | 35 | ||||
-rw-r--r-- | lib/PublicInbox/LeiXSearch.pm | 4 | ||||
-rw-r--r-- | lib/PublicInbox/PktOp.pm | 9 |
8 files changed, 59 insertions, 24 deletions
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index ea3ec0fe..5694e92c 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -37,6 +37,7 @@ $GLP_PASS->configure(qw(gnu_getopt no_ignore_case auto_abbrev pass_through)); our %PATH2CFG; # persistent for socket daemon our $MDIR2CFGPATH; # /path/to/maildir => { /path/to/config => [ ino watches ] } +our %LIVE_SOCK; # "GLOB(0x....)" => $lei->{sock} # TBD: this is a documentation mechanism to show a subcommand # (may) pass options through to another command: @@ -565,6 +566,7 @@ sub _lei_atfork_child { $dir_idle->force_close if $dir_idle; %PATH2CFG = (); $MDIR2CFGPATH = {}; + %LIVE_SOCK = (); eval 'no warnings; undef $PublicInbox::LeiNoteEvent::to_flush'; undef $errors_log; $quit = \&CORE::exit; @@ -1429,7 +1431,7 @@ sub refresh_watches { add_maildir_watch($cd, $cfg_f); } } - my $wait = $renames ? $sto->ipc_do('done') : undef; + $lei->sto_done_request if $renames; if ($old) { # cull old non-existent entries for my $url (keys %$old) { next if exists $seen{$url}; @@ -1463,4 +1465,19 @@ sub lms { # read-only LeiMailSync $lse ? $lse->lms : undef; } +sub sto_done_request { # only call this from lei-daemon process (not workers) + my ($lei, $sock) = @_; + if ($sock //= $lei->{sock}) { + $LIVE_SOCK{"$sock"} = $sock; + $lei->{sto}->ipc_do('done', "$sock"); # issue, async wait + } else { # forcibly wait + my $wait = $lei->{sto}->ipc_do('done'); + } +} + +sub sto_done_complete { # called in lei-daemon when LeiStore->done is complete + my ($sock_str) = @_; + delete $LIVE_SOCK{$sock_str}; # frees {sock} for waiting lei clients +} + 1; diff --git a/lib/PublicInbox/LeiForgetMailSync.pm b/lib/PublicInbox/LeiForgetMailSync.pm index 940ca1b6..2b4e58a9 100644 --- a/lib/PublicInbox/LeiForgetMailSync.pm +++ b/lib/PublicInbox/LeiForgetMailSync.pm @@ -16,12 +16,12 @@ sub lei_forget_mail_sync { my ($lei, @folders) = @_; my $lms = $lei->lms or return; my $sto = $lei->_lei_store or return; # may disappear due to race - $sto->write_prepare; + $sto->write_prepare($lei); my $err = $lms->arg2folder($lei, \@folders); $lei->qerr(@{$err->{qerr}}) if $err->{qerr}; return $lei->fail($err->{fail}) if $err->{fail}; $sto->ipc_do('lms_forget_folders', @folders); - my $wait = $sto->ipc_do('done'); + $lei->sto_done_request; } *_complete_forget_mail_sync = \&PublicInbox::LeiExportKw::_complete_export_kw; diff --git a/lib/PublicInbox/LeiImportKw.pm b/lib/PublicInbox/LeiImportKw.pm index 2878cbdf..402125cf 100644 --- a/lib/PublicInbox/LeiImportKw.pm +++ b/lib/PublicInbox/LeiImportKw.pm @@ -13,6 +13,7 @@ sub new { my $self = bless { -wq_ident => 'lei import_kw worker' }, $cls; my ($op_c, $ops) = $lei->workers_start($self, $self->detect_nproc); $op_c->{ops} = $ops; # for PktOp->event_step + $self->{lei_sock} = $lei->{sock}; $lei->{ikw} = $self; } @@ -42,13 +43,13 @@ sub ck_update_kw { # via wq_io_do sub ikw_done_wait { my ($arg, $pid) = @_; my ($self, $lei) = @$arg; - my $wait = $lei->{sto}->ipc_do('done'); $lei->can('wq_done_wait')->($arg, $pid); } sub _lei_wq_eof { # EOF callback for main lei daemon my ($lei) = @_; my $ikw = delete $lei->{ikw} or return $lei->fail; + $lei->sto_done_request($ikw->{lei_sock}); $ikw->wq_wait_old(\&ikw_done_wait, $lei); } diff --git a/lib/PublicInbox/LeiNoteEvent.pm b/lib/PublicInbox/LeiNoteEvent.pm index 1cd15296..6a40ba39 100644 --- a/lib/PublicInbox/LeiNoteEvent.pm +++ b/lib/PublicInbox/LeiNoteEvent.pm @@ -15,7 +15,7 @@ sub flush_lei ($) { if (my $lne = delete $lei->{cfg}->{-lei_note_event}) { $lne->wq_close(1, undef, $lei); # runs _lei_wq_eof; } elsif ($lei->{sto}) { # lms_clear_src calls only: - my $wait = $lei->{sto}->ipc_do('done'); + $lei->sto_done_request; } } @@ -117,7 +117,7 @@ sub lne_done_wait { sub _lei_wq_eof { # EOF callback for main lei daemon my ($lei) = @_; my $lne = delete $lei->{lne} or return $lei->fail; - my $wait = $lei->{sto}->ipc_do('done'); + $lei->sto_done_request; $lne->wq_wait_old(\&lne_done_wait, $lei); } diff --git a/lib/PublicInbox/LeiPmdir.pm b/lib/PublicInbox/LeiPmdir.pm index 760f276c..59cf886e 100644 --- a/lib/PublicInbox/LeiPmdir.pm +++ b/lib/PublicInbox/LeiPmdir.pm @@ -25,6 +25,7 @@ sub new { my ($op_c, $ops) = $lei->workers_start($self, $nproc, undef, { ipt => $ipt }); # LeiInput subclass $op_c->{ops} = $ops; # for PktOp->event_step + $self->{lei_sock} = $lei->{sock}; # keep client for pmd_done_wait $lei->{pmd} = $self; } @@ -32,7 +33,7 @@ sub ipc_atfork_child { my ($self) = @_; my $ipt = $self->{ipt} // die 'BUG: no self->{ipt}'; $ipt->{lei} = $self->{lei}; - $ipt->ipc_atfork_child; + $ipt->ipc_atfork_child; # calls _lei_atfork_child; } sub each_mdir_fn { # maildir_each_file callback @@ -48,13 +49,13 @@ sub mdir_iter { # via wq_io_do sub pmd_done_wait { my ($arg, $pid) = @_; my ($self, $lei) = @$arg; - my $wait = $lei->{sto}->ipc_do('done'); $lei->can('wq_done_wait')->($arg, $pid); } sub _lei_wq_eof { # EOF callback for main lei daemon my ($lei) = @_; my $pmd = delete $lei->{pmd} or return $lei->fail; + $lei->sto_done_request($pmd->{lei_sock}); $pmd->wq_wait_old(\&pmd_done_wait, $lei); } diff --git a/lib/PublicInbox/LeiStore.pm b/lib/PublicInbox/LeiStore.pm index bbd853e5..28e36e89 100644 --- a/lib/PublicInbox/LeiStore.pm +++ b/lib/PublicInbox/LeiStore.pm @@ -471,7 +471,7 @@ sub xchg_stderr { } sub done { - my ($self) = @_; + my ($self, $sock_ref) = @_; my $err = ''; if (my $im = delete($self->{im})) { eval { $im->done }; @@ -486,6 +486,10 @@ sub done { $self->{priv_eidx}->done; # V2Writable::done xchg_stderr($self); die $err if $err; + + # notify clients ->done has been issued + defined($sock_ref) and + $self->{s2d_op_p}->pkt_do('sto_done_complete', $sock_ref); } sub ipc_atfork_child { @@ -493,28 +497,37 @@ sub ipc_atfork_child { my $lei = $self->{lei}; $lei->_lei_atfork_child(1) if $lei; xchg_stderr($self); - if (my $err = delete($self->{err_pipe})) { - close $err->[0]; - $self->{-err_wr} = $err->[1]; + if (my $to_close = delete($self->{to_close})) { + close($_) for @$to_close; } $self->SUPER::ipc_atfork_child; } sub write_prepare { my ($self, $lei) = @_; + $lei // die 'BUG: $lei not passed'; unless ($self->{-ipc_req}) { - my $d = $lei->store_path; - $self->ipc_lock_init("$d/ipc.lock"); - substr($d, -length('/lei/store'), 10, ''); + # s2d => store-to-daemon messages + require PublicInbox::PktOp; + my ($s2d_op_c, $s2d_op_p) = PublicInbox::PktOp->pair; + my $dir = $lei->store_path; + $self->ipc_lock_init("$dir/ipc.lock"); + substr($dir, -length('/lei/store'), 10, ''); pipe(my ($r, $w)) or die "pipe: $!"; - my $err_pipe = [ $r, $w ]; # Mail we import into lei are private, so headers filtered out # by -mda for public mail are not appropriate local @PublicInbox::MDA::BAD_HEADERS = (); - $self->ipc_worker_spawn("lei/store $d", $lei->oldset, - { lei => $lei, err_pipe => $err_pipe }); + $self->ipc_worker_spawn("lei/store $dir", $lei->oldset, { + lei => $lei, + -err_wr => $w, + to_close => [ $r, $s2d_op_c->{sock} ], + s2d_op_p => $s2d_op_p, + }); require PublicInbox::LeiStoreErr; - PublicInbox::LeiStoreErr->new($err_pipe->[0], $lei); + PublicInbox::LeiStoreErr->new($r, $lei); + $s2d_op_c->{ops} = { + sto_done_complete => [ $lei->can('sto_done_complete') ] + }; } $lei->{sto} = $self; } diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index 5e34d864..1f83e582 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -374,8 +374,8 @@ sub query_done { # EOF callback for main daemon if ($lei->{opt}->{'mail-sync'} && !$lei->{sto}) { warn "BUG: {sto} missing with --mail-sync"; } - my $wait = $lei->{sto} ? $lei->{sto}->ipc_do('done') : undef; - $wait = $lei->{v2w} ? $lei->{v2w}->ipc_do('done') : undef; + $lei->sto_done_request if $lei->{sto}; + my $wait = $lei->{v2w} ? $lei->{v2w}->ipc_do('done') : undef; $lei->{ovv}->ovv_end($lei); my $start_mua; if ($l2m) { # close() calls LeiToMail reap_compress diff --git a/lib/PublicInbox/PktOp.pm b/lib/PublicInbox/PktOp.pm index 92e150a4..10942dd1 100644 --- a/lib/PublicInbox/PktOp.pm +++ b/lib/PublicInbox/PktOp.pm @@ -56,9 +56,12 @@ sub event_step { ($cmd, @pargs) = split(/ /, $msg); } my $op = $self->{ops}->{$cmd //= $msg}; - die "BUG: unknown message: `$cmd'" unless $op; - my ($sub, @args) = @$op; - $sub->(@args, @pargs); + if ($op) { + my ($sub, @args) = @$op; + $sub->(@args, @pargs); + } elsif ($msg ne '') { + die "BUG: unknown message: `$cmd'"; + } return $self->close if $msg eq ''; # close on EOF } } |