# Copyright (C) all contributors # License: AGPL-3.0+ # Intended for PublicInbox::DS::event_loop for -cindex --associate, # this reports auxilliary status while dumping package PublicInbox::CidxXapHelperAux; use v5.12; use parent qw(PublicInbox::DS); use PublicInbox::Syscall qw(EPOLLIN); # rpipe connects to req->fp[1] in xap_helper.h sub new { my ($cls, $rpipe, $cidx, $pfx) = @_; my $self = bless { cidx => $cidx, pfx => $pfx }, $cls; $rpipe->blocking(0); $self->SUPER::new($rpipe, EPOLLIN); } sub event_step { my ($self) = @_; # xap_helper.h is line-buffered my $buf = delete($self->{buf}) // ''; my $n = sysread($self->{sock}, $buf, 65536, length($buf)); if (!defined($n)) { return if $!{EAGAIN}; die "sysread: $!"; } my $pfx = $self->{pfx}; if ($n == 0) { warn "BUG? $pfx buf=$buf" if $buf ne ''; if (delete $self->{cidx}->{PENDING}->{$pfx}) { warn "BUG? $pfx did not get mset.size"; $self->{cidx}->index_next; } return $self->close; } my @lines = split(/^/m, $buf); $self->{buf} = pop @lines if substr($lines[-1], -1) ne "\n"; for my $l (@lines) { if ($l =~ /\Amset\.size=[0-9]+ nr_out=[0-9]+\n\z/) { delete $self->{cidx}->{PENDING}->{$pfx}; $self->{cidx}->index_next; } chomp $l; $self->{cidx}->progress("$pfx $l"); } } 1;