user/dev discussion of public-inbox itself
 help / color / mirror / code / Atom feed
Search results ordered by [date|relevance]  view[summary|nested|Atom feed]
thread overview below | download mbox.gz: |
* [PATCH 14/36] ipc: generic IPC dispatch based on Storable
  2020-12-31 13:51  7% [PATCH 00/36] another round of lei stuff Eric Wong
@ 2020-12-31 13:51  6% ` Eric Wong
  0 siblings, 0 replies; 2+ results
From: Eric Wong @ 2020-12-31 13:51 UTC (permalink / raw)
  To: meta

I intend to use this with LeiStore when importing from multiple
slow sources at once (e.g. curl, IMAP, etc.).  This is because
over.sqlite3 can only have a single writer, and we'll have
several slow readers running in parallel.

Watch and SearchIdxShard should also be able to use this code
in the future, but this will be proven with LeiStore, first.
---
 MANIFEST                    |   2 +
 lib/PublicInbox/IPC.pm      | 129 ++++++++++++++++++++++++++++++++++++
 lib/PublicInbox/LeiStore.pm |   2 +-
 t/ipc.t                     |  67 +++++++++++++++++++
 t/lei_store.t               |   5 ++
 5 files changed, 204 insertions(+), 1 deletion(-)
 create mode 100644 lib/PublicInbox/IPC.pm
 create mode 100644 t/ipc.t

diff --git a/MANIFEST b/MANIFEST
index 7ce2075e..96ad52bf 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -153,6 +153,7 @@ lib/PublicInbox/IMAPD.pm
 lib/PublicInbox/IMAPTracker.pm
 lib/PublicInbox/IMAPdeflate.pm
 lib/PublicInbox/IMAPsearchqp.pm
+lib/PublicInbox/IPC.pm
 lib/PublicInbox/IdxStack.pm
 lib/PublicInbox/Import.pm
 lib/PublicInbox/In2Tie.pm
@@ -327,6 +328,7 @@ t/index-git-times.t
 t/indexlevels-mirror-v1.t
 t/indexlevels-mirror.t
 t/init.t
+t/ipc.t
 t/iso-2202-jp.eml
 t/kqnotify.t
 t/lei-oneshot.t
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
new file mode 100644
index 00000000..c04140ae
--- /dev/null
+++ b/lib/PublicInbox/IPC.pm
@@ -0,0 +1,129 @@
+# Copyright (C) 2020 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# base class for running methods in a forked worker process (requires Storable)
+# TODO: this ought to be usable in SearchIdxShard
+package PublicInbox::IPC;
+use strict;
+use v5.10.1;
+use Socket qw(AF_UNIX SOCK_STREAM);
+use Carp qw(confess croak);
+use PublicInbox::Sigfd;
+
+sub _get_rec ($) { # read one "<length>\n<frozen data>" record; undef on EOF
+	my ($sock) = @_;
+	local $/ = "\n";
+	defined(my $len = <$sock>) or return;
+	chop($len) eq "\n" or croak "no LF byte in $len";
+	defined(my $r = read($sock, my $buf, $len)) or croak "read error: $!";
+	$r == $len or croak "short read: $r != $len";
+	thaw($buf);
+}
+
+sub _send_rec ($$) { # counterpart of _get_rec: length line, then frozen $ref
+	my ($sock, $ref) = @_;
+	my $buf = freeze($ref);
+	print $sock length($buf), "\n", $buf or croak "print: $!";
+}
+
+sub ipc_return ($$$) { # wrap exceptions so ipc_do can tell them from results
+	my ($s2, $ret, $exc) = @_;
+	_send_rec($s2, $exc ? bless(\$exc, 'PublicInbox::IPC::Die') : $ret);
+}
+
+sub ipc_worker_loop ($$) { # child side: serve requests until EOF on $s2
+	my ($self, $s2) = @_;
+	$self->ipc_atfork_child if $self->can('ipc_atfork_child');
+	$s2->autoflush(1);
+	while (my $rec = _get_rec($s2)) {
+		my ($wantarray, $sub, @args) = @$rec;
+		if (!defined($wantarray)) { # no waiting if client doesn't care
+			eval { $self->$sub(@args) };
+			eval { warn "die: $@ (from nowait $sub)\n" } if $@;
+		} elsif ($wantarray) {
+			my @ret = eval { $self->$sub(@args) };
+			ipc_return($s2, \@ret, $@);
+		} else {
+			my $ret = eval { $self->$sub(@args) };
+			ipc_return($s2, \$ret, $@);
+		}
+	}
+}
+
+sub ipc_worker_spawn ($$$) { # returns worker PID, or undef w/o Storable
+	my ($self, $ident, $oldset) = @_;
+	eval { require Storable; Storable->import(qw(freeze thaw)); };
+	if ($@) {
+		state $w //= warn "Storable (part of Perl) missing: $@\n";
+		return;
+	}
+	my $pid = $self->{-ipc_worker_pid};
+	confess "BUG: already spawned PID:$pid" if $pid;
+	confess "BUG: already have worker socket" if $self->{-ipc_sock};
+	my ($s1, $s2);
+	socketpair($s1, $s2, AF_UNIX, SOCK_STREAM, 0) or die "socketpair: $!";
+	my $sigset = $oldset // PublicInbox::Sigfd::block_signals(); # pre-fork mask
+	defined($pid = fork) or die "fork: $!";
+	if ($pid == 0) {
+		undef $s1;
+		local $0 = $ident;
+		$SIG{$_} = 'IGNORE' for (qw(TERM INT QUIT));
+		PublicInbox::Sigfd::sig_setmask($sigset); # $oldset may be undef
+		eval { ipc_worker_loop($self, $s2) }; # die must not escape the child
+		if ($@) { warn "worker $ident died: $@\n"; exit(1) }
+		$self->ipc_at_worker_exit if $self->can('ipc_at_worker_exit');
+		exit;
+	}
+	PublicInbox::Sigfd::sig_setmask($sigset) unless $oldset;
+	$s1->autoflush(1);
+	$self->{-ipc_sock} = $s1;
+	$self->{-ipc_worker_pid} = $pid;
+}
+
+sub ipc_reap_worker { # dwaitpid callback
+	my ($self, $pid) = @_;
+	warn "PID:$pid died with \$?=$?\n" if $?;
+}
+
+sub ipc_worker_stop { # dropping our socket end lets the worker hit EOF
+	my ($self) = @_;
+	my $pid;
+	if (delete $self->{-ipc_sock}) {
+		$pid = delete $self->{-ipc_worker_pid} or die "no PID?";
+	} else {
+		$pid = delete $self->{-ipc_worker_pid} and
+			die "unexpected PID:$pid";
+	}
+	return unless $pid;
+	eval { PublicInbox::DS::dwaitpid($pid, \&ipc_reap_worker, $self) };
+	if ($@) { # dwaitpid unavailable or failed: reap synchronously
+		my $wp = waitpid($pid, 0);
+		$pid == $wp or die "waitpid($pid) returned $wp: \$?=$?";
+		ipc_reap_worker($self, $pid);
+	}
+}
+
+# use this if we have multiple readers reading curl or "pigz -dc"
+# and writing to the same store
+sub ipc_lock_init {
+	my ($self, $f) = @_;
+	require PublicInbox::Lock;
+	$self->{-ipc_lock} //= bless { lock_path => $f }, 'PublicInbox::Lock';
+}
+
+sub ipc_do { # run $sub in the worker if one is spawned, otherwise locally
+	my ($self, $sub, @args) = @_;
+	if (my $s1 = $self->{-ipc_sock}) {
+		my $ipc_lock = $self->{-ipc_lock};
+		my $lock = $ipc_lock ? $ipc_lock->lock_for_scope : undef; # held until reply
+		_send_rec($s1, [ wantarray, $sub, @args ]);
+		return unless defined(wantarray); # void context: don't wait
+		my $ret = _get_rec($s1) // die "no response on $sub";
+		die $$ret if ref($ret) eq 'PublicInbox::IPC::Die';
+		wantarray ? @$ret : $$ret;
+	} else {
+		$self->$sub(@args);
+	}
+}
+
+1;
diff --git a/lib/PublicInbox/LeiStore.pm b/lib/PublicInbox/LeiStore.pm
index f8383d5e..2745c560 100644
--- a/lib/PublicInbox/LeiStore.pm
+++ b/lib/PublicInbox/LeiStore.pm
@@ -9,7 +9,7 @@
 package PublicInbox::LeiStore;
 use strict;
 use v5.10.1;
-use parent qw(PublicInbox::Lock);
+use parent qw(PublicInbox::Lock PublicInbox::IPC);
 use PublicInbox::SearchIdx qw(crlf_adjust);
 use PublicInbox::ExtSearchIdx;
 use PublicInbox::Import;
diff --git a/t/ipc.t b/t/ipc.t
new file mode 100644
index 00000000..f9c4024b
--- /dev/null
+++ b/t/ipc.t
@@ -0,0 +1,67 @@
+#!perl -w
+# Copyright (C) 2020 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+use strict;
+use v5.10.1;
+use Test::More;
+use PublicInbox::TestCommon;
+require_ok 'PublicInbox::IPC';
+state $once = eval(<<'') // BAIL_OUT("eval: $@");
+package PublicInbox::IPC;
+use strict;
+sub test_array { qw(test array) }
+sub test_scalar { 'scalar' }
+sub test_scalarref { \'scalarref' }
+sub test_undef { undef }
+sub test_die { shift; die @_; 'unreachable' }
+sub test_pid { $$ }
+1;
+
+my $ipc = bless {}, 'PublicInbox::IPC';
+my @t = qw(array scalar scalarref undef);
+my $test = sub { # compare ipc_do results against direct method calls
+	my $x = shift;
+	for my $type (@t) {
+		my $m = "test_$type";
+		my @ret = $ipc->ipc_do($m);
+		my @exp = $ipc->$m;
+		is_deeply(\@ret, \@exp, "wantarray $m $x");
+
+		$ipc->ipc_do($m); # void context: fire-and-forget
+
+		my $ret = $ipc->ipc_do($m);
+		my $exp = $ipc->$m;
+		is_deeply($ret, $exp, "!wantarray $m $x");
+	}
+	my $ret = eval { $ipc->test_die('phail') };
+	my $exp = $@;
+	$ret = eval { $ipc->ipc_do('test_die', 'phail') };
+	my $err = $@;
+	my %lines;
+	for ($err, $exp) {
+		s/ line (\d+).*//s and $lines{$1}++; # strip location for comparison
+	}
+	is(scalar keys %lines, 1, 'line numbers match');
+	is((values %lines)[0], 2, '2 hits on same line number');
+	is($err, $exp, "$x die matches");
+	is($ret, undef, "$x die did not return");
+};
+$test->('local');
+
+SKIP: {
+	require_mods(qw(Storable), 16);
+	my $pid = $ipc->ipc_worker_spawn('test worker');
+	defined($pid) or BAIL_OUT 'no spawn, no test';
+	ok($pid > 0 && kill(0, $pid), 'worker spawned and running');
+	is($ipc->ipc_do('test_pid'), $pid, 'worker pid returned');
+	$test->('worker');
+	{
+		my ($tmp, $for_destroy) = tmpdir();
+		$ipc->ipc_lock_init("$tmp/lock");
+		is($ipc->ipc_do('test_pid'), $pid, 'worker pid returned');
+	}
+	$ipc->ipc_worker_stop;
+	ok(!kill(0, $pid) && $!{ESRCH}, 'worker stopped');
+}
+$ipc->ipc_worker_stop; # idempotent
+done_testing;
diff --git a/t/lei_store.t b/t/lei_store.t
index 03ab5af6..a189f897 100644
--- a/t/lei_store.t
+++ b/t/lei_store.t
@@ -85,4 +85,9 @@ for my $parallel (0, 1) {
 	is_deeply(\@kw, [], 'set clobbers all');
 }
 
+SKIP: {
+	require_mods(qw(Storable), 1);
+	ok($lst->can('ipc_do'), 'ipc_do works if we have Storable');
+}
+
 done_testing;

^ permalink raw reply related	[relevance 6%]

* [PATCH 00/36] another round of lei stuff
@ 2020-12-31 13:51  7% Eric Wong
  2020-12-31 13:51  6% ` [PATCH 14/36] ipc: generic IPC dispatch based on Storable Eric Wong
  0 siblings, 1 reply; 2+ results
