user/dev discussion of public-inbox itself
 help / color / mirror / code / Atom feed
Search results ordered by [date|relevance]  view[summary|nested|Atom feed]
thread overview below | download mbox.gz: |
* [PATCHv2 01/13] lei q: improve remote mboxrd UX + MUA
  2021-02-08  9:05  7% [PATCH 00/13] lei approxidate, startup fix, --alert Eric Wong
@ 2021-02-08  9:05  3% ` Eric Wong
  0 siblings, 0 replies; 2+ results
From: Eric Wong @ 2021-02-08  9:05 UTC (permalink / raw)
  To: meta

For early MUA spawners using lock-free outputs, we need to write
to the startq pipe to silence progress reporting.  For
--augment users, we can start the MUA even earlier by
creating Maildirs in the pre-augment phase.

To improve progress reporting for non-MUA (or late-MUA)
spawners, we'll no longer blindly append "--compressed" to the
curl(1) command when POST-ing for the gzipped mboxrd.
Furthermore, we'll overload stringify ('""') in LeiCurl to
ensure the empty -d '' string shows up properly.

v2: fix startq waiting with --threads
    mset_progress is never shown with early MUA spawning.
    The plan is to still show progress when augmenting and
    deduping.  This fixes all local search cases.
    A leftover debug bit is dropped, too.
---
 lib/PublicInbox/IPC.pm         |  8 ++--
 lib/PublicInbox/LEI.pm         |  4 +-
 lib/PublicInbox/LeiCurl.pm     | 11 +++--
 lib/PublicInbox/LeiMirror.pm   |  5 +-
 lib/PublicInbox/LeiOverview.pm |  3 +-
 lib/PublicInbox/LeiToMail.pm   | 24 +++++-----
 lib/PublicInbox/LeiXSearch.pm  | 88 +++++++++++++++++++++-------------
 7 files changed, 86 insertions(+), 57 deletions(-)

diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index c8673e26..9331233a 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -109,7 +109,6 @@ sub ipc_worker_spawn {
 		$w_res->autoflush(1);
 		$SIG{$_} = 'IGNORE' for (qw(TERM INT QUIT));
 		local $0 = $ident;
-		PublicInbox::DS::sig_setmask($sigset);
 		# ensure we properly exit even if warn() dies:
 		my $end = PublicInbox::OnDestroy->new($$, sub { exit(!!$@) });
 		eval {
@@ -117,6 +116,7 @@ sub ipc_worker_spawn {
 			local @$self{keys %$fields} = values(%$fields);
 			my $on_destroy = $self->ipc_atfork_child;
 			local %SIG = %SIG;
+			PublicInbox::DS::sig_setmask($sigset);
 			ipc_worker_loop($self, $r_req, $w_res);
 		};
 		warn "worker $ident PID:$$ died: $@\n" if $@;
@@ -293,7 +293,6 @@ sub _wq_worker_start ($$$) {
 		$SIG{$_} = 'IGNORE' for (qw(PIPE));
 		$SIG{$_} = 'DEFAULT' for (qw(TTOU TTIN TERM QUIT INT CHLD));
 		local $0 = $self->{-wq_ident};
-		PublicInbox::DS::sig_setmask($oldset);
 		# ensure we properly exit even if warn() dies:
 		my $end = PublicInbox::OnDestroy->new($$, sub { exit(!!$@) });
 		eval {
@@ -301,6 +300,7 @@ sub _wq_worker_start ($$$) {
 			local @$self{keys %$fields} = values(%$fields);
 			my $on_destroy = $self->ipc_atfork_child;
 			local %SIG = %SIG;
+			PublicInbox::DS::sig_setmask($oldset);
 			wq_worker_loop($self);
 		};
 		warn "worker $self->{-wq_ident} PID:$$ died: $@" if $@;
@@ -395,9 +395,9 @@ sub wq_close {
 }
 
 sub wq_kill_old {
-	my ($self) = @_;
+	my ($self, $sig) = @_;
 	my $pids = $self->{"-wq_old_pids.$$"} or return;
-	kill 'TERM', @$pids;
+	kill($sig // 'TERM', @$pids);
 }
 
 sub wq_kill {
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index dce80762..c3645698 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -741,7 +741,9 @@ sub start_mua {
 	} elsif ($self->{oneshot}) {
 		$self->{"mua.pid.$self.$$"} = spawn(\@cmd);
 	}
-	delete $self->{-progress};
+	if ($self->{lxs} && $self->{au_done}) { # kick wait_startq
+		syswrite($self->{au_done}, 'q' x ($self->{lxs}->{jobs} // 0));
+	}
 }
 
 # caller needs to "-t $self->{1}" to check if tty
diff --git a/lib/PublicInbox/LeiCurl.pm b/lib/PublicInbox/LeiCurl.pm
index 38b17c78..f346a1b4 100644
--- a/lib/PublicInbox/LeiCurl.pm
+++ b/lib/PublicInbox/LeiCurl.pm
@@ -8,6 +8,12 @@ use v5.10.1;
 use PublicInbox::Spawn qw(which);
 use PublicInbox::Config;
 
+# Ensures empty strings are quoted, we don't need more
+# sophisticated quoting than for empty strings: curl -d ''
+use overload '""' => sub {
+	join(' ', map { $_ eq '' ?  "''" : $_ } @{$_[0]});
+};
+
 my %lei2curl = (
 	'curl-config=s@' => 'config|K=s@',
 );
@@ -63,10 +69,9 @@ EOM
 
 # completes the result of cmd() for $uri
 sub for_uri {
-	my ($self, $lei, $uri) = @_;
+	my ($self, $lei, $uri, @opt) = @_;
 	my $pfx = torsocks($self, $lei, $uri) or return; # error
-	[ @$pfx, @$self, substr($uri->path, -3) eq '.gz' ? () : '--compressed',
-		$uri->as_string ]
+	bless [ @$pfx, @$self, @opt, $uri->as_string ], ref($self);
 }
 
 1;
diff --git a/lib/PublicInbox/LeiMirror.pm b/lib/PublicInbox/LeiMirror.pm
index 5ba69287..c5153148 100644
--- a/lib/PublicInbox/LeiMirror.pm
+++ b/lib/PublicInbox/LeiMirror.pm
@@ -31,7 +31,7 @@ sub try_scrape {
 	my $uri = URI->new($self->{src});
 	my $lei = $self->{lei};
 	my $curl = $self->{curl} //= PublicInbox::LeiCurl->new($lei) or return;
-	my $cmd = $curl->for_uri($lei, $uri);
+	my $cmd = $curl->for_uri($lei, $uri, '--compressed');
 	my $opt = { 0 => $lei->{0}, 2 => $lei->{2} };
 	my $fh = popen_rd($cmd, $lei->{env}, $opt);
 	my $html = do { local $/; <$fh> } // die "read(curl $uri): $!";
@@ -93,8 +93,7 @@ sub _try_config {
 	my $path = $uri->path;
 	chop($path) eq '/' or die "BUG: $uri not canonicalized";
 	$uri->path($path . '/_/text/config/raw');
-	my $cmd = $self->{curl}->for_uri($lei, $uri);
-	push @$cmd, '--compressed'; # curl decompresses for us
+	my $cmd = $self->{curl}->for_uri($lei, $uri, '--compressed');
 	my $ce = "$dst/inbox.config.example";
 	my $f = "$ce-$$.tmp";
 	open(my $fh, '+>', $f) or return $lei->err("open $f: $! (non-fatal)");
diff --git a/lib/PublicInbox/LeiOverview.pm b/lib/PublicInbox/LeiOverview.pm
index dcfb9cc7..f0ac4684 100644
--- a/lib/PublicInbox/LeiOverview.pm
+++ b/lib/PublicInbox/LeiOverview.pm
@@ -95,9 +95,10 @@ sub new {
 		$lei->{dedupe} //= PublicInbox::LeiDedupe->new($lei);
 	} else {
 		# default to the cheapest sort since MUA usually resorts
-		$lei->{opt}->{'sort'} //= 'docid' if $dst ne '/dev/stdout';
+		$opt->{'sort'} //= 'docid' if $dst ne '/dev/stdout';
 		$lei->{l2m} = eval { PublicInbox::LeiToMail->new($lei) };
 		return $lei->fail($@) if $@;
+		$lei->{early_mua} = 1 if $opt->{mua} && $lei->{l2m}->lock_free;
 	}
 	$self;
 }
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index 4c5a5685..a5a196db 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -371,7 +371,17 @@ sub new {
 	$self;
 }
 
-sub _pre_augment_maildir {} # noop
+sub _pre_augment_maildir {
+	my ($self, $lei) = @_;
+	my $dst = $lei->{ovv}->{dst};
+	for my $x (qw(tmp new cur)) {
+		my $d = $dst.$x;
+		next if -d $d;
+		require File::Path;
+		File::Path::mkpath($d);
+		-d $d or die "$d is not a directory";
+	}
+}
 
 sub _do_augment_maildir {
 	my ($self, $lei) = @_;
@@ -388,17 +398,7 @@ sub _do_augment_maildir {
 	}
 }
 
-sub _post_augment_maildir {
-	my ($self, $lei) = @_;
-	my $dst = $lei->{ovv}->{dst};
-	for my $x (qw(tmp new cur)) {
-		my $d = $dst.$x;
-		next if -d $d;
-		require File::Path;
-		File::Path::mkpath($d);
-		-d $d or die "$d is not a directory";
-	}
-}
+sub _post_augment_maildir {} # noop
 
 sub _pre_augment_mbox {
 	my ($self, $lei) = @_;
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index 2794140a..db089a67 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -101,20 +101,34 @@ sub _mset_more ($$) {
 # $startq will EOF when query_prepare is done augmenting and allow
 # query_mset and query_thread_mset to proceed.
 sub wait_startq ($) {
-	my ($startq) = @_;
-	$_[0] = undef;
-	read($startq, my $query_prepare_done, 1);
+	my ($lei) = @_;
+	my $startq = delete $lei->{startq} or return;
+	while (1) {
+		my $n = sysread($startq, my $query_prepare_done, 1);
+		if (defined $n) {
+			return if $n == 0; # no MUA
+			if ($query_prepare_done eq 'q') {
+				$lei->{opt}->{quiet} = 1;
+				delete $lei->{opt}->{verbose};
+				delete $lei->{-progress};
+			} else {
+				$lei->fail("$$ WTF `$query_prepare_done'");
+			}
+			return;
+		}
+		return $lei->fail("$$ wait_startq: $!") unless $!{EINTR};
+	}
 }
 
 sub mset_progress {
 	my $lei = shift;
-	return unless $lei->{-progress};
+	return if $lei->{early_mua} || !$lei->{-progress};
 	if ($lei->{pkt_op_p}) {
 		pkt_do($lei->{pkt_op_p}, 'mset_progress', @_);
 	} else { # single lei-daemon consumer
 		my ($desc, $mset_size, $mset_total_est) = @_;
 		$lei->{-mset_total} += $mset_size;
-		$lei->err("# $desc $mset_size/$mset_total_est");
+		$lei->qerr("# $desc $mset_size/$mset_total_est");
 	}
 }
 
