user/dev discussion of public-inbox itself
 help / color / mirror / Atom feed
* [PATCH 00/10] lei: avoid wasting IMAP connections
@ 2021-02-22 11:21 Eric Wong
  2021-02-22 11:22 ` [PATCH 01/10] lei_auth: rename {nrd} field to {net} for clarity Eric Wong
  0 siblings, 1 reply; 11+ messages in thread
From: Eric Wong @ 2021-02-22 11:21 UTC (permalink / raw)
  To: meta

This makes the code a bit less straightforward, unfortunately;
but I've tried to comment it a bit and add some flow notes.
The payoff is it saves IMAP connection setup costs which is
noticeable in high-latency and/or metered bandwidth situations.

LeiAuth is signficantly rewritten so it uses lei-daemon
to route credentials from the first worker to other workers.

Eric Wong (10):
  lei_auth: rename {nrd} field to {net} for clarity
  lei: keep client {sock} in short-lived workers
  lei: _lei_cfg: return empty hashref if unconfigured
  lei convert: auth directly from worker process
  lei import: no separate auth worker
  lei_auth: migrate common auth code from lei_import
  lei q: reduce wasted IMAP connection for auth
  net_reader: mic_get: reuse connections if cache enabled
  lei convert: inline convert_start
  lei_auth: trim and remove leftover worker code

 lib/PublicInbox/LEI.pm         |  8 ++--
 lib/PublicInbox/LeiAuth.pm     | 76 +++++++++++++---------------------
 lib/PublicInbox/LeiConvert.pm  | 36 +++++++---------
 lib/PublicInbox/LeiExternal.pm |  6 +--
 lib/PublicInbox/LeiImport.pm   | 60 ++++++++++++++++-----------
 lib/PublicInbox/LeiQuery.pm    |  9 ++--
 lib/PublicInbox/LeiToMail.pm   | 53 ++++++++++++++++--------
 lib/PublicInbox/LeiXSearch.pm  | 26 ++++++++----
 lib/PublicInbox/NetReader.pm   | 20 +++++----
 lib/PublicInbox/NetWriter.pm   |  2 +-
 10 files changed, 158 insertions(+), 138 deletions(-)


^ permalink raw reply	[flat|nested] 11+ messages in thread

* [PATCH 01/10] lei_auth: rename {nrd} field to {net} for clarity
  2021-02-22 11:21 [PATCH 00/10] lei: avoid wasting IMAP connections Eric Wong
@ 2021-02-22 11:22 ` Eric Wong
  2021-02-22 11:22   ` [PATCH 02/10] lei: keep client {sock} in short-lived workers Eric Wong
                     ` (8 more replies)
  0 siblings, 9 replies; 11+ messages in thread
From: Eric Wong @ 2021-02-22 11:22 UTC (permalink / raw)
  To: meta

We're authing for both reads and writes, this makes it
clear that we support both NetReader and NetWriter.
---
 lib/PublicInbox/LeiAuth.pm | 22 +++++++++++-----------
 1 file changed, 11 insertions(+), 11 deletions(-)

diff --git a/lib/PublicInbox/LeiAuth.pm b/lib/PublicInbox/LeiAuth.pm
index bf0110ed..c70d8e8f 100644
--- a/lib/PublicInbox/LeiAuth.pm
+++ b/lib/PublicInbox/LeiAuth.pm
@@ -10,22 +10,22 @@ use parent qw(PublicInbox::IPC);
 use PublicInbox::PktOp qw(pkt_do);
 use PublicInbox::NetReader;
 
