about summary refs log tree commit homepage
path: root/lib/PublicInbox/XapHelper.pm
diff options
context:
space:
mode:
Diffstat (limited to 'lib/PublicInbox/XapHelper.pm')
-rw-r--r-- lib/PublicInbox/XapHelper.pm | 105
1 files changed, 78 insertions, 27 deletions
diff --git a/lib/PublicInbox/XapHelper.pm b/lib/PublicInbox/XapHelper.pm
index 8c7732f5..ba41b5d2 100644
--- a/lib/PublicInbox/XapHelper.pm
+++ b/lib/PublicInbox/XapHelper.pm
@@ -18,7 +18,7 @@ use POSIX qw(:signal_h);
 use Fcntl qw(LOCK_UN LOCK_EX);
 use Carp qw(croak);
 my $X = \%PublicInbox::Search::X;
-our (%SRCH, %WORKERS, $nworker, $workerset, $in);
+our (%SRCH, %WORKERS, $nworker, $workerset, $in, $SHARD_NFD, $MY_FD_MAX);
 our $stderr = \*STDERR;
 
 sub cmd_test_inspect {
@@ -27,6 +27,8 @@ sub cmd_test_inspect {
                 ($req->{srch}->has_threadid ? 1 : 0)
 }
 
+sub cmd_test_sleep { for (;;) { select(undef, undef, undef, 0.01) } } # 10ms naps
+
 sub iter_retry_check ($) {
         if (ref($@) =~ /\bDatabaseModifiedError\b/) {
                 $_[0]->{srch}->reopen;
@@ -147,17 +149,8 @@ sub cmd_dump_roots {
 
 sub mset_iter ($$) {
         my ($req, $it) = @_;
-        eval {
-                my $buf = $it->get_docid;
-                $buf .= "\0".$it->get_percent if $req->{p};
-                my $doc = ($req->{A} || $req->{D}) ? $it->get_document : undef;
-                for my $p (@{$req->{A}}) {
-                        $buf .= "\0".$p.$_ for xap_terms($p, $doc);
-                }
-                $buf .= "\0".$doc->get_data if $req->{D};
-                say { $req->{0} } $buf;
-        };
-        $@ ? iter_retry_check($req) : 0;
+        say { $req->{0} } $it->get_docid, "\0",
+                        $it->get_percent, "\0", $it->get_rank;
 }
 
 sub cmd_mset { # to be used by WWW + IMAP
@@ -170,7 +163,8 @@ sub cmd_mset { # to be used by WWW + IMAP
         $opt->{eidx_key} = $req->{O} if defined $req->{O};
         $opt->{threadid} = $req->{T} if defined $req->{T};
         my $mset = $req->{srch}->mset($qry_str, $opt);
-        say { $req->{0} } 'mset.size=', $mset->size;
+        say { $req->{0} } 'mset.size=', $mset->size,
+                ' .get_matches_estimated=', $mset->get_matches_estimated;
         for my $it ($mset->items) {
                 for (my $t = 10; $t > 0; --$t) {
                         $t = mset_iter($req, $it) // $t;
@@ -178,36 +172,76 @@ sub cmd_mset { # to be used by WWW + IMAP
         }
 }
 
+sub srch_init_extra ($) { # register every -Q prefix spec on the query parser
+        my $srch = $_[0]->{srch};
+        my %adder = ('=' => 'add_boolean_prefix', ':' => 'add_prefix');
+        for my $spec (@{$_[0]->{Q}}) {
+                my ($user_pfx, $sep, $xap_pfx) = split(/([:=])/, $spec);
+                defined($xap_pfx) or die "E: bad -Q $spec";
+                my $meth = $adder{$sep}; # $sep can only be '=' or ':' here
+                $srch->{qp}->$meth($user_pfx, $xap_pfx);
+        }
+        $srch->{qp_extra_done} = 1; # dispatch checks this to skip a re-run
+}
+
 sub dispatch {
         my ($req, $cmd, @argv) = @_;
         my $fn = $req->can("cmd_$cmd") or return;
         $GLP->getoptionsfromarray(\@argv, $req, @PublicInbox::Search::XH_SPEC)
                 or return;
         my $dirs = delete $req->{d} or die 'no -d args';
-        my $key = join("\0", @$dirs);
-        $req->{srch} = $SRCH{$key} //= do {
-                my $new = { qp_flags => $PublicInbox::Search::QP_FLAGS };
+        my $key = "-d\0".join("\0-d\0", @$dirs);
+        $key .= "\0".join("\0", map { ('-Q', $_) } @{$req->{Q}}) if $req->{Q};
+        my $new;
+        $req->{srch} = $SRCH{$key} // do {
+                $new = { qp_flags => $PublicInbox::Search::QP_FLAGS };
+                my $nfd = scalar(@$dirs) * PublicInbox::Search::SHARD_COST;
+                $SHARD_NFD += $nfd;
+                if ($SHARD_NFD > $MY_FD_MAX) {
+                        $SHARD_NFD = $nfd;
+                        %SRCH = ();
+                }
                 my $first = shift @$dirs;
-                my $slow_phrase = -f "$first/iamchert";
-                $new->{xdb} = $X->{Database}->new($first);
-                for (@$dirs) {
-                        $slow_phrase ||= -f "$_/iamchert";
-                        $new->{xdb}->add_database($X->{Database}->new($_));
+                for my $retried (0, 1) {
+                        my $slow_phrase = -f "$first/iamchert";
+                        eval {
+                                $new->{xdb} = $X->{Database}->new($first);
+                                for (@$dirs) {
+                                        $slow_phrase ||= -f "$_/iamchert";
+                                        $new->{xdb}->add_database(
+                                                        $X->{Database}->new($_))
+                                }
+                        };
+                        last unless $@;
+                        if ($retried) {
+                                die "E: $@\n";
+                        } else { # may be EMFILE/ENFILE/ENOMEM....
+                                warn "W: $@, retrying...\n";
+                                %SRCH = ();
+                                $SHARD_NFD = $nfd;
+                        }
+                        $slow_phrase or $new->{qp_flags}
+                                |= PublicInbox::Search::FLAG_PHRASE();
                 }
-                $slow_phrase or
-                        $new->{qp_flags} |= PublicInbox::Search::FLAG_PHRASE();
                 bless $new, $req->{c} ? 'PublicInbox::CodeSearch' :
                                         'PublicInbox::Search';
                 $new->{qp} = $new->qparse_new;
-                $new;
+                $SRCH{$key} = $new;
         };
+        $req->{srch}->{xdb}->reopen unless $new;
+        $req->{Q} && !$req->{srch}->{qp_extra_done} and
+                srch_init_extra $req;
+        my $timeo = $req->{K};
+        alarm($timeo) if $timeo;
         $fn->($req, @argv);
+        alarm(0) if $timeo;
 }
 
 sub recv_loop {
         local $SIG{__WARN__} = sub { print $stderr @_ };
         my $rbuf;
         local $SIG{TERM} = sub { undef $in };
+        local $SIG{USR1} = \&reopen_logs;
         while (defined($in)) {
                 PublicInbox::DS::sig_setmask($workerset);
                 my @fds = eval { # we undef $in in SIG{TERM}
@@ -219,7 +253,7 @@ sub recv_loop {
                 }
                 scalar(@fds) or exit(66); # EX_NOINPUT
                 die "recvmsg: $!" if !defined($fds[0]);
-                PublicInbox::DS::block_signals();
+                PublicInbox::DS::block_signals(POSIX::SIGALRM);
                 my $req = bless {}, __PACKAGE__;
                 my $i = 0;
                 open($req->{$i++}, '+<&=', $_) for @fds;
@@ -271,6 +305,18 @@ sub do_sigttou {
         }
 }
 
+sub reopen_logs { # append-reopen STDOUT/STDERR at $ENV{STD*_PATH}, if set
+        my ($out, $err) = @ENV{qw(STDOUT_PATH STDERR_PATH)};
+        STDOUT->autoflush(1) if defined($out) && open(STDOUT, '>>', $out);
+        # a failed open() is not reported for either stream
+        STDERR->autoflush(1) if defined($err) && open(STDERR, '>>', $err);
+}
+
+sub parent_reopen_logs { # reopen our own logs, then USR1 all of %WORKERS
+        reopen_logs();
+        kill(USR1 => values(%WORKERS));
+}
+
 sub xh_alive { $in || scalar(keys %WORKERS) }
 
 sub start (@) {
@@ -278,15 +324,19 @@ sub start (@) {
         my $c = getsockopt(local $in = \*STDIN, SOL_SOCKET, SO_TYPE);
         unpack('i', $c) == SOCK_SEQPACKET or die 'stdin is not SOCK_SEQPACKET';
 
-        local (%SRCH, %WORKERS);
+        local (%SRCH, %WORKERS, $SHARD_NFD, $MY_FD_MAX);
         PublicInbox::Search::load_xapian();
         $GLP->getoptionsfromarray(\@argv, my $opt = { j => 1 }, 'j=i') or
                 die 'bad args';
         local $workerset = POSIX::SigSet->new;
         $workerset->fillset or die "fillset: $!";
-        for (@PublicInbox::DS::UNBLOCKABLE) {
+        for (@PublicInbox::DS::UNBLOCKABLE, POSIX::SIGUSR1) {
                 $workerset->delset($_) or die "delset($_): $!";
         }
+        $MY_FD_MAX = PublicInbox::Search::ulimit_n //
+                die "E: unable to get RLIMIT_NOFILE: $!";
+        warn "W: RLIMIT_NOFILE=$MY_FD_MAX too low\n" if $MY_FD_MAX < 72;
+        $MY_FD_MAX -= 64;
 
         local $nworker = $opt->{j};
         return recv_loop() if $nworker == 0;
@@ -303,6 +353,7 @@ sub start (@) {
                 },
                 TTOU => \&do_sigttou,
                 CHLD => \&PublicInbox::DS::enqueue_reap,
+                USR1 => \&parent_reopen_logs,
         };
         PublicInbox::DS::block_signals();
         start_workers();