user/dev discussion of public-inbox itself
 help / color / mirror / code / Atom feed
From: Eric Wong <e@80x24.org>
To: meta@public-inbox.org
Subject: [PATCH 03/16] lei q: emit progress and counting via PktOp
Date: Tue,  2 Feb 2021 11:46:49 +0000	[thread overview]
Message-ID: <20210202114702.29886-4-e@80x24.org> (raw)
In-Reply-To: <20210202114702.29886-1-e@80x24.org>

Sometimes it can be confusing for "lei q" to finish writing to a
Maildir|mbox and not know if it did anything.  So show some
per-external progress and stats.

These can be disabled via the new --quiet/-q switch.

We differ slightly from mairix(1) here, as we use stderr
instead of stdout for reporting totals (and we support
parallel queries from various sources).
---
 lib/PublicInbox/IPC.pm        | 23 +++++++++-------
 lib/PublicInbox/LEI.pm        |  2 +-
 lib/PublicInbox/LeiXSearch.pm | 51 ++++++++++++++++++++++++++---------
 lib/PublicInbox/PktOp.pm      | 36 +++++++++----------------
 t/lei.t                       |  8 +++---
 xt/lei-sigpipe.t              |  2 +-
 6 files changed, 71 insertions(+), 51 deletions(-)

diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 689f32d0..50de1bed 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -10,6 +10,7 @@
 package PublicInbox::IPC;
 use strict;
 use v5.10.1;
+use parent qw(Exporter);
 use Carp qw(confess croak);
 use PublicInbox::DS qw(dwaitpid);
 use PublicInbox::Spawn;
@@ -18,6 +19,7 @@ use PublicInbox::WQWorker;
 use Socket qw(AF_UNIX MSG_EOR SOCK_STREAM);
 my $SEQPACKET = eval { Socket::SOCK_SEQPACKET() }; # portable enough?
 use constant PIPE_BUF => $^O eq 'linux' ? 4096 : POSIX::_POSIX_PIPE_BUF();
+our @EXPORT_OK = qw(ipc_freeze ipc_thaw);
 my $WQ_MAX_WORKERS = 4096;
 my ($enc, $dec);
 # ->imports at BEGIN turns sereal_*_with_object into custom ops on 5.14+
@@ -33,12 +35,13 @@ BEGIN {
 };
 
 if ($enc && $dec) { # should be custom ops
-	*freeze = sub ($) { sereal_encode_with_object $enc, $_[0] };
-	*thaw = sub ($) { sereal_decode_with_object $dec, $_[0], my $ret };
+	*ipc_freeze = sub ($) { sereal_encode_with_object $enc, $_[0] };
+	*ipc_thaw = sub ($) { sereal_decode_with_object $dec, $_[0], my $ret };
 } else {
 	eval { # some distros have Storable as a separate package from Perl
 		require Storable;
-		Storable->import(qw(freeze thaw));
+		*ipc_freeze = \&Storable::freeze;
+		*ipc_thaw = \&Storable::thaw;
 		$enc = 1;
 	} // warn("Storable (part of Perl) missing: $@\n");
 }
@@ -56,12 +59,12 @@ sub _get_rec ($) {
 	chop($len) eq "\n" or croak "no LF byte in $len";
 	defined(my $n = read($r, my $buf, $len)) or croak "read error: $!";
 	$n == $len or croak "short read: $n != $len";
-	thaw($buf);
+	ipc_thaw($buf);
 }
 
 sub _pack_rec ($) {
 	my ($ref) = @_;
-	my $buf = freeze($ref);
+	my $buf = ipc_freeze($ref);
 	length($buf) . "\n" . $buf;
 }
 
@@ -275,7 +278,7 @@ sub recv_and_run {
 		$n = length($buf);
 	}
 	# Sereal dies on truncated data, Storable returns undef
-	my $args = thaw($buf) // die "thaw error on buffer of size: $n";
+	my $args = ipc_thaw($buf) // die "thaw error on buffer of size: $n";
 	undef $buf;
 	my $sub = shift @$args;
 	eval { $self->$sub(@$args) };
