* [PATCH 21/36] ipc: use shutdown(2), base atfork* callback
2020-12-31 13:51 5% [PATCH 00/36] another round of lei stuff Eric Wong
@ 2020-12-31 13:51 7% ` Eric Wong
0 siblings, 0 replies; 2+ results
From: Eric Wong @ 2020-12-31 13:51 UTC (permalink / raw)
To: meta
shutdown(2) on a socket can be preferable if there's multiple
forked processes writing to a single worker and we really want
to shut things down ASAP.
It may also be good to provide an ipc_worker_exit method which
subclasses can override if needed for graceful shutdown. But we
won't need equivalents to atexit(3) since we can rely on DESTROY
handlers given this is Perl5.
---
lib/PublicInbox/IPC.pm | 49 ++++++++++++++++++++++++++++--------------
1 file changed, 33 insertions(+), 16 deletions(-)
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 0baa218c..ed10cf44 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -56,8 +56,6 @@ sub ipc_return ($$$) {
sub ipc_worker_loop ($$) {
my ($self, $s2) = @_;
- $self->ipc_atfork_child if $self->can('ipc_atfork_child');
- $s2->autoflush(1);
while (my $rec = _get_rec($s2)) {
my ($wantarray, $sub, @args) = @$rec;
if (!defined($wantarray)) { # no waiting if client doesn't care
@@ -73,7 +71,7 @@ sub ipc_worker_loop ($$) {
}
}
-sub ipc_worker_spawn ($$$) {
+sub ipc_worker_spawn {
my ($self, $ident, $oldset) = @_;
return unless $enc;
my $pid = $self->{-ipc_worker_pid};
@@ -82,43 +80,62 @@ sub ipc_worker_spawn ($$$) {
my ($s1, $s2);
socketpair($s1, $s2, AF_UNIX, SOCK_STREAM, 0) or die "socketpair: $!";
my $sigset = $oldset // PublicInbox::Sigfd::block_signals();
+ my $parent = $$;
+ $self->ipc_atfork_parent;
defined($pid = fork) or die "fork: $!";
if ($pid == 0) {
- undef $s1;
- local $0 = $ident;
+ eval { PublicInbox::DS->Reset };
+ $self->{-ipc_parent_pid} = $parent;
+ close $s1 or die "close(\$s1): $!";
+ $s2->autoflush(1);
$SIG{$_} = 'IGNORE' for (qw(TERM INT QUIT));
+ local $0 = $ident;
PublicInbox::Sigfd::sig_setmask($oldset);
+ $self->ipc_atfork_child;
eval { ipc_worker_loop($self, $s2) };
- die "worker $ident died: $@\n" if $@;
- $self->ipc_at_worker_exit if $self->can('ipc_at_worker_exit');
+ die "worker $ident PID:$$ died: $@\n" if $@;
exit;
}
PublicInbox::Sigfd::sig_setmask($sigset) unless $oldset;
+ close $s2 or die "close(\$s2): $!";
$s1->autoflush(1);
$self->{-ipc_sock} = $s1;
$self->{-ipc_worker_pid} = $pid;
}
-sub ipc_reap_worker { # dwaitpid callback
+sub ipc_worker_reap { # dwaitpid callback
my ($self, $pid) = @_;
warn "PID:$pid died with \$?=$?\n" if $?;
}
+# for base class, override in superclasses
+sub ipc_atfork_parent {}
+sub ipc_atfork_child {}
+
+sub ipc_worker_exit {
+ my (undef, $code) = @_;
+ exit($code);
+}
+
sub ipc_worker_stop {
my ($self) = @_;
my $pid;
- if (delete $self->{-ipc_sock}) {
- $pid = delete $self->{-ipc_worker_pid} or die "no PID?";
- } else {
+ my $s1 = delete $self->{-ipc_sock} or do {
$pid = delete $self->{-ipc_worker_pid} and
- die "unexpected PID:$pid";
- }
- return unless $pid;
- eval { PublicInbox::DS::dwaitpid($pid, \&ipc_reap_worker, $self) };
+ die "unexpected PID:$pid without ipc_sock";
+ return;
+ };
+ $pid = delete $self->{-ipc_worker_pid} or die "no PID?";
+ _send_rec($s1, [ undef, 'ipc_worker_exit', 0 ]);
+ shutdown($s1, 2) or die "shutdown(\$s1) for PID:$pid";
+ eval {
+ my $reap = $self->can('ipc_worker_reap');
+ PublicInbox::DS::dwaitpid($pid, $reap, $self);
+ };
if ($@) {
my $wp = waitpid($pid, 0);
$pid == $wp or die "waitpid($pid) returned $wp: \$?=$?";
- ipc_reap_worker($self, $pid);
+ $self->ipc_worker_reap($pid);
}
}
^ permalink raw reply related [relevance 7%]
* [PATCH 00/36] another round of lei stuff
@ 2020-12-31 13:51 5% Eric Wong
2020-12-31 13:51 7% ` [PATCH 21/36] ipc: use shutdown(2), base atfork* callback Eric Wong
0 siblings, 1 reply; 2+ results
From: Eric Wong @ 2020-12-31 13:51 UTC (permalink / raw)
To: meta
This is against lei branch @ commit
0c8106d44f317175e122744b43407bf067183175 in
https://public-inbox.org/public-inbox.git
Infrastructure stuff for reading + writing local Maildirs and a
bunch of mbox formats are done (including gz/bz2/xz support)
and it's usage should be familiar to mairix(1) users.
Infrastructure for deduplication + augmenting search results
in place and tested.
Going to skip MH and MMDF for now; but IMAP/JMAP might happen
sooner but deduplication needs low-latency.
"extinbox" renamed "external"
Basic infrastructure like PublicInbox::IPC and SharedKV
should've been done and in use ages ago... I look forward to
using them, at least.
Some DS safety fixes since lei will use it in stranger ways
than current.
Bad enough we have messages with duplicate Message-IDs, lei will
need to deal with Unsent/Drafts messages w/o Message-IDs at all!
Eric Wong (36):
import: respect init.defaultBranch
lei_store: use per-machine refname as git HEAD
revert "lei_store: use per-machine refname as git HEAD"
lei_to_mail: initial implementation for writing mbox formats
sharedkv: fork()-friendly key-value store
sharedkv: split out index_values
lei_to_mail: start atomic and compressed mbox writing
mboxreader: new class for reading various mbox formats
lei_to_mail: start --augment, dedupe, bz2 and xz
lei: implement various deduplication strategies
lei_to_mail: lazy-require LeiDedupe
lei_to_mail: support for non-seekable outputs
lei_to_mail: support Maildir, fix+test --augment
ipc: generic IPC dispatch based on Storable
ipc: support Sereal
lei_store: add ->set_eml, ->add_eml can return smsg
lei: rename "extinbox" => "external"
mid: use defined-or with `push' for uniqueness check
mid: hoist out mids_in sub
lei_store: handle messages without Message-ID at all
ipc: use shutdown(2), base atfork* callback
lei_to_mail: unlink mboxes if not augmenting
lei: add --mfolder as an option
spawn: move run_die here from PublicInbox::Import
init: remove embedded UnlinkMe package
t/run.perl: avoid uninitialized var on incomplete test
gcf2client: reap process on DESTROY
lei_to_mail: open FIFOs O_WRONLY so we block
searchidxshard: call DS->Reset at worker start
t/ipc.t: test for references via `die'
use PublicInbox::DS for dwaitpid
syscall: SFD_NONBLOCK can be a constant, again
lei: avoid Spawn package when starting daemon
avoid calling waitpid from children in DESTROY
ds: clobber $in_loop first at reset
on_destroy: support PID owner guard
MANIFEST | 12 +-
lib/PublicInbox/DS.pm | 42 +-
lib/PublicInbox/DSKQXS.pm | 4 +-
lib/PublicInbox/Daemon.pm | 4 +-
lib/PublicInbox/Gcf2Client.pm | 18 +-
lib/PublicInbox/Git.pm | 7 +-
lib/PublicInbox/IPC.pm | 165 ++++++++
lib/PublicInbox/Import.pm | 36 +-
lib/PublicInbox/LEI.pm | 44 +--
lib/PublicInbox/LeiDedupe.pm | 100 +++++
.../{LeiExtinbox.pm => LeiExternal.pm} | 18 +-
lib/PublicInbox/LeiStore.pm | 32 +-
lib/PublicInbox/LeiToMail.pm | 361 ++++++++++++++++++
lib/PublicInbox/LeiXSearch.pm | 2 +-
lib/PublicInbox/Lock.pm | 17 +-
lib/PublicInbox/MID.pm | 15 +-
lib/PublicInbox/MboxReader.pm | 127 ++++++
lib/PublicInbox/OnDestroy.pm | 5 +
lib/PublicInbox/OverIdx.pm | 2 +
lib/PublicInbox/ProcessPipe.pm | 34 +-
lib/PublicInbox/Qspawn.pm | 43 +--
lib/PublicInbox/SearchIdxShard.pm | 1 +
lib/PublicInbox/SharedKV.pm | 148 +++++++
lib/PublicInbox/Sigfd.pm | 4 +-
lib/PublicInbox/Smsg.pm | 6 +-
lib/PublicInbox/Spawn.pm | 9 +-
lib/PublicInbox/Syscall.pm | 4 +-
lib/PublicInbox/TestCommon.pm | 25 +-
lib/PublicInbox/V2Writable.pm | 10 +-
script/lei | 17 +-
script/public-inbox-init | 32 +-
script/public-inbox-watch | 4 +-
t/convert-compact.t | 4 +-
t/index-git-times.t | 3 +-
t/ipc.t | 80 ++++
t/lei.t | 22 +-
t/lei_dedupe.t | 59 +++
t/lei_store.t | 47 ++-
t/lei_to_mail.t | 246 ++++++++++++
t/lei_xsearch.t | 2 +-
t/mbox_reader.t | 75 ++++
t/on_destroy.t | 9 +
t/plack.t | 4 +-
t/run.perl | 3 +-
t/shared_kv.t | 58 +++
t/sigfd.t | 6 +-
46 files changed, 1755 insertions(+), 211 deletions(-)
create mode 100644 lib/PublicInbox/IPC.pm
create mode 100644 lib/PublicInbox/LeiDedupe.pm
rename lib/PublicInbox/{LeiExtinbox.pm => LeiExternal.pm} (75%)
create mode 100644 lib/PublicInbox/LeiToMail.pm
create mode 100644 lib/PublicInbox/MboxReader.pm
create mode 100644 lib/PublicInbox/SharedKV.pm
create mode 100644 t/ipc.t
create mode 100644 t/lei_dedupe.t
create mode 100644 t/lei_to_mail.t
create mode 100644 t/mbox_reader.t
create mode 100644 t/shared_kv.t
^ permalink raw reply [relevance 5%]
Results 1-2 of 2 | reverse | options above
-- pct% links below jump to the message on this page, permalinks otherwise --
2020-12-31 13:51 5% [PATCH 00/36] another round of lei stuff Eric Wong
2020-12-31 13:51 7% ` [PATCH 21/36] ipc: use shutdown(2), base atfork* callback Eric Wong
Code repositories for project(s) associated with this public inbox
https://80x24.org/public-inbox.git
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).