-sub nrd_merge {
-	my ($lei, $nrd_new) = @_;
+sub net_merge {
+	my ($lei, $net_new) = @_;
 	if ($lei->{pkt_op_p}) { # from lei_convert worker
-		pkt_do($lei->{pkt_op_p}, 'nrd_merge', $nrd_new);
+		pkt_do($lei->{pkt_op_p}, 'net_merge', $net_new);
 	} else { # single lei-daemon consumer
 		my $self = $lei->{auth} or return; # client disconnected
-		my $nrd = $self->{nrd};
-		%$nrd = (%$nrd, %$nrd_new);
+		my $net = $self->{net};
+		%$net = (%$net, %$net_new);
 	}
 }
 
 sub do_auth { # called via wq_io_do
 	my ($self) = @_;
-	my ($lei, $nrd) = @$self{qw(lei nrd)};
-	$nrd->imap_common_init($lei);
-	nrd_merge($lei, $nrd); # tell lei-daemon updated auth info
+	my ($lei, $net) = @$self{qw(lei net)};
+	$net->imap_common_init($lei);
+	net_merge($lei, $net); # tell lei-daemon updated auth info
 }
 
 sub do_finish_auth { # dwaitpid callback
@@ -44,7 +44,7 @@ sub auth_start {
 	my ($self, $lei, $post_auth_cb, @args) = @_;
 	$lei->_lei_cfg(1); # workers may need to read config
 	my $op = $lei->workers_start($self, 'auth', 1, {
-		'nrd_merge' => [ \&nrd_merge, $lei ],
+		'net_merge' => [ \&net_merge, $lei ],
 		'' => [ \&auth_eof, $lei, $post_auth_cb, @args ],
 	});
 	$self->wq_io_do('do_auth', []);
@@ -63,8 +63,8 @@ sub ipc_atfork_child {
 }
 
 sub new {
-	my ($cls, $nrd) = @_; # nrd may be NetReader or descendant (NetWriter)
-	bless { nrd => $nrd }, $cls;
+	my ($cls, $net) = @_; # net may be NetReader or descendant (NetWriter)
+	bless { net => $net }, $cls;
 }
 
 1;

^ permalink raw reply	[flat|nested] 11+ messages in thread

* [PATCH 02/10] lei: keep client {sock} in short-lived workers
  2021-02-22 11:22 ` [PATCH 01/10] lei_auth: rename {nrd} field to {net} for clarity Eric Wong
@ 2021-02-22 11:22   ` Eric Wong
  2021-02-22 11:22   ` [PATCH 03/10] lei: _lei_cfg: return empty hashref if unconfigured Eric Wong
                     ` (7 subsequent siblings)
  8 siblings, 0 replies; 11+ messages in thread
From: Eric Wong @ 2021-02-22 11:22 UTC (permalink / raw)
  To: meta

For non-persistent workers, there's no harm in keeping the
client socket open.  This means we can avoid dancing around
closing it in PublicInbox::LeiAuth::ipc_atfork_child.
Eventually, other WQ workers will trigger "git credential"
spawning in script/lei directly.
---
 lib/PublicInbox/LEI.pm     | 4 ++--
 lib/PublicInbox/LeiAuth.pm | 3 ---
 2 files changed, 2 insertions(+), 5 deletions(-)

diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 8d49b212..73c9e267 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -443,7 +443,7 @@ sub lei_atfork_child {
 	my ($self, $persist) = @_;
 	# we need to explicitly close things which are on stack
 	if ($persist) {
-		my @io = delete @$self{0,1,2};
+		my @io = delete @$self{qw(0 1 2 sock)};
 		unless ($self->{oneshot}) {
 			close($_) for @io;
 		}
@@ -451,7 +451,7 @@ sub lei_atfork_child {
 		delete $self->{0};
 	}
 	delete @$self{qw(cnv)};
-	for (delete @$self{qw(3 sock old_1 au_done)}) {
+	for (delete @$self{qw(3 old_1 au_done)}) {
 		close($_) if defined($_);
 	}
 	if (my $op_c = delete $self->{pkt_op_c}) {
diff --git a/lib/PublicInbox/LeiAuth.pm b/lib/PublicInbox/LeiAuth.pm
index c70d8e8f..f2cdb026 100644
--- a/lib/PublicInbox/LeiAuth.pm
+++ b/lib/PublicInbox/LeiAuth.pm
@@ -54,11 +54,8 @@ sub auth_start {
 
 sub ipc_atfork_child {
 	my ($self) = @_;
-	# prevent {sock} from being closed in lei_atfork_child:
-	my $s = delete $self->{lei}->{sock};
 	delete $self->{lei}->{auth}; # drop circular ref
 	$self->{lei}->lei_atfork_child;
-	$self->{lei}->{sock} = $s if $s;
 	$self->SUPER::ipc_atfork_child;
 }
 

^ permalink raw reply	[flat|nested] 11+ messages in thread

* [PATCH 03/10] lei: _lei_cfg: return empty hashref if unconfigured
  2021-02-22 11:22 ` [PATCH 01/10] lei_auth: rename {nrd} field to {net} for clarity Eric Wong
  2021-02-22 11:22   ` [PATCH 02/10] lei: keep client {sock} in short-lived workers Eric Wong
@ 2021-02-22 11:22   ` Eric Wong
  2021-02-22 11:22   ` [PATCH 04/10] lei convert: auth directly from worker process Eric Wong
                     ` (6 subsequent siblings)
  8 siblings, 0 replies; 11+ messages in thread
From: Eric Wong @ 2021-02-22 11:22 UTC (permalink / raw)
  To: meta

Existing callers in LeiExternal actually depend on this,
and LeiAuth shouldn't need to be creating a config file
just to do a conversion against an anonymous IMAP server.
---
 lib/PublicInbox/LEI.pm         | 2 +-
 lib/PublicInbox/LeiAuth.pm     | 1 -
 lib/PublicInbox/LeiExternal.pm | 6 +++---
 3 files changed, 4 insertions(+), 5 deletions(-)

diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 73c9e267..dd34c668 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -592,7 +592,7 @@ sub _lei_cfg ($;$) {
 	if (!@st) {
 		unless ($creat) {
 			delete $self->{cfg};
-			return;
+			return bless {}, 'PublicInbox::Config';
 		}
 		my (undef, $cfg_dir, undef) = File::Spec->splitpath($f);
 		-d $cfg_dir or mkpath($cfg_dir) or die "mkpath($cfg_dir): $!\n";
diff --git a/lib/PublicInbox/LeiAuth.pm b/lib/PublicInbox/LeiAuth.pm
index f2cdb026..5d321be2 100644
--- a/lib/PublicInbox/LeiAuth.pm
+++ b/lib/PublicInbox/LeiAuth.pm
@@ -42,7 +42,6 @@ sub auth_eof {
 
 sub auth_start {
 	my ($self, $lei, $post_auth_cb, @args) = @_;
-	$lei->_lei_cfg(1); # workers may need to read config
 	my $op = $lei->workers_start($self, 'auth', 1, {
 		'net_merge' => [ \&net_merge, $lei ],
 		'' => [ \&auth_eof, $lei, $post_auth_cb, @args ],
diff --git a/lib/PublicInbox/LeiExternal.pm b/lib/PublicInbox/LeiExternal.pm
index 6cc2e671..0cc84cca 100644
--- a/lib/PublicInbox/LeiExternal.pm
+++ b/lib/PublicInbox/LeiExternal.pm
@@ -9,7 +9,7 @@ use PublicInbox::Config;
 
 sub externals_each {
 	my ($self, $cb, @arg) = @_;
-	my $cfg = $self->_lei_cfg(0);
+	my $cfg = $self->_lei_cfg;
 	my %boost;
 	for my $sec (grep(/\Aexternal\./, @{$cfg->{-section_order}})) {
 		my $loc = substr($sec, length('external.'));
@@ -234,7 +234,7 @@ sub _complete_url_common ($) {
 # shell completion helper called by lei__complete
 sub _complete_forget_external {
 	my ($self, @argv) = @_;
-	my $cfg = $self->_lei_cfg(0);
+	my $cfg = $self->_lei_cfg;
 	my ($cur, $re) = _complete_url_common(\@argv);
 	# FIXME: bash completion off "http:" or "https:" when the last
 	# character is a colon doesn't work properly even if we're
@@ -250,7 +250,7 @@ sub _complete_forget_external {
 
 sub _complete_add_external { # for bash, this relies on "compopt -o nospace"
 	my ($self, @argv) = @_;
-	my $cfg = $self->_lei_cfg(0);
+	my $cfg = $self->_lei_cfg;
 	my ($cur, $re) = _complete_url_common(\@argv);
 	require URI;
 	map {

^ permalink raw reply	[flat|nested] 11+ messages in thread

* [PATCH 04/10] lei convert: auth directly from worker process
  2021-02-22 11:22 ` [PATCH 01/10] lei_auth: rename {nrd} field to {net} for clarity Eric Wong
  2021-02-22 11:22   ` [PATCH 02/10] lei: keep client {sock} in short-lived workers Eric Wong
  2021-02-22 11:22   ` [PATCH 03/10] lei: _lei_cfg: return empty hashref if unconfigured Eric Wong
@ 2021-02-22 11:22   ` Eric Wong
  2021-02-22 11:22   ` [PATCH 05/10] lei import: no separate auth worker Eric Wong
                     ` (5 subsequent siblings)
  8 siblings, 0 replies; 11+ messages in thread
From: Eric Wong @ 2021-02-22 11:22 UTC (permalink / raw)
  To: meta

Since this only has one worker, we can auth directly in the
worker since the convert worker now has access to the script/lei
{sock} for running "git credential".
---
 lib/PublicInbox/LeiConvert.pm | 20 +++++++++++---------
 lib/PublicInbox/NetReader.pm  | 16 +++++++++-------
 lib/PublicInbox/NetWriter.pm  |  2 +-
 3 files changed, 21 insertions(+), 17 deletions(-)

diff --git a/lib/PublicInbox/LeiConvert.pm b/lib/PublicInbox/LeiConvert.pm
index ba375772..3a714502 100644
--- a/lib/PublicInbox/LeiConvert.pm
+++ b/lib/PublicInbox/LeiConvert.pm
@@ -32,6 +32,10 @@ sub do_convert { # via wq_do
 	my ($self) = @_;
 	my $lei = $self->{lei};
 	my $in_fmt = $lei->{opt}->{'in-format'};
+	my $mics;
+	if (my $nrd = $lei->{nrd}) { # may prompt user once
+		$nrd->{mics_cached} = $nrd->imap_common_init($lei);
+	}
 	if (my $stdin = delete $self->{0}) {
 		PublicInbox::MboxReader->$in_fmt($stdin, \&mbox_cb, $self);
 	}
@@ -120,16 +124,14 @@ sub call { # the main "lei convert" method
 		require PublicInbox::MdirReader;
 	}
 	$self->{inputs} = \@inputs;
-	return convert_start($lei) if !$nrd;
-
-	if (my $err = $nrd->errors) {
-		return $lei->fail($err);
+	if ($nrd) {
+		if (my $err = $nrd->errors) {
+			return $lei->fail($err);
+		}
+		$nrd->{quiet} = $opt->{quiet};
+		$lei->{nrd} = $nrd;
 	}
-	$nrd->{quiet} = $opt->{quiet};
-	$lei->{nrd} = $nrd;
-	require PublicInbox::LeiAuth;
-	my $auth = $lei->{auth} = PublicInbox::LeiAuth->new($nrd);
-	$auth->auth_start($lei, \&convert_start, $lei);
+	convert_start($lei);
 }
 
 sub ipc_atfork_child {
diff --git a/lib/PublicInbox/NetReader.pm b/lib/PublicInbox/NetReader.pm
index 0956d5da..c29e09c1 100644
--- a/lib/PublicInbox/NetReader.pm
+++ b/lib/PublicInbox/NetReader.pm
@@ -423,14 +423,16 @@ sub _imap_fetch_all ($$$) {
 
 # uses cached auth info prepared by mic_for
 sub mic_get {
-	my ($self, $sec) = @_;
-	my $mic_arg = $self->{mic_arg}->{$sec};
-	unless ($mic_arg) {
-		my $uri = ref $sec ? $sec : PublicInbox::URIimap->new($sec);
-		$sec = uri_section($uri);
-		$mic_arg = $self->{mic_arg}->{$sec} or
-			die "BUG: no Mail::IMAPClient->new arg for $sec";
+	my ($self, $uri) = @_;
+	my $sec = uri_section($uri);
+	# see if caller saved result of imap_common_init
+	if (my $cached = $self->{mics_cached}) {
+		my $mic = $cached->{$sec};
+		return $mic if $mic && $mic->IsConnected;
+		delete $cached->{$sec};
 	}
+	my $mic_arg = $self->{mic_arg}->{$sec} or
+			die "BUG: no Mail::IMAPClient->new arg for $sec";
 	if (defined(my $cb_name = $mic_arg->{Authcallback})) {
 		if (ref($cb_name) ne 'CODE') {
 			$mic_arg->{Authcallback} = $self->can($cb_name);
diff --git a/lib/PublicInbox/NetWriter.pm b/lib/PublicInbox/NetWriter.pm
index 89f8662e..c68b0669 100644
--- a/lib/PublicInbox/NetWriter.pm
+++ b/lib/PublicInbox/NetWriter.pm
@@ -28,7 +28,7 @@ sub imap_delete_all {
 	my $uri = PublicInbox::URIimap->new($url);
 	my $sec = $self->can('uri_section')->($uri);
 	local $0 = $uri->mailbox." $sec";
-	my $mic = $self->mic_get($sec) or die "E: not connected: $@";
+	my $mic = $self->mic_get($uri) or die "E: not connected: $@";
 	$mic->select($uri->mailbox) or return; # non-existent
 	if ($mic->delete_message('1:*')) {
 		$mic->expunge;

^ permalink raw reply	[flat|nested] 11+ messages in thread

* [PATCH 05/10] lei import: no separate auth worker
  2021-02-22 11:22 ` [PATCH 01/10] lei_auth: rename {nrd} field to {net} for clarity Eric Wong
                     ` (2 preceding siblings ...)
  2021-02-22 11:22   ` [PATCH 04/10] lei convert: auth directly from worker process Eric Wong
@ 2021-02-22 11:22   ` Eric Wong
  2021-02-22 11:22   ` [PATCH 06/10] lei_auth: migrate common auth code from lei_import Eric Wong
                     ` (4 subsequent siblings)
  8 siblings, 0 replies; 11+ messages in thread
From: Eric Wong @ 2021-02-22 11:22 UTC (permalink / raw)
  To: meta

We'll start sharing auth info from the first worker to the
rest of the workers via wq_broadcast.

This lays the groundwork for getting rid of LeiAuth workers for
authentication work and reducing network round trips required
for IMAP.
---
 lib/PublicInbox/LeiImport.pm | 87 ++++++++++++++++++++++++++----------
 1 file changed, 63 insertions(+), 24 deletions(-)

diff --git a/lib/PublicInbox/LeiImport.pm b/lib/PublicInbox/LeiImport.pm
index 68cab12c..5e2e61af 100644
--- a/lib/PublicInbox/LeiImport.pm
+++ b/lib/PublicInbox/LeiImport.pm
@@ -8,6 +8,7 @@ use v5.10.1;
 use parent qw(PublicInbox::IPC);
 use PublicInbox::Eml;
 use PublicInbox::InboxWritable qw(eml_from_path);
+use PublicInbox::PktOp qw(pkt_do);
 
 sub _import_eml { # MboxReader callback
 	my ($eml, $sto, $set_kw) = @_;
@@ -28,24 +29,54 @@ sub import_done { # EOF callback for main daemon
 	$imp->wq_wait_old(\&import_done_wait, $lei);
 }
 
+sub net_merge_all { # via wq_broadcast
+	my ($self, $net_new) = @_;
+	my $net = $self->{lei}->{net};
+	%$net = (%$net, %$net_new);
+	pkt_do($self->{lei}->{pkt_op_p}, 'net_merge_done1') or
+		die "pkt_op_do net_merge_done1: $!";
+}
+
+sub net_merge_continue { # first worker is done with auth
+	my ($self, $net_new) = @_;
+	$self->wq_broadcast('net_merge_all', $net_new);
+}
+
+sub net_merge_complete {
+	my ($self) = @_;
+	for my $input (@{$self->{inputs}}) {
+		$self->wq_io_do('import_path_url', [], $input);
+	}
+	$self->wq_close(1);
+}
+
+sub net_merge_done1 {
+	my ($self) = @_;
+	my $lei = $self->{lei};
+	return if ++$lei->{nr_net_merge_done} != $self->{-wq_nr_workers};
+	net_merge_complete($self);
+}
+
 sub import_start {
 	my ($lei) = @_;
 	my $self = $lei->{imp};
 	my $j = $lei->{opt}->{jobs} // scalar(@{$self->{inputs}}) || 1;
-	if (my $nrd = $lei->{nrd}) {
-		# $j = $nrd->net_concurrency($j); TODO
+	if (my $net = $lei->{net}) {
+		# $j = $net->net_concurrency($j); TODO
 	} else {
 		my $nproc = $self->detect_nproc;
 		$j = $nproc if $j > $nproc;
 	}
-	my $op = $lei->workers_start($self, 'lei_import', $j, {
-		'' => [ \&import_done, $lei ],
-	});
-	$self->wq_io_do('import_stdin', []) if $self->{0};
-	for my $input (@{$self->{inputs}}) {
-		$self->wq_io_do('import_path_url', [], $input);
+	my $ops = { '' => [ \&import_done, $lei ] };
+	my $auth = $lei->{auth};
+	if ($auth) {
+		$ops->{net_merge} = [ \&net_merge_continue, $self ];
+		$ops->{net_merge_done1} = [ \&net_merge_done1, $self ];
 	}
-	$self->wq_close(1);
+	$self->{-wq_nr_workers} = $j // 1; # locked
+	my $op = $lei->workers_start($self, 'lei_import', undef, $ops);
+	$self->wq_io_do('import_stdin', []) if $self->{0};
+	net_merge_complete($self) if !$auth;
 	while ($op && $op->{sock}) { $op->event_step }
 }
 
@@ -53,7 +84,7 @@ sub call { # the main "lei import" method
 	my ($cls, $lei, @inputs) = @_;
 	my $sto = $lei->_lei_store(1);
 	$sto->write_prepare($lei);
-	my ($nrd, @f, @d);
+	my ($net, @f, @d);
 	$lei->{opt}->{kw} //= 1;
 	my $self = $lei->{imp} = bless { inputs => \@inputs }, $cls;
 	if ($lei->{opt}->{stdin}) {
@@ -69,8 +100,8 @@ sub call { # the main "lei import" method
 		my $input_path = $input;
 		if ($input =~ m!\A(?:imap|nntp)s?://!i) {
 			require PublicInbox::NetReader;
-			$nrd //= PublicInbox::NetReader->new;
-			$nrd->add_url($input);
+			$net //= PublicInbox::NetReader->new;
+			$net->add_url($input);
 		} elsif ($input_path =~ s/\A([a-z0-9]+)://is) {
 			my $ifmt = lc $1;
 			if (($fmt // $ifmt) ne $ifmt) {
@@ -98,23 +129,31 @@ sub call { # the main "lei import" method
 		require PublicInbox::MdirReader;
 	}
 	$self->{inputs} = \@inputs;
-	return import_start($lei) if !$nrd;
-
-	if (my $err = $nrd->errors) {
-		return $lei->fail($err);
+	if ($net) {
+		if (my $err = $net->errors) {
+			return $lei->fail($err);
+		}
+		$net->{quiet} = $lei->{opt}->{quiet};
+		$lei->{net} = $net;
+		require PublicInbox::LeiAuth;
+		$lei->{auth} = PublicInbox::LeiAuth->new($net);
 	}
-	$nrd->{quiet} = $lei->{opt}->{quiet};
-	$lei->{nrd} = $nrd;
-	require PublicInbox::LeiAuth;
-	my $auth = $lei->{auth} = PublicInbox::LeiAuth->new($nrd);
-	$auth->auth_start($lei, \&import_start, $lei);
+	import_start($lei);
 }
 
 sub ipc_atfork_child {
 	my ($self) = @_;
-	delete $self->{lei}->{imp}; # drop circular ref
-	$self->{lei}->lei_atfork_child;
+	my $lei = $self->{lei};
+	delete $lei->{imp}; # drop circular ref
+	$lei->lei_atfork_child;
 	$self->SUPER::ipc_atfork_child;
+	my $net = $lei->{net};
+	if ($net && $self->{-wq_worker_nr} == 0) {
+		my $mics = $net->imap_common_init($lei);
+		PublicInbox::LeiAuth::net_merge($lei, $net);
+		$net->{mics_cached} = $mics;
+	}
+	undef;
 }
 
 sub _import_fh {
@@ -154,7 +193,7 @@ sub import_path_url {
 	my $ifmt = lc($lei->{opt}->{'format'} // '');
 	# TODO auto-detect?
 	if ($input =~ m!\A(imap|nntp)s?://!i) {
-		$lei->{nrd}->imap_each($input, \&_import_imap, $lei->{sto},
+		$lei->{net}->imap_each($input, \&_import_imap, $lei->{sto},
 					$lei->{opt}->{kw});
 		return;
 	} elsif ($input =~ s!\A([a-z0-9]+):!!i) {

^ permalink raw reply	[flat|nested] 11+ messages in thread

* [PATCH 06/10] lei_auth: migrate common auth code from lei_import
  2021-02-22 11:22 ` [PATCH 01/10] lei_auth: rename {nrd} field to {net} for clarity Eric Wong
                     ` (3 preceding siblings ...)
  2021-02-22 11:22   ` [PATCH 05/10] lei import: no separate auth worker Eric Wong
@ 2021-02-22 11:22   ` Eric Wong
  2021-02-22 11:22   ` [PATCH 07/10] lei q: reduce wasted IMAP connection for auth Eric Wong
                     ` (3 subsequent siblings)
  8 siblings, 0 replies; 11+ messages in thread
From: Eric Wong @ 2021-02-22 11:22 UTC (permalink / raw)
  To: meta

lei_to_mail will be able to use this, too.
---
 lib/PublicInbox/LeiAuth.pm   | 37 +++++++++++++++++++++++++++++++++-
 lib/PublicInbox/LeiImport.pm | 39 ++++++------------------------------
 2 files changed, 42 insertions(+), 34 deletions(-)

diff --git a/lib/PublicInbox/LeiAuth.pm b/lib/PublicInbox/LeiAuth.pm
index 5d321be2..d329eadb 100644
--- a/lib/PublicInbox/LeiAuth.pm
+++ b/lib/PublicInbox/LeiAuth.pm
@@ -8,7 +8,6 @@ use strict;
 use v5.10.1;
 use parent qw(PublicInbox::IPC);
 use PublicInbox::PktOp qw(pkt_do);
-use PublicInbox::NetReader;
 
 sub net_merge {
 	my ($lei, $net_new) = @_;
@@ -28,6 +27,42 @@ sub do_auth { # called via wq_io_do
 	net_merge($lei, $net); # tell lei-daemon updated auth info
 }
 
+sub do_auth_atfork { # used by IPC WQ workers
+	my ($self, $wq) = @_;
+	return if $wq->{-wq_worker_nr} != 0;
+	my $lei = $wq->{lei};
+	my $net = $self->{net};
+	my $mics = $net->imap_common_init($lei);
+	net_merge($lei, $net);
+	$net->{mics_cached} = $mics;
+}
+
+sub net_merge_done1 { # bump merge-count in top-level lei-daemon
+	my ($wq) = @_;
+	return if ++$wq->{nr_net_merge_done} != $wq->{-wq_nr_workers};
+	$wq->net_merge_complete; # defined per wq-class (e.g. LeiImport)
+}
+
+sub net_merge_all { # called via wq_broadcast
+	my ($wq, $net_new) = @_;
+	my $net = $wq->{lei}->{net};
+	%$net = (%$net, %$net_new);
+	pkt_do($wq->{lei}->{pkt_op_p}, 'net_merge_done1') or
+		die "pkt_op_do net_merge_done1: $!";
+}
+
+# called by top-level lei-daemon when first worker is done with auth
+sub net_merge_continue {
+	my ($wq, $net_new) = @_;
+	$wq->wq_broadcast('net_merge_all', $net_new);
+}
+
+sub op_merge { # prepares PktOp->pair ops
+	my ($self, $ops, $wq) = @_;
+	$ops->{net_merge} = [ \&net_merge_continue, $wq ];
+	$ops->{net_merge_done1} = [ \&net_merge_done1, $wq ];
+}
+
 sub do_finish_auth { # dwaitpid callback
 	my ($arg, $pid) = @_;
 	my ($self, $lei, $post_auth_cb, @args) = @$arg;
diff --git a/lib/PublicInbox/LeiImport.pm b/lib/PublicInbox/LeiImport.pm
index 5e2e61af..bc37c628 100644
--- a/lib/PublicInbox/LeiImport.pm
+++ b/lib/PublicInbox/LeiImport.pm
@@ -29,20 +29,7 @@ sub import_done { # EOF callback for main daemon
 	$imp->wq_wait_old(\&import_done_wait, $lei);
 }
 
-sub net_merge_all { # via wq_broadcast
-	my ($self, $net_new) = @_;
-	my $net = $self->{lei}->{net};
-	%$net = (%$net, %$net_new);
-	pkt_do($self->{lei}->{pkt_op_p}, 'net_merge_done1') or
-		die "pkt_op_do net_merge_done1: $!";
-}
-
-sub net_merge_continue { # first worker is done with auth
-	my ($self, $net_new) = @_;
-	$self->wq_broadcast('net_merge_all', $net_new);
-}
-
-sub net_merge_complete {
+sub net_merge_complete { # callback used by LeiAuth
 	my ($self) = @_;
 	for my $input (@{$self->{inputs}}) {
 		$self->wq_io_do('import_path_url', [], $input);
@@ -50,13 +37,6 @@ sub net_merge_complete {
 	$self->wq_close(1);
 }
 
-sub net_merge_done1 {
-	my ($self) = @_;
-	my $lei = $self->{lei};
-	return if ++$lei->{nr_net_merge_done} != $self->{-wq_nr_workers};
-	net_merge_complete($self);
-}
-
 sub import_start {
 	my ($lei) = @_;
 	my $self = $lei->{imp};
@@ -68,15 +48,11 @@ sub import_start {
 		$j = $nproc if $j > $nproc;
 	}
 	my $ops = { '' => [ \&import_done, $lei ] };
-	my $auth = $lei->{auth};
-	if ($auth) {
-		$ops->{net_merge} = [ \&net_merge_continue, $self ];
-		$ops->{net_merge_done1} = [ \&net_merge_done1, $self ];
-	}
+	$lei->{auth}->op_merge($ops, $self) if $lei->{auth};
 	$self->{-wq_nr_workers} = $j // 1; # locked
 	my $op = $lei->workers_start($self, 'lei_import', undef, $ops);
 	$self->wq_io_do('import_stdin', []) if $self->{0};
-	net_merge_complete($self) if !$auth;
+	net_merge_complete($self) unless $lei->{auth};
 	while ($op && $op->{sock}) { $op->event_step }
 }
 
@@ -147,12 +123,7 @@ sub ipc_atfork_child {
 	delete $lei->{imp}; # drop circular ref
 	$lei->lei_atfork_child;
 	$self->SUPER::ipc_atfork_child;
-	my $net = $lei->{net};
-	if ($net && $self->{-wq_worker_nr} == 0) {
-		my $mics = $net->imap_common_init($lei);
-		PublicInbox::LeiAuth::net_merge($lei, $net);
-		$net->{mics_cached} = $mics;
-	}
+	$lei->{auth}->do_auth_atfork($self) if $lei->{auth};
 	undef;
 }
 
@@ -222,4 +193,6 @@ sub import_stdin {
 	_import_fh($lei, delete $self->{0}, '<stdin>', $lei->{opt}->{'format'});
 }
 
+no warnings 'once'; # the following works even when LeiAuth is lazy-loaded
+*net_merge_all = \&PublicInbox::LeiAuth::net_merge_all;
 1;

^ permalink raw reply	[flat|nested] 11+ messages in thread

* [PATCH 07/10] lei q: reduce wasted IMAP connection for auth
  2021-02-22 11:22 ` [PATCH 01/10] lei_auth: rename {nrd} field to {net} for clarity Eric Wong
                     ` (4 preceding siblings ...)
  2021-02-22 11:22   ` [PATCH 06/10] lei_auth: migrate common auth code from lei_import Eric Wong
@ 2021-02-22 11:22   ` Eric Wong
  2021-02-22 11:22   ` [PATCH 08/10] net_reader: mic_get: reuse connections if cache enabled Eric Wong
                     ` (2 subsequent siblings)
  8 siblings, 0 replies; 11+ messages in thread
From: Eric Wong @ 2021-02-22 11:22 UTC (permalink / raw)
  To: meta

We can rework the first lei2mail worker to authenticate, and
then share auth info with the rest of the lei2mail workers.  As
with "lei import", this uses PktOp and lei-daemon to share
updated credentials between the first an subsequent l2m workers.
---
 lib/PublicInbox/LeiAuth.pm    | 37 ------------------------
 lib/PublicInbox/LeiConvert.pm |  2 +-
 lib/PublicInbox/LeiQuery.pm   |  9 ++----
 lib/PublicInbox/LeiToMail.pm  | 53 ++++++++++++++++++++++++-----------
 lib/PublicInbox/LeiXSearch.pm | 26 ++++++++++++-----
 5 files changed, 59 insertions(+), 68 deletions(-)

diff --git a/lib/PublicInbox/LeiAuth.pm b/lib/PublicInbox/LeiAuth.pm
index d329eadb..b4777114 100644
--- a/lib/PublicInbox/LeiAuth.pm
+++ b/lib/PublicInbox/LeiAuth.pm
@@ -20,13 +20,6 @@ sub net_merge {
 	}
 }
 
-sub do_auth { # called via wq_io_do
-	my ($self) = @_;
-	my ($lei, $net) = @$self{qw(lei net)};
-	$net->imap_common_init($lei);
-	net_merge($lei, $net); # tell lei-daemon updated auth info
-}
-
 sub do_auth_atfork { # used by IPC WQ workers
 	my ($self, $wq) = @_;
 	return if $wq->{-wq_worker_nr} != 0;
@@ -63,36 +56,6 @@ sub op_merge { # prepares PktOp->pair ops
 	$ops->{net_merge_done1} = [ \&net_merge_done1, $wq ];
 }
 
-sub do_finish_auth { # dwaitpid callback
-	my ($arg, $pid) = @_;
-	my ($self, $lei, $post_auth_cb, @args) = @$arg;
-	$? ? $lei->dclose : $post_auth_cb->(@args);
-}
-
-sub auth_eof {
-	my ($lei, $post_auth_cb, @args) = @_;
-	my $self = delete $lei->{auth} or return;
-	$self->wq_wait_old(\&do_finish_auth, $lei, $post_auth_cb, @args);
-}
-
-sub auth_start {
-	my ($self, $lei, $post_auth_cb, @args) = @_;
-	my $op = $lei->workers_start($self, 'auth', 1, {
-		'net_merge' => [ \&net_merge, $lei ],
-		'' => [ \&auth_eof, $lei, $post_auth_cb, @args ],
-	});
-	$self->wq_io_do('do_auth', []);
-	$self->wq_close(1);
-	while ($op && $op->{sock}) { $op->event_step }
-}
-
-sub ipc_atfork_child {
-	my ($self) = @_;
-	delete $self->{lei}->{auth}; # drop circular ref
-	$self->{lei}->lei_atfork_child;
-	$self->SUPER::ipc_atfork_child;
-}
-
 sub new {
 	my ($cls, $net) = @_; # net may be NetReader or descendant (NetWriter)
 	bless { net => $net }, $cls;
diff --git a/lib/PublicInbox/LeiConvert.pm b/lib/PublicInbox/LeiConvert.pm
index 3a714502..b45de4e0 100644
--- a/lib/PublicInbox/LeiConvert.pm
+++ b/lib/PublicInbox/LeiConvert.pm
@@ -62,7 +62,7 @@ sub do_convert { # via wq_do
 	delete $self->{wcb}; # commit
 }
 
-sub convert_start { # LeiAuth->auth_start callback
+sub convert_start {
 	my ($lei) = @_;
 	my $self = $lei->{cnv};
 	my $op = $lei->workers_start($self, 'lei_convert', 1, {
diff --git a/lib/PublicInbox/LeiQuery.pm b/lib/PublicInbox/LeiQuery.pm
index 398f834f..64c9394c 100644
--- a/lib/PublicInbox/LeiQuery.pm
+++ b/lib/PublicInbox/LeiQuery.pm
@@ -13,14 +13,11 @@ sub prep_ext { # externals_each callback
 
 sub _start_query {
 	my ($self) = @_;
-	if (my $nwr = $self->{nwr}) {
+	if (my $net = $self->{net}) {
 		require PublicInbox::LeiAuth;
-		my $auth = $self->{auth} = PublicInbox::LeiAuth->new($nwr);
-		my $lxs = $self->{lxs};
-		$auth->auth_start($self, $lxs->can('do_query'), $lxs, $self);
-	} else {
-		$self->{lxs}->do_query($self);
+		$self->{auth} = PublicInbox::LeiAuth->new($net);
 	}
+	$self->{lxs}->do_query($self);
 }
 
 sub qstr_add { # PublicInbox::InputPipe::consume callback for --stdin
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index 6efd398a..df813064 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -345,8 +345,8 @@ sub _imap_write_cb ($$) {
 	my ($self, $lei) = @_;
 	my $dedupe = $lei->{dedupe};
 	$dedupe->prepare_dedupe if $dedupe;
-	my $imap_append = $lei->{nwr}->can('imap_append');
-	my $mic = $lei->{nwr}->mic_get($self->{uri});
+	my $imap_append = $lei->{net}->can('imap_append');
+	my $mic = $lei->{net}->mic_get($self->{uri});
 	my $folder = $self->{uri}->mailbox;
 	sub { # for git_to_mail
 		my ($bref, $smsg, $eml) = @_;
@@ -394,15 +394,15 @@ sub new {
 		$self->{base_type} = 'mbox';
 	} elsif ($fmt =~ /\Aimaps?\z/) { # TODO .onion support
 		require PublicInbox::NetWriter;
-		my $nwr = PublicInbox::NetWriter->new;
-		$nwr->add_url($dst);
-		$nwr->{quiet} = $lei->{opt}->{quiet};
-		my $err = $nwr->errors($dst);
+		my $net = PublicInbox::NetWriter->new;
+		$net->add_url($dst);
+		$net->{quiet} = $lei->{opt}->{quiet};
+		my $err = $net->errors($dst);
 		return $lei->fail($err) if $err;
 		require PublicInbox::URIimap; # TODO: URI cast early
 		$self->{uri} = PublicInbox::URIimap->new($dst);
 		$self->{uri}->mailbox or die "No mailbox: $dst";
-		$lei->{nwr} = $nwr;
+		$lei->{net} = $net;
 		$self->{base_type} = 'imap';
 	} else {
 		die "bad mail --format=$fmt\n";
@@ -447,15 +447,16 @@ sub _augment_imap { # PublicInbox::NetReader::imap_each cb
 
 sub _do_augment_imap {
 	my ($self, $lei) = @_;
-	my $nwr = $lei->{nwr};
+	my $net = $lei->{net};
 	if ($lei->{opt}->{augment}) {
 		my $dedupe = $lei->{dedupe};
 		if ($dedupe && $dedupe->prepare_dedupe) {
-			$nwr->imap_each($self->{uri}, \&_augment_imap, $lei);
+			$net->imap_each($self->{uri}, \&_augment_imap, $lei);
 			$dedupe->pause_dedupe;
 		}
-	} else { # clobber existing IMAP folder
-		$nwr->imap_delete_all($self->{uri});
+	} elsif (!$self->{-wq_worker_nr}) { # undef or 0
+		# clobber existing IMAP folder
+		$net->imap_delete_all($self->{uri});
 	}
 }
 
@@ -523,16 +524,18 @@ sub post_augment {
 	$m->($self, $lei, @args);
 }
 
-sub ipc_atfork_child {
+sub do_post_auth {
 	my ($self) = @_;
-	my $lei = delete $self->{lei};
-	$lei->lei_atfork_child;
+	my $lei = $self->{lei};
+	# lei_xsearch can start as soon as all l2m workers get here
+	pkt_do($lei->{pkt_op_p}, 'incr_start_query') or
+		die "incr_start_query: $!";
 	my $aug;
 	if (lock_free($self)) {
 		my $mod = $self->{-wq_nr_workers};
 		my $shard = $self->{-wq_worker_nr};
-		if (my $nwr = $lei->{nwr}) {
-			$nwr->{shard_info} = [ $mod, $shard ];
+		if (my $net = $lei->{net}) {
+			$net->{shard_info} = [ $mod, $shard ];
 		} else { # Maildir (MH?)
 			$self->{shard_info} = [ $mod, $shard ];
 		}
@@ -545,13 +548,20 @@ sub ipc_atfork_child {
 		eval { do_augment($self, $lei) };
 		$lei->fail($@) if $@;
 		pkt_do($lei->{pkt_op_p}, $aug) == 1 or
-					die "do_post_augment trigger: $!";
+				die "do_post_augment trigger: $!";
 	}
 	if (my $zpipe = delete $lei->{zpipe}) {
 		$lei->{1} = $zpipe->[1];
 		close $zpipe->[0];
 	}
 	$self->{wcb} = $self->write_cb($lei);
+}
+
+sub ipc_atfork_child {
+	my ($self) = @_;
+	my $lei = $self->{lei};
+	$lei->lei_atfork_child;
+	$lei->{auth}->do_auth_atfork($self) if $lei->{auth};
 	$SIG{__WARN__} = PublicInbox::Eml::warn_ignore_cb();
 	$self->SUPER::ipc_atfork_child;
 }
@@ -584,4 +594,13 @@ sub wq_atexit_child {
 	$SIG{__WARN__} = 'DEFAULT';
 }
 
+# called in top-level lei-daemon when LeiAuth is done
+sub net_merge_complete {
+	my ($self) = @_;
+	$self->wq_broadcast('do_post_auth');
+	$self->wq_close(1);
+}
+
+no warnings 'once'; # the following works even when LeiAuth is lazy-loaded
+*net_merge_all = \&PublicInbox::LeiAuth::net_merge_all;
 1;
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index e982165f..6dcadf0a 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -348,7 +348,7 @@ sub do_post_augment {
 	close(delete $lei->{au_done}); # triggers wait_startq in lei_xsearch
 }
 
-sub incr_post_augment { # called whenever an l2m shard finishes
+sub incr_post_augment { # called whenever an l2m shard finishes augment
 	my ($lei) = @_;
 	my $l2m = $lei->{l2m} or die 'BUG: unexpected incr_post_augment';
 	return if ++$lei->{nr_post_augment} != $l2m->{-wq_nr_workers};
@@ -366,8 +366,8 @@ sub concurrency {
 }
 
 sub start_query { # always runs in main (lei-daemon) process
-	my ($self, $lei) = @_;
-	if ($lei->{opt}->{threads}) {
+	my ($self) = @_;
+	if ($self->{threads}) {
 		for my $ibxish (locals($self)) {
 			$self->wq_io_do('query_thread_mset', [], $ibxish);
 		}
@@ -382,6 +382,13 @@ sub start_query { # always runs in main (lei-daemon) process
 	for my $uris (@$q) {
 		$self->wq_io_do('query_remote_mboxrd', [], $uris);
 	}
+	$self->wq_close(1); # lei_xsearch workers stop when done
+}
+
+sub incr_start_query { # called whenever an l2m shard starts do_post_auth
+	my ($self, $l2m) = @_;
+	return if ++$self->{nr_start_query} != $l2m->{-wq_nr_workers};
+	start_query($self);
 }
 
 sub ipc_atfork_child {
@@ -393,6 +400,7 @@ sub ipc_atfork_child {
 
 sub do_query {
 	my ($self, $lei) = @_;
+	my $l2m = $lei->{l2m};
 	my $ops = {
 		'|' => [ $lei->can('sigpipe_handler'), $lei ],
 		'!' => [ $lei->can('fail_handler'), $lei ],
@@ -402,12 +410,13 @@ sub do_query {
 		'mset_progress' => [ \&mset_progress, $lei ],
 		'x_it' => [ $lei->can('x_it'), $lei ],
 		'child_error' => [ $lei->can('child_error'), $lei ],
+		'incr_start_query' => [ \&incr_start_query, $self, $l2m ],
 	};
+	$lei->{auth}->op_merge($ops, $l2m) if $l2m && $lei->{auth};
 	($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops);
 	$lei->{1}->autoflush(1);
 	$lei->start_pager if delete $lei->{need_pager};
 	$lei->{ovv}->ovv_begin($lei);
-	my $l2m = $lei->{l2m};
 	if ($l2m) {
 		$l2m->pre_augment($lei);
 		if ($lei->{opt}->{augment} && delete $lei->{early_mua}) {
@@ -428,10 +437,13 @@ sub do_query {
 				$lei->oldset, { lei => $lei });
 	my $op = delete $lei->{pkt_op_c};
 	delete $lei->{pkt_op_p};
-	$l2m->wq_close(1) if $l2m;
+	$self->{threads} = $lei->{opt}->{threads};
+	if ($l2m) {
+		$l2m->net_merge_complete unless $lei->{auth};
+	} else {
+		start_query($self);
+	}
 	$lei->event_step_init; # wait for shutdowns
-	start_query($self, $lei);
-	$self->wq_close(1); # lei_xsearch workers stop when done
 	if ($lei->{oneshot}) {
 		while ($op->{sock}) { $op->event_step }
 	}

^ permalink raw reply	[flat|nested] 11+ messages in thread

* [PATCH 08/10] net_reader: mic_get: reuse connections if cache enabled
  2021-02-22 11:22 ` [PATCH 01/10] lei_auth: rename {nrd} field to {net} for clarity Eric Wong
                     ` (5 preceding siblings ...)
  2021-02-22 11:22   ` [PATCH 07/10] lei q: reduce wasted IMAP connection for auth Eric Wong
@ 2021-02-22 11:22   ` Eric Wong
  2021-02-22 11:22   ` [PATCH 09/10] lei convert: inline convert_start Eric Wong
  2021-02-22 11:22   ` [PATCH 10/10] lei_auth: trim and remove leftover worker code Eric Wong
  8 siblings, 0 replies; 11+ messages in thread
From: Eric Wong @ 2021-02-22 11:22 UTC (permalink / raw)
  To: meta

We only enable {mic_cached} in WQ workers, and those
aren't expected to fork again going forward.  So cache
here avoid a penalty for the non-augmenting (imap_delete_all)
call with "lei q"
---
 lib/PublicInbox/NetReader.pm | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/lib/PublicInbox/NetReader.pm b/lib/PublicInbox/NetReader.pm
index c29e09c1..ff90468b 100644
--- a/lib/PublicInbox/NetReader.pm
+++ b/lib/PublicInbox/NetReader.pm
@@ -426,7 +426,8 @@ sub mic_get {
 	my ($self, $uri) = @_;
 	my $sec = uri_section($uri);
 	# see if caller saved result of imap_common_init
-	if (my $cached = $self->{mics_cached}) {
+	my $cached = $self->{mics_cached};
+	if ($cached) {
 		my $mic = $cached->{$sec};
 		return $mic if $mic && $mic->IsConnected;
 		delete $cached->{$sec};
@@ -439,7 +440,8 @@ sub mic_get {
 		}
 	}
 	my $mic = PublicInbox::IMAPClient->new(%$mic_arg);
-	$mic && $mic->IsConnected ? $mic : undef;
+	$cached //= {}; # invalid placeholder if no cache enabled
+	$mic && $mic->IsConnected ? ($cached->{$sec} = $mic) : undef;
 }
 
 sub imap_each {

^ permalink raw reply	[flat|nested] 11+ messages in thread

* [PATCH 09/10] lei convert: inline convert_start
  2021-02-22 11:22 ` [PATCH 01/10] lei_auth: rename {nrd} field to {net} for clarity Eric Wong
                     ` (6 preceding siblings ...)
  2021-02-22 11:22   ` [PATCH 08/10] net_reader: mic_get: reuse connections if cache enabled Eric Wong
@ 2021-02-22 11:22   ` Eric Wong
  2021-02-22 11:22   ` [PATCH 10/10] lei_auth: trim and remove leftover worker code Eric Wong
  8 siblings, 0 replies; 11+ messages in thread
From: Eric Wong @ 2021-02-22 11:22 UTC (permalink / raw)
  To: meta

Since we stopped using LeiAuth as a WQ worker, keeping this
around as a single-use sub makes no sense and wastes several
KB of memory.
---
 lib/PublicInbox/LeiConvert.pm | 18 ++++++------------
 1 file changed, 6 insertions(+), 12 deletions(-)

diff --git a/lib/PublicInbox/LeiConvert.pm b/lib/PublicInbox/LeiConvert.pm
index b45de4e0..4839dea4 100644
--- a/lib/PublicInbox/LeiConvert.pm
+++ b/lib/PublicInbox/LeiConvert.pm
@@ -62,17 +62,6 @@ sub do_convert { # via wq_do
 	delete $self->{wcb}; # commit
 }
 
-sub convert_start {
-	my ($lei) = @_;
-	my $self = $lei->{cnv};
-	my $op = $lei->workers_start($self, 'lei_convert', 1, {
-		'' => [ $lei->can('dclose'), $lei ]
-	});
-	$self->wq_io_do('do_convert', []);
-	$self->wq_close(1);
-	while ($op && $op->{sock}) { $op->event_step }
-}
-
 sub call { # the main "lei convert" method
 	my ($cls, $lei, @inputs) = @_;
 	my $opt = $lei->{opt};
@@ -131,7 +120,12 @@ sub call { # the main "lei convert" method
 		$nrd->{quiet} = $opt->{quiet};
 		$lei->{nrd} = $nrd;
 	}
-	convert_start($lei);
+	my $op = $lei->workers_start($self, 'lei_convert', 1, {
+		'' => [ $lei->can('dclose'), $lei ]
+	});
+	$self->wq_io_do('do_convert', []);
+	$self->wq_close(1);
+	while ($op && $op->{sock}) { $op->event_step }
 }
 
 sub ipc_atfork_child {

^ permalink raw reply	[flat|nested] 11+ messages in thread

* [PATCH 10/10] lei_auth: trim and remove leftover worker code
  2021-02-22 11:22 ` [PATCH 01/10] lei_auth: rename {nrd} field to {net} for clarity Eric Wong
                     ` (7 preceding siblings ...)
  2021-02-22 11:22   ` [PATCH 09/10] lei convert: inline convert_start Eric Wong
@ 2021-02-22 11:22   ` Eric Wong
  8 siblings, 0 replies; 11+ messages in thread
From: Eric Wong @ 2021-02-22 11:22 UTC (permalink / raw)
  To: meta

LeiAuth is no longer a separate worker process.  Instead, it's
used directly by LeiToMail and LeiImport for sharing auth info
from the first worker to the rest of the workers, using
lei-daemon as a message router.  So drop the old code to reduce
human cognitive load and interpreter memory overhead.
---
 lib/PublicInbox/LEI.pm       |  2 +-
 lib/PublicInbox/LeiAuth.pm   | 24 +++++-------------------
 lib/PublicInbox/LeiImport.pm |  2 +-
 lib/PublicInbox/LeiQuery.pm  |  4 ++--
 4 files changed, 9 insertions(+), 23 deletions(-)

diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index dd34c668..415a425d 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -324,7 +324,7 @@ my %CONFIG_KEYS = (
 	'leistore.dir' => 'top-level storage location',
 );
 
-my @WQ_KEYS = qw(lxs l2m imp mrr cnv auth); # internal workers
+my @WQ_KEYS = qw(lxs l2m imp mrr cnv); # internal workers
 
 # pronounced "exit": x_it(1 << 8) => exit(1); x_it(13) => SIGPIPE
 sub x_it ($$) {
diff --git a/lib/PublicInbox/LeiAuth.pm b/lib/PublicInbox/LeiAuth.pm
index b4777114..099bdaca 100644
--- a/lib/PublicInbox/LeiAuth.pm
+++ b/lib/PublicInbox/LeiAuth.pm
@@ -6,27 +6,16 @@
 package PublicInbox::LeiAuth;
 use strict;
 use v5.10.1;
-use parent qw(PublicInbox::IPC);
 use PublicInbox::PktOp qw(pkt_do);
 
-sub net_merge {
-	my ($lei, $net_new) = @_;
-	if ($lei->{pkt_op_p}) { # from lei_convert worker
-		pkt_do($lei->{pkt_op_p}, 'net_merge', $net_new);
-	} else { # single lei-daemon consumer
-		my $self = $lei->{auth} or return; # client disconnected
-		my $net = $self->{net};
-		%$net = (%$net, %$net_new);
-	}
-}
-
 sub do_auth_atfork { # used by IPC WQ workers
 	my ($self, $wq) = @_;
 	return if $wq->{-wq_worker_nr} != 0;
 	my $lei = $wq->{lei};
-	my $net = $self->{net};
+	my $net = $lei->{net};
 	my $mics = $net->imap_common_init($lei);
-	net_merge($lei, $net);
+	pkt_do($lei->{pkt_op_p}, 'net_merge', $net) or
+			die "pkt_do net_merge: $!";
 	$net->{mics_cached} = $mics;
 }
 
@@ -36,7 +25,7 @@ sub net_merge_done1 { # bump merge-count in top-level lei-daemon
 	$wq->net_merge_complete; # defined per wq-class (e.g. LeiImport)
 }
 
-sub net_merge_all { # called via wq_broadcast
+sub net_merge_all { # called in wq worker via wq_broadcast
 	my ($wq, $net_new) = @_;
 	my $net = $wq->{lei}->{net};
 	%$net = (%$net, %$net_new);
@@ -56,9 +45,6 @@ sub op_merge { # prepares PktOp->pair ops
 	$ops->{net_merge_done1} = [ \&net_merge_done1, $wq ];
 }
 
-sub new {
-	my ($cls, $net) = @_; # net may be NetReader or descendant (NetWriter)
-	bless { net => $net }, $cls;
-}
+sub new { bless \(my $x), __PACKAGE__ }
 
 1;
diff --git a/lib/PublicInbox/LeiImport.pm b/lib/PublicInbox/LeiImport.pm
index bc37c628..b85f4d6c 100644
--- a/lib/PublicInbox/LeiImport.pm
+++ b/lib/PublicInbox/LeiImport.pm
@@ -112,7 +112,7 @@ sub call { # the main "lei import" method
 		$net->{quiet} = $lei->{opt}->{quiet};
 		$lei->{net} = $net;
 		require PublicInbox::LeiAuth;
-		$lei->{auth} = PublicInbox::LeiAuth->new($net);
+		$lei->{auth} = PublicInbox::LeiAuth->new;
 	}
 	import_start($lei);
 }
diff --git a/lib/PublicInbox/LeiQuery.pm b/lib/PublicInbox/LeiQuery.pm
index 64c9394c..214267ee 100644
--- a/lib/PublicInbox/LeiQuery.pm
+++ b/lib/PublicInbox/LeiQuery.pm
@@ -13,9 +13,9 @@ sub prep_ext { # externals_each callback
 
 sub _start_query {
 	my ($self) = @_;
-	if (my $net = $self->{net}) {
+	if ($self->{net}) {
 		require PublicInbox::LeiAuth;
-		$self->{auth} = PublicInbox::LeiAuth->new($net);
+		$self->{auth} = PublicInbox::LeiAuth->new
 	}
 	$self->{lxs}->do_query($self);
 }

^ permalink raw reply	[flat|nested] 11+ messages in thread

end of thread, other threads:[~2021-02-22 11:23 UTC | newest]

Thread overview: 11+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2021-02-22 11:21 [PATCH 00/10] lei: avoid wasting IMAP connections Eric Wong
2021-02-22 11:22 ` [PATCH 01/10] lei_auth: rename {nrd} field to {net} for clarity Eric Wong
2021-02-22 11:22   ` [PATCH 02/10] lei: keep client {sock} in short-lived workers Eric Wong
2021-02-22 11:22   ` [PATCH 03/10] lei: _lei_cfg: return empty hashref if unconfigured Eric Wong
2021-02-22 11:22   ` [PATCH 04/10] lei convert: auth directly from worker process Eric Wong
2021-02-22 11:22   ` [PATCH 05/10] lei import: no separate auth worker Eric Wong
2021-02-22 11:22   ` [PATCH 06/10] lei_auth: migrate common auth code from lei_import Eric Wong
2021-02-22 11:22   ` [PATCH 07/10] lei q: reduce wasted IMAP connection for auth Eric Wong
2021-02-22 11:22   ` [PATCH 08/10] net_reader: mic_get: reuse connections if cache enabled Eric Wong
2021-02-22 11:22   ` [PATCH 09/10] lei convert: inline convert_start Eric Wong
2021-02-22 11:22   ` [PATCH 10/10] lei_auth: trim and remove leftover worker code Eric Wong

user/dev discussion of public-inbox itself

This inbox may be cloned and mirrored by anyone:

	git clone --mirror https://public-inbox.org/meta
	git clone --mirror http://czquwvybam4bgbro.onion/meta
	git clone --mirror http://hjrcffqmbrq6wope.onion/meta
	git clone --mirror http://ou63pmih66umazou.onion/meta

	# If you have public-inbox 1.1+ installed, you may
	# initialize and index your mirror using the following commands:
	public-inbox-init -V1 meta meta/ https://public-inbox.org/meta \
		meta@public-inbox.org
	public-inbox-index meta

Example config snippet for mirrors.
Newsgroups are available over NNTP:
	nntp://news.public-inbox.org/inbox.comp.mail.public-inbox.meta
	nntp://ou63pmih66umazou.onion/inbox.comp.mail.public-inbox.meta
	nntp://czquwvybam4bgbro.onion/inbox.comp.mail.public-inbox.meta
	nntp://hjrcffqmbrq6wope.onion/inbox.comp.mail.public-inbox.meta
	nntp://news.gmane.io/gmane.mail.public-inbox.general
 note: .onion URLs require Tor: https://www.torproject.org/

code repositories for the project(s) associated with this inbox:

	https://80x24.org/public-inbox.git

AGPL code for this site: git clone https://public-inbox.org/public-inbox.git