about summary refs log tree commit homepage
path: root/lib
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2021-02-07 08:51:52 +0000
committerEric Wong <e@80x24.org>2021-02-07 22:57:04 +0000
commit2d610f0e645fd2b01bf9108b2d06022ab730815c (patch)
treeb4f0ea6655dd3de1d4cd72b70e441130b889f59e /lib
parenta47d55c2f93cad83440ce20aa8bd971fa1501b56 (diff)
downloadpublic-inbox-2d610f0e645fd2b01bf9108b2d06022ab730815c.tar.gz
This reverts commit a7e6a8cd68fb6d700337d8dbc7ee2c65ff3d2fc1.

It turns out to be unworkable in the face of multiple producer
processes, since the lock we make has no effect when calculating
pipe capacity.
Diffstat (limited to 'lib')
-rw-r--r--lib/PublicInbox/IPC.pm52
1 files changed, 3 insertions, 49 deletions
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 3713b56b..7e5a0b16 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -18,7 +18,6 @@ use PublicInbox::OnDestroy;
 use PublicInbox::WQWorker;
 use Socket qw(AF_UNIX MSG_EOR SOCK_STREAM);
 my $SEQPACKET = eval { Socket::SOCK_SEQPACKET() }; # portable enough?
-use constant PIPE_BUF => $^O eq 'linux' ? 4096 : POSIX::_POSIX_PIPE_BUF();
 our @EXPORT_OK = qw(ipc_freeze ipc_thaw);
 my $WQ_MAX_WORKERS = 4096;
 my ($enc, $dec);
@@ -59,15 +58,10 @@ sub _get_rec ($) {
         ipc_thaw($buf);
 }
 
-sub _pack_rec ($) {
-        my ($ref) = @_;
-        my $buf = ipc_freeze($ref);
-        length($buf) . "\n" . $buf;
-}
-
 sub _send_rec ($$) {
         my ($w, $ref) = @_;
-        print $w _pack_rec($ref) or croak "print: $!";
+        my $buf = ipc_freeze($ref);
+        print $w length($buf), "\n", $buf or croak "print: $!";
 }
 
 sub ipc_return ($$$) {
@@ -188,21 +182,6 @@ sub ipc_lock_init {
         $self->{-ipc_lock} //= bless { lock_path => $f }, 'PublicInbox::Lock'
 }
 
-sub ipc_async_wait ($$) {
-        my ($self, $max) = @_; # max == -1 to wait for all
-        my $aif = $self->{-async_inflight} or return;
-        my $r_res = $self->{-ipc_res} or die 'BUG: no ipc_res';
-        while (my ($sub, $bytes, $cb, $cb_arg) = splice(@$aif, 0, 4)) {
-                my $ret = _get_rec($r_res) //
-                        die "no response on $sub (req.size=$bytes)";
-                $self->{-async_inflight_bytes} -= $bytes;
-
-                eval { $cb->($cb_arg, $ret) };
-                warn "E: $sub callback error: $@\n" if $@;
-                return if --$max == 0;
-        }
-}
-
 # call $self->$sub(@args), on a worker if ipc_worker_spawn was used
 sub ipc_do {
         my ($self, $sub, @args) = @_;
@@ -210,8 +189,7 @@ sub ipc_do {
                 my $ipc_lock = $self->{-ipc_lock};
                 my $lock = $ipc_lock ? $ipc_lock->lock_for_scope : undef;
                 if (defined(wantarray)) {
-                        my $r_res = $self->{-ipc_res} or die 'BUG: no ipc_res';
-                        ipc_async_wait($self, -1);
+                        my $r_res = $self->{-ipc_res} or die 'no ipc_res';
                         _send_rec($w_req, [ wantarray, $sub, @args ]);
                         my $ret = _get_rec($r_res) // die "no response on $sub";
                         die $$ret if ref($ret) eq 'PublicInbox::IPC::Die';
@@ -224,30 +202,6 @@ sub ipc_do {
         }
 }
 
-sub ipc_async {
-        my ($self, $sub, $sub_args, $cb, $cb_arg) = @_;
-        if (my $w_req = $self->{-ipc_req}) { # run in worker
-                my $rec = _pack_rec([ 1, $sub, @$sub_args ]);
-                my $cur_bytes = \($self->{-async_inflight_bytes} //= 0);
-                while (($$cur_bytes + length($rec)) > PIPE_BUF) {
-                        ipc_async_wait($self, 1);
-                }
-                my $ipc_lock = $self->{-ipc_lock};
-                my $lock = $ipc_lock ? $ipc_lock->lock_for_scope : undef;
-                print $w_req $rec or croak "print: $!";
-                $$cur_bytes += length($rec);
-                push @{$self->{-async_inflight}},
-                                $sub, length($rec), $cb, $cb_arg;
-        } else {
-                my $ret = [ eval { $self->$sub(@$sub_args) } ];
-                if (my $exc = $@) {
-                        $ret = ( bless(\$exc, 'PublicInbox::IPC::Die') );
-                }
-                eval { $cb->($cb_arg, $ret) };
-                warn "E: $sub callback error: $@\n" if $@;
-        }
-}
-
 # needed when there's multiple IPC workers and the parent forking
 # causes newer siblings to inherit older siblings sockets
 sub ipc_sibling_atfork_child {