@@ -301,15 +304,15 @@ sub wq_do { # always async
 	my ($self, $sub, $ios, @args) = @_;
 	if (my $s1 = $self->{-wq_s1}) { # run in worker
 		my $fds = [ map { fileno($_) } @$ios ];
-		my $n = $send_cmd->($s1, $fds, freeze([$sub, @args]), MSG_EOR);
+		my $buf = ipc_freeze([$sub, @args]);
+		my $n = $send_cmd->($s1, $fds, $buf, MSG_EOR);
 		return if defined($n); # likely
 		croak "sendmsg: $! (check RLIMIT_NOFILE)" if $!{ETOOMANYREFS};
 		croak "sendmsg: $!" if !$!{EMSGSIZE};
 		socketpair(my $r, my $w, AF_UNIX, SOCK_STREAM, 0) or
 			croak "socketpair: $!";
-		my $buf = freeze([$sub, @args]);
 		$n = $send_cmd->($s1, [ fileno($r) ],
-				freeze(['do_sock_stream', length($buf)]),
+				ipc_freeze(['do_sock_stream', length($buf)]),
 				MSG_EOR) // croak "sendmsg: $!";
 		undef $r;
 		$n = $send_cmd->($w, $fds, $buf, 0) // croak "sendmsg: $!";
@@ -461,6 +464,6 @@ sub DESTROY {
 }
 
 # Sereal doesn't have dclone
-sub deep_clone { thaw(freeze($_[-1])) }
+sub deep_clone { ipc_thaw(ipc_freeze($_[-1])) }
 
 1;
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 737db1e1..6c2515dc 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -104,7 +104,7 @@ our %CMD = ( # sorted in order of importance/use:
 'q' => [ 'SEARCH_TERMS...', 'search for messages matching terms', qw(
 	save-as=s output|mfolder|o=s format|f=s dedupe|d=s thread|t augment|a
 	sort|s=s reverse|r offset=i remote! local! external! pretty
-	mua-cmd|mua=s no-torsocks torsocks=s verbose|v
+	mua-cmd|mua=s no-torsocks torsocks=s verbose|v quiet|q
 	received-after=s received-before=s sent-after=s sent-since=s),
 	PublicInbox::LeiQuery::curl_opt(), opt_dash('limit|n=i', '[0-9]+') ],
 
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index e577ab09..95862306 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -8,12 +8,11 @@ package PublicInbox::LeiXSearch;
 use strict;
 use v5.10.1;
 use parent qw(PublicInbox::LeiSearch PublicInbox::IPC);
-use PublicInbox::DS qw(dwaitpid);
-use PublicInbox::PktOp;
+use PublicInbox::DS qw(dwaitpid now);
+use PublicInbox::PktOp qw(pkt_do);
 use PublicInbox::Import;
 use File::Temp 0.19 (); # 0.19 for ->newdir
 use File::Spec ();
-use Socket qw(MSG_EOR);
 use PublicInbox::Search qw(xap_terms);
 use PublicInbox::Spawn qw(popen_rd spawn which);
 use PublicInbox::MID qw(mids);
@@ -97,7 +96,7 @@ sub over {}
 sub _mset_more ($$) {
 	my ($mset, $mo) = @_;
 	my $size = $mset->size;
-	$size && (($mo->{offset} += $size) < ($mo->{limit} // 10000));
+	$size >= $mo->{limit} && (($mo->{offset} += $size) < $mo->{limit});
 }
 
 # $startq will EOF when query_prepare is done augmenting and allow
@@ -115,16 +114,15 @@ sub query_thread_mset { # for --thread
 	my $startq = delete $lei->{startq};
 
 	my ($srch, $over) = ($ibxish->search, $ibxish->over);
-	unless ($srch && $over) {
-		my $desc = $ibxish->{inboxdir} // $ibxish->{topdir};
-		warn "$desc not indexed by Xapian\n";
-		return;
-	}
+	my $desc = $ibxish->{inboxdir} // $ibxish->{topdir};
+	return warn("$desc not indexed by Xapian\n") unless ($srch && $over);
 	my $mo = { %{$lei->{mset_opt}} };
 	my $mset;
 	my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei, $ibxish);
 	do {
 		$mset = $srch->mset($mo->{qstr}, $mo);
+		pkt_do($lei->{pkt_op}, 'mset_progress', $desc, $mset->size,
+				$mset->get_matches_estimated);
 		my $ids = $srch->mset_to_artnums($mset, $mo);
 		my $ctx = { ids => $ids };
 		my $i = 0;
@@ -156,6 +154,8 @@ sub query_mset { # non-parallel for non-"--thread" users
 	my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei, $self);
 	do {
 		$mset = $self->mset($mo->{qstr}, $mo);
+		pkt_do($lei->{pkt_op}, 'mset_progress', 'xsearch',
+				$mset->size, $mset->get_matches_estimated);
 		for my $mitem ($mset->items) {
 			my $smsg = smsg_for($self, $mitem) or next;
 			wait_startq($startq) if $startq;
@@ -174,6 +174,16 @@ sub each_eml { # callback for MboxReader->mboxrd
 	$smsg->{$_} //= '' for qw(from to cc ds subject references mid);
 	delete @$smsg{qw(From Subject -ds -ts)};
 	if (my $startq = delete($lei->{startq})) { wait_startq($startq) }
+	++$lei->{-nr_remote_eml};
+	if (!$lei->{opt}->{quiet}) {
+		my $now = now();
+		my $next = $lei->{-next_progress} //= ($now + 1);
+		if ($now > $next) {
+			$lei->{-next_progress} = $now + 1;
+			my $nr = $lei->{-nr_remote_eml};
+			$lei->err("# $lei->{-current_url} $nr/?");
+		}
+	}
 	$each_smsg->($smsg, undef, $eml);
 }
 
@@ -223,6 +233,8 @@ sub query_remote_mboxrd {
 	my $tor = $opt->{torsocks} //= 'auto';
 	my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei);
 	for my $uri (@$uris) {
+		$lei->{-current_url} = $uri->as_string;
+		$lei->{-nr_remote_eml} = 0;
 		$uri->query_form(@qform);
 		my $cmd = [ @cmd, $uri->as_string ];
 		if ($tor eq 'auto' && substr($uri->host, -6) eq '.onion' &&
@@ -246,7 +258,12 @@ sub query_remote_mboxrd {
 							$lei, $each_smsg);
 		};
 		return $lei->fail("E: @$cmd: $@") if $@;
-		next unless $?;
+		if ($? == 0) {
+			my $nr = $lei->{-nr_remote_eml};
+			pkt_do($lei->{pkt_op}, 'mset_progress',
+				$lei->{-current_url}, $nr, $nr);
+			next;
+		}
 		seek($cerr, $coff, SEEK_SET) or warn "seek(curl stderr): $!\n";
 		my $e = do { local $/; <$cerr> } //
 				die "read(curl stderr): $!\n";
@@ -299,9 +316,19 @@ Error closing $lei->{ovv}->{dst}: $!
 		}
 		$lei->start_mua;
 	}
+	$lei->{opt}->{quiet} or
+		$lei->err('# ', $lei->{-mset_total} // 0, " matches");
 	$lei->dclose;
 }
 
+sub mset_progress { # called via pkt_op/pkt_do from workers
+	my ($lei, $pargs) = @_;
+	my ($desc, $mset_size, $mset_total_est) = @$pargs;
+	return if $lei->{opt}->{quiet};
+	$lei->{-mset_total} += $mset_size;
+	$lei->err("# $desc $mset_size/$mset_total_est");
+}
+
 sub do_post_augment {
 	my ($lei, $zpipe, $au_done) = @_;
 	my $l2m = $lei->{l2m} or die 'BUG: no {l2m}';
@@ -354,8 +381,7 @@ sub query_prepare { # called by wq_do
 	delete $lei->{l2m}->{-wq_s1};
 	eval { $lei->{l2m}->do_augment($lei) };
 	$lei->fail($@) if $@;
-	send($lei->{pkt_op}, '.', MSG_EOR) == 1 or
-		die "do_post_augment trigger: $!"
+	pkt_do($lei->{pkt_op}, '.') == 1 or die "do_post_augment trigger: $!"
 }
 
 sub fail_handler ($;$$) {
@@ -388,6 +414,7 @@ sub do_query {
 		'!' => [ \&fail_handler, $lei ],
 		'.' => [ \&do_post_augment, $lei, $zpipe, $au_done ],
 		'' => [ \&query_done, $lei ],
+		'mset_progress' => [ \&mset_progress, $lei ],
 	};
 	(my $op, $lei->{pkt_op}) = PublicInbox::PktOp->pair($ops, $in_loop);
 	my ($lei_ipc, @io) = $lei->atfork_parent_wq($self);
diff --git a/lib/PublicInbox/PktOp.pm b/lib/PublicInbox/PktOp.pm
index d5b95a73..12839e71 100644
--- a/lib/PublicInbox/PktOp.pm
+++ b/lib/PublicInbox/PktOp.pm
@@ -9,25 +9,16 @@
 package PublicInbox::PktOp;
 use strict;
 use v5.10.1;
-use parent qw(PublicInbox::DS);
+use parent qw(PublicInbox::DS Exporter);
 use Errno qw(EAGAIN EINTR);
 use PublicInbox::Syscall qw(EPOLLIN EPOLLET);
 use Socket qw(AF_UNIX MSG_EOR SOCK_SEQPACKET);
+use PublicInbox::IPC qw(ipc_freeze ipc_thaw);
+our @EXPORT_OK = qw(pkt_do);
 
 sub new {
 	my ($cls, $r, $ops, $in_loop) = @_;
 	my $self = bless { sock => $r, ops => $ops, re => [] }, $cls;
-	if (ref($ops) eq 'ARRAY') {
-		my %ops;
-		for my $op (@$ops) {
-			if (ref($op->[0])) {
-				push @{$self->{re}}, $op;
-			} else {
-				$ops{$op->[0]} = $op->[1];
-			}
-		}
-		$self->{ops} = \%ops;
-	}
 	if ($in_loop) { # iff using DS->EventLoop
 		$r->blocking(0);
 		$self->SUPER::new($r, EPOLLIN|EPOLLET);
@@ -43,6 +34,11 @@ sub pair {
 	(new($cls, $c, $ops, $in_loop), $p);
 }
 
+sub pkt_do { # for the producer to trigger event_step in consumer
+	my ($producer, $cmd, @args) = @_;
+	send($producer, @args ? "$cmd\0".ipc_freeze(\@args) : $cmd, MSG_EOR);
+}
+
 sub close {
 	my ($self) = @_;
 	my $c = $self->{sock} or return;
@@ -54,24 +50,18 @@ sub event_step {
 	my $c = $self->{sock};
 	my $msg;
 	do {
-		my $n = recv($c, $msg, 128, 0);
+		my $n = recv($c, $msg, 4096, 0);
 		unless (defined $n) {
 			return if $! == EAGAIN;
 			next if $! == EINTR;
 			$self->close;
 			die "recv: $!";
 		}
-		my $op = $self->{ops}->{$msg};
-		unless ($op) {
-			for my $re_op (@{$self->{re}}) {
-				$msg =~ $re_op->[0] or next;
-				$op = $re_op->[1];
-				last;
-			}
-		}
-		die "BUG: unknown message: `$msg'" unless $op;
+		my ($cmd, $pargs) = split(/\0/, $msg, 2);
+		my $op = $self->{ops}->{$cmd // $msg};
+		die "BUG: unknown message: `$cmd'" unless $op;
 		my ($sub, @args) = @$op;
-		$sub->(@args);
+		$sub->(@args, $pargs ? ipc_thaw($pargs) : ());
 		return $self->close if $msg eq ''; # close on EOF
 	} while (1);
 }
diff --git a/t/lei.t b/t/lei.t
index 3f6702e6..a46e46f2 100644
--- a/t/lei.t
+++ b/t/lei.t
@@ -174,11 +174,11 @@ SKIP: {
 	}
 	$lei->('add-external', $url);
 	my $mid = '20140421094015.GA8962@dcvr.yhbt.net';
-	ok($lei->('q', "m:$mid"), "query $url");
+	ok($lei->('q', '-q', "m:$mid"), "query $url");
 	is($err, '', "no errors on $url");
 	my $res = $json->decode($out);
 	is($res->[0]->{'m'}, "<$mid>", "got expected mid from $url");
-	ok($lei->('q', "m:$mid", 'd:..20101002'), 'no results, no error');
+	ok($lei->('q', '-q', "m:$mid", 'd:..20101002'), 'no results, no error');
 	is($err, '', 'no output on 404, matching local FS behavior');
 	is($out, "[null]\n", 'got null results');
 	$lei->('forget-external', $url);
@@ -291,12 +291,12 @@ my $test_external = sub {
 		my @s = grep(/^Subject:/, $cat->());
 		is(scalar(@s), 1, "1 result in mbox$sfx");
 		$lei->('q', '-a', '-o', "mboxcl2:$f", 's:see attachment');
-		is($err, '', 'no errors from augment');
+		is(grep(!/^#/, $err), 0, 'no errors from augment');
 		@s = grep(/^Subject:/, my @wtf = $cat->());
 		is(scalar(@s), 2, "2 results in mbox$sfx");
 
 		$lei->('q', '-a', '-o', "mboxcl2:$f", 's:nonexistent');
-		is($err, '', "no errors on no results ($sfx)");
+		is(grep(!/^#/, $err), 0, "no errors on no results ($sfx)");
 
 		my @s2 = grep(/^Subject:/, $cat->());
 		is_deeply(\@s2, \@s,
diff --git a/xt/lei-sigpipe.t b/xt/lei-sigpipe.t
index 448bd7db..1aa9ed07 100644
--- a/xt/lei-sigpipe.t
+++ b/xt/lei-sigpipe.t
@@ -15,7 +15,7 @@ my $do_test = sub {
 		pipe(my ($r, $w)) or BAIL_OUT $!;
 		open my $err, '+>', undef or BAIL_OUT $!;
 		my $opt = { run_mode => 0, 1 => $w, 2 => $err };
-		my $cmd = [qw(lei q -t), @$out, 'bytes:1..'];
+		my $cmd = [qw(lei q -q -t), @$out, 'bytes:1..'];
 		my $tp = start_script($cmd, $env, $opt);
 		close $w;
 		sysread($r, my $buf, 1);

  parent reply	other threads:[~2021-02-02 11:47 UTC|newest]

Thread overview: 17+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2021-02-02 11:46 [PATCH 00/16] lei: -I/--include and more Eric Wong
2021-02-02 11:46 ` [PATCH 01/16] lei: switch to use SEQPACKET socketpair instead of pipe Eric Wong
2021-02-02 11:46 ` [PATCH 02/16] lei_query: default to 10000 messages as documented Eric Wong
2021-02-02 11:46 ` Eric Wong [this message]
2021-02-02 11:46 ` [PATCH 04/16] lei q: support --only, --include and --exclude Eric Wong
2021-02-02 11:46 ` [PATCH 05/16] lei: complete: do not complete non-arg options w/ help text Eric Wong
2021-02-02 11:46 ` [PATCH 06/16] lei: q: shell completion for --(include|exclude|only) Eric Wong
2021-02-02 11:46 ` [PATCH 07/16] lei_xsearch: truncate curl stderr after reading it Eric Wong
2021-02-02 11:46 ` [PATCH 08/16] lib: explicitly distinguish oneshot use Eric Wong
2021-02-02 11:46 ` [PATCH 09/16] lei q: do not leave temporary files after oneshot exit Eric Wong
2021-02-02 11:46 ` [PATCH 10/16] cmd_ipc4: fix comments and formatting Eric Wong
2021-02-02 11:46 ` [PATCH 11/16] pktop: fix potential undefined var Eric Wong
2021-02-02 11:46 ` [PATCH 12/16] lei_xsearch: ensure curl.err and tail(1) cleanup happens Eric Wong
2021-02-02 11:46 ` [PATCH 13/16] doc: lei-q: note "-a" and link to Xapian QueryParser Eric Wong
2021-02-02 11:47 ` [PATCH 14/16] lei_overview: avoid unnecessary {l2m} delete Eric Wong
2021-02-02 11:47 ` [PATCH 15/16] lei q: tidy up progress reporting Eric Wong
2021-02-02 11:47 ` [PATCH 16/16] lei q: support --jobs [SEARCHERS],[WRITERS] Eric Wong

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

  List information: https://public-inbox.org/README

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20210202114702.29886-4-e@80x24.org \
    --to=e@80x24.org \
    --cc=meta@public-inbox.org \
    --subject='Re: [PATCH 03/16] lei q: emit progress and counting via PktOp' \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

Code repositories for project(s) associated with this inbox:

	https://80x24.org/public-inbox.git

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).