user/dev discussion of public-inbox itself
 help / color / mirror / code / Atom feed
Search results ordered by [date|relevance]  view[summary|nested|Atom feed]
thread overview below | download mbox.gz: |
* [PATCH 1/2] ipc: drop awaitpid_init to avoid circular refs
  @ 2023-01-30  4:30  4% ` Eric Wong
  0 siblings, 0 replies; 3+ results
From: Eric Wong @ 2023-01-30  4:30 UTC (permalink / raw)
  To: meta

This brings t/lei-index.t back down from ~8s to ~3s.  I didn't
notice this before because the LeiNoteEvent timer was firing
every 5s and clearing circular refs, and parallel testing meant
the delay got hidden.

Fixes: 4a2a95bbc78f99c8 (ipc+lei: switch to awaitpid, 2023-01-17)
---
 lib/PublicInbox/IPC.pm        | 32 ++++++++++++--------------------
 lib/PublicInbox/LEI.pm        |  6 +++---
 lib/PublicInbox/LeiMirror.pm  |  2 +-
 lib/PublicInbox/LeiStore.pm   |  7 +++----
 lib/PublicInbox/LeiToMail.pm  |  6 +++---
 lib/PublicInbox/LeiUp.pm      |  2 +-
 lib/PublicInbox/LeiXSearch.pm | 10 +++++-----
 7 files changed, 28 insertions(+), 37 deletions(-)

diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index edc5ba64..548a72eb 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -96,7 +96,7 @@ sub ipc_worker_loop ($$$) {
 
 # starts a worker if Sereal or Storable is installed
 sub ipc_worker_spawn {
-	my ($self, $ident, $oldset, $fields) = @_;
+	my ($self, $ident, $oldset, $fields, @cb_args) = @_;
 	return if ($self->{-ipc_ppid} // -1) == $$; # idempotent
 	delete(@$self{qw(-ipc_req -ipc_res -ipc_ppid -ipc_pid)});
 	pipe(my ($r_req, $w_req)) or die "pipe: $!";
@@ -133,28 +133,20 @@ sub ipc_worker_spawn {
 	$self->{-ipc_req} = $w_req;
 	$self->{-ipc_res} = $r_res;
 	$self->{-ipc_ppid} = $$;
-	awaitpid($pid, \&ipc_worker_reap, $self);
+	awaitpid($pid, \&ipc_worker_reap, $self, @cb_args);
 	$self->{-ipc_pid} = $pid;
 }
 
 sub ipc_worker_reap { # awaitpid callback
-	my ($pid, $self) = @_;
+	my ($pid, $self, $cb, @args) = @_;
 	delete $self->{-wq_workers}->{$pid};
-	if (my $cb_args = $self->{-reap_do}) {
-		return $cb_args->[0]->($pid, $self, @$cb_args[1..$#$cb_args]);
-	}
+	return $cb->($pid, $self, @args) if $cb;
 	return if !$?;
 	my $s = $? & 127;
 	# TERM(15) is our default exit signal, PIPE(13) is likely w/ pager
 	warn "$self->{-wq_ident} PID:$pid died \$?=$?\n" if $s != 15 && $s != 13
 }
 
-# register wait workers
-sub awaitpid_init {
-	my ($self, @cb_args) = @_;
-	$self->{-reap_do} = \@cb_args;
-}
-
 # for base class, override in sub classes
 sub ipc_atfork_prepare {}
 
@@ -347,7 +339,6 @@ sub wq_do {
 
 sub prepare_nonblock {
 	($_[0]->{-wq_s1} // die 'BUG: no {-wq_s1}')->blocking(0);
-	$_[0]->{-reap_do} or die 'BUG: {-reap_do} needed for nonblock';
 	require PublicInbox::WQBlocked;
 }
 
@@ -363,8 +354,8 @@ sub wq_nonblock_do { # always async
 	}
 }
 
-sub _wq_worker_start ($$$$) {
-	my ($self, $oldset, $fields, $one) = @_;
+sub _wq_worker_start {
+	my ($self, $oldset, $fields, $one, @cb_args) = @_;
 	my ($bcast1, $bcast2);
 	$one or socketpair($bcast1, $bcast2, AF_UNIX, $SEQPACKET, 0) or
 							die "socketpair: $!";
@@ -395,13 +386,13 @@ sub _wq_worker_start ($$$$) {
 		undef $end; # trigger exit
 	} else {
 		$self->{-wq_workers}->{$pid} = $bcast1;
-		awaitpid($pid, \&ipc_worker_reap, $self);
+		awaitpid($pid, \&ipc_worker_reap, $self, @cb_args);
 	}
 }
 
 # starts workqueue workers if Sereal or Storable is installed
 sub wq_workers_start {
-	my ($self, $ident, $nr_workers, $oldset, $fields) = @_;
+	my ($self, $ident, $nr_workers, $oldset, $fields, @cb_args) = @_;
 	($send_cmd && $recv_cmd && defined($SEQPACKET)) or return;
 	return if $self->{-wq_s1}; # idempotent
 	$self->{-wq_s1} = $self->{-wq_s2} = undef;
@@ -414,7 +405,9 @@ sub wq_workers_start {
 	$self->{-wq_ident} = $ident;
 	my $one = $nr_workers == 1;
 	$self->{-wq_nr_workers} = $nr_workers;
-	_wq_worker_start($self, $sigset, $fields, $one) for (1..$nr_workers);
+	for (1..$nr_workers) {
+		_wq_worker_start($self, $sigset, $fields, $one, @cb_args);
+	}
 	PublicInbox::DS::sig_setmask($sigset) unless $oldset;
 	$self->{-wq_ppid} = $$;
 }
@@ -422,11 +415,10 @@ sub wq_workers_start {
 sub wq_close {
 	my ($self) = @_;
 	if (my $wqb = delete $self->{wqb}) {
-		$self->{-reap_do} or die 'BUG: {-reap_do} unset';
 		$wqb->enq_close;
 	}
 	delete @$self{qw(-wq_s1 -wq_s2)} or return;
-	return if $self->{-reap_do};
+	return if ($self->{-wq_ppid} // -1) != $$;
 	awaitpid($_) for keys %{$self->{-wq_workers}};
 }
 
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 6ad42111..ffd50db5 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -643,8 +643,8 @@ sub workers_start {
 	my $end = $lei->pkt_op_pair;
 	my $ident = $wq->{-wq_ident} // "lei-$lei->{cmd} worker";
 	$flds->{lei} = $lei;
-	$wq->awaitpid_init($wq->can('_wq_done_wait') // \&wq_done_wait, $lei);
-	$wq->wq_workers_start($ident, $jobs, $lei->oldset, $flds);
+	$wq->wq_workers_start($ident, $jobs, $lei->oldset, $flds,
+		$wq->can('_wq_done_wait') // \&wq_done_wait, $lei);
 	delete $lei->{pkt_op_p};
 	my $op_c = delete $lei->{pkt_op_c};
 	@$end = ();
@@ -1390,7 +1390,7 @@ sub DESTROY {
 	# preserve $? for ->fail or ->x_it code
 }
 
-sub wq_done_wait { # awaitpid cb (via wq_eof / IPC->awaitpid_init)
+sub wq_done_wait { # awaitpid cb (via wq_eof)
 	my ($pid, $wq, $lei) = @_;
 	local $current_lei = $lei;
 	my $err_type = $lei->{-err_type};
diff --git a/lib/PublicInbox/LeiMirror.pm b/lib/PublicInbox/LeiMirror.pm
index abf66315..640b253f 100644
--- a/lib/PublicInbox/LeiMirror.pm
+++ b/lib/PublicInbox/LeiMirror.pm
@@ -31,7 +31,7 @@ sub keep_going ($) {
 		$_[0]->{lei}->{opt}->{'keep-going'});
 }
 
-sub _wq_done_wait { # awaitpid cb (via wq_eof / IPC->awaitpid_init)
+sub _wq_done_wait { # awaitpid cb (via wq_eof)
 	my ($pid, $mrr, $lei) = @_;
 	if ($?) {
 		$lei->child_error($?);
diff --git a/lib/PublicInbox/LeiStore.pm b/lib/PublicInbox/LeiStore.pm
index 0ecf1388..fce15a72 100644
--- a/lib/PublicInbox/LeiStore.pm
+++ b/lib/PublicInbox/LeiStore.pm
@@ -604,8 +604,8 @@ sub recv_and_run {
 	$self->SUPER::recv_and_run(@args);
 }
 
-sub _sto_atexit { # awaitpid cb (via awaitpid_init)
-	my ($pid, $sto) = @_;
+sub _sto_atexit { # awaitpid cb
+	my ($pid) = @_;
 	warn "lei/store PID:$pid died \$?=$?\n" if $?;
 }
 
@@ -620,12 +620,11 @@ sub write_prepare {
 		# Mail we import into lei are private, so headers filtered out
 		# by -mda for public mail are not appropriate
 		local @PublicInbox::MDA::BAD_HEADERS = ();
-		$self->awaitpid_init(\&_sto_atexit); # outlives $lei
 		$self->wq_workers_start("lei/store $dir", 1, $lei->oldset, {
 					lei => $lei,
 					-err_wr => $w,
 					to_close => [ $r ],
-				});
+				}, \&_sto_atexit);
 		require PublicInbox::LeiStoreErr;
 		PublicInbox::LeiStoreErr->new($r, $lei);
 	}
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index 6a4554e7..31eba794 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -652,7 +652,7 @@ sub _do_augment_mbox {
 	$dedupe->pause_dedupe if $dedupe;
 }
 
-sub v2w_done_wait { # awaitpid cb (via awaitpid_init)
+sub v2w_done_wait { # awaitpid cb
 	my ($pid, $v2w, $lei) = @_;
 	$lei->child_error($?, "error for $v2w->{ibx}->{inboxdir}") if $?;
 }
@@ -679,8 +679,8 @@ sub _pre_augment_v2 {
 	PublicInbox::InboxWritable->new($ibx, @creat);
 	$ibx->init_inbox if @creat;
 	my $v2w = $ibx->importer;
-	$v2w->awaitpid_init(\&v2w_done_wait, $lei);
-	$v2w->wq_workers_start("lei/v2w $dir", 1, $lei->oldset, {lei => $lei});
+	$v2w->wq_workers_start("lei/v2w $dir", 1, $lei->oldset, {lei => $lei},
+				\&v2w_done_wait, $lei);
 	$lei->{v2w} = $v2w;
 	return if !$lei->{opt}->{shared};
 	my $d = "$lei->{ale}->{git}->{git_dir}/objects";
diff --git a/lib/PublicInbox/LeiUp.pm b/lib/PublicInbox/LeiUp.pm
index 3e92242e..cd2337b4 100644
--- a/lib/PublicInbox/LeiUp.pm
+++ b/lib/PublicInbox/LeiUp.pm
@@ -165,7 +165,7 @@ sub _complete_up { # lei__complete hook
 	map { $match_cb->($_) } PublicInbox::LeiSavedSearch::list($lei);
 }
 
-sub _wq_done_wait { # awaitpid cb (via awaitpid_init)
+sub _wq_done_wait { # awaitpid cb
 	my ($pid, $wq, $lei) = @_;
 	$lei->child_error($?, 'auth failure') if $?
 }
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index f9aa870e..5965274c 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -400,7 +400,7 @@ sub query_remote_mboxrd {
 
 sub git { $_[0]->{git} // die 'BUG: git uninitialized' }
 
-sub xsearch_done_wait { # awaitpid cb (via awaitpid_init)
+sub xsearch_done_wait { # awaitpid cb
 	my ($pid, $wq, $lei) = @_;
 	return if !$?;
 	my $s = $? & 127;
@@ -572,16 +572,16 @@ sub do_query {
 			fcntl($b_r, $F_SETPIPE_SZ, 4096) if $F_SETPIPE_SZ;
 			$l2m->{au_peers} = [ $a_r, $a_w, $b_r, $b_w ];
 		}
-		$l2m->awaitpid_init(\&xsearch_done_wait, $lei);
 		$l2m->wq_workers_start('lei2mail', undef,
-					$lei->oldset, { lei => $lei });
+					$lei->oldset, { lei => $lei },
+					\&xsearch_done_wait, $lei);
 		pipe($lei->{startq}, $lei->{au_done}) or die "pipe: $!";
 		fcntl($lei->{startq}, $F_SETPIPE_SZ, 4096) if $F_SETPIPE_SZ;
 		delete $l2m->{au_peers};
 	}
-	$self->awaitpid_init(\&xsearch_done_wait, $lei);
 	$self->wq_workers_start('lei_xsearch', undef,
-				$lei->oldset, { lei => $lei });
+				$lei->oldset, { lei => $lei },
+				\&xsearch_done_wait, $lei);
 	my $op_c = delete $lei->{pkt_op_c};
 	delete $lei->{pkt_op_p};
 	@$end = ();

^ permalink raw reply related	[relevance 4%]

* [PATCH 11/12] ipc+lei: switch to awaitpid
  2023-01-17  7:18  7% [PATCH 00/12] improve process reaping Eric Wong
@ 2023-01-17  7:19  3% ` Eric Wong
  0 siblings, 0 replies; 3+ results
