* [PATCHv2 01/13] lei q: improve remote mboxrd UX + MUA
2021-02-08 9:05 7% [PATCH 00/13] lei approxidate, startup fix, --alert Eric Wong
@ 2021-02-08 9:05 3% ` Eric Wong
0 siblings, 0 replies; 2+ results
From: Eric Wong @ 2021-02-08 9:05 UTC (permalink / raw)
To: meta
For early MUA spawners using lock-free outputs, we need to
write to the startq pipe to silence progress reporting. For
--augment users, we can start the MUA even earlier by
creating Maildirs in the pre-augment phase.
To improve progress reporting for non-MUA (or late-MUA)
spawners, we'll no longer blindly append "--compressed" to the
curl(1) command when POST-ing for the gzipped mboxrd.
Furthermore, we'll overload stringify ('""') in LeiCurl to
ensure the empty -d '' string shows up properly.
v2: fix startq waiting with --threads
mset_progress is never shown with early MUA spawning.
The plan is to still show progress when augmenting and
deduping. This fixes all local search cases.
A leftover debug bit is dropped, too.
---
lib/PublicInbox/IPC.pm | 8 ++--
lib/PublicInbox/LEI.pm | 4 +-
lib/PublicInbox/LeiCurl.pm | 11 +++--
lib/PublicInbox/LeiMirror.pm | 5 +-
lib/PublicInbox/LeiOverview.pm | 3 +-
lib/PublicInbox/LeiToMail.pm | 24 +++++-----
lib/PublicInbox/LeiXSearch.pm | 88 +++++++++++++++++++++-------------
7 files changed, 86 insertions(+), 57 deletions(-)
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index c8673e26..9331233a 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -109,7 +109,6 @@ sub ipc_worker_spawn {
$w_res->autoflush(1);
$SIG{$_} = 'IGNORE' for (qw(TERM INT QUIT));
local $0 = $ident;
- PublicInbox::DS::sig_setmask($sigset);
# ensure we properly exit even if warn() dies:
my $end = PublicInbox::OnDestroy->new($$, sub { exit(!!$@) });
eval {
@@ -117,6 +116,7 @@ sub ipc_worker_spawn {
local @$self{keys %$fields} = values(%$fields);
my $on_destroy = $self->ipc_atfork_child;
local %SIG = %SIG;
+ PublicInbox::DS::sig_setmask($sigset);
ipc_worker_loop($self, $r_req, $w_res);
};
warn "worker $ident PID:$$ died: $@\n" if $@;
@@ -293,7 +293,6 @@ sub _wq_worker_start ($$$) {
$SIG{$_} = 'IGNORE' for (qw(PIPE));
$SIG{$_} = 'DEFAULT' for (qw(TTOU TTIN TERM QUIT INT CHLD));
local $0 = $self->{-wq_ident};
- PublicInbox::DS::sig_setmask($oldset);
# ensure we properly exit even if warn() dies:
my $end = PublicInbox::OnDestroy->new($$, sub { exit(!!$@) });
eval {
@@ -301,6 +300,7 @@ sub _wq_worker_start ($$$) {
local @$self{keys %$fields} = values(%$fields);
my $on_destroy = $self->ipc_atfork_child;
local %SIG = %SIG;
+ PublicInbox::DS::sig_setmask($oldset);
wq_worker_loop($self);
};
warn "worker $self->{-wq_ident} PID:$$ died: $@" if $@;
@@ -395,9 +395,9 @@ sub wq_close {
}
sub wq_kill_old {
- my ($self) = @_;
+ my ($self, $sig) = @_;
my $pids = $self->{"-wq_old_pids.$$"} or return;
- kill 'TERM', @$pids;
+ kill($sig // 'TERM', @$pids);
}
sub wq_kill {
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index dce80762..c3645698 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -741,7 +741,9 @@ sub start_mua {
} elsif ($self->{oneshot}) {
$self->{"mua.pid.$self.$$"} = spawn(\@cmd);
}
- delete $self->{-progress};
+ if ($self->{lxs} && $self->{au_done}) { # kick wait_startq
+ syswrite($self->{au_done}, 'q' x ($self->{lxs}->{jobs} // 0));
+ }
}
# caller needs to "-t $self->{1}" to check if tty
diff --git a/lib/PublicInbox/LeiCurl.pm b/lib/PublicInbox/LeiCurl.pm
index 38b17c78..f346a1b4 100644
--- a/lib/PublicInbox/LeiCurl.pm
+++ b/lib/PublicInbox/LeiCurl.pm
@@ -8,6 +8,12 @@ use v5.10.1;
use PublicInbox::Spawn qw(which);
use PublicInbox::Config;
+# Ensures empty strings are quoted, we don't need more
+# sophisticated quoting than for empty strings: curl -d ''
+use overload '""' => sub {
+ join(' ', map { $_ eq '' ? "''" : $_ } @{$_[0]});
+};
+
my %lei2curl = (
'curl-config=s@' => 'config|K=s@',
);
@@ -63,10 +69,9 @@ EOM
# completes the result of cmd() for $uri
sub for_uri {
- my ($self, $lei, $uri) = @_;
+ my ($self, $lei, $uri, @opt) = @_;
my $pfx = torsocks($self, $lei, $uri) or return; # error
- [ @$pfx, @$self, substr($uri->path, -3) eq '.gz' ? () : '--compressed',
- $uri->as_string ]
+ bless [ @$pfx, @$self, @opt, $uri->as_string ], ref($self);
}
1;
diff --git a/lib/PublicInbox/LeiMirror.pm b/lib/PublicInbox/LeiMirror.pm
index 5ba69287..c5153148 100644
--- a/lib/PublicInbox/LeiMirror.pm
+++ b/lib/PublicInbox/LeiMirror.pm
@@ -31,7 +31,7 @@ sub try_scrape {
my $uri = URI->new($self->{src});
my $lei = $self->{lei};
my $curl = $self->{curl} //= PublicInbox::LeiCurl->new($lei) or return;
- my $cmd = $curl->for_uri($lei, $uri);
+ my $cmd = $curl->for_uri($lei, $uri, '--compressed');
my $opt = { 0 => $lei->{0}, 2 => $lei->{2} };
my $fh = popen_rd($cmd, $lei->{env}, $opt);
my $html = do { local $/; <$fh> } // die "read(curl $uri): $!";
@@ -93,8 +93,7 @@ sub _try_config {
my $path = $uri->path;
chop($path) eq '/' or die "BUG: $uri not canonicalized";
$uri->path($path . '/_/text/config/raw');
- my $cmd = $self->{curl}->for_uri($lei, $uri);
- push @$cmd, '--compressed'; # curl decompresses for us
+ my $cmd = $self->{curl}->for_uri($lei, $uri, '--compressed');
my $ce = "$dst/inbox.config.example";
my $f = "$ce-$$.tmp";
open(my $fh, '+>', $f) or return $lei->err("open $f: $! (non-fatal)");
diff --git a/lib/PublicInbox/LeiOverview.pm b/lib/PublicInbox/LeiOverview.pm
index dcfb9cc7..f0ac4684 100644
--- a/lib/PublicInbox/LeiOverview.pm
+++ b/lib/PublicInbox/LeiOverview.pm
@@ -95,9 +95,10 @@ sub new {
$lei->{dedupe} //= PublicInbox::LeiDedupe->new($lei);
} else {
# default to the cheapest sort since MUA usually resorts
- $lei->{opt}->{'sort'} //= 'docid' if $dst ne '/dev/stdout';
+ $opt->{'sort'} //= 'docid' if $dst ne '/dev/stdout';
$lei->{l2m} = eval { PublicInbox::LeiToMail->new($lei) };
return $lei->fail($@) if $@;
+ $lei->{early_mua} = 1 if $opt->{mua} && $lei->{l2m}->lock_free;
}
$self;
}
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index 4c5a5685..a5a196db 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -371,7 +371,17 @@ sub new {
$self;
}
-sub _pre_augment_maildir {} # noop
+sub _pre_augment_maildir {
+ my ($self, $lei) = @_;
+ my $dst = $lei->{ovv}->{dst};
+ for my $x (qw(tmp new cur)) {
+ my $d = $dst.$x;
+ next if -d $d;
+ require File::Path;
+ File::Path::mkpath($d);
+ -d $d or die "$d is not a directory";
+ }
+}
sub _do_augment_maildir {
my ($self, $lei) = @_;
@@ -388,17 +398,7 @@ sub _do_augment_maildir {
}
}
-sub _post_augment_maildir {
- my ($self, $lei) = @_;
- my $dst = $lei->{ovv}->{dst};
- for my $x (qw(tmp new cur)) {
- my $d = $dst.$x;
- next if -d $d;
- require File::Path;
- File::Path::mkpath($d);
- -d $d or die "$d is not a directory";
- }
-}
+sub _post_augment_maildir {} # noop
sub _pre_augment_mbox {
my ($self, $lei) = @_;
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index 2794140a..db089a67 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -101,20 +101,34 @@ sub _mset_more ($$) {
# $startq will EOF when query_prepare is done augmenting and allow
# query_mset and query_thread_mset to proceed.
sub wait_startq ($) {
- my ($startq) = @_;
- $_[0] = undef;
- read($startq, my $query_prepare_done, 1);
+ my ($lei) = @_;
+ my $startq = delete $lei->{startq} or return;
+ while (1) {
+ my $n = sysread($startq, my $query_prepare_done, 1);
+ if (defined $n) {
+ return if $n == 0; # no MUA
+ if ($query_prepare_done eq 'q') {
+ $lei->{opt}->{quiet} = 1;
+ delete $lei->{opt}->{verbose};
+ delete $lei->{-progress};
+ } else {
+ $lei->fail("$$ WTF `$query_prepare_done'");
+ }
+ return;
+ }
+ return $lei->fail("$$ wait_startq: $!") unless $!{EINTR};
+ }
}
sub mset_progress {
my $lei = shift;
- return unless $lei->{-progress};
+ return if $lei->{early_mua} || !$lei->{-progress};
if ($lei->{pkt_op_p}) {
pkt_do($lei->{pkt_op_p}, 'mset_progress', @_);
} else { # single lei-daemon consumer
my ($desc, $mset_size, $mset_total_est) = @_;
$lei->{-mset_total} += $mset_size;
- $lei->err("# $desc $mset_size/$mset_total_est");
+ $lei->qerr("# $desc $mset_size/$mset_total_est");
}
}
@@ -122,7 +136,6 @@ sub query_thread_mset { # for --threads
my ($self, $ibxish) = @_;
local $0 = "$0 query_thread_mset";
my $lei = $self->{lei};
- my $startq = delete $lei->{startq};
my ($srch, $over) = ($ibxish->search, $ibxish->over);
my $desc = $ibxish->{inboxdir} // $ibxish->{topdir};
return warn("$desc not indexed by Xapian\n") unless ($srch && $over);
@@ -140,7 +153,7 @@ sub query_thread_mset { # for --threads
while ($over->expand_thread($ctx)) {
for my $n (@{$ctx->{xids}}) {
my $smsg = $over->get_art($n) or next;
- wait_startq($startq) if $startq;
+ wait_startq($lei);
my $mitem = delete $n2item{$smsg->{num}};
$each_smsg->($smsg, $mitem);
}
@@ -155,7 +168,6 @@ sub query_mset { # non-parallel for non-"--threads" users
my ($self) = @_;
local $0 = "$0 query_mset";
my $lei = $self->{lei};
- my $startq = delete $lei->{startq};
my $mo = { %{$lei->{mset_opt}} };
my $mset;
for my $loc (locals($self)) {
@@ -168,7 +180,7 @@ sub query_mset { # non-parallel for non-"--threads" users
$mset->size, $mset->get_matches_estimated);
for my $mitem ($mset->items) {
my $smsg = smsg_for($self, $mitem) or next;
- wait_startq($startq) if $startq;
+ wait_startq($lei);
$each_smsg->($smsg, $mitem);
}
} while (_mset_more($mset, $mo));
@@ -183,7 +195,7 @@ sub each_eml { # callback for MboxReader->mboxrd
$smsg->parse_references($eml, mids($eml));
$smsg->{$_} //= '' for qw(from to cc ds subject references mid);
delete @$smsg{qw(From Subject -ds -ts)};
- if (my $startq = delete($lei->{startq})) { wait_startq($startq) }
+ wait_startq($lei);
if ($lei->{-progress}) {
++$lei->{-nr_remote_eml};
my $now = now();
@@ -210,7 +222,6 @@ sub query_remote_mboxrd {
my $cerr = File::Temp->new(TEMPLATE => 'curl.err-XXXX', TMPDIR => 1);
fcntl($cerr, F_SETFL, O_APPEND|O_RDWR) or warn "set O_APPEND: $!";
my $rdr = { 2 => $cerr, pgid => 0 };
- my $coff = 0;
my $sigint_reap = $lei->can('sigint_reap');
if ($verbose) {
# spawn a process to force line-buffering, otherwise curl
@@ -228,13 +239,14 @@ sub query_remote_mboxrd {
$lei->{-nr_remote_eml} = 0;
$uri->query_form(@qform);
my $cmd = $curl->for_uri($lei, $uri);
- $lei->err("# @$cmd") if $verbose;
+ $lei->qerr("# $cmd");
my ($fh, $pid) = popen_rd($cmd, $env, $rdr);
$reap_curl = PublicInbox::OnDestroy->new($sigint_reap, $pid);
$fh = IO::Uncompress::Gunzip->new($fh);
PublicInbox::MboxReader->mboxrd($fh, \&each_eml, $self,
$lei, $each_smsg);
- my $err = waitpid($pid, 0) == $pid ? undef : "BUG: waitpid: $!";
+ my $err = waitpid($pid, 0) == $pid ? undef
+ : "BUG: waitpid($cmd): $!";
@$reap_curl = (); # cancel OnDestroy
die $err if $err;
if ($? == 0) {
@@ -242,16 +254,18 @@ sub query_remote_mboxrd {
mset_progress($lei, $lei->{-current_url}, $nr, $nr);
next;
}
- seek($cerr, $coff, SEEK_SET) or warn "seek(curl stderr): $!\n";
- my $e = do { local $/; <$cerr> } //
- die "read(curl stderr): $!\n";
- $coff += length($e);
- truncate($cerr, 0);
- next if (($? >> 8) == 22 && $e =~ /\b404\b/);
- $lei->child_error($?);
+ $err = '';
+ if (-s $cerr) {
+ seek($cerr, 0, SEEK_SET) or
+ $lei->err("seek($cmd stderr): $!");
+ $err = do { local $/; <$cerr> } //
+ "read($cmd stderr): $!";
+ truncate($cerr, 0) or
+ $lei->err("truncate($cmd stderr): $!");
+ }
+ next if (($? >> 8) == 22 && $err =~ /\b404\b/);
$uri->query_form(q => $lei->{mset_opt}->{qstr});
- # --verbose already showed the error via tail(1)
- $lei->err("E: $uri \$?=$?\n", $verbose ? () : $e);
+ $lei->child_error($?, "E: <$uri> $err");
}
undef $each_smsg;
$lei->{ovv}->ovv_atexit_child($lei);
@@ -311,15 +325,23 @@ Error closing $lei->{ovv}->{dst}: $!
sub do_post_augment {
my ($lei) = @_;
- eval { $lei->{l2m}->post_augment($lei) };
- if (my $err = $@) {
- if (my $lxs = delete $lei->{lxs}) {
- $lxs->wq_kill;
- $lxs->wq_close(0, undef, $lei);
+ my $l2m = $lei->{l2m};
+ my $err;
+ if ($l2m) {
+ eval { $l2m->post_augment($lei) };
+ $err = $@;
+ if ($err) {
+ if (my $lxs = delete $lei->{lxs}) {
+ $lxs->wq_kill;
+ $lxs->wq_close(0, undef, $lei);
+ }
+ $lei->fail("$err");
}
- $lei->fail("$err");
}
- close(delete $lei->{au_done}); # triggers wait_startq
+ if (!$err && delete $lei->{early_mua}) { # non-augment case
+ $lei->start_mua;
+ }
+ close(delete $lei->{au_done}); # triggers wait_startq in lei_xsearch
}
my $MAX_PER_HOST = 4;
@@ -334,9 +356,6 @@ sub concurrency {
sub start_query { # always runs in main (lei-daemon) process
my ($self, $lei) = @_;
- if (my $l2m = $lei->{l2m}) {
- $lei->start_mua if $l2m->lock_free;
- }
if ($lei->{opt}->{threads}) {
for my $ibxish (locals($self)) {
$self->wq_io_do('query_thread_mset', [], $ibxish);
@@ -387,6 +406,9 @@ sub do_query {
my $l2m = $lei->{l2m};
if ($l2m) {
$l2m->pre_augment($lei);
+ if ($lei->{opt}->{augment} && delete $lei->{early_mua}) {
+ $lei->start_mua;
+ }
$l2m->wq_workers_start('lei2mail', $l2m->{jobs},
$lei->oldset, { lei => $lei });
pipe($lei->{startq}, $lei->{au_done}) or die "pipe: $!";
@@ -404,7 +426,7 @@ sub do_query {
delete $lei->{pkt_op_p};
$l2m->wq_close(1) if $l2m;
$lei->event_step_init; # wait for shutdowns
- $self->wq_io_do('query_prepare', []) if $l2m;
+ $self->wq_io_do('query_prepare', []) if $l2m; # for augment/dedupe
start_query($self, $lei);
$self->wq_close(1); # lei_xsearch workers stop when done
if ($lei->{oneshot}) {
^ permalink raw reply related [relevance 3%]
* [PATCH 00/13] lei approxidate, startup fix, --alert
@ 2021-02-08 9:05 7% Eric Wong
2021-02-08 9:05 3% ` [PATCHv2 01/13] lei q: improve remote mboxrd UX + MUA Eric Wong
0 siblings, 1 reply; 2+ results
From: Eric Wong @ 2021-02-08 9:05 UTC (permalink / raw)
To: meta
I've redone and squashed some changes into PATCH 1/13 which
was posted yesterday.
3/13 (SIGWINCH) is a rebase made necessary by 1/13,
4/13 (--alert=CMD) is a generalized take on 3/13.
12/13 is...
Eric Wong (13):
lei q: improve remote mboxrd UX + MUA
lei_xsearch: quiet Eml warnings from remote mboxrds
lei q: SIGWINCH process group with the terminal
lei q: support --alert=CMD for early MUA users
tests: favor IPv6
ds: improve add_timer usability
lei: start_pager: drop COLUMNS default
lei: avoid racing on unlink + bind + listen
lei: drop BSD::Resource usage
git: implement date_parse method
lei q: use git approxidate with d:, dt: and rt: ranges
search: use one git-rev-parse process for all dates
spawnpp: raise exception on E2BIG errors
lib/PublicInbox/DS.pm | 10 ++--
lib/PublicInbox/ExtSearchIdx.pm | 5 +-
lib/PublicInbox/FakeInotify.pm | 4 +-
lib/PublicInbox/Git.pm | 10 +++-
lib/PublicInbox/IPC.pm | 8 +--
lib/PublicInbox/LEI.pm | 100 ++++++++++++++++++++++----------
lib/PublicInbox/LeiCurl.pm | 11 +++-
lib/PublicInbox/LeiMirror.pm | 5 +-
lib/PublicInbox/LeiOverview.pm | 6 +-
lib/PublicInbox/LeiQuery.pm | 12 ++--
lib/PublicInbox/LeiToMail.pm | 24 ++++----
lib/PublicInbox/LeiXSearch.pm | 97 ++++++++++++++++++++-----------
lib/PublicInbox/Search.pm | 86 +++++++++++++++++++++++++++
lib/PublicInbox/SpawnPP.pm | 23 ++++++--
lib/PublicInbox/TestCommon.pm | 30 ++++++++--
lib/PublicInbox/Watch.pm | 19 +++---
script/lei | 16 ++---
t/extsearch.t | 2 +-
t/git.t | 17 +++++-
t/httpd-corner.psgi | 2 +-
t/httpd-corner.t | 12 ++--
t/httpd-https.t | 2 +-
t/httpd-unix.t | 7 +--
t/httpd.t | 8 +--
t/imapd-tls.t | 4 +-
t/imapd.t | 8 +--
t/lei-mirror.t | 2 +-
t/nntpd-tls.t | 4 +-
t/nntpd.t | 11 ++--
t/psgi_attach.t | 2 +-
t/psgi_v2.t | 2 +-
t/search.t | 51 ++++++++++++++++
t/solver_git.t | 2 +-
t/v2mirror.t | 3 +-
t/v2writable.t | 3 +-
t/www_altid.t | 2 +-
t/www_listing.t | 3 +-
xt/git-http-backend.t | 4 +-
xt/httpd-async-stream.t | 2 +-
xt/imapd-mbsync-oimap.t | 4 +-
xt/imapd-validate.t | 4 +-
xt/mem-imapd-tls.t | 2 +-
xt/nntpd-validate.t | 3 +-
xt/perf-nntpd.t | 16 ++---
xt/solver.t | 3 +-
45 files changed, 441 insertions(+), 210 deletions(-)
^ permalink raw reply [relevance 7%]
Results 1-2 of 2 | reverse | options above
-- pct% links below jump to the message on this page, permalinks otherwise --
2021-02-08 9:05 7% [PATCH 00/13] lei approxidate, startup fix, --alert Eric Wong
2021-02-08 9:05 3% ` [PATCHv2 01/13] lei q: improve remote mboxrd UX + MUA Eric Wong
Code repositories for project(s) associated with this public inbox
https://80x24.org/public-inbox.git
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).