From 2929e3b3c62925149a9a8cafd872bfdb017453eb Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 24 Jun 2019 02:52:14 +0000 Subject: ds: switch write buffering to use a tempfile Data which can't fit into a generously-sized socket buffer, has no business being stored in heap. --- lib/PublicInbox/DS.pm | 110 +++++++++++++++++++++++++++++++++++++------------- 1 file changed, 82 insertions(+), 28 deletions(-) (limited to 'lib/PublicInbox/DS.pm') diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index 3e8b0b1a..eb468f57 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -18,22 +18,23 @@ use strict; use bytes; use POSIX (); use IO::Handle qw(); -use Fcntl qw(FD_CLOEXEC F_SETFD F_GETFD); +use Fcntl qw(FD_CLOEXEC F_SETFD F_GETFD SEEK_SET); use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC); use parent qw(Exporter); -our @EXPORT_OK = qw(now msg_more); +our @EXPORT_OK = qw(now msg_more write_in_full); use warnings; use PublicInbox::Syscall qw(:epoll); use fields ('sock', # underlying socket - 'wbuf', # arrayref of scalars, scalarrefs, or coderefs to write + 'wbuf', # arrayref of coderefs or GLOB refs 'wbuf_off', # offset into first element of wbuf to start writing at 'event_watch', # bitmask of events the client is interested in (POLLIN,OUT,etc.) 
); use Errno qw(EAGAIN EINVAL); use Carp qw(croak confess); +use File::Temp qw(tempfile); use constant POLLIN => 1; use constant POLLOUT => 4; @@ -478,32 +479,51 @@ sub close { return 0; } +# portable, non-thread-safe sendfile emulation (no pread, yet) +sub psendfile ($$$) { + my ($sock, $fh, $off) = @_; + + sysseek($fh, $$off, SEEK_SET) or return; + defined(my $to_write = sysread($fh, my $buf, 16384)) or return; + my $written = 0; + while ($to_write > 0) { + if (defined(my $w = syswrite($sock, $buf, $to_write, $written))) { + $written += $w; + $to_write -= $w; + } else { + return if $written == 0; + last; + } + } + $$off += $written; + $written; +} + # returns 1 if done, 0 if incomplete sub flush_write ($) { my ($self) = @_; my $wbuf = $self->{wbuf} or return 1; my $sock = $self->{sock} or return 1; +next_buf: while (my $bref = $wbuf->[0]) { - my $ref = ref($bref); - if ($ref eq 'SCALAR') { - my $len = bytes::length($$bref); - my $off = $self->{wbuf_off} || 0; - my $to_write = $len - $off; - my $written = syswrite($sock, $$bref, $to_write, $off); - if (defined $written) { - if ($written == $to_write) { - shift @$wbuf; + if (ref($bref) ne 'CODE') { + my $off = delete($self->{wbuf_off}) // 0; + while (1) { + my $w = psendfile($sock, $bref, \$off); + if (defined $w) { + if ($w == 0) { + shift @$wbuf; + goto next_buf; + } + } elsif ($! == EAGAIN) { + $self->{wbuf_off} = $off; + watch_write($self, 1); + return 0; } else { - $self->{wbuf_off} = $off + $written; + return $self->close; } - next; # keep going until EAGAIN - } elsif ($! == EAGAIN) { - $self->watch_write(1); - } else { - $self->close; } - return 0; } else { #($ref eq 'CODE') { shift @$wbuf; $bref->(); @@ -515,6 +535,31 @@ sub flush_write ($) { 1; # all done } +sub write_in_full ($$$$) { + my ($fh, $bref, $len, $off) = @_; + my $rv = 0; + while ($len > 0) { + my $w = syswrite($fh, $$bref, $len, $off); + return ($rv ? 
$rv : $w) unless $w; # undef or 0 + $rv += $w; + $len -= $w; + $off += $w; + } + $rv +} + +sub tmpbuf ($$) { + my ($bref, $off) = @_; + # open(my $fh, '+>>', undef) doesn't set O_APPEND + my ($fh, $path) = tempfile('wbuf-XXXXXXX', TMPDIR => 1); + open $fh, '+>>', $path or die "open: $!"; + unlink $path; + my $to_write = bytes::length($$bref) - $off; + my $w = write_in_full($fh, $bref, $to_write, $off); + die "write_in_full ($to_write): $!" unless defined $w; + $w == $to_write ? $fh : die("short write $w < $to_write"); +} + =head2 C<< $obj->write( $data ) >> Write the specified data to the underlying handle. I<data> may be scalar, @@ -537,7 +582,16 @@ sub write { my $ref = ref $data; my $bref = $ref ? $data : \$data; if (my $wbuf = $self->{wbuf}) { # already buffering, can't write more... - push @$wbuf, $bref; + if ($ref eq 'CODE') { + push @$wbuf, $bref; + } else { + my $last = $wbuf->[-1]; + if (ref($last) eq 'GLOB') { # append to tmp file buffer + write_in_full($last, $bref, bytes::length($$bref), 0); + } else { + push @$wbuf, tmpbuf($bref, 0); + } + } return 0; } elsif ($ref eq 'CODE') { $bref->(); @@ -548,15 +602,13 @@ sub write { if (defined $written) { return 1 if $written == $to_write; - $self->{wbuf_off} = $written; - $self->{wbuf} = [ $bref ]; - return flush_write($self); # try until EAGAIN } elsif ($! == EAGAIN) { - $self->{wbuf} = [ $bref ]; - $self->watch_write(1); + $written = 0; } else { - $self->close; + return $self->close; } + $self->{wbuf} = [ tmpbuf($bref, $written) ]; + watch_write($self, 1); return 0; } } @@ -573,8 +625,10 @@ sub msg_more ($$) { my $nlen = bytes::length($_[1]) - $n; return 1 if $nlen == 0; # all done! - # PublicInbox::DS::write queues the unwritten substring: - return $self->write(substr($_[1], $n, $nlen)); + # queue up the unwritten substring: + $self->{wbuf} = [ tmpbuf(\($_[1]), $n) ]; + watch_write($self, 1); + return 0; } } $self->write(\($_[1])); -- cgit v1.2.3-24-ge0c7