user/dev discussion of public-inbox itself
 help / color / mirror / code / Atom feed
Search results ordered by [date|relevance]  view[summary|nested|Atom feed]
thread overview below | download mbox.gz: |
* [PATCH 07/10] lei q: reduce wasted IMAP connection for auth
  @ 2021-02-22 11:22  4%   ` Eric Wong
  0 siblings, 0 replies; 2+ results
From: Eric Wong @ 2021-02-22 11:22 UTC (permalink / raw)
  To: meta

We can rework the first lei2mail worker to authenticate, and
then share auth info with the rest of the lei2mail workers.  As
with "lei import", this uses PktOp and lei-daemon to share
updated credentials between the first and subsequent l2m workers.
---
 lib/PublicInbox/LeiAuth.pm    | 37 ------------------------
 lib/PublicInbox/LeiConvert.pm |  2 +-
 lib/PublicInbox/LeiQuery.pm   |  9 ++----
 lib/PublicInbox/LeiToMail.pm  | 53 ++++++++++++++++++++++++-----------
 lib/PublicInbox/LeiXSearch.pm | 26 ++++++++++++-----
 5 files changed, 59 insertions(+), 68 deletions(-)

diff --git a/lib/PublicInbox/LeiAuth.pm b/lib/PublicInbox/LeiAuth.pm
index d329eadb..b4777114 100644
--- a/lib/PublicInbox/LeiAuth.pm
+++ b/lib/PublicInbox/LeiAuth.pm
@@ -20,13 +20,6 @@ sub net_merge {
 	}
 }
 
-sub do_auth { # called via wq_io_do
-	my ($self) = @_;
-	my ($lei, $net) = @$self{qw(lei net)};
-	$net->imap_common_init($lei);
-	net_merge($lei, $net); # tell lei-daemon updated auth info
-}
-
 sub do_auth_atfork { # used by IPC WQ workers
 	my ($self, $wq) = @_;
 	return if $wq->{-wq_worker_nr} != 0;
@@ -63,36 +56,6 @@ sub op_merge { # prepares PktOp->pair ops
 	$ops->{net_merge_done1} = [ \&net_merge_done1, $wq ];
 }
 
-sub do_finish_auth { # dwaitpid callback
-	my ($arg, $pid) = @_;
-	my ($self, $lei, $post_auth_cb, @args) = @$arg;
-	$? ? $lei->dclose : $post_auth_cb->(@args);
-}
-
-sub auth_eof {
-	my ($lei, $post_auth_cb, @args) = @_;
-	my $self = delete $lei->{auth} or return;
-	$self->wq_wait_old(\&do_finish_auth, $lei, $post_auth_cb, @args);
-}
-
-sub auth_start {
-	my ($self, $lei, $post_auth_cb, @args) = @_;
-	my $op = $lei->workers_start($self, 'auth', 1, {
-		'net_merge' => [ \&net_merge, $lei ],
-		'' => [ \&auth_eof, $lei, $post_auth_cb, @args ],
-	});
-	$self->wq_io_do('do_auth', []);
-	$self->wq_close(1);
-	while ($op && $op->{sock}) { $op->event_step }
-}
-
-sub ipc_atfork_child {
-	my ($self) = @_;
-	delete $self->{lei}->{auth}; # drop circular ref
-	$self->{lei}->lei_atfork_child;
-	$self->SUPER::ipc_atfork_child;
-}
-
 sub new {
 	my ($cls, $net) = @_; # net may be NetReader or descendant (NetWriter)
 	bless { net => $net }, $cls;
diff --git a/lib/PublicInbox/LeiConvert.pm b/lib/PublicInbox/LeiConvert.pm
index 3a714502..b45de4e0 100644
--- a/lib/PublicInbox/LeiConvert.pm
+++ b/lib/PublicInbox/LeiConvert.pm
@@ -62,7 +62,7 @@ sub do_convert { # via wq_do
 	delete $self->{wcb}; # commit
 }
 
-sub convert_start { # LeiAuth->auth_start callback
+sub convert_start {
 	my ($lei) = @_;
 	my $self = $lei->{cnv};
 	my $op = $lei->workers_start($self, 'lei_convert', 1, {
diff --git a/lib/PublicInbox/LeiQuery.pm b/lib/PublicInbox/LeiQuery.pm
index 398f834f..64c9394c 100644
--- a/lib/PublicInbox/LeiQuery.pm
+++ b/lib/PublicInbox/LeiQuery.pm
@@ -13,14 +13,11 @@ sub prep_ext { # externals_each callback
 
 sub _start_query {
 	my ($self) = @_;
-	if (my $nwr = $self->{nwr}) {
+	if (my $net = $self->{net}) {
 		require PublicInbox::LeiAuth;
-		my $auth = $self->{auth} = PublicInbox::LeiAuth->new($nwr);
-		my $lxs = $self->{lxs};
-		$auth->auth_start($self, $lxs->can('do_query'), $lxs, $self);
-	} else {
-		$self->{lxs}->do_query($self);
+		$self->{auth} = PublicInbox::LeiAuth->new($net);
 	}
+	$self->{lxs}->do_query($self);
 }
 
 sub qstr_add { # PublicInbox::InputPipe::consume callback for --stdin
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index 6efd398a..df813064 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -345,8 +345,8 @@ sub _imap_write_cb ($$) {
 	my ($self, $lei) = @_;
 	my $dedupe = $lei->{dedupe};
 	$dedupe->prepare_dedupe if $dedupe;
-	my $imap_append = $lei->{nwr}->can('imap_append');
-	my $mic = $lei->{nwr}->mic_get($self->{uri});
+	my $imap_append = $lei->{net}->can('imap_append');
+	my $mic = $lei->{net}->mic_get($self->{uri});
 	my $folder = $self->{uri}->mailbox;
 	sub { # for git_to_mail
 		my ($bref, $smsg, $eml) = @_;
@@ -394,15 +394,15 @@ sub new {
 		$self->{base_type} = 'mbox';
 	} elsif ($fmt =~ /\Aimaps?\z/) { # TODO .onion support
 		require PublicInbox::NetWriter;
-		my $nwr = PublicInbox::NetWriter->new;
-		$nwr->add_url($dst);
-		$nwr->{quiet} = $lei->{opt}->{quiet};
-		my $err = $nwr->errors($dst);
+		my $net = PublicInbox::NetWriter->new;
+		$net->add_url($dst);
+		$net->{quiet} = $lei->{opt}->{quiet};
+		my $err = $net->errors($dst);
 		return $lei->fail($err) if $err;
 		require PublicInbox::URIimap; # TODO: URI cast early
 		$self->{uri} = PublicInbox::URIimap->new($dst);
 		$self->{uri}->mailbox or die "No mailbox: $dst";
-		$lei->{nwr} = $nwr;
+		$lei->{net} = $net;
 		$self->{base_type} = 'imap';
 	} else {
 		die "bad mail --format=$fmt\n";
@@ -447,15 +447,16 @@ sub _augment_imap { # PublicInbox::NetReader::imap_each cb
 
 sub _do_augment_imap {
 	my ($self, $lei) = @_;
-	my $nwr = $lei->{nwr};
+	my $net = $lei->{net};
 	if ($lei->{opt}->{augment}) {
 		my $dedupe = $lei->{dedupe};
 		if ($dedupe && $dedupe->prepare_dedupe) {
-			$nwr->imap_each($self->{uri}, \&_augment_imap, $lei);
+			$net->imap_each($self->{uri}, \&_augment_imap, $lei);
 			$dedupe->pause_dedupe;
 		}
-	} else { # clobber existing IMAP folder
-		$nwr->imap_delete_all($self->{uri});
+	} elsif (!$self->{-wq_worker_nr}) { # undef or 0
+		# clobber existing IMAP folder
+		$net->imap_delete_all($self->{uri});
 	}
 }
 
@@ -523,16 +524,18 @@ sub post_augment {
 	$m->($self, $lei, @args);
 }
 
-sub ipc_atfork_child {
+sub do_post_auth {
 	my ($self) = @_;
-	my $lei = delete $self->{lei};
-	$lei->lei_atfork_child;
+	my $lei = $self->{lei};
+	# lei_xsearch can start as soon as all l2m workers get here
+	pkt_do($lei->{pkt_op_p}, 'incr_start_query') or
+		die "incr_start_query: $!";
 	my $aug;
 	if (lock_free($self)) {
 		my $mod = $self->{-wq_nr_workers};
 		my $shard = $self->{-wq_worker_nr};
-		if (my $nwr = $lei->{nwr}) {
-			$nwr->{shard_info} = [ $mod, $shard ];
+		if (my $net = $lei->{net}) {
+			$net->{shard_info} = [ $mod, $shard ];
 		} else { # Maildir (MH?)
 			$self->{shard_info} = [ $mod, $shard ];
 		}
@@ -545,13 +548,20 @@ sub ipc_atfork_child {
 		eval { do_augment($self, $lei) };
 		$lei->fail($@) if $@;
 		pkt_do($lei->{pkt_op_p}, $aug) == 1 or
-					die "do_post_augment trigger: $!";
+				die "do_post_augment trigger: $!";
 	}
 	if (my $zpipe = delete $lei->{zpipe}) {
 		$lei->{1} = $zpipe->[1];
 		close $zpipe->[0];
 	}
 	$self->{wcb} = $self->write_cb($lei);
+}
+
+sub ipc_atfork_child {
+	my ($self) = @_;
+	my $lei = $self->{lei};
+	$lei->lei_atfork_child;
+	$lei->{auth}->do_auth_atfork($self) if $lei->{auth};
 	$SIG{__WARN__} = PublicInbox::Eml::warn_ignore_cb();
 	$self->SUPER::ipc_atfork_child;
 }
@@ -584,4 +594,13 @@ sub wq_atexit_child {
 	$SIG{__WARN__} = 'DEFAULT';
 }
 
+# called in top-level lei-daemon when LeiAuth is done
+sub net_merge_complete {
+	my ($self) = @_;
+	$self->wq_broadcast('do_post_auth');
+	$self->wq_close(1);
+}
+
+no warnings 'once'; # the following works even when LeiAuth is lazy-loaded
+*net_merge_all = \&PublicInbox::LeiAuth::net_merge_all;
 1;
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index e982165f..6dcadf0a 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -348,7 +348,7 @@ sub do_post_augment {
 	close(delete $lei->{au_done}); # triggers wait_startq in lei_xsearch
 }
 
-sub incr_post_augment { # called whenever an l2m shard finishes
+sub incr_post_augment { # called whenever an l2m shard finishes augment
 	my ($lei) = @_;
 	my $l2m = $lei->{l2m} or die 'BUG: unexpected incr_post_augment';
 	return if ++$lei->{nr_post_augment} != $l2m->{-wq_nr_workers};
@@ -366,8 +366,8 @@ sub concurrency {
 }
 
 sub start_query { # always runs in main (lei-daemon) process
-	my ($self, $lei) = @_;
-	if ($lei->{opt}->{threads}) {
+	my ($self) = @_;
+	if ($self->{threads}) {
 		for my $ibxish (locals($self)) {
 			$self->wq_io_do('query_thread_mset', [], $ibxish);
 		}
@@ -382,6 +382,13 @@ sub start_query { # always runs in main (lei-daemon) process
 	for my $uris (@$q) {
 		$self->wq_io_do('query_remote_mboxrd', [], $uris);
 	}
+	$self->wq_close(1); # lei_xsearch workers stop when done
+}
+
+sub incr_start_query { # called whenever an l2m shard starts do_post_auth
+	my ($self, $l2m) = @_;
+	return if ++$self->{nr_start_query} != $l2m->{-wq_nr_workers};
+	start_query($self);
 }
 
 sub ipc_atfork_child {
@@ -393,6 +400,7 @@ sub ipc_atfork_child {
 
 sub do_query {
 	my ($self, $lei) = @_;
+	my $l2m = $lei->{l2m};
 	my $ops = {
 		'|' => [ $lei->can('sigpipe_handler'), $lei ],
 		'!' => [ $lei->can('fail_handler'), $lei ],
@@ -402,12 +410,13 @@ sub do_query {
 		'mset_progress' => [ \&mset_progress, $lei ],
 		'x_it' => [ $lei->can('x_it'), $lei ],
 		'child_error' => [ $lei->can('child_error'), $lei ],
+		'incr_start_query' => [ \&incr_start_query, $self, $l2m ],
 	};
+	$lei->{auth}->op_merge($ops, $l2m) if $l2m && $lei->{auth};
 	($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops);
 	$lei->{1}->autoflush(1);
 	$lei->start_pager if delete $lei->{need_pager};
 	$lei->{ovv}->ovv_begin($lei);
-	my $l2m = $lei->{l2m};
 	if ($l2m) {
 		$l2m->pre_augment($lei);
 		if ($lei->{opt}->{augment} && delete $lei->{early_mua}) {
@@ -428,10 +437,13 @@ sub do_query {
 				$lei->oldset, { lei => $lei });
 	my $op = delete $lei->{pkt_op_c};
 	delete $lei->{pkt_op_p};
-	$l2m->wq_close(1) if $l2m;
+	$self->{threads} = $lei->{opt}->{threads};
+	if ($l2m) {
+		$l2m->net_merge_complete unless $lei->{auth};
+	} else {
+		start_query($self);
+	}
 	$lei->event_step_init; # wait for shutdowns
-	start_query($self, $lei);
-	$self->wq_close(1); # lei_xsearch workers stop when done
 	if ($lei->{oneshot}) {
 		while ($op->{sock}) { $op->event_step }
 	}

^ permalink raw reply related	[relevance 4%]

* [PATCH 00/10] lei: avoid wasting IMAP connections
@ 2021-02-22 11:21  7% Eric Wong
    0 siblings, 1 reply; 2+ results
From: Eric Wong @ 2021-02-22 11:21 UTC (permalink / raw)
  To: meta

This makes the code a bit less straightforward, unfortunately;
but I've tried to comment it a bit and add some flow notes.
The payoff is that it saves IMAP connection setup costs, which are
noticeable in high-latency and/or metered-bandwidth situations.

LeiAuth is significantly rewritten so it uses lei-daemon
to route credentials from the first worker to other workers.

Eric Wong (10):
  lei_auth: rename {nrd} field to {net} for clarity
  lei: keep client {sock} in short-lived workers
  lei: _lei_cfg: return empty hashref if unconfigured
  lei convert: auth directly from worker process
  lei import: no separate auth worker
  lei_auth: migrate common auth code from lei_import
  lei q: reduce wasted IMAP connection for auth
  net_reader: mic_get: reuse connections if cache enabled
  lei convert: inline convert_start
  lei_auth: trim and remove leftover worker code

 lib/PublicInbox/LEI.pm         |  8 ++--
 lib/PublicInbox/LeiAuth.pm     | 76 +++++++++++++---------------------
 lib/PublicInbox/LeiConvert.pm  | 36 +++++++---------
 lib/PublicInbox/LeiExternal.pm |  6 +--
 lib/PublicInbox/LeiImport.pm   | 60 ++++++++++++++++-----------
 lib/PublicInbox/LeiQuery.pm    |  9 ++--
 lib/PublicInbox/LeiToMail.pm   | 53 ++++++++++++++++--------
 lib/PublicInbox/LeiXSearch.pm  | 26 ++++++++----
 lib/PublicInbox/NetReader.pm   | 20 +++++----
 lib/PublicInbox/NetWriter.pm   |  2 +-
 10 files changed, 158 insertions(+), 138 deletions(-)


^ permalink raw reply	[relevance 7%]

Results 1-2 of 2 | reverse | options above
-- pct% links below jump to the message on this page, permalinks otherwise --
2021-02-22 11:21  7% [PATCH 00/10] lei: avoid wasting IMAP connections Eric Wong
2021-02-22 11:22     ` [PATCH 01/10] lei_auth: rename {nrd} field to {net} for clarity Eric Wong
2021-02-22 11:22  4%   ` [PATCH 07/10] lei q: reduce wasted IMAP connection for auth Eric Wong

Code repositories for project(s) associated with this public inbox

	https://80x24.org/public-inbox.git

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).