user/dev discussion of public-inbox itself
 help / color / Atom feed
From: Eric Wong <e@80x24.org>
To: meta@public-inbox.org
Subject: [PATCH 13/57] ds: switch write buffering to use a tempfile
Date: Mon, 24 Jun 2019 02:52:14 +0000
Message-ID: <20190624025258.25592-14-e@80x24.org> (raw)
In-Reply-To: <20190624025258.25592-1-e@80x24.org>

Data which can't fit into a generously-sized socket buffer,
has no business being stored in heap.
---
 lib/PublicInbox/DS.pm   | 110 ++++++++++++++++++++++++++++++----------
 lib/PublicInbox/HTTP.pm |  20 ++------
 2 files changed, 85 insertions(+), 45 deletions(-)

diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index 3e8b0b1a..eb468f57 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -18,22 +18,23 @@ use strict;
 use bytes;
 use POSIX ();
 use IO::Handle qw();
-use Fcntl qw(FD_CLOEXEC F_SETFD F_GETFD);
+use Fcntl qw(FD_CLOEXEC F_SETFD F_GETFD SEEK_SET);
 use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC);
 use parent qw(Exporter);
-our @EXPORT_OK = qw(now msg_more);
+our @EXPORT_OK = qw(now msg_more write_in_full);
 use warnings;
 
 use PublicInbox::Syscall qw(:epoll);
 
 use fields ('sock',              # underlying socket
-            'wbuf',              # arrayref of scalars, scalarrefs, or coderefs to write
+            'wbuf',              # arrayref of coderefs or GLOB refs
             'wbuf_off',  # offset into first element of wbuf to start writing at
             'event_watch',       # bitmask of events the client is interested in (POLLIN,OUT,etc.)
             );
 
 use Errno  qw(EAGAIN EINVAL);
 use Carp   qw(croak confess);
+use File::Temp qw(tempfile);
 
 use constant POLLIN        => 1;
 use constant POLLOUT       => 4;
@@ -478,32 +479,51 @@ sub close {
     return 0;
 }
 
