about summary refs log tree commit homepage
path: root/lib/PublicInbox
diff options
context:
space:
mode:
author Eric Wong <e@80x24.org> 2023-10-25 00:29:39 +0000
committer Eric Wong <e@80x24.org> 2023-10-25 07:28:42 +0000
commit 3e634c22ceff4736d3c34d3496e7e5519e6ef356 (patch)
tree 25472585f4c93917125b95457fbed3ee975cc2be /lib/PublicInbox
parent 6fe457251172f2f59a4e0a89be2a56913e88f2ad (diff)
download public-inbox-3e634c22ceff4736d3c34d3496e7e5519e6ef356.tar.gz
Now that psgi_yield is used everywhere, the more complex
psgi_return and its helper bits can be removed.  We'll also fix
some outdated comments now that everything on psgi_return has
switched to psgi_yield.  GetlineResponse replaces GetlineBody
and does a better job of isolating generic PSGI-only code.
Diffstat (limited to 'lib/PublicInbox')
-rw-r--r-- lib/PublicInbox/GetlineBody.pm 46
-rw-r--r-- lib/PublicInbox/GitHTTPBackend.pm 6
-rw-r--r-- lib/PublicInbox/GzipFilter.pm 2
-rw-r--r-- lib/PublicInbox/HTTPD.pm 5
-rw-r--r-- lib/PublicInbox/HTTPD/Async.pm 101
-rw-r--r-- lib/PublicInbox/Qspawn.pm 121
-rw-r--r-- lib/PublicInbox/RepoAtom.pm 2
-rw-r--r-- lib/PublicInbox/WwwCoderepo.pm 2
8 files changed, 6 insertions, 279 deletions
diff --git a/lib/PublicInbox/GetlineBody.pm b/lib/PublicInbox/GetlineBody.pm
deleted file mode 100644
index 0e781224..00000000
--- a/lib/PublicInbox/GetlineBody.pm
+++ /dev/null
@@ -1,46 +0,0 @@
-# Copyright (C) 2016-2021 all contributors <meta@public-inbox.org>
-# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
-
-# Wrap a pipe or file for PSGI streaming response bodies and calls the
-# end callback when the object goes out-of-scope.
-# This depends on rpipe being _blocking_ on getline.
-#
-# This is only used by generic PSGI servers and not public-inbox-httpd
-package PublicInbox::GetlineBody;
-use strict;
-use warnings;
-
-sub new {
-        my ($class, $rpipe, $end, $end_arg, $buf, $filter) = @_;
-        bless {
-                rpipe => $rpipe,
-                end => $end,
-                end_arg => $end_arg,
-                initial_buf => $buf,
-                filter => $filter,
-        }, $class;
-}
-
-# close should always be called after getline returns undef,
-# but a client aborting a connection can ruin our day; so lets
-# hope our underlying PSGI server does not leak references, here.
-sub DESTROY { $_[0]->close }
-
-sub getline {
-        my ($self) = @_;
-        my $rpipe = $self->{rpipe} or return; # EOF was set on previous call
-        my $buf = delete($self->{initial_buf}) // $rpipe->getline;
-        delete($self->{rpipe}) unless defined $buf; # set EOF for next call
-        if (my $filter = $self->{filter}) {
-                $buf = $filter->translate($buf);
-        }
-        $buf;
-}
-
-sub close {
-        my ($self) = @_;
-        my ($end, $end_arg) = delete @$self{qw(end end_arg)};
-        $end->($end_arg) if $end;
-}
-
-1;
diff --git a/lib/PublicInbox/GitHTTPBackend.pm b/lib/PublicInbox/GitHTTPBackend.pm
index d7e0bced..7228555b 100644
--- a/lib/PublicInbox/GitHTTPBackend.pm
+++ b/lib/PublicInbox/GitHTTPBackend.pm
@@ -145,16 +145,12 @@ sub parse_cgi_headers { # {parse_hdr} for Qspawn
                 }
         }
 
