about summary refs log tree commit homepage
path: root/lib/PublicInbox/LEI.pm
diff options
context:
space:
mode:
Diffstat (limited to 'lib/PublicInbox/LEI.pm')
-rw-r--r--lib/PublicInbox/LEI.pm15
1 files changed, 14 insertions, 1 deletions
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 3e1706a0..887025de 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -631,7 +631,10 @@ sub pkt_ops {
 
 sub workers_start {
         my ($lei, $wq, $jobs, $ops, $flds) = @_;
-        $ops = pkt_ops($lei, { ($ops ? %$ops : ()) });
+        $ops //= {};
+        ($wq->can('net_merge_all_done') && $lei->{auth}) and
+                $lei->{auth}->op_merge($ops, $wq, $lei);
+        pkt_ops($lei, $ops);
         $ops->{''} //= [ $wq->can('_lei_wq_eof') || \&wq_eof, $lei ];
         my $end = $lei->pkt_op_pair;
         my $ident = $wq->{-wq_ident} // "lei-$lei->{cmd} worker";
@@ -648,12 +651,22 @@ sub workers_start {
 # call this when we're ready to wait on events and yield to other clients
 sub wait_wq_events {
         my ($lei, $op_c, $ops) = @_;
+        my $wq1 = $lei->{wq1};
+        ($wq1 && $wq1->can('net_merge_all_done') && !$lei->{auth}) and
+                $wq1->net_merge_all_done;
         for my $wq (grep(defined, @$lei{qw(ikw pmd)})) { # auxiliary WQs
                 $wq->wq_close;
         }
         $op_c->{ops} = $ops;
 }
 
+sub wq1_start {
+        my ($lei, $wq, $jobs) = @_;
+        my ($op_c, $ops) = workers_start($lei, $wq, $jobs // 1);
+        $lei->{wq1} = $wq;
+        wait_wq_events($lei, $op_c, $ops); # net_merge_all_done if !{auth}
+}
+
 sub _help {
         require PublicInbox::LeiHelp;
         PublicInbox::LeiHelp::call($_[0], $_[1], \%CMD, \%OPTDESC);