about summary refs log tree commit homepage
path: root/lib/PublicInbox/Mbox.pm
diff options
context:
space:
mode:
Diffstat (limited to 'lib/PublicInbox/Mbox.pm')
-rw-r--r--lib/PublicInbox/Mbox.pm115
1 files changed, 67 insertions, 48 deletions
diff --git a/lib/PublicInbox/Mbox.pm b/lib/PublicInbox/Mbox.pm
index ac565df9..82fba5c6 100644
--- a/lib/PublicInbox/Mbox.pm
+++ b/lib/PublicInbox/Mbox.pm
@@ -31,8 +31,8 @@ sub async_next {
         my ($http) = @_; # PublicInbox::HTTP
         my $ctx = $http->{forward} or return; # client aborted
         eval {
-                my $smsg = $ctx->{smsg} or return $ctx->close;
-                $ctx->smsg_blob($smsg);
+                my $smsg = $ctx->{smsg} // return $ctx->close;
+                $ctx->smsg_blob($smsg) if $smsg;
         };
         warn "E: $@" if $@;
 }
@@ -159,6 +159,7 @@ sub all_ids_cb {
                 }
                 $ctx->{ids} = $ids = $over->ids_after(\($ctx->{prev}));
         } while (@$ids);
+        undef;
 }
 
 sub mbox_all_ids {
@@ -175,52 +176,79 @@ sub mbox_all_ids {
         PublicInbox::MboxGz::mbox_gz($ctx, \&all_ids_cb, 'all');
 }
 
-sub refill_result_ids ($) {
-        my ($ctx) = @_;
+my $refill_ids_cb = sub { # async_mset cb
+        my ($ctx, $http, $mset, $err) = @_;
+        $http = undef unless $ctx->{-really_async};
+        if ($err) {
+                warn "E: $err";
+                $ctx->close if $http; # our async httpd
+                return;
+        }
         # refill result set, deprioritize since there's many results
-        my $srch = $ctx->{ibx}->isrch or return $ctx->gone('search');
-        my $mset = $srch->mset($ctx->{query}, $ctx->{qopts});
-        my $size = $mset->size or return;
+        my $size = $mset->size or do {
+                $ctx->close if $http;
+                $ctx->{-mbox_done} = 1;
+                return;
+        };
         $ctx->{qopts}->{offset} += $size;
-        $ctx->{ids} = $srch->mset_to_artnums($mset, $ctx->{qopts});
+        $ctx->{ids} = $ctx->{srch}->mset_to_artnums($mset, $ctx->{qopts});
         $ctx->{-low_prio} = 1; # true
-}
+        return if !$http;
+        eval {
+                my $smsg = results_cb($ctx) // return $ctx->close;
+                return if !$smsg; # '' wait for async_mset
+                $ctx->smsg_blob($ctx->{smsg} = $smsg);
+        };
+        warn "E: $@" if $@;
+};
 
-sub results_cb {
-        my ($ctx) = @_;
+sub results_cb { # async_next or MboxGz->getline cb
+        my ($ctx, $http) = @_;
         my $over = $ctx->{ibx}->over or return $ctx->gone('over');
         while (1) {
-                while (defined(my $num = shift(@{$ctx->{ids}}))) {
+                my $ids = $ctx->{xids} // $ctx->{ids};
+                while (defined(my $num = shift(@$ids))) {
                         my $smsg = $over->get_art($num) or next;
                         return $smsg;
                 }
-                refill_result_ids($ctx) or return; # refill ctx->{ids}
+                next if $ctx->{xids} && $over->expand_thread($ctx);
+                return '' if $ctx->{srch}->async_mset(@$ctx{qw(query qopts)},
+                                                $refill_ids_cb, $ctx, $http);
+                return if $ctx->{-mbox_done};
         }
 }
 
-sub results_thread_cb {
-        my ($ctx) = @_;
-
-        my $over = $ctx->{ibx}->over or return $ctx->gone('over');
-        while (1) {
-                while (defined(my $num = shift(@{$ctx->{xids}}))) {
-                        my $smsg = $over->get_art($num) or next;
-                        return $smsg;
-                }
-                next if $over->expand_thread($ctx); # refills ctx->{xids}
-
-                refill_result_ids($ctx) or return; # refill ctx->{ids}
+sub mbox_qry_cb { # async_mset cb
+        my ($ctx, $q, $mset, $err) = @_;
+        my $wcb = delete $ctx->{wcb};
+        if ($err) {
+                warn "E: $err";
+                return $wcb->([500, [qw(Content-Type text/plain)],
+                                [ "Internal server error\n" ]])
         }
+        $ctx->{qopts}->{offset} = $mset->size or
+                        return $wcb->([404, [qw(Content-Type text/plain)],
+                                        ["No results found\n"]]);
+        $ctx->{ids} = $ctx->{srch}->mset_to_artnums($mset, $ctx->{qopts});
+        my $fn;
+        if ($q->{t} && $ctx->{srch}->has_threadid) {
+                $ctx->{xids} = []; # triggers over->expand_thread
+                $fn = "results-thread-$ctx->{query}";
+        } else {
+                $fn = "results-$ctx->{query}";
+        }
+        require PublicInbox::MboxGz;
+        my $res = PublicInbox::MboxGz::mbox_gz($ctx, \&results_cb, $fn);
+        ref($res) eq 'CODE' ? $res->($wcb) : $wcb->($res);
 }
 
 sub mbox_all {
         my ($ctx, $q) = @_;
-        my $q_string = $q->{'q'};
-        return mbox_all_ids($ctx) if $q_string !~ /\S/;
-        my $srch = $ctx->{ibx}->isrch or
+        my $qstr = $q->{'q'};
+        return mbox_all_ids($ctx) if $qstr !~ /\S/;
+        my $srch = $ctx->{srch} = $ctx->{ibx}->isrch or
                 return PublicInbox::WWW::need($ctx, 'Search');
-
-        my $qopts = $ctx->{qopts} = { relevance => -2 }; # ORDER BY docid DESC
+        my $opt = $ctx->{qopts} = { relevance => -2 }; # ORDER BY docid DESC
 
         # {threadid} limits results to a given thread
         # {threads} collapses results from messages in the same thread,
@@ -230,25 +258,16 @@ sub mbox_all {
                                 $ctx->{ibx}->{isrch}->{es}->over :
                                 $ctx->{ibx}->over) or
                         return PublicInbox::WWW::need($ctx, 'Overview');
-                $qopts->{threadid} = $over->mid2tid($ctx->{mid});
-        }
-        $qopts->{threads} = 1 if $q->{t};
-        $srch->query_approxidate($ctx->{ibx}->git, $q_string);
-        my $mset = $srch->mset($q_string, $qopts);
-        $qopts->{offset} = $mset->size or
-                        return [404, [qw(Content-Type text/plain)],
-                                ["No results found\n"]];
-        $ctx->{query} = $q_string;
-        $ctx->{ids} = $srch->mset_to_artnums($mset, $qopts);
-        require PublicInbox::MboxGz;
-        my $fn;
-        if ($q->{t} && $srch->has_threadid) {
-                $fn = 'results-thread-'.$q_string;
-                PublicInbox::MboxGz::mbox_gz($ctx, \&results_thread_cb, $fn);
-        } else {
-                $fn = 'results-'.$q_string;
-                PublicInbox::MboxGz::mbox_gz($ctx, \&results_cb, $fn);
+                $opt->{threadid} = $over->mid2tid($ctx->{mid});
         }
+        $opt->{threads} = 1 if $q->{t};
+        $srch->query_approxidate($ctx->{ibx}->git, $qstr);
+        $ctx->{query} = $qstr;
+        sub { # called by PSGI server
+                $ctx->{wcb} = $_[0]; # PSGI server supplied write cb
+                $srch->async_mset($qstr, $opt, \&mbox_qry_cb, $ctx, $q) and
+                        $ctx->{-really_async} = 1;
+        };
 }
 
 1;