about summary refs log tree commit homepage
path: root/lib
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2021-02-18 23:22:24 +0300
committerEric Wong <e@80x24.org>2021-02-18 20:02:20 -0400
commitd972b6711b01500181f809ef54a08aa29cb0d4a4 (patch)
tree485a06a9bf3eef48fe1ab24f7ba6c1cac78f9df5 /lib
parent46eac797d44b068a5e144ecc8269e9dbe878f495 (diff)
downloadpublic-inbox-d972b6711b01500181f809ef54a08aa29cb0d4a4.tar.gz
The backends for "lei add-external --mirror", "lei convert", and
"lei import" all share a similar pattern for spawning background
workers.  Hoist out the common parts to slim down our code base
a bit.

The LeiXSearch and LeiToMail workers for "lei q" remains a the
odd duck due to the deep pipelining and parallelization.
Diffstat (limited to 'lib')
-rw-r--r--lib/PublicInbox/LEI.pm19
-rw-r--r--lib/PublicInbox/LeiAuth.pm17
-rw-r--r--lib/PublicInbox/LeiConvert.pm22
-rw-r--r--lib/PublicInbox/LeiImport.pm19
-rw-r--r--lib/PublicInbox/LeiMirror.pm19
5 files changed, 35 insertions, 61 deletions
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 1e4c36d0..0b4bc20e 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -468,6 +468,25 @@ sub lei_atfork_child {
         $current_lei = $persist ? undef : $self; # for SIG{__WARN__}
 }
 
