1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
| | # Copyright (C) all contributors <meta@public-inbox.org>
# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
# Intended for PublicInbox::DS::event_loop for -cindex --associate,
# this reports auxilliary status while dumping
package PublicInbox::CidxXapHelperAux;
use v5.12;
use parent qw(PublicInbox::DS);
use PublicInbox::Syscall qw(EPOLLIN);
# rpipe connects to req->fp[1] in xap_helper.h
sub new {
my ($cls, $rpipe, $cidx, $pfx) = @_;
my $self = bless { cidx => $cidx, pfx => $pfx }, $cls;
$rpipe->blocking(0);
$self->SUPER::new($rpipe, EPOLLIN);
}
sub event_step {
my ($self) = @_; # xap_helper.h is line-buffered
my $buf = delete($self->{buf}) // '';
my $n = sysread($self->{sock}, $buf, 65536, length($buf));
if (!defined($n)) {
return if $!{EAGAIN};
die "sysread: $!";
}
my $pfx = $self->{pfx};
if ($n == 0) {
warn "BUG? $pfx buf=$buf" if $buf ne '';
if (delete $self->{cidx}->{PENDING}->{$pfx}) {
warn "BUG? $pfx did not get mset.size";
$self->{cidx}->index_next;
}
return $self->close;
}
my @lines = split(/^/m, $buf);
$self->{buf} = pop @lines if substr($lines[-1], -1) ne "\n";
for my $l (@lines) {
if ($l =~ /\Amset\.size=[0-9]+ nr_out=[0-9]+\n\z/) {
delete $self->{cidx}->{PENDING}->{$pfx};
$self->{cidx}->index_next;
}
chomp $l;
$self->{cidx}->progress("$pfx $l");
}
}
1;
|