user/dev discussion of public-inbox itself
 help / color / mirror / code / Atom feed
Search results ordered by [date|relevance]  view[summary|nested|Atom feed]
thread overview below | download mbox.gz: |
* [PATCH 4/4] lei: use async barrier for --import-before
  2024-04-16 20:56  7% [PATCH 0/4] lei parallelism fixes Eric Wong
@ 2024-04-16 20:56  6% ` Eric Wong
  0 siblings, 0 replies; 2+ results
From: Eric Wong @ 2024-04-16 20:56 UTC (permalink / raw)
  To: meta

Write barriers can take a long time to finish, especially when
commands are issued in parallel.  So handle them asynchronously
without blocking lei-daemon, making EOFpipe a little more
flexible by supporting arguments to the callback function.

This is another step towards improving parallel use of lei.
---
 lib/PublicInbox/EOFpipe.pm    |  7 ++++---
 lib/PublicInbox/LeiToMail.pm  | 29 ++++++++++++++++++++++-------
 lib/PublicInbox/LeiXSearch.pm | 13 +++++++++----
 3 files changed, 35 insertions(+), 14 deletions(-)

diff --git a/lib/PublicInbox/EOFpipe.pm b/lib/PublicInbox/EOFpipe.pm
index 3474874f..77b699a2 100644
--- a/lib/PublicInbox/EOFpipe.pm
+++ b/lib/PublicInbox/EOFpipe.pm
@@ -7,8 +7,8 @@ use parent qw(PublicInbox::DS);
 use PublicInbox::Syscall qw(EPOLLIN EPOLLONESHOT $F_SETPIPE_SZ);
 
 sub new {
-	my (undef, $rd, $cb) = @_;
-	my $self = bless { cb => $cb }, __PACKAGE__;
+	my (undef, $rd, @cb_args) = @_;
+	my $self = bless { cb_args => \@cb_args }, __PACKAGE__;
 	# 4096: page size
 	fcntl($rd, $F_SETPIPE_SZ, 4096) if $F_SETPIPE_SZ;
 	$self->SUPER::new($rd, EPOLLIN|EPOLLONESHOT);
@@ -17,7 +17,8 @@ sub new {
 sub event_step {
 	my ($self) = @_;
 	if ($self->do_read(my $buf, 1) == 0) { # auto-closed
-		$self->{cb}->();
+		my ($cb, @args) = @{delete $self->{cb_args}};
+		$cb->(@args);
 	}
 }
 
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index 593547f6..5481b5e4 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -14,7 +14,7 @@ use PublicInbox::Import;
 use IO::Handle; # ->autoflush
 use Fcntl qw(SEEK_SET SEEK_END O_CREAT O_EXCL O_WRONLY);
 use PublicInbox::Syscall qw(rename_noreplace);
-use autodie qw(open seek close);
+use autodie qw(pipe open seek close);
 use Carp qw(croak);
 
 my %kw2char = ( # Maildir characters
@@ -605,7 +605,7 @@ sub _pre_augment_mbox {
 			$lei->{dedupe} && $lei->{dedupe}->can('reset_dedupe');
 	}
 	if ($self->{zsfx} = PublicInbox::MboxReader::zsfx($dst)) {
-		pipe(my ($r, $w)) or die "pipe: $!";
+		pipe(my $r, my $w);
 		$lei->{zpipe} = [ $r, $w ];
 		$lei->{ovv}->{lock_path} and
 			die 'BUG: unexpected {ovv}->{lock_path}';
@@ -719,17 +719,32 @@ sub do_augment { # slow, runs in wq worker
 	$m->($self, $lei);
 }
 
+sub post_augment_call ($$$$) {
+	my ($self, $lei, $m, $post_augment_done) = @_;
+	eval { $m->($self, $lei) };
+	$lei->{post_augment_err} = $@ if $@; # for post_augment_done
+}
+
 # fast (spawn compressor or mkdir), runs in same process as pre_augment
 sub post_augment {
-	my ($self, $lei, @args) = @_;
+	my ($self, $lei, $post_augment_done) = @_;
 	$self->{-au_noted}++ and $lei->qerr("# writing to $self->{dst} ...");
 
-	# FIXME: this synchronous wait can be slow w/ parallel callers
-	my $wait = $lei->{opt}->{'import-before'} ?
-			$lei->{sto}->wq_do('barrier') : 0;
 	# _post_augment_mbox
 	my $m = $self->can("_post_augment_$self->{base_type}") or return;
-	$m->($self, $lei, @args);
+
+	# --import-before is only for lei-(q|lcat), not lei-convert
+	$lei->{opt}->{'import-before'} or
+		return post_augment_call $self, $lei, $m, $post_augment_done;
+
+	# we can't deal with post_augment until import-before commits:
+	require PublicInbox::EOFpipe;
+	my @io = @$lei{qw(2 sock)};
+	pipe(my $r, $io[2]);
+	PublicInbox::EOFpipe->new($r, \&post_augment_call,
+				$self, $lei, $m, $post_augment_done);
+	$lei->{sto}->wq_io_do('barrier', \@io);
+	# _post_augment_* && post_augment_done run when barrier is complete
 }
 
 # called by every single l2m worker process
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index 5a5a1adc..43dedd10 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -22,6 +22,7 @@ use PublicInbox::ContentHash qw(git_sha);
 use POSIX qw(strftime);
 use autodie qw(close open read seek truncate);
 use PublicInbox::Syscall qw($F_SETPIPE_SZ);
+use PublicInbox::OnDestroy;
 
 sub new {
 	my ($class) = @_;
@@ -428,11 +429,9 @@ sub query_done { # EOF callback for main daemon
 	$lei->dclose;
 }
 
-sub do_post_augment {
+sub post_augment_done { # via on_destroy in top-level lei-daemon
 	my ($lei) = @_;
-	my $l2m = $lei->{l2m} or return; # client disconnected
-	eval { $l2m->post_augment($lei) };
-	my $err = $@;
+	my $err = delete $lei->{post_augment_err};
 	if ($err) {
 		if (my $lxs = delete $lei->{lxs}) {
 			$lxs->wq_kill(-POSIX::SIGTERM());
@@ -447,6 +446,12 @@ sub do_post_augment {
 	close(delete $lei->{au_done}); # trigger wait_startq if start_mua didn't
 }
 
+sub do_post_augment {
+	my ($lei) = @_;
+	my $l2m = $lei->{l2m} or return; # client disconnected
+	$l2m->post_augment($lei, on_destroy(\&post_augment_done, $lei));
+}
+
 sub incr_post_augment { # called whenever an l2m shard finishes augment
 	my ($lei) = @_;
 	my $l2m = $lei->{l2m} or return; # client disconnected

^ permalink raw reply related	[relevance 6%]

* [PATCH 0/4] lei parallelism fixes
@ 2024-04-16 20:56  7% Eric Wong
  2024-04-16 20:56  6% ` [PATCH 4/4] lei: use async barrier for --import-before Eric Wong
  0 siblings, 1 reply; 2+ results
