user/dev discussion of public-inbox itself
 help / color / mirror / code / Atom feed
From: Eric Wong <e@80x24.org>
To: meta@public-inbox.org
Subject: [PATCH (resend) 3/4] lei: consolidate the bulk of the IPC code
Date: Thu, 18 Feb 2021 02:06:46 -0900	[thread overview]
Message-ID: <20210218110647.9822-4-e@80x24.org> (raw)
In-Reply-To: <20210218110647.9822-1-e@80x24.org>

The backends for "lei add-external --mirror", "lei convert", and
"lei import" all share a similar pattern for spawning background
workers.  Hoist out the common parts to slim down our code base
a bit.

The LeiXSearch and LeiToMail workers for "lei q" remains a the
odd duck due to the deep pipelining and parallelization.
---
 lib/PublicInbox/LEI.pm        | 19 +++++++++++++++++++
 lib/PublicInbox/LeiAuth.pm    | 17 +++--------------
 lib/PublicInbox/LeiConvert.pm | 22 +++++-----------------
 lib/PublicInbox/LeiImport.pm  | 19 ++++---------------
 lib/PublicInbox/LeiMirror.pm  | 19 ++++---------------
 5 files changed, 35 insertions(+), 61 deletions(-)

diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 1e4c36d0..0b4bc20e 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -468,6 +468,25 @@ sub lei_atfork_child {
 	$current_lei = $persist ? undef : $self; # for SIG{__WARN__}
 }
 
