From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id CE1A71FFB0 for ; Tue, 2 Feb 2021 11:47:02 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 03/16] lei q: emit progress and counting via PktOp Date: Tue, 2 Feb 2021 11:46:49 +0000 Message-Id: <20210202114702.29886-4-e@80x24.org> In-Reply-To: <20210202114702.29886-1-e@80x24.org> References: <20210202114702.29886-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: Sometimes it can be confusing when "lei q" finishes writing to a Maildir|mbox and the user cannot tell whether it did anything. So show some per-external progress and stats. These can be disabled via the new --quiet/-q switch. We differ slightly from mairix(1) here, as we use stderr instead of stdout for reporting totals (and we support parallel queries from various sources). --- lib/PublicInbox/IPC.pm | 23 +++++++++------- lib/PublicInbox/LEI.pm | 2 +- lib/PublicInbox/LeiXSearch.pm | 51 ++++++++++++++++++++++++++--------- lib/PublicInbox/PktOp.pm | 36 +++++++++---------------- t/lei.t | 8 +++--- xt/lei-sigpipe.t | 2 +- 6 files changed, 71 insertions(+), 51 deletions(-) diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm index 689f32d0..50de1bed 100644 --- a/lib/PublicInbox/IPC.pm +++ b/lib/PublicInbox/IPC.pm @@ -10,6 +10,7 @@ package PublicInbox::IPC; use strict; use v5.10.1; +use parent qw(Exporter); use Carp qw(confess croak); use PublicInbox::DS qw(dwaitpid); use PublicInbox::Spawn; @@ -18,6 +19,7 @@ use PublicInbox::WQWorker; use Socket qw(AF_UNIX MSG_EOR SOCK_STREAM); my $SEQPACKET = eval { Socket::SOCK_SEQPACKET() }; # portable enough? use constant PIPE_BUF => $^O eq 'linux' ? 
4096 : POSIX::_POSIX_PIPE_BUF(); +our @EXPORT_OK = qw(ipc_freeze ipc_thaw); my $WQ_MAX_WORKERS = 4096; my ($enc, $dec); # ->imports at BEGIN turns sereal_*_with_object into custom ops on 5.14+ @@ -33,12 +35,13 @@ BEGIN { }; if ($enc && $dec) { # should be custom ops - *freeze = sub ($) { sereal_encode_with_object $enc, $_[0] }; - *thaw = sub ($) { sereal_decode_with_object $dec, $_[0], my $ret }; + *ipc_freeze = sub ($) { sereal_encode_with_object $enc, $_[0] }; + *ipc_thaw = sub ($) { sereal_decode_with_object $dec, $_[0], my $ret }; } else { eval { # some distros have Storable as a separate package from Perl require Storable; - Storable->import(qw(freeze thaw)); + *ipc_freeze = \&Storable::freeze; + *ipc_thaw = \&Storable::thaw; $enc = 1; } // warn("Storable (part of Perl) missing: $@\n"); } @@ -56,12 +59,12 @@ sub _get_rec ($) { chop($len) eq "\n" or croak "no LF byte in $len"; defined(my $n = read($r, my $buf, $len)) or croak "read error: $!"; $n == $len or croak "short read: $n != $len"; - thaw($buf); + ipc_thaw($buf); } sub _pack_rec ($) { my ($ref) = @_; - my $buf = freeze($ref); + my $buf = ipc_freeze($ref); length($buf) . "\n" . $buf; } @@ -275,7 +278,7 @@ sub recv_and_run { $n = length($buf); } # Sereal dies on truncated data, Storable returns undef - my $args = thaw($buf) // die "thaw error on buffer of size: $n"; + my $args = ipc_thaw($buf) // die "thaw error on buffer of size: $n"; undef $buf; my $sub = shift @$args; eval { $self->$sub(@$args) }; @@ -301,15 +304,15 @@ sub wq_do { # always async my ($self, $sub, $ios, @args) = @_; if (my $s1 = $self->{-wq_s1}) { # run in worker my $fds = [ map { fileno($_) } @$ios ]; - my $n = $send_cmd->($s1, $fds, freeze([$sub, @args]), MSG_EOR); + my $buf = ipc_freeze([$sub, @args]); + my $n = $send_cmd->($s1, $fds, $buf, MSG_EOR); return if defined($n); # likely croak "sendmsg: $! (check RLIMIT_NOFILE)" if $!{ETOOMANYREFS}; croak "sendmsg: $!" 
if !$!{EMSGSIZE}; socketpair(my $r, my $w, AF_UNIX, SOCK_STREAM, 0) or croak "socketpair: $!"; - my $buf = freeze([$sub, @args]); $n = $send_cmd->($s1, [ fileno($r) ], - freeze(['do_sock_stream', length($buf)]), + ipc_freeze(['do_sock_stream', length($buf)]), MSG_EOR) // croak "sendmsg: $!"; undef $r; $n = $send_cmd->($w, $fds, $buf, 0) // croak "sendmsg: $!"; @@ -461,6 +464,6 @@ sub DESTROY { } # Sereal doesn't have dclone -sub deep_clone { thaw(freeze($_[-1])) } +sub deep_clone { ipc_thaw(ipc_freeze($_[-1])) } 1; diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index 737db1e1..6c2515dc 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -104,7 +104,7 @@ our %CMD = ( # sorted in order of importance/use: 'q' => [ 'SEARCH_TERMS...', 'search for messages matching terms', qw( save-as=s output|mfolder|o=s format|f=s dedupe|d=s thread|t augment|a sort|s=s reverse|r offset=i remote! local! external! pretty - mua-cmd|mua=s no-torsocks torsocks=s verbose|v + mua-cmd|mua=s no-torsocks torsocks=s verbose|v quiet|q received-after=s received-before=s sent-after=s sent-since=s), PublicInbox::LeiQuery::curl_opt(), opt_dash('limit|n=i', '[0-9]+') ], diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index e577ab09..95862306 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -8,12 +8,11 @@ package PublicInbox::LeiXSearch; use strict; use v5.10.1; use parent qw(PublicInbox::LeiSearch PublicInbox::IPC); -use PublicInbox::DS qw(dwaitpid); -use PublicInbox::PktOp; +use PublicInbox::DS qw(dwaitpid now); +use PublicInbox::PktOp qw(pkt_do); use PublicInbox::Import; use File::Temp 0.19 (); # 0.19 for ->newdir use File::Spec (); -use Socket qw(MSG_EOR); use PublicInbox::Search qw(xap_terms); use PublicInbox::Spawn qw(popen_rd spawn which); use PublicInbox::MID qw(mids); @@ -97,7 +96,7 @@ sub over {} sub _mset_more ($$) { my ($mset, $mo) = @_; my $size = $mset->size; - $size && (($mo->{offset} += $size) < 
($mo->{limit} // 10000)); + $size >= $mo->{limit} && (($mo->{offset} += $size) < $mo->{limit}); } # $startq will EOF when query_prepare is done augmenting and allow @@ -115,16 +114,15 @@ sub query_thread_mset { # for --thread my $startq = delete $lei->{startq}; my ($srch, $over) = ($ibxish->search, $ibxish->over); - unless ($srch && $over) { - my $desc = $ibxish->{inboxdir} // $ibxish->{topdir}; - warn "$desc not indexed by Xapian\n"; - return; - } + my $desc = $ibxish->{inboxdir} // $ibxish->{topdir}; + return warn("$desc not indexed by Xapian\n") unless ($srch && $over); my $mo = { %{$lei->{mset_opt}} }; my $mset; my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei, $ibxish); do { $mset = $srch->mset($mo->{qstr}, $mo); + pkt_do($lei->{pkt_op}, 'mset_progress', $desc, $mset->size, + $mset->get_matches_estimated); my $ids = $srch->mset_to_artnums($mset, $mo); my $ctx = { ids => $ids }; my $i = 0; @@ -156,6 +154,8 @@ sub query_mset { # non-parallel for non-"--thread" users my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei, $self); do { $mset = $self->mset($mo->{qstr}, $mo); + pkt_do($lei->{pkt_op}, 'mset_progress', 'xsearch', + $mset->size, $mset->get_matches_estimated); for my $mitem ($mset->items) { my $smsg = smsg_for($self, $mitem) or next; wait_startq($startq) if $startq; @@ -174,6 +174,16 @@ sub each_eml { # callback for MboxReader->mboxrd $smsg->{$_} //= '' for qw(from to cc ds subject references mid); delete @$smsg{qw(From Subject -ds -ts)}; if (my $startq = delete($lei->{startq})) { wait_startq($startq) } + ++$lei->{-nr_remote_eml}; + if (!$lei->{opt}->{quiet}) { + my $now = now(); + my $next = $lei->{-next_progress} //= ($now + 1); + if ($now > $next) { + $lei->{-next_progress} = $now + 1; + my $nr = $lei->{-nr_remote_eml}; + $lei->err("# $lei->{-current_url} $nr/?"); + } + } $each_smsg->($smsg, undef, $eml); } @@ -223,6 +233,8 @@ sub query_remote_mboxrd { my $tor = $opt->{torsocks} //= 'auto'; my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei); for my 
$uri (@$uris) { + $lei->{-current_url} = $uri->as_string; + $lei->{-nr_remote_eml} = 0; $uri->query_form(@qform); my $cmd = [ @cmd, $uri->as_string ]; if ($tor eq 'auto' && substr($uri->host, -6) eq '.onion' && @@ -246,7 +258,12 @@ sub query_remote_mboxrd { $lei, $each_smsg); }; return $lei->fail("E: @$cmd: $@") if $@; - next unless $?; + if ($? == 0) { + my $nr = $lei->{-nr_remote_eml}; + pkt_do($lei->{pkt_op}, 'mset_progress', + $lei->{-current_url}, $nr, $nr); + next; + } seek($cerr, $coff, SEEK_SET) or warn "seek(curl stderr): $!\n"; my $e = do { local $/; <$cerr> } // die "read(curl stderr): $!\n"; @@ -299,9 +316,19 @@ Error closing $lei->{ovv}->{dst}: $! } $lei->start_mua; } + $lei->{opt}->{quiet} or + $lei->err('# ', $lei->{-mset_total} // 0, " matches"); $lei->dclose; } +sub mset_progress { # called via pkt_op/pkt_do from workers + my ($lei, $pargs) = @_; + my ($desc, $mset_size, $mset_total_est) = @$pargs; + return if $lei->{opt}->{quiet}; + $lei->{-mset_total} += $mset_size; + $lei->err("# $desc $mset_size/$mset_total_est"); +} + sub do_post_augment { my ($lei, $zpipe, $au_done) = @_; my $l2m = $lei->{l2m} or die 'BUG: no {l2m}'; @@ -354,8 +381,7 @@ sub query_prepare { # called by wq_do delete $lei->{l2m}->{-wq_s1}; eval { $lei->{l2m}->do_augment($lei) }; $lei->fail($@) if $@; - send($lei->{pkt_op}, '.', MSG_EOR) == 1 or - die "do_post_augment trigger: $!" + pkt_do($lei->{pkt_op}, '.') == 1 or die "do_post_augment trigger: $!" } sub fail_handler ($;$$) { @@ -388,6 +414,7 @@ sub do_query { '!' => [ \&fail_handler, $lei ], '.' 
=> [ \&do_post_augment, $lei, $zpipe, $au_done ], '' => [ \&query_done, $lei ], + 'mset_progress' => [ \&mset_progress, $lei ], }; (my $op, $lei->{pkt_op}) = PublicInbox::PktOp->pair($ops, $in_loop); my ($lei_ipc, @io) = $lei->atfork_parent_wq($self); diff --git a/lib/PublicInbox/PktOp.pm b/lib/PublicInbox/PktOp.pm index d5b95a73..12839e71 100644 --- a/lib/PublicInbox/PktOp.pm +++ b/lib/PublicInbox/PktOp.pm @@ -9,25 +9,16 @@ package PublicInbox::PktOp; use strict; use v5.10.1; -use parent qw(PublicInbox::DS); +use parent qw(PublicInbox::DS Exporter); use Errno qw(EAGAIN EINTR); use PublicInbox::Syscall qw(EPOLLIN EPOLLET); use Socket qw(AF_UNIX MSG_EOR SOCK_SEQPACKET); +use PublicInbox::IPC qw(ipc_freeze ipc_thaw); +our @EXPORT_OK = qw(pkt_do); sub new { my ($cls, $r, $ops, $in_loop) = @_; my $self = bless { sock => $r, ops => $ops, re => [] }, $cls; - if (ref($ops) eq 'ARRAY') { - my %ops; - for my $op (@$ops) { - if (ref($op->[0])) { - push @{$self->{re}}, $op; - } else { - $ops{$op->[0]} = $op->[1]; - } - } - $self->{ops} = \%ops; - } if ($in_loop) { # iff using DS->EventLoop $r->blocking(0); $self->SUPER::new($r, EPOLLIN|EPOLLET); @@ -43,6 +34,11 @@ sub pair { (new($cls, $c, $ops, $in_loop), $p); } +sub pkt_do { # for the producer to trigger event_step in consumer + my ($producer, $cmd, @args) = @_; + send($producer, @args ? "$cmd\0".ipc_freeze(\@args) : $cmd, MSG_EOR); +} + sub close { my ($self) = @_; my $c = $self->{sock} or return; @@ -54,24 +50,18 @@ sub event_step { my $c = $self->{sock}; my $msg; do { - my $n = recv($c, $msg, 128, 0); + my $n = recv($c, $msg, 4096, 0); unless (defined $n) { return if $! == EAGAIN; next if $! 
== EINTR; $self->close; die "recv: $!"; } - my $op = $self->{ops}->{$msg}; - unless ($op) { - for my $re_op (@{$self->{re}}) { - $msg =~ $re_op->[0] or next; - $op = $re_op->[1]; - last; - } - } - die "BUG: unknown message: `$msg'" unless $op; + my ($cmd, $pargs) = split(/\0/, $msg, 2); + my $op = $self->{ops}->{$cmd // $msg}; + die "BUG: unknown message: `$cmd'" unless $op; my ($sub, @args) = @$op; - $sub->(@args); + $sub->(@args, $pargs ? ipc_thaw($pargs) : ()); return $self->close if $msg eq ''; # close on EOF } while (1); } diff --git a/t/lei.t b/t/lei.t index 3f6702e6..a46e46f2 100644 --- a/t/lei.t +++ b/t/lei.t @@ -174,11 +174,11 @@ SKIP: { } $lei->('add-external', $url); my $mid = '20140421094015.GA8962@dcvr.yhbt.net'; - ok($lei->('q', "m:$mid"), "query $url"); + ok($lei->('q', '-q', "m:$mid"), "query $url"); is($err, '', "no errors on $url"); my $res = $json->decode($out); is($res->[0]->{'m'}, "<$mid>", "got expected mid from $url"); - ok($lei->('q', "m:$mid", 'd:..20101002'), 'no results, no error'); + ok($lei->('q', '-q', "m:$mid", 'd:..20101002'), 'no results, no error'); is($err, '', 'no output on 404, matching local FS behavior'); is($out, "[null]\n", 'got null results'); $lei->('forget-external', $url); @@ -291,12 +291,12 @@ my $test_external = sub { my @s = grep(/^Subject:/, $cat->()); is(scalar(@s), 1, "1 result in mbox$sfx"); $lei->('q', '-a', '-o', "mboxcl2:$f", 's:see attachment'); - is($err, '', 'no errors from augment'); + is(grep(!/^#/, $err), 0, 'no errors from augment'); @s = grep(/^Subject:/, my @wtf = $cat->()); is(scalar(@s), 2, "2 results in mbox$sfx"); $lei->('q', '-a', '-o', "mboxcl2:$f", 's:nonexistent'); - is($err, '', "no errors on no results ($sfx)"); + is(grep(!/^#/, $err), 0, "no errors on no results ($sfx)"); my @s2 = grep(/^Subject:/, $cat->()); is_deeply(\@s2, \@s, diff --git a/xt/lei-sigpipe.t b/xt/lei-sigpipe.t index 448bd7db..1aa9ed07 100644 --- a/xt/lei-sigpipe.t +++ b/xt/lei-sigpipe.t @@ -15,7 +15,7 @@ my $do_test = sub { 
pipe(my ($r, $w)) or BAIL_OUT $!; open my $err, '+>', undef or BAIL_OUT $!; my $opt = { run_mode => 0, 1 => $w, 2 => $err }; - my $cmd = [qw(lei q -t), @$out, 'bytes:1..']; + my $cmd = [qw(lei q -q -t), @$out, 'bytes:1..']; my $tp = start_script($cmd, $env, $opt); close $w; sysread($r, my $buf, 1);