From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id CF9781FD45 for ; Fri, 24 Jan 2020 09:43:52 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 3/4] ds: tmpio: store offsets per-buffer Date: Fri, 24 Jan 2020 09:43:51 +0000 Message-Id: <20200124094352.19437-4-e@yhbt.net> In-Reply-To: <20200124094352.19437-1-e@yhbt.net> References: <20200124094352.19437-1-e@yhbt.net> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: We want to be able to inject existing file handles + offsets and even lengths into this in the future, without going through the ->getline interface[1]. We also switch to using a 64K buffer size since we can safely discard whatever got truncated on write, and full writes can help negotiate a larger TCP window for high-latency, high-bandwidth links. While we're at it, make it obvious that we're using O_APPEND for our tmpfile() interface so we can seek freely for reading while the writer always prints to the end of the file. [1] the getline interface for serving static files may result in us buffering on-FS data into another temporary file, which is a waste. 
--- lib/PublicInbox/DS.pm | 36 +++++++++++++++++++----------------- 1 file changed, 19 insertions(+), 17 deletions(-) diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index 970061fd..a9ac7fcd 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -18,7 +18,7 @@ use strict; use bytes; use POSIX qw(WNOHANG); use IO::Handle qw(); -use Fcntl qw(SEEK_SET :DEFAULT); +use Fcntl qw(SEEK_SET :DEFAULT O_APPEND); use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC); use parent qw(Exporter); our @EXPORT_OK = qw(now msg_more); @@ -31,8 +31,8 @@ use PublicInbox::Tmpfile; use fields ('sock', # underlying socket 'rbuf', # scalarref, usually undef - 'wbuf', # arrayref of coderefs or GLOB refs (autovivified) - 'wbuf_off', # offset into first element of wbuf to start writing at + 'wbuf', # arrayref of coderefs or tmpio (autovivified)) + # (tmpio = [ GLOB, offset, [ length ] ]) ); use Errno qw(EAGAIN EINVAL); @@ -392,11 +392,13 @@ sub close { } # portable, non-thread-safe sendfile emulation (no pread, yet) -sub psendfile ($$$) { - my ($sock, $fh, $off) = @_; +sub send_tmpio ($$) { + my ($sock, $tmpio) = @_; - seek($fh, $$off, SEEK_SET) or return; - defined(my $to_write = read($fh, my $buf, 16384)) or return; + seek($tmpio->[0], $tmpio->[1], SEEK_SET) or return; + my $n = $tmpio->[2] // 65536; + $n = 65536 if $n > 65536; + defined(my $to_write = read($tmpio->[0], my $buf, $n)) or return; my $written = 0; while ($to_write > 0) { if (defined(my $w = syswrite($sock, $buf, $to_write, $written))) { @@ -407,7 +409,8 @@ sub psendfile ($$$) { last; } } - $$off += $written; + $tmpio->[1] += $written; # offset + $tmpio->[2] -= $written if defined($tmpio->[2]); # length $written; } @@ -424,9 +427,8 @@ sub flush_write ($) { next_buf: while (my $bref = $wbuf->[0]) { if (ref($bref) ne 'CODE') { - my $off = delete($self->{wbuf_off}) // 0; while ($sock) { - my $w = psendfile($sock, $bref, \$off); + my $w = send_tmpio($sock, $bref); # bref is tmpio if (defined $w) { if ($w == 0) { 
shift @$wbuf; @@ -434,13 +436,12 @@ next_buf: } } elsif ($! == EAGAIN) { epwait($sock, epbit($sock, EPOLLOUT) | EPOLLONESHOT); - $self->{wbuf_off} = $off; return 0; } else { return $self->close; } } - } else { #($ref eq 'CODE') { + } else { #(ref($bref) eq 'CODE') { shift @$wbuf; my $before = scalar(@$wbuf); $bref->($self); @@ -490,12 +491,13 @@ sub drop { # PerlIO::mmap or PerlIO::scalar if needed sub tmpio ($$$) { my ($self, $bref, $off) = @_; - my $fh = tmpfile('wbuf', $self->{sock}, 1) or + my $fh = tmpfile('wbuf', $self->{sock}, O_APPEND) or return drop($self, "tmpfile $!"); $fh->autoflush(1); + binmode $fh, ':unix'; # reduce syscalls for read() >8192 bytes my $len = bytes::length($$bref) - $off; $fh->write($$bref, $len, $off) or return drop($self, "write ($len): $!"); - $fh + [ $fh, 0 ] # [1] = offset, [2] = length, not set by us } =head2 C<< $obj->write( $data ) >> @@ -524,9 +526,9 @@ sub write { if ($ref eq 'CODE') { push @$wbuf, $bref; } else { - my $last = $wbuf->[-1]; - if (ref($last) eq 'GLOB') { # append to tmp file buffer - $last->print($$bref) or return drop($self, "print: $!"); + my $tmpio = $wbuf->[-1]; + if ($tmpio && !defined($tmpio->[2])) { # append to tmp file buffer + $tmpio->[0]->print($$bref) or return drop($self, "print: $!"); } else { my $tmpio = tmpio($self, $bref, 0) or return 0; push @$wbuf, $tmpio;