public-inbox.git  about / heads / tags
an "archives first" approach to mailing lists
blob 8d931fa9d1ad8af9b941eceb9ad9e2dbf37ad68e 1275 bytes (raw)
$ git show HEAD:lib/PublicInbox/WQBlocked.pm	# shows this blob on the CLI

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
 
# Copyright (C) all contributors <meta@public-inbox.org>
# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>

# non-blocking workqueues, currently used by LeiNoteEvent to track renames
package PublicInbox::WQBlocked;
use v5.12;
use parent qw(PublicInbox::DS);
use PublicInbox::Syscall qw(EPOLLOUT EPOLLONESHOT);
use PublicInbox::IPC;
use Carp ();

# Constructor.  $buf becomes the first entry of the {msgq} queue.
# $wq->{-wq_s1} (presumably the workqueue socket -- confirm against
# the caller) is handed to the parent class constructor together with
# EPOLLOUT|EPOLLONESHOT.  Whatever the parent constructor returns is
# stored in $wq->{wqb} and is also the return value.
sub new {
	my ($cls, $wq, $buf) = @_;
	my $self = bless({ msgq => [ $buf ] }, $cls);
	my $wqb = $self->SUPER::new($wq->{-wq_s1}, EPOLLOUT|EPOLLONESHOT);
	$wq->{wqb} = $wqb;
}

# Queue the optional second argument (if defined) behind anything
# already pending in {msgq}, then drain the queue in order:
# - CODE refs are invoked with $self as their only argument
# - anything else is handed to $PublicInbox::IPC::send_cmd on {sock}
# Croaks with "sendmsg: $!" on any send failure other than EAGAIN.
# On EAGAIN, the unsent buffer goes back to the head of the queue and
# draining stops until ->event_step runs.
sub flush_send {
	my ($self) = @_;
	# $_[1] is read in place rather than unpacked into a lexical
	push(@{$self->{msgq}}, $_[1]) if defined($_[1]);
	while (defined(my $item = shift @{$self->{msgq}})) {
		if (ref($item) eq 'CODE') {
			$item->($self); # could be \&PublicInbox::DS::close
			next;
		}
		my $sock = $self->{sock};
		my $sent = $PublicInbox::IPC::send_cmd->($sock, [], $item, 0);
		next if defined($sent); # sent, move on to the next item

		# $! must be inspected before anything else can clobber it
		$!{EAGAIN} or Carp::croak("sendmsg: $!");

		# ask for another one-shot EPOLLOUT and keep queue order
		PublicInbox::DS::epwait($sock, EPOLLOUT|EPOLLONESHOT);
		unshift @{$self->{msgq}}, $item;
		last; # wait for ->event_step
	}
}

# Enqueue this object's own ->close method (looked up via ->can) so
# flush_send invokes it once everything queued before it is handled.
sub enq_close {
	my ($self) = @_;
	flush_send($self, $self->can('close'));
}

# Retry sending whatever is still queued via flush_send.
# Does nothing when {sock} is unset.  An exception from flush_send is
# reported with warn and the object is closed; it is not rethrown.
sub event_step { # called on EPOLLOUT wakeup
	my ($self) = @_;
	# Bail out before looking at $@: the eval below only runs with a
	# live sock, so without this guard $@ could still hold a stale
	# error from an unrelated eval and trigger a bogus warn + close.
	return unless $self->{sock};
	eval { flush_send($self) };
	# copy $@ immediately, nothing may clobber it before we report it
	if (my $err = $@) {
		warn $err;
		$self->close;
	}
}

1;

git clone https://public-inbox.org/public-inbox.git
git clone http://7fh6tueqddpjyxjmgtdiueylzoqt6pt7hec3pukyptlmohoowvhde4yd.onion/public-inbox.git