git@vger.kernel.org mailing list mirror (one of many)
* [RFC PATCHv1 0/2] Parallel git submodule fetching
@ 2015-09-11 23:09 Stefan Beller
  2015-09-11 23:09 ` [PATCH 1/2] Sending "Fetching submodule <foo>" output to stderr Stefan Beller
  2015-09-11 23:09 ` [PATCH 2/2] fetch: fetch submodules in parallel Stefan Beller
  0 siblings, 2 replies; 13+ messages in thread
From: Stefan Beller @ 2015-09-11 23:09 UTC (permalink / raw
  To: git
  Cc: peff, gitster, jrnieder, johannes.schindelin, Jens.Lehmann,
	vlovich, Stefan Beller

This is a new approach to making git submodules faster without using threads.
We'll directly run commands in parallel without an intermediate thread pool.

Patch 1 fixes a bug that caused trouble in a parallel world but is not
much of a problem in a single-child world.

The second patch works fine (I'd love to hear feedback on the perceived
change of output), though it still has two rough edges I want to tackle:

* The fcntl call to make the pipes nonblocking is not checked for errors.
* We also need to communicate back if a process cannot be spawned or
  fails, i.e. combined reporting for all subprocesses needs to be added.
  I am thinking about adding another callback which is called to finish
  up a child process.

Earlier attempts [1] used a thread pool, which made the conversion very
easy: you don't need to touch lots of existing code, you just replace
the run_command(..) call with an add_task(..). The thread pool would
abstract away the whole parallelism and its problems. It would also
provide backpressure to the main program: add_task would simply not
return if there was no room left in the queue.

[1] http://www.spinics.net/lists/git/msg258435.html

Any feedback welcome!
Thanks,
Stefan

Jonathan Nieder (1):
  Sending "Fetching submodule <foo>" output to stderr

Stefan Beller (1):
  fetch: fetch submodules in parallel

 Documentation/fetch-options.txt |   7 ++
 builtin/fetch.c                 |   6 +-
 builtin/pull.c                  |   6 ++
 run-command.c                   | 144 ++++++++++++++++++++++++++++++++++++++++
 run-command.h                   |  29 ++++++++
 strbuf.c                        |  31 +++++++++
 strbuf.h                        |   1 +
 submodule.c                     |  99 +++++++++++++++++----------
 submodule.h                     |   2 +-
 t/t0061-run-command.sh          |  16 +++++
 t/t5526-fetch-submodules.sh     |  70 ++++++++++++-------
 test-run-command.c              |  23 +++++++
 12 files changed, 373 insertions(+), 61 deletions(-)

-- 
2.6.0.rc0.131.gf624c3d


* [PATCH 1/2] Sending "Fetching submodule <foo>" output to stderr
  2015-09-11 23:09 [RFC PATCHv1 0/2] Parallel git submodule fetching Stefan Beller
@ 2015-09-11 23:09 ` Stefan Beller
  2015-09-11 23:09 ` [PATCH 2/2] fetch: fetch submodules in parallel Stefan Beller
  1 sibling, 0 replies; 13+ messages in thread
From: Stefan Beller @ 2015-09-11 23:09 UTC (permalink / raw
  To: git
  Cc: peff, gitster, jrnieder, johannes.schindelin, Jens.Lehmann,
	vlovich, Stefan Beller

From: Jonathan Nieder <jrnieder@gmail.com>

The "Pushing submodule <foo>" progress output correctly goes to
stderr, but "Fetching submodule <foo>" is going to stdout by mistake.
Fix it to write to stderr.

Noticed while trying to implement a parallel submodule fetch.  When
this particular output line went to a different file descriptor, it
was buffered separately, resulting in wrongly interleaved output if
we copied it to the terminal naively.

Signed-off-by: Jonathan Nieder <jrnieder@gmail.com>
Signed-off-by: Stefan Beller <sbeller@google.com>
---
 submodule.c                 |  2 +-
 t/t5526-fetch-submodules.sh | 51 +++++++++++++++++++++++----------------------
 2 files changed, 27 insertions(+), 26 deletions(-)

diff --git a/submodule.c b/submodule.c
index 9fcc86f..1d64e57 100644
--- a/submodule.c
+++ b/submodule.c
@@ -694,7 +694,7 @@ int fetch_populated_submodules(const struct argv_array *options,
 			git_dir = submodule_git_dir.buf;
 		if (is_directory(git_dir)) {
 			if (!quiet)
-				printf("Fetching submodule %s%s\n", prefix, ce->name);
+				fprintf(stderr, "Fetching submodule %s%s\n", prefix, ce->name);
 			cp.dir = submodule_path.buf;
 			argv_array_push(&argv, default_argv);
 			argv_array_push(&argv, "--submodule-prefix");
diff --git a/t/t5526-fetch-submodules.sh b/t/t5526-fetch-submodules.sh
index a4532b0..17759b1 100755
--- a/t/t5526-fetch-submodules.sh
+++ b/t/t5526-fetch-submodules.sh
@@ -16,7 +16,8 @@ add_upstream_commit() {
 		git add subfile &&
 		git commit -m new subfile &&
 		head2=$(git rev-parse --short HEAD) &&
-		echo "From $pwd/submodule" > ../expect.err &&
+		echo "Fetching submodule submodule" > ../expect.err &&
+		echo "From $pwd/submodule" >> ../expect.err &&
 		echo "   $head1..$head2  master     -> origin/master" >> ../expect.err
 	) &&
 	(
@@ -27,6 +28,7 @@ add_upstream_commit() {
 		git add deepsubfile &&
 		git commit -m new deepsubfile &&
 		head2=$(git rev-parse --short HEAD) &&
+		echo "Fetching submodule submodule/subdir/deepsubmodule" >> ../expect.err &&
 		echo "From $pwd/deepsubmodule" >> ../expect.err &&
 		echo "   $head1..$head2  master     -> origin/master" >> ../expect.err
 	)
@@ -56,9 +58,7 @@ test_expect_success setup '
 	(
 		cd downstream &&
 		git submodule update --init --recursive
-	) &&
-	echo "Fetching submodule submodule" > expect.out &&
-	echo "Fetching submodule submodule/subdir/deepsubmodule" >> expect.out
+	)
 '
 
 test_expect_success "fetch --recurse-submodules recurses into submodules" '
@@ -67,7 +67,7 @@ test_expect_success "fetch --recurse-submodules recurses into submodules" '
 		cd downstream &&
 		git fetch --recurse-submodules >../actual.out 2>../actual.err
 	) &&
-	test_i18ncmp expect.out actual.out &&
+	test_must_be_empty actual.out &&
 	test_i18ncmp expect.err actual.err
 '
 
@@ -96,7 +96,7 @@ test_expect_success "using fetchRecurseSubmodules=true in .gitmodules recurses i
 		git config -f .gitmodules submodule.submodule.fetchRecurseSubmodules true &&
 		git fetch >../actual.out 2>../actual.err
 	) &&
-	test_i18ncmp expect.out actual.out &&
+	test_must_be_empty actual.out &&
 	test_i18ncmp expect.err actual.err
 '
 
@@ -127,7 +127,7 @@ test_expect_success "--recurse-submodules overrides fetchRecurseSubmodules setti
 		git config --unset -f .gitmodules submodule.submodule.fetchRecurseSubmodules &&
 		git config --unset submodule.submodule.fetchRecurseSubmodules
 	) &&
-	test_i18ncmp expect.out actual.out &&
+	test_must_be_empty actual.out &&
 	test_i18ncmp expect.err actual.err
 '
 
@@ -146,7 +146,7 @@ test_expect_success "--dry-run propagates to submodules" '
 		cd downstream &&
 		git fetch --recurse-submodules --dry-run >../actual.out 2>../actual.err
 	) &&
-	test_i18ncmp expect.out actual.out &&
+	test_must_be_empty actual.out &&
 	test_i18ncmp expect.err actual.err
 '
 
@@ -155,7 +155,7 @@ test_expect_success "Without --dry-run propagates to submodules" '
 		cd downstream &&
 		git fetch --recurse-submodules >../actual.out 2>../actual.err
 	) &&
-	test_i18ncmp expect.out actual.out &&
+	test_must_be_empty actual.out &&
 	test_i18ncmp expect.err actual.err
 '
 
@@ -166,7 +166,7 @@ test_expect_success "recurseSubmodules=true propagates into submodules" '
 		git config fetch.recurseSubmodules true
 		git fetch >../actual.out 2>../actual.err
 	) &&
-	test_i18ncmp expect.out actual.out &&
+	test_must_be_empty actual.out &&
 	test_i18ncmp expect.err actual.err
 '
 
@@ -180,7 +180,7 @@ test_expect_success "--recurse-submodules overrides config in submodule" '
 		) &&
 		git fetch --recurse-submodules >../actual.out 2>../actual.err
 	) &&
-	test_i18ncmp expect.out actual.out &&
+	test_must_be_empty actual.out &&
 	test_i18ncmp expect.err actual.err
 '
 
@@ -214,16 +214,15 @@ test_expect_success "Recursion stops when no new submodule commits are fetched"
 	git add submodule &&
 	git commit -m "new submodule" &&
 	head2=$(git rev-parse --short HEAD) &&
-	echo "Fetching submodule submodule" > expect.out.sub &&
 	echo "From $pwd/." > expect.err.sub &&
 	echo "   $head1..$head2  master     -> origin/master" >>expect.err.sub &&
-	head -2 expect.err >> expect.err.sub &&
+	head -3 expect.err >> expect.err.sub &&
 	(
 		cd downstream &&
 		git fetch >../actual.out 2>../actual.err
 	) &&
 	test_i18ncmp expect.err.sub actual.err &&
-	test_i18ncmp expect.out.sub actual.out
+	test_must_be_empty actual.out
 '
 
 test_expect_success "Recursion doesn't happen when new superproject commits don't change any submodules" '
@@ -269,7 +268,7 @@ test_expect_success "Recursion picks up config in submodule" '
 		)
 	) &&
 	test_i18ncmp expect.err.sub actual.err &&
-	test_i18ncmp expect.out actual.out
+	test_must_be_empty actual.out
 '
 
 test_expect_success "Recursion picks up all submodules when necessary" '
@@ -285,7 +284,8 @@ test_expect_success "Recursion picks up all submodules when necessary" '
 		git add subdir/deepsubmodule &&
 		git commit -m "new deepsubmodule"
 		head2=$(git rev-parse --short HEAD) &&
-		echo "From $pwd/submodule" > ../expect.err.sub &&
+		echo "Fetching submodule submodule" > ../expect.err.sub &&
+		echo "From $pwd/submodule" >> ../expect.err.sub &&
 		echo "   $head1..$head2  master     -> origin/master" >> ../expect.err.sub
 	) &&
 	head1=$(git rev-parse --short HEAD) &&
@@ -295,13 +295,13 @@ test_expect_success "Recursion picks up all submodules when necessary" '
 	echo "From $pwd/." > expect.err.2 &&
 	echo "   $head1..$head2  master     -> origin/master" >> expect.err.2 &&
 	cat expect.err.sub >> expect.err.2 &&
-	tail -2 expect.err >> expect.err.2 &&
+	tail -3 expect.err >> expect.err.2 &&
 	(
 		cd downstream &&
 		git fetch >../actual.out 2>../actual.err
 	) &&
 	test_i18ncmp expect.err.2 actual.err &&
-	test_i18ncmp expect.out actual.out
+	test_must_be_empty actual.out
 '
 
 test_expect_success "'--recurse-submodules=on-demand' doesn't recurse when no new commits are fetched in the superproject (and ignores config)" '
@@ -317,7 +317,8 @@ test_expect_success "'--recurse-submodules=on-demand' doesn't recurse when no ne
 		git add subdir/deepsubmodule &&
 		git commit -m "new deepsubmodule" &&
 		head2=$(git rev-parse --short HEAD) &&
-		echo "From $pwd/submodule" > ../expect.err.sub &&
+		echo "Fetching submodule submodule" > ../expect.err.sub &&
+		echo "From $pwd/submodule" >> ../expect.err.sub &&
 		echo "   $head1..$head2  master     -> origin/master" >> ../expect.err.sub
 	) &&
 	(
@@ -335,7 +336,7 @@ test_expect_success "'--recurse-submodules=on-demand' recurses as deep as necess
 	git add submodule &&
 	git commit -m "new submodule" &&
 	head2=$(git rev-parse --short HEAD) &&
-	tail -2 expect.err > expect.err.deepsub &&
+	tail -3 expect.err > expect.err.deepsub &&
 	echo "From $pwd/." > expect.err &&
 	echo "   $head1..$head2  master     -> origin/master" >>expect.err &&
 	cat expect.err.sub >> expect.err &&
@@ -354,7 +355,7 @@ test_expect_success "'--recurse-submodules=on-demand' recurses as deep as necess
 			git config --unset -f .gitmodules submodule.subdir/deepsubmodule.fetchRecursive
 		)
 	) &&
-	test_i18ncmp expect.out actual.out &&
+	test_must_be_empty actual.out &&
 	test_i18ncmp expect.err actual.err
 '
 
@@ -388,7 +389,7 @@ test_expect_success "'fetch.recurseSubmodules=on-demand' overrides global config
 	head2=$(git rev-parse --short HEAD) &&
 	echo "From $pwd/." > expect.err.2 &&
 	echo "   $head1..$head2  master     -> origin/master" >>expect.err.2 &&
-	head -2 expect.err >> expect.err.2 &&
+	head -3 expect.err >> expect.err.2 &&
 	(
 		cd downstream &&
 		git config fetch.recurseSubmodules on-demand &&
@@ -399,7 +400,7 @@ test_expect_success "'fetch.recurseSubmodules=on-demand' overrides global config
 		cd downstream &&
 		git config --unset fetch.recurseSubmodules
 	) &&
-	test_i18ncmp expect.out.sub actual.out &&
+	test_must_be_empty actual.out &&
 	test_i18ncmp expect.err.2 actual.err
 '
 
@@ -416,7 +417,7 @@ test_expect_success "'submodule.<sub>.fetchRecurseSubmodules=on-demand' override
 	head2=$(git rev-parse --short HEAD) &&
 	echo "From $pwd/." > expect.err.2 &&
 	echo "   $head1..$head2  master     -> origin/master" >>expect.err.2 &&
-	head -2 expect.err >> expect.err.2 &&
+	head -3 expect.err >> expect.err.2 &&
 	(
 		cd downstream &&
 		git config submodule.submodule.fetchRecurseSubmodules on-demand &&
@@ -427,7 +428,7 @@ test_expect_success "'submodule.<sub>.fetchRecurseSubmodules=on-demand' override
 		cd downstream &&
 		git config --unset submodule.submodule.fetchRecurseSubmodules
 	) &&
-	test_i18ncmp expect.out.sub actual.out &&
+	test_must_be_empty actual.out &&
 	test_i18ncmp expect.err.2 actual.err
 '
 
-- 
2.6.0.rc0.131.gf624c3d


* [PATCH 2/2] fetch: fetch submodules in parallel
  2015-09-11 23:09 [RFC PATCHv1 0/2] Parallel git submodule fetching Stefan Beller
  2015-09-11 23:09 ` [PATCH 1/2] Sending "Fetching submodule <foo>" output to stderr Stefan Beller
@ 2015-09-11 23:09 ` Stefan Beller
  2015-09-12 19:11   ` Junio C Hamano
  1 sibling, 1 reply; 13+ messages in thread
From: Stefan Beller @ 2015-09-11 23:09 UTC (permalink / raw
  To: git
  Cc: peff, gitster, jrnieder, johannes.schindelin, Jens.Lehmann,
	vlovich, Stefan Beller

If we run external commands in parallel, we cannot pipe their output
directly to our stdout/stderr, as it would get mixed up. So each
process's output flows through a pipe, which we buffer. One subprocess
can be piped directly to our stdout/stderr for low-latency feedback to
the user.

Example:
Let's assume we have 5 submodules A,B,C,D,E and each fetch takes a
different amount of time as the different submodules vary in size, then
the output of fetches in sequential order might look like this:

 time -->
 output: |---A---|   |-B-|   |----C-----------|   |-D-|   |-E-|

When we schedule these submodules onto at most two parallel processes,
a schedule and sample output over time may look like this:

thread 1: |---A---|   |-D-|   |-E-|

thread 2: |-B-|   |----C-----------|

output:   |---A---|B|------C-------|DE

So A will be perceived as if it ran normally in the single-child
version. As B has finished by the time A is done, we can dump its whole
progress buffer to stderr, so that it looks like it finished in no time.
Once that is done, C is chosen as the visible child and its progress
is reported in real time.
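
The buffering rule can be sketched in isolation as below. This is only
an illustration under simplifying assumptions, not the code in
run-command.c: the names (struct output_mux, mux_start, mux_output,
mux_finish) are made up, fixed-size char arrays replace strbufs, and the
"shown" buffer stands in for stderr.

```c
#include <string.h>

#define SLOTS 2   /* at most two children in parallel */
#define CAP 256   /* fixed buffer size, for the sketch only */

struct output_mux {
	int foreground;            /* slot whose output is shown live */
	int running[SLOTS];        /* 1 while a child occupies the slot */
	char pending[SLOTS][CAP];  /* buffered output of background slots */
	char finished[CAP];        /* output of finished background children */
	char shown[CAP];           /* stands in for stderr */
};

/* Append src to dst, truncating silently if dst is full. */
static void append(char *dst, const char *src)
{
	strncat(dst, src, CAP - strlen(dst) - 1);
}

static void mux_start(struct output_mux *m, int slot)
{
	m->running[slot] = 1;
}

/* Foreground output is shown at once, everything else is buffered. */
static void mux_output(struct output_mux *m, int slot, const char *text)
{
	append(slot == m->foreground ? m->shown : m->pending[slot], text);
}

static void mux_finish(struct output_mux *m, int slot)
{
	int i;

	m->running[slot] = 0;
	if (slot != m->foreground) {
		/* Background child: keep its output for later. */
		append(m->finished, m->pending[slot]);
		m->pending[slot][0] = '\0';
		return;
	}

	/* Foreground child done: replay finished background children. */
	append(m->shown, m->finished);
	m->finished[0] = '\0';

	/* Round robin to the next running slot and flush its buffer. */
	for (i = 1; i <= SLOTS; i++) {
		int next = (slot + i) % SLOTS;
		if (m->running[next]) {
			m->foreground = next;
			append(m->shown, m->pending[next]);
			m->pending[next][0] = '\0';
			break;
		}
	}
}
```

Replaying the schedule above (A in slot 0; B, then C in slot 1; D and E
reusing slot 0 after A) through these functions leaves A, B, C, D, E in
that order in "shown", with D and E appearing only after C has finished.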

This way of producing output is well suited for human consumption,
as it only changes the timing, not the actual output.

For machine consumption the output needs to be prepared in
the tasks, by having a prefix either per line or per block
to indicate which task's output is displayed.

Signed-off-by: Stefan Beller <sbeller@google.com>
---
 Documentation/fetch-options.txt |   7 ++
 builtin/fetch.c                 |   6 +-
 builtin/pull.c                  |   6 ++
 run-command.c                   | 144 ++++++++++++++++++++++++++++++++++++++++
 run-command.h                   |  29 ++++++++
 strbuf.c                        |  31 +++++++++
 strbuf.h                        |   1 +
 submodule.c                     |  99 +++++++++++++++++----------
 submodule.h                     |   2 +-
 t/t0061-run-command.sh          |  16 +++++
 t/t5526-fetch-submodules.sh     |  19 ++++++
 test-run-command.c              |  23 +++++++
 12 files changed, 347 insertions(+), 36 deletions(-)

diff --git a/Documentation/fetch-options.txt b/Documentation/fetch-options.txt
index 45583d8..e2a59c3 100644
--- a/Documentation/fetch-options.txt
+++ b/Documentation/fetch-options.txt
@@ -100,6 +100,13 @@ ifndef::git-pull[]
 	reference to a commit that isn't already in the local submodule
 	clone.
 
+-j::
+--jobs=<n>::
+	Number of parallel children to be used for fetching submodules.
+	Each will fetch from different submodules, such that fetching many
+	submodules will be faster. By default submodules will be fetched
+	one at a time.
+
 --no-recurse-submodules::
 	Disable recursive fetching of submodules (this has the same effect as
 	using the '--recurse-submodules=no' option).
diff --git a/builtin/fetch.c b/builtin/fetch.c
index ee1f1a9..09ff837 100644
--- a/builtin/fetch.c
+++ b/builtin/fetch.c
@@ -37,6 +37,7 @@ static int prune = -1; /* unspecified */
 static int all, append, dry_run, force, keep, multiple, update_head_ok, verbosity;
 static int progress = -1, recurse_submodules = RECURSE_SUBMODULES_DEFAULT;
 static int tags = TAGS_DEFAULT, unshallow, update_shallow;
+static int max_children = 1;
 static const char *depth;
 static const char *upload_pack;
 static struct strbuf default_rla = STRBUF_INIT;
@@ -99,6 +100,8 @@ static struct option builtin_fetch_options[] = {
 		    N_("fetch all tags and associated objects"), TAGS_SET),
 	OPT_SET_INT('n', NULL, &tags,
 		    N_("do not fetch all tags (--no-tags)"), TAGS_UNSET),
+	OPT_INTEGER('j', "jobs", &max_children,
+		    N_("number of threads used for fetching")),
 	OPT_BOOL('p', "prune", &prune,
 		 N_("prune remote-tracking branches no longer on remote")),
 	{ OPTION_CALLBACK, 0, "recurse-submodules", NULL, N_("on-demand"),
@@ -1217,7 +1220,8 @@ int cmd_fetch(int argc, const char **argv, const char *prefix)
 		result = fetch_populated_submodules(&options,
 						    submodule_prefix,
 						    recurse_submodules,
-						    verbosity < 0);
+						    verbosity < 0,
+						    max_children);
 		argv_array_clear(&options);
 	}
 
diff --git a/builtin/pull.c b/builtin/pull.c
index 722a83c..fbbda67 100644
--- a/builtin/pull.c
+++ b/builtin/pull.c
@@ -94,6 +94,7 @@ static int opt_force;
 static char *opt_tags;
 static char *opt_prune;
 static char *opt_recurse_submodules;
+static char *max_children;
 static int opt_dry_run;
 static char *opt_keep;
 static char *opt_depth;
@@ -177,6 +178,9 @@ static struct option pull_options[] = {
 		N_("on-demand"),
 		N_("control recursive fetching of submodules"),
 		PARSE_OPT_OPTARG),
+	OPT_PASSTHRU('j', "jobs", &max_children, N_("n"),
+		N_("number of threads used for fetching submodules"),
+		PARSE_OPT_OPTARG),
 	OPT_BOOL(0, "dry-run", &opt_dry_run,
 		N_("dry run")),
 	OPT_PASSTHRU('k', "keep", &opt_keep, NULL,
@@ -524,6 +528,8 @@ static int run_fetch(const char *repo, const char **refspecs)
 		argv_array_push(&args, opt_prune);
 	if (opt_recurse_submodules)
 		argv_array_push(&args, opt_recurse_submodules);
+	if (max_children)
+		argv_array_push(&args, max_children);
 	if (opt_dry_run)
 		argv_array_push(&args, "--dry-run");
 	if (opt_keep)
diff --git a/run-command.c b/run-command.c
index 28e1d55..b8ff67b 100644
--- a/run-command.c
+++ b/run-command.c
@@ -852,3 +852,147 @@ int capture_command(struct child_process *cmd, struct strbuf *buf, size_t hint)
 	close(cmd->out);
 	return finish_command(cmd);
 }
+
+int run_processes_async(int n, get_next_task fn, void *data)
+{
+	int i, wait_status;
+	pid_t pid;
+
+	/* no more tasks. Also set when aborting early. */
+	int all_tasks_started = 0;
+	int nr_processes = 0;
+	int child_in_foreground = 0;
+	struct timeval timeout;
+	struct child_process *children = xcalloc(n, sizeof(*children));
+	char *slots = xcalloc(n, sizeof(*slots));
+	struct strbuf *err = xcalloc(n, sizeof(*err));
+	fd_set fdset;
+	int maxfd;
+	struct strbuf finished_children = STRBUF_INIT;
+	int flags;
+	for (i = 0; i < n; i++)
+		strbuf_init(&err[i], 0);
+
+	while (!all_tasks_started || nr_processes > 0) {
+		/* Start new processes. */
+		while (!all_tasks_started && nr_processes < n) {
+			for (i = 0; i < n; i++)
+				if (!slots[i])
+					break; /* found an empty slot */
+			if (i == n)
+				die("BUG: bookkeeping is hard");
+
+			if (fn(data, &children[i], &err[i])) {
+				all_tasks_started = 1;
+				break;
+			}
+			if (start_command(&children[i]))
+				die(_("Could not start child process"));
+			flags = fcntl(children[i].err, F_GETFL);
+			fcntl(children[i].err, F_SETFL, flags | O_NONBLOCK);
+			nr_processes++;
+			slots[i] = 1;
+		}
+
+		/* prepare data for select call */
+		FD_ZERO(&fdset);
+		maxfd = 0;
+		for (i = 0; i < n; i++) {
+			if (!slots[i])
+				continue;
+			FD_SET(children[i].err, &fdset);
+			if (children[i].err > maxfd)
+				maxfd = children[i].err;
+		}
+		timeout.tv_sec = 0;
+		timeout.tv_usec = 500000;
+
+		i = select(maxfd + 1, &fdset, NULL, NULL, &timeout);
+		if (i < 0) {
+			if (errno == EINTR)
+				/* A signal was caught; try again */
+				continue;
+			else if (errno == ENOMEM)
+				die_errno("BUG: keeping track of fds is hard");
+			else if (errno == EINVAL)
+				die_errno("BUG: invalid arguments to select");
+			else if (errno == EBADF)
+				die_errno("BUG: keeping track of fds is hard");
+			else
+				die_errno("Unknown error with select");
+		}
+
+		/* Buffer output from all pipes. */
+		for (i = 0; i < n; i++) {
+			if (!slots[i])
+				continue;
+			if (FD_ISSET(children[i].err, &fdset))
+				strbuf_read_noblock(&err[i], children[i].err, 0);
+			if (child_in_foreground == i) {
+				fputs(err[i].buf, stderr);
+				strbuf_reset(&err[i]);
+				fflush(stderr);
+			}
+		}
+
+		/* Collect finished child processes. */
+		while (nr_processes > 0) {
+			pid = waitpid(-1, &wait_status, WNOHANG);
+			if (pid == 0)
+				/* no child finished */
+				break;
+
+			if (pid < 0) {
+				if (errno == EINTR)
+					break; /* just try again  next time */
+				if (errno == EINVAL || errno == ECHILD)
+					die_errno("wait");
+			} else {
+				/* Find the finished child. */
+				for (i = 0; i < n; i++)
+					if (slots[i] && pid == children[i].pid)
+						break;
+				if (i == n)
+					/* waitpid returned another process id which
+					 * we are not waiting on, so ignore it*/
+					break;
+			}
+
+			strbuf_read_noblock(&err[i], children[i].err, 0);
+			argv_array_clear(&children[i].args);
+			argv_array_clear(&children[i].env_array);
+
+			slots[i] = 0;
+			nr_processes--;
+
+			if (i != child_in_foreground) {
+				strbuf_addbuf(&finished_children, &err[i]);
+				strbuf_reset(&err[i]);
+			} else {
+				fputs(err[i].buf, stderr);
+				strbuf_reset(&err[i]);
+
+				/* Output all other finished child processes */
+				fputs(finished_children.buf, stderr);
+				strbuf_reset(&finished_children);
+
+				/*
+				 * Pick next process to output live.
+				 * There can be no active process if n==1
+				 * NEEDSWORK:
+				 * For now we pick it randomly by doing a round
+				 * robin. Later we may want to pick the one with
+				 * the most output or the longest or shortest
+				 * running process time.
+				 */
+				for (i = 0; i < n; i++)
+					if (slots[(child_in_foreground + i) % n])
+						break;
+				child_in_foreground = (child_in_foreground + i) % n;
+				fputs(err[child_in_foreground].buf, stderr);
+				strbuf_reset(&err[child_in_foreground]);
+			}
+		}
+	}
+	return 0;
+}
diff --git a/run-command.h b/run-command.h
index 5b4425a..8f53ad6 100644
--- a/run-command.h
+++ b/run-command.h
@@ -119,4 +119,33 @@ struct async {
 int start_async(struct async *async);
 int finish_async(struct async *async);
 
+/**
+ * Return 0 if the next child is ready to run.
+ * This callback takes care to initialize the child process and preload the
+ * out and error channel. The preloading of these output channels is useful
+ * if you want to have a message printed directly before the output of the
+ * child process.
+ *
+ * Return != 0 if there are no more tasks to be processed.
+ */
+typedef int (*get_next_task)(void *data,
+			     struct child_process *cp,
+			     struct strbuf *err);
+
+/**
+ * Runs up to n processes at the same time. Whenever a process can
+ * be started, the callback `get_next_task` is called to obtain the
+ * data fed to the child process.
+ *
+ * The children started via this function run in parallel and their output
+ * to both stdout and stderr is buffered, while one of the children will
+ * directly output to stdout/stderr.
+ *
+ * This leads to a problem with output from processes which put out to
+ * stdout/err alternatingly as the buffering will not be able to replay
+ * the
+ */
+
+int run_processes_async(int n, get_next_task fn, void *data);
+
 #endif
diff --git a/strbuf.c b/strbuf.c
index cce5eed..7f866c3 100644
--- a/strbuf.c
+++ b/strbuf.c
@@ -384,6 +384,37 @@ ssize_t strbuf_read(struct strbuf *sb, int fd, size_t hint)
 	return sb->len - oldlen;
 }
 
+ssize_t strbuf_read_noblock(struct strbuf *sb, int fd, size_t hint)
+{
+	size_t oldlen = sb->len;
+	size_t oldalloc = sb->alloc;
+
+	strbuf_grow(sb, hint ? hint : 8192);
+	for (;;) {
+		ssize_t cnt;
+
+		cnt = read(fd, sb->buf + sb->len, sb->alloc - sb->len - 1);
+		if (cnt < 0) {
+			if (errno == EINTR)
+				continue;
+			if (errno == EAGAIN)
+				break;
+			if (oldalloc == 0)
+				strbuf_release(sb);
+			else
+				strbuf_setlen(sb, oldlen);
+			return -1;
+		}
+		if (!cnt)
+			break;
+		sb->len += cnt;
+		strbuf_grow(sb, 8192);
+	}
+
+	sb->buf[sb->len] = '\0';
+	return sb->len - oldlen;
+}
+
 #define STRBUF_MAXLINK (2*PATH_MAX)
 
 int strbuf_readlink(struct strbuf *sb, const char *path, size_t hint)
diff --git a/strbuf.h b/strbuf.h
index aef2794..7ea462b 100644
--- a/strbuf.h
+++ b/strbuf.h
@@ -365,6 +365,7 @@ extern size_t strbuf_fread(struct strbuf *, size_t, FILE *);
  * any partial read is undone.
  */
 extern ssize_t strbuf_read(struct strbuf *, int fd, size_t hint);
+extern ssize_t strbuf_read_noblock(struct strbuf *, int fd, size_t hint);
 
 /**
  * Read the contents of a file, specified by its path. The third argument
diff --git a/submodule.c b/submodule.c
index 1d64e57..6d757c6 100644
--- a/submodule.c
+++ b/submodule.c
@@ -12,6 +12,7 @@
 #include "sha1-array.h"
 #include "argv-array.h"
 #include "blob.h"
+#include "thread-utils.h"
 
 static int config_fetch_recurse_submodules = RECURSE_SUBMODULES_ON_DEMAND;
 static struct string_list changed_submodule_paths;
@@ -615,37 +616,61 @@ static void calculate_changed_submodule_paths(void)
 	initialized_fetch_ref_tips = 0;
 }
 
+struct submodule_parallel_fetch {
+	int count;
+	struct argv_array args;
+	const char *work_tree;
+	const char *prefix;
+	int command_line_option;
+	int quiet;
+};
+#define SPF_INIT {0, ARGV_ARRAY_INIT, NULL}
+
+int get_next_submodule(void *data, struct child_process *cp,
+		       struct strbuf *err);
+
 int fetch_populated_submodules(const struct argv_array *options,
 			       const char *prefix, int command_line_option,
-			       int quiet)
+			       int quiet, int max_parallel_jobs)
 {
 	int i, result = 0;
-	struct child_process cp = CHILD_PROCESS_INIT;
-	struct argv_array argv = ARGV_ARRAY_INIT;
-	const char *work_tree = get_git_work_tree();
-	if (!work_tree)
+	struct submodule_parallel_fetch spf = SPF_INIT;
+	spf.work_tree = get_git_work_tree();
+	spf.command_line_option = command_line_option;
+	spf.quiet = quiet;
+	spf.prefix = prefix;
+	if (!spf.work_tree)
 		goto out;
 
 	if (read_cache() < 0)
 		die("index file corrupt");
 
-	argv_array_push(&argv, "fetch");
+	argv_array_push(&spf.args, "fetch");
 	for (i = 0; i < options->argc; i++)
-		argv_array_push(&argv, options->argv[i]);
-	argv_array_push(&argv, "--recurse-submodules-default");
+		argv_array_push(&spf.args, options->argv[i]);
+	argv_array_push(&spf.args, "--recurse-submodules-default");
 	/* default value, "--submodule-prefix" and its value are added later */
 
-	cp.env = local_repo_env;
-	cp.git_cmd = 1;
-	cp.no_stdin = 1;
-
 	calculate_changed_submodule_paths();
+	run_processes_async(max_parallel_jobs, get_next_submodule, &spf);
+
+	argv_array_clear(&spf.args);
+out:
+	string_list_clear(&changed_submodule_paths, 1);
+	return result;
+}
+
+int get_next_submodule(void *data, struct child_process *cp,
+		       struct strbuf *err)
+{
+	int ret = 0;
+	struct submodule_parallel_fetch *spf = data;
 
-	for (i = 0; i < active_nr; i++) {
+	for ( ; spf->count < active_nr; spf->count++) {
 		struct strbuf submodule_path = STRBUF_INIT;
 		struct strbuf submodule_git_dir = STRBUF_INIT;
 		struct strbuf submodule_prefix = STRBUF_INIT;
-		const struct cache_entry *ce = active_cache[i];
+		const struct cache_entry *ce = active_cache[spf->count];
 		const char *git_dir, *default_argv;
 		const struct submodule *submodule;
 
@@ -657,7 +682,7 @@ int fetch_populated_submodules(const struct argv_array *options,
 			submodule = submodule_from_name(null_sha1, ce->name);
 
 		default_argv = "yes";
-		if (command_line_option == RECURSE_SUBMODULES_DEFAULT) {
+		if (spf->command_line_option == RECURSE_SUBMODULES_DEFAULT) {
 			if (submodule &&
 			    submodule->fetch_recurse !=
 						RECURSE_SUBMODULES_NONE) {
@@ -680,40 +705,46 @@ int fetch_populated_submodules(const struct argv_array *options,
 					default_argv = "on-demand";
 				}
 			}
-		} else if (command_line_option == RECURSE_SUBMODULES_ON_DEMAND) {
+		} else if (spf->command_line_option == RECURSE_SUBMODULES_ON_DEMAND) {
 			if (!unsorted_string_list_lookup(&changed_submodule_paths, ce->name))
 				continue;
 			default_argv = "on-demand";
 		}
 
-		strbuf_addf(&submodule_path, "%s/%s", work_tree, ce->name);
+		strbuf_addf(&submodule_path, "%s/%s", spf->work_tree, ce->name);
 		strbuf_addf(&submodule_git_dir, "%s/.git", submodule_path.buf);
-		strbuf_addf(&submodule_prefix, "%s%s/", prefix, ce->name);
+		strbuf_addf(&submodule_prefix, "%s%s/", spf->prefix, ce->name);
 		git_dir = read_gitfile(submodule_git_dir.buf);
 		if (!git_dir)
 			git_dir = submodule_git_dir.buf;
 		if (is_directory(git_dir)) {
-			if (!quiet)
-				fprintf(stderr, "Fetching submodule %s%s\n", prefix, ce->name);
-			cp.dir = submodule_path.buf;
-			argv_array_push(&argv, default_argv);
-			argv_array_push(&argv, "--submodule-prefix");
-			argv_array_push(&argv, submodule_prefix.buf);
-			cp.argv = argv.argv;
-			if (run_command(&cp))
-				result = 1;
-			argv_array_pop(&argv);
-			argv_array_pop(&argv);
-			argv_array_pop(&argv);
+			child_process_init(cp);
+			cp->dir = strbuf_detach(&submodule_path, NULL);
+			cp->git_cmd = 1;
+			cp->no_stdout = 1;
+			cp->no_stdin = 1;
+			cp->stdout_to_stderr = 1;
+			cp->err = -1;
+			cp->env = local_repo_env;
+			if (!spf->quiet)
+				strbuf_addf(err, "Fetching submodule %s%s\n",
+					    spf->prefix, ce->name);
+			argv_array_init(&cp->args);
+			argv_array_pushv(&cp->args, spf->args.argv);
+			argv_array_push(&cp->args, default_argv);
+			argv_array_push(&cp->args, "--submodule-prefix");
+			argv_array_push(&cp->args, submodule_prefix.buf);
+			ret = 1;
 		}
 		strbuf_release(&submodule_path);
 		strbuf_release(&submodule_git_dir);
 		strbuf_release(&submodule_prefix);
+		if (ret) {
+			spf->count++;
+			return 0;
+		}
 	}
-	argv_array_clear(&argv);
-out:
-	string_list_clear(&changed_submodule_paths, 1);
-	return result;
+	return 1;
 }
 
 unsigned is_submodule_modified(const char *path, int ignore_untracked)
diff --git a/submodule.h b/submodule.h
index 5507c3d..cbc0003 100644
--- a/submodule.h
+++ b/submodule.h
@@ -31,7 +31,7 @@ void set_config_fetch_recurse_submodules(int value);
 void check_for_new_submodule_commits(unsigned char new_sha1[20]);
 int fetch_populated_submodules(const struct argv_array *options,
 			       const char *prefix, int command_line_option,
-			       int quiet);
+			       int quiet, int max_parallel_jobs);
 unsigned is_submodule_modified(const char *path, int ignore_untracked);
 int submodule_uses_gitfile(const char *path);
 int ok_to_remove_submodule(const char *path);
diff --git a/t/t0061-run-command.sh b/t/t0061-run-command.sh
index 9acf628..0970fb0 100755
--- a/t/t0061-run-command.sh
+++ b/t/t0061-run-command.sh
@@ -47,4 +47,20 @@ test_expect_success POSIXPERM,SANITY 'unreadable directory in PATH' '
 	test_cmp expect actual
 '
 
+cat >expect <<-EOF
+Now running instance 0
+Hello World
+Now running instance 1
+Hello World
+Now running instance 2
+Hello World
+Now running instance 3
+Hello World
+EOF
+
+test_expect_success 'run_command runs in parallel' '
+	test-run-command run-command-async sh -c "echo Hello World >&2;" 2>actual &&
+	test_cmp expect actual
+'
+
 test_done
diff --git a/t/t5526-fetch-submodules.sh b/t/t5526-fetch-submodules.sh
index 17759b1..1b4ce69 100755
--- a/t/t5526-fetch-submodules.sh
+++ b/t/t5526-fetch-submodules.sh
@@ -71,6 +71,16 @@ test_expect_success "fetch --recurse-submodules recurses into submodules" '
 	test_i18ncmp expect.err actual.err
 '
 
+test_expect_success "fetch --recurse-submodules -j2 has the same output behaviour" '
+	add_upstream_commit &&
+	(
+		cd downstream &&
+		git fetch --recurse-submodules -j2 2>../actual.err
+	) &&
+	test_must_be_empty actual.out &&
+	test_i18ncmp expect.err actual.err
+'
+
 test_expect_success "fetch alone only fetches superproject" '
 	add_upstream_commit &&
 	(
@@ -140,6 +150,15 @@ test_expect_success "--quiet propagates to submodules" '
 	! test -s actual.err
 '
 
+test_expect_success "--quiet propagates to parallel submodules" '
+	(
+		cd downstream &&
+		git fetch --recurse-submodules -j 2 --quiet  >../actual.out 2>../actual.err
+	) &&
+	! test -s actual.out &&
+	! test -s actual.err
+'
+
 test_expect_success "--dry-run propagates to submodules" '
 	add_upstream_commit &&
 	(
diff --git a/test-run-command.c b/test-run-command.c
index 89c7de2..4817f6e 100644
--- a/test-run-command.c
+++ b/test-run-command.c
@@ -10,9 +10,29 @@
 
 #include "git-compat-util.h"
 #include "run-command.h"
+#include "argv-array.h"
+#include "strbuf.h"
 #include <string.h>
 #include <errno.h>
 
+static int number_callbacks;
+int run_processes_async_next(void *data,
+			     struct child_process *cp,
+			     struct strbuf *err)
+{
+	struct child_process *d = data;
+	if (number_callbacks >= 4)
+		return 1;
+
+	argv_array_pushv(&cp->args, d->argv);
+	cp->stdout_to_stderr = 1;
+	cp->no_stdin = 1;
+	cp->err = -1;
+	strbuf_addf(err, "Now running instance %d\n", number_callbacks);
+	number_callbacks++;
+	return 0;
+}
+
 int main(int argc, char **argv)
 {
 	struct child_process proc = CHILD_PROCESS_INIT;
@@ -30,6 +50,9 @@ int main(int argc, char **argv)
 	if (!strcmp(argv[1], "run-command"))
 		exit(run_command(&proc));
 
+	if (!strcmp(argv[1], "run-command-async"))
+		exit(run_processes_async(4, run_processes_async_next, &proc));
+
 	fprintf(stderr, "check usage\n");
 	return 1;
 }
-- 
2.6.0.rc0.131.gf624c3d

* Re: [PATCH 2/2] fetch: fetch submodules in parallel
  2015-09-11 23:09 ` [PATCH 2/2] fetch: fetch submodules in parallel Stefan Beller
@ 2015-09-12 19:11   ` Junio C Hamano
  2015-09-14 16:46     ` Stefan Beller
  0 siblings, 1 reply; 13+ messages in thread
From: Junio C Hamano @ 2015-09-12 19:11 UTC
  To: Stefan Beller
  Cc: git, peff, jrnieder, johannes.schindelin, Jens.Lehmann, vlovich

Stefan Beller <sbeller@google.com> writes:

> diff --git a/run-command.c b/run-command.c
> index 28e1d55..b8ff67b 100644
> --- a/run-command.c
> +++ b/run-command.c
> @@ -852,3 +852,147 @@ int capture_command(struct child_process *cmd, struct strbuf *buf, size_t hint)
>  	close(cmd->out);
>  	return finish_command(cmd);
>  }
> +
> +int run_processes_async(int n, get_next_task fn, void *data)
> +{
> +	int i, wait_status;
> +	pid_t pid;
> +
> +	/* no more tasks. Also set when aborting early. */
> +	int all_tasks_started = 0;
> +	int nr_processes = 0;
> +	int child_in_foreground = 0;
> +	struct timeval timeout;
> +	struct child_process *children = xcalloc(n, sizeof(*children));
> +	char *slots = xcalloc(n, sizeof(*slots));
> +	struct strbuf *err = xcalloc(n, sizeof(*err));
> +	fd_set fdset;
> +	int maxfd;
> +	struct strbuf finished_children = STRBUF_INIT;
> +	int flags;
> +	for (i = 0; i < n; i++)
> +		strbuf_init(&err[i], 0);
> +
> +	while (!all_tasks_started || nr_processes > 0) {
> +		/* Start new processes. */
> +		while (!all_tasks_started && nr_processes < n) {
> +			for (i = 0; i < n; i++)
> +				if (!slots[i])
> +					break; /* found an empty slot */
> +			if (i == n)
> +				die("BUG: bookkeeping is hard");
> +
> +			if (fn(data, &children[i], &err[i])) {
> +				all_tasks_started = 1;
> +				break;
> +			}
> +			if (start_command(&children[i]))
> +				die(_("Could not start child process"));
> +			flags = fcntl(children[i].err, F_GETFL);
> +			fcntl(children[i].err, F_SETFL, flags | O_NONBLOCK);

This function in run-command.c looks as if it is a generic helper to
be called by anybody, but it seems to only care about the standard
error and not the standard output stream, which means potential
users that do not dup them together cannot use it.  Is that a big
downside, or is it sufficient to document the API to say that
children must do so?  I offhand do not think the latter is
unreasonable, but that may be only because I haven't thought things
through.

> +			nr_processes++;
> +			slots[i] = 1;
> +		}
> +
> +		/* prepare data for select call */
> +		FD_ZERO(&fdset);
> +		maxfd = 0;
> +		for (i = 0; i < n; i++) {
> +			if (!slots[i])
> +				continue;
> +			FD_SET(children[i].err, &fdset);
> +			if (children[i].err > maxfd)
> +				maxfd = children[i].err;
> +		}
> +		timeout.tv_sec = 0;
> +		timeout.tv_usec = 500000;
> +
> +		i = select(maxfd + 1, &fdset, NULL, NULL, &timeout);

I thought we try to use poll() and on systems with only select we
allow compat/ to emulate in our code.

> +		if (i < 0) {
> +			if (errno == EINTR)
> +				/* A signal was caught; try again */
> +				continue;
> +			else if (errno == ENOMEM)
> +				die_errno("BUG: keeping track of fds is hard");
> +			else if (errno == EINVAL)
> +				die_errno("BUG: invalid arguments to select");
> +			else if (errno == EBADF)
> +				die_errno("BUG: keeping track of fds is hard");
> +			else
> +				die_errno("Unknown error with select");

I doubt that the later part of elseif cascade adds any value.  You
will see errno printed anyway.

* Re: [PATCH 2/2] fetch: fetch submodules in parallel
  2015-09-12 19:11   ` Junio C Hamano
@ 2015-09-14 16:46     ` Stefan Beller
  2015-09-14 17:17       ` Jeff King
  0 siblings, 1 reply; 13+ messages in thread
From: Stefan Beller @ 2015-09-14 16:46 UTC
  To: Junio C Hamano
  Cc: git@vger.kernel.org, Jeff King, Jonathan Nieder,
	Johannes Schindelin, Jens Lehmann, Vitali Lovich

On Sat, Sep 12, 2015 at 12:11 PM, Junio C Hamano <gitster@pobox.com> wrote:
>> +                     if (start_command(&children[i]))
>> +                             die(_("Could not start child process"));
>> +                     flags = fcntl(children[i].err, F_GETFL);
>> +                     fcntl(children[i].err, F_SETFL, flags | O_NONBLOCK);
>
> This function in run-command.c looks as if it is a generic helper to
> be called by anybody, but it seems to only care about the standard
> error and not the standard output stream, which means potential
> users that do not dup them together cannot use it.  Is that a big
> downside, or is it sufficient to document the API to say that
> children must do so?  I offhand do not think the latter is
> unreasonable, but that may be only because I haven't thought things
> through.

Yes it ought to become a generic helper eventually.

I tried implementing a buffering solution for both stdout and stderr,
but that doesn't really work out well if you consider interleaved output
on the pipes as we cannot accurately replay that later on. To do that
we would need to store the timing information of the channels, at least
the relative order of it like:

  (stdout, First comes text to stdout),
  (stderr, interrupted by text in stderr)
  (stdout, but stdout doesn't bother, blasting more text)
  (stderr, continues to interrupt)

Obtaining that information is inherently racy: all we can do is poll
and read from both stdout and stderr as fast as possible, and without a
proper synchronization mechanism we cannot be sure of the real order.

I will add documentation explaining why the async output case
will only deal with one channel. I chose stderr as that's already
available and needed in this use case.


>
>> +                     nr_processes++;
>> +                     slots[i] = 1;
>> +             }
>> +
>> +             /* prepare data for select call */
>> +             FD_ZERO(&fdset);
>> +             maxfd = 0;
>> +             for (i = 0; i < n; i++) {
>> +                     if (!slots[i])
>> +                             continue;
>> +                     FD_SET(children[i].err, &fdset);
>> +                     if (children[i].err > maxfd)
>> +                             maxfd = children[i].err;
>> +             }
>> +             timeout.tv_sec = 0;
>> +             timeout.tv_usec = 500000;
>> +
>> +             i = select(maxfd + 1, &fdset, NULL, NULL, &timeout);
>
> I thought we try to use poll() and on systems with only select we
> allow compat/ to emulate in our code.

I did not know that. I'll rewrite the patch to use poll instead.
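
For illustration, a minimal sketch of what a poll()-based wait could
look like. wait_for_readable() is a hypothetical helper, not part of the
patch, and the fixed limit of 16 descriptors is an assumption made only
to keep the example short:

```c
#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <unistd.h>

/*
 * Hypothetical helper (not from the patch): wait up to timeout_ms for
 * any of the n descriptors in fds[] to become readable.
 * Returns poll()'s count of descriptors with pending events, 0 on
 * timeout, or -1 on error. ready[i] is set to 1 when fds[i] can be
 * read (data available or writer hung up) and to 0 otherwise.
 */
int wait_for_readable(const int *fds, int *ready, int n, int timeout_ms)
{
	struct pollfd pfd[16];
	int i, ret;

	assert(n > 0 && n <= 16);
	for (i = 0; i < n; i++) {
		pfd[i].fd = fds[i];
		pfd[i].events = POLLIN;
		pfd[i].revents = 0;
	}

	/* A caught signal is not an error; simply try again. */
	do {
		ret = poll(pfd, n, timeout_ms);
	} while (ret < 0 && errno == EINTR);
	if (ret < 0)
		return -1;

	for (i = 0; i < n; i++)
		ready[i] = (pfd[i].revents & (POLLIN | POLLHUP)) != 0;
	return ret;
}
```

Unlike select(), there is no fd_set to rebuild and no maxfd to track on
every iteration; the caller only passes the array of descriptors it is
interested in.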

>
>> +             if (i < 0) {
>> +                     if (errno == EINTR)
>> +                             /* A signal was caught; try again */
>> +                             continue;
>> +                     else if (errno == ENOMEM)
>> +                             die_errno("BUG: keeping track of fds is hard");
>> +                     else if (errno == EINVAL)
>> +                             die_errno("BUG: invalid arguments to select");
>> +                     else if (errno == EBADF)
>> +                             die_errno("BUG: keeping track of fds is hard");
>> +                     else
>> +                             die_errno("Unknown error with select");
>
> I doubt that the later part of elseif cascade adds any value.  You
> will see errno printed anyway.

ok.

* Re: [PATCH 2/2] fetch: fetch submodules in parallel
  2015-09-14 16:46     ` Stefan Beller
@ 2015-09-14 17:17       ` Jeff King
  2015-09-14 17:47         ` Stefan Beller
                           ` (2 more replies)
  0 siblings, 3 replies; 13+ messages in thread
From: Jeff King @ 2015-09-14 17:17 UTC
  To: Stefan Beller
  Cc: Junio C Hamano, git@vger.kernel.org, Jonathan Nieder,
	Johannes Schindelin, Jens Lehmann, Vitali Lovich

On Mon, Sep 14, 2015 at 09:46:58AM -0700, Stefan Beller wrote:

> I tried implementing a buffering solution for both stdout and stderr,
> but that doesn't really workout well if you consider interleaved output
> on the pipes as we cannot accurately replay that later on. To do that
> we would need to store the timing information of the channels, at least
> the relative order of it like:
> 
>   (stdout, First comes text to stdout),
>   (stderr, interrupted by text in stderr)
>   (stdout, but stdout doesn't bother, blasting more text)
>   (stderr, continues to interrupt)
> 
> obtaining the information is inherently racy, as all we can do is
> polling/reading from both stdout/err as fast as possible but without
> proper synchronization mechanisms we cannot be sure.

I don't think you need exact timing information.  This is no different
than running the commands themselves, with stdout and stderr writing to
a pty that your terminal emulator will then read() from. If the program
produces intermingled stdout/stderr that clogs up the terminal, that is
its problem.

The only difference is that we're going to save it and later replay it
all very quickly.  So I think it would be sufficient just to retain the
original order.

> I will add documentation explaining why the async output case
> will only deal with one channel. I chose stderr as that's already
> available and needed in this use case.

I suspect you could just set child->stdout_to_stderr in this case, and
then you get your ordering for free. But probably in the general case
people would want to run inspection commands that produce a useful
stdout.

To handle multiple channels, I think you could just do a linked list of
buffers rather than a single strbuf. Like:

  struct io_chunk {
	int channel;
	char *buf;
	size_t len;
	struct io_chunk *next;
  };

and just keep appending chunks to the list (and to dump them, just walk
the list, writing each to the appropriate channel descriptor).
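
To make the idea concrete, here is a minimal sketch of such a chunk
list. The io_log wrapper and the io_log_append()/io_log_replay() names
are hypothetical, and the sketch assumes plain malloc() rather than
git's own allocation helpers:

```c
#include <assert.h>
#include <errno.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

struct io_chunk {
	int channel;		/* descriptor to replay to, e.g. 1 or 2 */
	char *buf;
	size_t len;
	struct io_chunk *next;
};

/* Hypothetical wrapper: keeps the tail so that appending is O(1). */
struct io_log {
	struct io_chunk *head, *tail;
};

/* Append a copy of buf[0..len) tagged with its channel; 0 on success. */
int io_log_append(struct io_log *log, int channel, const char *buf, size_t len)
{
	struct io_chunk *c;

	assert(log && (buf || !len));
	c = malloc(sizeof(*c));
	if (!c)
		return -1;
	c->buf = malloc(len ? len : 1);
	if (!c->buf) {
		free(c);
		return -1;
	}
	if (len)
		memcpy(c->buf, buf, len);
	c->channel = channel;
	c->len = len;
	c->next = NULL;
	if (log->tail)
		log->tail->next = c;
	else
		log->head = c;
	log->tail = c;
	return 0;
}

/* Write every chunk to its channel in arrival order, then free the list. */
int io_log_replay(struct io_log *log)
{
	struct io_chunk *c, *next;
	int ret = 0;

	for (c = log->head; c; c = next) {
		size_t off = 0;

		next = c->next;
		while (!ret && off < c->len) {
			ssize_t n = write(c->channel, c->buf + off, c->len - off);
			if (n < 0 && errno == EINTR)
				continue;
			if (n < 0)
				ret = -1;
			else
				off += (size_t)n;
		}
		free(c->buf);
		free(c);
	}
	log->head = log->tail = NULL;
	return ret;
}
```

The list only preserves the order in which the parent appended the
chunks, so it is exactly as accurate as the order of the parent's
read() calls.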

-Peff

* Re: [PATCH 2/2] fetch: fetch submodules in parallel
  2015-09-14 17:17       ` Jeff King
@ 2015-09-14 17:47         ` Stefan Beller
  2015-09-14 17:55         ` Jonathan Nieder
  2015-09-14 17:56         ` [PATCH 2/2] fetch: " Junio C Hamano
  2 siblings, 0 replies; 13+ messages in thread
From: Stefan Beller @ 2015-09-14 17:47 UTC
  To: Jeff King
  Cc: Junio C Hamano, git@vger.kernel.org, Jonathan Nieder,
	Johannes Schindelin, Jens Lehmann, Vitali Lovich

On Mon, Sep 14, 2015 at 10:17 AM, Jeff King <peff@peff.net> wrote:
> On Mon, Sep 14, 2015 at 09:46:58AM -0700, Stefan Beller wrote:
>
>> I tried implementing a buffering solution for both stdout and stderr,
>> but that doesn't really workout well if you consider interleaved output
>> on the pipes as we cannot accurately replay that later on. To do that
>> we would need to store the timing information of the channels, at least
>> the relative order of it like:
>>
>>   (stdout, First comes text to stdout),
>>   (stderr, interrupted by text in stderr)
>>   (stdout, but stdout doesn't bother, blasting more text)
>>   (stderr, continues to interrupt)
>>
>> obtaining the information is inherently racy, as all we can do is
>> polling/reading from both stdout/err as fast as possible but without
>> proper synchronization mechanisms we cannot be sure.
>
> I don't think you need exact timing information.  This is no different
> than running the commands themselves, with stdout and stderr writing to
> a pty that your terminal emulator will then read() from. If the program
> produces intermingled stdout/stderr that clogs up the terminal, that is
> its problem.
>
> The only difference is that we're going to save it and later replay it
> all very quickly.  So I think it would be sufficient just to retain the
> original order.
>
>> I will add documentation explaining why the async output case
>> will only deal with one channel. I chose stderr as that's already
>> available and needed in this use case.
>
> I suspect you could just set child->stdout_to_stderr in this case, and
> then you get your ordering for free. But probably in the general case
> people would want to run inspection commands that produce a useful
> stdout.
>
> To handle multiple channels, I think you could just do a linked list of
> buffers rather than a single strbuf. Like:

I would have no problem coding such a thing in a user program,
but how do you obtain the ordering non-racily from the child using the
POSIX API?

A poll/select call may report more than one fd as ready, and then
you don't know the order in which the output needs to be replayed.
This may introduce subtle bugs.

So I'd rather come up with a solution buffering 2 channels once we need it,
keeping the stdout_to_stderr as a requirement for now.

>
>   struct io_chunk {
>         int channel;
>         char *buf;
>         size_t len;
>         struct io_chunk *next;
>   };
>
> and just keep appending chunks to the list (and to dump them, just walk
> the list, writing each to the appropriate channel descriptor).
>
> -Peff

* Re: [PATCH 2/2] fetch: fetch submodules in parallel
  2015-09-14 17:17       ` Jeff King
  2015-09-14 17:47         ` Stefan Beller
@ 2015-09-14 17:55         ` Jonathan Nieder
  2015-09-14 18:07           ` Jeff King
  2015-09-14 17:56         ` [PATCH 2/2] fetch: " Junio C Hamano
  2 siblings, 1 reply; 13+ messages in thread
From: Jonathan Nieder @ 2015-09-14 17:55 UTC
  To: Jeff King
  Cc: Stefan Beller, Junio C Hamano, git@vger.kernel.org,
	Johannes Schindelin, Jens Lehmann, Vitali Lovich

Jeff King wrote:
> On Mon, Sep 14, 2015 at 09:46:58AM -0700, Stefan Beller wrote:

>> I tried implementing a buffering solution for both stdout and stderr,
>> but that doesn't really workout well if you consider interleaved output
>> on the pipes as we cannot accurately replay that later on.
[...]
>> obtaining the information is inherently racy
[...]
> I don't think you need exact timing information.  This is no different
> than running the commands themselves, with stdout and stderr writing to
> a pty that your terminal emulator will then read() from. If the program
> produces intermingled stdout/stderr that clogs up the terminal, that is
> its problem.

The difference is that when stdout and stderr write to a pty, they write
to the same pty.  That is, suppose a child process does

	write(1, "A\n", 2);
	write(2, "B\n", 2);
	write(1, "C\n", 2);

Then the output that should be echoed to the terminal is

	A
	B
	C

Now the parent might do

	for (;;) {
		int n = select(...);
		... do stuff ...
	}

If all three writes happen during the "do stuff" step, then *if* the
child's stdout and stderr went to different pipes, all the parent sees
is

	child's stdout: A\nC\n
	child's stderr: B\n

There is not sufficient information to recover the original output
order.  (Linux provides a pipe2(..., O_DIRECT) that almost provides
sufficient information --- it tells you

	child's stdout: "A\n", "C\n"
	child's stderr: "B\n"

but still doesn't give information about ordering.)
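
The effect can be reproduced with a small sketch. capture_child() is a
hypothetical helper, not part of the patch; it assumes the parent reads
only after the child has exited, which stands in for a long "do stuff"
step, and its merge flag imitates what stdout_to_stderr does:

```c
#include <assert.h>
#include <string.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/*
 * Fork a child that performs the three writes from the example and
 * store what the parent can read afterwards, NUL-terminated, in out
 * (child's stdout pipe) and err (child's stderr pipe). If merge is
 * non-zero, the child's stdout is pointed at the stderr pipe as well.
 * Returns 0 on success, -1 on failure.
 */
int capture_child(int merge, char *out, size_t outsz, char *err, size_t errsz)
{
	int po[2], pe[2], status;
	ssize_t n;
	pid_t pid;

	assert(outsz > 0 && errsz > 0);
	if (pipe(po) < 0)
		return -1;
	if (pipe(pe) < 0) {
		close(po[0]);
		close(po[1]);
		return -1;
	}

	pid = fork();
	if (pid == 0) {
		/* Child: redirect fd 2 and fd 1, then do the three writes. */
		if (dup2(pe[1], 2) < 0 || dup2(merge ? pe[1] : po[1], 1) < 0)
			_exit(1);
		if (write(1, "A\n", 2) != 2 ||
		    write(2, "B\n", 2) != 2 ||
		    write(1, "C\n", 2) != 2)
			_exit(1);
		_exit(0);
	}

	close(po[1]);
	close(pe[1]);
	if (pid < 0 || waitpid(pid, &status, 0) < 0 ||
	    !WIFEXITED(status) || WEXITSTATUS(status)) {
		close(po[0]);
		close(pe[0]);
		return -1;
	}

	/* All writes have landed; the pipes hold whatever ordering is left. */
	n = read(po[0], out, outsz - 1);
	out[n > 0 ? n : 0] = '\0';
	n = read(pe[0], err, errsz - 1);
	err[n > 0 ? n : 0] = '\0';
	close(po[0]);
	close(pe[0]);
	return 0;
}
```

With merge set to 0 the parent sees "A\nC\n" on one pipe and "B\n" on
the other, with nothing to say where B belongs. With merge set to 1 the
single pipe carries "A\nB\nC\n" in the order the child wrote it.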

That's probably okay: in most git commands, stderr shows a combination
of diagnostic output and progress information and stdout shows the
actual result, so interleaving between the two is not too common.

One can imagine a "git grep --recurse-submodules" that wants to run a
grep in each submodule and combine their output in some appropriate
way.  It's not clear what order is best for that use case: stderr
(errors, plus progress in some imaginary future) at the beginning to
show the story of how output was generated before the output?  stderr
at the end so errors are not hidden way up on the screen?  Some kind
of interleaving that pays attention to the format of stdout?

That is more complicated than the "fetch --recurse-submodules" case
that Stefan is currently tackling, so it seems wise to me to punt for
now.

Thanks and hope that helps,
Jonathan

* Re: [PATCH 2/2] fetch: fetch submodules in parallel
  2015-09-14 17:17       ` Jeff King
  2015-09-14 17:47         ` Stefan Beller
  2015-09-14 17:55         ` Jonathan Nieder
@ 2015-09-14 17:56         ` Junio C Hamano
  2 siblings, 0 replies; 13+ messages in thread
From: Junio C Hamano @ 2015-09-14 17:56 UTC
  To: Jeff King
  Cc: Stefan Beller, git@vger.kernel.org, Jonathan Nieder,
	Johannes Schindelin, Jens Lehmann, Vitali Lovich

Jeff King <peff@peff.net> writes:

> I don't think you need exact timing information.  This is no different
> than running the commands themselves, with stdout and stderr writing to
> a pty that your terminal emulator will then read() from. If the program
> produces intermingled stdout/stderr that clogs up the terminal, that is
> its problem.
>
> The only difference is that we're going to save it and later replay it
> all very quickly.  So I think it would be sufficient just to retain the
> original order.
>
>> I will add documentation explaining why the async output case
>> will only deal with one channel. I chose stderr as that's already
>> available and needed in this use case.
>
> I suspect you could just set child->stdout_to_stderr in this case, and
> then you get your ordering for free.

I think we are in agreement; that is exactly what I wanted to say
when I said "I offhand do not think the latter [i.e. the callers
have to dup them together] is unreasonable".  Thanks for stating it
more clearly and explicitly.

> To handle multiple channels, I think you could just do a linked list of
> buffers rather than a single strbuf. Like:
>
>   struct io_chunk {
> 	int channel;
> 	char *buf;
> 	size_t len;
> 	struct io_chunk *next;
>   };
>
> and just keep appending chunks to the list (and to dump them, just walk
> the list, writing each to the appropriate channel descriptor).

Perhaps, but let's not overdesign things before we have a concrete
example codepath that benefits from such a thing.  It is hard to
detect a misdesign without a real usage pattern.

* Re: [PATCH 2/2] fetch: fetch submodules in parallel
  2015-09-14 17:55         ` Jonathan Nieder
@ 2015-09-14 18:07           ` Jeff King
  2015-09-14 21:50             ` [PATCHv2] " Stefan Beller
  0 siblings, 1 reply; 13+ messages in thread
From: Jeff King @ 2015-09-14 18:07 UTC
  To: Jonathan Nieder
  Cc: Stefan Beller, Junio C Hamano, git@vger.kernel.org,
	Johannes Schindelin, Jens Lehmann, Vitali Lovich

On Mon, Sep 14, 2015 at 10:55:09AM -0700, Jonathan Nieder wrote:

> > I don't think you need exact timing information.  This is no different
> > than running the commands themselves, with stdout and stderr writing to
> > a pty that your terminal emulator will then read() from. If the program
> > produces intermingled stdout/stderr that clogs up the terminal, that is
> > its problem.
> 
> The difference is that when stdout and stderr write to a pty, they write
> to the same pty.  That is, suppose a child process does
> 
> 	write(1, "A\n", 2);
> 	write(2, "B\n", 2);
> 	write(1, "C\n", 2);

Ah, right. The pty is where things get mixed, not the read() from the
terminal. So it depends on the write() order. Thanks for the
explanation.

> One can imagine a "git grep --recurse-submodules" that wants to run a
> grep in each submodule and combine their output in some appropriate
> way.  It's not clear what order is best for that use case: stderr
> (errors, plus progress in some imaginary future) at the beginning to
> show the story of how output was generated before the output?  stderr
> at the end so errors are not hidden way up on the screen?  Some kind
> of interleaving that pays attention to the format of stdout?

I'd suggest a "best effort" interleaving, where we select and preserve
the read() order. That makes the easy cases work (you get things in the
original order), and the hard cases at least do something reasonable
(we may reorder two items which come in the same atomic "tick" of our
select, but at least they are nearby).

That's just my gut feeling, though.
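
A minimal sketch of one such best-effort "tick", assuming a hypothetical
drain_tick() helper and a fixed-size log (a real implementation would
grow its buffer instead of dropping data):

```c
#include <assert.h>
#include <poll.h>
#include <string.h>
#include <unistd.h>

/* Hypothetical log of records of the form "<tag>:<data>". */
struct tagged_log {
	char text[256];
	size_t len;
};

/*
 * One tick: poll both descriptors and append whatever is readable to
 * the log, tagged 'o' (stdout) or 'e' (stderr), in the order in which
 * the read() calls happen. Returns the number of descriptors that
 * contributed a record, or -1 if poll() fails.
 */
int drain_tick(int out_fd, int err_fd, struct tagged_log *log, int timeout_ms)
{
	struct pollfd pfd[2] = {
		{ .fd = out_fd, .events = POLLIN },
		{ .fd = err_fd, .events = POLLIN },
	};
	const char tags[2] = { 'o', 'e' };
	int i, nread = 0;

	assert(log && log->len < sizeof(log->text));
	if (poll(pfd, 2, timeout_ms) < 0)
		return -1;

	for (i = 0; i < 2; i++) {
		char buf[64];
		ssize_t n;

		if (!(pfd[i].revents & POLLIN))
			continue;
		n = read(pfd[i].fd, buf, sizeof(buf));
		/* Skip on EOF/error, or when tag, ':', data and NUL do not fit. */
		if (n <= 0 || log->len + (size_t)n + 3 > sizeof(log->text))
			continue;
		log->text[log->len++] = tags[i];
		log->text[log->len++] = ':';
		memcpy(log->text + log->len, buf, (size_t)n);
		log->len += (size_t)n;
		log->text[log->len] = '\0';
		nread++;
	}
	return nread;
}
```

Writes that arrive in different ticks keep their order. Writes that
arrive within the same tick are logged stdout first, whatever order the
child produced them in, which is the reordering described above.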

> That is more complicated than the "fetch --recurse-submodules" case
> that Stefan is currently tackling, so it seems wise to me to punt for
> now.

I can live with that.

-Peff

* [PATCHv2] fetch submodules in parallel
  2015-09-14 18:07           ` Jeff King
@ 2015-09-14 21:50             ` Stefan Beller
  2015-09-14 21:50               ` [PATCHv2] fetch: " Stefan Beller
  2015-09-14 22:06               ` [PATCHv2] " Junio C Hamano
  0 siblings, 2 replies; 13+ messages in thread
From: Stefan Beller @ 2015-09-14 21:50 UTC
  To: gitster
  Cc: peff, git, jrnieder, johannes.schindelin, Jens.Lehmann, vlovich,
	Stefan Beller

This replaces the last patch of the "Parallel git submodule fetching"
series. Changes:

* have correct return code in submodule fetching when one submodule fails
* use poll instead of select now
* broke the code down into smaller functions instead of one giant one.
  (I think it is an improvement, but I wouldn't be surprised if someone objects.)
* closed memory leaks
* document the need for stdout_to_stderr

I no longer consider it RFC-ish; I think it is good to go.

Any feedback welcome!
Thanks,
Stefan


Stefan Beller (1):
  fetch: fetch submodules in parallel

 Documentation/fetch-options.txt |   7 +
 builtin/fetch.c                 |   6 +-
 builtin/pull.c                  |   6 +
 run-command.c                   | 278 ++++++++++++++++++++++++++++++++++++----
 run-command.h                   |  36 ++++++
 strbuf.c                        |  31 +++++
 strbuf.h                        |   1 +
 submodule.c                     | 119 ++++++++++++-----
 submodule.h                     |   2 +-
 t/t0061-run-command.sh          |  20 +++
 t/t5526-fetch-submodules.sh     |  19 +++
 test-run-command.c              |  24 ++++
 12 files changed, 490 insertions(+), 59 deletions(-)

Interdiff to RFCv1:

diff --git a/Documentation/fetch-options.txt b/Documentation/fetch-options.txt
index d432f98..6b109f6 100644
--- a/Documentation/fetch-options.txt
+++ b/Documentation/fetch-options.txt
@@ -105,7 +105,7 @@ ifndef::git-pull[]
 	Number of parallel children to be used for fetching submodules.
 	Each will fetch from different submodules, such that fetching many
 	submodules will be faster. By default submodules will be fetched
-	one at a time
+	one at a time.
 
 --no-recurse-submodules::
 	Disable recursive fetching of submodules (this has the same effect as
diff --git a/builtin/fetch.c b/builtin/fetch.c
index a1520bb..f28eac6 100644
--- a/builtin/fetch.c
+++ b/builtin/fetch.c
@@ -101,7 +101,7 @@ static struct option builtin_fetch_options[] = {
 	OPT_SET_INT('n', NULL, &tags,
 		    N_("do not fetch all tags (--no-tags)"), TAGS_UNSET),
 	OPT_INTEGER('j', "jobs", &max_children,
-		    N_("number of threads used for fetching")),
+		    N_("number of submodules fetched in parallel")),
 	OPT_BOOL('p', "prune", &prune,
 		 N_("prune remote-tracking branches no longer on remote")),
 	{ OPTION_CALLBACK, 0, "recurse-submodules", NULL, N_("on-demand"),
diff --git a/builtin/pull.c b/builtin/pull.c
index bc117e9..f0af196 100644
--- a/builtin/pull.c
+++ b/builtin/pull.c
@@ -179,7 +179,7 @@ static struct option pull_options[] = {
 		N_("control recursive fetching of submodules"),
 		PARSE_OPT_OPTARG),
 	OPT_PASSTHRU('j', "jobs", &max_children, N_("n"),
-		N_("number of threads used for fetching submodules"),
+		N_("number of submodules pulled in parallel"),
 		PARSE_OPT_OPTARG),
 	OPT_BOOL(0, "dry-run", &opt_dry_run,
 		N_("dry run")),
diff --git a/run-command.c b/run-command.c
index b8ff67b..6f6f9fb 100644
--- a/run-command.c
+++ b/run-command.c
@@ -232,6 +232,35 @@ static inline void set_cloexec(int fd)
 		fcntl(fd, F_SETFD, flags | FD_CLOEXEC);
 }
 
+static int determine_return_value(int wait_status,
+				  int *result,
+				  int *error_code,
+				  const char *argv0)
+{
+	if (WIFSIGNALED(wait_status)) {
+		*result = WTERMSIG(wait_status);
+		if (*result != SIGINT && *result != SIGQUIT)
+			error("%s died of signal %d", argv0, *result);
+		/*
+		 * This return value is chosen so that code & 0xff
+		 * mimics the exit code that a POSIX shell would report for
+		 * a program that died from this signal.
+		 */
+		*result += 128;
+	} else if (WIFEXITED(wait_status)) {
+		*result = WEXITSTATUS(wait_status);
+		/*
+		 * Convert special exit code when execvp failed.
+		 */
+		if (*result == 127) {
+			*result = -1;
+			*error_code = ENOENT;
+		}
+	} else
+		return 1;
+	return 0;
+}
+
 static int wait_or_whine(pid_t pid, const char *argv0)
 {
 	int status, code = -1;
@@ -244,29 +273,10 @@ static int wait_or_whine(pid_t pid, const char *argv0)
 	if (waiting < 0) {
 		failed_errno = errno;
 		error("waitpid for %s failed: %s", argv0, strerror(errno));
-	} else if (waiting != pid) {
-		error("waitpid is confused (%s)", argv0);
-	} else if (WIFSIGNALED(status)) {
-		code = WTERMSIG(status);
-		if (code != SIGINT && code != SIGQUIT)
-			error("%s died of signal %d", argv0, code);
-		/*
-		 * This return value is chosen so that code & 0xff
-		 * mimics the exit code that a POSIX shell would report for
-		 * a program that died from this signal.
-		 */
-		code += 128;
-	} else if (WIFEXITED(status)) {
-		code = WEXITSTATUS(status);
-		/*
-		 * Convert special exit code when execvp failed.
-		 */
-		if (code == 127) {
-			code = -1;
-			failed_errno = ENOENT;
-		}
 	} else {
-		error("waitpid is confused (%s)", argv0);
+		if (waiting != pid
+		   || determine_return_value(status, &code, &failed_errno, argv0))
+			error("waitpid is confused (%s)", argv0);
 	}
 
 	clear_child_for_cleanup(pid);
@@ -853,146 +863,226 @@ int capture_command(struct child_process *cmd, struct strbuf *buf, size_t hint)
 	return finish_command(cmd);
 }
 
-int run_processes_async(int n, get_next_task fn, void *data)
+static void unblock_fd(int fd)
 {
-	int i, wait_status;
-	pid_t pid;
+	int flags = fcntl(fd, F_GETFL);
+	if (flags < 0) {
+		warning("Could not get file status flags, "
+			"output will be degraded");
+		return;
+	}
+	if (fcntl(fd, F_SETFL, flags | O_NONBLOCK)) {
+		warning("Could not set file status flags, "
+			"output will be degraded");
+		return;
+	}
+}
 
-	/* no more tasks. Also set when aborting early. */
-	int all_tasks_started = 0;
-	int nr_processes = 0;
-	int child_in_foreground = 0;
-	struct timeval timeout;
-	struct child_process *children = xcalloc(n, sizeof(*children));
-	char *slots = xcalloc(n, sizeof(*slots));
-	struct strbuf *err = xcalloc(n, sizeof(*err));
-	fd_set fdset;
-	int maxfd;
-	struct strbuf finished_children = STRBUF_INIT;
-	int flags;
-	for (i = 0; i < n; i++)
-		strbuf_init(&err[i], 0);
-
-	while (!all_tasks_started || nr_processes > 0) {
-		/* Start new processes. */
-		while (!all_tasks_started && nr_processes < n) {
-			for (i = 0; i < n; i++)
-				if (!slots[i])
-					break; /* found an empty slot */
-			if (i == n)
-				die("BUG: bookkeeping is hard");
-
-			if (fn(data, &children[i], &err[i])) {
-				all_tasks_started = 1;
-				break;
-			}
-			if (start_command(&children[i]))
-				die(_("Could not start child process"));
-			flags = fcntl(children[i].err, F_GETFL);
-			fcntl(children[i].err, F_SETFL, flags | O_NONBLOCK);
-			nr_processes++;
-			slots[i] = 1;
-		}
+struct parallel_processes {
+	int max_number_processes;
+	void *data;
+	get_next_task fn;
+	handle_child_starting_failure fn_err;
+	handle_child_return_value fn_exit;
+
+	int nr_processes;
+	int all_tasks_started;
+	int foreground_child;
+	char *slots;
+	struct child_process *children;
+	struct pollfd *pfd;
+	struct strbuf *err;
+	struct strbuf finished_children;
+};
+
+static void run_processes_parallel_init(struct parallel_processes *pp,
+					int n, void *data,
+					get_next_task fn,
+					handle_child_starting_failure fn_err,
+					handle_child_return_value fn_exit)
+{
+	int i;
+
+	pp->max_number_processes = n;
+	pp->data = data;
+	pp->fn = fn;
+	pp->fn_err = fn_err;
+	pp->fn_exit = fn_exit;
+
+	pp->nr_processes = 0;
+	pp->all_tasks_started = 0;
+	pp->foreground_child = 0;
+	pp->slots = xcalloc(n, sizeof(*pp->slots));
+	pp->children = xcalloc(n, sizeof(*pp->children));
+	pp->pfd = xcalloc(n, sizeof(*pp->pfd));
+	pp->err = xcalloc(n, sizeof(*pp->err));
+	strbuf_init(&pp->finished_children, 0);
+
+	for (i = 0; i < n; i++) {
+		strbuf_init(&pp->err[i], 0);
+		pp->pfd[i].events = POLLIN;
+		pp->pfd[i].fd = -1;
+	}
+}
+
+static void run_processes_parallel_cleanup(struct parallel_processes *pp)
+{
+	int i;
+	for (i = 0; i < pp->max_number_processes; i++)
+		strbuf_release(&pp->err[i]);
+
+	free(pp->children);
+	free(pp->slots);
+	free(pp->pfd);
+	free(pp->err);
+	strbuf_release(&pp->finished_children);
+}
 
-		/* prepare data for select call */
-		FD_ZERO(&fdset);
-		maxfd = 0;
-		for (i = 0; i < n; i++) {
-			if (!slots[i])
-				continue;
-			FD_SET(children[i].err, &fdset);
-			if (children[i].err > maxfd)
-				maxfd = children[i].err;
+static void run_processes_parallel_start_new(struct parallel_processes *pp)
+{
+	int i;
+	/* Start new processes. */
+	while (!pp->all_tasks_started
+	       && pp->nr_processes < pp->max_number_processes) {
+		for (i = 0; i < pp->max_number_processes; i++)
+			if (!pp->slots[i])
+				break; /* found an empty slot */
+		if (i == pp->max_number_processes)
+			die("BUG: bookkeeping is hard");
+
+		if (pp->fn(pp->data, &pp->children[i], &pp->err[i])) {
+			pp->all_tasks_started = 1;
+			break;
 		}
-		timeout.tv_sec = 0;
-		timeout.tv_usec = 500000;
+		if (start_command(&pp->children[i]))
+			pp->fn_err(pp->data, &pp->children[i], &pp->err[i]);
 
-		i = select(maxfd + 1, &fdset, NULL, NULL, &timeout);
-		if (i < 0) {
-			if (errno == EINTR)
-				/* A signal was caught; try again */
-				continue;
-			else if (errno == ENOMEM)
-				die_errno("BUG: keeping track of fds is hard");
-			else if (errno == EINVAL)
-				die_errno("BUG: invalid arguments to select");
-			else if (errno == EBADF)
-				die_errno("BUG: keeping track of fds is hard");
-			else
-				die_errno("Unknown error with select");
+		unblock_fd(pp->children[i].err);
+
+		pp->nr_processes++;
+		pp->slots[i] = 1;
+		pp->pfd[i].fd = pp->children[i].err;
+	}
+}
+
+static int run_processes_parallel_buffer_stderr(struct parallel_processes *pp)
+{
+	int i;
+	i = poll(pp->pfd, pp->max_number_processes, 100);
+	if (i < 0) {
+		if (errno == EINTR)
+			/* A signal was caught; try again */
+			return -1;
+		else {
+			run_processes_parallel_cleanup(pp);
+			die_errno("poll");
 		}
+	}
 
-		/* Buffer output from all pipes. */
-		for (i = 0; i < n; i++) {
-			if (!slots[i])
-				continue;
-			if (FD_ISSET(children[i].err, &fdset))
-				strbuf_read_noblock(&err[i], children[i].err, 0);
-			if (child_in_foreground == i) {
-				fputs(err[i].buf, stderr);
-				strbuf_reset(&err[i]);
-				fflush(stderr);
-			}
+	/* Buffer output from all pipes. */
+	for (i = 0; i < pp->max_number_processes; i++) {
+		if (!pp->slots[i])
+			continue;
+		if (pp->pfd[i].revents & POLLIN)
+			strbuf_read_noblock(&pp->err[i], pp->children[i].err, 0);
+		if (pp->foreground_child == i) {
+			fputs(pp->err[i].buf, stderr);
+			strbuf_reset(&pp->err[i]);
 		}
+	}
+	return 0;
+}
 
-		/* Collect finished child processes. */
-		while (nr_processes > 0) {
-			pid = waitpid(-1, &wait_status, WNOHANG);
-			if (pid == 0)
-				/* no child finished */
-				break;
-
-			if (pid < 0) {
-				if (errno == EINTR)
-					break; /* just try again  next time */
-				if (errno == EINVAL || errno == ECHILD)
-					die_errno("wait");
-			} else {
-				/* Find the finished child. */
-				for (i = 0; i < n; i++)
-					if (slots[i] && pid == children[i].pid)
-						break;
-				if (i == n)
-					/* waitpid returned another process id which
-					 * we are not waiting on, so ignore it*/
+
+static void run_processes_parallel_collect_finished(struct parallel_processes *pp)
+{
+	int i = 0;
+	pid_t pid;
+	int wait_status, code;
+	int n = pp->max_number_processes;
+	/* Collect finished child processes. */
+	while (pp->nr_processes > 0) {
+		pid = waitpid(-1, &wait_status, WNOHANG);
+		if (pid == 0)
+			return; /* no child finished */
+
+		if (pid < 0) {
+			if (errno == EINTR)
+				return; /* just try again  next time */
+			if (errno == EINVAL || errno == ECHILD)
+				die_errno("wait");
+		} else {
+			/* Find the finished child. */
+			for (i = 0; i < pp->max_number_processes; i++)
+				if (pp->slots[i] && pid == pp->children[i].pid)
 					break;
-			}
+			if (i == pp->max_number_processes)
+				/*
+				 * waitpid returned another process id
+				 * which we are not waiting for.
+				 */
+				return;
+		}
+		strbuf_read_noblock(&pp->err[i], pp->children[i].err, 0);
 
-			strbuf_read_noblock(&err[i], children[i].err, 0);
-			argv_array_clear(&children[i].args);
-			argv_array_clear(&children[i].env_array);
+		if (determine_return_value(wait_status, &code, &errno,
+					   pp->children[i].argv[0]) < 0)
+			error("waitpid is confused (%s)",
+			      pp->children[i].argv[0]);
 
-			slots[i] = 0;
-			nr_processes--;
+		pp->fn_exit(pp->data, &pp->children[i], code);
 
-			if (i != child_in_foreground) {
-				strbuf_addbuf(&finished_children, &err[i]);
-				strbuf_reset(&err[i]);
-			} else {
-				fputs(err[i].buf, stderr);
-				strbuf_reset(&err[i]);
+		argv_array_clear(&pp->children[i].args);
+		argv_array_clear(&pp->children[i].env_array);
 
-				/* Output all other finished child processes */
-				fputs(finished_children.buf, stderr);
-				strbuf_reset(&finished_children);
+		pp->nr_processes--;
+		pp->slots[i] = 0;
+		pp->pfd[i].fd = -1;
 
-				/*
-				 * Pick next process to output live.
-				 * There can be no active process if n==1
-				 * NEEDSWORK:
-				 * For now we pick it randomly by doing a round
-				 * robin. Later we may want to pick the one with
-				 * the most output or the longest or shortest
-				 * running process time.
-				 */
-				for (i = 0; i < n; i++)
-					if (slots[(child_in_foreground + i) % n])
-						break;
-				child_in_foreground = (child_in_foreground + i) % n;
-				fputs(err[child_in_foreground].buf, stderr);
-				strbuf_reset(&err[child_in_foreground]);
-			}
+		if (i != pp->foreground_child) {
+			strbuf_addbuf(&pp->finished_children, &pp->err[i]);
+			strbuf_reset(&pp->err[i]);
+		} else {
+			fputs(pp->err[i].buf, stderr);
+			strbuf_reset(&pp->err[i]);
+
+			/* Output all other finished child processes */
+			fputs(pp->finished_children.buf, stderr);
+			strbuf_reset(&pp->finished_children);
+
+			/*
+			 * Pick next process to output live.
+			 * NEEDSWORK:
+			 * For now we pick it randomly by doing a round
+			 * robin. Later we may want to pick the one with
+			 * the most output or the longest or shortest
+			 * running process time.
+			 */
+			for (i = 0; i < n; i++)
+				if (pp->slots[(pp->foreground_child + i) % n])
+					break;
+			pp->foreground_child = (pp->foreground_child + i) % n;
+			fputs(pp->err[pp->foreground_child].buf, stderr);
+			strbuf_reset(&pp->err[pp->foreground_child]);
 		}
 	}
+}
+
+int run_processes_parallel(int n, void *data,
+			   get_next_task fn,
+			   handle_child_starting_failure fn_err,
+			   handle_child_return_value fn_exit)
+{
+	struct parallel_processes pp;
+	run_processes_parallel_init(&pp, n, data, fn, fn_err, fn_exit);
+
+	while (!pp.all_tasks_started || pp.nr_processes > 0) {
+		run_processes_parallel_start_new(&pp);
+		if (run_processes_parallel_buffer_stderr(&pp))
+			continue;
+		run_processes_parallel_collect_finished(&pp);
+	}
+	run_processes_parallel_cleanup(&pp);
+
 	return 0;
 }
diff --git a/run-command.h b/run-command.h
index 8f53ad6..0487f71 100644
--- a/run-command.h
+++ b/run-command.h
@@ -120,32 +120,39 @@ int start_async(struct async *async);
 int finish_async(struct async *async);
 
 /**
- * Return 0 if the next child is ready to run.
- * This callback takes care to initialize the child process and preload the
- * out and error channel. The preloading of these outpout channels is useful
- * if you want to have a message printed directly before the output of the
- * child process.
+ * This callback should initialize the child process and preload the
+ * error channel. The preloading is useful if you want to have a message
+ * printed directly before the output of the child process.
+ * You MUST set stdout_to_stderr.
  *
+ * Return 0 if the next child is ready to run.
  * Return != 0 if there are no more tasks to be processed.
  */
 typedef int (*get_next_task)(void *data,
 			     struct child_process *cp,
 			     struct strbuf *err);
 
+typedef void (*handle_child_starting_failure)(void *data,
+					      struct child_process *cp,
+					      struct strbuf *err);
+
+typedef void (*handle_child_return_value)(void *data,
+					  struct child_process *cp,
+					  int result);
+
 /**
- * Runs up to n processes at the same time. Whenever a process can
- * be started, the callback `get_next_task` is called to obtain the
- * data fed to the child process.
+ * Runs up to n processes at the same time. Whenever a process can be
+ * started, the callback `get_next_task` is called to obtain the data
+ * fed to the child process.
  *
  * The children started via this function run in parallel and their output
- * to both stdout and stderr is buffered, while one of the children will
- * directly output to stdout/stderr.
- *
- * This leads to a problem with output from processes which put out to
- * stdout/err alternatingly as the buffering will not be able to replay
- * the
+ * to stderr is buffered, while one of the children will directly output
+ * to stderr.
  */
 
-int run_processes_async(int n, get_next_task fn, void *data);
+int run_processes_parallel(int n, void *data,
+			   get_next_task fn,
+			   handle_child_starting_failure,
+			   handle_child_return_value);
 
 #endif
diff --git a/submodule.c b/submodule.c
index 6d757c6..a0e06e8 100644
--- a/submodule.c
+++ b/submodule.c
@@ -623,17 +623,32 @@ struct submodule_parallel_fetch {
 	const char *prefix;
 	int command_line_option;
 	int quiet;
+	int result;
 };
-#define SPF_INIT {0, ARGV_ARRAY_INIT, NULL}
+#define SPF_INIT {0, ARGV_ARRAY_INIT, NULL, NULL, 0, 0, 0}
 
 int get_next_submodule(void *data, struct child_process *cp,
 		       struct strbuf *err);
 
+void handle_submodule_fetch_start_err(void *data, struct child_process *cp, struct strbuf *err)
+{
+	struct submodule_parallel_fetch *spf = data;
+	spf->result = 1;
+}
+
+void handle_submodule_fetch_finish(void *data, struct child_process *cp, int retvalue)
+{
+	struct submodule_parallel_fetch *spf = data;
+
+	if (retvalue)
+		spf->result = 1;
+}
+
 int fetch_populated_submodules(const struct argv_array *options,
 			       const char *prefix, int command_line_option,
 			       int quiet, int max_parallel_jobs)
 {
-	int i, result = 0;
+	int i;
 	struct submodule_parallel_fetch spf = SPF_INIT;
 	spf.work_tree = get_git_work_tree();
 	spf.command_line_option = command_line_option;
@@ -652,12 +667,15 @@ int fetch_populated_submodules(const struct argv_array *options,
 	/* default value, "--submodule-prefix" and its value are added later */
 
 	calculate_changed_submodule_paths();
-	run_processes_async(max_parallel_jobs, get_next_submodule, &spf);
+	run_processes_parallel(max_parallel_jobs, &spf,
+			       get_next_submodule,
+			       handle_submodule_fetch_start_err,
+			       handle_submodule_fetch_finish);
 
 	argv_array_clear(&spf.args);
 out:
 	string_list_clear(&changed_submodule_paths, 1);
-	return result;
+	return spf.result;
 }
 
 int get_next_submodule(void *data, struct child_process *cp,
diff --git a/t/t0061-run-command.sh b/t/t0061-run-command.sh
index 0970fb0..37c89b9 100755
--- a/t/t0061-run-command.sh
+++ b/t/t0061-run-command.sh
@@ -48,18 +48,22 @@ test_expect_success POSIXPERM,SANITY 'unreadable directory in PATH' '
 '
 
 cat >expect <<-EOF
-Now running instance 0
-Hello World
-Now running instance 1
-Hello World
-Now running instance 2
-Hello World
-Now running instance 3
-Hello World
+preloaded output of a child
+Hello
+World
+preloaded output of a child
+Hello
+World
+preloaded output of a child
+Hello
+World
+preloaded output of a child
+Hello
+World
 EOF
 
 test_expect_success 'run_command runs in parallel' '
-	test-run-command run-command-async sh -c "echo Hello World >&2;" 2>actual &&
+	test-run-command run-command-async sh -c "printf \"%s\n%s\n\" Hello World" 2>actual &&
 	test_cmp expect actual
 '
 
diff --git a/test-run-command.c b/test-run-command.c
index 4817f6e..71fd3ca 100644
--- a/test-run-command.c
+++ b/test-run-command.c
@@ -16,9 +16,9 @@
 #include <errno.h>
 
 static int number_callbacks;
-int run_processes_async_next(void *data,
-			     struct child_process *cp,
-			     struct strbuf *err)
+int parallel_next(void *data,
+		  struct child_process *cp,
+		  struct strbuf *err)
 {
 	struct child_process *d = data;
 	if (number_callbacks >= 4)
@@ -28,7 +28,7 @@ int run_processes_async_next(void *data,
 	cp->stdout_to_stderr = 1;
 	cp->no_stdin = 1;
 	cp->err = -1;
-	strbuf_addf(err, "Now running instance %d\n", number_callbacks);
+	strbuf_addf(err, "preloaded output of a child\n");
 	number_callbacks++;
 	return 0;
 }
@@ -51,7 +51,8 @@ int main(int argc, char **argv)
 		exit(run_command(&proc));
 
 	if (!strcmp(argv[1], "run-command-async"))
-		exit(run_processes_async(4, run_processes_async_next, &proc));
+		exit(run_processes_parallel(4, &proc, parallel_next,
+					 NULL, NULL));
 
 	fprintf(stderr, "check usage\n");
 	return 1;

-- 
2.6.0.rc0.131.gf624c3d

^ permalink raw reply related	[flat|nested] 13+ messages in thread

* [PATCHv2] fetch: fetch submodules in parallel
  2015-09-14 21:50             ` [PATCHv2] " Stefan Beller
@ 2015-09-14 21:50               ` Stefan Beller
  2015-09-14 22:06               ` [PATCHv2] " Junio C Hamano
  1 sibling, 0 replies; 13+ messages in thread
From: Stefan Beller @ 2015-09-14 21:50 UTC (permalink / raw
  To: gitster
  Cc: peff, git, jrnieder, johannes.schindelin, Jens.Lehmann, vlovich,
	Stefan Beller

If we run external commands in parallel we cannot pipe their output
directly to our stdout/stderr, as the output of the children would get
mixed up. So each process's output will flow through a pipe, which we
buffer. One subprocess can be piped directly to our stdout/stderr for
low-latency feedback to the user.
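
A minimal sketch of the buffering idea, assuming a pipe whose read end
has been switched to non-blocking mode. The helper names
`set_nonblocking` and `drain_available` are hypothetical and only
illustrate the technique; the patch itself uses `unblock_fd` and
`strbuf_read_noblock` with a growable strbuf instead of a fixed buffer.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <string.h>
#include <unistd.h>

/* Put fd into non-blocking mode; returns 0 on success, -1 on error. */
int set_nonblocking(int fd)
{
	int flags = fcntl(fd, F_GETFL);
	if (flags < 0)
		return -1;
	return fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0 ? -1 : 0;
}

/*
 * Append whatever is currently readable from fd to buf, which has
 * capacity cap and current length *len. Stops at EOF, when no more
 * data is available right now (EAGAIN/EWOULDBLOCK), or when buf is
 * full. Returns the number of bytes appended, or -1 on a real error.
 */
ssize_t drain_available(int fd, char *buf, size_t cap, size_t *len)
{
	size_t start;

	assert(buf && len && *len <= cap);
	start = *len;
	while (*len < cap) {
		ssize_t cnt = read(fd, buf + *len, cap - *len);
		if (cnt < 0) {
			if (errno == EINTR)
				continue; /* interrupted, retry */
			if (errno == EAGAIN || errno == EWOULDBLOCK)
				break; /* nothing more to read for now */
			return -1;
		}
		if (cnt == 0)
			break; /* writer closed the pipe */
		*len += (size_t)cnt;
	}
	return (ssize_t)(*len - start);
}
```

Because the read end never blocks, the parent can call such a helper
for every child in turn without stalling on a child that is silent.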

Example:
Let's assume we have 5 submodules A,B,C,D,E and each fetch takes a
different amount of time, as the submodules vary in size. Then the
output of fetches in sequential order might look like this:

 time -->
 output: |---A---|   |-B-|   |----C-----------|   |-D-|   |-E-|

When we schedule these submodules into at most two parallel processes,
a schedule and sample output over time may look like this:

process 1: |---A---|   |-D-|   |-E-|

process 2: |-B-|   |----C-----------|

output:    |---A---|B|------C-------|DE

So A will be perceived as if it ran normally in the single-child
version. As B has finished by the time A is done, we can dump its whole
progress buffer to stderr, so that it looks like it finished in no time.
Once that is done, C is determined to be the visible child and its
progress will be reported in real time.
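
The output ordering in this example can be reproduced with a small
model. This is only a sketch under simplifying assumptions: fixed-size
buffers, two slots, and a string `out` standing in for stderr. The
names `output_mux`, `mux_start`, `mux_write` and `mux_finish` are
hypothetical and not part of the patch.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

#define MUX_SLOTS 2
#define MUX_CAP 256

struct output_mux {
	int active[MUX_SLOTS];            /* is a child running in this slot? */
	int foreground;                   /* slot whose output is shown live */
	char pending[MUX_SLOTS][MUX_CAP]; /* buffered output per slot */
	char finished[MUX_CAP];           /* output of finished background children */
	char out[MUX_CAP];                /* stands in for stderr */
};

/* Append src to dst, truncating silently if dst is full. */
static void append(char *dst, const char *src)
{
	size_t used = strlen(dst);
	snprintf(dst + used, MUX_CAP - used, "%s", src);
}

/* Move all of src to the end of dst and empty src. */
static void flush_into(char *dst, char *src)
{
	append(dst, src);
	src[0] = '\0';
}

void mux_init(struct output_mux *m)
{
	memset(m, 0, sizeof(*m));
}

void mux_start(struct output_mux *m, int slot)
{
	assert(slot >= 0 && slot < MUX_SLOTS);
	m->active[slot] = 1;
}

/* Foreground output goes out directly, everything else is buffered. */
void mux_write(struct output_mux *m, int slot, const char *text)
{
	assert(slot >= 0 && slot < MUX_SLOTS);
	append(slot == m->foreground ? m->out : m->pending[slot], text);
}

void mux_finish(struct output_mux *m, int slot)
{
	int i;

	assert(slot >= 0 && slot < MUX_SLOTS);
	m->active[slot] = 0;
	if (slot != m->foreground) {
		/* Background child: keep its output for later. */
		flush_into(m->finished, m->pending[slot]);
		return;
	}
	/* Foreground child: emit its rest, then all finished children. */
	flush_into(m->out, m->pending[slot]);
	flush_into(m->out, m->finished);
	/* Pick the next running slot round robin and show its backlog. */
	for (i = 0; i < MUX_SLOTS; i++)
		if (m->active[(m->foreground + i) % MUX_SLOTS])
			break;
	m->foreground = (m->foreground + i) % MUX_SLOTS;
	flush_into(m->out, m->pending[m->foreground]);
}
```

Replaying the schedule above (A and D and E in slot 0, B and C in
slot 1) through this model yields A, then B, then all of C, then D and
E, which matches the sample output line.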

So this way of output is really good for human consumption,
as it only changes the timing, not the actual output.

For machine consumption the output needs to be prepared in
the tasks, by either having a prefix per line or per block
to indicate which task's output is displayed.
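
One way a task could prepare its output for machine consumption is a
prefix per line. The following is a sketch only; `prefix_lines` and
the "<task>: " prefix format are assumptions made for illustration and
are not something this patch implements.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/*
 * Copy text into out, prefixing every line with "<task>: ".
 * A final line without a trailing newline is emitted with one.
 * Returns 0 on success, -1 if out (capacity cap) is too small.
 */
int prefix_lines(const char *task, const char *text, char *out, size_t cap)
{
	size_t used = 0;

	assert(task && text && out && cap > 0);
	out[0] = '\0';
	while (*text) {
		const char *eol = strchr(text, '\n');
		size_t linelen = eol ? (size_t)(eol - text) : strlen(text);
		int n = snprintf(out + used, cap - used, "%s: %.*s\n",
				 task, (int)linelen, text);

		if (n < 0 || (size_t)n >= cap - used)
			return -1; /* output would have been truncated */
		used += (size_t)n;
		text += linelen + (eol ? 1 : 0);
	}
	return 0;
}
```

With such a prefix a consumer can attribute every line to its task
even when the blocks of several finished children are printed back to
back.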

Signed-off-by: Stefan Beller <sbeller@google.com>
---
 Documentation/fetch-options.txt |   7 +
 builtin/fetch.c                 |   6 +-
 builtin/pull.c                  |   6 +
 run-command.c                   | 278 ++++++++++++++++++++++++++++++++++++----
 run-command.h                   |  36 ++++++
 strbuf.c                        |  31 +++++
 strbuf.h                        |   1 +
 submodule.c                     | 119 ++++++++++++-----
 submodule.h                     |   2 +-
 t/t0061-run-command.sh          |  20 +++
 t/t5526-fetch-submodules.sh     |  19 +++
 test-run-command.c              |  24 ++++
 12 files changed, 490 insertions(+), 59 deletions(-)

diff --git a/Documentation/fetch-options.txt b/Documentation/fetch-options.txt
index 45583d8..6b109f6 100644
--- a/Documentation/fetch-options.txt
+++ b/Documentation/fetch-options.txt
@@ -100,6 +100,13 @@ ifndef::git-pull[]
 	reference to a commit that isn't already in the local submodule
 	clone.
 
+-j::
+--jobs=<n>::
+	Number of parallel children to be used for fetching submodules.
+	Each will fetch from different submodules, such that fetching many
+	submodules will be faster. By default submodules will be fetched
+	one at a time.
+
 --no-recurse-submodules::
 	Disable recursive fetching of submodules (this has the same effect as
 	using the '--recurse-submodules=no' option).
diff --git a/builtin/fetch.c b/builtin/fetch.c
index ee1f1a9..f28eac6 100644
--- a/builtin/fetch.c
+++ b/builtin/fetch.c
@@ -37,6 +37,7 @@ static int prune = -1; /* unspecified */
 static int all, append, dry_run, force, keep, multiple, update_head_ok, verbosity;
 static int progress = -1, recurse_submodules = RECURSE_SUBMODULES_DEFAULT;
 static int tags = TAGS_DEFAULT, unshallow, update_shallow;
+static int max_children = 1;
 static const char *depth;
 static const char *upload_pack;
 static struct strbuf default_rla = STRBUF_INIT;
@@ -99,6 +100,8 @@ static struct option builtin_fetch_options[] = {
 		    N_("fetch all tags and associated objects"), TAGS_SET),
 	OPT_SET_INT('n', NULL, &tags,
 		    N_("do not fetch all tags (--no-tags)"), TAGS_UNSET),
+	OPT_INTEGER('j', "jobs", &max_children,
+		    N_("number of submodules fetched in parallel")),
 	OPT_BOOL('p', "prune", &prune,
 		 N_("prune remote-tracking branches no longer on remote")),
 	{ OPTION_CALLBACK, 0, "recurse-submodules", NULL, N_("on-demand"),
@@ -1217,7 +1220,8 @@ int cmd_fetch(int argc, const char **argv, const char *prefix)
 		result = fetch_populated_submodules(&options,
 						    submodule_prefix,
 						    recurse_submodules,
-						    verbosity < 0);
+						    verbosity < 0,
+						    max_children);
 		argv_array_clear(&options);
 	}
 
diff --git a/builtin/pull.c b/builtin/pull.c
index 722a83c..f0af196 100644
--- a/builtin/pull.c
+++ b/builtin/pull.c
@@ -94,6 +94,7 @@ static int opt_force;
 static char *opt_tags;
 static char *opt_prune;
 static char *opt_recurse_submodules;
+static char *max_children;
 static int opt_dry_run;
 static char *opt_keep;
 static char *opt_depth;
@@ -177,6 +178,9 @@ static struct option pull_options[] = {
 		N_("on-demand"),
 		N_("control recursive fetching of submodules"),
 		PARSE_OPT_OPTARG),
+	OPT_PASSTHRU('j', "jobs", &max_children, N_("n"),
+		N_("number of submodules pulled in parallel"),
+		PARSE_OPT_OPTARG),
 	OPT_BOOL(0, "dry-run", &opt_dry_run,
 		N_("dry run")),
 	OPT_PASSTHRU('k', "keep", &opt_keep, NULL,
@@ -524,6 +528,8 @@ static int run_fetch(const char *repo, const char **refspecs)
 		argv_array_push(&args, opt_prune);
 	if (opt_recurse_submodules)
 		argv_array_push(&args, opt_recurse_submodules);
+	if (max_children)
+		argv_array_push(&args, max_children);
 	if (opt_dry_run)
 		argv_array_push(&args, "--dry-run");
 	if (opt_keep)
diff --git a/run-command.c b/run-command.c
index 28e1d55..6f6f9fb 100644
--- a/run-command.c
+++ b/run-command.c
@@ -232,6 +232,35 @@ static inline void set_cloexec(int fd)
 		fcntl(fd, F_SETFD, flags | FD_CLOEXEC);
 }
 
+static int determine_return_value(int wait_status,
+				  int *result,
+				  int *error_code,
+				  const char *argv0)
+{
+	if (WIFSIGNALED(wait_status)) {
+		*result = WTERMSIG(wait_status);
+		if (*result != SIGINT && *result != SIGQUIT)
+			error("%s died of signal %d", argv0, *result);
+		/*
+		 * This return value is chosen so that code & 0xff
+		 * mimics the exit code that a POSIX shell would report for
+		 * a program that died from this signal.
+		 */
+		*result += 128;
+	} else if (WIFEXITED(wait_status)) {
+		*result = WEXITSTATUS(wait_status);
+		/*
+		 * Convert special exit code when execvp failed.
+		 */
+		if (*result == 127) {
+			*result = -1;
+			*error_code = ENOENT;
+		}
+	} else
+		return -1;
+	return 0;
+}
+
 static int wait_or_whine(pid_t pid, const char *argv0)
 {
 	int status, code = -1;
@@ -244,29 +273,10 @@ static int wait_or_whine(pid_t pid, const char *argv0)
 	if (waiting < 0) {
 		failed_errno = errno;
 		error("waitpid for %s failed: %s", argv0, strerror(errno));
-	} else if (waiting != pid) {
-		error("waitpid is confused (%s)", argv0);
-	} else if (WIFSIGNALED(status)) {
-		code = WTERMSIG(status);
-		if (code != SIGINT && code != SIGQUIT)
-			error("%s died of signal %d", argv0, code);
-		/*
-		 * This return value is chosen so that code & 0xff
-		 * mimics the exit code that a POSIX shell would report for
-		 * a program that died from this signal.
-		 */
-		code += 128;
-	} else if (WIFEXITED(status)) {
-		code = WEXITSTATUS(status);
-		/*
-		 * Convert special exit code when execvp failed.
-		 */
-		if (code == 127) {
-			code = -1;
-			failed_errno = ENOENT;
-		}
 	} else {
-		error("waitpid is confused (%s)", argv0);
+		if (waiting != pid
+		   || (determine_return_value(status, &code, &failed_errno, argv0) < 0))
+			error("waitpid is confused (%s)", argv0);
 	}
 
 	clear_child_for_cleanup(pid);
@@ -852,3 +862,227 @@ int capture_command(struct child_process *cmd, struct strbuf *buf, size_t hint)
 	close(cmd->out);
 	return finish_command(cmd);
 }
+
+static void unblock_fd(int fd)
+{
+	int flags = fcntl(fd, F_GETFL);
+	if (flags < 0) {
+		warning("Could not get file status flags, "
+			"output will be degraded");
+		return;
+	}
+	if (fcntl(fd, F_SETFL, flags | O_NONBLOCK)) {
+		warning("Could not set file status flags, "
+			"output will be degraded");
+		return;
+	}
+}
+
+struct parallel_processes {
+	int max_number_processes;
+	void *data;
+	get_next_task fn;
+	handle_child_starting_failure fn_err;
+	handle_child_return_value fn_exit;
+
+	int nr_processes;
+	int all_tasks_started;
+	int foreground_child;
+	char *slots;
+	struct child_process *children;
+	struct pollfd *pfd;
+	struct strbuf *err;
+	struct strbuf finished_children;
+};
+
+static void run_processes_parallel_init(struct parallel_processes *pp,
+					int n, void *data,
+					get_next_task fn,
+					handle_child_starting_failure fn_err,
+					handle_child_return_value fn_exit)
+{
+	int i;
+
+	pp->max_number_processes = n;
+	pp->data = data;
+	pp->fn = fn;
+	pp->fn_err = fn_err;
+	pp->fn_exit = fn_exit;
+
+	pp->nr_processes = 0;
+	pp->all_tasks_started = 0;
+	pp->foreground_child = 0;
+	pp->slots = xcalloc(n, sizeof(*pp->slots));
+	pp->children = xcalloc(n, sizeof(*pp->children));
+	pp->pfd = xcalloc(n, sizeof(*pp->pfd));
+	pp->err = xcalloc(n, sizeof(*pp->err));
+	strbuf_init(&pp->finished_children, 0);
+
+	for (i = 0; i < n; i++) {
+		strbuf_init(&pp->err[i], 0);
+		pp->pfd[i].events = POLLIN;
+		pp->pfd[i].fd = -1;
+	}
+}
+
+static void run_processes_parallel_cleanup(struct parallel_processes *pp)
+{
+	int i;
+	for (i = 0; i < pp->max_number_processes; i++)
+		strbuf_release(&pp->err[i]);
+
+	free(pp->children);
+	free(pp->slots);
+	free(pp->pfd);
+	free(pp->err);
+	strbuf_release(&pp->finished_children);
+}
+
+static void run_processes_parallel_start_new(struct parallel_processes *pp)
+{
+	int i;
+	/* Start new processes. */
+	while (!pp->all_tasks_started
+	       && pp->nr_processes < pp->max_number_processes) {
+		for (i = 0; i < pp->max_number_processes; i++)
+			if (!pp->slots[i])
+				break; /* found an empty slot */
+		if (i == pp->max_number_processes)
+			die("BUG: bookkeeping is hard");
+
+		if (pp->fn(pp->data, &pp->children[i], &pp->err[i])) {
+			pp->all_tasks_started = 1;
+			break;
+		}
+		if (start_command(&pp->children[i]))
+			pp->fn_err(pp->data, &pp->children[i], &pp->err[i]);
+
+		unblock_fd(pp->children[i].err);
+
+		pp->nr_processes++;
+		pp->slots[i] = 1;
+		pp->pfd[i].fd = pp->children[i].err;
+	}
+}
+
+static int run_processes_parallel_buffer_stderr(struct parallel_processes *pp)
+{
+	int i;
+	i = poll(pp->pfd, pp->max_number_processes, 100);
+	if (i < 0) {
+		if (errno == EINTR)
+			/* A signal was caught; try again */
+			return -1;
+		else {
+			run_processes_parallel_cleanup(pp);
+			die_errno("poll");
+		}
+	}
+
+	/* Buffer output from all pipes. */
+	for (i = 0; i < pp->max_number_processes; i++) {
+		if (!pp->slots[i])
+			continue;
+		if (pp->pfd[i].revents & POLLIN)
+			strbuf_read_noblock(&pp->err[i], pp->children[i].err, 0);
+		if (pp->foreground_child == i) {
+			fputs(pp->err[i].buf, stderr);
+			strbuf_reset(&pp->err[i]);
+		}
+	}
+	return 0;
+}
+
+
+static void run_processes_parallel_collect_finished(struct parallel_processes *pp)
+{
+	int i = 0;
+	pid_t pid;
+	int wait_status, code;
+	int n = pp->max_number_processes;
+	/* Collect finished child processes. */
+	while (pp->nr_processes > 0) {
+		pid = waitpid(-1, &wait_status, WNOHANG);
+		if (pid == 0)
+			return; /* no child finished */
+
+		if (pid < 0) {
+			if (errno == EINTR)
+				return; /* just try again  next time */
+			if (errno == EINVAL || errno == ECHILD)
+				die_errno("wait");
+		} else {
+			/* Find the finished child. */
+			for (i = 0; i < pp->max_number_processes; i++)
+				if (pp->slots[i] && pid == pp->children[i].pid)
+					break;
+			if (i == pp->max_number_processes)
+				/*
+				 * waitpid returned another process id
+				 * which we are not waiting for.
+				 */
+				return;
+		}
+		strbuf_read_noblock(&pp->err[i], pp->children[i].err, 0);
+
+		if (determine_return_value(wait_status, &code, &errno,
+					   pp->children[i].argv[0]) < 0)
+			error("waitpid is confused (%s)",
+			      pp->children[i].argv[0]);
+
+		pp->fn_exit(pp->data, &pp->children[i], code);
+
+		argv_array_clear(&pp->children[i].args);
+		argv_array_clear(&pp->children[i].env_array);
+
+		pp->nr_processes--;
+		pp->slots[i] = 0;
+		pp->pfd[i].fd = -1;
+
+		if (i != pp->foreground_child) {
+			strbuf_addbuf(&pp->finished_children, &pp->err[i]);
+			strbuf_reset(&pp->err[i]);
+		} else {
+			fputs(pp->err[i].buf, stderr);
+			strbuf_reset(&pp->err[i]);
+
+			/* Output all other finished child processes */
+			fputs(pp->finished_children.buf, stderr);
+			strbuf_reset(&pp->finished_children);
+
+			/*
+			 * Pick next process to output live.
+			 * NEEDSWORK:
+			 * For now we pick it randomly by doing a round
+			 * robin. Later we may want to pick the one with
+			 * the most output or the longest or shortest
+			 * running process time.
+			 */
+			for (i = 0; i < n; i++)
+				if (pp->slots[(pp->foreground_child + i) % n])
+					break;
+			pp->foreground_child = (pp->foreground_child + i) % n;
+			fputs(pp->err[pp->foreground_child].buf, stderr);
+			strbuf_reset(&pp->err[pp->foreground_child]);
+		}
+	}
+}
+
+int run_processes_parallel(int n, void *data,
+			   get_next_task fn,
+			   handle_child_starting_failure fn_err,
+			   handle_child_return_value fn_exit)
+{
+	struct parallel_processes pp;
+	run_processes_parallel_init(&pp, n, data, fn, fn_err, fn_exit);
+
+	while (!pp.all_tasks_started || pp.nr_processes > 0) {
+		run_processes_parallel_start_new(&pp);
+		if (run_processes_parallel_buffer_stderr(&pp))
+			continue;
+		run_processes_parallel_collect_finished(&pp);
+	}
+	run_processes_parallel_cleanup(&pp);
+
+	return 0;
+}
diff --git a/run-command.h b/run-command.h
index 5b4425a..0487f71 100644
--- a/run-command.h
+++ b/run-command.h
@@ -119,4 +119,40 @@ struct async {
 int start_async(struct async *async);
 int finish_async(struct async *async);
 
+/**
+ * This callback should initialize the child process and preload the
+ * error channel. The preloading is useful if you want to have a message
+ * printed directly before the output of the child process.
+ * You MUST set stdout_to_stderr.
+ *
+ * Return 0 if the next child is ready to run.
+ * Return != 0 if there are no more tasks to be processed.
+ */
+typedef int (*get_next_task)(void *data,
+			     struct child_process *cp,
+			     struct strbuf *err);
+
+typedef void (*handle_child_starting_failure)(void *data,
+					      struct child_process *cp,
+					      struct strbuf *err);
+
+typedef void (*handle_child_return_value)(void *data,
+					  struct child_process *cp,
+					  int result);
+
+/**
+ * Runs up to n processes at the same time. Whenever a process can be
+ * started, the callback `get_next_task` is called to obtain the data
+ * fed to the child process.
+ *
+ * The children started via this function run in parallel and their output
+ * to stderr is buffered, while one of the children will directly output
+ * to stderr.
+ */
+
+int run_processes_parallel(int n, void *data,
+			   get_next_task fn,
+			   handle_child_starting_failure,
+			   handle_child_return_value);
+
 #endif
diff --git a/strbuf.c b/strbuf.c
index cce5eed..7f866c3 100644
--- a/strbuf.c
+++ b/strbuf.c
@@ -384,6 +384,37 @@ ssize_t strbuf_read(struct strbuf *sb, int fd, size_t hint)
 	return sb->len - oldlen;
 }
 
+ssize_t strbuf_read_noblock(struct strbuf *sb, int fd, size_t hint)
+{
+	size_t oldlen = sb->len;
+	size_t oldalloc = sb->alloc;
+
+	strbuf_grow(sb, hint ? hint : 8192);
+	for (;;) {
+		ssize_t cnt;
+
+		cnt = read(fd, sb->buf + sb->len, sb->alloc - sb->len - 1);
+		if (cnt < 0) {
+			if (errno == EINTR)
+				continue;
+			if (errno == EAGAIN)
+				break;
+			if (oldalloc == 0)
+				strbuf_release(sb);
+			else
+				strbuf_setlen(sb, oldlen);
+			return -1;
+		}
+		if (!cnt)
+			break;
+		sb->len += cnt;
+		strbuf_grow(sb, 8192);
+	}
+
+	sb->buf[sb->len] = '\0';
+	return sb->len - oldlen;
+}
+
 #define STRBUF_MAXLINK (2*PATH_MAX)
 
 int strbuf_readlink(struct strbuf *sb, const char *path, size_t hint)
diff --git a/strbuf.h b/strbuf.h
index aef2794..7ea462b 100644
--- a/strbuf.h
+++ b/strbuf.h
@@ -365,6 +365,7 @@ extern size_t strbuf_fread(struct strbuf *, size_t, FILE *);
  * any partial read is undone.
  */
 extern ssize_t strbuf_read(struct strbuf *, int fd, size_t hint);
+extern ssize_t strbuf_read_noblock(struct strbuf *, int fd, size_t hint);
 
 /**
  * Read the contents of a file, specified by its path. The third argument
diff --git a/submodule.c b/submodule.c
index 1d64e57..a0e06e8 100644
--- a/submodule.c
+++ b/submodule.c
@@ -12,6 +12,7 @@
 #include "sha1-array.h"
 #include "argv-array.h"
 #include "blob.h"
+#include "thread-utils.h"
 
 static int config_fetch_recurse_submodules = RECURSE_SUBMODULES_ON_DEMAND;
 static struct string_list changed_submodule_paths;
@@ -615,37 +616,79 @@ static void calculate_changed_submodule_paths(void)
 	initialized_fetch_ref_tips = 0;
 }
 
+struct submodule_parallel_fetch {
+	int count;
+	struct argv_array args;
+	const char *work_tree;
+	const char *prefix;
+	int command_line_option;
+	int quiet;
+	int result;
+};
+#define SPF_INIT {0, ARGV_ARRAY_INIT, NULL, NULL, 0, 0, 0}
+
+int get_next_submodule(void *data, struct child_process *cp,
+		       struct strbuf *err);
+
+void handle_submodule_fetch_start_err(void *data, struct child_process *cp, struct strbuf *err)
+{
+	struct submodule_parallel_fetch *spf = data;
+	spf->result = 1;
+}
+
+void handle_submodule_fetch_finish(void *data, struct child_process *cp, int retvalue)
+{
+	struct submodule_parallel_fetch *spf = data;
+
+	if (retvalue)
+		spf->result = 1;
+}
+
 int fetch_populated_submodules(const struct argv_array *options,
 			       const char *prefix, int command_line_option,
-			       int quiet)
+			       int quiet, int max_parallel_jobs)
 {
-	int i, result = 0;
-	struct child_process cp = CHILD_PROCESS_INIT;
-	struct argv_array argv = ARGV_ARRAY_INIT;
-	const char *work_tree = get_git_work_tree();
-	if (!work_tree)
+	int i;
+	struct submodule_parallel_fetch spf = SPF_INIT;
+	spf.work_tree = get_git_work_tree();
+	spf.command_line_option = command_line_option;
+	spf.quiet = quiet;
+	spf.prefix = prefix;
+	if (!spf.work_tree)
 		goto out;
 
 	if (read_cache() < 0)
 		die("index file corrupt");
 
-	argv_array_push(&argv, "fetch");
+	argv_array_push(&spf.args, "fetch");
 	for (i = 0; i < options->argc; i++)
-		argv_array_push(&argv, options->argv[i]);
-	argv_array_push(&argv, "--recurse-submodules-default");
+		argv_array_push(&spf.args, options->argv[i]);
+	argv_array_push(&spf.args, "--recurse-submodules-default");
 	/* default value, "--submodule-prefix" and its value are added later */
 
-	cp.env = local_repo_env;
-	cp.git_cmd = 1;
-	cp.no_stdin = 1;
-
 	calculate_changed_submodule_paths();
+	run_processes_parallel(max_parallel_jobs, &spf,
+			       get_next_submodule,
+			       handle_submodule_fetch_start_err,
+			       handle_submodule_fetch_finish);
+
+	argv_array_clear(&spf.args);
+out:
+	string_list_clear(&changed_submodule_paths, 1);
+	return spf.result;
+}
+
+int get_next_submodule(void *data, struct child_process *cp,
+		       struct strbuf *err)
+{
+	int ret = 0;
+	struct submodule_parallel_fetch *spf = data;
 
-	for (i = 0; i < active_nr; i++) {
+	for ( ; spf->count < active_nr; spf->count++) {
 		struct strbuf submodule_path = STRBUF_INIT;
 		struct strbuf submodule_git_dir = STRBUF_INIT;
 		struct strbuf submodule_prefix = STRBUF_INIT;
-		const struct cache_entry *ce = active_cache[i];
+		const struct cache_entry *ce = active_cache[spf->count];
 		const char *git_dir, *default_argv;
 		const struct submodule *submodule;
 
@@ -657,7 +700,7 @@ int fetch_populated_submodules(const struct argv_array *options,
 			submodule = submodule_from_name(null_sha1, ce->name);
 
 		default_argv = "yes";
-		if (command_line_option == RECURSE_SUBMODULES_DEFAULT) {
+		if (spf->command_line_option == RECURSE_SUBMODULES_DEFAULT) {
 			if (submodule &&
 			    submodule->fetch_recurse !=
 						RECURSE_SUBMODULES_NONE) {
@@ -680,40 +723,46 @@ int fetch_populated_submodules(const struct argv_array *options,
 					default_argv = "on-demand";
 				}
 			}
-		} else if (command_line_option == RECURSE_SUBMODULES_ON_DEMAND) {
+		} else if (spf->command_line_option == RECURSE_SUBMODULES_ON_DEMAND) {
 			if (!unsorted_string_list_lookup(&changed_submodule_paths, ce->name))
 				continue;
 			default_argv = "on-demand";
 		}
 
-		strbuf_addf(&submodule_path, "%s/%s", work_tree, ce->name);
+		strbuf_addf(&submodule_path, "%s/%s", spf->work_tree, ce->name);
 		strbuf_addf(&submodule_git_dir, "%s/.git", submodule_path.buf);
-		strbuf_addf(&submodule_prefix, "%s%s/", prefix, ce->name);
+		strbuf_addf(&submodule_prefix, "%s%s/", spf->prefix, ce->name);
 		git_dir = read_gitfile(submodule_git_dir.buf);
 		if (!git_dir)
 			git_dir = submodule_git_dir.buf;
 		if (is_directory(git_dir)) {
-			if (!quiet)
-				fprintf(stderr, "Fetching submodule %s%s\n", prefix, ce->name);
-			cp.dir = submodule_path.buf;
-			argv_array_push(&argv, default_argv);
-			argv_array_push(&argv, "--submodule-prefix");
-			argv_array_push(&argv, submodule_prefix.buf);
-			cp.argv = argv.argv;
-			if (run_command(&cp))
-				result = 1;
-			argv_array_pop(&argv);
-			argv_array_pop(&argv);
-			argv_array_pop(&argv);
+			child_process_init(cp);
+			cp->dir = strbuf_detach(&submodule_path, NULL);
+			cp->git_cmd = 1;
+			cp->no_stdout = 1;
+			cp->no_stdin = 1;
+			cp->stdout_to_stderr = 1;
+			cp->err = -1;
+			cp->env = local_repo_env;
+			if (!spf->quiet)
+				strbuf_addf(err, "Fetching submodule %s%s\n",
+					    spf->prefix, ce->name);
+			argv_array_init(&cp->args);
+			argv_array_pushv(&cp->args, spf->args.argv);
+			argv_array_push(&cp->args, default_argv);
+			argv_array_push(&cp->args, "--submodule-prefix");
+			argv_array_push(&cp->args, submodule_prefix.buf);
+			ret = 1;
 		}
 		strbuf_release(&submodule_path);
 		strbuf_release(&submodule_git_dir);
 		strbuf_release(&submodule_prefix);
+		if (ret) {
+			spf->count++;
+			return 0;
+		}
 	}
-	argv_array_clear(&argv);
-out:
-	string_list_clear(&changed_submodule_paths, 1);
-	return result;
+	return 1;
 }
 
 unsigned is_submodule_modified(const char *path, int ignore_untracked)
diff --git a/submodule.h b/submodule.h
index 5507c3d..cbc0003 100644
--- a/submodule.h
+++ b/submodule.h
@@ -31,7 +31,7 @@ void set_config_fetch_recurse_submodules(int value);
 void check_for_new_submodule_commits(unsigned char new_sha1[20]);
 int fetch_populated_submodules(const struct argv_array *options,
 			       const char *prefix, int command_line_option,
-			       int quiet);
+			       int quiet, int max_parallel_jobs);
 unsigned is_submodule_modified(const char *path, int ignore_untracked);
 int submodule_uses_gitfile(const char *path);
 int ok_to_remove_submodule(const char *path);
diff --git a/t/t0061-run-command.sh b/t/t0061-run-command.sh
index 9acf628..37c89b9 100755
--- a/t/t0061-run-command.sh
+++ b/t/t0061-run-command.sh
@@ -47,4 +47,24 @@ test_expect_success POSIXPERM,SANITY 'unreadable directory in PATH' '
 	test_cmp expect actual
 '
 
+cat >expect <<-EOF
+preloaded output of a child
+Hello
+World
+preloaded output of a child
+Hello
+World
+preloaded output of a child
+Hello
+World
+preloaded output of a child
+Hello
+World
+EOF
+
+test_expect_success 'run_command runs in parallel' '
+	test-run-command run-command-async sh -c "printf \"%s\n%s\n\" Hello World" 2>actual &&
+	test_cmp expect actual
+'
+
 test_done
diff --git a/t/t5526-fetch-submodules.sh b/t/t5526-fetch-submodules.sh
index 17759b1..1b4ce69 100755
--- a/t/t5526-fetch-submodules.sh
+++ b/t/t5526-fetch-submodules.sh
@@ -71,6 +71,16 @@ test_expect_success "fetch --recurse-submodules recurses into submodules" '
 	test_i18ncmp expect.err actual.err
 '
 
+test_expect_success "fetch --recurse-submodules -j2 has the same output behaviour" '
+	add_upstream_commit &&
+	(
+		cd downstream &&
+		git fetch --recurse-submodules -j2 >../actual.out 2>../actual.err
+	) &&
+	test_must_be_empty actual.out &&
+	test_i18ncmp expect.err actual.err
+'
+
 test_expect_success "fetch alone only fetches superproject" '
 	add_upstream_commit &&
 	(
@@ -140,6 +150,15 @@ test_expect_success "--quiet propagates to submodules" '
 	! test -s actual.err
 '
 
+test_expect_success "--quiet propagates to parallel submodules" '
+	(
+		cd downstream &&
+		git fetch --recurse-submodules -j 2 --quiet  >../actual.out 2>../actual.err
+	) &&
+	! test -s actual.out &&
+	! test -s actual.err
+'
+
 test_expect_success "--dry-run propagates to submodules" '
 	add_upstream_commit &&
 	(
diff --git a/test-run-command.c b/test-run-command.c
index 89c7de2..71fd3ca 100644
--- a/test-run-command.c
+++ b/test-run-command.c
@@ -10,9 +10,29 @@
 
 #include "git-compat-util.h"
 #include "run-command.h"
+#include "argv-array.h"
+#include "strbuf.h"
 #include <string.h>
 #include <errno.h>
 
+static int number_callbacks;
+int parallel_next(void *data,
+		  struct child_process *cp,
+		  struct strbuf *err)
+{
+	struct child_process *d = data;
+	if (number_callbacks >= 4)
+		return 1;
+
+	argv_array_pushv(&cp->args, d->argv);
+	cp->stdout_to_stderr = 1;
+	cp->no_stdin = 1;
+	cp->err = -1;
+	strbuf_addf(err, "preloaded output of a child\n");
+	number_callbacks++;
+	return 0;
+}
+
 int main(int argc, char **argv)
 {
 	struct child_process proc = CHILD_PROCESS_INIT;
@@ -30,6 +50,10 @@ int main(int argc, char **argv)
 	if (!strcmp(argv[1], "run-command"))
 		exit(run_command(&proc));
 
+	if (!strcmp(argv[1], "run-command-async"))
+		exit(run_processes_parallel(4, &proc, parallel_next,
+					 NULL, NULL));
+
 	fprintf(stderr, "check usage\n");
 	return 1;
 }
-- 
2.6.0.rc0.131.gf624c3d
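
The get_next_submodule() callback in the patch above follows a resumable
iterator convention: it continues scanning from spf->count, returns 0 once
it has prepared a child process, and returns 1 when nothing is left to
start. A minimal sketch of that convention, stripped of all git specifics,
might look like the following. The names struct candidate, struct
task_cursor and next_task() are hypothetical and do not appear in the patch.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for an index entry that may or may not need a task. */
struct candidate {
	const char *path;
	int populated;	/* nonzero if a task should be started for this entry */
};

/* Hypothetical stand-in for the state the patch keeps in its spf struct. */
struct task_cursor {
	const struct candidate *items;
	size_t nr;	/* number of candidates */
	size_t count;	/* next candidate to examine; survives between calls */
};

/*
 * Returns 0 and stores a path in *out when a task was found,
 * returns 1 when no candidates are left (same convention as the patch).
 */
int next_task(struct task_cursor *cur, const char **out)
{
	assert(cur && out);
	for (; cur->count < cur->nr; cur->count++) {
		const struct candidate *c = &cur->items[cur->count];

		if (!c->populated)
			continue;	/* skipped entries never produce a task */
		*out = c->path;
		cur->count++;	/* step past the entry that was just handed out */
		return 0;
	}
	return 1;
}
```

Because the cursor lives in the caller-provided struct rather than in a
local loop variable, the scheduler can call next_task() whenever a job slot
becomes free and the scan resumes where the previous call stopped.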

* Re: [PATCHv2] fetch submodules in parallel
  2015-09-14 21:50             ` [PATCHv2] " Stefan Beller
  2015-09-14 21:50               ` [PATCHv2] fetch: " Stefan Beller
@ 2015-09-14 22:06               ` Junio C Hamano
  1 sibling, 0 replies; 13+ messages in thread
From: Junio C Hamano @ 2015-09-14 22:06 UTC (permalink / raw
  To: Stefan Beller
  Cc: peff, git, jrnieder, johannes.schindelin, Jens.Lehmann, vlovich

Stefan Beller <sbeller@google.com> writes:

> This replaces the last patch of the "Parallel git submodule fetching"
> series. Changes:
>
> * return the correct exit code from submodule fetching when one submodule fails
> * use poll() instead of select()
> * broke the code down into several smaller functions instead of one giant one
>   (I think it is an improvement, but I wouldn't be surprised if someone objects)
> * closed memory leaks
> * documented the need for stdout_to_stderr
>
> I don't deem it RFC-ish any more, but good to go.

I didn't say this in the previous round because it smelled like an
RFC, but for a real submission, 2/2 may be doing too many things at
once.  I suspect this is more or less a "taste" thing, so I won't mind
too much as long as the reviewers are OK with it.
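
For readers unfamiliar with the poll()-based approach mentioned in the
changelog quoted above, here is a rough, self-contained sketch of collecting
the output of several children through pipes and folding their exit codes
into a single result. It is not the code from the patch: run_all(),
MAX_CHILDREN and the fixed-size output buffer are assumptions made for
illustration, all children are started at once rather than limited to a job
count, and error handling is kept minimal.

```c
#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <string.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

#define MAX_CHILDREN 8	/* hypothetical limit, for this sketch only */

/*
 * Run every shell command in cmds[] concurrently. stdout and stderr of each
 * child go to a per-child pipe; poll() tells us which pipes are readable.
 * Collected output is appended to out (capacity cap, NUL-terminated).
 * Returns 0 if all children exited with 0, 1 if any failed, -1 on setup error.
 */
int run_all(const char **cmds, int nr, char *out, size_t cap)
{
	struct pollfd pfd[MAX_CHILDREN];
	pid_t pid[MAX_CHILDREN];
	size_t len = 0;
	int open_fds = 0, result = 0, i;

	assert(cmds && out);
	if (nr < 0 || nr > MAX_CHILDREN || cap == 0)
		return -1;
	out[0] = '\0';

	for (i = 0; i < nr; i++) {
		int fds[2];

		if (pipe(fds) < 0)
			return -1;
		pid[i] = fork();
		if (pid[i] < 0)
			return -1;
		if (!pid[i]) {
			/* child: route both output streams into the pipe */
			dup2(fds[1], 1);
			dup2(fds[1], 2);
			close(fds[0]);
			close(fds[1]);
			execl("/bin/sh", "sh", "-c", cmds[i], (char *)NULL);
			_exit(127);
		}
		close(fds[1]);	/* parent keeps only the read end */
		pfd[i].fd = fds[0];
		pfd[i].events = POLLIN;
		open_fds++;
	}

	while (open_fds > 0) {
		if (poll(pfd, nr, -1) < 0) {
			if (errno == EINTR)
				continue;
			return -1;
		}
		for (i = 0; i < nr; i++) {
			char buf[512];
			ssize_t n;

			if (!(pfd[i].revents & (POLLIN | POLLHUP | POLLERR)))
				continue;
			n = read(pfd[i].fd, buf, sizeof(buf));
			if (n > 0) {
				size_t room = cap - 1 - len;
				size_t take = (size_t)n < room ? (size_t)n : room;

				memcpy(out + len, buf, take);
				len += take;
				out[len] = '\0';
			} else if (n == 0 || errno != EINTR) {
				close(pfd[i].fd);
				pfd[i].fd = -1;	/* poll() ignores negative fds */
				open_fds--;
			}
		}
	}

	for (i = 0; i < nr; i++) {
		int status;
		pid_t w;

		do {
			w = waitpid(pid[i], &status, 0);
		} while (w < 0 && errno == EINTR);
		if (w < 0 || !WIFEXITED(status) || WEXITSTATUS(status))
			result = 1;	/* one failing child fails the whole run */
	}
	return result;
}
```

Output from different children may interleave in any order here. The test
expectations in the patch show each child's output as one contiguous group,
which this sketch does not attempt to reproduce.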

end of thread, other threads:[~2015-09-14 22:06 UTC | newest]

Thread overview: 13+ messages
2015-09-11 23:09 [RFC PATCHv1 0/2] Parallel git submodule fetching Stefan Beller
2015-09-11 23:09 ` [PATCH 1/2] Sending "Fetching submodule <foo>" output to stderr Stefan Beller
2015-09-11 23:09 ` [PATCH 2/2] fetch: fetch submodules in parallel Stefan Beller
2015-09-12 19:11   ` Junio C Hamano
2015-09-14 16:46     ` Stefan Beller
2015-09-14 17:17       ` Jeff King
2015-09-14 17:47         ` Stefan Beller
2015-09-14 17:55         ` Jonathan Nieder
2015-09-14 18:07           ` Jeff King
2015-09-14 21:50             ` [PATCHv2] " Stefan Beller
2015-09-14 21:50               ` [PATCHv2] fetch: " Stefan Beller
2015-09-14 22:06               ` [PATCHv2] " Junio C Hamano
2015-09-14 17:56         ` [PATCH 2/2] fetch: " Junio C Hamano
