From: Eric Wong <e@80x24.org>
To: meta@public-inbox.org
Subject: [PATCH 15/19] lei q: improve remote mboxrd UX
Date: Sun, 7 Feb 2021 08:51:57 +0000 [thread overview]
Message-ID: <20210207085201.13871-16-e@80x24.org> (raw)
In-Reply-To: <20210207085201.13871-1-e@80x24.org>
For early MUA spawners using lock-free outputs, we need to
write to the startq pipe to silence progress reporting. For
--augment users, we can start the MUA even earlier by
creating Maildirs in the pre-augment phase.
To improve progress reporting for non-MUA (or late-MUA)
spawners, we'll no longer blindly append "--compressed" to the
curl(1) command when POST-ing for the gzipped mboxrd.
Furthermore, we'll overload stringify ('""') in LeiCurl to
ensure the empty -d '' string shows up properly.
---
lib/PublicInbox/IPC.pm | 8 ++--
lib/PublicInbox/LEI.pm | 4 +-
lib/PublicInbox/LeiCurl.pm | 11 +++--
lib/PublicInbox/LeiMirror.pm | 5 +-
lib/PublicInbox/LeiOverview.pm | 3 +-
lib/PublicInbox/LeiToMail.pm | 24 +++++-----
lib/PublicInbox/LeiXSearch.pm | 87 ++++++++++++++++++++++------------
7 files changed, 88 insertions(+), 54 deletions(-)
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index c8673e26..9331233a 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -109,7 +109,6 @@ sub ipc_worker_spawn {
$w_res->autoflush(1);
$SIG{$_} = 'IGNORE' for (qw(TERM INT QUIT));
local $0 = $ident;
- PublicInbox::DS::sig_setmask($sigset);
# ensure we properly exit even if warn() dies:
my $end = PublicInbox::OnDestroy->new($$, sub { exit(!!$@) });
eval {
@@ -117,6 +116,7 @@ sub ipc_worker_spawn {
local @$self{keys %$fields} = values(%$fields);
my $on_destroy = $self->ipc_atfork_child;
local %SIG = %SIG;
+ PublicInbox::DS::sig_setmask($sigset);
ipc_worker_loop($self, $r_req, $w_res);
};
warn "worker $ident PID:$$ died: $@\n" if $@;
@@ -293,7 +293,6 @@ sub _wq_worker_start ($$$) {
$SIG{$_} = 'IGNORE' for (qw(PIPE));
$SIG{$_} = 'DEFAULT' for (qw(TTOU TTIN TERM QUIT INT CHLD));
local $0 = $self->{-wq_ident};
- PublicInbox::DS::sig_setmask($oldset);
# ensure we properly exit even if warn() dies:
my $end = PublicInbox::OnDestroy->new($$, sub { exit(!!$@) });
eval {
@@ -301,6 +300,7 @@ sub _wq_worker_start ($$$) {
local @$self{keys %$fields} = values(%$fields);
my $on_destroy = $self->ipc_atfork_child;
local %SIG = %SIG;
+ PublicInbox::DS::sig_setmask($oldset);
wq_worker_loop($self);
};
warn "worker $self->{-wq_ident} PID:$$ died: $@" if $@;
@@ -395,9 +395,9 @@ sub wq_close {
}
sub wq_kill_old {
- my ($self) = @_;
+ my ($self, $sig) = @_;
my $pids = $self->{"-wq_old_pids.$$"} or return;
- kill 'TERM', @$pids;
+ kill($sig // 'TERM', @$pids);
}
sub wq_kill {
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 31e6b4a8..e52154e5 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -741,7 +741,9 @@ sub start_mua {
} elsif ($self->{oneshot}) {
$self->{"mua.pid.$self.$$"} = spawn(\@cmd);
}
- delete $self->{-progress};
+ if ($self->{lxs} && $self->{au_done}) { # kick wait_startq
+ syswrite($self->{au_done}, 'q' x ($self->{lxs}->{jobs} // 0));
+ }
}
# caller needs to "-t $self->{1}" to check if tty
diff --git a/lib/PublicInbox/LeiCurl.pm b/lib/PublicInbox/LeiCurl.pm
index 38b17c78..f346a1b4 100644
--- a/lib/PublicInbox/LeiCurl.pm
+++ b/lib/PublicInbox/LeiCurl.pm
@@ -8,6 +8,12 @@ use v5.10.1;
use PublicInbox::Spawn qw(which);
use PublicInbox::Config;
+# Ensures empty strings are quoted, we don't need more
+# sophisticated quoting than for empty strings: curl -d ''
+use overload '""' => sub {
+ join(' ', map { $_ eq '' ? "''" : $_ } @{$_[0]});
+};
+
my %lei2curl = (
'curl-config=s@' => 'config|K=s@',
);
@@ -63,10 +69,9 @@ EOM
# completes the result of cmd() for $uri
sub for_uri {
- my ($self, $lei, $uri) = @_;
+ my ($self, $lei, $uri, @opt) = @_;
my $pfx = torsocks($self, $lei, $uri) or return; # error
- [ @$pfx, @$self, substr($uri->path, -3) eq '.gz' ? () : '--compressed',
- $uri->as_string ]
+ bless [ @$pfx, @$self, @opt, $uri->as_string ], ref($self);
}
1;
diff --git a/lib/PublicInbox/LeiMirror.pm b/lib/PublicInbox/LeiMirror.pm
index 5ba69287..c5153148 100644
--- a/lib/PublicInbox/LeiMirror.pm
+++ b/lib/PublicInbox/LeiMirror.pm
@@ -31,7 +31,7 @@ sub try_scrape {
my $uri = URI->new($self->{src});
my $lei = $self->{lei};
my $curl = $self->{curl} //= PublicInbox::LeiCurl->new($lei) or return;
- my $cmd = $curl->for_uri($lei, $uri);
+ my $cmd = $curl->for_uri($lei, $uri, '--compressed');
my $opt = { 0 => $lei->{0}, 2 => $lei->{2} };
my $fh = popen_rd($cmd, $lei->{env}, $opt);
my $html = do { local $/; <$fh> } // die "read(curl $uri): $!";
@@ -93,8 +93,7 @@ sub _try_config {
my $path = $uri->path;
chop($path) eq '/' or die "BUG: $uri not canonicalized";
$uri->path($path . '/_/text/config/raw');
- my $cmd = $self->{curl}->for_uri($lei, $uri);
- push @$cmd, '--compressed'; # curl decompresses for us
+ my $cmd = $self->{curl}->for_uri($lei, $uri, '--compressed');
my $ce = "$dst/inbox.config.example";
my $f = "$ce-$$.tmp";
open(my $fh, '+>', $f) or return $lei->err("open $f: $! (non-fatal)");
diff --git a/lib/PublicInbox/LeiOverview.pm b/lib/PublicInbox/LeiOverview.pm
index dcfb9cc7..f0ac4684 100644
--- a/lib/PublicInbox/LeiOverview.pm
+++ b/lib/PublicInbox/LeiOverview.pm
@@ -95,9 +95,10 @@ sub new {
$lei->{dedupe} //= PublicInbox::LeiDedupe->new($lei);
} else {
# default to the cheapest sort since MUA usually resorts
- $lei->{opt}->{'sort'} //= 'docid' if $dst ne '/dev/stdout';
+ $opt->{'sort'} //= 'docid' if $dst ne '/dev/stdout';
$lei->{l2m} = eval { PublicInbox::LeiToMail->new($lei) };
return $lei->fail($@) if $@;
+ $lei->{early_mua} = 1 if $opt->{mua} && $lei->{l2m}->lock_free;
}
$self;
}
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index 3f65e9e9..857aeb63 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -370,7 +370,17 @@ sub new {
$self;
}
-sub _pre_augment_maildir {} # noop
+sub _pre_augment_maildir {
+ my ($self, $lei) = @_;
+ my $dst = $lei->{ovv}->{dst};
+ for my $x (qw(tmp new cur)) {
+ my $d = $dst.$x;
+ next if -d $d;
+ require File::Path;
+ File::Path::mkpath($d);
+ -d $d or die "$d is not a directory";
+ }
+}
sub _do_augment_maildir {
my ($self, $lei) = @_;
@@ -387,17 +397,7 @@ sub _do_augment_maildir {
}
}
-sub _post_augment_maildir {
- my ($self, $lei) = @_;
- my $dst = $lei->{ovv}->{dst};
- for my $x (qw(tmp new cur)) {
- my $d = $dst.$x;
- next if -d $d;
- require File::Path;
- File::Path::mkpath($d);
- -d $d or die "$d is not a directory";
- }
-}
+sub _post_augment_maildir {} # noop
sub _pre_augment_mbox {
my ($self, $lei) = @_;
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index 2794140a..0e99e4b4 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -101,9 +101,23 @@ sub _mset_more ($$) {
# $startq will EOF when query_prepare is done augmenting and allow
# query_mset and query_thread_mset to proceed.
sub wait_startq ($) {
- my ($startq) = @_;
- $_[0] = undef;
- read($startq, my $query_prepare_done, 1);
+ my ($lei) = @_;
+ my $startq = delete $lei->{startq} or return;
+ while (1) {
+ my $n = sysread($startq, my $query_prepare_done, 1);
+ if (defined $n) {
+ return if $n == 0; # no MUA
+ if ($query_prepare_done eq 'q') {
+ $lei->{opt}->{quiet} = 1;
+ delete $lei->{opt}->{verbose};
+ delete $lei->{-progress};
+ } else {
+ $lei->fail("$$ WTF `$query_prepare_done'");
+ }
+ return;
+ }
+ return $lei->fail("$$ wait_startq: $!") unless $!{EINTR};
+ }
}
sub mset_progress {
@@ -140,7 +154,7 @@ sub query_thread_mset { # for --threads
while ($over->expand_thread($ctx)) {
for my $n (@{$ctx->{xids}}) {
my $smsg = $over->get_art($n) or next;
- wait_startq($startq) if $startq;
+ wait_startq($lei);
my $mitem = delete $n2item{$smsg->{num}};
$each_smsg->($smsg, $mitem);
}
@@ -155,7 +169,6 @@ sub query_mset { # non-parallel for non-"--threads" users
my ($self) = @_;
local $0 = "$0 query_mset";
my $lei = $self->{lei};
- my $startq = delete $lei->{startq};
my $mo = { %{$lei->{mset_opt}} };
my $mset;
for my $loc (locals($self)) {
@@ -168,7 +181,7 @@ sub query_mset { # non-parallel for non-"--threads" users
$mset->size, $mset->get_matches_estimated);
for my $mitem ($mset->items) {
my $smsg = smsg_for($self, $mitem) or next;
- wait_startq($startq) if $startq;
+ wait_startq($lei);
$each_smsg->($smsg, $mitem);
}
} while (_mset_more($mset, $mo));
@@ -183,7 +196,7 @@ sub each_eml { # callback for MboxReader->mboxrd
$smsg->parse_references($eml, mids($eml));
$smsg->{$_} //= '' for qw(from to cc ds subject references mid);
delete @$smsg{qw(From Subject -ds -ts)};
- if (my $startq = delete($lei->{startq})) { wait_startq($startq) }
+ wait_startq($lei);
if ($lei->{-progress}) {
++$lei->{-nr_remote_eml};
my $now = now();
@@ -200,6 +213,10 @@ sub each_eml { # callback for MboxReader->mboxrd
sub query_remote_mboxrd {
my ($self, $uris) = @_;
local $0 = "$0 query_remote_mboxrd";
+open my $dbg, '>>', '/tmp/dbg'; $dbg->autoflush(1); use Data::Dumper;
+ local $SIG{__WARN__} = sub {
+ print $dbg "$$ @_";
+ };
local $SIG{TERM} = sub { exit(0) }; # for DESTROY (File::Temp, $reap)
my $lei = $self->{lei};
my ($opt, $env) = @$lei{qw(opt env)};
@@ -210,7 +227,6 @@ sub query_remote_mboxrd {
my $cerr = File::Temp->new(TEMPLATE => 'curl.err-XXXX', TMPDIR => 1);
fcntl($cerr, F_SETFL, O_APPEND|O_RDWR) or warn "set O_APPEND: $!";
my $rdr = { 2 => $cerr, pgid => 0 };
- my $coff = 0;
my $sigint_reap = $lei->can('sigint_reap');
if ($verbose) {
# spawn a process to force line-buffering, otherwise curl
@@ -228,13 +244,14 @@ sub query_remote_mboxrd {
$lei->{-nr_remote_eml} = 0;
$uri->query_form(@qform);
my $cmd = $curl->for_uri($lei, $uri);
- $lei->err("# @$cmd") if $verbose;
+ $lei->qerr("# $cmd");
my ($fh, $pid) = popen_rd($cmd, $env, $rdr);
$reap_curl = PublicInbox::OnDestroy->new($sigint_reap, $pid);
$fh = IO::Uncompress::Gunzip->new($fh);
PublicInbox::MboxReader->mboxrd($fh, \&each_eml, $self,
$lei, $each_smsg);
- my $err = waitpid($pid, 0) == $pid ? undef : "BUG: waitpid: $!";
+ my $err = waitpid($pid, 0) == $pid ? undef
+ : "BUG: waitpid($cmd): $!";
@$reap_curl = (); # cancel OnDestroy
die $err if $err;
if ($? == 0) {
@@ -242,16 +259,18 @@ sub query_remote_mboxrd {
mset_progress($lei, $lei->{-current_url}, $nr, $nr);
next;
}
- seek($cerr, $coff, SEEK_SET) or warn "seek(curl stderr): $!\n";
- my $e = do { local $/; <$cerr> } //
- die "read(curl stderr): $!\n";
- $coff += length($e);
- truncate($cerr, 0);
- next if (($? >> 8) == 22 && $e =~ /\b404\b/);
- $lei->child_error($?);
+ $err = '';
+ if (-s $cerr) {
+ seek($cerr, 0, SEEK_SET) or
+ $lei->err("seek($cmd stderr): $!");
+ $err = do { local $/; <$cerr> } //
+ "read($cmd stderr): $!";
+ truncate($cerr, 0) or
+ $lei->err("truncate($cmd stderr): $!");
+ }
+ next if (($? >> 8) == 22 && $err =~ /\b404\b/);
$uri->query_form(q => $lei->{mset_opt}->{qstr});
- # --verbose already showed the error via tail(1)
- $lei->err("E: $uri \$?=$?\n", $verbose ? () : $e);
+ $lei->child_error($?, "E: <$uri> $err");
}
undef $each_smsg;
$lei->{ovv}->ovv_atexit_child($lei);
@@ -311,15 +330,23 @@ Error closing $lei->{ovv}->{dst}: $!
sub do_post_augment {
my ($lei) = @_;
- eval { $lei->{l2m}->post_augment($lei) };
- if (my $err = $@) {
- if (my $lxs = delete $lei->{lxs}) {
- $lxs->wq_kill;
- $lxs->wq_close(0, undef, $lei);
+ my $l2m = $lei->{l2m};
+ my $err;
+ if ($l2m) {
+ eval { $l2m->post_augment($lei) };
+ $err = $@;
+ if ($err) {
+ if (my $lxs = delete $lei->{lxs}) {
+ $lxs->wq_kill;
+ $lxs->wq_close(0, undef, $lei);
+ }
+ $lei->fail("$err");
}
- $lei->fail("$err");
}
- close(delete $lei->{au_done}); # triggers wait_startq
+ if (!$err && delete $lei->{early_mua}) { # non-augment case
+ $lei->start_mua;
+ }
+ close(delete $lei->{au_done}); # triggers wait_startq in lei_xsearch
}
my $MAX_PER_HOST = 4;
@@ -334,9 +361,6 @@ sub concurrency {
sub start_query { # always runs in main (lei-daemon) process
my ($self, $lei) = @_;
- if (my $l2m = $lei->{l2m}) {
- $lei->start_mua if $l2m->lock_free;
- }
if ($lei->{opt}->{threads}) {
for my $ibxish (locals($self)) {
$self->wq_io_do('query_thread_mset', [], $ibxish);
@@ -387,6 +411,9 @@ sub do_query {
my $l2m = $lei->{l2m};
if ($l2m) {
$l2m->pre_augment($lei);
+ if ($lei->{opt}->{augment} && delete $lei->{early_mua}) {
+ $lei->start_mua;
+ }
$l2m->wq_workers_start('lei2mail', $l2m->{jobs},
$lei->oldset, { lei => $lei });
pipe($lei->{startq}, $lei->{au_done}) or die "pipe: $!";
@@ -404,7 +431,7 @@ sub do_query {
delete $lei->{pkt_op_p};
$l2m->wq_close(1) if $l2m;
$lei->event_step_init; # wait for shutdowns
- $self->wq_io_do('query_prepare', []) if $l2m;
+ $self->wq_io_do('query_prepare', []) if $l2m; # for augment/dedupe
start_query($self, $lei);
$self->wq_close(1); # lei_xsearch workers stop when done
if ($lei->{oneshot}) {
next prev parent reply other threads:[~2021-02-07 8:52 UTC|newest]
Thread overview: 23+ messages / expand[flat|nested] mbox.gz Atom feed top
2021-02-07 8:51 [PATCH 00/19] lei import Maildir, remote mboxrd fixes Eric Wong
2021-02-07 8:51 ` [PATCH 01/19] spawn: pi_fork_exec: restore parent sigmask in child Eric Wong
2021-02-07 8:51 ` [PATCH 02/19] spawn: pi_fork_exec: support "pgid" Eric Wong
2021-02-07 23:10 ` dprintf(3) portability? [was [02/19] spawn: pi_fork_exec: support "pgid"] Eric Wong
2021-02-07 8:51 ` [PATCH 03/19] lei add-external: handle interrupts with --mirror Eric Wong
2021-02-07 8:51 ` [PATCH 04/19] spawn_pp: die more consistently in child Eric Wong
2021-02-07 8:51 ` [PATCH 05/19] ipc: do not die inside wq_worker child process Eric Wong
2021-02-07 8:51 ` [PATCH 06/19] ipc: trim down the Storable checks Eric Wong
2021-02-07 8:51 ` [PATCH 07/19] Makefile.PL: depend on IO::Uncompress::Gunzip Eric Wong
2021-02-07 8:51 ` [PATCH 08/19] xapcmd: avoid potential die surprise in children Eric Wong
2021-02-07 8:51 ` [PATCH 09/19] tests: guard setup_public_inboxes for SQLite and Xapian Eric Wong
2021-02-07 8:51 ` [PATCH 10/19] Revert "ipc: add support for asynchronous callbacks" Eric Wong
2021-02-07 8:51 ` [PATCH 11/19] ipc: wq_do => wq_io_do Eric Wong
2021-02-07 8:51 ` [PATCH 12/19] lei: more consistent IPC exit and error handling Eric Wong
2021-02-07 8:51 ` [PATCH 13/19] lei: remove --mua-cmd alias for --mua Eric Wong
2021-02-07 8:51 ` [PATCH 14/19] lei: replace --thread with --threads Eric Wong
2021-02-07 8:51 ` Eric Wong [this message]
2021-02-07 9:32 ` [PATCH 20/19] lei_xsearch: allow quieting regular mset progress, too Eric Wong
2021-02-07 8:51 ` [PATCH 16/19] lei q: SIGWINCH process group with the terminal Eric Wong
2021-02-07 8:51 ` [PATCH 17/19] lei import: support Maildirs Eric Wong
2021-02-07 8:52 ` [PATCH 18/19] imap: avoid unnecessary on-stack delete Eric Wong
2021-02-07 8:52 ` [PATCH 19/19] httpd/async: " Eric Wong
2021-02-07 10:40 ` [PATCH 21/19] lei q: fix arbitrary --mua command handling Eric Wong
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
List information: https://public-inbox.org/README
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20210207085201.13871-16-e@80x24.org \
--to=e@80x24.org \
--cc=meta@public-inbox.org \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
Code repositories for project(s) associated with this public inbox
https://80x24.org/public-inbox.git
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).