diff options
author | Eric Wong <e@80x24.org> | 2022-07-19 22:42:52 +0000 |
---|---|---|
committer | Eric Wong <e@80x24.org> | 2022-07-20 03:54:27 +0000 |
commit | 49684178901a3d5db198032da1bb831b2b3e0b65 (patch) | |
tree | 7c19828248f589d06cb34fbfb69cee129c4e1366 /lib/PublicInbox/WQBlocked.pm | |
parent | 840785917bc74c8e7df226463144185294047d75 (diff) | |
download | public-inbox-49684178901a3d5db198032da1bb831b2b3e0b65.tar.gz |
Enqueuing "note-event" requests from the DS event loop must not wait on workers being able to drain the queue quickly enough. Thus we make the SOCK_SEQPACKET writes nonblocking and rely on the lei-daemon event loop to enqueue writes. This is a unique problem for "note-event" since it reuses workers in between commands, while most lei commands currently fork off new workers.
Diffstat (limited to 'lib/PublicInbox/WQBlocked.pm')
-rw-r--r-- | lib/PublicInbox/WQBlocked.pm | 49 |
1 files changed, 49 insertions, 0 deletions
diff --git a/lib/PublicInbox/WQBlocked.pm b/lib/PublicInbox/WQBlocked.pm new file mode 100644 index 00000000..fbb43600 --- /dev/null +++ b/lib/PublicInbox/WQBlocked.pm @@ -0,0 +1,49 @@ +# Copyright (C) all contributors <meta@public-inbox.org> +# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> + +# non-blocking workqueues, currently used by LeiNoteEvent to track renames +package PublicInbox::WQBlocked; +use v5.12; +use parent qw(PublicInbox::DS); +use PublicInbox::Syscall qw(EPOLLOUT EPOLLONESHOT); +use PublicInbox::IPC; +use Carp (); +use Socket qw(MSG_EOR); + +sub new { + my ($cls, $wq, $buf) = @_; + my $self = bless { msgq => [$buf], }, $cls; + $wq->{wqb} = $self->SUPER::new($wq->{-wq_s1}, EPOLLOUT|EPOLLONESHOT); +} + +sub flush_send { + my ($self) = @_; + push(@{$self->{msgq}}, $_[1]) if defined($_[1]); + while (defined(my $buf = shift @{$self->{msgq}})) { + if (ref($buf) eq 'CODE') { + $buf->($self); # could be \&PublicInbox::DS::close + } else { + my $wq_s1 = $self->{sock}; + my $n = $PublicInbox::IPC::send_cmd->($wq_s1, [], $buf, + MSG_EOR); + next if defined($n); + Carp::croak("sendmsg: $!") unless $!{EAGAIN}; + PublicInbox::DS::epwait($wq_s1, EPOLLOUT|EPOLLONESHOT); + unshift @{$self->{msgq}}, $buf; + last; # wait for ->event_step + } + } +} + +sub enq_close { flush_send($_[0], $_[0]->can('close')) } + +sub event_step { # called on EPOLLOUT wakeup + my ($self) = @_; + eval { flush_send($self) } if $self->{sock}; + if ($@) { + warn $@; + $self->close; + } +} + +1; |