* [PATCH 5/7] git: move rbuf handling to PublicInbox::IO
2023-11-26 2:10 7% [PATCH 0/7] more I/O + process reliability and cleanups Eric Wong
@ 2023-11-26 2:11 6% ` Eric Wong
0 siblings, 0 replies; 2+ results
From: Eric Wong @ 2023-11-26 2:11 UTC (permalink / raw)
To: meta
The long-term plan is to share non-blocking read buffering logic
with HTTP/NNTP/IMAP/POP3 and also XapClient.
---
lib/PublicInbox/Gcf2Client.pm | 1 -
lib/PublicInbox/Git.pm | 59 ++++++-----------------------------
lib/PublicInbox/IO.pm | 53 ++++++++++++++++++++++++++++++-
3 files changed, 61 insertions(+), 52 deletions(-)
diff --git a/lib/PublicInbox/Gcf2Client.pm b/lib/PublicInbox/Gcf2Client.pm
index 48d8c5ac..19d77e32 100644
--- a/lib/PublicInbox/Gcf2Client.pm
+++ b/lib/PublicInbox/Gcf2Client.pm
@@ -21,7 +21,6 @@ use autodie qw(socketpair);
# pid.owner => process which spawned {pid}
# in => same as {sock}, for compatibility with PublicInbox::Git
# inflight => array (see PublicInbox::Git)
-# rbuf => scalarref, may be non-existent or empty
sub new {
my ($opt) = @_;
my $self = bless {}, __PACKAGE__;
diff --git a/lib/PublicInbox/Git.pm b/lib/PublicInbox/Git.pm
index bef524aa..93736cf0 100644
--- a/lib/PublicInbox/Git.pm
+++ b/lib/PublicInbox/Git.pm
@@ -14,11 +14,11 @@ use autodie qw(socketpair read);
use POSIX ();
use Socket qw(AF_UNIX SOCK_STREAM);
use PublicInbox::Syscall qw(EPOLLIN EPOLLET);
-use Errno qw(EINTR EAGAIN);
+use Errno qw(EAGAIN);
use File::Glob qw(bsd_glob GLOB_NOSORT);
use File::Spec ();
use PublicInbox::Spawn qw(spawn popen_rd run_qx which);
-use PublicInbox::IO qw(poll_in read_all try_cat);
+use PublicInbox::IO qw(read_all try_cat);
use PublicInbox::Tmpfile;
use Carp qw(croak carp);
use PublicInbox::SHA qw(sha_all);
@@ -166,43 +166,6 @@ sub _sock_cmd {
$self->{sock} = PublicInbox::IO::attach_pid($s1, $pid);
}
-sub my_read ($$$) {
- my ($fh, $rbuf, $len) = @_;
- my $left = $len - length($$rbuf);
- my $r;
- while ($left > 0) {
- $r = sysread($fh, $$rbuf, $left, length($$rbuf));
- if ($r) {
- $left -= $r;
- } elsif (defined($r)) { # EOF
- return 0;
- } else {
- next if ($! == EAGAIN and poll_in($fh));
- next if $! == EINTR; # may be set by sysread or poll_in
- return; # unrecoverable error
- }
- }
- my $no_pad = substr($$rbuf, 0, $len, '');
- \$no_pad;
-}
-
-sub my_readline ($$) {
- my ($fh, $rbuf) = @_;
- while (1) {
- if ((my $n = index($$rbuf, "\n")) >= 0) {
- return substr($$rbuf, 0, $n + 1, '');
- }
- my $r = sysread($fh, $$rbuf, 65536, length($$rbuf)) and next;
-
- # return whatever's left on EOF
- return substr($$rbuf, 0, length($$rbuf)+1, '') if defined($r);
-
- next if ($! == EAGAIN and poll_in($fh));
- next if $! == EINTR; # may be set by sysread or poll_in
- return; # unrecoverable error
- }
-}
-
sub cat_async_retry ($$) {
my ($self, $old_inflight) = @_;
@@ -234,16 +197,15 @@ sub cat_async_step ($$) {
my ($self, $inflight) = @_;
die 'BUG: inflight empty or odd' if scalar(@$inflight) < 3;
my ($req, $cb, $arg) = @$inflight[0, 1, 2];
- my $rbuf = delete($self->{rbuf}) // \(my $new = '');
my ($bref, $oid, $type, $size);
- my $head = my_readline($self->{sock}, $rbuf);
+ my $head = $self->{sock}->my_readline;
my $cmd = ref($req) ? $$req : $req;
# ->fail may be called via Gcf2Client.pm
my $info = $self->{-bc} && substr($cmd, 0, 5) eq 'info ';
if ($head =~ /^([0-9a-f]{40,}) (\S+) ([0-9]+)$/) {
($oid, $type, $size) = ($1, $2, $3 + 0);
unless ($info) { # --batch-command
- $bref = my_read($self->{sock}, $rbuf, $size + 1) or
+ $bref = $self->{sock}->my_bufread($size + 1) or
$self->fail(defined($bref) ?
'read EOF' : "read: $!");
chop($$bref) eq "\n" or
@@ -268,7 +230,6 @@ sub cat_async_step ($$) {
my $err = $! ? " ($!)" : '';
$self->fail("bad result from async cat-file: $head$err");
}
- $self->{rbuf} = $rbuf if $$rbuf ne '';
splice(@$inflight, 0, 3); # don't retry $cb on ->fail
eval { $cb->($bref, $oid, $type, $size, $arg) };
async_err($self, $req, $oid, $@, $info ? 'check' : 'cat') if $@;
@@ -312,17 +273,15 @@ sub check_async_step ($$) {
my ($ck, $inflight) = @_;
die 'BUG: inflight empty or odd' if scalar(@$inflight) < 3;
my ($req, $cb, $arg) = @$inflight[0, 1, 2];
- my $rbuf = delete($ck->{rbuf}) // \(my $new = '');
- chomp(my $line = my_readline($ck->{sock}, $rbuf));
+ chomp(my $line = $ck->{sock}->my_readline);
my ($hex, $type, $size) = split(/ /, $line);
# git <2.21 would show `dangling' (2.21+ shows `ambiguous')
# https://public-inbox.org/git/20190118033845.s2vlrb3wd3m2jfzu@dcvr/T/
if ($hex eq 'dangling') {
- my $ret = my_read($ck->{sock}, $rbuf, $type + 1);
+ my $ret = $ck->{sock}->my_bufread($type + 1);
$ck->fail(defined($ret) ? 'read EOF' : "read: $!") if !$ret;
}
- $ck->{rbuf} = $rbuf if $$rbuf ne '';
splice(@$inflight, 0, 3); # don't retry $cb on ->fail
eval { $cb->(undef, $hex, $type, $size, $arg) };
async_err($ck, $req, $hex, $@, 'check') if $@;
@@ -643,8 +602,8 @@ sub event_step {
$self->cat_async_step($inflight);
return $self->close unless $self->{sock};
# don't loop here to keep things fair, but we must requeue
- # if there's already-read data in rbuf
- $self->requeue if exists($self->{rbuf});
+ # if there's already-read data in pi_io_rbuf
+ $self->requeue if $self->{sock}->has_rbuf;
}
}
@@ -670,7 +629,7 @@ sub close {
warn "E: (in abort) $req: $@" if $@;
}
}
- delete @$self{qw(-bc err_c inflight rbuf)};
+ delete @$self{qw(-bc err_c inflight)};
delete($self->{epwatch}) ? $self->SUPER::close : delete($self->{sock});
}
diff --git a/lib/PublicInbox/IO.pm b/lib/PublicInbox/IO.pm
index 63ae3ef4..fcebac59 100644
--- a/lib/PublicInbox/IO.pm
+++ b/lib/PublicInbox/IO.pm
@@ -9,6 +9,7 @@ use PublicInbox::DS qw(awaitpid);
our @EXPORT_OK = qw(poll_in read_all try_cat write_file);
use Carp qw(croak);
use IO::Poll qw(POLLIN);
+use Errno qw(EINTR EAGAIN);
# don't autodie in top-level for Perl 5.16.3 (and maybe newer versions)
# we have our own ->close, so we scope autodie into each sub
@@ -18,7 +19,7 @@ sub waitcb { # awaitpid callback
$cb->($pid, @args) if $cb;
}
-sub attach_pid ($$;@) {
+sub attach_pid {
my ($io, $pid, @cb_arg) = @_;
bless $io, __PACKAGE__;
# we share $err (and not $self) with awaitpid to avoid a ref cycle
@@ -87,4 +88,54 @@ sub try_cat ($) {
read_all $fh;
}
+# TODO: move existing HTTP/IMAP/NNTP/POP3 uses of rbuf here
+sub my_bufread {
+ my ($io, $len) = @_;
+ my $rbuf = ${*$io}{pi_io_rbuf} //= \(my $new = '');
+ my $left = $len - length($$rbuf);
+ my $r;
+ while ($left > 0) {
+ $r = sysread($io, $$rbuf, $left, length($$rbuf));
+ if ($r) {
+ $left -= $r;
+ } elsif (defined($r)) { # EOF
+ return 0;
+ } else {
+ next if ($! == EAGAIN and poll_in($io));
+ next if $! == EINTR; # may be set by sysread or poll_in
+ return; # unrecoverable error
+ }
+ }
+ my $no_pad = substr($$rbuf, 0, $len, '');
+ delete(${*$io}{pi_io_rbuf}) if $$rbuf eq '';
+ \$no_pad;
+}
+
+# always uses "\n"
+sub my_readline {
+ my ($io) = @_;
+ my $rbuf = ${*$io}{pi_io_rbuf} //= \(my $new = '');
+ while (1) {
+ if ((my $n = index($$rbuf, "\n")) >= 0) {
+ my $ret = substr($$rbuf, 0, $n + 1, '');
+ delete(${*$io}{pi_io_rbuf}) if $$rbuf eq '';
+ return $ret;
+ }
+ my $r = sysread($io, $$rbuf, 65536, length($$rbuf));
+ if (!defined($r)) {
+ next if ($! == EAGAIN and poll_in($io));
+ next if $! == EINTR; # may be set by sysread or poll_in
+ return; # unrecoverable error
+ } elsif ($r == 0) { # return whatever's left on EOF
+ delete(${*$io}{pi_io_rbuf});
+ return $$rbuf;
+ } # else { continue
+ }
+}
+
+sub has_rbuf {
+ my ($io) = @_;
+ defined(${*$io}{pi_io_rbuf});
+}
+
1;
^ permalink raw reply related [relevance 6%]
* [PATCH 0/7] more I/O + process reliability and cleanups
@ 2023-11-26 2:10 7% Eric Wong
2023-11-26 2:11 6% ` [PATCH 5/7] git: move rbuf handling to PublicInbox::IO Eric Wong
0 siblings, 1 reply; 2+ results
From: Eric Wong @ 2023-11-26 2:10 UTC (permalink / raw)
To: meta
Patch 6/7 ought to fix another hang in t/lei-q-save.t when writing to
v2 outputs.
Much of this stuff will be relevant to code search since Xapian
searches will be moved to C++ (if available) to support features
which aren't usable from the Perl bindings and to allow more
predictable performance anyway.
Eric Wong (7):
xap_helper_cxx: do not copy xap_helper.h source
xap_client: attach PID to the IO object
xap_client: pass arguments to top-level xap_helper
xap_helper: allow PI_NO_CXX to disable C++ in more places
git: move rbuf handling to PublicInbox::IO
git: improve coupling with {sock} and {inflight} fields
drop redundant calls to DS->Reset
lib/PublicInbox/CodeSearchIdx.pm | 11 +--
lib/PublicInbox/Daemon.pm | 1 -
lib/PublicInbox/Gcf2Client.pm | 7 +-
lib/PublicInbox/Git.pm | 138 ++++++++++++-------------------
lib/PublicInbox/GitAsyncCat.pm | 2 +-
lib/PublicInbox/IO.pm | 70 ++++++++++++++--
lib/PublicInbox/TestCommon.pm | 2 +-
lib/PublicInbox/Watch.pm | 6 +-
lib/PublicInbox/XapClient.pm | 9 +-
lib/PublicInbox/XapHelperCxx.pm | 11 +--
lib/PublicInbox/Xapcmd.pm | 6 +-
t/xap_helper.t | 5 +-
12 files changed, 145 insertions(+), 123 deletions(-)
^ permalink raw reply [relevance 7%]
Results 1-2 of 2 | reverse | options above
-- pct% links below jump to the message on this page, permalinks otherwise --
2023-11-26 2:10 7% [PATCH 0/7] more I/O + process reliability and cleanups Eric Wong
2023-11-26 2:11 6% ` [PATCH 5/7] git: move rbuf handling to PublicInbox::IO Eric Wong
Code repositories for project(s) associated with this public inbox
https://80x24.org/public-inbox.git
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).