user/dev discussion of public-inbox itself
 help / color / mirror / code / Atom feed
From: Eric Wong <e@80x24.org>
To: meta@public-inbox.org
Subject: [PATCH 5/7] git: move rbuf handling to PublicInbox::IO
Date: Sun, 26 Nov 2023 02:11:03 +0000	[thread overview]
Message-ID: <20231126021105.408573-6-e@80x24.org> (raw)
In-Reply-To: <20231126021105.408573-1-e@80x24.org>

The long-term plan is to share non-blocking read buffering logic
with HTTP/NNTP/IMAP/POP3 and also XapClient.
---
 lib/PublicInbox/Gcf2Client.pm |  1 -
 lib/PublicInbox/Git.pm        | 59 ++++++-----------------------------
 lib/PublicInbox/IO.pm         | 53 ++++++++++++++++++++++++++++++-
 3 files changed, 61 insertions(+), 52 deletions(-)

diff --git a/lib/PublicInbox/Gcf2Client.pm b/lib/PublicInbox/Gcf2Client.pm
index 48d8c5ac..19d77e32 100644
--- a/lib/PublicInbox/Gcf2Client.pm
+++ b/lib/PublicInbox/Gcf2Client.pm
@@ -21,7 +21,6 @@ use autodie qw(socketpair);
 #	pid.owner => process which spawned {pid}
 #	in => same as {sock}, for compatibility with PublicInbox::Git
 #	inflight => array (see PublicInbox::Git)
