public-inbox.git  about / heads / tags
an "archives first" approach to mailing lists
blob f84c196a56cdbbcb4b7c3acaa46d626eca5a612e 3974 bytes (raw)
$ git show HEAD:lib/PublicInbox/DSKQXS.pm	# shows this blob on the CLI

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
 
# Copyright (C) all contributors <meta@public-inbox.org>
# Licensed the same as Danga::Socket (and Perl5)
# License: GPL-1.0+ or Artistic-1.0-Perl
#  <https://www.gnu.org/licenses/gpl-1.0.txt>
#  <https://dev.perl.org/licenses/artistic.html>
#
# kqueue support via IO::KQueue XS module.  This makes kqueue look
# like epoll to simplify the code in DS.pm.  This is NOT meant to be
# an all encompassing emulation of epoll via IO::KQueue, but just to
# support cases public-inbox-nntpd/httpd care about.
#
# It also implements signalfd(2) emulation via "tie".
package PublicInbox::DSKQXS;
use v5.12;
use Symbol qw(gensym);
use IO::KQueue;
use Errno qw(EAGAIN);
use PublicInbox::Syscall qw(EPOLLONESHOT EPOLLIN EPOLLOUT EPOLLET);

sub EV_DISPATCH () { 0x0080 }

# map EPOLL* bits to kqueue EV_* flags for EV_SET
sub kq_flag ($$) {
	my ($bit, $ev) = @_;
	if ($ev & $bit) {
		my $fl = EV_ENABLE;
		$fl |= EV_CLEAR if $fl & EPOLLET;

		# EV_DISPATCH matches EPOLLONESHOT semantics more closely
		# than EV_ONESHOT, in that EV_ADD is not required to
		# re-enable a disabled watch.
		($ev & EPOLLONESHOT) ? ($fl | EV_DISPATCH) : $fl;
	} else {
		EV_DISABLE;
	}
}

sub new {
	my ($class) = @_;
	bless { kq => IO::KQueue->new, owner_pid => $$ }, $class;
}

# returns a new instance which behaves like signalfd on Linux.
# It's wasteful in that it uses another FD, but it simplifies
# our epoll-oriented code.
sub signalfd {
	my ($class, $signo) = @_;
	my $sym = gensym;
	tie *$sym, $class, $signo; # calls TIEHANDLE
	$sym
}

sub TIEHANDLE { # similar to signalfd()
	my ($class, $signo) = @_;
	my $self = $class->new;
	my $kq = $self->{kq};
	$kq->EV_SET($_, EVFILT_SIGNAL, EV_ADD) for @$signo;
	$self;
}

sub READ { # called by sysread() for signalfd compatibility
	my ($self, undef, $len, $off) = @_; # $_[1] = buf
	die "bad args for signalfd read" if ($len % 128) // defined($off);
	my $sigbuf = $self->{sigbuf} //= [];
	my $nr = $len / 128;
	my $r = 0;
	$_[1] = '';
	while (1) {
		while ($nr--) {
			my $signo = shift(@$sigbuf) or last;
			# caller only cares about signalfd_siginfo.ssi_signo:
			$_[1] .= pack('L', $signo) . ("\0" x 124);
			$r += 128;
		}
		return $r if $r;
		my @events = eval { $self->{kq}->kevent(0) };
		# workaround https://rt.cpan.org/Ticket/Display.html?id=116615
		if ($@) {
			next if $@ =~ /Interrupted system call/;
			die;
		}
		if (!scalar(@events)) {
			$! = EAGAIN;
			return;
		}

		# Grab the kevent.ident (signal number).  The kevent.data
		# field shows coalesced signals, and maybe we'll use it
		# in the future...
		@$sigbuf = map { $_->[0] } @events;
	}
}

# for fileno() calls in PublicInbox::DS
sub FILENO { ${$_[0]->{kq}} }

sub _ep_mod_add ($$$$) {
	my ($kq, $fd, $ev, $add) = @_;
	$kq->EV_SET($fd, EVFILT_READ, $add|kq_flag(EPOLLIN, $ev));

	# we call this blindly for read-only FDs such as tied
	# DSKQXS (signalfd emulation) and Listeners
	eval { $kq->EV_SET($fd, EVFILT_WRITE, $add|kq_flag(EPOLLOUT, $ev)) };
	0;
}

sub ep_add { _ep_mod_add($_[0]->{kq}, fileno($_[1]), $_[2], EV_ADD) };
sub ep_mod { _ep_mod_add($_[0]->{kq}, fileno($_[1]), $_[2], 0) };

sub ep_del {
	my ($self, $io, $ev) = @_;
	my $kq = $_[0]->{kq} // return; # called in cleanup
	my $fd = fileno($io);
	$kq->EV_SET($fd, EVFILT_READ, EV_DISABLE);
	eval { $kq->EV_SET($fd, EVFILT_WRITE, EV_DISABLE) };
	0;
}

sub ep_wait {
	my ($self, $timeout_msec, $events) = @_;
	# n.b.: IO::KQueue is hard-coded to return up to 1000 events
	@$events = eval { $self->{kq}->kevent($timeout_msec) };
	if (my $err = $@) {
		# workaround https://rt.cpan.org/Ticket/Display.html?id=116615
		if ($err =~ /Interrupted system call/) {
			@$events = ();
		} else {
			die $err;
		}
	}
	# caller only cares for $events[$i]->[0]
	$_ = $_->[0] for @$events;
}

# kqueue is close-on-fork (not exec), so we must not close it
# in forked processes:
sub DESTROY {
	my ($self) = @_;
	my $kq = delete $self->{kq} or return;
	if (delete($self->{owner_pid}) == $$) {
		POSIX::close($$kq);
	}
}

1;

git clone https://public-inbox.org/public-inbox.git
git clone http://7fh6tueqddpjyxjmgtdiueylzoqt6pt7hec3pukyptlmohoowvhde4yd.onion/public-inbox.git