@@ -122,7 +136,6 @@ sub query_thread_mset { # for --threads
 	my ($self, $ibxish) = @_;
 	local $0 = "$0 query_thread_mset";
 	my $lei = $self->{lei};
-	my $startq = delete $lei->{startq};
 	my ($srch, $over) = ($ibxish->search, $ibxish->over);
 	my $desc = $ibxish->{inboxdir} // $ibxish->{topdir};
 	return warn("$desc not indexed by Xapian\n") unless ($srch && $over);
@@ -140,7 +153,7 @@ sub query_thread_mset { # for --threads
 		while ($over->expand_thread($ctx)) {
 			for my $n (@{$ctx->{xids}}) {
 				my $smsg = $over->get_art($n) or next;
-				wait_startq($startq) if $startq;
+				wait_startq($lei);
 				my $mitem = delete $n2item{$smsg->{num}};
 				$each_smsg->($smsg, $mitem);
 			}
@@ -155,7 +168,6 @@ sub query_mset { # non-parallel for non-"--threads" users
 	my ($self) = @_;
 	local $0 = "$0 query_mset";
 	my $lei = $self->{lei};
-	my $startq = delete $lei->{startq};
 	my $mo = { %{$lei->{mset_opt}} };
 	my $mset;
 	for my $loc (locals($self)) {
@@ -168,7 +180,7 @@ sub query_mset { # non-parallel for non-"--threads" users
 				$mset->size, $mset->get_matches_estimated);
 		for my $mitem ($mset->items) {
 			my $smsg = smsg_for($self, $mitem) or next;
-			wait_startq($startq) if $startq;
+			wait_startq($lei);
 			$each_smsg->($smsg, $mitem);
 		}
 	} while (_mset_more($mset, $mo));
@@ -183,7 +195,7 @@ sub each_eml { # callback for MboxReader->mboxrd
 	$smsg->parse_references($eml, mids($eml));
 	$smsg->{$_} //= '' for qw(from to cc ds subject references mid);
 	delete @$smsg{qw(From Subject -ds -ts)};
-	if (my $startq = delete($lei->{startq})) { wait_startq($startq) }
+	wait_startq($lei);
 	if ($lei->{-progress}) {
 		++$lei->{-nr_remote_eml};
 		my $now = now();
@@ -210,7 +222,6 @@ sub query_remote_mboxrd {
 	my $cerr = File::Temp->new(TEMPLATE => 'curl.err-XXXX', TMPDIR => 1);
 	fcntl($cerr, F_SETFL, O_APPEND|O_RDWR) or warn "set O_APPEND: $!";
 	my $rdr = { 2 => $cerr, pgid => 0 };
-	my $coff = 0;
 	my $sigint_reap = $lei->can('sigint_reap');
 	if ($verbose) {
 		# spawn a process to force line-buffering, otherwise curl
@@ -228,13 +239,14 @@ sub query_remote_mboxrd {
 		$lei->{-nr_remote_eml} = 0;
 		$uri->query_form(@qform);
 		my $cmd = $curl->for_uri($lei, $uri);
-		$lei->err("# @$cmd") if $verbose;
+		$lei->qerr("# $cmd");
 		my ($fh, $pid) = popen_rd($cmd, $env, $rdr);
 		$reap_curl = PublicInbox::OnDestroy->new($sigint_reap, $pid);
 		$fh = IO::Uncompress::Gunzip->new($fh);
 		PublicInbox::MboxReader->mboxrd($fh, \&each_eml, $self,
 						$lei, $each_smsg);
-		my $err = waitpid($pid, 0) == $pid ? undef : "BUG: waitpid: $!";
+		my $err = waitpid($pid, 0) == $pid ? undef
+						: "BUG: waitpid($cmd): $!";
 		@$reap_curl = (); # cancel OnDestroy
 		die $err if $err;
 		if ($? == 0) {
@@ -242,16 +254,18 @@ sub query_remote_mboxrd {
 			mset_progress($lei, $lei->{-current_url}, $nr, $nr);
 			next;
 		}
-		seek($cerr, $coff, SEEK_SET) or warn "seek(curl stderr): $!\n";
-		my $e = do { local $/; <$cerr> } //
-				die "read(curl stderr): $!\n";
-		$coff += length($e);
-		truncate($cerr, 0);
-		next if (($? >> 8) == 22 && $e =~ /\b404\b/);
-		$lei->child_error($?);
+		$err = '';
+		if (-s $cerr) {
+			seek($cerr, 0, SEEK_SET) or
+					$lei->err("seek($cmd stderr): $!");
+			$err = do { local $/; <$cerr> } //
+					"read($cmd stderr): $!";
+			truncate($cerr, 0) or
+					$lei->err("truncate($cmd stderr): $!");
+		}
+		next if (($? >> 8) == 22 && $err =~ /\b404\b/);
 		$uri->query_form(q => $lei->{mset_opt}->{qstr});
-		# --verbose already showed the error via tail(1)
-		$lei->err("E: $uri \$?=$?\n", $verbose ? () : $e);
+		$lei->child_error($?, "E: <$uri> $err");
 	}
 	undef $each_smsg;
 	$lei->{ovv}->ovv_atexit_child($lei);
@@ -311,15 +325,23 @@ Error closing $lei->{ovv}->{dst}: $!
 
 sub do_post_augment {
 	my ($lei) = @_;
-	eval { $lei->{l2m}->post_augment($lei) };
-	if (my $err = $@) {
-		if (my $lxs = delete $lei->{lxs}) {
-			$lxs->wq_kill;
-			$lxs->wq_close(0, undef, $lei);
+	my $l2m = $lei->{l2m};
+	my $err;
+	if ($l2m) {
+		eval { $l2m->post_augment($lei) };
+		$err = $@;
+		if ($err) {
+			if (my $lxs = delete $lei->{lxs}) {
+				$lxs->wq_kill;
+				$lxs->wq_close(0, undef, $lei);
+			}
+			$lei->fail("$err");
 		}
-		$lei->fail("$err");
 	}
-	close(delete $lei->{au_done}); # triggers wait_startq
+	if (!$err && delete $lei->{early_mua}) { # non-augment case
+		$lei->start_mua;
+	}
+	close(delete $lei->{au_done}); # triggers wait_startq in lei_xsearch
 }
 
 my $MAX_PER_HOST = 4;
@@ -334,9 +356,6 @@ sub concurrency {
 
 sub start_query { # always runs in main (lei-daemon) process
 	my ($self, $lei) = @_;
-	if (my $l2m = $lei->{l2m}) {
-		$lei->start_mua if $l2m->lock_free;
-	}
 	if ($lei->{opt}->{threads}) {
 		for my $ibxish (locals($self)) {
 			$self->wq_io_do('query_thread_mset', [], $ibxish);
@@ -387,6 +406,9 @@ sub do_query {
 	my $l2m = $lei->{l2m};
 	if ($l2m) {
 		$l2m->pre_augment($lei);
+		if ($lei->{opt}->{augment} && delete $lei->{early_mua}) {
+			$lei->start_mua;
+		}
 		$l2m->wq_workers_start('lei2mail', $l2m->{jobs},
 					$lei->oldset, { lei => $lei });
 		pipe($lei->{startq}, $lei->{au_done}) or die "pipe: $!";
@@ -404,7 +426,7 @@ sub do_query {
 	delete $lei->{pkt_op_p};
 	$l2m->wq_close(1) if $l2m;
 	$lei->event_step_init; # wait for shutdowns
-	$self->wq_io_do('query_prepare', []) if $l2m;
+	$self->wq_io_do('query_prepare', []) if $l2m; # for augment/dedupe
 	start_query($self, $lei);
 	$self->wq_close(1); # lei_xsearch workers stop when done
 	if ($lei->{oneshot}) {

^ permalink raw reply related	[relevance 3%]

* [PATCH 00/13] lei approxidate, startup fix, --alert
@ 2021-02-08  9:05  7% Eric Wong
  2021-02-08  9:05  3% ` [PATCHv2 01/13] lei q: improve remote mboxrd UX + MUA Eric Wong
  0 siblings, 1 reply; 2+ results
From: Eric Wong @ 2021-02-08  9:05 UTC (permalink / raw)
  To: meta

I've redone and squashed some changes into PATCH 1/13, which
was posted yesterday.

3/13 (SIGWINCH) is a rebase made necessary by 1/13;
4/13 (--alert=CMD) is a generalized take on 3/13.

12/13 is...

Eric Wong (13):
  lei q: improve remote mboxrd UX + MUA
  lei_xsearch: quiet Eml warnings from remote mboxrds
  lei q: SIGWINCH process group with the terminal
  lei q: support --alert=CMD for early MUA users
  tests: favor IPv6
  ds: improve add_timer usability
  lei: start_pager: drop COLUMNS default
  lei: avoid racing on unlink + bind + listen
  lei: drop BSD::Resource usage
  git: implement date_parse method
  lei q: use git approxidate with d:, dt: and rt: ranges
  search: use one git-rev-parse process for all dates
  spawnpp: raise exception on E2BIG errors

 lib/PublicInbox/DS.pm           |  10 ++--
 lib/PublicInbox/ExtSearchIdx.pm |   5 +-
 lib/PublicInbox/FakeInotify.pm  |   4 +-
 lib/PublicInbox/Git.pm          |  10 +++-
 lib/PublicInbox/IPC.pm          |   8 +--
 lib/PublicInbox/LEI.pm          | 100 ++++++++++++++++++++++----------
 lib/PublicInbox/LeiCurl.pm      |  11 +++-
 lib/PublicInbox/LeiMirror.pm    |   5 +-
 lib/PublicInbox/LeiOverview.pm  |   6 +-
 lib/PublicInbox/LeiQuery.pm     |  12 ++--
 lib/PublicInbox/LeiToMail.pm    |  24 ++++----
 lib/PublicInbox/LeiXSearch.pm   |  97 ++++++++++++++++++++-----------
 lib/PublicInbox/Search.pm       |  86 +++++++++++++++++++++++++++
 lib/PublicInbox/SpawnPP.pm      |  23 ++++++--
 lib/PublicInbox/TestCommon.pm   |  30 ++++++++--
 lib/PublicInbox/Watch.pm        |  19 +++---
 script/lei                      |  16 ++---
 t/extsearch.t                   |   2 +-
 t/git.t                         |  17 +++++-
 t/httpd-corner.psgi             |   2 +-
 t/httpd-corner.t                |  12 ++--
 t/httpd-https.t                 |   2 +-
 t/httpd-unix.t                  |   7 +--
 t/httpd.t                       |   8 +--
 t/imapd-tls.t                   |   4 +-
 t/imapd.t                       |   8 +--
 t/lei-mirror.t                  |   2 +-
 t/nntpd-tls.t                   |   4 +-
 t/nntpd.t                       |  11 ++--
 t/psgi_attach.t                 |   2 +-
 t/psgi_v2.t                     |   2 +-
 t/search.t                      |  51 ++++++++++++++++
 t/solver_git.t                  |   2 +-
 t/v2mirror.t                    |   3 +-
 t/v2writable.t                  |   3 +-
 t/www_altid.t                   |   2 +-
 t/www_listing.t                 |   3 +-
 xt/git-http-backend.t           |   4 +-
 xt/httpd-async-stream.t         |   2 +-
 xt/imapd-mbsync-oimap.t         |   4 +-
 xt/imapd-validate.t             |   4 +-
 xt/mem-imapd-tls.t              |   2 +-
 xt/nntpd-validate.t             |   3 +-
 xt/perf-nntpd.t                 |  16 ++---
 xt/solver.t                     |   3 +-
 45 files changed, 441 insertions(+), 210 deletions(-)


^ permalink raw reply	[relevance 7%]

Results 1-2 of 2 | reverse | options above
-- pct% links below jump to the message on this page, permalinks otherwise --
2021-02-08  9:05  7% [PATCH 00/13] lei approxidate, startup fix, --alert Eric Wong
2021-02-08  9:05  3% ` [PATCHv2 01/13] lei q: improve remote mboxrd UX + MUA Eric Wong

Code repositories for project(s) associated with this public inbox

	https://80x24.org/public-inbox.git

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).