public-inbox.git  about / heads / tags
an "archives first" approach to mailing lists
blob ee5bda59781f91f6be329fdc11ba8ab13a29f607 1355 bytes (raw)
$ git show HEAD:lib/PublicInbox/InputPipe.pm	# shows this blob on the CLI

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
 
# Copyright (C) all contributors <meta@public-inbox.org>
# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>

# for reading pipes, sockets, and TTYs off the DS event loop
package PublicInbox::InputPipe;
use v5.12;
use parent qw(PublicInbox::DS);
use PublicInbox::Syscall qw(EPOLLIN);

# Start reading from $in and hand everything read to a callback.
#
# Arguments:
#   $in   - readable IO handle
#   $cb   - code ref; event_step invokes it as $cb->($self, @args, $buf)
#           where $buf is the data read, '' on EOF, or undef on a
#           read error
#   @args - extra arguments passed through to $cb ahead of $buf
#
# Returns the new object.  This is a plain function, not a method:
# the object is always blessed into this package.
sub consume {
	my ($in, $cb, @args) = @_;
	my $self = bless { cb => $cb, args => \@args }, __PACKAGE__;
	# check the result of eval instead of $@, $@ may get clobbered
	# between the die and the check (e.g. by a destructor)
	my $ok = eval { $self->SUPER::new($in, EPOLLIN); 1 };
	if (!$ok) { # regular file (but not w/ select|IO::Poll backends)
		# event_step requeues itself after each read when this is set
		$self->{-need_rq} = 1;
		$self->requeue;
	} elsif (-p $in || -S _) { # O_NONBLOCK for sockets and pipes
		# `-p $in' stats $in itself; a bare `_' would test whichever
		# file was stat-ed last.  `-S _' then reuses that fresh result.
		$in->blocking(0);
	}
	$self;
}

# Stop reading; idempotent.  Objects flagged with -need_rq only drop
# their handle, all others defer to the parent class's close.
sub close {
	my ($self) = @_;
	return delete $self->{sock} if $self->{-need_rq};
	$self->SUPER::close;
}

# Perform one read of up to 65536 bytes and dispatch the outcome to the
# callback as $cb->($self, @args, $buf), where $buf is:
#   the data read - on a successful, non-empty read
#   ''            - on EOF; the object is closed afterwards
#   undef         - on errors other than EAGAIN/EINTR; also closes
# Does nothing if the handle is already gone.  Anything that dies inside
# the dispatch is reported via warn and closes the object.
sub event_step {
	my ($self) = @_;
	my $sock = $self->{sock} // return; # already closed
	my $nread = sysread($sock, my $chunk, 65536);
	eval {
		if (!defined($nread)) { # sysread failed, $! tells why
			if ($!{EAGAIN}) {
				# rely on EPOLLIN
			} elsif ($!{EINTR}) {
				# rely on EPOLLIN for sockets/pipes
				$self->requeue if $self->{-need_rq};
			} else { # another error
				$self->{cb}->($self, @{$self->{args}}, undef);
				$self->close;
			}
		} elsif (!$nread) { # EOF
			$self->{cb}->($self, @{$self->{args}}, '');
			$self->close;
		} else { # got data
			$self->{cb}->($self, @{$self->{args}}, $chunk);
			# -need_rq objects must ask for their next read explicitly
			$self->requeue if $self->{-need_rq};
		}
	};
	if ($@) {
		warn "E: $@";
		$self->close;
	}
}

1;

git clone https://public-inbox.org/public-inbox.git
git clone http://7fh6tueqddpjyxjmgtdiueylzoqt6pt7hec3pukyptlmohoowvhde4yd.onion/public-inbox.git