diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/PublicInbox/LEI.pm | 21 | ||||
-rw-r--r-- | lib/PublicInbox/LeiTag.pm | 12 |
2 files changed, 22 insertions, 11 deletions
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index beeb8b48..d34997fd 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -567,6 +567,11 @@ sub pkt_op_pair { $end; } +sub incr { + my ($self, $field, $nr) = @_; + $self->{counters}->{$field} += $nr; +} + sub workers_start { my ($lei, $wq, $jobs, $ops, $flds) = @_; $ops = { @@ -574,6 +579,7 @@ sub workers_start { '|' => [ \&sigpipe_handler, $lei ], 'x_it' => [ \&x_it, $lei ], 'child_error' => [ \&child_error, $lei ], + 'incr' => [ \&incr, $lei ], ($ops ? %$ops : ()), }; $ops->{''} //= [ $wq->can('_lei_wq_eof') || \&wq_eof, $lei ]; @@ -583,8 +589,6 @@ sub workers_start { $wq->wq_workers_start($ident, $jobs, $lei->oldset, $flds); delete $lei->{pkt_op_p}; my $op_c = delete $lei->{pkt_op_c}; - # {-lei_sock} persists script/lei process until ops->{''} EOF callback - $op_c->{-lei_sock} = $lei->{sock}; @$end = (); $lei->event_step_init; ($op_c, $ops); @@ -1092,10 +1096,11 @@ sub event_step { sub event_step_init { my ($self) = @_; - return if $self->{-event_init_done}++; - if (my $sock = $self->{sock}) { # using DS->EventLoop + my $sock = $self->{sock} or return; + $self->{-event_init_done} //= do { # persist til $ops done $self->SUPER::new($sock, EPOLLIN|EPOLLET); - } + $sock; + }; } sub noop {} @@ -1246,6 +1251,12 @@ sub busy { 1 } # prevent daemon-shutdown if client is connected # can immediately reread it sub DESTROY { my ($self) = @_; + if (my $counters = delete $self->{counters}) { + for my $k (sort keys %$counters) { + my $nr = $counters->{$k}; + $self->child_error(1 << 8, "$nr $k messages"); + } + } $self->{1}->autoflush(1) if $self->{1}; stop_pager($self); # preserve $? for ->fail or ->x_it code diff --git a/lib/PublicInbox/LeiTag.pm b/lib/PublicInbox/LeiTag.pm index e0532653..463fb921 100644 --- a/lib/PublicInbox/LeiTag.pm +++ b/lib/PublicInbox/LeiTag.pm @@ -15,7 +15,7 @@ sub input_eml_cb { # used by PublicInbox::LeiInput::input_fh $self->{lei}->{sto}->ipc_do('update_xvmd', $xoids, $eml, $self->{vmd_mod}); } else { - ++$self->{missing}; + ++$self->{unimported}; } } @@ -40,7 +40,7 @@ sub lei_tag { # the "lei tag" method my ($lei, @argv) = @_; my $sto = $lei->_lei_store(1); $sto->write_prepare($lei); - my $self = bless { missing => 0 }, __PACKAGE__; + my $self = bless {}, __PACKAGE__; $lei->ale; # refresh and prepare my $vmd_mod = $self->vmd_mod_extract(\@argv); return $lei->fail(join("\n", @{$vmd_mod->{err}})) if $vmd_mod->{err}; @@ -58,10 +58,10 @@ sub lei_tag { # the "lei tag" method $lei->wait_wq_events($op_c, $ops); } -sub note_missing { +sub note_unimported { my ($self) = @_; - my $n = $self->{missing} or return; - $self->{lei}->child_error(1 << 8, "$n missed messages"); + my $n = $self->{unimported} or return; + $self->{lei}->{pkt_op_p}->pkt_do('incr', 'unimported', $n); } sub ipc_atfork_child { @@ -69,7 +69,7 @@ sub ipc_atfork_child { PublicInbox::LeiInput::input_only_atfork_child($self); $self->{lse} = $self->{lei}->{sto}->search; # this goes out-of-scope at worker process exit: - PublicInbox::OnDestroy->new($$, \¬e_missing, $self); + PublicInbox::OnDestroy->new($$, \¬e_unimported, $self); } # Workaround bash word-splitting s to ['kw', ':', 'keyword' ...] |