* [PATCHv3 3/4] lei: consolidate the bulk of the IPC code
2021-02-18 20:22 6% ` [PATCHv3 0/4] lei convert IMAP support Eric Wong
@ 2021-02-18 20:22 7% ` Eric Wong
1 sibling, 0 replies; 6+ results
From: Eric Wong @ 2021-02-18 20:22 UTC (permalink / raw)
To: meta
The backends for "lei add-external --mirror", "lei convert", and
"lei import" all share a similar pattern for spawning background
workers. Hoist out the common parts to slim down our code base
a bit.
The LeiXSearch and LeiToMail workers for "lei q" remains a the
odd duck due to the deep pipelining and parallelization.
---
lib/PublicInbox/LEI.pm | 19 +++++++++++++++++++
lib/PublicInbox/LeiAuth.pm | 17 +++--------------
lib/PublicInbox/LeiConvert.pm | 22 +++++-----------------
lib/PublicInbox/LeiImport.pm | 19 ++++---------------
lib/PublicInbox/LeiMirror.pm | 19 ++++---------------
5 files changed, 35 insertions(+), 61 deletions(-)
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 1e4c36d0..0b4bc20e 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -468,6 +468,25 @@ sub lei_atfork_child {
$current_lei = $persist ? undef : $self; # for SIG{__WARN__}
}
+sub workers_start {
+ my ($lei, $wq, $ident, $jobs, $ops) = @_;
+ $ops = {
+ '!' => [ $lei->can('fail_handler'), $lei ],
+ '|' => [ $lei->can('sigpipe_handler'), $lei ],
+ 'x_it' => [ $lei->can('x_it'), $lei ],
+ 'child_error' => [ $lei->can('child_error'), $lei ],
+ %$ops
+ };
+ require PublicInbox::PktOp;
+ ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops);
+ $wq->wq_workers_start($ident, $jobs, $lei->oldset, { lei => $lei });
+ delete $lei->{pkt_op_p};
+ my $op = delete $lei->{pkt_op_c};
+ $lei->event_step_init;
+ # oneshot needs $op, daemon-mode uses DS->EventLoop to handle $op
+ $lei->{oneshot} ? $op : undef;
+}
+
sub _help {
require PublicInbox::LeiHelp;
PublicInbox::LeiHelp::call($_[0], $_[1], \%CMD, \%OPTDESC);
diff --git a/lib/PublicInbox/LeiAuth.pm b/lib/PublicInbox/LeiAuth.pm
index 6593ba51..7acb9900 100644
--- a/lib/PublicInbox/LeiAuth.pm
+++ b/lib/PublicInbox/LeiAuth.pm
@@ -43,24 +43,13 @@ sub auth_eof {
sub auth_start {
my ($self, $lei, $post_auth_cb, @args) = @_;
$lei->_lei_cfg(1); # workers may need to read config
- my $ops = {
- '!' => [ $lei->can('fail_handler'), $lei ],
- '|' => [ $lei->can('sigpipe_handler'), $lei ],
- 'x_it' => [ $lei->can('x_it'), $lei ],
- 'child_error' => [ $lei->can('child_error'), $lei ],
+ my $op = $lei->workers_start($self, 'auth', 1, {
'nrd_merge' => [ \&nrd_merge, $lei ],
'' => [ \&auth_eof, $lei, $post_auth_cb, @args ],
- };
- ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops);
- $self->wq_workers_start('lei_auth', 1, $lei->oldset, {lei => $lei});
- my $op = delete $lei->{pkt_op_c};
- delete $lei->{pkt_op_p};
+ });
$self->wq_io_do('do_auth', []);
$self->wq_close(1);
- $lei->event_step_init; # wait for shutdowns
- if ($lei->{oneshot}) {
- while ($op->{sock}) { $op->event_step }
- }
+ while ($op && $op->{sock}) { $op->event_step }
}
sub ipc_atfork_child {
diff --git a/lib/PublicInbox/LeiConvert.pm b/lib/PublicInbox/LeiConvert.pm
index 78fd5e17..ba375772 100644
--- a/lib/PublicInbox/LeiConvert.pm
+++ b/lib/PublicInbox/LeiConvert.pm
@@ -8,7 +8,6 @@ use v5.10.1;
use parent qw(PublicInbox::IPC);
use PublicInbox::Eml;
use PublicInbox::InboxWritable qw(eml_from_path);
-use PublicInbox::PktOp;
use PublicInbox::LeiStore;
use PublicInbox::LeiOverview;
@@ -59,26 +58,15 @@ sub do_convert { # via wq_do
delete $self->{wcb}; # commit
}
-sub convert_start {
+sub convert_start { # LeiAuth->auth_start callback
my ($lei) = @_;
- my $ops = {
- '!' => [ $lei->can('fail_handler'), $lei ],
- '|' => [ $lei->can('sigpipe_handler'), $lei ],
- 'x_it' => [ $lei->can('x_it'), $lei ],
- 'child_error' => [ $lei->can('child_error'), $lei ],
- '' => [ $lei->can('dclose'), $lei ],
- };
- ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops);
my $self = $lei->{cnv};
- $self->wq_workers_start('lei_convert', 1, $lei->oldset, {lei => $lei});
- my $op = delete $lei->{pkt_op_c};
- delete $lei->{pkt_op_p};
+ my $op = $lei->workers_start($self, 'lei_convert', 1, {
+ '' => [ $lei->can('dclose'), $lei ]
+ });
$self->wq_io_do('do_convert', []);
$self->wq_close(1);
- $lei->event_step_init; # wait for shutdowns
- if ($lei->{oneshot}) {
- while ($op->{sock}) { $op->event_step }
- }
+ while ($op && $op->{sock}) { $op->event_step }
}
sub call { # the main "lei convert" method
diff --git a/lib/PublicInbox/LeiImport.pm b/lib/PublicInbox/LeiImport.pm
index 62a2a412..68cab12c 100644
--- a/lib/PublicInbox/LeiImport.pm
+++ b/lib/PublicInbox/LeiImport.pm
@@ -8,7 +8,6 @@ use v5.10.1;
use parent qw(PublicInbox::IPC);
use PublicInbox::Eml;
use PublicInbox::InboxWritable qw(eml_from_path);
-use PublicInbox::PktOp;
sub _import_eml { # MboxReader callback
my ($eml, $sto, $set_kw) = @_;
@@ -31,13 +30,6 @@ sub import_done { # EOF callback for main daemon
sub import_start {
my ($lei) = @_;
- my $ops = {
- '!' => [ $lei->can('fail_handler'), $lei ],
- 'x_it' => [ $lei->can('x_it'), $lei ],
- 'child_error' => [ $lei->can('child_error'), $lei ],
- '' => [ \&import_done, $lei ],
- };
- ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops);
my $self = $lei->{imp};
my $j = $lei->{opt}->{jobs} // scalar(@{$self->{inputs}}) || 1;
if (my $nrd = $lei->{nrd}) {
@@ -46,18 +38,15 @@ sub import_start {
my $nproc = $self->detect_nproc;
$j = $nproc if $j > $nproc;
}
- $self->wq_workers_start('lei_import', $j, $lei->oldset, {lei => $lei});
- my $op = delete $lei->{pkt_op_c};
- delete $lei->{pkt_op_p};
+ my $op = $lei->workers_start($self, 'lei_import', $j, {
+ '' => [ \&import_done, $lei ],
+ });
$self->wq_io_do('import_stdin', []) if $self->{0};
for my $input (@{$self->{inputs}}) {
$self->wq_io_do('import_path_url', [], $input);
}
$self->wq_close(1);
- $lei->event_step_init; # wait for shutdowns
- if ($lei->{oneshot}) {
- while ($op->{sock}) { $op->event_step }
- }
+ while ($op && $op->{sock}) { $op->event_step }
}
sub call { # the main "lei import" method
diff --git a/lib/PublicInbox/LeiMirror.pm b/lib/PublicInbox/LeiMirror.pm
index c5153148..f8ca1ee5 100644
--- a/lib/PublicInbox/LeiMirror.pm
+++ b/lib/PublicInbox/LeiMirror.pm
@@ -8,7 +8,6 @@ use v5.10.1;
use parent qw(PublicInbox::IPC);
use IO::Uncompress::Gunzip qw(gunzip $GunzipError);
use PublicInbox::Spawn qw(popen_rd spawn);
-use PublicInbox::PktOp;
sub do_finish_mirror { # dwaitpid callback
my ($arg, $pid) = @_;
@@ -279,22 +278,12 @@ sub start {
require PublicInbox::Inbox;
require PublicInbox::Admin;
require PublicInbox::InboxWritable;
- my $ops = {
- '!' => [ $lei->can('fail_handler'), $lei ],
- 'x_it' => [ $lei->can('x_it'), $lei ],
- 'child_error' => [ $lei->can('child_error'), $lei ],
- '' => [ \&mirror_done, $lei ],
- };
- ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops);
- $self->wq_workers_start('lei_mirror', 1, $lei->oldset, {lei => $lei});
- my $op = delete $lei->{pkt_op_c};
- delete $lei->{pkt_op_p};
+ my $op = $lei->workers_start($self, 'lei_mirror', 1, {
+ '' => [ \&mirror_done, $lei ]
+ });
$self->wq_io_do('do_mirror', []);
$self->wq_close(1);
- $lei->event_step_init; # wait for shutdowns
- if ($lei->{oneshot}) {
- while ($op->{sock}) { $op->event_step }
- }
+ while ($op && $op->{sock}) { $op->event_step }
}
sub ipc_atfork_child {
^ permalink raw reply related [relevance 7%]
* [PATCHv3 0/4] lei convert IMAP support
@ 2021-02-18 20:22 6% ` Eric Wong
2021-02-18 20:22 7% ` [PATCHv3 3/4] lei: consolidate the bulk of the IPC code Eric Wong
1 sibling, 0 replies; 6+ results
From: Eric Wong @ 2021-02-18 20:22 UTC (permalink / raw)
To: meta
Fixed to setup ->_lei_cfg at LeiAuth->auth_start in PATCH 1/4
instead of 4/4. This fixes failures on my FreeBSD 11.x VM
where 1/4 alone was failing (I never caught this on Debian 10.x).
Eric Wong (4):
lei convert: mail format conversion sub-command
lei import: add IMAP and (maildir|mbox*):$PATHNAME support
lei: consolidate the bulk of the IPC code
lei: check for IMAP auth errors
MANIFEST | 6 ++
lib/PublicInbox/GitCredential.pm | 18 ++--
lib/PublicInbox/LEI.pm | 57 +++++++++--
lib/PublicInbox/LeiAuth.pm | 70 +++++++++++++
lib/PublicInbox/LeiConvert.pm | 148 +++++++++++++++++++++++++++
lib/PublicInbox/LeiDedupe.pm | 2 +-
lib/PublicInbox/LeiImport.pm | 148 +++++++++++++++++----------
lib/PublicInbox/LeiMirror.pm | 19 +---
lib/PublicInbox/LeiOverview.pm | 7 +-
lib/PublicInbox/LeiToMail.pm | 5 +-
lib/PublicInbox/MdirReader.pm | 26 +++++
lib/PublicInbox/NetReader.pm | 166 ++++++++++++++++++++++++++++---
lib/PublicInbox/TestCommon.pm | 11 +-
t/lei-convert.t | 71 +++++++++++++
t/lei-import-imap.t | 28 ++++++
t/lei-import-maildir.t | 4 +-
t/lei_to_mail.t | 10 ++
t/net_reader-imap.t | 40 ++++++++
xt/lei-auth-fail.t | 20 ++++
19 files changed, 747 insertions(+), 109 deletions(-)
create mode 100644 lib/PublicInbox/LeiAuth.pm
create mode 100644 lib/PublicInbox/LeiConvert.pm
create mode 100644 t/lei-convert.t
create mode 100644 t/lei-import-imap.t
create mode 100644 t/net_reader-imap.t
create mode 100644 xt/lei-auth-fail.t
^ permalink raw reply [relevance 6%]
* [PATCH (resend) 3/4] lei: consolidate the bulk of the IPC code
2021-02-18 11:06 6% ` [PATCHv2 0/4] lei IMAP support take #2 Eric Wong
@ 2021-02-18 11:06 7% ` Eric Wong
0 siblings, 0 replies; 6+ results
From: Eric Wong @ 2021-02-18 11:06 UTC (permalink / raw)
To: meta
The backends for "lei add-external --mirror", "lei convert", and
"lei import" all share a similar pattern for spawning background
workers. Hoist out the common parts to slim down our code base
a bit.
The LeiXSearch and LeiToMail workers for "lei q" remains a the
odd duck due to the deep pipelining and parallelization.
---
lib/PublicInbox/LEI.pm | 19 +++++++++++++++++++
lib/PublicInbox/LeiAuth.pm | 17 +++--------------
lib/PublicInbox/LeiConvert.pm | 22 +++++-----------------
lib/PublicInbox/LeiImport.pm | 19 ++++---------------
lib/PublicInbox/LeiMirror.pm | 19 ++++---------------
5 files changed, 35 insertions(+), 61 deletions(-)
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 1e4c36d0..0b4bc20e 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -468,6 +468,25 @@ sub lei_atfork_child {
$current_lei = $persist ? undef : $self; # for SIG{__WARN__}
}
+sub workers_start {
+ my ($lei, $wq, $ident, $jobs, $ops) = @_;
+ $ops = {
+ '!' => [ $lei->can('fail_handler'), $lei ],
+ '|' => [ $lei->can('sigpipe_handler'), $lei ],
+ 'x_it' => [ $lei->can('x_it'), $lei ],
+ 'child_error' => [ $lei->can('child_error'), $lei ],
+ %$ops
+ };
+ require PublicInbox::PktOp;
+ ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops);
+ $wq->wq_workers_start($ident, $jobs, $lei->oldset, { lei => $lei });
+ delete $lei->{pkt_op_p};
+ my $op = delete $lei->{pkt_op_c};
+ $lei->event_step_init;
+ # oneshot needs $op, daemon-mode uses DS->EventLoop to handle $op
+ $lei->{oneshot} ? $op : undef;
+}
+
sub _help {
require PublicInbox::LeiHelp;
PublicInbox::LeiHelp::call($_[0], $_[1], \%CMD, \%OPTDESC);
diff --git a/lib/PublicInbox/LeiAuth.pm b/lib/PublicInbox/LeiAuth.pm
index 88310874..7210af99 100644
--- a/lib/PublicInbox/LeiAuth.pm
+++ b/lib/PublicInbox/LeiAuth.pm
@@ -42,24 +42,13 @@ sub auth_eof {
sub auth_start {
my ($self, $lei, $post_auth_cb, @args) = @_;
- my $ops = {
- '!' => [ $lei->can('fail_handler'), $lei ],
- '|' => [ $lei->can('sigpipe_handler'), $lei ],
- 'x_it' => [ $lei->can('x_it'), $lei ],
- 'child_error' => [ $lei->can('child_error'), $lei ],
+ my $op = $lei->workers_start($self, 'auth', 1, {
'nrd_merge' => [ \&nrd_merge, $lei ],
'' => [ \&auth_eof, $lei, $post_auth_cb, @args ],
- };
- ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops);
- $self->wq_workers_start('lei_auth', 1, $lei->oldset, {lei => $lei});
- my $op = delete $lei->{pkt_op_c};
- delete $lei->{pkt_op_p};
+ });
$self->wq_io_do('do_auth', []);
$self->wq_close(1);
- $lei->event_step_init; # wait for shutdowns
- if ($lei->{oneshot}) {
- while ($op->{sock}) { $op->event_step }
- }
+ while ($op && $op->{sock}) { $op->event_step }
}
sub ipc_atfork_child {
diff --git a/lib/PublicInbox/LeiConvert.pm b/lib/PublicInbox/LeiConvert.pm
index 78fd5e17..ba375772 100644
--- a/lib/PublicInbox/LeiConvert.pm
+++ b/lib/PublicInbox/LeiConvert.pm
@@ -8,7 +8,6 @@ use v5.10.1;
use parent qw(PublicInbox::IPC);
use PublicInbox::Eml;
use PublicInbox::InboxWritable qw(eml_from_path);
-use PublicInbox::PktOp;
use PublicInbox::LeiStore;
use PublicInbox::LeiOverview;
@@ -59,26 +58,15 @@ sub do_convert { # via wq_do
delete $self->{wcb}; # commit
}
-sub convert_start {
+sub convert_start { # LeiAuth->auth_start callback
my ($lei) = @_;
- my $ops = {
- '!' => [ $lei->can('fail_handler'), $lei ],
- '|' => [ $lei->can('sigpipe_handler'), $lei ],
- 'x_it' => [ $lei->can('x_it'), $lei ],
- 'child_error' => [ $lei->can('child_error'), $lei ],
- '' => [ $lei->can('dclose'), $lei ],
- };
- ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops);
my $self = $lei->{cnv};
- $self->wq_workers_start('lei_convert', 1, $lei->oldset, {lei => $lei});
- my $op = delete $lei->{pkt_op_c};
- delete $lei->{pkt_op_p};
+ my $op = $lei->workers_start($self, 'lei_convert', 1, {
+ '' => [ $lei->can('dclose'), $lei ]
+ });
$self->wq_io_do('do_convert', []);
$self->wq_close(1);
- $lei->event_step_init; # wait for shutdowns
- if ($lei->{oneshot}) {
- while ($op->{sock}) { $op->event_step }
- }
+ while ($op && $op->{sock}) { $op->event_step }
}
sub call { # the main "lei convert" method
diff --git a/lib/PublicInbox/LeiImport.pm b/lib/PublicInbox/LeiImport.pm
index 62a2a412..68cab12c 100644
--- a/lib/PublicInbox/LeiImport.pm
+++ b/lib/PublicInbox/LeiImport.pm
@@ -8,7 +8,6 @@ use v5.10.1;
use parent qw(PublicInbox::IPC);
use PublicInbox::Eml;
use PublicInbox::InboxWritable qw(eml_from_path);
-use PublicInbox::PktOp;
sub _import_eml { # MboxReader callback
my ($eml, $sto, $set_kw) = @_;
@@ -31,13 +30,6 @@ sub import_done { # EOF callback for main daemon
sub import_start {
my ($lei) = @_;
- my $ops = {
- '!' => [ $lei->can('fail_handler'), $lei ],
- 'x_it' => [ $lei->can('x_it'), $lei ],
- 'child_error' => [ $lei->can('child_error'), $lei ],
- '' => [ \&import_done, $lei ],
- };
- ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops);
my $self = $lei->{imp};
my $j = $lei->{opt}->{jobs} // scalar(@{$self->{inputs}}) || 1;
if (my $nrd = $lei->{nrd}) {
@@ -46,18 +38,15 @@ sub import_start {
my $nproc = $self->detect_nproc;
$j = $nproc if $j > $nproc;
}
- $self->wq_workers_start('lei_import', $j, $lei->oldset, {lei => $lei});
- my $op = delete $lei->{pkt_op_c};
- delete $lei->{pkt_op_p};
+ my $op = $lei->workers_start($self, 'lei_import', $j, {
+ '' => [ \&import_done, $lei ],
+ });
$self->wq_io_do('import_stdin', []) if $self->{0};
for my $input (@{$self->{inputs}}) {
$self->wq_io_do('import_path_url', [], $input);
}
$self->wq_close(1);
- $lei->event_step_init; # wait for shutdowns
- if ($lei->{oneshot}) {
- while ($op->{sock}) { $op->event_step }
- }
+ while ($op && $op->{sock}) { $op->event_step }
}
sub call { # the main "lei import" method
diff --git a/lib/PublicInbox/LeiMirror.pm b/lib/PublicInbox/LeiMirror.pm
index c5153148..f8ca1ee5 100644
--- a/lib/PublicInbox/LeiMirror.pm
+++ b/lib/PublicInbox/LeiMirror.pm
@@ -8,7 +8,6 @@ use v5.10.1;
use parent qw(PublicInbox::IPC);
use IO::Uncompress::Gunzip qw(gunzip $GunzipError);
use PublicInbox::Spawn qw(popen_rd spawn);
-use PublicInbox::PktOp;
sub do_finish_mirror { # dwaitpid callback
my ($arg, $pid) = @_;
@@ -279,22 +278,12 @@ sub start {
require PublicInbox::Inbox;
require PublicInbox::Admin;
require PublicInbox::InboxWritable;
- my $ops = {
- '!' => [ $lei->can('fail_handler'), $lei ],
- 'x_it' => [ $lei->can('x_it'), $lei ],
- 'child_error' => [ $lei->can('child_error'), $lei ],
- '' => [ \&mirror_done, $lei ],
- };
- ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops);
- $self->wq_workers_start('lei_mirror', 1, $lei->oldset, {lei => $lei});
- my $op = delete $lei->{pkt_op_c};
- delete $lei->{pkt_op_p};
+ my $op = $lei->workers_start($self, 'lei_mirror', 1, {
+ '' => [ \&mirror_done, $lei ]
+ });
$self->wq_io_do('do_mirror', []);
$self->wq_close(1);
- $lei->event_step_init; # wait for shutdowns
- if ($lei->{oneshot}) {
- while ($op->{sock}) { $op->event_step }
- }
+ while ($op && $op->{sock}) { $op->event_step }
}
sub ipc_atfork_child {
^ permalink raw reply related [relevance 7%]
* [PATCHv2 0/4] lei IMAP support take #2
@ 2021-02-18 11:06 6% ` Eric Wong
2021-02-18 11:06 7% ` [PATCH (resend) 3/4] lei: consolidate the bulk of the IPC code Eric Wong
0 siblings, 1 reply; 6+ results
From: Eric Wong @ 2021-02-18 11:06 UTC (permalink / raw)
To: meta
The original t/lei-convert.t was bonkers and now fixed in 1/4
Minor changes for everything except 3/4 which AFAIK has no
changes.
Eric Wong (4):
lei convert: mail format conversion sub-command
lei import: add IMAP and (maildir|mbox*):$PATHNAME support
lei: consolidate the bulk of the IPC code
lei: check for IMAP auth errors
MANIFEST | 6 ++
lib/PublicInbox/GitCredential.pm | 18 ++--
lib/PublicInbox/LEI.pm | 57 +++++++++--
lib/PublicInbox/LeiAuth.pm | 70 +++++++++++++
lib/PublicInbox/LeiConvert.pm | 148 +++++++++++++++++++++++++++
lib/PublicInbox/LeiDedupe.pm | 2 +-
lib/PublicInbox/LeiImport.pm | 148 +++++++++++++++++----------
lib/PublicInbox/LeiMirror.pm | 19 +---
lib/PublicInbox/LeiOverview.pm | 7 +-
lib/PublicInbox/LeiToMail.pm | 5 +-
lib/PublicInbox/MdirReader.pm | 26 +++++
lib/PublicInbox/NetReader.pm | 166 ++++++++++++++++++++++++++++---
lib/PublicInbox/TestCommon.pm | 11 +-
t/lei-convert.t | 71 +++++++++++++
t/lei-import-imap.t | 28 ++++++
t/lei-import-maildir.t | 4 +-
t/lei_to_mail.t | 10 ++
t/net_reader-imap.t | 40 ++++++++
xt/lei-auth-fail.t | 20 ++++
19 files changed, 747 insertions(+), 109 deletions(-)
create mode 100644 lib/PublicInbox/LeiAuth.pm
create mode 100644 lib/PublicInbox/LeiConvert.pm
create mode 100644 t/lei-convert.t
create mode 100644 t/lei-import-imap.t
create mode 100644 t/net_reader-imap.t
create mode 100644 xt/lei-auth-fail.t
^ permalink raw reply [relevance 6%]
* [PATCH 10/11] lei: consolidate the bulk of the IPC code
2021-02-17 10:06 5% [PATCH 00/11] lei IMAP read support Eric Wong
@ 2021-02-17 10:07 7% ` Eric Wong
0 siblings, 0 replies; 6+ results
From: Eric Wong @ 2021-02-17 10:07 UTC (permalink / raw)
To: meta
The backends for "lei add-external --mirror", "lei convert", and
"lei import" all share a similar pattern for spawning background
workers. Hoist out the common parts to slim down our code base
a bit.
The LeiXSearch and LeiToMail workers for "lei q" remains a the
odd duck due to the deep pipelining and parallelization.
---
lib/PublicInbox/LEI.pm | 19 +++++++++++++++++++
lib/PublicInbox/LeiAuth.pm | 17 +++--------------
lib/PublicInbox/LeiConvert.pm | 22 +++++-----------------
lib/PublicInbox/LeiImport.pm | 19 ++++---------------
lib/PublicInbox/LeiMirror.pm | 19 ++++---------------
5 files changed, 35 insertions(+), 61 deletions(-)
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 1e4c36d0..0b4bc20e 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -468,6 +468,25 @@ sub lei_atfork_child {
$current_lei = $persist ? undef : $self; # for SIG{__WARN__}
}
+sub workers_start {
+ my ($lei, $wq, $ident, $jobs, $ops) = @_;
+ $ops = {
+ '!' => [ $lei->can('fail_handler'), $lei ],
+ '|' => [ $lei->can('sigpipe_handler'), $lei ],
+ 'x_it' => [ $lei->can('x_it'), $lei ],
+ 'child_error' => [ $lei->can('child_error'), $lei ],
+ %$ops
+ };
+ require PublicInbox::PktOp;
+ ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops);
+ $wq->wq_workers_start($ident, $jobs, $lei->oldset, { lei => $lei });
+ delete $lei->{pkt_op_p};
+ my $op = delete $lei->{pkt_op_c};
+ $lei->event_step_init;
+ # oneshot needs $op, daemon-mode uses DS->EventLoop to handle $op
+ $lei->{oneshot} ? $op : undef;
+}
+
sub _help {
require PublicInbox::LeiHelp;
PublicInbox::LeiHelp::call($_[0], $_[1], \%CMD, \%OPTDESC);
diff --git a/lib/PublicInbox/LeiAuth.pm b/lib/PublicInbox/LeiAuth.pm
index 88310874..7210af99 100644
--- a/lib/PublicInbox/LeiAuth.pm
+++ b/lib/PublicInbox/LeiAuth.pm
@@ -42,24 +42,13 @@ sub auth_eof {
sub auth_start {
my ($self, $lei, $post_auth_cb, @args) = @_;
- my $ops = {
- '!' => [ $lei->can('fail_handler'), $lei ],
- '|' => [ $lei->can('sigpipe_handler'), $lei ],
- 'x_it' => [ $lei->can('x_it'), $lei ],
- 'child_error' => [ $lei->can('child_error'), $lei ],
+ my $op = $lei->workers_start($self, 'auth', 1, {
'nrd_merge' => [ \&nrd_merge, $lei ],
'' => [ \&auth_eof, $lei, $post_auth_cb, @args ],
- };
- ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops);
- $self->wq_workers_start('lei_auth', 1, $lei->oldset, {lei => $lei});
- my $op = delete $lei->{pkt_op_c};
- delete $lei->{pkt_op_p};
+ });
$self->wq_io_do('do_auth', []);
$self->wq_close(1);
- $lei->event_step_init; # wait for shutdowns
- if ($lei->{oneshot}) {
- while ($op->{sock}) { $op->event_step }
- }
+ while ($op && $op->{sock}) { $op->event_step }
}
sub ipc_atfork_child {
diff --git a/lib/PublicInbox/LeiConvert.pm b/lib/PublicInbox/LeiConvert.pm
index 44d5131b..6dd137bc 100644
--- a/lib/PublicInbox/LeiConvert.pm
+++ b/lib/PublicInbox/LeiConvert.pm
@@ -8,7 +8,6 @@ use v5.10.1;
use parent qw(PublicInbox::IPC);
use PublicInbox::Eml;
use PublicInbox::InboxWritable qw(eml_from_path);
-use PublicInbox::PktOp;
use PublicInbox::LeiStore;
use PublicInbox::LeiOverview;
@@ -59,26 +58,15 @@ sub do_convert { # via wq_do
delete $self->{wcb}; # commit
}
-sub convert_start {
+sub convert_start { # LeiAuth->auth_start callback
my ($lei) = @_;
- my $ops = {
- '!' => [ $lei->can('fail_handler'), $lei ],
- '|' => [ $lei->can('sigpipe_handler'), $lei ],
- 'x_it' => [ $lei->can('x_it'), $lei ],
- 'child_error' => [ $lei->can('child_error'), $lei ],
- '' => [ $lei->can('dclose'), $lei ],
- };
- ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops);
my $self = $lei->{cnv};
- $self->wq_workers_start('lei_convert', 1, $lei->oldset, {lei => $lei});
- my $op = delete $lei->{pkt_op_c};
- delete $lei->{pkt_op_p};
+ my $op = $lei->workers_start($self, 'lei_convert', 1, {
+ '' => [ $lei->can('dclose'), $lei ]
+ });
$self->wq_io_do('do_convert', []);
$self->wq_close(1);
- $lei->event_step_init; # wait for shutdowns
- if ($lei->{oneshot}) {
- while ($op->{sock}) { $op->event_step }
- }
+ while ($op && $op->{sock}) { $op->event_step }
}
sub call { # the main "lei convert" method
diff --git a/lib/PublicInbox/LeiImport.pm b/lib/PublicInbox/LeiImport.pm
index 4d225262..a0d79282 100644
--- a/lib/PublicInbox/LeiImport.pm
+++ b/lib/PublicInbox/LeiImport.pm
@@ -8,7 +8,6 @@ use v5.10.1;
use parent qw(PublicInbox::IPC);
use PublicInbox::Eml;
use PublicInbox::InboxWritable qw(eml_from_path);
-use PublicInbox::PktOp;
sub _import_eml { # MboxReader callback
my ($eml, $sto, $set_kw) = @_;
@@ -31,13 +30,6 @@ sub import_done { # EOF callback for main daemon
sub import_start {
my ($lei) = @_;
- my $ops = {
- '!' => [ $lei->can('fail_handler'), $lei ],
- 'x_it' => [ $lei->can('x_it'), $lei ],
- 'child_error' => [ $lei->can('child_error'), $lei ],
- '' => [ \&import_done, $lei ],
- };
- ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops);
my $self = $lei->{imp};
my $j = $lei->{opt}->{jobs} // scalar(@{$self->{inputs}}) || 1;
if (my $nrd = $lei->{nrd}) {
@@ -46,18 +38,15 @@ sub import_start {
my $nproc = $self->detect_nproc;
$j = $nproc if $j > $nproc;
}
- $self->wq_workers_start('lei_import', $j, $lei->oldset, {lei => $lei});
- my $op = delete $lei->{pkt_op_c};
- delete $lei->{pkt_op_p};
+ my $op = $lei->workers_start($self, 'lei_import', $j, {
+ '' => [ \&import_done, $lei ],
+ });
$self->wq_io_do('import_stdin', []) if $self->{0};
for my $input (@{$self->{inputs}}) {
$self->wq_io_do('import_path_url', [], $input);
}
$self->wq_close(1);
- $lei->event_step_init; # wait for shutdowns
- if ($lei->{oneshot}) {
- while ($op->{sock}) { $op->event_step }
- }
+ while ($op && $op->{sock}) { $op->event_step }
}
sub call { # the main "lei import" method
diff --git a/lib/PublicInbox/LeiMirror.pm b/lib/PublicInbox/LeiMirror.pm
index c5153148..f8ca1ee5 100644
--- a/lib/PublicInbox/LeiMirror.pm
+++ b/lib/PublicInbox/LeiMirror.pm
@@ -8,7 +8,6 @@ use v5.10.1;
use parent qw(PublicInbox::IPC);
use IO::Uncompress::Gunzip qw(gunzip $GunzipError);
use PublicInbox::Spawn qw(popen_rd spawn);
-use PublicInbox::PktOp;
sub do_finish_mirror { # dwaitpid callback
my ($arg, $pid) = @_;
@@ -279,22 +278,12 @@ sub start {
require PublicInbox::Inbox;
require PublicInbox::Admin;
require PublicInbox::InboxWritable;
- my $ops = {
- '!' => [ $lei->can('fail_handler'), $lei ],
- 'x_it' => [ $lei->can('x_it'), $lei ],
- 'child_error' => [ $lei->can('child_error'), $lei ],
- '' => [ \&mirror_done, $lei ],
- };
- ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops);
- $self->wq_workers_start('lei_mirror', 1, $lei->oldset, {lei => $lei});
- my $op = delete $lei->{pkt_op_c};
- delete $lei->{pkt_op_p};
+ my $op = $lei->workers_start($self, 'lei_mirror', 1, {
+ '' => [ \&mirror_done, $lei ]
+ });
$self->wq_io_do('do_mirror', []);
$self->wq_close(1);
- $lei->event_step_init; # wait for shutdowns
- if ($lei->{oneshot}) {
- while ($op->{sock}) { $op->event_step }
- }
+ while ($op && $op->{sock}) { $op->event_step }
}
sub ipc_atfork_child {
^ permalink raw reply related [relevance 7%]
* [PATCH 00/11] lei IMAP read support
@ 2021-02-17 10:06 5% Eric Wong
2021-02-17 10:07 7% ` [PATCH 10/11] lei: consolidate the bulk of the IPC code Eric Wong
0 siblings, 1 reply; 6+ results
From: Eric Wong @ 2021-02-17 10:06 UTC (permalink / raw)
To: meta
IMAP write support for search results is planned, but testing
could get tricky...
Still unsure about some UI bits w.r.t --format/-f:
https://public-inbox.org/meta/20210217044032.GA17934@dcvr/
convert and import should support parallel network xfers,
NNTP reads, and eventually JMAP...
convert and import don't support compressed mboxes, yet.
Eric Wong (11):
lei: bless config
watch: move imap_common_init to NetReader
watch: connect to NNTP and IMAP in config order
lei import: start rearranging code for IMAP support
lei import: move check_input_format to lei
tests: setup_public_inboxes: use IMAP-friendly newsgroups
t/lei_to_mail: remove unnecessary arg passing
lei convert: mail format conversion sub-command
lei import: add IMAP, (maildir|mbox*):$PATHNAME support
lei: consolidate the bulk of the IPC code
lei: check for IMAP auth errors
MANIFEST | 11 +-
lib/PublicInbox/GitCredential.pm | 18 ++-
lib/PublicInbox/LEI.pm | 62 +++++++-
lib/PublicInbox/LeiAuth.pm | 70 +++++++++
lib/PublicInbox/LeiConvert.pm | 137 +++++++++++++++++
lib/PublicInbox/LeiDedupe.pm | 2 +-
lib/PublicInbox/LeiImport.pm | 156 +++++++++++++-------
lib/PublicInbox/LeiMirror.pm | 19 +--
lib/PublicInbox/LeiOverview.pm | 7 +-
lib/PublicInbox/LeiToMail.pm | 5 +-
lib/PublicInbox/MdirReader.pm | 26 ++++
lib/PublicInbox/NetReader.pm | 242 +++++++++++++++++++++++++++++--
lib/PublicInbox/TestCommon.pm | 15 +-
lib/PublicInbox/Watch.pm | 82 ++---------
t/{home1 => home2}/.gitignore | 0
t/{home1 => home2}/Makefile | 0
t/{home1 => home2}/README | 0
t/lei-convert.t | 36 +++++
t/lei-import-imap.t | 28 ++++
t/lei-import-maildir.t | 4 +-
t/lei_to_mail.t | 14 +-
t/net_reader-imap.t | 40 +++++
xt/lei-auth-fail.t | 20 +++
23 files changed, 820 insertions(+), 174 deletions(-)
create mode 100644 lib/PublicInbox/LeiAuth.pm
create mode 100644 lib/PublicInbox/LeiConvert.pm
rename t/{home1 => home2}/.gitignore (100%)
rename t/{home1 => home2}/Makefile (100%)
rename t/{home1 => home2}/README (100%)
create mode 100644 t/lei-convert.t
create mode 100644 t/lei-import-imap.t
create mode 100644 t/net_reader-imap.t
create mode 100644 xt/lei-auth-fail.t
^ permalink raw reply [relevance 5%]
Results 1-6 of 6 | reverse | options above
-- pct% links below jump to the message on this page, permalinks otherwise --
2021-02-17 10:06 5% [PATCH 00/11] lei IMAP read support Eric Wong
2021-02-17 10:07 7% ` [PATCH 10/11] lei: consolidate the bulk of the IPC code Eric Wong
2021-02-17 10:53 [PATCH 08/11] lei convert: mail format conversion sub-command Eric Wong
2021-02-18 11:06 6% ` [PATCHv2 0/4] lei IMAP support take #2 Eric Wong
2021-02-18 11:06 7% ` [PATCH (resend) 3/4] lei: consolidate the bulk of the IPC code Eric Wong
2021-02-18 11:06 [PATCHv2 1/4] lei convert: mail format conversion sub-command Eric Wong
2021-02-18 20:22 6% ` [PATCHv3 0/4] lei convert IMAP support Eric Wong
2021-02-18 20:22 7% ` [PATCHv3 3/4] lei: consolidate the bulk of the IPC code Eric Wong
Code repositories for project(s) associated with this public inbox
https://80x24.org/public-inbox.git
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).