diff options
Diffstat (limited to 'lib/PublicInbox/LeiXSearch.pm')
-rw-r--r-- | lib/PublicInbox/LeiXSearch.pm | 33 |
1 files changed, 23 insertions, 10 deletions
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index fc95d401..43dedd10 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -13,7 +13,7 @@ use File::Temp 0.19 (); # 0.19 for ->newdir use File::Spec (); use PublicInbox::Search qw(xap_terms); use PublicInbox::Spawn qw(popen_rd popen_wr which); -use PublicInbox::MID qw(mids); +use PublicInbox::MID qw(mids mid_escape); use PublicInbox::Smsg; use PublicInbox::Eml; use PublicInbox::LEI; @@ -22,6 +22,7 @@ use PublicInbox::ContentHash qw(git_sha); use POSIX qw(strftime); use autodie qw(close open read seek truncate); use PublicInbox::Syscall qw($F_SETPIPE_SZ); +use PublicInbox::OnDestroy; sub new { my ($class) = @_; @@ -160,6 +161,8 @@ sub query_one_mset { # for --threads and l2m w/o sort my $can_kw = !!$ibxish->can('msg_keywords'); my $threads = $lei->{opt}->{threads} // 0; my $fl = $threads > 1 ? 1 : undef; + my $mid = $lei->{opt}->{'thread-id'}; + $mo->{threadid} = $over->mid2tid($mid) if defined $mid; my $lss = $lei->{lss}; my $maxk = "external.$dir.maxuid"; # max of previous, so our min my $min = $lss ? ($lss->{-cfg}->{$maxk} // 0) : 0; @@ -339,6 +342,12 @@ print STDERR $_; push @$curl, '-s', '-d', ''; my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei); $self->{import_sto} = $lei->{sto} if $lei->{opt}->{'import-remote'}; + if (defined(my $mid = $opt->{'thread-id'})) { + $mid = mid_escape($mid); + for my $uri (@$uris) { + $uri->path($uri->path.$mid.'/'); + } + } for my $uri (@$uris) { $lei->{-current_url} = $uri->as_string; my $start = time; @@ -355,7 +364,7 @@ print STDERR $_; $self, $lei, $each_smsg); }; my ($exc, $code) = ($@, $?); - $lei->sto_done_request if delete($self->{-sto_imported}); + $lei->sto_barrier_request if delete($self->{-sto_imported}); die "E: $exc" if $exc && !$code; my $nr = delete $lei->{-nr_remote_eml} // 0; if (!$code) { # don't update if no results, maybe MTA is down @@ -391,7 +400,7 @@ sub query_done { # EOF callback for main daemon delete $lei->{lxs}; ($lei->{opt}->{'mail-sync'} && !$lei->{sto}) and warn "BUG: {sto} missing with --mail-sync"; - $lei->sto_done_request; + $lei->sto_barrier_request; $lei->{ovv}->ovv_end($lei); if ($l2m) { # close() calls LeiToMail reap_compress $l2m->finish_output($lei); @@ -420,11 +429,9 @@ sub query_done { # EOF callback for main daemon $lei->dclose; } -sub do_post_augment { +sub post_augment_done { # via on_destroy in top-level lei-daemon my ($lei) = @_; - my $l2m = $lei->{l2m} or return; # client disconnected - eval { $l2m->post_augment($lei) }; - my $err = $@; + my $err = delete $lei->{post_augment_err}; if ($err) { if (my $lxs = delete $lei->{lxs}) { $lxs->wq_kill(-POSIX::SIGTERM()); @@ -439,6 +446,12 @@ sub do_post_augment { close(delete $lei->{au_done}); # trigger wait_startq if start_mua didn't } +sub do_post_augment { + my ($lei) = @_; + my $l2m = $lei->{l2m} or return; # client disconnected + $l2m->post_augment($lei, on_destroy(\&post_augment_done, $lei)); +} + sub incr_post_augment { # called whenever an l2m shard finishes augment my ($lei) = @_; my $l2m = $lei->{l2m} or return; # client disconnected @@ -459,7 +472,9 @@ sub concurrency { sub start_query ($$) { # always runs in main (lei-daemon) process my ($self, $lei) = @_; local $PublicInbox::LEI::current_lei = $lei; - if ($self->{opt_threads} || ($lei->{l2m} && !$self->{opt_sort})) { + if ($lei->{opt}->{threads} || + defined($lei->{opt}->{'thread-id'}) || + ($lei->{l2m} && !$lei->{opt}->{'sort'})) { for my $ibxish (locals($self)) { $self->wq_io_do('query_one_mset', [], $ibxish); } @@ -546,8 +561,6 @@ sub do_query { my $op_c = delete $lei->{pkt_op_c}; delete $lei->{pkt_op_p}; @$end = (); - $self->{opt_threads} = $lei->{opt}->{threads}; - $self->{opt_sort} = $lei->{opt}->{'sort'}; $self->{-do_lcat} = !!(delete $lei->{lcat_todo}); if ($l2m) { $l2m->net_merge_all_done($lei) unless $lei->{auth}; |