From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-3.7 required=3.0 tests=ALL_TRUSTED,AWL,BAYES_00, DKIM_SIGNED,DKIM_VALID,DKIM_VALID_AU,DKIM_VALID_EF shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id B5ED81FAF5 for ; Mon, 28 Nov 2022 05:32:33 +0000 (UTC) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d=80x24.org; s=selector1; t=1669613553; bh=T/gGlB7u1YNupe4ZO2Jm8YCsCqkF+wDY2/Dacl//lTo=; h=From:To:Subject:Date:In-Reply-To:References:From; b=2QdjNLy+IfNjfeHkOy2H8COIjpz4lRjfQUJIPfjXQIKEIKCyLUSZUsPfvnStRbZsC aFMbmGnWNJ66Gq05bfPGYSjkuOa1VH0lC770N4Wx7kBAFzwyX+YVCG0xzgiP356lQU KKZg9THLVmVXdmpgrDymlJJIEw25PdGlVKG/vQO4= From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 06/95] lei_mirror: rely on global process reaper Date: Mon, 28 Nov 2022 05:31:03 +0000 Message-Id: <20221128053232.291618-7-e@80x24.org> In-Reply-To: <20221128053232.291618-1-e@80x24.org> References: <20221128053232.291618-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: We no longer rely on SIGCHLD for predictability, and instead call waitpid at safe points. This will make it easier for us to do parallel mirroring of multiple inboxes while preserving proper dependencies via ->DESTROY callbacks. --- lib/PublicInbox/LeiMirror.pm | 54 +++++++++++++++++++----------------- 1 file changed, 28 insertions(+), 26 deletions(-) diff --git a/lib/PublicInbox/LeiMirror.pm b/lib/PublicInbox/LeiMirror.pm index 0603dd48..7dc47ab8 100644 --- a/lib/PublicInbox/LeiMirror.pm +++ b/lib/PublicInbox/LeiMirror.pm @@ -14,7 +14,7 @@ use PublicInbox::Spawn qw(popen_rd spawn); use File::Temp (); use Fcntl qw(SEEK_SET O_CREAT O_EXCL O_WRONLY); use Carp qw(croak); -use POSIX qw(WNOHANG); +our %LIVE; sub _wq_done_wait { # dwaitpid callback (via wq_eof) my ($arg, $pid) = @_; @@ -61,7 +61,9 @@ sub try_scrape { my ($n) = (m!/([0-9]+)\z!); $n => [ URI->new($_), '' ] } @v2_urls; # uniq - return clone_v2($self, \%v2_epochs); + clone_v2($self, \%v2_epochs); + reap_live() while keys(%LIVE); + return; } # filter out common URLs served by WWW (e.g /$MSGID/T/) @@ -311,16 +313,16 @@ EOM } sub reap_clone { # async, called via SIGCHLD - my ($lei, $cmd, $live) = @_; + my ($lei, $cmd) = @_; my $cerr = $?; $? = 0; # don't let it influence normal exit if ($cerr) { - kill('TERM', keys %$live); + kill('TERM', keys %LIVE); $lei->child_error($cerr, "@$cmd failed"); } } -sub v2_done { +sub v2_done { # called via OnDestroy my ($self) = @_; require PublicInbox::MultiGit; my $dst = $self->{cur_dst} // $self->{dst}; @@ -336,6 +338,16 @@ sub v2_done { index_cloned_inbox($self, 2); } +sub reap_live { + my $pid = waitpid(-1, 0) // die "waitpid(-1): $!"; + if (my $x = delete $LIVE{$pid}) { + my $cb = shift @$x; + $cb->(@$x); + } else { + warn "reaped unknown PID=$pid ($?)\n"; + } +} + sub clone_v2 ($$;$) { my ($self, $v2_epochs, $m) = @_; # $m => manifest.js.gz hashref my $lei = $self->{lei}; @@ -366,37 +378,21 @@ failed to extract epoch number from $src # filter out the epochs we skipped $self->{-culled_manifest} = 1 if delete(@$m{@skip}); my $lk = bless { lock_path => "$dst/inbox.lock" }, 'PublicInbox::Lock'; - my %live; my $fini = PublicInbox::OnDestroy->new($$, \&v2_done, $task); - $live{_try_config_start($task)} = [ \&_try_config_done, $task, $fini ]; + $LIVE{_try_config_start($task)} = [ \&_try_config_done, $task, $fini ]; $task->{-locked} = $lk->lock_for_scope($$); my @cmd = clone_cmd($lei, my $opt = {}); my $jobs = $self->{lei}->{opt}->{jobs} // 2; - my $sigchld = sub { - my ($sig) = @_; - my $flags = $sig ? WNOHANG : 0; - while (1) { - my $pid = waitpid(-1, $flags) or return; - return if $pid < 0; - if (my $x = delete $live{$pid}) { - my $cb = shift @$x; - $cb->(@$x, \%live); - } else { - warn "reaped unknown PID=$pid ($?)\n"; - } - } - }; do { - $sigchld->(0) while keys(%live) >= $jobs; - while (keys(%live) < $jobs && @src_edst && + reap_live() while keys(%LIVE) >= $jobs; + while (keys(%LIVE) < $jobs && @src_edst && !$lei->{child_error}) { my $cmd = [ @$pfx, @cmd, splice(@src_edst, 0, 2) ]; $lei->qerr("# @$cmd"); - my $pid = spawn($cmd, undef, $opt); - $live{$pid} = [ \&reap_clone, $lei, $cmd, $fini ]; + $LIVE{spawn($cmd, undef, $opt)} = [ \&reap_clone, + $lei, $cmd, $fini ]; } } while (@src_edst && !$lei->{child_error}); - $sigchld->(0) while keys(%live); } sub decode_manifest ($$$) { @@ -487,6 +483,7 @@ sub try_manifest { my $opt = { -C => $pdir }; $opt->{$_} = $lei->{$_} for (0..2); my $cerr = run_reap($lei, $cmd, $opt); + local %LIVE; if ($cerr) { return try_scrape($self) if ($cerr >> 8) == 22; # 404 missing return $lei->child_error($cerr, "@$cmd failed"); @@ -498,6 +495,7 @@ sub try_manifest { } my ($path_pfx, $n, $multi) = multi_inbox($self, \$path, $m); return $lei->child_error(1, $multi) if !ref($multi); + my $jobs = $self->{lei}->{opt}->{jobs} // 2; if (my $v2 = delete $multi->{v2}) { for my $name (sort keys %$v2) { my $epochs = delete $v2->{$name}; @@ -520,6 +518,8 @@ sub try_manifest { E: `$self->{cur_dst}' must not contain newline EOM clone_v2($self, \%v2_epochs, $m); + reap_live() while keys(%LIVE) >= $jobs; + return if $self->{lei}->{child_error}; } } if (my $v1 = delete $multi->{v1}) { @@ -540,6 +540,7 @@ EOM clone_v1($self, 1); } } + reap_live() while keys(%LIVE); if (delete $self->{-culled_manifest}) { # set by clone_v2/-I/--exclude # write the smaller manifest if epochs were skipped so # users won't have to delete manifest if they +w an @@ -566,6 +567,7 @@ sub do_mirror { # via wq_io_do eval { my $iv = $lei->{opt}->{'inbox-version'}; if (defined $iv) { + local %LIVE; return clone_v1($self) if $iv == 1; return try_scrape($self) if $iv == 2; die "bad --inbox-version=$iv\n";