From fa5650d399f51a596e5c126b3ce65347409d4fe9 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 22 Feb 2021 08:22:56 -0300 Subject: lei q: reduce wasted IMAP connection for auth We can rework the first lei2mail worker to authenticate, and then share auth info with the rest of the lei2mail workers. As with "lei import", this uses PktOp and lei-daemon to share updated credentials between the first an subsequent l2m workers. --- lib/PublicInbox/LeiAuth.pm | 37 ------------------------------ lib/PublicInbox/LeiConvert.pm | 2 +- lib/PublicInbox/LeiQuery.pm | 9 +++----- lib/PublicInbox/LeiToMail.pm | 53 +++++++++++++++++++++++++++++-------------- lib/PublicInbox/LeiXSearch.pm | 26 +++++++++++++++------ 5 files changed, 59 insertions(+), 68 deletions(-) diff --git a/lib/PublicInbox/LeiAuth.pm b/lib/PublicInbox/LeiAuth.pm index d329eadb..b4777114 100644 --- a/lib/PublicInbox/LeiAuth.pm +++ b/lib/PublicInbox/LeiAuth.pm @@ -20,13 +20,6 @@ sub net_merge { } } -sub do_auth { # called via wq_io_do - my ($self) = @_; - my ($lei, $net) = @$self{qw(lei net)}; - $net->imap_common_init($lei); - net_merge($lei, $net); # tell lei-daemon updated auth info -} - sub do_auth_atfork { # used by IPC WQ workers my ($self, $wq) = @_; return if $wq->{-wq_worker_nr} != 0; @@ -63,36 +56,6 @@ sub op_merge { # prepares PktOp->pair ops $ops->{net_merge_done1} = [ \&net_merge_done1, $wq ]; } -sub do_finish_auth { # dwaitpid callback - my ($arg, $pid) = @_; - my ($self, $lei, $post_auth_cb, @args) = @$arg; - $? ? $lei->dclose : $post_auth_cb->(@args); -} - -sub auth_eof { - my ($lei, $post_auth_cb, @args) = @_; - my $self = delete $lei->{auth} or return; - $self->wq_wait_old(\&do_finish_auth, $lei, $post_auth_cb, @args); -} - -sub auth_start { - my ($self, $lei, $post_auth_cb, @args) = @_; - my $op = $lei->workers_start($self, 'auth', 1, { - 'net_merge' => [ \&net_merge, $lei ], - '' => [ \&auth_eof, $lei, $post_auth_cb, @args ], - }); - $self->wq_io_do('do_auth', []); - $self->wq_close(1); - while ($op && $op->{sock}) { $op->event_step } -} - -sub ipc_atfork_child { - my ($self) = @_; - delete $self->{lei}->{auth}; # drop circular ref - $self->{lei}->lei_atfork_child; - $self->SUPER::ipc_atfork_child; -} - sub new { my ($cls, $net) = @_; # net may be NetReader or descendant (NetWriter) bless { net => $net }, $cls; diff --git a/lib/PublicInbox/LeiConvert.pm b/lib/PublicInbox/LeiConvert.pm index 3a714502..b45de4e0 100644 --- a/lib/PublicInbox/LeiConvert.pm +++ b/lib/PublicInbox/LeiConvert.pm @@ -62,7 +62,7 @@ sub do_convert { # via wq_do delete $self->{wcb}; # commit } -sub convert_start { # LeiAuth->auth_start callback +sub convert_start { my ($lei) = @_; my $self = $lei->{cnv}; my $op = $lei->workers_start($self, 'lei_convert', 1, { diff --git a/lib/PublicInbox/LeiQuery.pm b/lib/PublicInbox/LeiQuery.pm index 398f834f..64c9394c 100644 --- a/lib/PublicInbox/LeiQuery.pm +++ b/lib/PublicInbox/LeiQuery.pm @@ -13,14 +13,11 @@ sub prep_ext { # externals_each callback sub _start_query { my ($self) = @_; - if (my $nwr = $self->{nwr}) { + if (my $net = $self->{net}) { require PublicInbox::LeiAuth; - my $auth = $self->{auth} = PublicInbox::LeiAuth->new($nwr); - my $lxs = $self->{lxs}; - $auth->auth_start($self, $lxs->can('do_query'), $lxs, $self); - } else { - $self->{lxs}->do_query($self); + $self->{auth} = PublicInbox::LeiAuth->new($net); } + $self->{lxs}->do_query($self); } sub qstr_add { # PublicInbox::InputPipe::consume callback for --stdin diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm index 6efd398a..df813064 100644 --- a/lib/PublicInbox/LeiToMail.pm +++ b/lib/PublicInbox/LeiToMail.pm @@ -345,8 +345,8 @@ sub _imap_write_cb ($$) { my ($self, $lei) = @_; my $dedupe = $lei->{dedupe}; $dedupe->prepare_dedupe if $dedupe; - my $imap_append = $lei->{nwr}->can('imap_append'); - my $mic = $lei->{nwr}->mic_get($self->{uri}); + my $imap_append = $lei->{net}->can('imap_append'); + my $mic = $lei->{net}->mic_get($self->{uri}); my $folder = $self->{uri}->mailbox; sub { # for git_to_mail my ($bref, $smsg, $eml) = @_; @@ -394,15 +394,15 @@ sub new { $self->{base_type} = 'mbox'; } elsif ($fmt =~ /\Aimaps?\z/) { # TODO .onion support require PublicInbox::NetWriter; - my $nwr = PublicInbox::NetWriter->new; - $nwr->add_url($dst); - $nwr->{quiet} = $lei->{opt}->{quiet}; - my $err = $nwr->errors($dst); + my $net = PublicInbox::NetWriter->new; + $net->add_url($dst); + $net->{quiet} = $lei->{opt}->{quiet}; + my $err = $net->errors($dst); return $lei->fail($err) if $err; require PublicInbox::URIimap; # TODO: URI cast early $self->{uri} = PublicInbox::URIimap->new($dst); $self->{uri}->mailbox or die "No mailbox: $dst"; - $lei->{nwr} = $nwr; + $lei->{net} = $net; $self->{base_type} = 'imap'; } else { die "bad mail --format=$fmt\n"; @@ -447,15 +447,16 @@ sub _augment_imap { # PublicInbox::NetReader::imap_each cb sub _do_augment_imap { my ($self, $lei) = @_; - my $nwr = $lei->{nwr}; + my $net = $lei->{net}; if ($lei->{opt}->{augment}) { my $dedupe = $lei->{dedupe}; if ($dedupe && $dedupe->prepare_dedupe) { - $nwr->imap_each($self->{uri}, \&_augment_imap, $lei); + $net->imap_each($self->{uri}, \&_augment_imap, $lei); $dedupe->pause_dedupe; } - } else { # clobber existing IMAP folder - $nwr->imap_delete_all($self->{uri}); + } elsif (!$self->{-wq_worker_nr}) { # undef or 0 + # clobber existing IMAP folder + $net->imap_delete_all($self->{uri}); } } @@ -523,16 +524,18 @@ sub post_augment { $m->($self, $lei, @args); } -sub ipc_atfork_child { +sub do_post_auth { my ($self) = @_; - my $lei = delete $self->{lei}; - $lei->lei_atfork_child; + my $lei = $self->{lei}; + # lei_xsearch can start as soon as all l2m workers get here + pkt_do($lei->{pkt_op_p}, 'incr_start_query') or + die "incr_start_query: $!"; my $aug; if (lock_free($self)) { my $mod = $self->{-wq_nr_workers}; my $shard = $self->{-wq_worker_nr}; - if (my $nwr = $lei->{nwr}) { - $nwr->{shard_info} = [ $mod, $shard ]; + if (my $net = $lei->{net}) { + $net->{shard_info} = [ $mod, $shard ]; } else { # Maildir (MH?) $self->{shard_info} = [ $mod, $shard ]; } @@ -545,13 +548,20 @@ sub ipc_atfork_child { eval { do_augment($self, $lei) }; $lei->fail($@) if $@; pkt_do($lei->{pkt_op_p}, $aug) == 1 or - die "do_post_augment trigger: $!"; + die "do_post_augment trigger: $!"; } if (my $zpipe = delete $lei->{zpipe}) { $lei->{1} = $zpipe->[1]; close $zpipe->[0]; } $self->{wcb} = $self->write_cb($lei); +} + +sub ipc_atfork_child { + my ($self) = @_; + my $lei = $self->{lei}; + $lei->lei_atfork_child; + $lei->{auth}->do_auth_atfork($self) if $lei->{auth}; $SIG{__WARN__} = PublicInbox::Eml::warn_ignore_cb(); $self->SUPER::ipc_atfork_child; } @@ -584,4 +594,13 @@ sub wq_atexit_child { $SIG{__WARN__} = 'DEFAULT'; } +# called in top-level lei-daemon when LeiAuth is done +sub net_merge_complete { + my ($self) = @_; + $self->wq_broadcast('do_post_auth'); + $self->wq_close(1); +} + +no warnings 'once'; # the following works even when LeiAuth is lazy-loaded +*net_merge_all = \&PublicInbox::LeiAuth::net_merge_all; 1; diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index e982165f..6dcadf0a 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -348,7 +348,7 @@ sub do_post_augment { close(delete $lei->{au_done}); # triggers wait_startq in lei_xsearch } -sub incr_post_augment { # called whenever an l2m shard finishes +sub incr_post_augment { # called whenever an l2m shard finishes augment my ($lei) = @_; my $l2m = $lei->{l2m} or die 'BUG: unexpected incr_post_augment'; return if ++$lei->{nr_post_augment} != $l2m->{-wq_nr_workers}; @@ -366,8 +366,8 @@ sub concurrency { } sub start_query { # always runs in main (lei-daemon) process - my ($self, $lei) = @_; - if ($lei->{opt}->{threads}) { + my ($self) = @_; + if ($self->{threads}) { for my $ibxish (locals($self)) { $self->wq_io_do('query_thread_mset', [], $ibxish); } @@ -382,6 +382,13 @@ sub start_query { # always runs in main (lei-daemon) process for my $uris (@$q) { $self->wq_io_do('query_remote_mboxrd', [], $uris); } + $self->wq_close(1); # lei_xsearch workers stop when done +} + +sub incr_start_query { # called whenever an l2m shard starts do_post_auth + my ($self, $l2m) = @_; + return if ++$self->{nr_start_query} != $l2m->{-wq_nr_workers}; + start_query($self); } sub ipc_atfork_child { @@ -393,6 +400,7 @@ sub ipc_atfork_child { sub do_query { my ($self, $lei) = @_; + my $l2m = $lei->{l2m}; my $ops = { '|' => [ $lei->can('sigpipe_handler'), $lei ], '!' => [ $lei->can('fail_handler'), $lei ], @@ -402,12 +410,13 @@ sub do_query { 'mset_progress' => [ \&mset_progress, $lei ], 'x_it' => [ $lei->can('x_it'), $lei ], 'child_error' => [ $lei->can('child_error'), $lei ], + 'incr_start_query' => [ \&incr_start_query, $self, $l2m ], }; + $lei->{auth}->op_merge($ops, $l2m) if $l2m && $lei->{auth}; ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops); $lei->{1}->autoflush(1); $lei->start_pager if delete $lei->{need_pager}; $lei->{ovv}->ovv_begin($lei); - my $l2m = $lei->{l2m}; if ($l2m) { $l2m->pre_augment($lei); if ($lei->{opt}->{augment} && delete $lei->{early_mua}) { @@ -428,10 +437,13 @@ sub do_query { $lei->oldset, { lei => $lei }); my $op = delete $lei->{pkt_op_c}; delete $lei->{pkt_op_p}; - $l2m->wq_close(1) if $l2m; + $self->{threads} = $lei->{opt}->{threads}; + if ($l2m) { + $l2m->net_merge_complete unless $lei->{auth}; + } else { + start_query($self); + } $lei->event_step_init; # wait for shutdowns - start_query($self, $lei); - $self->wq_close(1); # lei_xsearch workers stop when done if ($lei->{oneshot}) { while ($op->{sock}) { $op->event_step } } -- cgit v1.2.3-24-ge0c7