diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/PublicInbox/IPC.pm | 46 | ||||
-rw-r--r-- | lib/PublicInbox/LEI.pm | 16 | ||||
-rw-r--r-- | lib/PublicInbox/LeiBlob.pm | 2 | ||||
-rw-r--r-- | lib/PublicInbox/LeiConvert.pm | 2 | ||||
-rw-r--r-- | lib/PublicInbox/LeiImportKw.pm | 1 | ||||
-rw-r--r-- | lib/PublicInbox/LeiInput.pm | 2 | ||||
-rw-r--r-- | lib/PublicInbox/LeiInspect.pm | 2 | ||||
-rw-r--r-- | lib/PublicInbox/LeiLsSearch.pm | 2 | ||||
-rw-r--r-- | lib/PublicInbox/LeiMirror.pm | 2 | ||||
-rw-r--r-- | lib/PublicInbox/LeiNoteEvent.pm | 5 | ||||
-rw-r--r-- | lib/PublicInbox/LeiP2q.pm | 2 | ||||
-rw-r--r-- | lib/PublicInbox/LeiPmdir.pm | 1 | ||||
-rw-r--r-- | lib/PublicInbox/LeiStore.pm | 1 | ||||
-rw-r--r-- | lib/PublicInbox/LeiToMail.pm | 10 | ||||
-rw-r--r-- | lib/PublicInbox/LeiUp.pm | 2 | ||||
-rw-r--r-- | lib/PublicInbox/LeiXSearch.pm | 16 |
16 files changed, 51 insertions, 61 deletions
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm index 6c189b64..3e299448 100644 --- a/lib/PublicInbox/IPC.pm +++ b/lib/PublicInbox/IPC.pm @@ -134,16 +134,22 @@ sub ipc_worker_spawn { sub ipc_worker_reap { # dwaitpid callback my ($args, $pid) = @_; + my ($self, @uargs) = @$args; + delete $self->{-wq_workers}->{$pid}; + return $self->{-reap_do}->($args, $pid) if $self->{-reap_do}; return if !$?; - # TERM(15) is our default exit signal, PIPE(13) is likely w/ pager my $s = $? & 127; - warn "PID:$pid died with \$?=$?\n" if $s != 15 && $s != 13; + # TERM(15) is our default exit signal, PIPE(13) is likely w/ pager + warn "$self->{-wq_ident} PID:$pid died \$?=$?\n" if $s != 15 && $s != 13 } -sub wq_wait_old { - my ($self, $cb, @args) = @_; - my $pids = delete $self->{"-wq_old_pids.$$"} or return; - dwaitpid($_, $cb // \&ipc_worker_reap, [$self, @args]) for @$pids; +sub wq_wait_async { + my ($self, $cb, @uargs) = @_; + local $PublicInbox::DS::in_loop = 1; + $self->{-reap_async} = 1; + $self->{-reap_do} = $cb; + my @pids = keys %{$self->{-wq_workers}}; + dwaitpid($_, \&ipc_worker_reap, [ $self, @uargs ]) for @pids; } # for base class, override in sub classes @@ -394,42 +400,24 @@ sub wq_workers_start { } sub wq_close { - my ($self, $nohang, $cb, @args) = @_; + my ($self) = @_; delete @$self{qw(-wq_s1 -wq_s2)} or return; - my $ppid = delete $self->{-wq_ppid} or return; - my $workers = delete $self->{-wq_workers} // die 'BUG: no wq_workers'; - return if $ppid != $$; # can't reap siblings or parents - my @pids = map { $_ + 0 } keys %$workers; - if ($nohang) { - push @{$self->{"-wq_old_pids.$$"}}, @pids; - } else { - $cb //= \&ipc_worker_reap; - unshift @args, $self; - dwaitpid($_, $cb, \@args) for @pids; - } -} - -sub wq_kill_old { - my ($self, $sig) = @_; - my $pids = $self->{"-wq_old_pids.$$"} or return; - kill($sig // 'TERM', @$pids); + return if $self->{-reap_async}; + my @pids = keys %{$self->{-wq_workers}}; + dwaitpid($_, \&ipc_worker_reap, [ $self ]) for @pids; } sub wq_kill { my ($self, $sig) = @_; - my $workers = $self->{-wq_workers} or return; - kill($sig // 'TERM', keys %$workers); + kill($sig // 'TERM', keys %{$self->{-wq_workers}}); } sub DESTROY { my ($self) = @_; my $ppid = $self->{-wq_ppid}; wq_kill($self) if $ppid && $ppid == $$; - my $err = $?; wq_close($self); - wq_wait_old($self); ipc_worker_stop($self); - $? = $err if $err; } sub detect_nproc () { diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index b6338377..83534878 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -455,16 +455,12 @@ my %CONFIG_KEYS = ( 'leistore.dir' => 'top-level storage location', ); -my @WQ_KEYS = qw(lxs l2m ikw pmd wq1 lne); # internal workers +my @WQ_KEYS = qw(lxs l2m ikw pmd wq1 lne v2w); # internal workers sub _drop_wq { my ($self) = @_; for my $wq (grep(defined, delete(@$self{@WQ_KEYS}))) { - if ($wq->wq_kill('-TERM')) { - $wq->wq_close(0, undef, $self); - } elsif ($wq->wq_kill_old('-TERM')) { - $wq->wq_wait_old(undef, $self); - } + $wq->wq_kill('-TERM'); $wq->DESTROY; } } @@ -644,6 +640,7 @@ sub workers_start { my $op_c = delete $lei->{pkt_op_c}; @$end = (); $lei->event_step_init; + $wq->wq_wait_async($wq->can('_wq_done_wait') // \&wq_done_wait, $lei); ($op_c, $ops); } @@ -651,7 +648,7 @@ sub workers_start { sub wait_wq_events { my ($lei, $op_c, $ops) = @_; for my $wq (grep(defined, @$lei{qw(ikw pmd)})) { # auxiliary WQs - $wq->wq_close(1); + $wq->wq_close; } $op_c->{ops} = $ops; } @@ -1150,7 +1147,7 @@ sub event_step { if ($buf =~ /\A(?:STOP|CONT|TERM)\z/) { my $sig = "-$buf"; for my $wq (grep(defined, @$self{@WQ_KEYS})) { - $wq->wq_kill($sig) or $wq->wq_kill_old($sig); + $wq->wq_kill($sig); } } else { die "unrecognized client signal: $buf"; @@ -1393,8 +1390,7 @@ sub fchdir { sub wq_eof { # EOF callback for main daemon my ($lei) = @_; local $current_lei = $lei; - my $wq1 = delete $lei->{wq1} // return $lei->fail; # already failed - $wq1->wq_wait_old($wq1->can('_wq_done_wait') // \&wq_done_wait, $lei); + delete $lei->{wq1} // return $lei->fail; # already failed } sub watch_state_ok ($) { diff --git a/lib/PublicInbox/LeiBlob.pm b/lib/PublicInbox/LeiBlob.pm index b6a62d24..004b156c 100644 --- a/lib/PublicInbox/LeiBlob.pm +++ b/lib/PublicInbox/LeiBlob.pm @@ -166,7 +166,7 @@ sub lei_blob { my ($op_c, $ops) = $lei->workers_start($self, 1); $lei->{wq1} = $self; $self->wq_io_do('do_solve_blob', []); - $self->wq_close(1); + $self->wq_close; $lei->wait_wq_events($op_c, $ops); } diff --git a/lib/PublicInbox/LeiConvert.pm b/lib/PublicInbox/LeiConvert.pm index 9e98edc3..68fc7c0b 100644 --- a/lib/PublicInbox/LeiConvert.pm +++ b/lib/PublicInbox/LeiConvert.pm @@ -58,7 +58,7 @@ sub lei_convert { # the main "lei convert" method my ($op_c, $ops) = $lei->workers_start($self, 1); $lei->{wq1} = $self; $self->wq_io_do('process_inputs', []); - $self->wq_close(1); + $self->wq_close; $lei->wait_wq_events($op_c, $ops); } diff --git a/lib/PublicInbox/LeiImportKw.pm b/lib/PublicInbox/LeiImportKw.pm index 8359f338..54454511 100644 --- a/lib/PublicInbox/LeiImportKw.pm +++ b/lib/PublicInbox/LeiImportKw.pm @@ -50,7 +50,6 @@ sub _lei_wq_eof { # EOF callback for main lei daemon my ($lei) = @_; my $ikw = delete $lei->{ikw} or return $lei->fail; $lei->sto_done_request($ikw->{lei_sock}); - $ikw->wq_wait_old($lei->can('wq_done_wait'), $lei); } 1; diff --git a/lib/PublicInbox/LeiInput.pm b/lib/PublicInbox/LeiInput.pm index dd40d838..2621fc1f 100644 --- a/lib/PublicInbox/LeiInput.pm +++ b/lib/PublicInbox/LeiInput.pm @@ -402,7 +402,7 @@ sub input_only_atfork_child { sub input_only_net_merge_all_done { my ($self) = @_; $self->wq_io_do('process_inputs'); - $self->wq_close(1); + $self->wq_close; } # like Getopt::Long, but for +kw:FOO and -kw:FOO to prepare diff --git a/lib/PublicInbox/LeiInspect.pm b/lib/PublicInbox/LeiInspect.pm index 5ba96056..05b6e21d 100644 --- a/lib/PublicInbox/LeiInspect.pm +++ b/lib/PublicInbox/LeiInspect.pm @@ -242,7 +242,7 @@ sub inspect_start ($$) { $lei->{wq1} = $self; $lei->wait_wq_events($op_c, $ops); $self->wq_do('inspect_argv'); - $self->wq_close(1); + $self->wq_close; } sub ins_add { # InputPipe->consume callback diff --git a/lib/PublicInbox/LeiLsSearch.pm b/lib/PublicInbox/LeiLsSearch.pm index aebf0184..0193e590 100644 --- a/lib/PublicInbox/LeiLsSearch.pm +++ b/lib/PublicInbox/LeiLsSearch.pm @@ -75,7 +75,7 @@ sub bg_worker ($$$) { my ($op_c, $ops) = $lei->workers_start($self, 1); $lei->{wq1} = $self; $self->wq_io_do('do_ls_search_long', [], $pfx); - $self->wq_close(1); + $self->wq_close; $lei->wait_wq_events($op_c, $ops); } diff --git a/lib/PublicInbox/LeiMirror.pm b/lib/PublicInbox/LeiMirror.pm index a75c99c4..e20d30b4 100644 --- a/lib/PublicInbox/LeiMirror.pm +++ b/lib/PublicInbox/LeiMirror.pm @@ -451,7 +451,7 @@ sub start { my ($op_c, $ops) = $lei->workers_start($self, 1); $lei->{wq1} = $self; $self->wq_io_do('do_mirror', []); - $self->wq_close(1); + $self->wq_close; $lei->wait_wq_events($op_c, $ops); } diff --git a/lib/PublicInbox/LeiNoteEvent.pm b/lib/PublicInbox/LeiNoteEvent.pm index 1b714dae..ba4dfd49 100644 --- a/lib/PublicInbox/LeiNoteEvent.pm +++ b/lib/PublicInbox/LeiNoteEvent.pm @@ -14,7 +14,7 @@ our $to_flush; # { cfgpath => $lei } sub flush_lei ($) { my ($lei) = @_; my $lne = delete $lei->{cfg}->{-lei_note_event}; - $lne->wq_close(1, undef, $lei) if $lne; # runs _lei_wq_eof; + $lne->wq_close if $lne; # runs _lei_wq_eof; } # we batch up writes and flush every 5s (matching Linux default @@ -111,9 +111,8 @@ sub ipc_atfork_child { sub _lei_wq_eof { # EOF callback for main lei daemon my ($lei) = @_; - my $lne = delete $lei->{lne} or return $lei->fail; + delete $lei->{lne} or return $lei->fail; $lei->sto_done_request; - $lne->wq_wait_old($lei->can('wq_done_wait'), $lei); } 1; diff --git a/lib/PublicInbox/LeiP2q.pm b/lib/PublicInbox/LeiP2q.pm index 5c2ce0a1..08ec81c5 100644 --- a/lib/PublicInbox/LeiP2q.pm +++ b/lib/PublicInbox/LeiP2q.pm @@ -191,7 +191,7 @@ sub lei_p2q { # the "lei patch-to-query" entry point my ($op_c, $ops) = $lei->workers_start($self, 1); $lei->{wq1} = $self; $self->wq_io_do('do_p2q', []); - $self->wq_close(1); + $self->wq_close; $lei->wait_wq_events($op_c, $ops); } diff --git a/lib/PublicInbox/LeiPmdir.pm b/lib/PublicInbox/LeiPmdir.pm index 2d3b9755..f9b68fc2 100644 --- a/lib/PublicInbox/LeiPmdir.pm +++ b/lib/PublicInbox/LeiPmdir.pm @@ -51,7 +51,6 @@ sub _lei_wq_eof { # EOF callback for main lei daemon my ($lei) = @_; my $pmd = delete $lei->{pmd} or return $lei->fail; $lei->sto_done_request($pmd->{lei_sock}); - $pmd->wq_wait_old($lei->can('wq_done_wait'), $lei); } 1; diff --git a/lib/PublicInbox/LeiStore.pm b/lib/PublicInbox/LeiStore.pm index c45380d1..82104570 100644 --- a/lib/PublicInbox/LeiStore.pm +++ b/lib/PublicInbox/LeiStore.pm @@ -587,6 +587,7 @@ sub write_prepare { -err_wr => $w, to_close => [ $r ], }); + $self->wq_wait_async; # outlives $lei require PublicInbox::LeiStoreErr; PublicInbox::LeiStoreErr->new($r, $lei); } diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm index 9c748dea..76e103c7 100644 --- a/lib/PublicInbox/LeiToMail.pm +++ b/lib/PublicInbox/LeiToMail.pm @@ -637,6 +637,12 @@ sub _do_augment_mbox { $dedupe->pause_dedupe if $dedupe; } +sub v2w_done_wait { # dwaitpid callback + my ($arg, $pid) = @_; + my ($v2w, $lei) = @$arg; + $lei->child_error($?, "error for $v2w->{ibx}->{inboxdir}") if $?; +} + sub _pre_augment_v2 { my ($self, $lei) = @_; my $dir = $self->{dst}; @@ -659,8 +665,8 @@ sub _pre_augment_v2 { PublicInbox::InboxWritable->new($ibx, @creat); $ibx->init_inbox if @creat; my $v2w = $ibx->importer; - $v2w->{-wq_no_bcast} = 1; $v2w->wq_workers_start("lei/v2w $dir", 1, $lei->oldset, {lei => $lei}); + $v2w->wq_wait_async(\&v2w_done_wait, $lei); $lei->{v2w} = $v2w; return if !$lei->{opt}->{shared}; my $d = "$lei->{ale}->{git}->{git_dir}/objects"; @@ -811,7 +817,7 @@ sub net_merge_all_done { $self->{dst}, \$self->{-au_noted}); } $self->wq_broadcast('do_post_auth'); - $self->wq_close(1); + $self->wq_close; } 1; diff --git a/lib/PublicInbox/LeiUp.pm b/lib/PublicInbox/LeiUp.pm index df65cb9b..39604177 100644 --- a/lib/PublicInbox/LeiUp.pm +++ b/lib/PublicInbox/LeiUp.pm @@ -122,7 +122,7 @@ EOM sub net_merge_all_done { my ($self, $lei) = @_; $lei->{net} = delete($self->{-net_new}) if $self->{-net_new}; - $self->wq_close(1); + $self->wq_close; eval { redispatch_all($self, $lei) }; warn "E: $@" if $@; } diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index 3ec75528..fd2c8a37 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -413,14 +413,14 @@ sub query_done { # EOF callback for main daemon my ($lei) = @_; local $PublicInbox::LEI::current_lei = $lei; my $l2m = delete $lei->{l2m}; - $l2m->wq_wait_old(\&xsearch_done_wait, $lei) if $l2m; - if (my $lxs = delete $lei->{lxs}) { - $lxs->wq_wait_old(\&xsearch_done_wait, $lei); - } + delete $lei->{lxs}; ($lei->{opt}->{'mail-sync'} && !$lei->{sto}) and warn "BUG: {sto} missing with --mail-sync"; $lei->sto_done_request if $lei->{sto}; - my $wait = $lei->{v2w} ? $lei->{v2w}->wq_do('done') : undef; + if (my $v2w = delete $lei->{v2w}) { + $v2w->wq_do('done'); + $v2w->wq_close; + } $lei->{ovv}->ovv_end($lei); my $start_mua; if ($l2m) { # close() calls LeiToMail reap_compress @@ -466,7 +466,7 @@ sub do_post_augment { if ($err) { if (my $lxs = delete $lei->{lxs}) { $lxs->wq_kill('-TERM'); - $lxs->wq_close(0, undef, $lei); + $lxs->wq_close; } $lei->fail("$err"); } @@ -514,7 +514,7 @@ sub start_query ($$) { # always runs in main (lei-daemon) process if ($self->{-do_lcat}) { $self->wq_io_do('lcat_dump', []); } - $self->wq_close(1); # lei_xsearch workers stop when done + $self->wq_close; # lei_xsearch workers stop when done } sub incr_start_query { # called whenever an l2m shard starts do_post_auth @@ -569,12 +569,14 @@ sub do_query { } $l2m->wq_workers_start('lei2mail', undef, $lei->oldset, { lei => $lei }); + $l2m->wq_wait_async(\&xsearch_done_wait, $lei); pipe($lei->{startq}, $lei->{au_done}) or die "pipe: $!"; fcntl($lei->{startq}, $F_SETPIPE_SZ, 4096) if $F_SETPIPE_SZ; delete $l2m->{au_peers}; } $self->wq_workers_start('lei_xsearch', undef, $lei->oldset, { lei => $lei }); + $self->wq_wait_async(\&xsearch_done_wait, $lei); my $op_c = delete $lei->{pkt_op_c}; delete $lei->{pkt_op_p}; @$end = (); |