From: Eric Wong @ 2023-01-17  7:19 UTC (permalink / raw)
  To: meta

This avoids awkwardly stuffing an arrayref into callbacks
which expect multiple arguments.  IPC->awaitpid_init now
allows pre-registering callbacks before spawning workers.
---
 lib/PublicInbox/IPC.pm        | 30 ++++++++++++++----------------
 lib/PublicInbox/LEI.pm        |  8 +++-----
 lib/PublicInbox/LeiConvert.pm |  2 +-
 lib/PublicInbox/LeiInput.pm   |  2 +-
 lib/PublicInbox/LeiMirror.pm  |  7 +++----
 lib/PublicInbox/LeiStore.pm   |  7 +++----
 lib/PublicInbox/LeiToMail.pm  |  7 +++----
 lib/PublicInbox/LeiUp.pm      |  5 ++---
 lib/PublicInbox/LeiXSearch.pm |  9 ++++-----
 script/public-inbox-clone     |  2 +-
 10 files changed, 35 insertions(+), 44 deletions(-)

diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 34e40118..edc5ba64 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -12,7 +12,7 @@ use strict;
 use v5.10.1;
 use parent qw(Exporter);
 use Carp qw(croak);
-use PublicInbox::DS qw(dwaitpid);
+use PublicInbox::DS qw(awaitpid);
 use PublicInbox::Spawn;
 use PublicInbox::OnDestroy;
 use PublicInbox::WQWorker;