+sub workers_start {
+	my ($lei, $wq, $ident, $jobs, $ops) = @_;
+	$ops = {
+		'!' => [ $lei->can('fail_handler'), $lei ],
+		'|' => [ $lei->can('sigpipe_handler'), $lei ],
+		'x_it' => [ $lei->can('x_it'), $lei ],
+		'child_error' => [ $lei->can('child_error'), $lei ],
+		%$ops
+	};
+	require PublicInbox::PktOp;
+	($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops);
+	$wq->wq_workers_start($ident, $jobs, $lei->oldset, { lei => $lei });
+	delete $lei->{pkt_op_p};
+	my $op = delete $lei->{pkt_op_c};
+	$lei->event_step_init;
+	# oneshot needs $op, daemon-mode uses DS->EventLoop to handle $op
+	$lei->{oneshot} ? $op : undef;
+}
+
 sub _help {
 	require PublicInbox::LeiHelp;
 	PublicInbox::LeiHelp::call($_[0], $_[1], \%CMD, \%OPTDESC);
diff --git a/lib/PublicInbox/LeiAuth.pm b/lib/PublicInbox/LeiAuth.pm
index 88310874..7210af99 100644
--- a/lib/PublicInbox/LeiAuth.pm
+++ b/lib/PublicInbox/LeiAuth.pm
@@ -42,24 +42,13 @@ sub auth_eof {
 
 sub auth_start {
 	my ($self, $lei, $post_auth_cb, @args) = @_;
-	my $ops = {
-		'!' => [ $lei->can('fail_handler'), $lei ],
-		'|' => [ $lei->can('sigpipe_handler'), $lei ],
-		'x_it' => [ $lei->can('x_it'), $lei ],
-		'child_error' => [ $lei->can('child_error'), $lei ],
+	my $op = $lei->workers_start($self, 'auth', 1, {
 		'nrd_merge' => [ \&nrd_merge, $lei ],
 		'' => [ \&auth_eof, $lei, $post_auth_cb, @args ],
-	};
-	($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops);
-	$self->wq_workers_start('lei_auth', 1, $lei->oldset, {lei => $lei});
-	my $op = delete $lei->{pkt_op_c};
-	delete $lei->{pkt_op_p};
+	});
 	$self->wq_io_do('do_auth', []);
 	$self->wq_close(1);
-	$lei->event_step_init; # wait for shutdowns
-	if ($lei->{oneshot}) {
-		while ($op->{sock}) { $op->event_step }
-	}
+	while ($op && $op->{sock}) { $op->event_step }
 }
 
 sub ipc_atfork_child {
diff --git a/lib/PublicInbox/LeiConvert.pm b/lib/PublicInbox/LeiConvert.pm
index 78fd5e17..ba375772 100644
--- a/lib/PublicInbox/LeiConvert.pm
+++ b/lib/PublicInbox/LeiConvert.pm
@@ -8,7 +8,6 @@ use v5.10.1;
 use parent qw(PublicInbox::IPC);
 use PublicInbox::Eml;
 use PublicInbox::InboxWritable qw(eml_from_path);
-use PublicInbox::PktOp;
 use PublicInbox::LeiStore;
 use PublicInbox::LeiOverview;
 
@@ -59,26 +58,15 @@ sub do_convert { # via wq_do
 	delete $self->{wcb}; # commit
 }
 
-sub convert_start {
+sub convert_start { # LeiAuth->auth_start callback
 	my ($lei) = @_;
-	my $ops = {
-		'!' => [ $lei->can('fail_handler'), $lei ],
-		'|' => [ $lei->can('sigpipe_handler'), $lei ],
-		'x_it' => [ $lei->can('x_it'), $lei ],
-		'child_error' => [ $lei->can('child_error'), $lei ],
-		'' => [ $lei->can('dclose'), $lei ],
-	};
-	($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops);
 	my $self = $lei->{cnv};
-	$self->wq_workers_start('lei_convert', 1, $lei->oldset, {lei => $lei});
-	my $op = delete $lei->{pkt_op_c};
-	delete $lei->{pkt_op_p};
+	my $op = $lei->workers_start($self, 'lei_convert', 1, {
+		'' => [ $lei->can('dclose'), $lei ]
+	});
 	$self->wq_io_do('do_convert', []);
 	$self->wq_close(1);
-	$lei->event_step_init; # wait for shutdowns
-	if ($lei->{oneshot}) {
-		while ($op->{sock}) { $op->event_step }
-	}
+	while ($op && $op->{sock}) { $op->event_step }
 }
 
 sub call { # the main "lei convert" method
diff --git a/lib/PublicInbox/LeiImport.pm b/lib/PublicInbox/LeiImport.pm
index 62a2a412..68cab12c 100644
--- a/lib/PublicInbox/LeiImport.pm
+++ b/lib/PublicInbox/LeiImport.pm
@@ -8,7 +8,6 @@ use v5.10.1;
 use parent qw(PublicInbox::IPC);
 use PublicInbox::Eml;
 use PublicInbox::InboxWritable qw(eml_from_path);
-use PublicInbox::PktOp;
 
 sub _import_eml { # MboxReader callback
 	my ($eml, $sto, $set_kw) = @_;
@@ -31,13 +30,6 @@ sub import_done { # EOF callback for main daemon
 
 sub import_start {
 	my ($lei) = @_;
-	my $ops = {
-		'!' => [ $lei->can('fail_handler'), $lei ],
-		'x_it' => [ $lei->can('x_it'), $lei ],
-		'child_error' => [ $lei->can('child_error'), $lei ],
-		'' => [ \&import_done, $lei ],
-	};
-	($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops);
 	my $self = $lei->{imp};
 	my $j = $lei->{opt}->{jobs} // scalar(@{$self->{inputs}}) || 1;
 	if (my $nrd = $lei->{nrd}) {
@@ -46,18 +38,15 @@ sub import_start {
 		my $nproc = $self->detect_nproc;
 		$j = $nproc if $j > $nproc;
 	}
-	$self->wq_workers_start('lei_import', $j, $lei->oldset, {lei => $lei});
-	my $op = delete $lei->{pkt_op_c};
-	delete $lei->{pkt_op_p};
+	my $op = $lei->workers_start($self, 'lei_import', $j, {
+		'' => [ \&import_done, $lei ],
+	});
 	$self->wq_io_do('import_stdin', []) if $self->{0};
 	for my $input (@{$self->{inputs}}) {
 		$self->wq_io_do('import_path_url', [], $input);
 	}
 	$self->wq_close(1);
-	$lei->event_step_init; # wait for shutdowns
-	if ($lei->{oneshot}) {
-		while ($op->{sock}) { $op->event_step }
-	}
+	while ($op && $op->{sock}) { $op->event_step }
 }
 
 sub call { # the main "lei import" method
diff --git a/lib/PublicInbox/LeiMirror.pm b/lib/PublicInbox/LeiMirror.pm
index c5153148..f8ca1ee5 100644
--- a/lib/PublicInbox/LeiMirror.pm
+++ b/lib/PublicInbox/LeiMirror.pm
@@ -8,7 +8,6 @@ use v5.10.1;
 use parent qw(PublicInbox::IPC);
 use IO::Uncompress::Gunzip qw(gunzip $GunzipError);
 use PublicInbox::Spawn qw(popen_rd spawn);
-use PublicInbox::PktOp;
 
 sub do_finish_mirror { # dwaitpid callback
 	my ($arg, $pid) = @_;
@@ -279,22 +278,12 @@ sub start {
 	require PublicInbox::Inbox;
 	require PublicInbox::Admin;
 	require PublicInbox::InboxWritable;
-	my $ops = {
-		'!' => [ $lei->can('fail_handler'), $lei ],
-		'x_it' => [ $lei->can('x_it'), $lei ],
-		'child_error' => [ $lei->can('child_error'), $lei ],
-		'' => [ \&mirror_done, $lei ],
-	};
-	($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops);
-	$self->wq_workers_start('lei_mirror', 1, $lei->oldset, {lei => $lei});
-	my $op = delete $lei->{pkt_op_c};
-	delete $lei->{pkt_op_p};
+	my $op = $lei->workers_start($self, 'lei_mirror', 1, {
+		'' => [ \&mirror_done, $lei ]
+	});
 	$self->wq_io_do('do_mirror', []);
 	$self->wq_close(1);
-	$lei->event_step_init; # wait for shutdowns
-	if ($lei->{oneshot}) {
-		while ($op->{sock}) { $op->event_step }
-	}
+	while ($op && $op->{sock}) { $op->event_step }
 }
 
 sub ipc_atfork_child {

  parent reply	other threads:[~2021-02-18 11:06 UTC|newest]

Thread overview: 23+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2021-02-17 10:06 [PATCH 00/11] lei IMAP read support Eric Wong
2021-02-17 10:06 ` [PATCH 01/11] lei: bless config Eric Wong
2021-02-17 10:06 ` [PATCH 02/11] watch: move imap_common_init to NetReader Eric Wong
2021-02-17 10:06 ` [PATCH 03/11] watch: connect to NNTP and IMAP in config order Eric Wong
2021-02-17 10:07 ` [PATCH 04/11] lei import: start rearranging code for IMAP support Eric Wong
2021-02-17 10:07 ` [PATCH 05/11] lei import: move check_input_format to lei Eric Wong
2021-02-17 10:07 ` [PATCH 06/11] tests: setup_public_inboxes: use IMAP-friendly newsgroups Eric Wong
2021-02-17 10:07 ` [PATCH 07/11] t/lei_to_mail: remove unnecessary arg passing Eric Wong
2021-02-17 10:07 ` [PATCH 08/11] lei convert: mail format conversion sub-command Eric Wong
2021-02-17 10:53   ` Eric Wong
2021-02-18 11:06     ` [PATCHv2 0/4] lei IMAP support take #2 Eric Wong
2021-02-18 11:06       ` [PATCHv2 1/4] lei convert: mail format conversion sub-command Eric Wong
2021-02-18 20:22         ` [PATCHv3 0/4] lei convert IMAP support Eric Wong
2021-02-18 20:22         ` [PATCHv3 1/4] lei convert: mail format conversion sub-command Eric Wong
2021-02-18 20:22         ` [PATCHv3 2/4] lei import: add IMAP and (maildir|mbox*):$PATHNAME support Eric Wong
2021-02-18 20:22         ` [PATCHv3 3/4] lei: consolidate the bulk of the IPC code Eric Wong
2021-02-18 20:22         ` [PATCHv3 4/4] lei: check for IMAP auth errors Eric Wong
2021-02-18 11:06       ` [PATCHv2 2/4] lei import: add IMAP and (maildir|mbox*):$PATHNAME support Eric Wong
2021-02-18 11:06       ` Eric Wong [this message]
2021-02-18 11:06       ` [PATCHv2 4/4] lei: check for IMAP auth errors Eric Wong
2021-02-17 10:07 ` [PATCH 09/11] lei import: add IMAP, (maildir|mbox*):$PATHNAME support Eric Wong
2021-02-17 10:07 ` [PATCH 10/11] lei: consolidate the bulk of the IPC code Eric Wong
2021-02-17 10:07 ` [PATCH 11/11] lei: check for IMAP auth errors Eric Wong

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

  List information: https://public-inbox.org/README

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20210218110647.9822-4-e@80x24.org \
    --to=e@80x24.org \
    --cc=meta@public-inbox.org \
    --subject='Re: [PATCH (resend) 3/4] lei: consolidate the bulk of the IPC code' \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

Code repositories for project(s) associated with this inbox:

	https://80x24.org/public-inbox.git

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).