user/dev discussion of public-inbox itself
 help / color / mirror / code / Atom feed
From: Eric Wong <e@80x24.org>
To: meta@public-inbox.org
Subject: [PATCH 10/10] lei import: initial implementation
Date: Thu,  4 Feb 2021 00:59:30 -0900	[thread overview]
Message-ID: <20210204095930.20278-11-e@80x24.org> (raw)
In-Reply-To: <20210204095930.20278-1-e@80x24.org>

Only tested with .eml files so far, but Maildir + IMAP
will be supported.
---
 MANIFEST                      |   1 +
 lib/PublicInbox/IPC.pm        |   4 +-
 lib/PublicInbox/LEI.pm        |  48 ++++++++++++---
 lib/PublicInbox/LeiImport.pm  | 106 ++++++++++++++++++++++++++++++++++
 lib/PublicInbox/LeiStore.pm   |  18 ++++++
 lib/PublicInbox/LeiXSearch.pm |  18 +-----
 t/lei.t                       |  15 +++++
 7 files changed, 184 insertions(+), 26 deletions(-)
 create mode 100644 lib/PublicInbox/LeiImport.pm

diff --git a/MANIFEST b/MANIFEST
index 6922f9b1..a11d4106 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -179,6 +179,7 @@ lib/PublicInbox/KQNotify.pm
 lib/PublicInbox/LEI.pm
 lib/PublicInbox/LeiDedupe.pm
 lib/PublicInbox/LeiExternal.pm
