about summary refs log tree commit homepage
diff options
context:
space:
mode:
-rw-r--r-- MANIFEST 1
-rw-r--r-- lib/PublicInbox/IPC.pm 4
-rw-r--r-- lib/PublicInbox/LEI.pm 48
-rw-r--r-- lib/PublicInbox/LeiImport.pm 106
-rw-r--r-- lib/PublicInbox/LeiStore.pm 18
-rw-r--r-- lib/PublicInbox/LeiXSearch.pm 18
-rw-r--r-- t/lei.t 15
7 files changed, 184 insertions, 26 deletions
diff --git a/MANIFEST b/MANIFEST
index 6922f9b1..a11d4106 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -179,6 +179,7 @@ lib/PublicInbox/KQNotify.pm
 lib/PublicInbox/LEI.pm
 lib/PublicInbox/LeiDedupe.pm
 lib/PublicInbox/LeiExternal.pm
+lib/PublicInbox/LeiImport.pm
 lib/PublicInbox/LeiOverview.pm
 lib/PublicInbox/LeiQuery.pm
 lib/PublicInbox/LeiSearch.pm
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 7f5a3f6f..a0e6bfee 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -101,7 +101,7 @@ sub ipc_worker_loop ($$$) {
 
 # starts a worker if Sereal or Storable is installed
 sub ipc_worker_spawn {
-        my ($self, $ident, $oldset) = @_;
+        my ($self, $ident, $oldset, $fields) = @_;
         return unless $enc; # no Sereal or Storable
         return if ($self->{-ipc_ppid} // -1) == $$; # idempotent
         delete(@$self{qw(-ipc_req -ipc_res -ipc_ppid -ipc_pid)});
@@ -123,6 +123,8 @@ sub ipc_worker_spawn {
                 # ensure we properly exit even if warn() dies:
                 my $end = PublicInbox::OnDestroy->new($$, sub { exit(!!$@) });
                 eval {
+                        $fields //= {};
+                        local @$self{keys %$fields} = values(%$fields);
                         my $on_destroy = $self->ipc_atfork_child;
                         local %SIG = %SIG;
                         ipc_worker_loop($self, $r_req, $w_res);
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 24efb494..682d1bd1 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -160,9 +160,10 @@ our %CMD = ( # sorted in order of importance/use:
 'forget-watch' => [ '{WATCH_NUMBER|--prune}', 'stop and forget a watch',
         qw(prune) ],
 
-'import' => [ 'URL_OR_PATHNAME|--stdin',
-        'one-shot import/update from URL or filesystem',
-        qw(stdin| offset=i recursive|r exclude=s include=s !flags),
+'import' => [ 'URLS_OR_PATHNAMES...|--stdin',
+        'one-time import/update from URL or filesystem',
+        qw(stdin| offset=i recursive|r exclude=s include|I=s
+        format|f=s flags!),
         ],
 
 'config' => [ '[...]', sub {
@@ -194,8 +195,8 @@ our %CMD = ( # sorted in order of importance/use:
 # $spec => [@ALLOWED_VALUES (default is first), $description],
 # $spec => $description
 # "$SUB_COMMAND TAB $spec" => as above
-my $stdin_formats = [ 'IN|auto|raw|mboxrd|mboxcl2|mboxcl|mboxo',
-                'specify message input format' ];
+my $stdin_formats = [ 'MAIL_FORMAT|eml|mboxrd|mboxcl2|mboxcl|mboxo',
+                        'specify message input format' ];
 my $ls_format = [ 'OUT|plain|json|null', 'listing output format' ];
 
 my %OPTDESC = (
@@ -240,6 +241,8 @@ my %OPTDESC = (
 'q        jobs=s'        => [ '[SEARCH_JOBS][,WRITER_JOBS]',
                 'control number of search and writer jobs' ],
 
+'import format|f=s' => $stdin_formats,
+
 'ls-query        format|f=s' => $ls_format,
 'ls-external        format|f=s' => $ls_format,
 
@@ -319,6 +322,20 @@ sub err ($;@) {
 
 sub qerr ($;@) { $_[0]->{opt}->{quiet} or err(shift, @_) }
 
+sub fail_handler ($;$$) { # ($lei, [$code, [$io]]): reap work queues, then exit
+        my ($lei, $code, $io) = @_;
+        for my $f (qw(imp lxs l2m)) { # every work queue $lei may still hold
+                my $wq = delete $lei->{$f} or next;
+                $wq->wq_wait_old($lei) if $wq->wq_kill_old; # lei-daemon
+        }
+        close($io) if $io; # needed to avoid warnings on SIGPIPE
+        $lei->x_it($code // (1 << 8)); # presumably $?-style: exit code 1 (1 >> 8 is 0)
+}
+
+sub sigpipe_handler { # handles SIGPIPE from l2m/lxs workers
+        fail_handler($_[0], 13, delete $_[0]->{1}); # $_[0] is $lei; 13 (presumably SIGPIPE) is the $code, {1} the $io to close
+}
+
 sub fail ($$;$) {
         my ($self, $buf, $exit_code) = @_;
         err($self, $buf) if defined $buf;
@@ -340,7 +357,8 @@ sub out ($;@) {
 sub puts ($;@) { out(shift, map { "$_\n" } @_) }
 
 sub child_error { # passes non-fatal curl exit codes to user
-        my ($self, $child_error) = @_; # child_error is $?
+        my ($self, $child_error, $msg) = @_; # child_error is $?
+        $self->err($msg) if $msg;
         if (my $s = $self->{pkt_op_p} // $self->{sock}) {
                 # send to the parent lei-daemon or to lei(1) client
                 send($s, "child_error $child_error", MSG_EOR);
@@ -357,9 +375,16 @@ sub note_sigpipe { # triggers sigpipe_handler
 }
 
 sub lei_atfork_child {
-        my ($self) = @_;
+        my ($self, $persist) = @_;
         # we need to explicitly close things which are on stack
-        delete $self->{0};
+        if ($persist) {
+                my @io = delete @$self{0,1,2};
+                unless ($self->{oneshot}) {
+                        close($_) for @io;
+                }
+        } else {
+                delete $self->{0};
+        }
         for (delete @$self{qw(3 sock old_1 au_done)}) {
                 close($_) if defined($_);
         }
@@ -374,7 +399,7 @@ sub lei_atfork_child {
         %PATH2CFG = ();
         undef $errors_log;
         $quit = \&CORE::exit;
-        $current_lei = $self; # for SIG{__WARN__}
+        $current_lei = $persist ? undef : $self; # for SIG{__WARN__}
 }
 
 sub _help ($;$) {
@@ -606,6 +631,11 @@ sub lei_config {
         x_it($self, $?) if $?;
 }
 
+sub lei_import { # "lei import" entry point: load the front-end on demand, then delegate
+        require PublicInbox::LeiImport;
+        PublicInbox::LeiImport->call(@_); # @_ goes through as-is; call() unpacks ($cls, $lei, @argv)
+}
+
 sub lei_init {
         my ($self, $dir) = @_;
         my $cfg = _lei_cfg($self, 1);
diff --git a/lib/PublicInbox/LeiImport.pm b/lib/PublicInbox/LeiImport.pm
new file mode 100644
index 00000000..4a9af8a7
--- /dev/null
+++ b/lib/PublicInbox/LeiImport.pm
@@ -0,0 +1,106 @@
+# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# front-end for the "lei import" sub-command
+package PublicInbox::LeiImport;
+use strict;
+use v5.10.1;
+use parent qw(PublicInbox::IPC);
+use PublicInbox::MboxReader;
+use PublicInbox::Eml;
+
+sub _import_eml { # MboxReader callback; _import_fh also calls it directly for "eml" input
+        my ($eml, $sto, $set_kw) = @_; # keywords are only added to the request when $set_kw is true
+        $sto->ipc_do('set_eml', $eml, $set_kw ? $sto->mbox_keywords($eml) : ());
+}
+
+sub import_done { # EOF callback for main daemon
+        my $lei = shift;
+        my $imp = delete $lei->{imp}; # detach the import work queue, if still present
+        $imp and $imp->wq_wait_old($lei);
+        my $wait = $lei->{sto}->ipc_do('done'); # NOTE(review): $wait is unused; presumably kept for scalar context so ipc_do waits - confirm
+        $lei->dclose;
+}
+
+sub call { # the main "lei import" method
+        my ($cls, $lei, @argv) = @_; # @argv: the things to import, one import_path_url job each
+        my $sto = $lei->_lei_store(1);
+        $sto->write_prepare($lei); # spawns the 'lei_store' worker and sets $lei->{sto}
+        $lei->{opt}->{flags} //= 1; # on unless the option was explicitly given
+        my $fmt = $lei->{opt}->{'format'};
+        my $self = $lei->{imp} = bless {}, $cls; # fail_handler looks under {imp} to reap this queue
+        return $lei->fail('--format unspecified') if !$fmt; # NOTE(review): store worker and {imp} already exist here
+        $self->{0} = $lei->{0} if $lei->{opt}->{stdin}; # read later by import_stdin
+        my $ops = { # op name => [ callback, leading args ]
+                '!' => [ $lei->can('fail_handler'), $lei ],
+                'x_it' => [ $lei->can('x_it'), $lei ],
+                'child_error' => [ $lei->can('child_error'), $lei ], # child_error() sends "child_error $?" over pkt_op_p
+                '' => [ \&import_done, $lei ], # '' is the EOF callback
+        };
+        ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops);
+        my $j = $lei->{opt}->{jobs} // scalar(@argv) || 1; # parses as (jobs // argument count) || 1, so 0 becomes 1
+        my $nproc = $self->detect_nproc;
+        $j = $nproc if $j > $nproc; # never more workers than detect_nproc reports
+        $self->wq_workers_start('lei_import', $j, $lei->oldset, {lei => $lei}); # presumably becomes $self->{lei} in workers - confirm
+        my $op = delete $lei->{pkt_op_c}; # our end, kept for the oneshot loop below
+        delete $lei->{pkt_op_p}; # NOTE(review): presumably the workers hold their own copy by now - confirm
+        $self->wq_do('import_stdin', []) if $self->{0}; # {0} is only set when the stdin option was given
+        for my $x (@argv) {
+                $self->wq_do('import_path_url', [], $x);
+        }
+        $self->wq_close(1);
+        $lei->event_step_init; # wait for shutdowns
+        if ($lei->{oneshot}) {
+                while ($op->{sock}) { $op->event_step } # pump events by hand until $op->{sock} goes away
+        }
+}
+
+sub ipc_atfork_child { # atfork hook: run $lei's own hook, then the parent class's
+        my $self = shift;
+        $self->{lei}->lei_atfork_child; # called without the $persist argument
+        return $self->SUPER::ipc_atfork_child; # its result is handed back to whoever invoked the hook
+}
+
+sub _import_fh { # ($lei, $fh, $x): import all mail readable from $fh; $x names it in errors
+        my ($lei, $fh, $x) = @_;
+        my $set_kw = $lei->{opt}->{flags};
+        my $fmt = $lei->{opt}->{'format'};
+        eval { # a "return" in here only leaves the eval, with $@ left empty
+                if ($fmt eq 'eml') { # a single message: slurp all of $fh
+                        my $buf = do { local $/; <$fh> } //
+                                return $lei->child_error(1 << 8, <<"");
+                error reading $x: $!
+
+                        my $eml = PublicInbox::Eml->new(\$buf);
+                        _import_eml($eml, $lei->{sto}, $set_kw);
+                } else { # some mbox
+                        my $cb = PublicInbox::MboxReader->can($fmt);
+                        $cb or return $lei->child_error(1 << 8, <<"");
+        --format $fmt unsupported for $x
+
+                        $cb->(undef, $fh, \&_import_eml, $lei->{sto}, $set_kw);
+                }
+        };
+        $lei->child_error(1 << 8, "$x: $@") if $@; # $?-style status: exit code 1 (1 >> 8 was 0); name the real source
+}
+
+sub import_path_url { # work-queue job: import one command-line argument $x
+        my ($self, $x) = @_;
+        my $lei = $self->{lei};
+        # TODO auto-detect?
+        if (-f $x) { # only regular files are handled so far
+                open my $fh, '<', $x or return $lei->child_error(1 << 8, <<"");
+unable to open $x: $!
+
+                _import_fh($lei, $fh, $x); # child_error above takes a $?-style status: 1 << 8 is exit code 1
+        } else {
+                $lei->fail("$x unsupported (TODO)");
+        }
+}
+
+sub import_stdin { # work-queue job queued by call() when $self->{0} is set
+        my ($self) = @_;
+        _import_fh($self->{lei}, $self->{0}, '<stdin>'); # '<stdin>' is only the label used in error messages
+}
+
+1;
diff --git a/lib/PublicInbox/LeiStore.pm b/lib/PublicInbox/LeiStore.pm
index a7d7d953..3a215973 100644
--- a/lib/PublicInbox/LeiStore.pm
+++ b/lib/PublicInbox/LeiStore.pm
@@ -17,6 +17,7 @@ use PublicInbox::V2Writable;
 use PublicInbox::ContentHash qw(content_hash content_digest);
 use PublicInbox::MID qw(mids mids_in);
 use PublicInbox::LeiSearch;
+use PublicInbox::MDA;
 use List::Util qw(max);
 
 sub new {
@@ -237,4 +238,21 @@ sub done {
         die $err if $err;
 }
 
+sub ipc_atfork_child { # atfork hook for the store worker
+        my $self = shift;
+        my $lei = delete $self->{lei}; # the store object does not keep {lei} afterwards
+        $lei and $lei->lei_atfork_child(1); # 1 is lei_atfork_child's $persist flag
+        return $self->SUPER::ipc_atfork_child;
+}
+
+sub write_prepare { # ($self, $lei): spawn the 'lei_store' worker and publish $self as $lei->{sto}
+        my ($self, $lei) = @_;
+        $self->ipc_lock_init;
+        # Mail we import into lei is private, so headers filtered out
+        # by -mda for public mail are not appropriate
+        local @PublicInbox::MDA::BAD_HEADERS = (); # restored when this sub returns; the spawn below runs while it is empty
+        $self->ipc_worker_spawn('lei_store', $lei->oldset, { lei => $lei }); # the hash is ipc_worker_spawn's $fields argument
+        $lei->{sto} = $self;
+}
+
 1;
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index daf42098..f8068362 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -392,25 +392,11 @@ sub query_prepare { # called by wq_do
         pkt_do($lei->{pkt_op_p}, '.') == 1 or die "do_post_augment trigger: $!"
 }
 
-sub fail_handler ($;$$) {
-        my ($lei, $code, $io) = @_;
-        for my $f (qw(lxs l2m)) {
-                my $wq = delete $lei->{$f} or next;
-                $wq->wq_wait_old($lei) if $wq->wq_kill_old; # lei-daemon
-        }
-        close($io) if $io; # needed to avoid warnings on SIGPIPE
-        $lei->x_it($code // (1 >> 8));
-}
-
-sub sigpipe_handler { # handles SIGPIPE from l2m/lxs workers
-        fail_handler($_[0], 13, delete $_[0]->{1});
-}
-
 sub do_query {
         my ($self, $lei) = @_;
         my $ops = {
-                '|' => [ \&sigpipe_handler, $lei ],
-                '!' => [ \&fail_handler, $lei ],
+                '|' => [ $lei->can('sigpipe_handler'), $lei ],
+                '!' => [ $lei->can('fail_handler'), $lei ],
                 '.' => [ \&do_post_augment, $lei ],
                 '' => [ \&query_done, $lei ],
                 'mset_progress' => [ \&mset_progress, $lei ],
diff --git a/t/lei.t b/t/lei.t
index a08a6d0d..eb824a30 100644
--- a/t/lei.t
+++ b/t/lei.t
@@ -389,6 +389,20 @@ SKIP: {
 }; # /SKIP
 };
 
+my $test_import = sub { # exercises "lei import -f eml" from stdin and from a path
+        $cleanup->();
+        ok($lei->(qw(q s:boolean)), 'search miss before import');
+        unlike($out, qr/boolean/i, 'no results, yet');
+        open my $fh, '<', 't/data/0001.patch' or BAIL_OUT $!;
+        ok($lei->([qw(import -f eml -)], undef, { %$opt, 0 => $fh }), # 0 => $fh: presumably becomes the command's stdin
+                'import single file from stdin');
+        close $fh;
+        ok($lei->(qw(q s:boolean)), 'search hit after import'); # NOTE(review): only the status is checked, $out is not matched
+        ok($lei->(qw(import -f eml), 't/data/message_embed.eml'),
+                'import single file by path');
+        $cleanup->();
+};
+
 my $test_lei_common = sub {
         $test_help->();
         $test_config->();
@@ -396,6 +410,7 @@ my $test_lei_common = sub {
         $test_external->();
         $test_completion->();
         $test_fail->();
+        $test_import->();
 };
 
 if ($ENV{TEST_LEI_ONESHOT}) {