From be365fb4bd3eda10294d4a916321c28b90dd723d Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Wed, 9 Jun 2021 20:27:50 -0300 Subject: lei tag: less confusing warning about unimported messages "unimported" is more meaningful than "missing", here. And instead of having every worker spew about unimported messages, we'll accumulate and only print one warning line. This necessitated alterating ->DESTROY behavior and persisting the client socket within the $lei object itself, not just the PktOp consumer object. --- lib/PublicInbox/LEI.pm | 21 ++++++++++++++++----- lib/PublicInbox/LeiTag.pm | 12 ++++++------ 2 files changed, 22 insertions(+), 11 deletions(-) (limited to 'lib') diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index beeb8b48..d34997fd 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -567,6 +567,11 @@ sub pkt_op_pair { $end; } +sub incr { + my ($self, $field, $nr) = @_; + $self->{counters}->{$field} += $nr; +} + sub workers_start { my ($lei, $wq, $jobs, $ops, $flds) = @_; $ops = { @@ -574,6 +579,7 @@ sub workers_start { '|' => [ \&sigpipe_handler, $lei ], 'x_it' => [ \&x_it, $lei ], 'child_error' => [ \&child_error, $lei ], + 'incr' => [ \&incr, $lei ], ($ops ? %$ops : ()), }; $ops->{''} //= [ $wq->can('_lei_wq_eof') || \&wq_eof, $lei ]; @@ -583,8 +589,6 @@ sub workers_start { $wq->wq_workers_start($ident, $jobs, $lei->oldset, $flds); delete $lei->{pkt_op_p}; my $op_c = delete $lei->{pkt_op_c}; - # {-lei_sock} persists script/lei process until ops->{''} EOF callback - $op_c->{-lei_sock} = $lei->{sock}; @$end = (); $lei->event_step_init; ($op_c, $ops); @@ -1092,10 +1096,11 @@ sub event_step { sub event_step_init { my ($self) = @_; - return if $self->{-event_init_done}++; - if (my $sock = $self->{sock}) { # using DS->EventLoop + my $sock = $self->{sock} or return; + $self->{-event_init_done} //= do { # persist til $ops done $self->SUPER::new($sock, EPOLLIN|EPOLLET); - } + $sock; + }; } sub noop {} @@ -1246,6 +1251,12 @@ sub busy { 1 } # prevent daemon-shutdown if client is connected # can immediately reread it sub DESTROY { my ($self) = @_; + if (my $counters = delete $self->{counters}) { + for my $k (sort keys %$counters) { + my $nr = $counters->{$k}; + $self->child_error(1 << 8, "$nr $k messages"); + } + } $self->{1}->autoflush(1) if $self->{1}; stop_pager($self); # preserve $? for ->fail or ->x_it code diff --git a/lib/PublicInbox/LeiTag.pm b/lib/PublicInbox/LeiTag.pm index e0532653..463fb921 100644 --- a/lib/PublicInbox/LeiTag.pm +++ b/lib/PublicInbox/LeiTag.pm @@ -15,7 +15,7 @@ sub input_eml_cb { # used by PublicInbox::LeiInput::input_fh $self->{lei}->{sto}->ipc_do('update_xvmd', $xoids, $eml, $self->{vmd_mod}); } else { - ++$self->{missing}; + ++$self->{unimported}; } } @@ -40,7 +40,7 @@ sub lei_tag { # the "lei tag" method my ($lei, @argv) = @_; my $sto = $lei->_lei_store(1); $sto->write_prepare($lei); - my $self = bless { missing => 0 }, __PACKAGE__; + my $self = bless {}, __PACKAGE__; $lei->ale; # refresh and prepare my $vmd_mod = $self->vmd_mod_extract(\@argv); return $lei->fail(join("\n", @{$vmd_mod->{err}})) if $vmd_mod->{err}; @@ -58,10 +58,10 @@ sub lei_tag { # the "lei tag" method $lei->wait_wq_events($op_c, $ops); } -sub note_missing { +sub note_unimported { my ($self) = @_; - my $n = $self->{missing} or return; - $self->{lei}->child_error(1 << 8, "$n missed messages"); + my $n = $self->{unimported} or return; + $self->{lei}->{pkt_op_p}->pkt_do('incr', 'unimported', $n); } sub ipc_atfork_child { @@ -69,7 +69,7 @@ sub ipc_atfork_child { PublicInbox::LeiInput::input_only_atfork_child($self); $self->{lse} = $self->{lei}->{sto}->search; # this goes out-of-scope at worker process exit: - PublicInbox::OnDestroy->new($$, \¬e_missing, $self); + PublicInbox::OnDestroy->new($$, \¬e_unimported, $self); } # Workaround bash word-splitting s to ['kw', ':', 'keyword' ...] -- cgit v1.2.3-24-ge0c7