From e61e5fcff30370f355d8aeea6e47c06f3606e994 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Wed, 25 Dec 2019 07:50:43 +0000 Subject: qspawn: psgi_return: initial cb can be named We can take advantage of HTTPD::Async being able to pass user-supplied args to callbacks to get rid of one (of many) anonymous subs in the code path. --- lib/PublicInbox/Qspawn.pm | 83 ++++++++++++++++++++++++----------------------- 1 file changed, 43 insertions(+), 40 deletions(-) (limited to 'lib/PublicInbox/Qspawn.pm') diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm index 1985dccd..0967bcfa 100644 --- a/lib/PublicInbox/Qspawn.pm +++ b/lib/PublicInbox/Qspawn.pm @@ -239,6 +239,42 @@ sub rd_hdr ($) { $ret; } +sub psgi_return_init_cb { + my ($self) = @_; + my $r = rd_hdr($self) or return; + my $env = $self->{psgi_env}; + my $filter = delete $env->{'qspawn.filter'}; + my $wcb = delete $env->{'qspawn.wcb'}; + my $async = delete $self->{async}; + if (scalar(@$r) == 3) { # error + if ($async) { + # calls rpipe->close && ->event_step + $async->close; + } else { + $self->{rpipe}->close; + event_step($self); + } + $wcb->($r); + } elsif ($async) { + # done reading headers, handoff to read body + my $fh = $wcb->($r); # scalar @$r == 2 + $fh = filter_fh($fh, $filter) if $filter; + $self->{fh} = $fh; + $async->async_pass($env->{'psgix.io'}, $fh, + delete($self->{hdr_buf})); + } else { # for synchronous PSGI servers + require PublicInbox::GetlineBody; + $r->[2] = PublicInbox::GetlineBody->new($self->{rpipe}, + \&event_step, $self, + ${$self->{hdr_buf}}, $filter); + $wcb->($r); + } + + # Workaround a leak under Perl 5.16.3 when combined with + # Plack::Middleware::Deflater: + $wcb = undef; +} + # Used for streaming the stdout of one process as a PSGI response. # # $env is the PSGI env. @@ -266,62 +302,29 @@ sub psgi_return { $self->{psgi_env} = $env; $self->{hdr_buf} = \(my $hdr_buf = ''); $self->{parse_hdr} = $parse_hdr; - my $wcb = delete $env->{'qspawn.wcb'}; # or PSGI server supplies it - my $async = $env->{'pi-httpd.async'}; - - my $cb = sub { - my $r = rd_hdr($self) or return; - my $filter = delete $env->{'qspawn.filter'}; - if (scalar(@$r) == 3) { # error - if ($async) { - # calls rpipe->close && ->event_step - $async->close; - } else { - $self->{rpipe}->close; - event_step($self); - } - $wcb->($r); - } elsif ($async) { - # done reading headers, handoff to read body - my $fh = $wcb->($r); # scalar @$r == 2 - $fh = filter_fh($fh, $filter) if $filter; - $self->{fh} = $fh; - $async->async_pass($env->{'psgix.io'}, $fh, - delete($self->{hdr_buf})); - } else { # for synchronous PSGI servers - require PublicInbox::GetlineBody; - $r->[2] = PublicInbox::GetlineBody->new($self->{rpipe}, - \&event_step, $self, - ${$self->{hdr_buf}}, $filter); - $wcb->($r); - } - - # Workaround a leak under Perl 5.16.3 when combined with - # Plack::Middleware::Deflater: - $wcb = undef; - }; $limiter ||= $def_limiter ||= PublicInbox::Qspawn::Limiter->new(32); my $start_cb = sub { # may run later, much later... - if ($async) { + if (my $async = $env->{'pi-httpd.async'}) { # PublicInbox::HTTPD::Async->new(rpipe, $cb, $cb_arg, # $end_obj) - $async = $async->($self->{rpipe}, $cb, undef, $self); - # $cb will call ->async_pass or ->close + $self->{async} = $async->($self->{rpipe}, + \&psgi_return_init_cb, $self, + $self); } else { # generic PSGI - $cb->() while $self->{parse_hdr}; + psgi_return_init_cb($self) while $self->{parse_hdr}; } }; # the caller already captured the PSGI write callback from # the PSGI server, so we can call ->start, here: - return $self->start($limiter, $start_cb) if $wcb; + return $self->start($limiter, $start_cb) if $env->{'qspawn.wcb'}; # the caller will return this sub to the PSGI server, so # it can set the response callback (that is, for PublicInbox::HTTP, # the chunked_wcb or identity_wcb callback), but other HTTP servers # are supported: sub { - ($wcb) = @_; + $self->{psgi_env}->{'qspawn.wcb'} = $_[0]; $self->start($limiter, $start_cb); }; } -- cgit v1.2.3-24-ge0c7