From: Eric Wong @ 2020-12-31 13:51 UTC (permalink / raw)
  To: meta

This is against lei branch @ commit
0c8106d44f317175e122744b43407bf067183175 in
https://public-inbox.org/public-inbox.git

Infrastructure for reading and writing local Maildirs and a
bunch of mbox formats (including gz/bz2/xz support) is done,
and its usage should be familiar to mairix(1) users.

Infrastructure for deduplication and augmenting search results
is in place and tested.

Going to skip MH and MMDF for now.  IMAP/JMAP might happen
sooner, but deduplication needs low latency.

"extinbox" renamed "external"

Basic infrastructure like PublicInbox::IPC and SharedKV
should've been done and in use ages ago...  I look forward to
using them, at least.

Some DS safety fixes, since lei will use it in stranger ways
than current users do.

As if messages with duplicate Message-IDs weren't bad enough,
lei will need to deal with Unsent/Drafts messages w/o Message-IDs at all!

Eric Wong (36):
  import: respect init.defaultBranch
  lei_store: use per-machine refname as git HEAD
  revert "lei_store: use per-machine refname as git HEAD"
  lei_to_mail: initial implementation for writing mbox formats
  sharedkv: fork()-friendly key-value store
  sharedkv: split out index_values
  lei_to_mail: start atomic and compressed mbox writing
  mboxreader: new class for reading various mbox formats
  lei_to_mail: start --augment, dedupe, bz2 and xz
  lei: implement various deduplication strategies
  lei_to_mail: lazy-require LeiDedupe
  lei_to_mail: support for non-seekable outputs
  lei_to_mail: support Maildir, fix+test --augment
  ipc: generic IPC dispatch based on Storable
  ipc: support Sereal
  lei_store: add ->set_eml, ->add_eml can return smsg
  lei: rename "extinbox" => "external"
  mid: use defined-or with `push' for uniqueness check
  mid: hoist out mids_in sub
  lei_store: handle messages without Message-ID at all
  ipc: use shutdown(2), base atfork* callback
  lei_to_mail: unlink mboxes if not augmenting
  lei: add --mfolder as an option
  spawn: move run_die here from PublicInbox::Import
  init: remove embedded UnlinkMe package
  t/run.perl: avoid uninitialized var on incomplete test
  gcf2client: reap process on DESTROY
  lei_to_mail: open FIFOs O_WRONLY so we block
  searchidxshard: call DS->Reset at worker start
  t/ipc.t: test for references via `die'
  use PublicInbox::DS for dwaitpid
  syscall: SFD_NONBLOCK can be a constant, again
  lei: avoid Spawn package when starting daemon
  avoid calling waitpid from children in DESTROY
  ds: clobber $in_loop first at reset
  on_destroy: support PID owner guard

 MANIFEST                                      |  12 +-
 lib/PublicInbox/DS.pm                         |  42 +-
 lib/PublicInbox/DSKQXS.pm                     |   4 +-
 lib/PublicInbox/Daemon.pm                     |   4 +-
 lib/PublicInbox/Gcf2Client.pm                 |  18 +-
 lib/PublicInbox/Git.pm                        |   7 +-
 lib/PublicInbox/IPC.pm                        | 165 ++++++++
 lib/PublicInbox/Import.pm                     |  36 +-
 lib/PublicInbox/LEI.pm                        |  44 +--
 lib/PublicInbox/LeiDedupe.pm                  | 100 +++++
 .../{LeiExtinbox.pm => LeiExternal.pm}        |  18 +-
 lib/PublicInbox/LeiStore.pm                   |  32 +-
 lib/PublicInbox/LeiToMail.pm                  | 361 ++++++++++++++++++
 lib/PublicInbox/LeiXSearch.pm                 |   2 +-
 lib/PublicInbox/Lock.pm                       |  17 +-
 lib/PublicInbox/MID.pm                        |  15 +-
 lib/PublicInbox/MboxReader.pm                 | 127 ++++++
 lib/PublicInbox/OnDestroy.pm                  |   5 +
 lib/PublicInbox/OverIdx.pm                    |   2 +
 lib/PublicInbox/ProcessPipe.pm                |  34 +-
 lib/PublicInbox/Qspawn.pm                     |  43 +--
 lib/PublicInbox/SearchIdxShard.pm             |   1 +
 lib/PublicInbox/SharedKV.pm                   | 148 +++++++
 lib/PublicInbox/Sigfd.pm                      |   4 +-
 lib/PublicInbox/Smsg.pm                       |   6 +-
 lib/PublicInbox/Spawn.pm                      |   9 +-
 lib/PublicInbox/Syscall.pm                    |   4 +-
 lib/PublicInbox/TestCommon.pm                 |  25 +-
 lib/PublicInbox/V2Writable.pm                 |  10 +-
 script/lei                                    |  17 +-
 script/public-inbox-init                      |  32 +-
 script/public-inbox-watch                     |   4 +-
 t/convert-compact.t                           |   4 +-
 t/index-git-times.t                           |   3 +-
 t/ipc.t                                       |  80 ++++
 t/lei.t                                       |  22 +-
 t/lei_dedupe.t                                |  59 +++
 t/lei_store.t                                 |  47 ++-
 t/lei_to_mail.t                               | 246 ++++++++++++
 t/lei_xsearch.t                               |   2 +-
 t/mbox_reader.t                               |  75 ++++
 t/on_destroy.t                                |   9 +
 t/plack.t                                     |   4 +-
 t/run.perl                                    |   3 +-
 t/shared_kv.t                                 |  58 +++
 t/sigfd.t                                     |   6 +-
 46 files changed, 1755 insertions(+), 211 deletions(-)
 create mode 100644 lib/PublicInbox/IPC.pm
 create mode 100644 lib/PublicInbox/LeiDedupe.pm
 rename lib/PublicInbox/{LeiExtinbox.pm => LeiExternal.pm} (75%)
 create mode 100644 lib/PublicInbox/LeiToMail.pm
 create mode 100644 lib/PublicInbox/MboxReader.pm
 create mode 100644 lib/PublicInbox/SharedKV.pm
 create mode 100644 t/ipc.t
 create mode 100644 t/lei_dedupe.t
 create mode 100644 t/lei_to_mail.t
 create mode 100644 t/mbox_reader.t
 create mode 100644 t/shared_kv.t


^ permalink raw reply	[relevance 7%]

Results 1-2 of 2 | reverse | options above
-- pct% links below jump to the message on this page, permalinks otherwise --
2020-12-31 13:51  7% [PATCH 00/36] another round of lei stuff Eric Wong
2020-12-31 13:51  6% ` [PATCH 14/36] ipc: generic IPC dispatch based on Storable Eric Wong

Code repositories for project(s) associated with this public inbox

	https://80x24.org/public-inbox.git

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).