-        # fallback to WwwCoderepo if cgit 404s.  Duplicating $ctx prevents
-        # ->finalize from the current Qspawn from using qspawn.wcb.
-        # This makes qspawn skip ->async_pass and causes
-        # PublicInbox::HTTPD::Async::event_step to close shortly after
+        # fallback to WwwCoderepo if cgit 404s
         if ($code == 404 && $ctx->{www} && !$ctx->{_coderepo_tried}++) {
                 my $wcb = delete $ctx->{env}->{'qspawn.wcb'};
                 $ctx->{env}->{'plack.skip-deflater'} = 1; # prevent 2x gzip
                 $ctx->{env}->{'qspawn.fallback'} = $code;
                 my $res = $ctx->{www}->coderepo->srv($ctx);
-                # for ->psgi_return_init_cb
                 $ctx->{env}->{'qspawn.wcb'} = $wcb;
                 $res; # CODE or ARRAY ref
         } else {
diff --git a/lib/PublicInbox/GzipFilter.pm b/lib/PublicInbox/GzipFilter.pm
index d6ecd5ba..fc471ea2 100644
--- a/lib/PublicInbox/GzipFilter.pm
+++ b/lib/PublicInbox/GzipFilter.pm
@@ -93,7 +93,7 @@ sub gone { # what: search/over/mm
         undef;
 }
 
-# for GetlineBody (via Qspawn) when NOT using $env->{'pi-httpd.async'}
+# for GetlineResponse (via Qspawn) when NOT using $env->{'pi-httpd.async'}
 # Also used for ->getline callbacks
 sub translate {
         my $self = shift; # $_[1] => input
diff --git a/lib/PublicInbox/HTTPD.pm b/lib/PublicInbox/HTTPD.pm
index bae7281b..6a6347d8 100644
--- a/lib/PublicInbox/HTTPD.pm
+++ b/lib/PublicInbox/HTTPD.pm
@@ -9,9 +9,6 @@ use strict;
 use Plack::Util ();
 use Plack::Builder;
 use PublicInbox::HTTP;
-use PublicInbox::HTTPD::Async;
-
-sub pi_httpd_async { PublicInbox::HTTPD::Async->new(@_) }
 
 # we have a different env for ever listener socket for
 # SERVER_NAME, SERVER_PORT and psgi.url_scheme
@@ -45,7 +42,7 @@ sub env_for ($$$) {
                 # this to limit git-http-backend(1) parallelism.
                 # We also check for the truthiness of this to
                 # detect when to use async paths for slow blobs
-                'pi-httpd.async' => \&pi_httpd_async,
+                'pi-httpd.async' => 1,
                 'pi-httpd.app' => $self->{app},
                 'pi-httpd.warn_cb' => $self->{warn_cb},
         }
diff --git a/lib/PublicInbox/HTTPD/Async.pm b/lib/PublicInbox/HTTPD/Async.pm
deleted file mode 100644
index 2e4d8baa..00000000
--- a/lib/PublicInbox/HTTPD/Async.pm
+++ /dev/null
@@ -1,101 +0,0 @@
-# Copyright (C) all contributors <meta@public-inbox.org>
-# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
-#
-# XXX This is a totally unstable API for public-inbox internal use only
-# This is exposed via the 'pi-httpd.async' key in the PSGI env hash.
-# The name of this key is not even stable!
-# Currently intended for use with read-only pipes with expensive
-# processes such as git-http-backend(1), cgit(1)
-#
-# fields:
-# http: PublicInbox::HTTP ref
-# fh: PublicInbox::HTTP::{Identity,Chunked} ref (can ->write + ->close)
-# cb: initial read callback
-# arg: arg for {cb}
-# end_obj: CODE or object which responds to ->event_step when ->close is called
-package PublicInbox::HTTPD::Async;
-use v5.12;
-use parent qw(PublicInbox::DS);
-use Errno qw(EAGAIN);
-use PublicInbox::Syscall qw(EPOLLIN);
-use PublicInbox::ProcessIONBF;
-
-# This is called via: $env->{'pi-httpd.async'}->()
-# $io is a read-only pipe ($rpipe) for now, but may be a
-# bidirectional socket in the future.
-sub new {
-        my ($class, $io, $cb, $arg, $end_obj) = @_;
-        my $self = bless {
-                cb => $cb, # initial read callback
-                arg => $arg, # arg for $cb
-                end_obj => $end_obj, # like END{}, can ->event_step
-        }, $class;
-        PublicInbox::ProcessIONBF->replace($io);
-        $self->SUPER::new($io, EPOLLIN);
-}
-
-sub event_step {
-        my ($self) = @_;
-        if (defined $self->{cb}) {
-                # this may call async_pass when headers are done
-                $self->{cb}->($self->{arg});
-        } elsif (my $sock = $self->{sock}) {
-                # $http may be undef if discarding body output from cgit on 404
-                my $http = $self->{http} or return $self->close;
-                # $self->{sock} is a read pipe for git-http-backend or cgit
-                # and 65536 is the default Linux pipe size
-                my $r = sysread($sock, my $buf, 65536);
-                if ($r) {
-                        $self->{ofh}->write($buf); # may call $http->close
-                        # let other clients get some work done, too
-                        return if $http->{sock}; # !closed
-
-                        # else: fall through to close below...
-                } elsif (!defined $r && $! == EAGAIN) {
-                        return; # EPOLLIN means we'll be notified
-                }
-
-                # Done! Error handling will happen in $self->{ofh}->close
-                # called by end_obj->event_step handler
-                delete $http->{forward};
-                $self->close; # queues end_obj->event_step to be called
-        } # else { # we may've been requeued but closed by $http
-}
-
-# once this is called, all data we read is passed to the
-# to the PublicInbox::HTTP instance ($http) via $ofh->write
-# $ofh is typically PublicInbox::HTTP::{Chunked,Identity}, but
-# may be PublicInbox::GzipFilter or $PublicInbox::Qspawn::qx_fh
-sub async_pass {
-        my ($self, $http, $ofh, $bref) = @_;
-        delete @$self{qw(cb arg)};
-        # In case the client HTTP connection ($http) dies, it
-        # will automatically close this ($self) object.
-        $http->{forward} = $self;
-
-        # write anything we overread when we were reading headers.
-        # This is typically PublicInbox:HTTP::{chunked,identity}_wcb,
-        # but may be PublicInbox::GzipFilter::write.  PSGI requires
-        # *_wcb methods respond to ->write (and ->close), not ->print
-        $ofh->write($$bref);
-
-        $self->{http} = $http;
-        $self->{ofh} = $ofh;
-}
-
-# may be called as $forward->close in PublicInbox::HTTP or EOF (event_step)
-sub close {
-        my $self = $_[0];
-        $self->SUPER::close; # DS::close
-        delete @$self{qw(cb arg)};
-
-        # we defer this to the next timer loop since close is deferred
-        if (my $end_obj = delete $self->{end_obj}) {
-                # this calls $end_obj->event_step
-                # (likely PublicInbox::Qspawn::event_step,
-                #  NOT PublicInbox::HTTPD::Async::event_step)
-                PublicInbox::DS::requeue($end_obj);
-        }
-}
-
-1;
diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm
index 203d8f41..a6e1d58b 100644
--- a/lib/PublicInbox/Qspawn.pm
+++ b/lib/PublicInbox/Qspawn.pm
@@ -176,48 +176,6 @@ sub psgi_qx {
         start($self, $limiter, undef);
 }
 
-# this is called on pipe EOF to reap the process, may be called
-# via PublicInbox::DS event loop OR via GetlineBody for generic
-# PSGI servers.
-sub event_step {
-        my ($self) = @_;
-        finish($self);
-        my $fh = delete $self->{qfh};
-        $fh->close if $fh; # async-only (psgi_return)
-}
-
-sub rd_hdr ($) {
-        my ($self) = @_;
-        # typically used for reading CGI headers
-        # We also need to check EINTR for generic PSGI servers.
-        my ($ret, $total_rd);
-        my ($bref, $ph_cb, @ph_arg) = ($self->{hdr_buf}, @{$self->{parse_hdr}});
-        until (defined($ret)) {
-                my $r = sysread($self->{rpipe}, $$bref, 4096, length($$bref));
-                if (defined($r)) {
-                        $total_rd += $r;
-                        eval { $ret = $ph_cb->($total_rd, $bref, @ph_arg) };
-                        if ($@) {
-                                warn "parse_hdr: $@";
-                                $ret = [ 500, [], [ "Internal error\n" ] ];
-                        } elsif (!defined($ret) && !$r) {
-                                warn <<EOM;
-EOF parsing headers from @{$self->{cmd}} ($self->{psgi_env}->{REQUEST_URI})
-EOM
-                                $ret = [ 500, [], [ "Internal error\n" ] ];
-                        }
-                } else {
-                        # caller should notify us when it's ready:
-                        return if $! == EAGAIN;
-                        next if $! == EINTR; # immediate retry
-                        warn "error reading header: $!";
-                        $ret = [ 500, [], [ "Internal error\n" ] ];
-                }
-        }
-        delete $self->{parse_hdr}; # done parsing headers
-        $ret;
-}
-
 sub yield_pass {
         my ($self, $ipipe, $res) = @_; # $ipipe = InputPipe
         my $env = $self->{psgi_env};
@@ -251,62 +209,6 @@ sub yield_pass {
         $self->{qfh} = $qfh; # keep $ipipe open
 }
 
-sub psgi_return_init_cb { # this may be PublicInbox::HTTPD::Async {cb}
-        my ($self) = @_;
-        my $r = rd_hdr($self) or return; # incomplete
-        my $env = $self->{psgi_env};
-        my $filter;
-
-        # this is for RepoAtom since that can fire after parse_cgi_headers
-        if (ref($r) eq 'ARRAY' && blessed($r->[2]) && $r->[2]->can('attach')) {
-                $filter = pop @$r;
-        }
-        $filter //= delete($env->{'qspawn.filter'}) // (ref($r) eq 'ARRAY' ?
-                PublicInbox::GzipFilter::qsp_maybe($r->[1], $env) : undef);
-
-        my $wcb = delete $env->{'qspawn.wcb'};
-        my $async = delete $self->{async}; # PublicInbox::HTTPD::Async
-        if (ref($r) ne 'ARRAY' || scalar(@$r) == 3) { # error
-                if ($async) { # calls rpipe->close && ->event_step
-                        $async->close; # PublicInbox::HTTPD::Async::close
-                } else { # generic PSGI, use PublicInbox::ProcessIO::CLOSE
-                        delete($self->{rpipe})->close;
-                        event_step($self);
-                }
-                if (ref($r) eq 'ARRAY') { # error
-                        $wcb->($r)
-                } elsif (ref($r) eq 'CODE') { # chain another command
-                        $r->($wcb);
-                        $self->{passed} = 1;
-                }
-                # else do nothing
-        } elsif ($async) {
-                # done reading headers, handoff to read body
-                my $fh = $wcb->($r); # scalar @$r == 2
-                $fh = $filter->attach($fh) if $filter;
-                $self->{qfh} = $fh;
-                $async->async_pass($env->{'psgix.io'}, $fh,
-                                        delete($self->{hdr_buf}));
-        } else { # for synchronous PSGI servers
-                require PublicInbox::GetlineBody;
-                my $buf = delete $self->{hdr_buf};
-                $r->[2] = PublicInbox::GetlineBody->new($self->{rpipe},
-                                        \&event_step, $self, $$buf, $filter);
-                $wcb->($r);
-        }
-}
-
-sub psgi_return_start { # may run later, much later...
-        my ($self) = @_;
-        if (my $cb = $self->{psgi_env}->{'pi-httpd.async'}) {
-                # PublicInbox::HTTPD::Async->new(rpipe, $cb, $cb_arg, $end_obj)
-                $self->{async} = $cb->($self->{rpipe},
-                                        \&psgi_return_init_cb, $self, $self);
-        } else { # generic PSGI
-                psgi_return_init_cb($self) while $self->{parse_hdr};
-        }
-}
-
 sub r500 () { [ 500, [], [ "Internal error\n" ] ] }
 
 sub parse_hdr_done ($$) {
@@ -363,7 +265,7 @@ sub _yield_start { # may run later, much later...
 #   $env->{'qspawn.wcb'} - the write callback from the PSGI server
 #                          optional, use this if you've already
 #                          captured it elsewhere.  If not given,
-#                          psgi_return will return an anonymous
+#                          psgi_yield will return an anonymous
 #                          sub for the PSGI server to call
 #
 #   $env->{'qspawn.filter'} - filter object, responds to ->attach for
@@ -379,27 +281,6 @@ sub _yield_start { # may run later, much later...
 #              body will be streamed, later, via writes (push-based) to
 #              psgix.io.  3-element arrays means the body is available
 #              immediately (or streamed via ->getline (pull-based)).
-sub psgi_return {
-        my ($self, $env, $limiter, @parse_hdr_arg)= @_;
-        $self->{psgi_env} = $env;
-        $self->{hdr_buf} = \(my $hdr_buf = '');
-        $self->{parse_hdr} = \@parse_hdr_arg;
-        $limiter ||= $def_limiter ||= PublicInbox::Limiter->new(32);
-
-        # the caller already captured the PSGI write callback from
-        # the PSGI server, so we can call ->start, here:
-        $env->{'qspawn.wcb'} and
-                return start($self, $limiter, \&psgi_return_start);
-
-        # the caller will return this sub to the PSGI server, so
-        # it can set the response callback (that is, for
-        # PublicInbox::HTTP, the chunked_wcb or identity_wcb callback),
-        # but other HTTP servers are supported:
-        sub {
-                $env->{'qspawn.wcb'} = $_[0];
-                start($self, $limiter, \&psgi_return_start);
-        }
-}
 
 sub psgi_yield {
         my ($self, $env, $limiter, @parse_hdr_arg)= @_;
diff --git a/lib/PublicInbox/RepoAtom.pm b/lib/PublicInbox/RepoAtom.pm
index b7179511..c1649d0a 100644
--- a/lib/PublicInbox/RepoAtom.pm
+++ b/lib/PublicInbox/RepoAtom.pm
@@ -40,7 +40,7 @@ EOM
 # called by GzipFilter->close
 sub zflush { $_[0]->SUPER::zflush('</feed>') }
 
-# called by GzipFilter->write or GetlineBody->getline
+# called by GzipFilter->write or GetlineResponse->getline
 sub translate {
         my $self = shift;
         my $rec = $_[0] // return $self->zflush; # getline
diff --git a/lib/PublicInbox/WwwCoderepo.pm b/lib/PublicInbox/WwwCoderepo.pm
index 6e19fc02..0eb4a2d6 100644
--- a/lib/PublicInbox/WwwCoderepo.pm
+++ b/lib/PublicInbox/WwwCoderepo.pm
@@ -230,7 +230,7 @@ sub summary ($$) {
 # called by GzipFilter->close after translate
 sub zflush { $_[0]->SUPER::zflush('</pre>', $_[0]->_html_end) }
 
-# called by GzipFilter->write or GetlineBody->getline
+# called by GzipFilter->write or GetlineResponse->getline
 sub translate {
         my $ctx = shift;
         my $rec = $_[0] // return zflush($ctx); # getline