about summary refs log tree commit homepage
path: root/lib/PublicInbox/LeiXSearch.pm
diff options
context:
space:
mode:
Diffstat (limited to 'lib/PublicInbox/LeiXSearch.pm')
-rw-r--r--lib/PublicInbox/LeiXSearch.pm573
1 files changed, 372 insertions, 201 deletions
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index 2d399653..43dedd10 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -1,4 +1,4 @@
-# Copyright (C) 2020-2021 all contributors <meta@public-inbox.org>
+# Copyright (C) all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
 
 # Combine any combination of PublicInbox::Search,
@@ -9,15 +9,20 @@ use strict;
 use v5.10.1;
 use parent qw(PublicInbox::LeiSearch PublicInbox::IPC);
 use PublicInbox::DS qw(now);
-use PublicInbox::PktOp qw(pkt_do);
 use File::Temp 0.19 (); # 0.19 for ->newdir
 use File::Spec ();
 use PublicInbox::Search qw(xap_terms);
-use PublicInbox::Spawn qw(popen_rd spawn which);
-use PublicInbox::MID qw(mids);
+use PublicInbox::Spawn qw(popen_rd popen_wr which);
+use PublicInbox::MID qw(mids mid_escape);
 use PublicInbox::Smsg;
 use PublicInbox::Eml;
+use PublicInbox::LEI;
 use Fcntl qw(SEEK_SET F_SETFL O_APPEND O_RDWR);
