From 1971bbe4cd599b3d9583084145266369525bfca2 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Wed, 25 Dec 2019 07:50:45 +0000 Subject: qspawn: psgi_qx: eliminate anonymous subs We can follow what we did in psgi_return to make psgi_qx allocate less memory on each call. --- lib/PublicInbox/Qspawn.pm | 56 +++++++++++++++++++++++++++-------------------- 1 file changed, 32 insertions(+), 24 deletions(-) (limited to 'lib/PublicInbox/Qspawn.pm') diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm index 33e20147..9e161234 100644 --- a/lib/PublicInbox/Qspawn.pm +++ b/lib/PublicInbox/Qspawn.pm @@ -149,6 +149,37 @@ sub start ($$$) { } } +sub psgi_qx_init_cb { + my ($self) = @_; + my $async = delete $self->{async}; + my ($r, $buf); + my $qx_fh = $self->{qx_fh}; +reread: + $r = sysread($self->{rpipe}, $buf, 65536); + if ($async) { + $async->async_pass($self->{psgi_env}->{'psgix.io'}, + $qx_fh, \$buf); + } elsif (defined $r) { + $r ? $qx_fh->write($buf) : event_step($self, undef); + } else { + return if $! == EAGAIN; # try again when notified + goto reread if $! == EINTR; + event_step($self, $!); + } +} + +sub psgi_qx_start { + my ($self) = @_; + if (my $async = $self->{psgi_env}->{'pi-httpd.async'}) { + # PublicInbox::HTTPD::Async->new(rpipe, $cb, cb_arg, $end_obj) + $self->{async} = $async->($self->{rpipe}, + \&psgi_qx_init_cb, $self, $self); + # init_cb will call ->async_pass or ->close + } else { # generic PSGI + psgi_qx_init_cb($self) while $self->{qx_fh}; + } +} + # Similar to `backtick` or "qx" ("perldoc -f qx"), it calls $qx_cb with # the stdout of the given command when done; but respects the given limiter # $env is the PSGI env. As with ``/qx; only use this when output is small @@ -162,31 +193,8 @@ sub psgi_qx { $self->{qx_arg} = $qx_arg; $self->{qx_fh} = $qx_fh; $self->{qx_buf} = \$qx_buf; - my $async = $env->{'pi-httpd.async'}; - my $cb = sub { - my ($r, $buf); -reread: - $r = sysread($self->{rpipe}, $buf, 65536); - if ($async) { - $async->async_pass($env->{'psgix.io'}, $qx_fh, \$buf); - } elsif (defined $r) { - $r ? $qx_fh->write($buf) : event_step($self, undef); - } else { - return if $! == EAGAIN; # try again when notified - goto reread if $! == EINTR; - event_step($self, $!); - } - }; $limiter ||= $def_limiter ||= PublicInbox::Qspawn::Limiter->new(32); - $self->start($limiter, sub { # start_cb, may run later, much later... - if ($async) { - # PublicInbox::HTTPD::Async->new(rpipe, $cb, $end_obj) - $async = $async->($self->{rpipe}, $cb, undef, $self); - # $cb will call ->async_pass or ->close - } else { # generic PSGI - $cb->() while $self->{qx_fh}; - } - }); + start($self, $limiter, \&psgi_qx_start); } # create a filter for "push"-based streaming PSGI writes used by HTTPD::Async -- cgit v1.2.3-24-ge0c7