+sub workers_start {
+        my ($lei, $wq, $ident, $jobs, $ops) = @_;
+        $ops = {
+                '!' => [ $lei->can('fail_handler'), $lei ],
+                '|' => [ $lei->can('sigpipe_handler'), $lei ],
+                'x_it' => [ $lei->can('x_it'), $lei ],
+                'child_error' => [ $lei->can('child_error'), $lei ],
+                %$ops
+        };
+        require PublicInbox::PktOp;
+        ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops);
+        $wq->wq_workers_start($ident, $jobs, $lei->oldset, { lei => $lei });
+        delete $lei->{pkt_op_p};
+        my $op = delete $lei->{pkt_op_c};
+        $lei->event_step_init;
+        # oneshot needs $op, daemon-mode uses DS->EventLoop to handle $op
+        $lei->{oneshot} ? $op : undef;
+}
+
 sub _help {
         require PublicInbox::LeiHelp;
         PublicInbox::LeiHelp::call($_[0], $_[1], \%CMD, \%OPTDESC);
diff --git a/lib/PublicInbox/LeiAuth.pm b/lib/PublicInbox/LeiAuth.pm
index 6593ba51..7acb9900 100644
--- a/lib/PublicInbox/LeiAuth.pm
+++ b/lib/PublicInbox/LeiAuth.pm
@@ -43,24 +43,13 @@ sub auth_eof {
 sub auth_start {
         my ($self, $lei, $post_auth_cb, @args) = @_;
         $lei->_lei_cfg(1); # workers may need to read config
-        my $ops = {
-                '!' => [ $lei->can('fail_handler'), $lei ],
-                '|' => [ $lei->can('sigpipe_handler'), $lei ],
-                'x_it' => [ $lei->can('x_it'), $lei ],
-                'child_error' => [ $lei->can('child_error'), $lei ],
+        my $op = $lei->workers_start($self, 'auth', 1, {
                 'nrd_merge' => [ \&nrd_merge, $lei ],
                 '' => [ \&auth_eof, $lei, $post_auth_cb, @args ],
-        };
-        ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops);
-        $self->wq_workers_start('lei_auth', 1, $lei->oldset, {lei => $lei});
-        my $op = delete $lei->{pkt_op_c};
-        delete $lei->{pkt_op_p};
+        });
         $self->wq_io_do('do_auth', []);
         $self->wq_close(1);
-        $lei->event_step_init; # wait for shutdowns
-        if ($lei->{oneshot}) {
-                while ($op->{sock}) { $op->event_step }
-        }
+        while ($op && $op->{sock}) { $op->event_step }
 }
 
 sub ipc_atfork_child {
diff --git a/lib/PublicInbox/LeiConvert.pm b/lib/PublicInbox/LeiConvert.pm
index 78fd5e17..ba375772 100644
--- a/lib/PublicInbox/LeiConvert.pm
+++ b/lib/PublicInbox/LeiConvert.pm
@@ -8,7 +8,6 @@ use v5.10.1;
 use parent qw(PublicInbox::IPC);
 use PublicInbox::Eml;
 use PublicInbox::InboxWritable qw(eml_from_path);
-use PublicInbox::PktOp;
 use PublicInbox::LeiStore;
 use PublicInbox::LeiOverview;
 
@@ -59,26 +58,15 @@ sub do_convert { # via wq_do
         delete $self->{wcb}; # commit
 }
 
-sub convert_start {
+sub convert_start { # LeiAuth->auth_start callback
         my ($lei) = @_;
-        my $ops = {
-                '!' => [ $lei->can('fail_handler'), $lei ],
-                '|' => [ $lei->can('sigpipe_handler'), $lei ],
-                'x_it' => [ $lei->can('x_it'), $lei ],
-                'child_error' => [ $lei->can('child_error'), $lei ],
-                '' => [ $lei->can('dclose'), $lei ],
-        };
-        ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops);
         my $self = $lei->{cnv};
-        $self->wq_workers_start('lei_convert', 1, $lei->oldset, {lei => $lei});
-        my $op = delete $lei->{pkt_op_c};
-        delete $lei->{pkt_op_p};
+        my $op = $lei->workers_start($self, 'lei_convert', 1, {
+                '' => [ $lei->can('dclose'), $lei ]
+        });
         $self->wq_io_do('do_convert', []);
         $self->wq_close(1);
-        $lei->event_step_init; # wait for shutdowns
-        if ($lei->{oneshot}) {
-                while ($op->{sock}) { $op->event_step }
-        }
+        while ($op && $op->{sock}) { $op->event_step }
 }
 
 sub call { # the main "lei convert" method
diff --git a/lib/PublicInbox/LeiImport.pm b/lib/PublicInbox/LeiImport.pm
index 62a2a412..68cab12c 100644
--- a/lib/PublicInbox/LeiImport.pm
+++ b/lib/PublicInbox/LeiImport.pm
@@ -8,7 +8,6 @@ use v5.10.1;
 use parent qw(PublicInbox::IPC);
 use PublicInbox::Eml;
 use PublicInbox::InboxWritable qw(eml_from_path);
-use PublicInbox::PktOp;
 
 sub _import_eml { # MboxReader callback
         my ($eml, $sto, $set_kw) = @_;
@@ -31,13 +30,6 @@ sub import_done { # EOF callback for main daemon
 
 sub import_start {
         my ($lei) = @_;
-        my $ops = {
-                '!' => [ $lei->can('fail_handler'), $lei ],
-                'x_it' => [ $lei->can('x_it'), $lei ],
-                'child_error' => [ $lei->can('child_error'), $lei ],
-                '' => [ \&import_done, $lei ],
-        };
-        ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops);
         my $self = $lei->{imp};
         my $j = $lei->{opt}->{jobs} // scalar(@{$self->{inputs}}) || 1;
         if (my $nrd = $lei->{nrd}) {
@@ -46,18 +38,15 @@ sub import_start {
                 my $nproc = $self->detect_nproc;
                 $j = $nproc if $j > $nproc;
         }
-        $self->wq_workers_start('lei_import', $j, $lei->oldset, {lei => $lei});
-        my $op = delete $lei->{pkt_op_c};
-        delete $lei->{pkt_op_p};
+        my $op = $lei->workers_start($self, 'lei_import', $j, {
+                '' => [ \&import_done, $lei ],
+        });
         $self->wq_io_do('import_stdin', []) if $self->{0};
         for my $input (@{$self->{inputs}}) {
                 $self->wq_io_do('import_path_url', [], $input);
         }
         $self->wq_close(1);
-        $lei->event_step_init; # wait for shutdowns
-        if ($lei->{oneshot}) {
-                while ($op->{sock}) { $op->event_step }
-        }
+        while ($op && $op->{sock}) { $op->event_step }
 }
 
 sub call { # the main "lei import" method
diff --git a/lib/PublicInbox/LeiMirror.pm b/lib/PublicInbox/LeiMirror.pm
index c5153148..f8ca1ee5 100644
--- a/lib/PublicInbox/LeiMirror.pm
+++ b/lib/PublicInbox/LeiMirror.pm
@@ -8,7 +8,6 @@ use v5.10.1;
 use parent qw(PublicInbox::IPC);
 use IO::Uncompress::Gunzip qw(gunzip $GunzipError);
 use PublicInbox::Spawn qw(popen_rd spawn);
-use PublicInbox::PktOp;
 
 sub do_finish_mirror { # dwaitpid callback
         my ($arg, $pid) = @_;
@@ -279,22 +278,12 @@ sub start {
         require PublicInbox::Inbox;
         require PublicInbox::Admin;
         require PublicInbox::InboxWritable;
-        my $ops = {
-                '!' => [ $lei->can('fail_handler'), $lei ],
-                'x_it' => [ $lei->can('x_it'), $lei ],
-                'child_error' => [ $lei->can('child_error'), $lei ],
-                '' => [ \&mirror_done, $lei ],
-        };
-        ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops);
-        $self->wq_workers_start('lei_mirror', 1, $lei->oldset, {lei => $lei});
-        my $op = delete $lei->{pkt_op_c};
-        delete $lei->{pkt_op_p};
+        my $op = $lei->workers_start($self, 'lei_mirror', 1, {
+                '' => [ \&mirror_done, $lei ]
+        });
         $self->wq_io_do('do_mirror', []);
         $self->wq_close(1);
-        $lei->event_step_init; # wait for shutdowns
-        if ($lei->{oneshot}) {
-                while ($op->{sock}) { $op->event_step }
-        }
+        while ($op && $op->{sock}) { $op->event_step }
 }
 
 sub ipc_atfork_child {