diff options
Diffstat (limited to 'lib/PublicInbox/Mbox.pm')
-rw-r--r-- | lib/PublicInbox/Mbox.pm | 115 |
1 files changed, 67 insertions, 48 deletions
diff --git a/lib/PublicInbox/Mbox.pm b/lib/PublicInbox/Mbox.pm index ac565df9..82fba5c6 100644 --- a/lib/PublicInbox/Mbox.pm +++ b/lib/PublicInbox/Mbox.pm @@ -31,8 +31,8 @@ sub async_next { my ($http) = @_; # PublicInbox::HTTP my $ctx = $http->{forward} or return; # client aborted eval { - my $smsg = $ctx->{smsg} or return $ctx->close; - $ctx->smsg_blob($smsg); + my $smsg = $ctx->{smsg} // return $ctx->close; + $ctx->smsg_blob($smsg) if $smsg; }; warn "E: $@" if $@; } @@ -159,6 +159,7 @@ sub all_ids_cb { } $ctx->{ids} = $ids = $over->ids_after(\($ctx->{prev})); } while (@$ids); + undef; } sub mbox_all_ids { @@ -175,52 +176,79 @@ sub mbox_all_ids { PublicInbox::MboxGz::mbox_gz($ctx, \&all_ids_cb, 'all'); } -sub refill_result_ids ($) { - my ($ctx) = @_; +my $refill_ids_cb = sub { # async_mset cb + my ($ctx, $http, $mset, $err) = @_; + $http = undef unless $ctx->{-really_async}; + if ($err) { + warn "E: $err"; + $ctx->close if $http; # our async httpd + return; + } # refill result set, deprioritize since there's many results - my $srch = $ctx->{ibx}->isrch or return $ctx->gone('search'); - my $mset = $srch->mset($ctx->{query}, $ctx->{qopts}); - my $size = $mset->size or return; + my $size = $mset->size or do { + $ctx->close if $http; + $ctx->{-mbox_done} = 1; + return; + }; $ctx->{qopts}->{offset} += $size; - $ctx->{ids} = $srch->mset_to_artnums($mset, $ctx->{qopts}); + $ctx->{ids} = $ctx->{srch}->mset_to_artnums($mset, $ctx->{qopts}); $ctx->{-low_prio} = 1; # true -} + return if !$http; + eval { + my $smsg = results_cb($ctx) // return $ctx->close; + return if !$smsg; # '' wait for async_mset + $ctx->smsg_blob($ctx->{smsg} = $smsg); + }; + warn "E: $@" if $@; +}; -sub results_cb { - my ($ctx) = @_; +sub results_cb { # async_next or MboxGz->getline cb + my ($ctx, $http) = @_; my $over = $ctx->{ibx}->over or return $ctx->gone('over'); while (1) { - while (defined(my $num = shift(@{$ctx->{ids}}))) { + my $ids = $ctx->{xids} // $ctx->{ids}; + while (defined(my $num = shift(@$ids))) { my $smsg = $over->get_art($num) or next; return $smsg; } - refill_result_ids($ctx) or return; # refill ctx->{ids} + next if $ctx->{xids} && $over->expand_thread($ctx); + return '' if $ctx->{srch}->async_mset(@$ctx{qw(query qopts)}, + $refill_ids_cb, $ctx, $http); + return if $ctx->{-mbox_done}; } } -sub results_thread_cb { - my ($ctx) = @_; - - my $over = $ctx->{ibx}->over or return $ctx->gone('over'); - while (1) { - while (defined(my $num = shift(@{$ctx->{xids}}))) { - my $smsg = $over->get_art($num) or next; - return $smsg; - } - next if $over->expand_thread($ctx); # refills ctx->{xids} - - refill_result_ids($ctx) or return; # refill ctx->{ids} +sub mbox_qry_cb { # async_mset cb + my ($ctx, $q, $mset, $err) = @_; + my $wcb = delete $ctx->{wcb}; + if ($err) { + warn "E: $err"; + return $wcb->([500, [qw(Content-Type text/plain)], + [ "Internal server error\n" ]]) } + $ctx->{qopts}->{offset} = $mset->size or + return $wcb->([404, [qw(Content-Type text/plain)], + ["No results found\n"]]); + $ctx->{ids} = $ctx->{srch}->mset_to_artnums($mset, $ctx->{qopts}); + my $fn; + if ($q->{t} && $ctx->{srch}->has_threadid) { + $ctx->{xids} = []; # triggers over->expand_thread + $fn = "results-thread-$ctx->{query}"; + } else { + $fn = "results-$ctx->{query}"; + } + require PublicInbox::MboxGz; + my $res = PublicInbox::MboxGz::mbox_gz($ctx, \&results_cb, $fn); + ref($res) eq 'CODE' ? $res->($wcb) : $wcb->($res); } sub mbox_all { my ($ctx, $q) = @_; - my $q_string = $q->{'q'}; - return mbox_all_ids($ctx) if $q_string !~ /\S/; - my $srch = $ctx->{ibx}->isrch or + my $qstr = $q->{'q'}; + return mbox_all_ids($ctx) if $qstr !~ /\S/; + my $srch = $ctx->{srch} = $ctx->{ibx}->isrch or return PublicInbox::WWW::need($ctx, 'Search'); - - my $qopts = $ctx->{qopts} = { relevance => -2 }; # ORDER BY docid DESC + my $opt = $ctx->{qopts} = { relevance => -2 }; # ORDER BY docid DESC # {threadid} limits results to a given thread # {threads} collapses results from messages in the same thread, @@ -230,25 +258,16 @@ sub mbox_all { $ctx->{ibx}->{isrch}->{es}->over : $ctx->{ibx}->over) or return PublicInbox::WWW::need($ctx, 'Overview'); - $qopts->{threadid} = $over->mid2tid($ctx->{mid}); - } - $qopts->{threads} = 1 if $q->{t}; - $srch->query_approxidate($ctx->{ibx}->git, $q_string); - my $mset = $srch->mset($q_string, $qopts); - $qopts->{offset} = $mset->size or - return [404, [qw(Content-Type text/plain)], - ["No results found\n"]]; - $ctx->{query} = $q_string; - $ctx->{ids} = $srch->mset_to_artnums($mset, $qopts); - require PublicInbox::MboxGz; - my $fn; - if ($q->{t} && $srch->has_threadid) { - $fn = 'results-thread-'.$q_string; - PublicInbox::MboxGz::mbox_gz($ctx, \&results_thread_cb, $fn); - } else { - $fn = 'results-'.$q_string; - PublicInbox::MboxGz::mbox_gz($ctx, \&results_cb, $fn); + $opt->{threadid} = $over->mid2tid($ctx->{mid}); } + $opt->{threads} = 1 if $q->{t}; + $srch->query_approxidate($ctx->{ibx}->git, $qstr); + $ctx->{query} = $qstr; + sub { # called by PSGI server + $ctx->{wcb} = $_[0]; # PSGI server supplied write cb + $srch->async_mset($qstr, $opt, \&mbox_qry_cb, $ctx, $q) and + $ctx->{-really_async} = 1; + }; } 1; |