+# portable, non-thread-safe sendfile emulation (no pread, yet)
+sub psendfile ($$$) {
+    my ($sock, $fh, $off) = @_;
+
+    sysseek($fh, $$off, SEEK_SET) or return;
+    defined(my $to_write = sysread($fh, my $buf, 16384)) or return;
+    my $written = 0;
+    while ($to_write > 0) {
+        if (defined(my $w = syswrite($sock, $buf, $to_write, $written))) {
+            $written += $w;
+            $to_write -= $w;
+        } else {
+            return if $written == 0;
+            last;
+        }
+    }
+    $$off += $written;
+    $written;
+}
+
 # returns 1 if done, 0 if incomplete
 sub flush_write ($) {
     my ($self) = @_;
     my $wbuf = $self->{wbuf} or return 1;
     my $sock = $self->{sock} or return 1;
 
+next_buf:
     while (my $bref = $wbuf->[0]) {
-        my $ref = ref($bref);
-        if ($ref eq 'SCALAR') {
-            my $len = bytes::length($$bref);
-            my $off = $self->{wbuf_off} || 0;
-            my $to_write = $len - $off;
-            my $written = syswrite($sock, $$bref, $to_write, $off);
-            if (defined $written) {
-                if ($written == $to_write) {
-                    shift @$wbuf;
+        if (ref($bref) ne 'CODE') {
+            my $off = delete($self->{wbuf_off}) // 0;
+            while (1) {
+                my $w = psendfile($sock, $bref, \$off);
+                if (defined $w) {
+                    if ($w == 0) {
+                        shift @$wbuf;
+                        goto next_buf;
+                    }
+                } elsif ($! == EAGAIN) {
+                    $self->{wbuf_off} = $off;
+                    watch_write($self, 1);
+                    return 0;
                 } else {
-                    $self->{wbuf_off} = $off + $written;
+                    return $self->close;
                 }
-                next; # keep going until EAGAIN
-            } elsif ($! == EAGAIN) {
-                $self->watch_write(1);
-            } else {
-                $self->close;
             }
-            return 0;
         } else { #($ref eq 'CODE') {
             shift @$wbuf;
             $bref->();
@@ -515,6 +535,31 @@ sub flush_write ($) {
     1; # all done
 }
 
+sub write_in_full ($$$$) {
+    my ($fh, $bref, $len, $off) = @_;
+    my $rv = 0;
+    while ($len > 0) {
+        my $w = syswrite($fh, $$bref, $len, $off);
+        return ($rv ? $rv : $w) unless $w; # undef or 0
+        $rv += $w;
+        $len -= $w;
+        $off += $w;
+    }
+    $rv
+}
+
+sub tmpbuf ($$) {
+    my ($bref, $off) = @_;
+    # open(my $fh, '+>>', undef) doesn't set O_APPEND
+    my ($fh, $path) = tempfile('wbuf-XXXXXXX', TMPDIR => 1);
+    open $fh, '+>>', $path or die "open: $!";
+    unlink $path;
+    my $to_write = bytes::length($$bref) - $off;
+    my $w = write_in_full($fh, $bref, $to_write, $off);
+    die "write_in_full ($to_write): $!" unless defined $w;
+    $w == $to_write ? $fh : die("short write $w < $to_write");
+}
+
 =head2 C<< $obj->write( $data ) >>
 
 Write the specified data to the underlying handle.  I<data> may be scalar,
@@ -537,7 +582,16 @@ sub write {
     my $ref = ref $data;
     my $bref = $ref ? $data : \$data;
     if (my $wbuf = $self->{wbuf}) { # already buffering, can't write more...
-        push @$wbuf, $bref;
+        if ($ref eq 'CODE') {
+            push @$wbuf, $bref;
+        } else {
+            my $last = $wbuf->[-1];
+            if (ref($last) eq 'GLOB') { # append to tmp file buffer
+                write_in_full($last, $bref, bytes::length($$bref), 0);
+            } else {
+                push @$wbuf, tmpbuf($bref, 0);
+            }
+        }
         return 0;
     } elsif ($ref eq 'CODE') {
         $bref->();
@@ -548,15 +602,13 @@ sub write {
 
         if (defined $written) {
             return 1 if $written == $to_write;
-            $self->{wbuf_off} = $written;
-            $self->{wbuf} = [ $bref ];
-            return flush_write($self); # try until EAGAIN
         } elsif ($! == EAGAIN) {
-            $self->{wbuf} = [ $bref ];
-            $self->watch_write(1);
+            $written = 0;
         } else {
-            $self->close;
+            return $self->close;
         }
+        $self->{wbuf} = [ tmpbuf($bref, $written) ];
+        watch_write($self, 1);
         return 0;
     }
 }
@@ -573,8 +625,10 @@ sub msg_more ($$) {
             my $nlen = bytes::length($_[1]) - $n;
             return 1 if $nlen == 0; # all done!
 
-            # PublicInbox::DS::write queues the unwritten substring:
-            return $self->write(substr($_[1], $n, $nlen));
+            # queue up the unwritten substring:
+            $self->{wbuf} = [ tmpbuf(\($_[1]), $n) ];
+            watch_write($self, 1);
+            return 0;
         }
     }
     $self->write(\($_[1]));
diff --git a/lib/PublicInbox/HTTP.pm b/lib/PublicInbox/HTTP.pm
index a669eb6e..fcb5eb6c 100644
--- a/lib/PublicInbox/HTTP.pm
+++ b/lib/PublicInbox/HTTP.pm
@@ -19,7 +19,7 @@ use HTTP::Status qw(status_message);
 use HTTP::Date qw(time2str);
 use IO::Handle;
 require PublicInbox::EvCleanup;
-PublicInbox::DS->import('msg_more');
+PublicInbox::DS->import(qw(msg_more write_in_full));
 use constant {
 	CHUNK_START => -1,   # [a-f0-9]+\r\n
 	CHUNK_END => -2,     # \r\n
@@ -125,7 +125,7 @@ sub read_input ($) {
 
 	while ($len > 0) {
 		if ($$rbuf ne '') {
-			my $w = write_in_full($input, $rbuf, $len);
+			my $w = write_in_full($input, $rbuf, $len, 0);
 			return write_err($self, $len) unless $w;
 			$len -= $w;
 			die "BUG: $len < 0 (w=$w)" if $len < 0;
@@ -367,20 +367,6 @@ sub recv_err {
 	quit($self, 500);
 }
 
-sub write_in_full {
-	my ($fh, $rbuf, $len) = @_;
-	my $rv = 0;
-	my $off = 0;
-	while ($len > 0) {
-		my $w = syswrite($fh, $$rbuf, $len, $off);
-		return ($rv ? $rv : $w) unless $w; # undef or 0
-		$rv += $w;
-		$off += $w;
-		$len -= $w;
-	}
-	$rv
-}
-
 sub read_input_chunked { # unlikely...
 	my ($self) = @_;
 	my $input = $self->{env}->{'psgi.input'};
@@ -425,7 +411,7 @@ sub read_input_chunked { # unlikely...
 		# drain the current chunk
 		until ($len <= 0) {
 			if ($$rbuf ne '') {
-				my $w = write_in_full($input, $rbuf, $len);
+				my $w = write_in_full($input, $rbuf, $len, 0);
 				return write_err($self, "$len chunk") if !$w;
 				$len -= $w;
 				if ($len == 0) {
-- 
EW


  parent reply index

Thread overview: 61+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2019-06-24  2:52 [PATCH 00/57] ds: shrink, TLS support, buffer writes to FS Eric Wong
2019-06-24  2:52 ` [PATCH 01/57] ds: get rid of {closed} field Eric Wong
2019-06-24  2:52 ` [PATCH 02/57] ds: get rid of more unused debug instance methods Eric Wong
2019-06-24  2:52 ` [PATCH 03/57] ds: use and export monotonic now() Eric Wong
2019-06-24  2:52 ` [PATCH 04/57] AddTimer: avoid clock_gettime for the '0' case Eric Wong
2019-06-24  2:52 ` [PATCH 05/57] ds: get rid of on_incomplete_write wrapper Eric Wong
2019-06-24  2:52 ` [PATCH 06/57] ds: lazy initialize wbuf_off Eric Wong
2019-06-24  2:52 ` [PATCH 07/57] ds: split out from ->flush_write and ->write Eric Wong
2019-06-24  2:52 ` [PATCH 08/57] ds: lazy-initialize wbuf Eric Wong
2019-06-24  2:52 ` [PATCH 09/57] ds: don't pass `events' arg to EPOLL_CTL_DEL Eric Wong
2019-06-24  2:52 ` [PATCH 10/57] ds: remove support for DS->write(undef) Eric Wong
2019-06-24  2:52 ` [PATCH 11/57] http: favor DS->write(strref) when reasonable Eric Wong
2019-06-24  2:52 ` [PATCH 12/57] ds: share send(..., MSG_MORE) logic Eric Wong
2019-06-24  2:52 ` Eric Wong [this message]
2019-06-24  2:52 ` [PATCH 14/57] ds: get rid of redundant and unnecessary POLL* constants Eric Wong
2019-06-24  2:52 ` [PATCH 15/57] syscall: get rid of unused EPOLL* constants Eric Wong
2019-06-24  2:52 ` [PATCH 16/57] syscall: get rid of unnecessary uname local vars Eric Wong
2019-06-24  2:52 ` [PATCH 17/57] ds: set event flags directly at initialization Eric Wong
2019-06-24  2:52 ` [PATCH 18/57] ds: import IO::KQueue namespace Eric Wong
2019-06-24  2:52 ` [PATCH 19/57] ds: share watch_chg between watch_read/watch_write Eric Wong
2019-06-24  2:52 ` [PATCH 20/57] ds: remove IO::Poll support (for now) Eric Wong
2019-06-24  2:52 ` [PATCH 21/57] ds: get rid of event_watch field Eric Wong
2019-06-24  2:52 ` [PATCH 22/57] httpd/async: remove EINTR check Eric Wong
2019-06-24  2:52 ` [PATCH 23/57] spawn: remove `Blocking' flag handling Eric Wong
2019-06-24  2:52 ` [PATCH 24/57] qspawn: describe where `$rpipe' come from Eric Wong
2019-06-24  2:52 ` [PATCH 25/57] http|nntp: favor "$! == EFOO" over $!{EFOO} checks Eric Wong
2019-06-24  2:52 ` [PATCH 26/57] ds: favor `delete' over assigning fields to `undef' Eric Wong
2019-06-24  2:52 ` [PATCH 27/57] http: don't pass extra args to PublicInbox::DS::close Eric Wong
2019-06-24  2:52 ` [PATCH 28/57] ds: pass $self to code references Eric Wong
2019-06-24  2:52 ` [PATCH 29/57] evcleanup: replace _run_asap with `event_step' callback Eric Wong
2019-06-24  2:52 ` [PATCH 30/57] ds: remove pointless exit calls Eric Wong
2019-06-24  2:52 ` [PATCH 31/57] http|nntp: be explicit about bytes::length on rbuf Eric Wong
2019-06-24  2:52 ` [PATCH 32/57] ds: hoist out do_read from NNTP and HTTP Eric Wong
2019-06-24  2:52 ` [PATCH 33/57] nntp: simplify re-arming/requeue logic Eric Wong
2019-06-24  2:52 ` [PATCH 34/57] allow use of PerlIO layers for filesystem writes Eric Wong
2019-06-24  2:52 ` [PATCH 35/57] ds: deal better with FS-related errors IO buffers Eric Wong
2019-06-24  2:52 ` [PATCH 36/57] nntp: wait for writability before sending greeting Eric Wong
2019-06-24  2:52 ` [PATCH 37/57] nntp: NNTPS and NNTP+STARTTLS working Eric Wong
2019-06-24  2:52 ` [PATCH 38/57] certs/create-certs.perl: fix cert validity on 32-bit Eric Wong
2019-06-24  2:52 ` [PATCH 39/57] daemon: map inherited sockets to well-known schemes Eric Wong
2019-06-24  2:52 ` [PATCH 40/57] ds|nntp: use CORE::close on socket Eric Wong
2019-06-24  2:52 ` [PATCH 41/57] nntp: call SSL_shutdown in normal cases Eric Wong
2019-06-24  2:52 ` [PATCH 42/57] t/nntpd-tls: slow client connection test Eric Wong
2019-06-24  2:52 ` [PATCH 43/57] daemon: use SSL_MODE_RELEASE_BUFFERS Eric Wong
2019-06-24  2:52 ` [PATCH 44/57] ds: allow ->write callbacks to syswrite directly Eric Wong
2019-06-24  2:52 ` [PATCH 45/57] nntp: reduce allocations for greeting Eric Wong
2019-06-24  2:52 ` [PATCH 46/57] ds: always use EV_ADD with EV_SET Eric Wong
2019-06-24  2:52 ` [PATCH 47/57] nntp: simplify long response logic and fix nesting Eric Wong
2019-06-24  2:52 ` [PATCH 48/57] ds: flush_write runs ->write callbacks even if closed Eric Wong
2019-06-24  2:52 ` [PATCH 49/57] nntp: lazily allocate and stash rbuf Eric Wong
2019-06-24  2:52 ` [PATCH 50/57] ci: require IO::KQueue on FreeBSD, for now Eric Wong
2019-06-24  2:52 ` [PATCH 51/57] nntp: send greeting immediately for plain sockets Eric Wong
2019-06-24  2:52 ` [PATCH 52/57] daemon: set TCP_DEFER_ACCEPT on everything but NNTP Eric Wong
2019-06-24  2:52 ` [PATCH 53/57] daemon: use FreeBSD accept filters on non-NNTP Eric Wong
2019-06-24  2:52 ` [PATCH 54/57] ds: split out IO::KQueue-specific code Eric Wong
2019-06-24  5:24   ` Eric Wong
2019-06-24  2:52 ` [PATCH 55/57] ds: reimplement IO::Poll support to look like epoll Eric Wong
2019-06-24  2:52 ` [PATCH 56/57] Revert "ci: require IO::KQueue on FreeBSD, for now" Eric Wong
2019-06-24  2:52 ` [PATCH 57/57] ds: reduce overhead of tempfile creation Eric Wong
2019-06-24  5:25 ` [PATCH 58/57] Makefile: skip DSKQXS in global syntax check Eric Wong
2019-06-24 18:28 ` [PATCH 59/57] ds: ->write must not clobber empty wbuf array Eric Wong

Reply instructions:

You may reply publically to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

  List information: https://public-inbox.org/README

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20190624025258.25592-14-e@80x24.org \
    --to=e@80x24.org \
    --cc=meta@public-inbox.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

user/dev discussion of public-inbox itself

Archives are clonable:
	git clone --mirror https://public-inbox.org/meta
	git clone --mirror http://czquwvybam4bgbro.onion/meta
	git clone --mirror http://hjrcffqmbrq6wope.onion/meta
	git clone --mirror http://ou63pmih66umazou.onion/meta

Example config snippet for mirrors

Newsgroups are available over NNTP:
	nntp://news.public-inbox.org/inbox.comp.mail.public-inbox.meta
	nntp://ou63pmih66umazou.onion/inbox.comp.mail.public-inbox.meta
	nntp://czquwvybam4bgbro.onion/inbox.comp.mail.public-inbox.meta
	nntp://hjrcffqmbrq6wope.onion/inbox.comp.mail.public-inbox.meta
	nntp://news.gmane.org/gmane.mail.public-inbox.general

 note: .onion URLs require Tor: https://www.torproject.org/

AGPL code for this site: git clone https://public-inbox.org/public-inbox.git