about summary refs log tree commit homepage
path: root/lib/PublicInbox/LeiXSearch.pm
diff options
context:
space:
mode:
Diffstat (limited to 'lib/PublicInbox/LeiXSearch.pm')
-rw-r--r--lib/PublicInbox/LeiXSearch.pm51
1 files changed, 39 insertions, 12 deletions
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index e577ab09..95862306 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -8,12 +8,11 @@ package PublicInbox::LeiXSearch;
 use strict;
 use v5.10.1;
 use parent qw(PublicInbox::LeiSearch PublicInbox::IPC);
-use PublicInbox::DS qw(dwaitpid);
-use PublicInbox::PktOp;
+use PublicInbox::DS qw(dwaitpid now);
+use PublicInbox::PktOp qw(pkt_do);
 use PublicInbox::Import;
 use File::Temp 0.19 (); # 0.19 for ->newdir
 use File::Spec ();
-use Socket qw(MSG_EOR);
 use PublicInbox::Search qw(xap_terms);
 use PublicInbox::Spawn qw(popen_rd spawn which);
 use PublicInbox::MID qw(mids);
@@ -97,7 +96,7 @@ sub over {}
 sub _mset_more ($$) {
         my ($mset, $mo) = @_;
         my $size = $mset->size;
-        $size && (($mo->{offset} += $size) < ($mo->{limit} // 10000));
+        $size >= $mo->{limit} && (($mo->{offset} += $size) < $mo->{limit});
 }
 
 # $startq will EOF when query_prepare is done augmenting and allow
@@ -115,16 +114,15 @@ sub query_thread_mset { # for --thread
         my $startq = delete $lei->{startq};
 
         my ($srch, $over) = ($ibxish->search, $ibxish->over);
-        unless ($srch && $over) {
-                my $desc = $ibxish->{inboxdir} // $ibxish->{topdir};
-                warn "$desc not indexed by Xapian\n";
-                return;
-        }
+        my $desc = $ibxish->{inboxdir} // $ibxish->{topdir};
+        return warn("$desc not indexed by Xapian\n") unless ($srch && $over);
         my $mo = { %{$lei->{mset_opt}} };
         my $mset;
         my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei, $ibxish);
         do {
                 $mset = $srch->mset($mo->{qstr}, $mo);
+                pkt_do($lei->{pkt_op}, 'mset_progress', $desc, $mset->size,
+                                $mset->get_matches_estimated);
                 my $ids = $srch->mset_to_artnums($mset, $mo);
                 my $ctx = { ids => $ids };
                 my $i = 0;
@@ -156,6 +154,8 @@ sub query_mset { # non-parallel for non-"--thread" users
         my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei, $self);
         do {
                 $mset = $self->mset($mo->{qstr}, $mo);
+                pkt_do($lei->{pkt_op}, 'mset_progress', 'xsearch',
+                                $mset->size, $mset->get_matches_estimated);
                 for my $mitem ($mset->items) {
                         my $smsg = smsg_for($self, $mitem) or next;
                         wait_startq($startq) if $startq;
@@ -174,6 +174,16 @@ sub each_eml { # callback for MboxReader->mboxrd
         $smsg->{$_} //= '' for qw(from to cc ds subject references mid);
         delete @$smsg{qw(From Subject -ds -ts)};
         if (my $startq = delete($lei->{startq})) { wait_startq($startq) }
+        ++$lei->{-nr_remote_eml};
+        if (!$lei->{opt}->{quiet}) {
+                my $now = now();
+                my $next = $lei->{-next_progress} //= ($now + 1);
+                if ($now > $next) {
+                        $lei->{-next_progress} = $now + 1;
+                        my $nr = $lei->{-nr_remote_eml};
+                        $lei->err("# $lei->{-current_url} $nr/?");
+                }
+        }
         $each_smsg->($smsg, undef, $eml);
 }
 
@@ -223,6 +233,8 @@ sub query_remote_mboxrd {
         my $tor = $opt->{torsocks} //= 'auto';
         my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei);
         for my $uri (@$uris) {
+                $lei->{-current_url} = $uri->as_string;
+                $lei->{-nr_remote_eml} = 0;
                 $uri->query_form(@qform);
                 my $cmd = [ @cmd, $uri->as_string ];
                 if ($tor eq 'auto' && substr($uri->host, -6) eq '.onion' &&
@@ -246,7 +258,12 @@ sub query_remote_mboxrd {
                                                         $lei, $each_smsg);
                 };
                 return $lei->fail("E: @$cmd: $@") if $@;
-                next unless $?;
+                if ($? == 0) {
+                        my $nr = $lei->{-nr_remote_eml};
+                        pkt_do($lei->{pkt_op}, 'mset_progress',
+                                $lei->{-current_url}, $nr, $nr);
+                        next;
+                }
                 seek($cerr, $coff, SEEK_SET) or warn "seek(curl stderr): $!\n";
                 my $e = do { local $/; <$cerr> } //
                                 die "read(curl stderr): $!\n";
@@ -299,9 +316,19 @@ Error closing $lei->{ovv}->{dst}: $!
                 }
                 $lei->start_mua;
         }
+        $lei->{opt}->{quiet} or
+                $lei->err('# ', $lei->{-mset_total} // 0, " matches");
         $lei->dclose;
 }
 
+sub mset_progress { # called via pkt_op/pkt_do from workers
+        my ($lei, $pargs) = @_;
+        my ($desc, $mset_size, $mset_total_est) = @$pargs;
+        return if $lei->{opt}->{quiet};
+        $lei->{-mset_total} += $mset_size;
+        $lei->err("# $desc $mset_size/$mset_total_est");
+}
+
 sub do_post_augment {
         my ($lei, $zpipe, $au_done) = @_;
         my $l2m = $lei->{l2m} or die 'BUG: no {l2m}';
@@ -354,8 +381,7 @@ sub query_prepare { # called by wq_do
         delete $lei->{l2m}->{-wq_s1};
         eval { $lei->{l2m}->do_augment($lei) };
         $lei->fail($@) if $@;
-        send($lei->{pkt_op}, '.', MSG_EOR) == 1 or
-                die "do_post_augment trigger: $!"
+        pkt_do($lei->{pkt_op}, '.') == 1 or die "do_post_augment trigger: $!"
 }
 
 sub fail_handler ($;$$) {
@@ -388,6 +414,7 @@ sub do_query {
                 '!' => [ \&fail_handler, $lei ],
                 '.' => [ \&do_post_augment, $lei, $zpipe, $au_done ],
                 '' => [ \&query_done, $lei ],
+                'mset_progress' => [ \&mset_progress, $lei ],
         };
         (my $op, $lei->{pkt_op}) = PublicInbox::PktOp->pair($ops, $in_loop);
         my ($lei_ipc, @io) = $lei->atfork_parent_wq($self);