+lib/PublicInbox/LeiImport.pm
 lib/PublicInbox/LeiOverview.pm
 lib/PublicInbox/LeiQuery.pm
 lib/PublicInbox/LeiSearch.pm
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 7f5a3f6f..a0e6bfee 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -101,7 +101,7 @@ sub ipc_worker_loop ($$$) {
 
 # starts a worker if Sereal or Storable is installed
 sub ipc_worker_spawn {
-	my ($self, $ident, $oldset) = @_;
+	my ($self, $ident, $oldset, $fields) = @_;
 	return unless $enc; # no Sereal or Storable
 	return if ($self->{-ipc_ppid} // -1) == $$; # idempotent
 	delete(@$self{qw(-ipc_req -ipc_res -ipc_ppid -ipc_pid)});
@@ -123,6 +123,8 @@ sub ipc_worker_spawn {
 		# ensure we properly exit even if warn() dies:
 		my $end = PublicInbox::OnDestroy->new($$, sub { exit(!!$@) });
 		eval {
+			$fields //= {};
+			local @$self{keys %$fields} = values(%$fields);
 			my $on_destroy = $self->ipc_atfork_child;
 			local %SIG = %SIG;
 			ipc_worker_loop($self, $r_req, $w_res);
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 24efb494..682d1bd1 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -160,9 +160,10 @@ our %CMD = ( # sorted in order of importance/use:
 'forget-watch' => [ '{WATCH_NUMBER|--prune}', 'stop and forget a watch',
 	qw(prune) ],
 
-'import' => [ 'URL_OR_PATHNAME|--stdin',
-	'one-shot import/update from URL or filesystem',
-	qw(stdin| offset=i recursive|r exclude=s include=s !flags),
+'import' => [ 'URLS_OR_PATHNAMES...|--stdin',
+	'one-time import/update from URL or filesystem',
+	qw(stdin| offset=i recursive|r exclude=s include|I=s
+	format|f=s flags!),
 	],
 
 'config' => [ '[...]', sub {
@@ -194,8 +195,8 @@ our %CMD = ( # sorted in order of importance/use:
 # $spec => [@ALLOWED_VALUES (default is first), $description],
 # $spec => $description
 # "$SUB_COMMAND TAB $spec" => as above
-my $stdin_formats = [ 'IN|auto|raw|mboxrd|mboxcl2|mboxcl|mboxo',
-		'specify message input format' ];
+my $stdin_formats = [ 'MAIL_FORMAT|eml|mboxrd|mboxcl2|mboxcl|mboxo',
+			'specify message input format' ];
 my $ls_format = [ 'OUT|plain|json|null', 'listing output format' ];
 
 my %OPTDESC = (
@@ -240,6 +241,8 @@ my %OPTDESC = (
 'q	jobs=s'	=> [ '[SEARCH_JOBS][,WRITER_JOBS]',
 		'control number of search and writer jobs' ],
 
+'import format|f=s' => $stdin_formats,
+
 'ls-query	format|f=s' => $ls_format,
 'ls-external	format|f=s' => $ls_format,
 
@@ -319,6 +322,20 @@ sub err ($;@) {
 
 sub qerr ($;@) { $_[0]->{opt}->{quiet} or err(shift, @_) }
 
+sub fail_handler ($;$$) { # reap worker queues, close $io, then exit via x_it
+	my ($lei, $code, $io) = @_; # $code and $io are both optional
+	for my $f (qw(imp lxs l2m)) {
+		my $wq = delete $lei->{$f} or next;
+		$wq->wq_wait_old($lei) if $wq->wq_kill_old; # lei-daemon
+	}
+	close($io) if $io; # needed to avoid warnings on SIGPIPE
+	$lei->x_it($code // (1 << 8)); # default: exit status 1, encoded like $?
+}
+
+sub sigpipe_handler { # handles SIGPIPE from l2m/lxs workers; $_[0] is $lei
+	fail_handler($_[0], 13, delete $_[0]->{1}); # 13: presumably SIGPIPE
+}
+
 sub fail ($$;$) {
 	my ($self, $buf, $exit_code) = @_;
 	err($self, $buf) if defined $buf;
@@ -340,7 +357,8 @@ sub out ($;@) {
 sub puts ($;@) { out(shift, map { "$_\n" } @_) }
 
 sub child_error { # passes non-fatal curl exit codes to user
-	my ($self, $child_error) = @_; # child_error is $?
+	my ($self, $child_error, $msg) = @_; # child_error is $?
+	$self->err($msg) if $msg;
 	if (my $s = $self->{pkt_op_p} // $self->{sock}) {
 		# send to the parent lei-daemon or to lei(1) client
 		send($s, "child_error $child_error", MSG_EOR);
@@ -357,9 +375,16 @@ sub note_sigpipe { # triggers sigpipe_handler
 }
 
 sub lei_atfork_child {
-	my ($self) = @_;
+	my ($self, $persist) = @_;
 	# we need to explicitly close things which are on stack
-	delete $self->{0};
+	if ($persist) {
+		my @io = delete @$self{0,1,2};
+		unless ($self->{oneshot}) {
+			close($_) for @io;
+		}
+	} else {
+		delete $self->{0};
+	}
 	for (delete @$self{qw(3 sock old_1 au_done)}) {
 		close($_) if defined($_);
 	}
@@ -374,7 +399,7 @@ sub lei_atfork_child {
 	%PATH2CFG = ();
 	undef $errors_log;
 	$quit = \&CORE::exit;
-	$current_lei = $self; # for SIG{__WARN__}
+	$current_lei = $persist ? undef : $self; # for SIG{__WARN__}
 }
 
 sub _help ($;$) {
@@ -606,6 +631,11 @@ sub lei_config {
 	x_it($self, $?) if $?;
 }
 
+sub lei_import { # loads the implementation on first use, then delegates
+	require PublicInbox::LeiImport;
+	PublicInbox::LeiImport->call(@_); # call() unpacks @_ as ($lei, @argv)
+}
+
 sub lei_init {
 	my ($self, $dir) = @_;
 	my $cfg = _lei_cfg($self, 1);
diff --git a/lib/PublicInbox/LeiImport.pm b/lib/PublicInbox/LeiImport.pm
new file mode 100644
index 00000000..4a9af8a7
--- /dev/null
+++ b/lib/PublicInbox/LeiImport.pm
@@ -0,0 +1,106 @@
+# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# front-end for the "lei import" sub-command
+package PublicInbox::LeiImport;
+use strict;
+use v5.10.1;
+use parent qw(PublicInbox::IPC);
+use PublicInbox::MboxReader;
+use PublicInbox::Eml;
+
+sub _import_eml { # MboxReader callback; also called directly for one eml
+	my ($eml, $sto, $set_kw) = @_; # keywords are sent only if $set_kw is true
+	$sto->ipc_do('set_eml', $eml, $set_kw ? $sto->mbox_keywords($eml) : ());
+}
+
+sub import_done { # EOF callback for main daemon (the '' op set up in call())
+	my ($lei) = @_;
+	my $imp = delete $lei->{imp}; # fail_handler may have removed it already
+	$imp->wq_wait_old($lei) if $imp;
+	my $wait = $lei->{sto}->ipc_do('done'); # NOTE(review): $wait is unused
+	$lei->dclose;
+}
+
+sub call { # the main "lei import" method; $lei is the LEI object
+	my ($cls, $lei, @argv) = @_; # @argv: paths (URLs are still TODO)
+	my $sto = $lei->_lei_store(1); # NOTE(review): 1 presumably means "create"
+	$sto->write_prepare($lei); # spawns the store worker, sets $lei->{sto}
+	$lei->{opt}->{flags} //= 1; # later read as $set_kw by _import_fh
+	my $fmt = $lei->{opt}->{'format'};
+	my $self = $lei->{imp} = bless {}, $cls;
+	return $lei->fail('--format unspecified') if !$fmt;
+	$self->{0} = $lei->{0} if $lei->{opt}->{stdin}; # read by import_stdin
+	my $ops = { # op name => [ callback, first argument ]
+		'!' => [ $lei->can('fail_handler'), $lei ],
+		'x_it' => [ $lei->can('x_it'), $lei ],
+		'child_error' => [ $lei->can('child_error'), $lei ],
+		'' => [ \&import_done, $lei ],
+	};
+	($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops);
+	my $j = $lei->{opt}->{jobs} // scalar(@argv) || 1; # (jobs // argc) || 1
+	my $nproc = $self->detect_nproc;
+	$j = $nproc if $j > $nproc; # never start more workers than detect_nproc
+	$self->wq_workers_start('lei_import', $j, $lei->oldset, {lei => $lei});
+	my $op = delete $lei->{pkt_op_c}; # this process keeps only $op
+	delete $lei->{pkt_op_p};
+	$self->wq_do('import_stdin', []) if $self->{0};
+	for my $x (@argv) {
+		$self->wq_do('import_path_url', [], $x);
+	}
+	$self->wq_close(1);
+	$lei->event_step_init; # wait for shutdowns
+	if ($lei->{oneshot}) { # drive $op by hand until its socket is gone
+		while ($op->{sock}) { $op->event_step }
+	}
+}
+
+sub ipc_atfork_child { # post-fork hook; presumably run in each import worker
+	my ($self) = @_;
+	$self->{lei}->lei_atfork_child; # no $persist: $lei keeps {1} and {2}
+	$self->SUPER::ipc_atfork_child;
+}
+
+sub _import_fh { # import everything readable from $fh; $x names the source
+	my ($lei, $fh, $x) = @_;
+	my $set_kw = $lei->{opt}->{flags};
+	my $fmt = $lei->{opt}->{'format'};
+	eval { # NB: `return' in here only leaves the eval, with $@ left empty
+		if ($fmt eq 'eml') {
+			my $buf = do { local $/; <$fh> } //
+				return $lei->child_error(1 << 8, <<"");
+		error reading $x: $!
+
+			my $eml = PublicInbox::Eml->new(\$buf);
+			_import_eml($eml, $lei->{sto}, $set_kw);
+		} else { # some mbox
+			my $cb = PublicInbox::MboxReader->can($fmt);
+			$cb or return $lei->child_error(1 << 8, <<"");
+	--format $fmt unsupported for $x
+
+			$cb->(undef, $fh, \&_import_eml, $lei->{sto}, $set_kw);
+		}
+	};
+	$lei->child_error(1 << 8, "$x: $@") if $@; # $x is '<stdin>' for stdin
+}
+
+sub import_path_url { # wq_do target: import one regular file named by $x
+	my ($self, $x) = @_;
+	my $lei = $self->{lei};
+	# TODO auto-detect?
+	if (-f $x) {
+		open my $fh, '<', $x or return $lei->child_error(1 << 8, <<"");
+unable to open $x: $!
+
+		_import_fh($lei, $fh, $x);
+	} else { # URLs, directories, etc. are not handled yet
+		$lei->fail("$x unsupported (TODO)");
+	}
+}
+
+sub import_stdin { # wq_do target, queued by call() only when {0} is set
+	my ($self) = @_;
+	_import_fh($self->{lei}, $self->{0}, '<stdin>'); # {0} copied in call()
+}
+
+1;
diff --git a/lib/PublicInbox/LeiStore.pm b/lib/PublicInbox/LeiStore.pm
index a7d7d953..3a215973 100644
--- a/lib/PublicInbox/LeiStore.pm
+++ b/lib/PublicInbox/LeiStore.pm
@@ -17,6 +17,7 @@ use PublicInbox::V2Writable;
 use PublicInbox::ContentHash qw(content_hash content_digest);
 use PublicInbox::MID qw(mids mids_in);
 use PublicInbox::LeiSearch;
+use PublicInbox::MDA;
 use List::Util qw(max);
 
 sub new {
@@ -237,4 +238,21 @@ sub done {
 	die $err if $err;
 }
 
+sub ipc_atfork_child { # child-side hook invoked by ipc_worker_spawn
+	my ($self) = @_;
+	my $lei = delete $self->{lei}; # set via $fields in write_prepare
+	$lei->lei_atfork_child(1) if $lei; # 1 is the $persist argument
+	$self->SUPER::ipc_atfork_child;
+}
+
+sub write_prepare { # spawn the 'lei_store' worker; publish $self as {sto}
+	my ($self, $lei) = @_;
+	$self->ipc_lock_init;
+	# Mail imported into lei is private, so the header filtering that
+	# -mda applies to public mail is not appropriate here
+	local @PublicInbox::MDA::BAD_HEADERS = (); # emptied only during the spawn
+	$self->ipc_worker_spawn('lei_store', $lei->oldset, { lei => $lei });
+	$lei->{sto} = $self;
+}
+
 1;
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index daf42098..f8068362 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -392,25 +392,11 @@ sub query_prepare { # called by wq_do
 	pkt_do($lei->{pkt_op_p}, '.') == 1 or die "do_post_augment trigger: $!"
 }
 
-sub fail_handler ($;$$) {
-	my ($lei, $code, $io) = @_;
-	for my $f (qw(lxs l2m)) {
-		my $wq = delete $lei->{$f} or next;
-		$wq->wq_wait_old($lei) if $wq->wq_kill_old; # lei-daemon
-	}
-	close($io) if $io; # needed to avoid warnings on SIGPIPE
-	$lei->x_it($code // (1 >> 8));
-}
-
-sub sigpipe_handler { # handles SIGPIPE from l2m/lxs workers
-	fail_handler($_[0], 13, delete $_[0]->{1});
-}
-
 sub do_query {
 	my ($self, $lei) = @_;
 	my $ops = {
-		'|' => [ \&sigpipe_handler, $lei ],
-		'!' => [ \&fail_handler, $lei ],
+		'|' => [ $lei->can('sigpipe_handler'), $lei ],
+		'!' => [ $lei->can('fail_handler'), $lei ],
 		'.' => [ \&do_post_augment, $lei ],
 		'' => [ \&query_done, $lei ],
 		'mset_progress' => [ \&mset_progress, $lei ],
diff --git a/t/lei.t b/t/lei.t
index a08a6d0d..eb824a30 100644
--- a/t/lei.t
+++ b/t/lei.t
@@ -389,6 +389,20 @@ SKIP: {
 }; # /SKIP
 };
 
+my $test_import = sub { # "lei import -f eml": once via stdin, once via path
+	$cleanup->();
+	ok($lei->(qw(q s:boolean)), 'search miss before import');
+	unlike($out, qr/boolean/i, 'no results, yet');
+	open my $fh, '<', 't/data/0001.patch' or BAIL_OUT $!;
+	ok($lei->([qw(import -f eml -)], undef, { %$opt, 0 => $fh }),
+		'import single file from stdin');
+	close $fh;
+	ok($lei->(qw(q s:boolean)), 'search hit after import'); # NOTE(review): $out is not matched here
+	ok($lei->(qw(import -f eml), 't/data/message_embed.eml'),
+		'import single file by path');
+	$cleanup->();
+};
+
 my $test_lei_common = sub {
 	$test_help->();
 	$test_config->();
@@ -396,6 +410,7 @@ my $test_lei_common = sub {
 	$test_external->();
 	$test_completion->();
 	$test_fail->();
+	$test_import->();
 };
 
 if ($ENV{TEST_LEI_ONESHOT}) {

      parent reply	other threads:[~2021-02-04  9:59 UTC|newest]

Thread overview: 11+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2021-02-04  9:59 [PATCH 00/10] lei: cleanups + initial import support Eric Wong
2021-02-04  9:59 ` [PATCH 01/10] lei q: delay worker spawn Eric Wong
2021-02-04  9:59 ` [PATCH 02/10] ipc: localize fields assignment Eric Wong
2021-02-04  9:59 ` [PATCH 03/10] lei q: reorder internals to reduce FD passing Eric Wong
2021-02-04  9:59 ` [PATCH 04/10] lei q: only start pager if output is to stdout Eric Wong
2021-02-04  9:59 ` [PATCH 05/10] lei q: reinstate early MUA spawn for Maildir Eric Wong
2021-02-04  9:59 ` [PATCH 06/10] eml: handle warning ignores for lei Eric Wong
2021-02-04  9:59 ` [PATCH 07/10] lei q: eliminate $not_done temporary git dir hack Eric Wong
2021-02-04  9:59 ` [PATCH 08/10] lei_query: remove uneeded dwaitpid import Eric Wong
2021-02-04  9:59 ` [PATCH 09/10] lei_xsearch: drop unused imports Eric Wong
2021-02-04  9:59 ` Eric Wong [this message]

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

  List information: https://public-inbox.org/README

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20210204095930.20278-11-e@80x24.org \
    --to=e@80x24.org \
    --cc=meta@public-inbox.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
Code repositories for project(s) associated with this public inbox

	https://80x24.org/public-inbox.git

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).