From: Eric Wong @ 2024-04-16 20:56 UTC (permalink / raw)
  To: meta

This series allows `lei reindex' to run in parallel with other
lei commands that write to lei/store.

Eric Wong (4):
  v2 + lei/store: always wait for fast-import checkpoint
  lei: use ->barrier to commit to lei/store
  lei/store: stop shard workers + cat-file on idle
  lei: use async barrier for --import-before

 lib/PublicInbox/EOFpipe.pm            |  7 ++--
 lib/PublicInbox/ExtSearchIdx.pm       |  1 +
 lib/PublicInbox/LEI.pm                |  6 ++--
 lib/PublicInbox/LeiInput.pm           |  2 +-
 lib/PublicInbox/LeiRefreshMailSync.pm |  2 +-
 lib/PublicInbox/LeiRemote.pm          |  4 +--
 lib/PublicInbox/LeiStore.pm           | 46 ++++++++++++++++-----------
 lib/PublicInbox/LeiToMail.pm          | 28 ++++++++++++----
 lib/PublicInbox/LeiXSearch.pm         | 17 ++++++----
 lib/PublicInbox/V2Writable.pm         |  8 +----
 t/lei-store-fail.t                    |  2 +-
 11 files changed, 74 insertions(+), 49 deletions(-)

^ permalink raw reply	[relevance 7%]

Results 1-2 of 2 | reverse | options above
-- pct% links below jump to the message on this page, permalinks otherwise --
2024-04-16 20:56  7% [PATCH 0/4] lei parallelism fixes Eric Wong
2024-04-16 20:56  6% ` [PATCH 4/4] lei: use async barrier for --import-before Eric Wong

Code repositories for project(s) associated with this public inbox

	https://80x24.org/public-inbox.git

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).