about summary refs log tree commit homepage
path: root/lib/PublicInbox/WQBlocked.pm
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2022-07-19 22:42:52 +0000
committerEric Wong <e@80x24.org>2022-07-20 03:54:27 +0000
commit49684178901a3d5db198032da1bb831b2b3e0b65 (patch)
tree7c19828248f589d06cb34fbfb69cee129c4e1366 /lib/PublicInbox/WQBlocked.pm
parent840785917bc74c8e7df226463144185294047d75 (diff)
downloadpublic-inbox-49684178901a3d5db198032da1bb831b2b3e0b65.tar.gz
Enqueuing "note-event" requests from the DS event loop must
not wait on workers being able to drain the queue quickly
enough.  Thus we make the SOCK_SEQPACKET writes nonblocking
and rely on the lei-daemon event loop to enqueue writes.

This is a unique problem for "note-event" since it reuses
workers in between commands, while most lei commands currently
fork off new workers.
Diffstat (limited to 'lib/PublicInbox/WQBlocked.pm')
-rw-r--r--lib/PublicInbox/WQBlocked.pm49
1 files changed, 49 insertions, 0 deletions
diff --git a/lib/PublicInbox/WQBlocked.pm b/lib/PublicInbox/WQBlocked.pm
new file mode 100644
index 00000000..fbb43600
--- /dev/null
+++ b/lib/PublicInbox/WQBlocked.pm
@@ -0,0 +1,49 @@
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# non-blocking workqueues, currently used by LeiNoteEvent to track renames
+package PublicInbox::WQBlocked;
+use v5.12;
+use parent qw(PublicInbox::DS);
+use PublicInbox::Syscall qw(EPOLLOUT EPOLLONESHOT);
+use PublicInbox::IPC;
+use Carp ();
+use Socket qw(MSG_EOR);
+
+sub new {
+        my ($cls, $wq, $buf) = @_;
+        my $self = bless { msgq => [$buf], }, $cls;
+        $wq->{wqb} = $self->SUPER::new($wq->{-wq_s1}, EPOLLOUT|EPOLLONESHOT);
+}
+
+sub flush_send {
+        my ($self) = @_;
+        push(@{$self->{msgq}}, $_[1]) if defined($_[1]);
+        while (defined(my $buf = shift @{$self->{msgq}})) {
+                if (ref($buf) eq 'CODE') {
+                        $buf->($self); # could be \&PublicInbox::DS::close
+                } else {
+                        my $wq_s1 = $self->{sock};
+                        my $n = $PublicInbox::IPC::send_cmd->($wq_s1, [], $buf,
+                                                                MSG_EOR);
+                        next if defined($n);
+                        Carp::croak("sendmsg: $!") unless $!{EAGAIN};
+                        PublicInbox::DS::epwait($wq_s1, EPOLLOUT|EPOLLONESHOT);
+                        unshift @{$self->{msgq}}, $buf;
+                        last; # wait for ->event_step
+                }
+        }
+}
+
+sub enq_close { flush_send($_[0], $_[0]->can('close')) }
+
+sub event_step { # called on EPOLLOUT wakeup
+        my ($self) = @_;
+        eval { flush_send($self) } if $self->{sock};
+        if ($@) {
+                warn $@;
+                $self->close;
+        }
+}
+
+1;