-#	rbuf => scalarref, may be non-existent or empty
 sub new  {
 	my ($opt) = @_;
 	my $self = bless {}, __PACKAGE__;
diff --git a/lib/PublicInbox/Git.pm b/lib/PublicInbox/Git.pm
index bef524aa..93736cf0 100644
--- a/lib/PublicInbox/Git.pm
+++ b/lib/PublicInbox/Git.pm
@@ -14,11 +14,11 @@ use autodie qw(socketpair read);
 use POSIX ();
 use Socket qw(AF_UNIX SOCK_STREAM);
 use PublicInbox::Syscall qw(EPOLLIN EPOLLET);
-use Errno qw(EINTR EAGAIN);
+use Errno qw(EAGAIN);
 use File::Glob qw(bsd_glob GLOB_NOSORT);
 use File::Spec ();
 use PublicInbox::Spawn qw(spawn popen_rd run_qx which);
-use PublicInbox::IO qw(poll_in read_all try_cat);
+use PublicInbox::IO qw(read_all try_cat);
 use PublicInbox::Tmpfile;
 use Carp qw(croak carp);
 use PublicInbox::SHA qw(sha_all);
@@ -166,43 +166,6 @@ sub _sock_cmd {
 	$self->{sock} = PublicInbox::IO::attach_pid($s1, $pid);
 }
 
-sub my_read ($$$) {
-	my ($fh, $rbuf, $len) = @_;
-	my $left = $len - length($$rbuf);
-	my $r;
-	while ($left > 0) {
-		$r = sysread($fh, $$rbuf, $left, length($$rbuf));
-		if ($r) {
-			$left -= $r;
-		} elsif (defined($r)) { # EOF
-			return 0;
-		} else {
-			next if ($! == EAGAIN and poll_in($fh));
-			next if $! == EINTR; # may be set by sysread or poll_in
-			return; # unrecoverable error
-		}
-	}
-	my $no_pad = substr($$rbuf, 0, $len, '');
-	\$no_pad;
-}
-
-sub my_readline ($$) {
-	my ($fh, $rbuf) = @_;
-	while (1) {
-		if ((my $n = index($$rbuf, "\n")) >= 0) {
-			return substr($$rbuf, 0, $n + 1, '');
-		}
-		my $r = sysread($fh, $$rbuf, 65536, length($$rbuf)) and next;
-
-		# return whatever's left on EOF
-		return substr($$rbuf, 0, length($$rbuf)+1, '') if defined($r);
-
-		next if ($! == EAGAIN and poll_in($fh));
-		next if $! == EINTR; # may be set by sysread or poll_in
-		return; # unrecoverable error
-	}
-}
-
 sub cat_async_retry ($$) {
 	my ($self, $old_inflight) = @_;
 
@@ -234,16 +197,15 @@ sub cat_async_step ($$) {
 	my ($self, $inflight) = @_;
 	die 'BUG: inflight empty or odd' if scalar(@$inflight) < 3;
 	my ($req, $cb, $arg) = @$inflight[0, 1, 2];
-	my $rbuf = delete($self->{rbuf}) // \(my $new = '');
 	my ($bref, $oid, $type, $size);
-	my $head = my_readline($self->{sock}, $rbuf);
+	my $head = $self->{sock}->my_readline;
 	my $cmd = ref($req) ? $$req : $req;
 	# ->fail may be called via Gcf2Client.pm
 	my $info = $self->{-bc} && substr($cmd, 0, 5) eq 'info ';
 	if ($head =~ /^([0-9a-f]{40,}) (\S+) ([0-9]+)$/) {
 		($oid, $type, $size) = ($1, $2, $3 + 0);
 		unless ($info) { # --batch-command
-			$bref = my_read($self->{sock}, $rbuf, $size + 1) or
+			$bref = $self->{sock}->my_bufread($size + 1) or
 				$self->fail(defined($bref) ?
 						'read EOF' : "read: $!");
 			chop($$bref) eq "\n" or
@@ -268,7 +230,6 @@ sub cat_async_step ($$) {
 		my $err = $! ? " ($!)" : '';
 		$self->fail("bad result from async cat-file: $head$err");
 	}
-	$self->{rbuf} = $rbuf if $$rbuf ne '';
 	splice(@$inflight, 0, 3); # don't retry $cb on ->fail
 	eval { $cb->($bref, $oid, $type, $size, $arg) };
 	async_err($self, $req, $oid, $@, $info ? 'check' : 'cat') if $@;
@@ -312,17 +273,15 @@ sub check_async_step ($$) {
 	my ($ck, $inflight) = @_;
 	die 'BUG: inflight empty or odd' if scalar(@$inflight) < 3;
 	my ($req, $cb, $arg) = @$inflight[0, 1, 2];
-	my $rbuf = delete($ck->{rbuf}) // \(my $new = '');
-	chomp(my $line = my_readline($ck->{sock}, $rbuf));
+	chomp(my $line = $ck->{sock}->my_readline);
 	my ($hex, $type, $size) = split(/ /, $line);
 
 	# git <2.21 would show `dangling' (2.21+ shows `ambiguous')
 	# https://public-inbox.org/git/20190118033845.s2vlrb3wd3m2jfzu@dcvr/T/
 	if ($hex eq 'dangling') {
-		my $ret = my_read($ck->{sock}, $rbuf, $type + 1);
+		my $ret = $ck->{sock}->my_bufread($type + 1);
 		$ck->fail(defined($ret) ? 'read EOF' : "read: $!") if !$ret;
 	}
-	$ck->{rbuf} = $rbuf if $$rbuf ne '';
 	splice(@$inflight, 0, 3); # don't retry $cb on ->fail
 	eval { $cb->(undef, $hex, $type, $size, $arg) };
 	async_err($ck, $req, $hex, $@, 'check') if $@;
@@ -643,8 +602,8 @@ sub event_step {
 		$self->cat_async_step($inflight);
 		return $self->close unless $self->{sock};
 		# don't loop here to keep things fair, but we must requeue
-		# if there's already-read data in rbuf
-		$self->requeue if exists($self->{rbuf});
+		# if there's already-read data in pi_io_rbuf
+		$self->requeue if $self->{sock}->has_rbuf;
 	}
 }
 
@@ -670,7 +629,7 @@ sub close {
 			warn "E: (in abort) $req: $@" if $@;
 		}
 	}
-	delete @$self{qw(-bc err_c inflight rbuf)};
+	delete @$self{qw(-bc err_c inflight)};
 	delete($self->{epwatch}) ? $self->SUPER::close : delete($self->{sock});
 }
 
diff --git a/lib/PublicInbox/IO.pm b/lib/PublicInbox/IO.pm
index 63ae3ef4..fcebac59 100644
--- a/lib/PublicInbox/IO.pm
+++ b/lib/PublicInbox/IO.pm
@@ -9,6 +9,7 @@ use PublicInbox::DS qw(awaitpid);
 our @EXPORT_OK = qw(poll_in read_all try_cat write_file);
 use Carp qw(croak);
 use IO::Poll qw(POLLIN);
+use Errno qw(EINTR EAGAIN);
 # don't autodie in top-level for Perl 5.16.3 (and maybe newer versions)
 # we have our own ->close, so we scope autodie into each sub
 
@@ -18,7 +19,7 @@ sub waitcb { # awaitpid callback
 	$cb->($pid, @args) if $cb;
 }
 
-sub attach_pid ($$;@) {
+sub attach_pid {
 	my ($io, $pid, @cb_arg) = @_;
 	bless $io, __PACKAGE__;
 	# we share $err (and not $self) with awaitpid to avoid a ref cycle
@@ -87,4 +88,54 @@ sub try_cat ($) {
 	read_all $fh;
 }
 
+# TODO: move existing HTTP/IMAP/NNTP/POP3 uses of rbuf here
+sub my_bufread {
+	my ($io, $len) = @_;
+	my $rbuf = ${*$io}{pi_io_rbuf} //= \(my $new = '');
+	my $left = $len - length($$rbuf);
+	my $r;
+	while ($left > 0) {
+		$r = sysread($io, $$rbuf, $left, length($$rbuf));
+		if ($r) {
+			$left -= $r;
+		} elsif (defined($r)) { # EOF
+			return 0;
+		} else {
+			next if ($! == EAGAIN and poll_in($io));
+			next if $! == EINTR; # may be set by sysread or poll_in
+			return; # unrecoverable error
+		}
+	}
+	my $no_pad = substr($$rbuf, 0, $len, '');
+	delete(${*$io}{pi_io_rbuf}) if $$rbuf eq '';
+	\$no_pad;
+}
+
+# always uses "\n"
+sub my_readline {
+	my ($io) = @_;
+	my $rbuf = ${*$io}{pi_io_rbuf} //= \(my $new = '');
+	while (1) {
+		if ((my $n = index($$rbuf, "\n")) >= 0) {
+			my $ret = substr($$rbuf, 0, $n + 1, '');
+			delete(${*$io}{pi_io_rbuf}) if $$rbuf eq '';
+			return $ret;
+		}
+		my $r = sysread($io, $$rbuf, 65536, length($$rbuf));
+		if (!defined($r)) {
+			next if ($! == EAGAIN and poll_in($io));
+			next if $! == EINTR; # may be set by sysread or poll_in
+			return; # unrecoverable error
+		} elsif ($r == 0) { # return whatever's left on EOF
+			delete(${*$io}{pi_io_rbuf});
+			return $$rbuf;
+		} # else { continue
+	}
+}
+
+sub has_rbuf {
+	my ($io) = @_;
+	defined(${*$io}{pi_io_rbuf});
+}
+
 1;

  parent reply	other threads:[~2023-11-26  2:11 UTC|newest]

Thread overview: 9+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2023-11-26  2:10 [PATCH 0/7] more I/O + process reliability and cleanups Eric Wong
2023-11-26  2:10 ` [PATCH 1/7] xap_helper_cxx: do not copy xap_helper.h source Eric Wong
2023-11-26  2:11 ` [PATCH 2/7] xap_client: attach PID to the IO object Eric Wong
2023-11-26  2:11 ` [PATCH 3/7] xap_client: pass arguments to top-level xap_helper Eric Wong
2023-11-26  2:11 ` [PATCH 4/7] xap_helper: allow PI_NO_CXX to disable C++ in more places Eric Wong
2023-11-26  2:11 ` Eric Wong [this message]
2023-11-26  2:11 ` [PATCH 6/7] git: improve coupling with {sock} and {inflight} fields Eric Wong
2023-11-26  2:11 ` [PATCH 7/7] drop redundant calls to DS->Reset Eric Wong
2023-11-26 21:08   ` [PATCH v2] " Eric Wong

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

  List information: https://public-inbox.org/README

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20231126021105.408573-6-e@80x24.org \
    --to=e@80x24.org \
    --cc=meta@public-inbox.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
Code repositories for project(s) associated with this public inbox

	https://80x24.org/public-inbox.git

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).