about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2021-02-07 23:05:09 -1000
committerEric Wong <e@80x24.org>2021-02-08 22:07:40 +0000
commit0df6f43c81c72924020b8f71c696784f154d93fa (patch)
tree0480da99726a254a2bf71ee05d7d072c1201c045
parent68250ff1c247bb737e6daef1af31dcd7fa918644 (diff)
downloadpublic-inbox-0df6f43c81c72924020b8f71c696784f154d93fa.tar.gz
For early MUA spawners using lock-free outputs, we we need to
on the startq pipe to silence progress reporting.  For
--augment users, we can start the MUA even earlier by
creating Maildirs in the pre-augment phase.

To improve progress reporting for non-MUA (or late-MUA)
spawners, we'll no longer blindly append "--compressed" to the
curl(1) command when POST-ing for the gzipped mboxrd.
Furthermore, we'll overload stringify ('""') in LeiCurl to
ensure the empty -d '' string shows up properly.

v2: fix startq waiting with --threads
    mset_progress is never shown with early MUA spawning,
    The plan is to still show progress when augmenting and
    deduping.  This fixes all local search cases.
    A leftover debug bit is dropped, too
-rw-r--r--lib/PublicInbox/IPC.pm8
-rw-r--r--lib/PublicInbox/LEI.pm4
-rw-r--r--lib/PublicInbox/LeiCurl.pm11
-rw-r--r--lib/PublicInbox/LeiMirror.pm5
-rw-r--r--lib/PublicInbox/LeiOverview.pm3
-rw-r--r--lib/PublicInbox/LeiToMail.pm24
-rw-r--r--lib/PublicInbox/LeiXSearch.pm88
7 files changed, 86 insertions, 57 deletions
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index c8673e26..9331233a 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -109,7 +109,6 @@ sub ipc_worker_spawn {
                 $w_res->autoflush(1);
                 $SIG{$_} = 'IGNORE' for (qw(TERM INT QUIT));
                 local $0 = $ident;
-                PublicInbox::DS::sig_setmask($sigset);
                 # ensure we properly exit even if warn() dies:
                 my $end = PublicInbox::OnDestroy->new($$, sub { exit(!!$@) });
                 eval {
@@ -117,6 +116,7 @@ sub ipc_worker_spawn {
                         local @$self{keys %$fields} = values(%$fields);
                         my $on_destroy = $self->ipc_atfork_child;
                         local %SIG = %SIG;
+                        PublicInbox::DS::sig_setmask($sigset);
                         ipc_worker_loop($self, $r_req, $w_res);
                 };
                 warn "worker $ident PID:$$ died: $@\n" if $@;
@@ -293,7 +293,6 @@ sub _wq_worker_start ($$$) {
                 $SIG{$_} = 'IGNORE' for (qw(PIPE));
                 $SIG{$_} = 'DEFAULT' for (qw(TTOU TTIN TERM QUIT INT CHLD));
                 local $0 = $self->{-wq_ident};
-                PublicInbox::DS::sig_setmask($oldset);
                 # ensure we properly exit even if warn() dies:
                 my $end = PublicInbox::OnDestroy->new($$, sub { exit(!!$@) });
                 eval {
@@ -301,6 +300,7 @@ sub _wq_worker_start ($$$) {
                         local @$self{keys %$fields} = values(%$fields);
                         my $on_destroy = $self->ipc_atfork_child;
                         local %SIG = %SIG;
+                        PublicInbox::DS::sig_setmask($oldset);
                         wq_worker_loop($self);
                 };
                 warn "worker $self->{-wq_ident} PID:$$ died: $@" if $@;
@@ -395,9 +395,9 @@ sub wq_close {
 }
 
 sub wq_kill_old {
-        my ($self) = @_;
+        my ($self, $sig) = @_;
         my $pids = $self->{"-wq_old_pids.$$"} or return;
-        kill 'TERM', @$pids;
+        kill($sig // 'TERM', @$pids);
 }
 
 sub wq_kill {
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index dce80762..c3645698 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -741,7 +741,9 @@ sub start_mua {
         } elsif ($self->{oneshot}) {
                 $self->{"mua.pid.$self.$$"} = spawn(\@cmd);
         }
-        delete $self->{-progress};
+        if ($self->{lxs} && $self->{au_done}) { # kick wait_startq
+                syswrite($self->{au_done}, 'q' x ($self->{lxs}->{jobs} // 0));
+        }
 }
 
 # caller needs to "-t $self->{1}" to check if tty
diff --git a/lib/PublicInbox/LeiCurl.pm b/lib/PublicInbox/LeiCurl.pm
index 38b17c78..f346a1b4 100644
--- a/lib/PublicInbox/LeiCurl.pm
+++ b/lib/PublicInbox/LeiCurl.pm
@@ -8,6 +8,12 @@ use v5.10.1;
 use PublicInbox::Spawn qw(which);
 use PublicInbox::Config;
 
+# Ensures empty strings are quoted, we don't need more
+# sophisticated quoting than for empty strings: curl -d ''
+use overload '""' => sub {
+        join(' ', map { $_ eq '' ?  "''" : $_ } @{$_[0]});
+};
+
 my %lei2curl = (
         'curl-config=s@' => 'config|K=s@',
 );
@@ -63,10 +69,9 @@ EOM
 
 # completes the result of cmd() for $uri
 sub for_uri {
-        my ($self, $lei, $uri) = @_;
+        my ($self, $lei, $uri, @opt) = @_;
         my $pfx = torsocks($self, $lei, $uri) or return; # error
-        [ @$pfx, @$self, substr($uri->path, -3) eq '.gz' ? () : '--compressed',
-                $uri->as_string ]
+        bless [ @$pfx, @$self, @opt, $uri->as_string ], ref($self);
 }
 
 1;
diff --git a/lib/PublicInbox/LeiMirror.pm b/lib/PublicInbox/LeiMirror.pm
index 5ba69287..c5153148 100644
--- a/lib/PublicInbox/LeiMirror.pm
+++ b/lib/PublicInbox/LeiMirror.pm
@@ -31,7 +31,7 @@ sub try_scrape {
         my $uri = URI->new($self->{src});
         my $lei = $self->{lei};
         my $curl = $self->{curl} //= PublicInbox::LeiCurl->new($lei) or return;
-        my $cmd = $curl->for_uri($lei, $uri);
+        my $cmd = $curl->for_uri($lei, $uri, '--compressed');
         my $opt = { 0 => $lei->{0}, 2 => $lei->{2} };
         my $fh = popen_rd($cmd, $lei->{env}, $opt);
         my $html = do { local $/; <$fh> } // die "read(curl $uri): $!";
@@ -93,8 +93,7 @@ sub _try_config {
         my $path = $uri->path;
         chop($path) eq '/' or die "BUG: $uri not canonicalized";
         $uri->path($path . '/_/text/config/raw');
-        my $cmd = $self->{curl}->for_uri($lei, $uri);
-        push @$cmd, '--compressed'; # curl decompresses for us
+        my $cmd = $self->{curl}->for_uri($lei, $uri, '--compressed');
         my $ce = "$dst/inbox.config.example";
         my $f = "$ce-$$.tmp";
         open(my $fh, '+>', $f) or return $lei->err("open $f: $! (non-fatal)");
diff --git a/lib/PublicInbox/LeiOverview.pm b/lib/PublicInbox/LeiOverview.pm
index dcfb9cc7..f0ac4684 100644
--- a/lib/PublicInbox/LeiOverview.pm
+++ b/lib/PublicInbox/LeiOverview.pm
@@ -95,9 +95,10 @@ sub new {
                 $lei->{dedupe} //= PublicInbox::LeiDedupe->new($lei);
         } else {
                 # default to the cheapest sort since MUA usually resorts
-                $lei->{opt}->{'sort'} //= 'docid' if $dst ne '/dev/stdout';
+                $opt->{'sort'} //= 'docid' if $dst ne '/dev/stdout';
                 $lei->{l2m} = eval { PublicInbox::LeiToMail->new($lei) };
                 return $lei->fail($@) if $@;
+                $lei->{early_mua} = 1 if $opt->{mua} && $lei->{l2m}->lock_free;
         }
         $self;
 }
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index 4c5a5685..a5a196db 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -371,7 +371,17 @@ sub new {
         $self;
 }
 
-sub _pre_augment_maildir {} # noop
+sub _pre_augment_maildir {
+        my ($self, $lei) = @_;
+        my $dst = $lei->{ovv}->{dst};
+        for my $x (qw(tmp new cur)) {
+                my $d = $dst.$x;
+                next if -d $d;
+                require File::Path;
+                File::Path::mkpath($d);
+                -d $d or die "$d is not a directory";
+        }
+}
 
 sub _do_augment_maildir {
         my ($self, $lei) = @_;
@@ -388,17 +398,7 @@ sub _do_augment_maildir {
         }
 }
 
-sub _post_augment_maildir {
-        my ($self, $lei) = @_;
-        my $dst = $lei->{ovv}->{dst};
-        for my $x (qw(tmp new cur)) {
-                my $d = $dst.$x;
-                next if -d $d;
-                require File::Path;
-                File::Path::mkpath($d);
-                -d $d or die "$d is not a directory";
-        }
-}
+sub _post_augment_maildir {} # noop
 
 sub _pre_augment_mbox {
         my ($self, $lei) = @_;
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index 2794140a..db089a67 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -101,20 +101,34 @@ sub _mset_more ($$) {
 # $startq will EOF when query_prepare is done augmenting and allow
 # query_mset and query_thread_mset to proceed.
 sub wait_startq ($) {
-        my ($startq) = @_;
-        $_[0] = undef;
-        read($startq, my $query_prepare_done, 1);
+        my ($lei) = @_;
+        my $startq = delete $lei->{startq} or return;
+        while (1) {
+                my $n = sysread($startq, my $query_prepare_done, 1);
+                if (defined $n) {
+                        return if $n == 0; # no MUA
+                        if ($query_prepare_done eq 'q') {
+                                $lei->{opt}->{quiet} = 1;
+                                delete $lei->{opt}->{verbose};
+                                delete $lei->{-progress};
+                        } else {
+                                $lei->fail("$$ WTF `$query_prepare_done'");
+                        }
+                        return;
+                }
+                return $lei->fail("$$ wait_startq: $!") unless $!{EINTR};
+        }
 }
 
 sub mset_progress {
         my $lei = shift;
-        return unless $lei->{-progress};
+        return if $lei->{early_mua} || !$lei->{-progress};
         if ($lei->{pkt_op_p}) {
                 pkt_do($lei->{pkt_op_p}, 'mset_progress', @_);
         } else { # single lei-daemon consumer
                 my ($desc, $mset_size, $mset_total_est) = @_;
                 $lei->{-mset_total} += $mset_size;
-                $lei->err("# $desc $mset_size/$mset_total_est");
+                $lei->qerr("# $desc $mset_size/$mset_total_est");
         }
 }
 
@@ -122,7 +136,6 @@ sub query_thread_mset { # for --threads
         my ($self, $ibxish) = @_;
         local $0 = "$0 query_thread_mset";
         my $lei = $self->{lei};
-        my $startq = delete $lei->{startq};
         my ($srch, $over) = ($ibxish->search, $ibxish->over);
         my $desc = $ibxish->{inboxdir} // $ibxish->{topdir};
         return warn("$desc not indexed by Xapian\n") unless ($srch && $over);
@@ -140,7 +153,7 @@ sub query_thread_mset { # for --threads
                 while ($over->expand_thread($ctx)) {
                         for my $n (@{$ctx->{xids}}) {
                                 my $smsg = $over->get_art($n) or next;
-                                wait_startq($startq) if $startq;
+                                wait_startq($lei);
                                 my $mitem = delete $n2item{$smsg->{num}};
                                 $each_smsg->($smsg, $mitem);
                         }
@@ -155,7 +168,6 @@ sub query_mset { # non-parallel for non-"--threads" users
         my ($self) = @_;
         local $0 = "$0 query_mset";
         my $lei = $self->{lei};
-        my $startq = delete $lei->{startq};
         my $mo = { %{$lei->{mset_opt}} };
         my $mset;
         for my $loc (locals($self)) {
@@ -168,7 +180,7 @@ sub query_mset { # non-parallel for non-"--threads" users
                                 $mset->size, $mset->get_matches_estimated);
                 for my $mitem ($mset->items) {
                         my $smsg = smsg_for($self, $mitem) or next;
-                        wait_startq($startq) if $startq;
+                        wait_startq($lei);
                         $each_smsg->($smsg, $mitem);
                 }
         } while (_mset_more($mset, $mo));
@@ -183,7 +195,7 @@ sub each_eml { # callback for MboxReader->mboxrd
         $smsg->parse_references($eml, mids($eml));
         $smsg->{$_} //= '' for qw(from to cc ds subject references mid);
         delete @$smsg{qw(From Subject -ds -ts)};
-        if (my $startq = delete($lei->{startq})) { wait_startq($startq) }
+        wait_startq($lei);
         if ($lei->{-progress}) {
                 ++$lei->{-nr_remote_eml};
                 my $now = now();
@@ -210,7 +222,6 @@ sub query_remote_mboxrd {
         my $cerr = File::Temp->new(TEMPLATE => 'curl.err-XXXX', TMPDIR => 1);
         fcntl($cerr, F_SETFL, O_APPEND|O_RDWR) or warn "set O_APPEND: $!";
         my $rdr = { 2 => $cerr, pgid => 0 };
-        my $coff = 0;
         my $sigint_reap = $lei->can('sigint_reap');
         if ($verbose) {
                 # spawn a process to force line-buffering, otherwise curl
@@ -228,13 +239,14 @@ sub query_remote_mboxrd {
                 $lei->{-nr_remote_eml} = 0;
                 $uri->query_form(@qform);
                 my $cmd = $curl->for_uri($lei, $uri);
-                $lei->err("# @$cmd") if $verbose;
+                $lei->qerr("# $cmd");
                 my ($fh, $pid) = popen_rd($cmd, $env, $rdr);
                 $reap_curl = PublicInbox::OnDestroy->new($sigint_reap, $pid);
                 $fh = IO::Uncompress::Gunzip->new($fh);
                 PublicInbox::MboxReader->mboxrd($fh, \&each_eml, $self,
                                                 $lei, $each_smsg);
-                my $err = waitpid($pid, 0) == $pid ? undef : "BUG: waitpid: $!";
+                my $err = waitpid($pid, 0) == $pid ? undef
+                                                : "BUG: waitpid($cmd): $!";
                 @$reap_curl = (); # cancel OnDestroy
                 die $err if $err;
                 if ($? == 0) {
@@ -242,16 +254,18 @@ sub query_remote_mboxrd {
                         mset_progress($lei, $lei->{-current_url}, $nr, $nr);
                         next;
                 }
-                seek($cerr, $coff, SEEK_SET) or warn "seek(curl stderr): $!\n";
-                my $e = do { local $/; <$cerr> } //
-                                die "read(curl stderr): $!\n";
-                $coff += length($e);
-                truncate($cerr, 0);
-                next if (($? >> 8) == 22 && $e =~ /\b404\b/);
-                $lei->child_error($?);
+                $err = '';
+                if (-s $cerr) {
+                        seek($cerr, 0, SEEK_SET) or
+                                        $lei->err("seek($cmd stderr): $!");
+                        $err = do { local $/; <$cerr> } //
+                                        "read($cmd stderr): $!";
+                        truncate($cerr, 0) or
+                                        $lei->err("truncate($cmd stderr): $!");
+                }
+                next if (($? >> 8) == 22 && $err =~ /\b404\b/);
                 $uri->query_form(q => $lei->{mset_opt}->{qstr});
-                # --verbose already showed the error via tail(1)
-                $lei->err("E: $uri \$?=$?\n", $verbose ? () : $e);
+                $lei->child_error($?, "E: <$uri> $err");
         }
         undef $each_smsg;
         $lei->{ovv}->ovv_atexit_child($lei);
@@ -311,15 +325,23 @@ Error closing $lei->{ovv}->{dst}: $!
 
 sub do_post_augment {
         my ($lei) = @_;
-        eval { $lei->{l2m}->post_augment($lei) };
-        if (my $err = $@) {
-                if (my $lxs = delete $lei->{lxs}) {
-                        $lxs->wq_kill;
-                        $lxs->wq_close(0, undef, $lei);
+        my $l2m = $lei->{l2m};
+        my $err;
+        if ($l2m) {
+                eval { $l2m->post_augment($lei) };
+                $err = $@;
+                if ($err) {
+                        if (my $lxs = delete $lei->{lxs}) {
+                                $lxs->wq_kill;
+                                $lxs->wq_close(0, undef, $lei);
+                        }
+                        $lei->fail("$err");
                 }
-                $lei->fail("$err");
         }
-        close(delete $lei->{au_done}); # triggers wait_startq
+        if (!$err && delete $lei->{early_mua}) { # non-augment case
+                $lei->start_mua;
+        }
+        close(delete $lei->{au_done}); # triggers wait_startq in lei_xsearch
 }
 
 my $MAX_PER_HOST = 4;
@@ -334,9 +356,6 @@ sub concurrency {
 
 sub start_query { # always runs in main (lei-daemon) process
         my ($self, $lei) = @_;
-        if (my $l2m = $lei->{l2m}) {
-                $lei->start_mua if $l2m->lock_free;
-        }
         if ($lei->{opt}->{threads}) {
                 for my $ibxish (locals($self)) {
                         $self->wq_io_do('query_thread_mset', [], $ibxish);
@@ -387,6 +406,9 @@ sub do_query {
         my $l2m = $lei->{l2m};
         if ($l2m) {
                 $l2m->pre_augment($lei);
+                if ($lei->{opt}->{augment} && delete $lei->{early_mua}) {
+                        $lei->start_mua;
+                }
                 $l2m->wq_workers_start('lei2mail', $l2m->{jobs},
                                         $lei->oldset, { lei => $lei });
                 pipe($lei->{startq}, $lei->{au_done}) or die "pipe: $!";
@@ -404,7 +426,7 @@ sub do_query {
         delete $lei->{pkt_op_p};
         $l2m->wq_close(1) if $l2m;
         $lei->event_step_init; # wait for shutdowns
-        $self->wq_io_do('query_prepare', []) if $l2m;
+        $self->wq_io_do('query_prepare', []) if $l2m; # for augment/dedupe
         start_query($self, $lei);
         $self->wq_close(1); # lei_xsearch workers stop when done
         if ($lei->{oneshot}) {