+use PublicInbox::ContentHash qw(git_sha);
+use POSIX qw(strftime);
+use autodie qw(close open read seek truncate);
+use PublicInbox::Syscall qw($F_SETPIPE_SZ);
+use PublicInbox::OnDestroy;
 
 sub new {
         my ($class) = @_;
@@ -31,8 +36,8 @@ sub new {
 sub attach_external {
         my ($self, $ibxish) = @_; # ibxish = ExtSearch or Inbox
         my $desc = $ibxish->{inboxdir} // $ibxish->{topdir};
-        my $srch = $ibxish->search or
-                return warn("$desc not indexed for Xapian\n");
+        my $srch = $ibxish->search //
+                return warn("$desc not indexed for Xapian ($@ $!)\n");
         my @shards = $srch->xdb_shards_flat or
                 return warn("$desc has no Xapian shards\n");
 
@@ -63,9 +68,26 @@ sub locals { @{$_[0]->{locals} // []} }
 
 sub remotes { @{$_[0]->{remotes} // []} }
 
-# called by PublicInbox::Search::xdb
+# called by PublicInbox::Search::xdb (usually via ->mset)
 sub xdb_shards_flat { @{$_[0]->{shards_flat} // []} }
 
+sub _mitem_kw { # retry_reopen callback
+        my ($srch, $smsg, $mitem, $flagged) = @_;
+        my $doc = $mitem->get_document;
+        my $kw = xap_terms('K', $doc);
+        $kw->{flagged} = 1 if $flagged;
+        my @L = xap_terms('L', $doc);
+        # we keep the empty {kw} array here to prevent expensive work in
+        # ->xsmsg_vmd, _unbless_smsg will clobber it iff it's empty
+        $smsg->{kw} = [ sort keys %$kw ];
+        $smsg->{L} = \@L if scalar(@L);
+}
+
+sub mitem_kw ($$$;$) {
+        my ($srch, $smsg, $mitem, $flagged) = @_;
+        $srch->retry_reopen(\&_mitem_kw, $smsg, $mitem, $flagged);
+}
+
 # like over->get_art
 sub smsg_for {
         my ($self, $mitem) = @_;
@@ -76,285 +98,363 @@ sub smsg_for {
         my $num = int(($docid - 1) / $nshard) + 1;
         my $ibx = $self->{shard2ibx}->[$shard];
         my $smsg = $ibx->over->get_art($num);
-        if (ref($ibx->can('msg_keywords'))) {
-                my $kw = xap_terms('K', $mitem->get_document);
-                $smsg->{kw} = [ sort keys %$kw ];
+        return if $smsg->{bytes} == 0; # external message
+        if ($ibx->can('msg_keywords')) {
+                mitem_kw($self, $smsg, $mitem);
         }
-        $smsg->{docid} = $docid;
         $smsg;
 }
 
-sub recent {
-        my ($self, $qstr, $opt) = @_;
-        $opt //= {};
-        $opt->{relevance} //= -2;
-        $self->mset($qstr //= 'bytes:1..', $opt);
-}
-
 sub over {}
 
+sub _check_mset_limit ($$$) {
+        my ($lei, $desc, $mset) = @_;
+        return if defined($lei->{opt}->{limit}); # user requested limit
+        my $est = $mset->get_matches_estimated;
+        my $tot = $lei->{mset_opt}->{total};
+        $est > $tot and $lei->qerr(<<"");
+# $desc estimated matches ($est) exceeds default --limit=$tot
+
+}
+
 sub _mset_more ($$) {
         my ($mset, $mo) = @_;
         my $size = $mset->size;
-        $size >= $mo->{limit} && (($mo->{offset} += $size) < $mo->{limit});
+        $size >= $mo->{limit} && (($mo->{offset} += $size) < $mo->{total});
 }
 
-# $startq will EOF when do_augment is done augmenting and allow
-# query_mset and query_thread_mset to proceed.
+# $startq will see `q' in do_post_augment -> start_mua if spawning MUA.
+# Otherwise $startq will EOF when do_augment is done augmenting and allow
+# query_combined_mset and query_thread_mset to proceed.
 sub wait_startq ($) {
         my ($lei) = @_;
-        my $startq = delete $lei->{startq} or return;
-        while (1) {
-                my $n = sysread($startq, my $do_augment_done, 1);
-                if (defined $n) {
-                        return if $n == 0; # no MUA
-                        if ($do_augment_done eq 'q') {
-                                $lei->{opt}->{quiet} = 1;
-                                delete $lei->{opt}->{verbose};
-                                delete $lei->{-progress};
-                        } else {
-                                $lei->fail("$$ WTF `$do_augment_done'");
-                        }
-                        return;
-                }
-                return $lei->fail("$$ wait_startq: $!") unless $!{EINTR};
-        }
+        read(delete($lei->{startq}) // return, my $buf, 1) or return; # EOF
+        die "BUG: wrote `$buf' to au_done" if $buf ne 'q';
+        $lei->{opt}->{quiet} = 1;
+        delete $lei->{opt}->{verbose};
+        delete $lei->{-progress};
 }
 
 sub mset_progress {
         my $lei = shift;
         return if $lei->{early_mua} || !$lei->{-progress};
         if ($lei->{pkt_op_p}) {
-                pkt_do($lei->{pkt_op_p}, 'mset_progress', @_);
+                $lei->{pkt_op_p}->pkt_do('mset_progress', @_);
         } else { # single lei-daemon consumer
                 my ($desc, $mset_size, $mset_total_est) = @_;
-                $lei->{-mset_total} += $mset_size;
+                $lei->{-mset_total} += $mset_size if $mset_total_est ne '?';
                 $lei->qerr("# $desc $mset_size/$mset_total_est");
         }
 }
 
-sub query_thread_mset { # for --threads
+sub query_one_mset { # for --threads and l2m w/o sort
         my ($self, $ibxish) = @_;
-        local $0 = "$0 query_thread_mset";
         my $lei = $self->{lei};
         my ($srch, $over) = ($ibxish->search, $ibxish->over);
-        my $desc = $ibxish->{inboxdir} // $ibxish->{topdir};
-        return warn("$desc not indexed by Xapian\n") unless ($srch && $over);
-        my $mo = { %{$lei->{mset_opt}} };
+        my $dir = $ibxish->{inboxdir} // $ibxish->{topdir};
+        return warn("$dir not indexed by Xapian\n") unless ($srch && $over);
+        bless $srch, 'PublicInbox::LeiSearch'; # for ->qparse_new
+        my $mo = { %{$lei->{mset_opt}} }; # copy
+        local $0 = "$0 1 $mo->{qstr}";
         my $mset;
-        my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei, $ibxish);
+        my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei);
+        my $can_kw = !!$ibxish->can('msg_keywords');
+        my $threads = $lei->{opt}->{threads} // 0;
+        my $fl = $threads > 1 ? 1 : undef;
+        my $mid = $lei->{opt}->{'thread-id'};
+        $mo->{threadid} = $over->mid2tid($mid) if defined $mid;
+        my $lss = $lei->{lss};
+        my $maxk = "external.$dir.maxuid"; # max of previous, so our min
+        my $min = $lss ? ($lss->{-cfg}->{$maxk} // 0) : 0;
+        ref($min) and return warn("$maxk=$min has multiple values\n");
+        ($min =~ /[^0-9]/) and return warn("$maxk=$min not numeric\n");
+        my $first_ids;
         do {
-                $mset = $srch->mset($mo->{qstr}, $mo);
-                mset_progress($lei, $desc, $mset->size,
+                $mset = eval { $srch->mset($mo->{qstr}, $mo) };
+                return $lei->child_error(22 << 8, "E: $@") if $@; # 22 from curl
+                mset_progress($lei, $dir, $mo->{offset} + $mset->size,
                                 $mset->get_matches_estimated);
+                wait_startq($lei); # wait for keyword updates
                 my $ids = $srch->mset_to_artnums($mset, $mo);
-                my $ctx = { ids => $ids };
                 my $i = 0;
-                my %n2item = map { ($ids->[$i++], $_) } $mset->items;
-                while ($over->expand_thread($ctx)) {
-                        for my $n (@{$ctx->{xids}}) {
+                if ($threads) {
+                        # copy $ids if $lss since over->expand_thread
+                        # shifts @{$ctx->{ids}}
+                        $first_ids = [ @$ids ] if $lss;
+                        my $ctx = { ids => $ids, min => $min };
+                        my %n2item = map { $ids->[$i++] => $_ } $mset->items;
+                        while ($over->expand_thread($ctx)) { # fills {xids}
+                                for my $n (@{delete $ctx->{xids}}) {
+                                        my $smsg = $over->get_art($n) or next;
+                                        my $mi = delete $n2item{$n};
+                                        next if $smsg->{bytes} == 0;
+                                        if ($mi && $can_kw) {
+                                                mitem_kw($srch, $smsg, $mi, $fl)
+                                        } elsif ($mi && $fl) {
+                                                # call ->xsmsg_vmd, later
+                                                $smsg->{lei_q_tt_flagged} = 1;
+                                        }
+                                        $each_smsg->($smsg, $mi);
+                                }
+                        }
+                } else {
+                        $first_ids = $ids;
+                        my @items = $mset->items; # parallel with @$ids
+                        for my $n (@$ids) {
+                                my $mitem = $items[$i++];
+                                next if $n <= $min;
                                 my $smsg = $over->get_art($n) or next;
-                                wait_startq($lei);
-                                my $mitem = delete $n2item{$smsg->{num}};
+                                next if $smsg->{bytes} == 0;
+                                mitem_kw($srch, $smsg, $mitem, $fl) if $can_kw;
                                 $each_smsg->($smsg, $mitem);
                         }
-                        @{$ctx->{xids}} = ();
                 }
         } while (_mset_more($mset, $mo));
-        undef $each_smsg; # drops @io for l2m->{each_smsg_done}
+        _check_mset_limit($lei, $dir, $mset);
+        if ($lss && scalar(@$first_ids)) {
+                my $max = $first_ids->[0];
+                $lss->cfg_set($maxk, $max);
+                undef $lss;
+        }
+        undef $each_smsg; # may commit
         $lei->{ovv}->ovv_atexit_child($lei);
 }
 
-sub query_mset { # non-parallel for non-"--threads" users
+sub query_combined_mset { # non-parallel for non-"--threads" users
         my ($self) = @_;
-        local $0 = "$0 query_mset";
         my $lei = $self->{lei};
         my $mo = { %{$lei->{mset_opt}} };
+        local $0 = "$0 C $mo->{qstr}";
         my $mset;
         for my $loc (locals($self)) {
                 attach_external($self, $loc);
         }
-        my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei, $self);
+        my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei);
         do {
-                $mset = $self->mset($mo->{qstr}, $mo);
-                mset_progress($lei, 'xsearch', $mset->size,
-                                $mset->size, $mset->get_matches_estimated);
+                $mset = eval { $self->mset($mo->{qstr}, $mo) };
+                return $lei->child_error(22 << 8, "E: $@") if $@; # 22 from curl
+                mset_progress($lei, 'xsearch', $mo->{offset} + $mset->size,
+                                $mset->get_matches_estimated);
+                wait_startq($lei); # wait for keyword updates
                 for my $mitem ($mset->items) {
                         my $smsg = smsg_for($self, $mitem) or next;
-                        wait_startq($lei);
                         $each_smsg->($smsg, $mitem);
                 }
         } while (_mset_more($mset, $mo));
-        undef $each_smsg; # drops @io for l2m->{each_smsg_done}
+        _check_mset_limit($lei, 'xsearch', $mset);
+        undef $each_smsg; # may commit
         $lei->{ovv}->ovv_atexit_child($lei);
 }
 
-sub each_remote_eml { # callback for MboxReader->mboxrd
-        my ($eml, $self, $lei, $each_smsg) = @_;
-        $lei->{sto}->ipc_do('set_eml', $eml) if $lei->{sto}; # --import-remote
-        my $smsg = bless {}, 'PublicInbox::Smsg';
+sub _smsg_fill ($$) {
+        my ($smsg, $eml) = @_;
         $smsg->populate($eml);
         $smsg->parse_references($eml, mids($eml));
         $smsg->{$_} //= '' for qw(from to cc ds subject references mid);
         delete @$smsg{qw(From Subject -ds -ts)};
+}
+
+sub each_remote_eml { # callback for MboxReader->mboxrd
+        my ($eml, $self, $lei, $each_smsg) = @_;
+        my $xoids = $lei->{ale}->xoids_for($eml, 1);
+        my $smsg = bless {}, 'PublicInbox::Smsg';
+        if ($self->{import_sto} && !$xoids) {
+                my ($res, $kw) = $self->{import_sto}->wq_do('add_eml', $eml);
+                if (ref($res) eq ref($smsg)) { # totally new message
+                        $smsg = $res;
+                        $self->{-sto_imported} = 1;
+                }
+                $smsg->{kw} = $kw; # short-circuit xsmsg_vmd
+        }
+        $smsg->{blob} //= $xoids ? (keys(%$xoids))[0]
+                                : $lei->git_oid($eml)->hexdigest;
+        _smsg_fill($smsg, $eml);
         wait_startq($lei);
+        my $nr = ++$lei->{-nr_remote_eml}; # needed for lss->cfg_set
         if ($lei->{-progress}) {
-                ++$lei->{-nr_remote_eml};
                 my $now = now();
                 my $next = $lei->{-next_progress} //= ($now + 1);
                 if ($now > $next) {
                         $lei->{-next_progress} = $now + 1;
-                        my $nr = $lei->{-nr_remote_eml};
-                        $lei->err("# $lei->{-current_url} $nr/?");
+                        mset_progress($lei, $lei->{-current_url}, $nr, '?');
                 }
         }
         $each_smsg->($smsg, undef, $eml);
 }
 
+sub fudge_qstr_time ($$$) {
+        my ($lei, $uri, $qstr) = @_;
+        return ($qstr, undef) unless $lei->{lss};
+        my $cfg = $lei->{lss}->{-cfg} // die 'BUG: no lss->{-cfg}';
+        my $cfg_key = "external.$uri.lastresult";
+        my $lr = $cfg->{$cfg_key} or return ($qstr, $cfg_key);
+        if ($lr !~ /\A\-?[0-9]+\z/) {
+                $lei->child_error(0,
+                        "$cfg->{-f}: $cfg_key=$lr not an integer, ignoring");
+                return ($qstr, $cfg_key);
+        }
+        my $rft = $lei->{opt}->{'remote-fudge-time'};
+        if ($rft && $rft !~ /\A-?[0-9]+\z/) {
+                my @t = $lei->{lss}->git->date_parse($rft);
+                my $diff = time - $t[0];
+                $lei->qerr("# $rft => $diff seconds");
+                $rft = $diff;
+        }
+        $lr -= ($rft || (48 * 60 * 60));
+        require PublicInbox::Admin;
+        $lei->qerr("# $uri limiting to ".
+                PublicInbox::Admin::fmt_localtime($lr).' and newer');
+        # this should really be rt: (received-time), but no stable
+        # public-inbox releases support it, yet.
+        my $dt = 'dt:'.strftime('%Y%m%d%H%M%S', gmtime($lr)).'..';
+        if ($qstr =~ /\S/) {
+                substr($qstr, 0, 0, '(');
+                $qstr .= ') AND ';
+        }
+        ($qstr .= $dt, $cfg_key);
+}
+
 sub query_remote_mboxrd {
         my ($self, $uris) = @_;
-        local $0 = "$0 query_remote_mboxrd";
         local $SIG{TERM} = sub { exit(0) }; # for DESTROY (File::Temp, $reap)
         my $lei = $self->{lei};
         my $opt = $lei->{opt};
-        my @qform = (q => $lei->{mset_opt}->{qstr}, x => 'm');
+        my $qstr = $lei->{mset_opt}->{qstr};
+        local $0 = "$0 R $qstr";
+        my @qform = (x => 'm');
         push(@qform, t => 1) if $opt->{threads};
-        my $verbose = $opt->{verbose};
-        my ($reap_tail, $reap_curl);
-        my $cerr = File::Temp->new(TEMPLATE => 'curl.err-XXXX', TMPDIR => 1);
-        fcntl($cerr, F_SETFL, O_APPEND|O_RDWR) or warn "set O_APPEND: $!";
-        my $rdr = { 2 => $cerr, pgid => 0 };
-        my $sigint_reap = $lei->can('sigint_reap');
-        if ($verbose) {
-                # spawn a process to force line-buffering, otherwise curl
+        open my $cerr, '+>', undef;
+        my $rdr = { 2 => $cerr };
+        my @lbf_tee;
+        if ($opt->{verbose}) {
+                # spawn a line-buffered tee(1) script, otherwise curl
                 # will write 1 character at-a-time and parallel outputs
                 # mmmaaayyy llloookkk llliiikkkeee ttthhhiiisss
-                my $o = { 1 => $lei->{2}, 2 => $lei->{2}, pgid => 0 };
-                my $pid = spawn(['tail', '-f', $cerr->filename], undef, $o);
-                $reap_tail = PublicInbox::OnDestroy->new($sigint_reap, $pid);
+                # (n.b. POSIX tee(1) cannot do any buffering)
+                my $o = { 1 => $cerr, 2 => $lei->{2} };
+                delete $rdr->{2};
+                @lbf_tee = ([ $^X, qw(-w -p -e), <<'' ], undef, $o);
+BEGIN { $| = 1; use IO::Handle; STDERR->autoflush(1); }
+print STDERR $_;
+
         }
         my $curl = PublicInbox::LeiCurl->new($lei, $self->{curl}) or return;
         push @$curl, '-s', '-d', '';
         my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei);
+        $self->{import_sto} = $lei->{sto} if $lei->{opt}->{'import-remote'};
+        if (defined(my $mid = $opt->{'thread-id'})) {
+                $mid = mid_escape($mid);
+                for my $uri (@$uris) {
+                        $uri->path($uri->path.$mid.'/');
+                }
+        }
         for my $uri (@$uris) {
                 $lei->{-current_url} = $uri->as_string;
-                $lei->{-nr_remote_eml} = 0;
-                $uri->query_form(@qform);
+                my $start = time;
+                my ($q, $key) = fudge_qstr_time($lei, $uri, $qstr);
+                $uri->query_form(@qform, q => $q);
                 my $cmd = $curl->for_uri($lei, $uri);
                 $lei->qerr("# $cmd");
-                my ($fh, $pid) = popen_rd($cmd, undef, $rdr);
-                $reap_curl = PublicInbox::OnDestroy->new($sigint_reap, $pid);
-                $fh = IO::Uncompress::Gunzip->new($fh);
-                PublicInbox::MboxReader->mboxrd($fh, \&each_remote_eml, $self,
-                                                $lei, $each_smsg);
-                my $err = waitpid($pid, 0) == $pid ? undef
-                                                : "BUG: waitpid($cmd): $!";
-                @$reap_curl = (); # cancel OnDestroy
-                die $err if $err;
-                my $nr = $lei->{-nr_remote_eml};
-                if ($nr && $lei->{sto}) {
-                        my $wait = $lei->{sto}->ipc_do('done');
-                }
-                if ($? == 0) {
+                $rdr->{2} //= popen_wr(@lbf_tee) if @lbf_tee;
+                my $fh = popen_rd($cmd, undef, $rdr);
+                $fh = IO::Uncompress::Gunzip->new($fh,
+                                        MultiStream => 1, AutoClose => 1);
+                eval {
+                        PublicInbox::MboxReader->mboxrd($fh, \&each_remote_eml,
+                                                $self, $lei, $each_smsg);
+                };
+                my ($exc, $code) = ($@, $?);
+                $lei->sto_barrier_request if delete($self->{-sto_imported});
+                die "E: $exc" if $exc && !$code;
+                my $nr = delete $lei->{-nr_remote_eml} // 0;
+                if (!$code) { # don't update if no results, maybe MTA is down
+                        $lei->{lss}->cfg_set($key, $start) if $key && $nr;
                         mset_progress($lei, $lei->{-current_url}, $nr, $nr);
                         next;
                 }
-                $err = '';
-                if (-s $cerr) {
-                        seek($cerr, 0, SEEK_SET) or
-                                        $lei->err("seek($cmd stderr): $!");
-                        $err = do { local $/; <$cerr> } //
-                                        "read($cmd stderr): $!";
-                        truncate($cerr, 0) or
-                                        $lei->err("truncate($cmd stderr): $!");
-                }
-                next if (($? >> 8) == 22 && $err =~ /\b404\b/);
-                $uri->query_form(q => $lei->{mset_opt}->{qstr});
-                $lei->child_error($?, "E: <$uri> $err");
+                delete($rdr->{2})->close if @lbf_tee;
+                seek($cerr, 0, SEEK_SET);
+                read($cerr, my $err, -s $cerr);
+                truncate($cerr, 0);
+                next if (($code >> 8) == 22 && $err =~ /\b404\b/);
+                $uri->query_form(q => $qstr);
+                $lei->child_error($code, "E: <$uri> `$cmd` failed");
         }
         undef $each_smsg;
         $lei->{ovv}->ovv_atexit_child($lei);
 }
 
-# called by LeiOverview::each_smsg_cb
-sub git { $_[0]->{git_tmp} // die 'BUG: caller did not set {git_tmp}' }
-
-sub git_tmp ($) {
-        my ($self) = @_;
-        my (%seen, @dirs);
-        my $tmp = File::Temp->newdir("lei_xsearch_git.$$-XXXX", TMPDIR => 1);
-        for my $ibxish (locals($self)) {
-                my $d = File::Spec->canonpath($ibxish->git->{git_dir});
-                $seen{$d} //= push @dirs, "$d/objects\n"
-        }
-        my $git_dir = $tmp->dirname;
-        PublicInbox::Import::init_bare($git_dir);
-        my $f = "$git_dir/objects/info/alternates";
-        open my $alt, '>', $f or die "open($f): $!";
-        print $alt @dirs or die "print $f: $!";
-        close $alt or die "close $f: $!";
-        my $git = PublicInbox::Git->new($git_dir);
-        $git->{-tmp} = $tmp;
-        $git;
-}
+sub git { $_[0]->{git} // die 'BUG: git uninitialized' }
 
-sub xsearch_done_wait { # dwaitpid callback
-        my ($arg, $pid) = @_;
-        my ($wq, $lei) = @$arg;
-        $lei->child_error($?, 'non-fatal error from '.ref($wq)) if $?;
+sub xsearch_done_wait { # awaitpid cb
+        my ($pid, $wq, $lei) = @_;
+        return if !$?;
+        my $s = $? & 127;
+        return $lei->child_error($?) if $s == 13 || $s == 15;
+        $lei->child_error($?, 'non-fatal error from '.ref($wq)." \$?=$?");
 }
 
 sub query_done { # EOF callback for main daemon
         my ($lei) = @_;
         my $l2m = delete $lei->{l2m};
-        $l2m->wq_wait_old(\&xsearch_done_wait, $lei) if $l2m;
-        if (my $lxs = delete $lei->{lxs}) {
-                $lxs->wq_wait_old(\&xsearch_done_wait, $lei);
-        }
+        delete $lei->{lxs};
+        ($lei->{opt}->{'mail-sync'} && !$lei->{sto}) and
+                warn "BUG: {sto} missing with --mail-sync";
+        $lei->sto_barrier_request;
         $lei->{ovv}->ovv_end($lei);
         if ($l2m) { # close() calls LeiToMail reap_compress
-                if (my $out = delete $lei->{old_1}) {
-                        if (my $mbout = $lei->{1}) {
-                                close($mbout) or return $lei->fail(<<"");
-Error closing $lei->{ovv}->{dst}: $!
-
-                        }
-                        $lei->{1} = $out;
-                }
+                $l2m->finish_output($lei);
                 if ($l2m->lock_free) {
                         $l2m->poke_dst;
                         $lei->poke_mua;
                 } else { # mbox users
-                        $lei->start_mua;
+                        delete $l2m->{mbl}; # drop dotlock
                 }
         }
-        $lei->{-progress} and
-                $lei->err('# ', $lei->{-mset_total} // 0, " matches");
+        my $nr_w = delete($lei->{-nr_write}) // 0;
+        my $nr_dup = (delete($lei->{-nr_seen}) // 0) - $nr_w;
+        if ($lei->{-progress}) {
+                my $tot = $lei->{-mset_total} // 0;
+                my $x = "$tot matches";
+                $x .= ", $nr_dup duplicates" if $nr_dup;
+                if ($l2m) {
+                        my $m = "# $nr_w written to " .
+                                "$lei->{ovv}->{dst} ($x)";
+                        $nr_w ? $lei->qfin($m) : $lei->qerr($m);
+                } else {
+                        $lei->qerr("# $x");
+                }
+        }
+        $lei->start_mua if $l2m && !$l2m->lock_free;
         $lei->dclose;
 }
 
-sub do_post_augment {
+sub post_augment_done { # via on_destroy in top-level lei-daemon
         my ($lei) = @_;
-        my $l2m = $lei->{l2m} or die 'BUG: unexpected do_post_augment';
-        my $err;
-        eval { $l2m->post_augment($lei) };
-        $err = $@;
+        my $err = delete $lei->{post_augment_err};
         if ($err) {
                 if (my $lxs = delete $lei->{lxs}) {
-                        $lxs->wq_kill;
-                        $lxs->wq_close(0, undef, $lei);
+                        $lxs->wq_kill(-POSIX::SIGTERM());
+                        $lxs->wq_close;
                 }
                 $lei->fail("$err");
         }
         if (!$err && delete $lei->{early_mua}) { # non-augment case
-                $lei->start_mua;
+                eval { $lei->start_mua }; # may trigger wait_startq
+                $lei->fail($@) if $@;
         }
-        close(delete $lei->{au_done}); # triggers wait_startq in lei_xsearch
+        close(delete $lei->{au_done}); # trigger wait_startq if start_mua didn't
+}
+
+sub do_post_augment {
+        my ($lei) = @_;
+        my $l2m = $lei->{l2m} or return; # client disconnected
+        $l2m->post_augment($lei, on_destroy(\&post_augment_done, $lei));
 }
 
 sub incr_post_augment { # called whenever an l2m shard finishes augment
         my ($lei) = @_;
-        my $l2m = $lei->{l2m} or die 'BUG: unexpected incr_post_augment';
+        my $l2m = $lei->{l2m} or return; # client disconnected
         return if ++$lei->{nr_post_augment} != $l2m->{-wq_nr_workers};
         do_post_augment($lei);
 }
@@ -369,14 +469,17 @@ sub concurrency {
         $nl + $nr;
 }
 
-sub start_query { # always runs in main (lei-daemon) process
-        my ($self) = @_;
-        if ($self->{threads}) {
+sub start_query ($$) { # always runs in main (lei-daemon) process
+        my ($self, $lei) = @_;
+        local $PublicInbox::LEI::current_lei = $lei;
+        if ($lei->{opt}->{threads} ||
+                        defined($lei->{opt}->{'thread-id'}) ||
+                        ($lei->{l2m} && !$lei->{opt}->{'sort'})) {
                 for my $ibxish (locals($self)) {
-                        $self->wq_io_do('query_thread_mset', [], $ibxish);
+                        $self->wq_io_do('query_one_mset', [], $ibxish);
                 }
         } elsif (locals($self)) {
-                $self->wq_io_do('query_mset', []);
+                $self->wq_io_do('query_combined_mset', []);
         }
         my $i = 0;
         my $q = [];
@@ -386,71 +489,86 @@ sub start_query { # always runs in main (lei-daemon) process
         for my $uris (@$q) {
                 $self->wq_io_do('query_remote_mboxrd', [], $uris);
         }
-        $self->wq_close(1); # lei_xsearch workers stop when done
+        if ($self->{-do_lcat}) {
+                $self->wq_io_do('lcat_dump', []);
+        }
+        $self->wq_close; # lei_xsearch workers stop when done
 }
 
 sub incr_start_query { # called whenever an l2m shard starts do_post_auth
-        my ($self, $l2m) = @_;
+        my ($lei, $self) = @_;
+        my $l2m = $lei->{l2m};
         return if ++$self->{nr_start_query} != $l2m->{-wq_nr_workers};
-        start_query($self);
+        start_query($self, $lei);
 }
 
 sub ipc_atfork_child {
         my ($self) = @_;
-        $self->{lei}->lei_atfork_child;
-        $SIG{__WARN__} = PublicInbox::Eml::warn_ignore_cb();
+        $self->{lei}->_lei_atfork_child;
         $self->SUPER::ipc_atfork_child;
 }
 
 sub do_query {
         my ($self, $lei) = @_;
         my $l2m = $lei->{l2m};
+        my $qstr = \($lei->{mset_opt}->{qstr});
+        chomp $$qstr;
+        $$qstr =~ s/[ \n\t]+/ /sg; # make URLs and $0 less ugly
         my $ops = {
-                '|' => [ $lei->can('sigpipe_handler'), $lei ],
-                '!' => [ $lei->can('fail_handler'), $lei ],
-                '.' => [ \&do_post_augment, $lei ],
-                '+' => [ \&incr_post_augment, $lei ],
+                sigpipe_handler => [ $lei ],
+                fail_handler => [ $lei ],
+                do_post_augment => [ \&do_post_augment, $lei ],
+                incr_post_augment => [ \&incr_post_augment, $lei ],
                 '' => [ \&query_done, $lei ],
-                'mset_progress' => [ \&mset_progress, $lei ],
-                'x_it' => [ $lei->can('x_it'), $lei ],
-                'child_error' => [ $lei->can('child_error'), $lei ],
-                'incr_start_query' => [ \&incr_start_query, $self, $l2m ],
+                mset_progress => [ \&mset_progress, $lei ],
+                incr => [ $lei ],
+                x_it => [ $lei ],
+                child_error => [ $lei ],
+                incr_start_query => [ \&incr_start_query, $lei, $self ],
         };
-        $lei->{auth}->op_merge($ops, $l2m) if $l2m && $lei->{auth};
-        ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops);
+        $lei->{auth}->op_merge($ops, $l2m, $lei) if $l2m && $lei->{auth};
+        my $end = $lei->pkt_op_pair;
         $lei->{1}->autoflush(1);
         $lei->start_pager if delete $lei->{need_pager};
         $lei->{ovv}->ovv_begin($lei);
+        die 'BUG: xdb|over open' if $lei->{lse}->{xdb} || $lei->{lse}->{over};
         if ($l2m) {
                 $l2m->pre_augment($lei);
                 if ($lei->{opt}->{augment} && delete $lei->{early_mua}) {
                         $lei->start_mua;
                 }
+                if ($l2m->{-wq_nr_workers} > 1 &&
+                                $l2m->{base_type} =~ /\A(?:maildir|mbox)\z/) {
+                        # setup two barriers to coordinate ->has_entries
+                        # between l2m workers
+                        pipe(my ($a_r, $a_w)) or die "pipe: $!";
+                        fcntl($a_r, $F_SETPIPE_SZ, 4096) if $F_SETPIPE_SZ;
+                        pipe(my ($b_r, $b_w)) or die "pipe: $!";
+                        fcntl($b_r, $F_SETPIPE_SZ, 4096) if $F_SETPIPE_SZ;
+                        $l2m->{au_peers} = [ $a_r, $a_w, $b_r, $b_w ];
+                }
                 $l2m->wq_workers_start('lei2mail', undef,
-                                        $lei->oldset, { lei => $lei });
+                                        $lei->oldset, { lei => $lei },
+                                        \&xsearch_done_wait, $lei);
                 pipe($lei->{startq}, $lei->{au_done}) or die "pipe: $!";
-                # 1031: F_SETPIPE_SZ
-                fcntl($lei->{startq}, 1031, 4096) if $^O eq 'linux';
-        }
-        if (!$lei->{opt}->{threads} && locals($self)) { # for query_mset
-                # lei->{git_tmp} is set for wq_wait_old so we don't
-                # delete until all lei2mail + lei_xsearch workers are reaped
-                $lei->{git_tmp} = $self->{git_tmp} = git_tmp($self);
+                fcntl($lei->{startq}, $F_SETPIPE_SZ, 4096) if $F_SETPIPE_SZ;
+                delete $l2m->{au_peers};
+                close(delete $l2m->{-wq_s2}); # share wq_s1 with lei_xsearch
         }
         $self->wq_workers_start('lei_xsearch', undef,
-                                $lei->oldset, { lei => $lei });
-        my $op = delete $lei->{pkt_op_c};
+                                $lei->oldset, { lei => $lei },
+                                \&xsearch_done_wait, $lei);
+        my $op_c = delete $lei->{pkt_op_c};
         delete $lei->{pkt_op_p};
-        $self->{threads} = $lei->{opt}->{threads};
+        @$end = ();
+        $self->{-do_lcat} = !!(delete $lei->{lcat_todo});
         if ($l2m) {
-                $l2m->net_merge_complete unless $lei->{auth};
+                $l2m->net_merge_all_done($lei) unless $lei->{auth};
         } else {
-                start_query($self);
+                start_query($self, $lei);
         }
         $lei->event_step_init; # wait for shutdowns
-        if ($lei->{oneshot}) {
-                while ($op->{sock}) { $op->event_step }
-        }
+        $lei->wait_wq_events($op_c, $ops);
 }
 
 sub add_uri {
@@ -460,30 +578,83 @@ sub add_uri {
                 require IO::Uncompress::Gunzip;
                 require PublicInbox::LeiCurl;
                 push @{$self->{remotes}}, $uri;
+                $uri;
         } else {
                 warn "curl missing, ignoring $uri\n";
+                undef;
         }
 }
 
+# returns URI or PublicInbox::Inbox-like object
 sub prepare_external {
         my ($self, $loc, $boost) = @_; # n.b. already ordered by boost
         if (ref $loc) { # already a URI, or PublicInbox::Inbox-like object
                 return add_uri($self, $loc) if $loc->can('scheme');
+                # fall-through on Inbox-like objects
         } elsif ($loc =~ m!\Ahttps?://!) {
                 require URI;
                 return add_uri($self, URI->new($loc));
-        } elsif (-f "$loc/ei.lock") {
+        } elsif (-f "$loc/ei.lock" && -d "$loc/ALL.git/objects") { # need both the lock file and the object dir
                 require PublicInbox::ExtSearch;
+                die "`\\n' not allowed in `$loc'\n" if index($loc, "\n") >= 0; # refuse paths containing a newline
                 $loc = PublicInbox::ExtSearch->new($loc);
-        } elsif (-f "$loc/inbox.lock" || -d "$loc/public-inbox") {
+        } elsif ((-f "$loc/inbox.lock" && -d "$loc/all.git/objects") ||
+                        (-d "$loc/public-inbox" && -d "$loc/objects")) {
+                die "`\\n' not allowed in `$loc'\n" if index($loc, "\n") >= 0; # refuse paths containing a newline
                 require PublicInbox::Inbox; # v2, v1
                 $loc = bless { inboxdir => $loc }, 'PublicInbox::Inbox';
+        } elsif (!-e $loc) { # nothing exists at this path
+                warn "W: $loc gone, perhaps run: lei forget-external $loc\n";
+                return undef;
         } else {
-                warn "W: ignoring $loc, unable to determine type\n";
-                return;
+                warn "W: $loc ignored, unable to determine external type\n";
+                return undef;
         }
         push @{$self->{locals}}, $loc;
+        $loc; # last expression: the ExtSearch/Inbox object is the return value
+}
+
+sub _lcat_i { # LeiMailSync->each_src iterator callback
+        my ($oidbin, $id, $each_smsg) = @_; # $id is not used below
+        $each_smsg->({blob => unpack('H*', $oidbin), pct => 100}); # binary OID is passed on as a hex string
 }
 
+sub _lcat2smsg { # git->cat_async callback
+        my ($bref, $oid, $type, $size, $smsg) = @_; # only $bref and $smsg are used below
+        if ($bref) { # nothing is emitted when $bref is false
+                my $eml = PublicInbox::Eml->new($bref);
+                my $json_dump = delete $smsg->{-json_dump}; # callback stashed on $smsg by lcat_dump
+                bless $smsg, 'PublicInbox::Smsg'; # $smsg arrives as a plain hashref
+                _smsg_fill($smsg, $eml);
+                $json_dump->($smsg, undef, $eml);
+        }
+}
+
+sub lcat_dump { # via wq_io_do
+        my ($self) = @_;
+        my $lei = $self->{lei};
+        my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei);
+        my $git = $lei->{ale}->git;
+        if (!$lei->{l2m}) { # no l2m: wrap the callback so each blob goes through cat_async first
+                my $json_dump = $each_smsg; # keep the original callback for _lcat2smsg to invoke
+                $each_smsg = sub {
+                        my ($smsg) = @_;
+                        $smsg->{-json_dump} = $json_dump;
+                        $git->cat_async($smsg->{blob}, \&_lcat2smsg, $smsg);
+                };
+        }
+        my $lms;
+        for my $ent (@{$lei->{lcat_todo}}) { # entries are either hashrefs or oidhex strings
+                if (ref $ent eq 'HASH') { # { fid => $fid ,.. }
+                        $lms //= $lei->{lse}->lms; # only fetched once a HASH entry is seen
+                        $lms->each_src($ent, \&_lcat_i, $each_smsg);
+                } else { # oidhex
+                        $each_smsg->({ blob => $ent, pct => 100 });
+                }
+        }
+        $git->async_wait_all; # NOTE(review): presumably drains pending cat_async callbacks -- confirm
+        undef $each_smsg; # may commit
+        $lei->{ovv}->ovv_atexit_child($lei);
+}
 
 1;