From 2131a3cd0a1cad6f7d6a2c6439676f9f2a039ff7 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Wed, 25 Dec 2019 07:50:41 +0000 Subject: qspawn: reduce local vars, de-anonymize rd_hdr rd_hdr() now becomes a named subroutine instead of a per-call local variable, so kilobytes of memory will not have to be allocated for it on every ->psgi_return call. --- lib/PublicInbox/Qspawn.pm | 79 ++++++++++++++++++++++++----------------------- 1 file changed, 40 insertions(+), 39 deletions(-) (limited to 'lib/PublicInbox/Qspawn.pm') diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm index 6cb28b9a..1985dccd 100644 --- a/lib/PublicInbox/Qspawn.pm +++ b/lib/PublicInbox/Qspawn.pm @@ -162,12 +162,11 @@ sub psgi_qx { $self->{qx_arg} = $qx_arg; $self->{qx_fh} = $qx_fh; $self->{qx_buf} = \$qx_buf; - my $rpipe; # comes from popen_rd my $async = $env->{'pi-httpd.async'}; my $cb = sub { my ($r, $buf); reread: - $r = sysread($rpipe, $buf, 65536); + $r = sysread($self->{rpipe}, $buf, 65536); if ($async) { $async->async_pass($env->{'psgix.io'}, $qx_fh, \$buf); } elsif (defined $r) { @@ -180,10 +179,9 @@ reread: }; $limiter ||= $def_limiter ||= PublicInbox::Qspawn::Limiter->new(32); $self->start($limiter, sub { # start_cb, may run later, much later... - ($rpipe) = @_; # popen_rd result if ($async) { - # PublicInbox::HTTPD::Async->new($rpipe, $cb, $end_obj) - $async = $async->($rpipe, $cb, undef, $self); + # PublicInbox::HTTPD::Async->new(rpipe, $cb, $end_obj) + $async = $async->($self->{rpipe}, $cb, undef, $self); # $cb will call ->async_pass or ->close } else { # generic PSGI $cb->() while $self->{qx_fh}; @@ -215,6 +213,32 @@ sub event_step { $fh->close if $fh; # async-only (psgi_return) } +sub rd_hdr ($) { + my ($self) = @_; + # typically used for reading CGI headers + # we must loop until EAGAIN for EPOLLET in HTTPD/Async.pm + # We also need to check EINTR for generic PSGI servers. + my $ret; + my $total_rd = 0; + my $hdr_buf = $self->{hdr_buf}; + do { + my $r = sysread($self->{rpipe}, $$hdr_buf, 4096, + length($$hdr_buf)); + if (defined($r)) { + $total_rd += $r; + $ret = $self->{parse_hdr}->($total_rd, $hdr_buf); + } else { + # caller should notify us when it's ready: + return if $! == EAGAIN; + next if $! == EINTR; # immediate retry + log_err($self->{psgi_env}, "error reading header: $!"); + $ret = [ 500, [], [ "Internal error\n" ] ]; + } + } until (defined $ret); + delete $self->{parse_hdr}; # done parsing headers + $ret; +} + # Used for streaming the stdout of one process as a PSGI response. # # $env is the PSGI env. @@ -240,43 +264,20 @@ sub event_step { sub psgi_return { my ($self, $env, $limiter, $parse_hdr) = @_; $self->{psgi_env} = $env; - my $rpipe; - my $buf = ''; - my $rd_hdr = sub { - # typically used for reading CGI headers - # we must loop until EAGAIN for EPOLLET in HTTPD/Async.pm - # We also need to check EINTR for generic PSGI servers. - my $ret; - my $total_rd = 0; - do { - my $r = sysread($rpipe, $buf, 4096, length($buf)); - if (defined($r)) { - $total_rd += $r; - $ret = $parse_hdr->($r ? $total_rd : 0, \$buf); - } else { - # caller should notify us when it's ready: - return if $! == EAGAIN; - next if $! == EINTR; # immediate retry - log_err($env, "error reading header: $!"); - $ret = [ 500, [], [ "Internal error\n" ] ]; - } - } until (defined $ret); - $ret; - }; - + $self->{hdr_buf} = \(my $hdr_buf = ''); + $self->{parse_hdr} = $parse_hdr; my $wcb = delete $env->{'qspawn.wcb'}; # or PSGI server supplies it my $async = $env->{'pi-httpd.async'}; my $cb = sub { - my $r = $rd_hdr->() or return; - $rd_hdr = undef; # done reading headers + my $r = rd_hdr($self) or return; my $filter = delete $env->{'qspawn.filter'}; if (scalar(@$r) == 3) { # error if ($async) { # calls rpipe->close && ->event_step $async->close; } else { - $rpipe->close; + $self->{rpipe}->close; event_step($self); } $wcb->($r); @@ -285,12 +286,13 @@ sub psgi_return { my $fh = $wcb->($r); # scalar @$r == 2 $fh = filter_fh($fh, $filter) if $filter; $self->{fh} = $fh; - $async->async_pass($env->{'psgix.io'}, $fh, \$buf); + $async->async_pass($env->{'psgix.io'}, $fh, + delete($self->{hdr_buf})); } else { # for synchronous PSGI servers require PublicInbox::GetlineBody; - $r->[2] = PublicInbox::GetlineBody->new($rpipe, + $r->[2] = PublicInbox::GetlineBody->new($self->{rpipe}, \&event_step, $self, - $buf, $filter); + ${$self->{hdr_buf}}, $filter); $wcb->($r); } @@ -300,14 +302,13 @@ sub psgi_return { }; $limiter ||= $def_limiter ||= PublicInbox::Qspawn::Limiter->new(32); my $start_cb = sub { # may run later, much later... - ($rpipe) = @_; if ($async) { - # PublicInbox::HTTPD::Async->new($rpipe, $cb, $cb_arg, + # PublicInbox::HTTPD::Async->new(rpipe, $cb, $cb_arg, # $end_obj) - $async = $async->($rpipe, $cb, undef, $self); + $async = $async->($self->{rpipe}, $cb, undef, $self); # $cb will call ->async_pass or ->close } else { # generic PSGI - $cb->() while $rd_hdr; + $cb->() while $self->{parse_hdr}; } }; -- cgit v1.2.3-24-ge0c7