about summary refs log tree commit homepage
path: root/lib/PublicInbox/LeiXSearch.pm
diff options
context:
space:
mode:
Diffstat (limited to 'lib/PublicInbox/LeiXSearch.pm')
-rw-r--r--lib/PublicInbox/LeiXSearch.pm33
1 files changed, 23 insertions, 10 deletions
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index fc95d401..43dedd10 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -13,7 +13,7 @@ use File::Temp 0.19 (); # 0.19 for ->newdir
 use File::Spec ();
 use PublicInbox::Search qw(xap_terms);
 use PublicInbox::Spawn qw(popen_rd popen_wr which);
-use PublicInbox::MID qw(mids);
+use PublicInbox::MID qw(mids mid_escape);
 use PublicInbox::Smsg;
 use PublicInbox::Eml;
 use PublicInbox::LEI;
@@ -22,6 +22,7 @@ use PublicInbox::ContentHash qw(git_sha);
 use POSIX qw(strftime);
 use autodie qw(close open read seek truncate);
 use PublicInbox::Syscall qw($F_SETPIPE_SZ);
+use PublicInbox::OnDestroy;
 
 sub new {
         my ($class) = @_;
@@ -160,6 +161,8 @@ sub query_one_mset { # for --threads and l2m w/o sort
         my $can_kw = !!$ibxish->can('msg_keywords');
         my $threads = $lei->{opt}->{threads} // 0;
         my $fl = $threads > 1 ? 1 : undef;
+        my $mid = $lei->{opt}->{'thread-id'};
+        $mo->{threadid} = $over->mid2tid($mid) if defined $mid;
         my $lss = $lei->{lss};
         my $maxk = "external.$dir.maxuid"; # max of previous, so our min
         my $min = $lss ? ($lss->{-cfg}->{$maxk} // 0) : 0;
@@ -339,6 +342,12 @@ print STDERR $_;
         push @$curl, '-s', '-d', '';
         my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei);
         $self->{import_sto} = $lei->{sto} if $lei->{opt}->{'import-remote'};
+        if (defined(my $mid = $opt->{'thread-id'})) {
+                $mid = mid_escape($mid);
+                for my $uri (@$uris) {
+                        $uri->path($uri->path.$mid.'/');
+                }
+        }
         for my $uri (@$uris) {
                 $lei->{-current_url} = $uri->as_string;
                 my $start = time;
@@ -355,7 +364,7 @@ print STDERR $_;
                                                 $self, $lei, $each_smsg);
                 };
                 my ($exc, $code) = ($@, $?);
-                $lei->sto_done_request if delete($self->{-sto_imported});
+                $lei->sto_barrier_request if delete($self->{-sto_imported});
                 die "E: $exc" if $exc && !$code;
                 my $nr = delete $lei->{-nr_remote_eml} // 0;
                 if (!$code) { # don't update if no results, maybe MTA is down
@@ -391,7 +400,7 @@ sub query_done { # EOF callback for main daemon
         delete $lei->{lxs};
         ($lei->{opt}->{'mail-sync'} && !$lei->{sto}) and
                 warn "BUG: {sto} missing with --mail-sync";
-        $lei->sto_done_request;
+        $lei->sto_barrier_request;
         $lei->{ovv}->ovv_end($lei);
         if ($l2m) { # close() calls LeiToMail reap_compress
                 $l2m->finish_output($lei);
@@ -420,11 +429,9 @@ sub query_done { # EOF callback for main daemon
         $lei->dclose;
 }
 
-sub do_post_augment {
+sub post_augment_done { # via on_destroy in top-level lei-daemon
         my ($lei) = @_;
-        my $l2m = $lei->{l2m} or return; # client disconnected
-        eval { $l2m->post_augment($lei) };
-        my $err = $@;
+        my $err = delete $lei->{post_augment_err};
         if ($err) {
                 if (my $lxs = delete $lei->{lxs}) {
                         $lxs->wq_kill(-POSIX::SIGTERM());
@@ -439,6 +446,12 @@ sub do_post_augment {
         close(delete $lei->{au_done}); # trigger wait_startq if start_mua didn't
 }
 
+sub do_post_augment {
+        my ($lei) = @_;
+        my $l2m = $lei->{l2m} or return; # client disconnected
+        $l2m->post_augment($lei, on_destroy(\&post_augment_done, $lei));
+}
+
 sub incr_post_augment { # called whenever an l2m shard finishes augment
         my ($lei) = @_;
         my $l2m = $lei->{l2m} or return; # client disconnected
@@ -459,7 +472,9 @@ sub concurrency {
 sub start_query ($$) { # always runs in main (lei-daemon) process
         my ($self, $lei) = @_;
         local $PublicInbox::LEI::current_lei = $lei;
-        if ($self->{opt_threads} || ($lei->{l2m} && !$self->{opt_sort})) {
+        if ($lei->{opt}->{threads} ||
+                        defined($lei->{opt}->{'thread-id'}) ||
+                        ($lei->{l2m} && !$lei->{opt}->{'sort'})) {
                 for my $ibxish (locals($self)) {
                         $self->wq_io_do('query_one_mset', [], $ibxish);
                 }
@@ -546,8 +561,6 @@ sub do_query {
         my $op_c = delete $lei->{pkt_op_c};
         delete $lei->{pkt_op_p};
         @$end = ();
-        $self->{opt_threads} = $lei->{opt}->{threads};
-        $self->{opt_sort} = $lei->{opt}->{'sort'};
         $self->{-do_lcat} = !!(delete $lei->{lcat_todo});
         if ($l2m) {
                 $l2m->net_merge_all_done($lei) unless $lei->{auth};