From dbaf64b646943bd92e1aa8d581e23a5adb4a3e57 Mon Sep 17 00:00:00 2001
From: Eric Wong
Date: Tue, 23 Feb 2016 02:52:18 +0000
Subject: initial public-inbox-httpd implementation

This is meant to provide an easy starting point for server admins.
It provides a basic HTTP server for admins unfamiliar with
configuring PSGI applications as well as being an identical
interface for management as our nntpd implementation.

This HTTP server may also be a generic Plack/PSGI server for
existing Plack/PSGI applications.
---
 lib/PublicInbox/HTTP.pm     | 380 ++++++++++++++++++++++++++++++++++++++++++++
 lib/PublicInbox/Listener.pm |   5 +-
 2 files changed, 383 insertions(+), 2 deletions(-)
 create mode 100644 lib/PublicInbox/HTTP.pm

(limited to 'lib')

diff --git a/lib/PublicInbox/HTTP.pm b/lib/PublicInbox/HTTP.pm
new file mode 100644
index 00000000..a5149ac2
--- /dev/null
+++ b/lib/PublicInbox/HTTP.pm
@@ -0,0 +1,380 @@
+# Copyright (C) 2016 all contributors
+# License: AGPL-3.0+
+#
+# Generic PSGI server for convenience.  It aims to provide
+# a consistent experience for public-inbox admins so they don't have
+# to learn different ways to admin both NNTP and HTTP components.
+# There's nothing public-inbox-specific here.
+# Each instance of this class represents an HTTP client socket
+
+package PublicInbox::HTTP;
+use strict;
+use warnings;
+use base qw(Danga::Socket);
+use fields qw(httpd env rbuf input_left);
+use Fcntl qw(:seek);
+use HTTP::Parser::XS qw(parse_http_request); # supports pure Perl fallback
+use HTTP::Status qw(status_message);
+use HTTP::Date qw(time2str);
+use IO::File;
+use Plack::Util; # run_app, foreach and inline_object are called below
+my $null_io = IO::File->new('/dev/null', '<');
+use constant {
+	CHUNK_START => -1, # [a-f0-9]+\r\n
+	CHUNK_END => -2, # \r\n
+	CHUNK_ZEND => -3, # \r\n
+	CHUNK_MAX_HDR => 256,
+};
+
+# Wraps an accepted client socket and starts watching it for readability.
+# $addr is unpacked from the arguments but not otherwise used here.
+sub new {
+	my ($class, $sock, $addr, $httpd) = @_;
+	my $self = fields::new($class);
+	$self->SUPER::new($sock);
+	$self->{httpd} = $httpd;
+	$self->{rbuf} = '';
+	$self->watch_read(1);
+	$self;
+}
+
+sub event_read { # called by Danga::Socket
+	my ($self) = @_;
+
+	# a request header was already parsed, keep buffering its body
+	return event_read_input($self) if defined $self->{env};
+
+	# append to whatever part of the request header we already have
+	my $off = length($self->{rbuf});
+	my $r = sysread($self->{sock}, $self->{rbuf}, 8192, $off);
+	if (defined $r) {
+		return $self->close if $r == 0;
+		return rbuf_process($self);
+	}
+	return if $!{EAGAIN}; # no need to call watch_read(1) again
+
+	# common for clients to break connections without warning,
+	# would be too noisy to log here:
+	return $self->close;
+}
+
+# Tries to parse a request header out of rbuf.  Once the header is
+# complete, the body (if any) is buffered and the app is dispatched.
+sub rbuf_process {
+	my ($self) = @_;
+
+	my %env = %{$self->{httpd}->{env}}; # full hash copy
+	my $r = parse_http_request($self->{rbuf}, \%env);
+
+	# We do not support Trailers in chunked requests, for now
+	# (they are rarely-used and git (as of 2.7.2) does not use them)
+	return $self->quit(400) if $r == -1 || $env{HTTP_TRAILER};
+	return $self->watch_read(1) if $r < 0; # incomplete
+	$self->{rbuf} = substr($self->{rbuf}, $r);
+	my $len = input_prepare($self, \%env);
+	$len ? event_read_input($self) : app_dispatch($self);
+}
+
+# Buffers an identity-encoded request body into psgi.input and
+# dispatches the app once input_left bytes have been stored.
+# Chunked bodies are handed off to event_read_input_chunked.
+sub event_read_input {
+	my ($self) = @_;
+	my $env = $self->{env};
+	return event_read_input_chunked($self) if env_chunked($env);
+
+	# env->{CONTENT_LENGTH} (identity)
+	my $sock = $self->{sock};
+	my $len = $self->{input_left};
+	$self->{input_left} = undef;
+	my $rbuf = \($self->{rbuf});
+	my $input = $env->{'psgi.input'};
+
+	while ($len > 0) {
+		if ($$rbuf ne '') {
+			my $w = write_in_full($input, $rbuf, $len);
+			return $self->write_err unless $w;
+			$len -= $w;
+			die "BUG: $len < 0 (w=$w)" if $len < 0;
+			if ($len == 0) { # next request may be pipelined
+				$$rbuf = substr($$rbuf, $w);
+				last;
+			}
+			$$rbuf = '';
+		}
+		my $r = sysread($sock, $$rbuf, 8192);
+		return $self->recv_err($r, $len) unless $r;
+		# continue looping if $r > 0;
+	}
+	app_dispatch($self);
+}
+
+# Runs the PSGI app for the buffered request held in $self->{env}
+# and writes out its response; the socket is closed if writing dies.
+sub app_dispatch {
+	my ($self) = @_;
+	$self->watch_read(0);
+	my $env = $self->{env};
+	$self->{env} = undef;
+	$env->{REMOTE_ADDR} = $self->peer_ip_string; # Danga::Socket
+	$env->{REMOTE_PORT} = $self->{peer_port}; # set by peer_ip_string
+	if (my $host = $env->{HTTP_HOST}) {
+		$host =~ s/:(\d+)\z// and $env->{SERVER_PORT} = $1;
+		$env->{SERVER_NAME} = $host;
+	}
+	$env->{'psgi.input'}->seek(0, SEEK_SET);
+	my $res = Plack::Util::run_app($self->{httpd}->{app}, $env);
+	eval {
+		if (ref($res) eq 'CODE') {
+			$res->(sub { response_write($self, $env, $_[0]) });
+		} else {
+			response_write($self, $env, $res);
+		}
+	};
+	$self->close if $@;
+}
+
+# Writes the status line and headers for the PSGI response $res.
+# Returns ($alive, $chunked): whether the connection may be reused and
+# whether the app declared a chunked Transfer-Encoding.
+sub response_header_write {
+	my ($self, $env, $res) = @_;
+	my $proto = $env->{SERVER_PROTOCOL} or return; # HTTP/0.9 :P
+	my $status = $res->[0];
+	my $h = "$proto $status " . status_message($status) . "\r\n";
+	my ($len, $chunked);
+	my $headers = $res->[1];
+
+	for (my $i = 0; $i < @$headers; $i += 2) {
+		my $k = $headers->[$i];
+		my $v = $headers->[$i + 1];
+		# we always emit our own Connection and Date headers below
+		next if $k =~ /\A(?:Connection|Date)\z/i;
+
+		$len = $v if $k =~ /\AContent-Length\z/i;
+		if ($k =~ /\ATransfer-Encoding\z/i && $v =~ /\bchunked\b/i) {
+			$chunked = 1;
+		}
+
+		$h .= "$k: $v\r\n";
+	}
+
+	# The connection is only reusable when the client can tell where
+	# the body ends (Content-Length or chunked) AND persistence is
+	# implied by HTTP/1.1 or requested via keep-alive.  These are
+	# kept as separate terms on purpose: in a bare "A && B || C",
+	# "Connection: keep-alive" would win even for undelimited bodies.
+	my $conn = $env->{HTTP_CONNECTION} || '';
+	my $delimited = defined($len) || $chunked;
+	my $persist = ($proto eq 'HTTP/1.1' && $conn !~ /\bclose\b/i) ||
+			($conn =~ /\bkeep-alive\b/i);
+	my $alive = $delimited && $persist;
+
+	$h .= 'Connection: ' . ($alive ? 'keep-alive' : 'close');
+	$h .= "\r\nDate: " . time2str(time) . "\r\n\r\n";
+
+	if (($len || $chunked) && $env->{REQUEST_METHOD} ne 'HEAD') {
+		more($self, $h);
+	} else {
+		$self->write($h);
+	}
+	($alive, $chunked);
+}
+
+# Writes the PSGI response $res.  When $res has a body element it is
+# written out in full; otherwise an object with write and close
+# methods is returned for the application to stream the body.
+sub response_write {
+	my ($self, $env, $res) = @_;
+	my ($alive, $chunked) = response_header_write($self, $env, $res);
+	my $write = sub { $self->write($_[0]) };
+	my $close = sub {
+		if ($alive) {
+			$self->event_write; # watch for readability if done
+		} else {
+			$self->write(sub { $self->close });
+		}
+	};
+
+	if (defined $res->[2]) {
+		Plack::Util::foreach($res->[2], $write);
+		$close->();
+	} else {
+		# this is returned to the calling application:
+		Plack::Util::inline_object(write => $write, close => $close);
+	}
+}
+
+use constant MSG_MORE => ($^O eq 'linux') ? 0x8000 : 0;
+
+# Like write(), but on Linux first tries send() with MSG_MORE while
+# write_buf_size is zero, falling back to write() for any remainder.
+sub more {
+	my ($self, $buf) = @_; # copy, the caller's scalar is left alone
+	if (MSG_MORE && !$self->{write_buf_size}) {
+		my $n = send($self->{sock}, $buf, MSG_MORE);
+		if (defined $n) {
+			my $dlen = length($buf);
+			return 1 if $n == $dlen; # all done!
+			$buf = substr($buf, $n, $dlen - $n);
+			# fall through to normal write:
+		}
+	}
+	$self->write($buf);
+}
+
+# overrides existing Danga::Socket method
+sub event_write {
+	my ($self) = @_;
+	# only continue watching for readability when we are done writing:
+	return if $self->write(undef) != 1;
+
+	if ($self->{rbuf} eq '') {
+		$self->watch_read(1);
+	} else {
+		# avoid recursion
+		Danga::Socket->AddTimer(0, sub { rbuf_process($self) });
+	}
+}
+
+# Sets up psgi.input for the request described by $env and stores $env
+# in $self.  Returns the value stored in input_left: the Content-Length,
+# CHUNK_START for chunked requests, or a false value for no body.
+sub input_prepare {
+	my ($self, $env) = @_;
+	my $input = $null_io;
+	my $len = $env->{CONTENT_LENGTH};
+	if ($len) {
+		$input = IO::File->new_tmpfile;
+	} elsif (env_chunked($env)) {
+		$input = IO::File->new_tmpfile;
+		$len = CHUNK_START;
+	}
+	binmode $input;
+	$env->{'psgi.input'} = $input;
+	$self->{env} = $env;
+	$self->{input_left} = $len;
+}
+
+# true if the request in $env declares a chunked Transfer-Encoding
+sub env_chunked { ($_[0]->{HTTP_TRANSFER_ENCODING} || '') =~ /\bchunked\b/i }
+
+# Logs a failure to buffer the request body and answers with a 500
+sub write_err {
+	my ($self) = @_;
+	my $err = $self->{env}->{'psgi.errors'};
+	my $msg = $! || '(zero write)';
+	$err->print("error buffering to input: $msg\n");
+	$self->quit(500);
+}
+
+# Handles a sysread that returned 0 or undef while reading the body;
+# $len is the input_left state to restore if we merely hit EAGAIN.
+sub recv_err {
+	my ($self, $r, $len) = @_;
+	return $self->close if (defined $r && $r == 0);
+	if ($!{EAGAIN}) {
+		$self->{input_left} = $len;
+		return;
+	}
+	my $err = $self->{env}->{'psgi.errors'};
+	$err->print("error reading for input: $! ($len bytes remaining)\n");
+	$self->quit(500);
+}
+
+# Writes up to $len bytes from the start of $$rbuf to $fh, retrying
+# after short writes.  Returns the number of bytes written; if the
+# very first syswrite fails, its undef or 0 is returned instead.
+sub write_in_full {
+	my ($fh, $rbuf, $len) = @_;
+	my $rv = 0;
+	my $off = 0;
+	# never ask for more than is buffered, so $off below cannot
+	# reach the end of $$rbuf while $len is still positive
+	my $avail = length($$rbuf);
+	$len = $avail if $len > $avail;
+	while ($len > 0) {
+		my $w = syswrite($fh, $$rbuf, $len, $off);
+		return ($rv ? $rv : $w) unless $w; # undef or 0
+		$rv += $w;
+		$off += $w;
+		$len -= $w;
+	}
+	$rv
+}
+
+# Buffers a chunked request body.  $len holds either the number of
+# bytes left in the current chunk or one of the negative CHUNK_*
+# states naming the framing we expect to parse next.
+sub event_read_input_chunked { # unlikely...
+	my ($self) = @_;
+	my $input = $self->{env}->{'psgi.input'};
+	my $sock = $self->{sock};
+	my $len = $self->{input_left};
+	$self->{input_left} = undef;
+	my $rbuf = \($self->{rbuf});
+
+	while (1) { # chunk start
+		if ($len == CHUNK_ZEND) {
+			return app_dispatch($self) if $$rbuf =~ s/\A\r\n//s;
+			return $self->quit(400) if length($$rbuf) > 2;
+		}
+		if ($len == CHUNK_END) {
+			if ($$rbuf =~ s/\A\r\n//s) {
+				$len = CHUNK_START;
+			} elsif (length($$rbuf) > 2) {
+				return $self->quit(400);
+			}
+		}
+		if ($len == CHUNK_START) {
+			if ($$rbuf =~ s/\A([a-f0-9]+).*?\r\n//i) {
+				$len = hex $1;
+			} elsif (length($$rbuf) > CHUNK_MAX_HDR) {
+				return $self->quit(400);
+			}
+			# will break from loop since $len >= 0
+		}
+
+		if ($len < 0) { # chunk header is trickled, read more
+			my $off = length($$rbuf);
+			my $r = sysread($sock, $$rbuf, 8192, $off);
+			return $self->recv_err($r, $len) unless $r;
+			# (implicit) goto chunk_start if $r > 0;
+		}
+		$len = CHUNK_ZEND if $len == 0;
+
+		# drain the current chunk
+		until ($len <= 0) {
+			if ($$rbuf ne '') {
+				my $w = write_in_full($input, $rbuf, $len);
+				return $self->write_err unless $w;
+				$len -= $w;
+				if ($len == 0) {
+					# we may have leftover data to parse
+					# in chunk
+					$$rbuf = substr($$rbuf, $w);
+					$len = CHUNK_END;
+				} elsif ($len < 0) {
+					die "BUG: len < 0: $len";
+				} else {
+					$$rbuf = '';
+				}
+			}
+			if ($$rbuf eq '') {
+				# read more of current chunk
+				my $r = sysread($sock, $$rbuf, 8192);
+				return $self->recv_err($r, $len) unless $r;
+			}
+		}
+	}
+}
+
+# Sends a bare status line for $status and closes the connection
+sub quit {
+	my ($self, $status) = @_;
+	my $h = "HTTP/1.1 $status " . status_message($status) . "\r\n\r\n";
+	$self->write($h);
+	$self->close;
+}
+
+1;
diff --git a/lib/PublicInbox/Listener.pm b/lib/PublicInbox/Listener.pm
index 8e0554f3..5f351a77 100644
--- a/lib/PublicInbox/Listener.pm
+++ b/lib/PublicInbox/Listener.pm
@@ -25,11 +25,12 @@ sub new ($$$) {
 
 sub event_read {
 	my ($self) = @_;
+	my $sock = $self->{sock};
 
 	# no loop here, we want to fairly distribute clients
 	# between multiple processes sharing the same socket
-	if (my $addr = accept(my $c, $self->{sock})) {
+	if (my $addr = accept(my $c, $sock)) {
 		IO::Handle::blocking($c, 0); # no accept4 :<
-		$self->{post_accept}->($c, $addr);
+		$self->{post_accept}->($c, $addr, $sock);
 	}
 }
-- 
cgit v1.2.3-24-ge0c7