@@ -133,26 +133,26 @@ sub ipc_worker_spawn {
 	$self->{-ipc_req} = $w_req;
 	$self->{-ipc_res} = $r_res;
 	$self->{-ipc_ppid} = $$;
+	awaitpid($pid, \&ipc_worker_reap, $self);
 	$self->{-ipc_pid} = $pid;
 }
 
-sub ipc_worker_reap { # dwaitpid callback
-	my ($args, $pid) = @_;
-	my ($self, @uargs) = @$args;
+sub ipc_worker_reap { # awaitpid callback
+	my ($pid, $self) = @_;
 	delete $self->{-wq_workers}->{$pid};
-	return $self->{-reap_do}->($args, $pid) if $self->{-reap_do};
+	if (my $cb_args = $self->{-reap_do}) {
+		return $cb_args->[0]->($pid, $self, @$cb_args[1..$#$cb_args]);
+	}
 	return if !$?;
 	my $s = $? & 127;
 	# TERM(15) is our default exit signal, PIPE(13) is likely w/ pager
 	warn "$self->{-wq_ident} PID:$pid died \$?=$?\n" if $s != 15 && $s != 13
 }
 
-sub wq_wait_async {
-	my ($self, $cb, @uargs) = @_;
-	local $PublicInbox::DS::in_loop = 1;
-	$self->{-reap_do} = $cb;
-	my @pids = keys %{$self->{-wq_workers}};
-	dwaitpid($_, \&ipc_worker_reap, [ $self, @uargs ]) for @pids;
+# register wait workers
+sub awaitpid_init {
+	my ($self, @cb_args) = @_;
+	$self->{-reap_do} = \@cb_args;
 }
 
 # for base class, override in sub classes
@@ -178,9 +178,7 @@ sub ipc_worker_stop {
 	}
 	die 'no PID with IPC pipes' unless $pid;
 	$w_req = $r_res = undef;
-
-	return if $$ != $ppid;
-	dwaitpid($pid, \&ipc_worker_reap, [$self]);
+	awaitpid($pid) if $$ == $ppid; # for non-event loop
 }
 
 # use this if we have multiple readers reading curl or "pigz -dc"
@@ -397,6 +395,7 @@ sub _wq_worker_start ($$$$) {
 		undef $end; # trigger exit
 	} else {
 		$self->{-wq_workers}->{$pid} = $bcast1;
+		awaitpid($pid, \&ipc_worker_reap, $self);
 	}
 }
 
@@ -428,8 +427,7 @@ sub wq_close {
 	}
 	delete @$self{qw(-wq_s1 -wq_s2)} or return;
 	return if $self->{-reap_do};
-	my @pids = keys %{$self->{-wq_workers}};
-	dwaitpid($_, \&ipc_worker_reap, [ $self ]) for @pids;
+	awaitpid($_) for keys %{$self->{-wq_workers}};
 }
 
 sub wq_kill {
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index b78d70de..6ad42111 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -18,7 +18,6 @@ use IO::Handle ();
 use Fcntl qw(SEEK_SET);
 use PublicInbox::Config;
 use PublicInbox::Syscall qw(EPOLLIN);
-use PublicInbox::DS qw(dwaitpid);
 use PublicInbox::Spawn qw(spawn popen_rd);
 use PublicInbox::Lock;
 use PublicInbox::Eml;
@@ -644,12 +643,12 @@ sub workers_start {
 	my $end = $lei->pkt_op_pair;
 	my $ident = $wq->{-wq_ident} // "lei-$lei->{cmd} worker";
 	$flds->{lei} = $lei;
+	$wq->awaitpid_init($wq->can('_wq_done_wait') // \&wq_done_wait, $lei);
 	$wq->wq_workers_start($ident, $jobs, $lei->oldset, $flds);
 	delete $lei->{pkt_op_p};
 	my $op_c = delete $lei->{pkt_op_c};
 	@$end = ();
 	$lei->event_step_init;
-	$wq->wq_wait_async($wq->can('_wq_done_wait') // \&wq_done_wait, $lei);
 	($op_c, $ops);
 }
 
@@ -1391,9 +1390,8 @@ sub DESTROY {
 	# preserve $? for ->fail or ->x_it code
 }
 
-sub wq_done_wait { # dwaitpid callback
-	my ($arg, $pid) = @_;
-	my ($wq, $lei) = @$arg;
+sub wq_done_wait { # awaitpid cb (via wq_eof / IPC->awaitpid_init)
+	my ($pid, $wq, $lei) = @_;
 	local $current_lei = $lei;
 	my $err_type = $lei->{-err_type};
 	$? and $lei->child_error($?,
diff --git a/lib/PublicInbox/LeiConvert.pm b/lib/PublicInbox/LeiConvert.pm
index 59af40de..1acd4558 100644
--- a/lib/PublicInbox/LeiConvert.pm
+++ b/lib/PublicInbox/LeiConvert.pm
@@ -30,7 +30,7 @@ sub input_maildir_cb {
 
 sub process_inputs { # via wq_do
 	my ($self) = @_;
-	local $PublicInbox::DS::in_loop = 0; # force synchronous dwaitpid
+	local $PublicInbox::DS::in_loop = 0; # force synchronous awaitpid
 	$self->SUPER::process_inputs;
 	my $lei = $self->{lei};
 	delete $lei->{1};
diff --git a/lib/PublicInbox/LeiInput.pm b/lib/PublicInbox/LeiInput.pm
index a1dcc907..c258f824 100644
--- a/lib/PublicInbox/LeiInput.pm
+++ b/lib/PublicInbox/LeiInput.pm
@@ -177,7 +177,7 @@ sub input_path_url {
 			$mbl->{fh} =
 			     PublicInbox::MboxReader::zsfxcat($in, $zsfx, $lei);
 		}
-		local $PublicInbox::DS::in_loop = 0 if $zsfx; # dwaitpid
+		local $PublicInbox::DS::in_loop = 0 if $zsfx; # awaitpid
 		$self->input_fh($ifmt, $mbl->{fh}, $input, @args);
 	} elsif (-d _ && (-d "$input/cur" || -d "$input/new")) {
 		return $lei->fail(<<EOM) if $ifmt && $ifmt ne 'maildir';
diff --git a/lib/PublicInbox/LeiMirror.pm b/lib/PublicInbox/LeiMirror.pm
index 87abf88c..abf66315 100644
--- a/lib/PublicInbox/LeiMirror.pm
+++ b/lib/PublicInbox/LeiMirror.pm
@@ -31,9 +31,8 @@ sub keep_going ($) {
 		$_[0]->{lei}->{opt}->{'keep-going'});
 }
 
-sub _wq_done_wait { # dwaitpid callback (via wq_eof)
-	my ($arg, $pid) = @_;
-	my ($mrr, $lei) = @$arg;
+sub _wq_done_wait { # awaitpid cb (via wq_eof / IPC->awaitpid_init)
+	my ($pid, $mrr, $lei) = @_;
 	if ($?) {
 		$lei->child_error($?);
 	} elsif (!$lei->{child_error}) {
@@ -236,7 +235,7 @@ sub index_cloned_inbox {
 			my ($k) = ($sw =~ /\A([\w-]+)/);
 			$opt->{$k} = $lei->{opt}->{$k};
 		}
-		# force synchronous dwaitpid for v2:
+		# force synchronous awaitpid for v2:
 		local $PublicInbox::DS::in_loop = 0;
 		my $cfg = PublicInbox::Config->new(undef, $lei->{2});
 		my $env = PublicInbox::Admin::index_prepare($opt, $cfg);
diff --git a/lib/PublicInbox/LeiStore.pm b/lib/PublicInbox/LeiStore.pm
index 57f0e013..0ecf1388 100644
--- a/lib/PublicInbox/LeiStore.pm
+++ b/lib/PublicInbox/LeiStore.pm
@@ -604,9 +604,8 @@ sub recv_and_run {
 	$self->SUPER::recv_and_run(@args);
 }
 
-sub _sto_atexit { # dwaitpid callback
-	my ($args, $pid) = @_;
-	my $self = $args->[0];
+sub _sto_atexit { # awaitpid cb (via awaitpid_init)
+	my ($pid, $sto) = @_;
 	warn "lei/store PID:$pid died \$?=$?\n" if $?;
 }
 
@@ -621,12 +620,12 @@ sub write_prepare {
 		# Mail we import into lei are private, so headers filtered out
 		# by -mda for public mail are not appropriate
 		local @PublicInbox::MDA::BAD_HEADERS = ();
+		$self->awaitpid_init(\&_sto_atexit); # outlives $lei
 		$self->wq_workers_start("lei/store $dir", 1, $lei->oldset, {
 					lei => $lei,
 					-err_wr => $w,
 					to_close => [ $r ],
 				});
-		$self->wq_wait_async(\&_sto_atexit); # outlives $lei
 		require PublicInbox::LeiStoreErr;
 		PublicInbox::LeiStoreErr->new($r, $lei);
 	}
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index 1528165a..6a4554e7 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -652,9 +652,8 @@ sub _do_augment_mbox {
 	$dedupe->pause_dedupe if $dedupe;
 }
 
-sub v2w_done_wait { # dwaitpid callback
-	my ($arg, $pid) = @_;
-	my ($v2w, $lei) = @$arg;
+sub v2w_done_wait { # awaitpid cb (via awaitpid_init)
+	my ($pid, $v2w, $lei) = @_;
 	$lei->child_error($?, "error for $v2w->{ibx}->{inboxdir}") if $?;
 }
 
@@ -680,8 +679,8 @@ sub _pre_augment_v2 {
 	PublicInbox::InboxWritable->new($ibx, @creat);
 	$ibx->init_inbox if @creat;
 	my $v2w = $ibx->importer;
+	$v2w->awaitpid_init(\&v2w_done_wait, $lei);
 	$v2w->wq_workers_start("lei/v2w $dir", 1, $lei->oldset, {lei => $lei});
-	$v2w->wq_wait_async(\&v2w_done_wait, $lei);
 	$lei->{v2w} = $v2w;
 	return if !$lei->{opt}->{shared};
 	my $d = "$lei->{ale}->{git}->{git_dir}/objects";
diff --git a/lib/PublicInbox/LeiUp.pm b/lib/PublicInbox/LeiUp.pm
index 49917339..3e92242e 100644
--- a/lib/PublicInbox/LeiUp.pm
+++ b/lib/PublicInbox/LeiUp.pm
@@ -165,9 +165,8 @@ sub _complete_up { # lei__complete hook
 	map { $match_cb->($_) } PublicInbox::LeiSavedSearch::list($lei);
 }
 
-sub _wq_done_wait { # dwaitpid callback
-	my ($arg, $pid) = @_;
-	my ($wq, $lei) = @$arg;
+sub _wq_done_wait { # awaitpid cb (via awaitpid_init)
+	my ($pid, $wq, $lei) = @_;
 	$lei->child_error($?, 'auth failure') if $?
 }
 
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index 730df1f7..f9aa870e 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -400,9 +400,8 @@ sub query_remote_mboxrd {
 
 sub git { $_[0]->{git} // die 'BUG: git uninitialized' }
 
-sub xsearch_done_wait { # dwaitpid callback
-	my ($arg, $pid) = @_;
-	my ($wq, $lei) = @$arg;
+sub xsearch_done_wait { # awaitpid cb (via awaitpid_init)
+	my ($pid, $wq, $lei) = @_;
 	return if !$?;
 	my $s = $? & 127;
 	return $lei->child_error($?) if $s == 13 || $s == 15;
@@ -573,16 +572,16 @@ sub do_query {
 			fcntl($b_r, $F_SETPIPE_SZ, 4096) if $F_SETPIPE_SZ;
 			$l2m->{au_peers} = [ $a_r, $a_w, $b_r, $b_w ];
 		}
+		$l2m->awaitpid_init(\&xsearch_done_wait, $lei);
 		$l2m->wq_workers_start('lei2mail', undef,
 					$lei->oldset, { lei => $lei });
-		$l2m->wq_wait_async(\&xsearch_done_wait, $lei);
 		pipe($lei->{startq}, $lei->{au_done}) or die "pipe: $!";
 		fcntl($lei->{startq}, $F_SETPIPE_SZ, 4096) if $F_SETPIPE_SZ;
 		delete $l2m->{au_peers};
 	}
+	$self->awaitpid_init(\&xsearch_done_wait, $lei);
 	$self->wq_workers_start('lei_xsearch', undef,
 				$lei->oldset, { lei => $lei });
-	$self->wq_wait_async(\&xsearch_done_wait, $lei);
 	my $op_c = delete $lei->{pkt_op_c};
 	delete $lei->{pkt_op_p};
 	@$end = ();
diff --git a/script/public-inbox-clone b/script/public-inbox-clone
index e93ac37b..598979bc 100755
--- a/script/public-inbox-clone
+++ b/script/public-inbox-clone
@@ -62,5 +62,5 @@ my $mrr = bless {
 
 $? = 0;
 $mrr->do_mirror;
-$mrr->can('_wq_done_wait')->([$mrr, $lei], $$);
+$mrr->can('_wq_done_wait')->($$, $mrr, $lei);
 exit(($lei->{child_error} // 0) >> 8);

^ permalink raw reply related	[relevance 3%]

* [PATCH 00/12] improve process reaping
@ 2023-01-17  7:18  7% Eric Wong
  2023-01-17  7:19  3% ` [PATCH 11/12] ipc+lei: switch to awaitpid Eric Wong
  0 siblings, 1 reply; 3+ results
From: Eric Wong @ 2023-01-17  7:18 UTC (permalink / raw)
  To: meta

dwaitpid was implemented under the assumption our code could
eventually use a multithreaded Perl 5.  Since the threads(3perl)
manpage officially discourages threads, that assumption proved
false.  This series saves syscalls and improves ergonomics of
our internal APIs, data structures and code a small bit.

Eric Wong (12):
  ipc: remove {-reap_async} field
  t/solver_git.t: fix test message
  qspawn: drop {psgi_env} deref
  ds: introduce awaitpid, switch ProcessPipe users
  git|gcf2: switch to awaitpid
  watch: switch to awaitpid
  watch: simplify internal data structures
  eofpipe: drop {arg} support for now
  watch: IMAP and NNTP polling can use the same interval
  ipc: drop unused $args from ->ipc_worker_stop
  ipc+lei: switch to awaitpid
  ds: drop dwaitpid, switch to waitpid(-1)

 Documentation/technical/ds.txt |  2 +-
 lib/PublicInbox/DS.pm          | 69 ++++++++++++------------
 lib/PublicInbox/Daemon.pm      |  2 +-
 lib/PublicInbox/EOFpipe.pm     | 10 ++--
 lib/PublicInbox/Gcf2Client.pm  |  5 +-
 lib/PublicInbox/Git.pm         | 10 ++--
 lib/PublicInbox/IPC.pm         | 39 +++++++-------
 lib/PublicInbox/LEI.pm         |  8 ++-
 lib/PublicInbox/LeiConvert.pm  |  2 +-
 lib/PublicInbox/LeiInput.pm    |  2 +-
 lib/PublicInbox/LeiMirror.pm   |  7 ++-
 lib/PublicInbox/LeiStore.pm    |  7 ++-
 lib/PublicInbox/LeiToMail.pm   | 11 ++--
 lib/PublicInbox/LeiUp.pm       |  5 +-
 lib/PublicInbox/LeiXSearch.pm  |  9 ++--
 lib/PublicInbox/ProcessPipe.pm | 42 +++++++--------
 lib/PublicInbox/Qspawn.pm      | 61 ++++++++++-----------
 lib/PublicInbox/Spawn.pm       |  6 +--
 lib/PublicInbox/Watch.pm       | 96 ++++++++++++----------------------
 script/public-inbox-clone      |  2 +-
 t/solver_git.t                 |  2 +-
 t/spawn.t                      | 12 +++--
 22 files changed, 186 insertions(+), 223 deletions(-)

^ permalink raw reply	[relevance 7%]

Results 1-3 of 3 | reverse | options above
-- pct% links below jump to the message on this page, permalinks otherwise --
2023-01-17  7:18  7% [PATCH 00/12] improve process reaping Eric Wong
2023-01-17  7:19  3% ` [PATCH 11/12] ipc+lei: switch to awaitpid Eric Wong
2023-01-30  4:30     [PATCH 0/2] drop cyclic references and track slow tests Eric Wong
2023-01-30  4:30  4% ` [PATCH 1/2] ipc: drop awaitpid_init to avoid circular refs Eric Wong

Code repositories for project(s) associated with this public inbox

	https://80x24.org/public-inbox.git

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).