# Copyright (C) all contributors # License: AGPL-3.0+ # for reading pipes, sockets, and TTYs off the DS event loop package PublicInbox::InputPipe; use v5.12; use parent qw(PublicInbox::DS); use PublicInbox::Syscall qw(EPOLLIN); sub consume { my ($in, $cb, @args) = @_; my $self = bless { cb => $cb, args => \@args }, __PACKAGE__; eval { $self->SUPER::new($in, EPOLLIN) }; if ($@) { # regular file (but not w/ select|IO::Poll backends) $self->{-need_rq} = 1; $self->requeue; } elsif (-p _ || -S _) { # O_NONBLOCK for sockets and pipes $in->blocking(0); } $self; } sub close { # idempotent my ($self) = @_; $self->{-need_rq} ? delete($self->{sock}) : $self->SUPER::close } sub event_step { my ($self) = @_; my $r = sysread($self->{sock} // return, my $rbuf, 65536); eval { if ($r) { $self->{cb}->($self, @{$self->{args}}, $rbuf); $self->requeue if $self->{-need_rq}; } elsif (defined($r)) { # EOF $self->{cb}->($self, @{$self->{args}}, ''); $self->close } elsif ($!{EAGAIN}) { # rely on EPOLLIN } elsif ($!{EINTR}) { # rely on EPOLLIN for sockets/pipes $self->requeue if $self->{-need_rq}; } else { # another error $self->{cb}->($self, @{$self->{args}}, undef); $self->close; } }; if ($@) { warn "E: $@"; $self->close; } } 1;