git@vger.kernel.org mailing list mirror (one of many)
* [PATCHv6 0/8] fetch submodules in parallel
@ 2015-10-01  1:54 Stefan Beller
  2015-10-01  1:54 ` [PATCHv6 1/8] submodule.c: write "Fetching submodule <foo>" to stderr Stefan Beller
                   ` (9 more replies)
  0 siblings, 10 replies; 12+ messages in thread
From: Stefan Beller @ 2015-10-01  1:54 UTC
  To: gitster, git
  Cc: Stefan Beller, ramsay, jacob.keller, peff, jrnieder,
	johannes.schindelin, Jens.Lehmann, ericsunshine

This replaces sb/submodule-parallel-fetch once again.
Changes are only in patches 5, 6, and 7
(5: reverse popping, 6: see below, 7: adapt to the changes of 6).

Junio wrote:
> > +             if (pp->return_value(pp->data, &pp->children[i].process,
> > +                                  &pp->children[i].err, code))
> at this point, code can be uninitialized if we took the last "is
> confused" arm of the if/elseif cascade.

It's fixed in the reroll.

sigchain_pop_common now pops the signals in the reverse order of pushing.

When I started an office discussion with Jonathan about how to best implement
the next step ("git submodule update" using the parallel processing machine),
I fixed some nits and also some major spots:

* The order of the arguments for the callbacks (Generally the callback cookie
  comes last and is called `cb` and not `data`)
  
* renamed return_value_fn to task_finished_fn
  
* Add another callback cookie for task-specific things. This will help in the
  rewrite of `git submodule update`, as there are steps to be done after
  some processes have finished in the parallel engine. So we want to be able
  to remember specific children or tag information on them instead of parsing
  the cp->argv.

* the main loop of the parallel processing was first adapted to Junio's
  suggestion, but Jonathan pointed out more improvements.  We can get rid of
  `no_more_task` completely, as `if (!pp->nr_processes)` is sufficient as the
  exit condition. (pp->nr_processes is modified only when starting or reaping
  a child, so we will capture the whole output of each subprocess even in case
  of a quick shutdown)

* even more accurate documentation
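
The callback changes listed above can be pictured with a small, self-contained
toy. This is only a sketch and not the code in run-command.c: `toy_run`,
`struct toy_state`, `toy_next`, and `toy_finished` are hypothetical names, and
the "jobs" are plain integers that finish on the next pass instead of real
child processes. It illustrates three points: the cookies come last in the
argument lists, the per-task cookie stored by the "next task" callback is
handed back to the "task finished" callback, and the loop exits once nothing
is running anymore.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-ins for the callback types; cookies come last. */
typedef int (*toy_next_fn)(void **task_cb, int *job, void *cb);
typedef int (*toy_finished_fn)(int result, int job, void *cb, void *task_cb);

struct toy_state {		/* the per-run cookie */
	int next_job;
	int total_jobs;
	int finished;
	int tag_sum;
};

static int toy_tags[8];		/* storage the per-task cookies point into */

static int toy_next(void **task_cb, int *job, void *cb)
{
	struct toy_state *st = cb;

	if (st->next_job >= st->total_jobs || st->next_job >= 8)
		return 0;		/* currently no more tasks */
	*job = st->next_job++;
	toy_tags[*job] = 10 * *job;	/* remember something about this task */
	*task_cb = &toy_tags[*job];
	return 1;
}

static int toy_finished(int result, int job, void *cb, void *task_cb)
{
	struct toy_state *st = cb;
	int *tag = task_cb;

	(void)job;
	st->finished++;
	st->tag_sum += *tag;
	return result;			/* non-zero requests a shutdown */
}

/* Keeps at most max_running jobs "running"; each one finishes on the next pass. */
static int toy_run(int max_running, toy_next_fn next,
		   toy_finished_fn finished, void *cb)
{
	int jobs[8];
	void *cookies[8];
	int running = 0, shutdown = 0, result = 0;

	if (max_running > 8)
		max_running = 8;
	while (1) {
		while (!shutdown && running < max_running) {
			if (!next(&cookies[running], &jobs[running], cb))
				break;
			running++;
		}
		if (!running)		/* the only exit condition */
			break;
		while (running > 0) {
			running--;
			if (finished(0, jobs[running], cb, cookies[running])) {
				shutdown = 1;
				result = 1;
			}
		}
	}
	return result;
}
```

Because the count of running jobs changes only when a job is started or
reaped, no separate "no more tasks" flag is needed in this toy either.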

Jonathan Nieder (1):
  submodule.c: write "Fetching submodule <foo>" to stderr

Stefan Beller (7):
  xread: poll on non blocking fds
  xread_nonblock: add functionality to read from fds without blocking
  strbuf: add strbuf_read_once to read without blocking
  sigchain: add command to pop all common signals
  run-command: add an asynchronous parallel child processor
  fetch_populated_submodules: use new parallel job processing
  submodules: allow parallel fetching, add tests and documentation

 Documentation/fetch-options.txt |   7 +
 builtin/fetch.c                 |   6 +-
 builtin/pull.c                  |   6 +
 git-compat-util.h               |   1 +
 run-command.c                   | 350 ++++++++++++++++++++++++++++++++++++++++
 run-command.h                   |  78 +++++++++
 sigchain.c                      |   9 ++
 sigchain.h                      |   1 +
 strbuf.c                        |  11 ++
 strbuf.h                        |   9 ++
 submodule.c                     | 129 +++++++++++----
 submodule.h                     |   2 +-
 t/t0061-run-command.sh          |  20 +++
 t/t5526-fetch-submodules.sh     |  70 +++++---
 test-run-command.c              |  25 +++
 wrapper.c                       |  35 +++-
 16 files changed, 695 insertions(+), 64 deletions(-)
diff --git a/run-command.c b/run-command.c
index df84985..28048a7 100644
--- a/run-command.c
+++ b/run-command.c
@@ -863,12 +863,13 @@ struct parallel_processes {
 
 	get_next_task_fn get_next_task;
 	start_failure_fn start_failure;
-	return_value_fn return_value;
+	task_finished_fn task_finished;
 
 	struct {
 		unsigned in_use : 1;
 		struct child_process process;
 		struct strbuf err;
+		void *data;
 	} *children;
 	/*
 	 * The struct pollfd is logically part of *children,
@@ -882,9 +883,10 @@ struct parallel_processes {
 	struct strbuf buffered_output; /* of finished children */
 } parallel_processes_struct;
 
-static int default_start_failure(void *data,
-				 struct child_process *cp,
-				 struct strbuf *err)
+static int default_start_failure(struct child_process *cp,
+				 struct strbuf *err,
+				 void *pp_cb,
+				 void *pp_task_cb)
 {
 	int i;
 
@@ -895,10 +897,11 @@ static int default_start_failure(void *data,
 	return 0;
 }
 
-static int default_return_value(void *data,
-				struct child_process *cp,
-				struct strbuf *err,
-				int result)
+static int default_task_finished(int result,
+				 struct child_process *cp,
+				 struct strbuf *err,
+				 void *pp_cb,
+				 void *pp_task_cb)
 {
 	int i;
 
@@ -930,10 +933,11 @@ static void handle_children_on_signal(int signo)
 	raise(signo);
 }
 
-static struct parallel_processes *pp_init(int n, void *data,
+static struct parallel_processes *pp_init(int n,
 					  get_next_task_fn get_next_task,
 					  start_failure_fn start_failure,
-					  return_value_fn return_value)
+					  task_finished_fn task_finished,
+					  void *data)
 {
 	int i;
 	struct parallel_processes *pp = &parallel_processes_struct;
@@ -948,7 +952,7 @@ static struct parallel_processes *pp_init(int n, void *data,
 	pp->get_next_task = get_next_task;
 
 	pp->start_failure = start_failure ? start_failure : default_start_failure;
-	pp->return_value = return_value ? return_value : default_return_value;
+	pp->task_finished = task_finished ? task_finished : default_task_finished;
 
 	pp->nr_processes = 0;
 	pp->output_owner = 0;
@@ -1006,15 +1010,17 @@ static int pp_start_one(struct parallel_processes *pp)
 	if (i == pp->max_processes)
 		die("BUG: bookkeeping is hard");
 
-	if (!pp->get_next_task(pp->data,
+	if (!pp->get_next_task(&pp->children[i].data,
 			       &pp->children[i].process,
-			       &pp->children[i].err))
+			       &pp->children[i].err,
+			       pp->data))
 		return 1;
 
 	if (start_command(&pp->children[i].process)) {
-		int code = pp->start_failure(pp->data,
-					     &pp->children[i].process,
-					     &pp->children[i].err);
+		int code = pp->start_failure(&pp->children[i].process,
+					     &pp->children[i].err,
+					     pp->data,
+					     &pp->children[i].data);
 		strbuf_addbuf(&pp->buffered_output, &pp->children[i].err);
 		strbuf_reset(&pp->children[i].err);
 		return code ? -1 : 1;
@@ -1110,14 +1116,16 @@ static int pp_collect_finished(struct parallel_processes *pp)
 				code = -1;
 				errno = ENOENT;
 			}
-		} else
+		} else {
 			strbuf_addf(&pp->children[i].err,
 				    "waitpid is confused (%s)",
 				    pp->children[i].process.argv[0]);
+			code = -1;
+		}
 
-
-		if (pp->return_value(pp->data, &pp->children[i].process,
-				     &pp->children[i].err, code))
+		if (pp->task_finished(code, &pp->children[i].process,
+				      &pp->children[i].err, pp->data,
+				      &pp->children[i].data))
 			result = 1;
 
 		argv_array_clear(&pp->children[i].process.args);
@@ -1155,45 +1163,39 @@ static int pp_collect_finished(struct parallel_processes *pp)
 	return result;
 }
 
-int run_processes_parallel(int n, void *data,
+int run_processes_parallel(int n,
 			   get_next_task_fn get_next_task,
 			   start_failure_fn start_failure,
-			   return_value_fn return_value)
+			   task_finished_fn task_finished,
+			   void *pp_cb)
 {
-	int no_more_task = 0;
+	int i;
+	int output_timeout = 100;
+	int spawn_cap = 4;
 	struct parallel_processes *pp;
 
-	pp = pp_init(n, data, get_next_task, start_failure, return_value);
+	pp = pp_init(n, get_next_task, start_failure, task_finished, pp_cb);
 	while (1) {
-		int i;
-		int output_timeout = 100;
-		int spawn_cap = 4;
-
-		if (!no_more_task) {
-			for (i = 0; i < spawn_cap; i++) {
-				int code;
-				if (pp->nr_processes == pp->max_processes)
-					break;
-
-				code = pp_start_one(pp);
-				if (!code)
-					continue;
-				if (code < 0) {
-					pp->shutdown = 1;
-					kill_children(pp, SIGTERM);
-				}
-				no_more_task = 1;
-				break;
+		for (i = 0;
+		    i < spawn_cap && !pp->shutdown &&
+		    pp->nr_processes < pp->max_processes;
+		    i++) {
+			int code = pp_start_one(pp);
+			if (!code)
+				continue;
+			if (code < 0) {
+				pp->shutdown = 1;
+				kill_children(pp, SIGTERM);
 			}
+			break;
 		}
-		if (no_more_task && !pp->nr_processes)
+		if (!pp->nr_processes)
 			break;
 		pp_buffer_stderr(pp, output_timeout);
 		pp_output(pp);
 		if (pp_collect_finished(pp)) {
 			kill_children(pp, SIGTERM);
 			pp->shutdown = 1;
-			no_more_task = 1;
 		}
 	}
 
diff --git a/run-command.h b/run-command.h
index 1179cb0..c24aa54 100644
--- a/run-command.h
+++ b/run-command.h
@@ -121,16 +121,24 @@ int finish_async(struct async *async);
 
 /**
  * This callback should initialize the child process and preload the
- * error channel. The preloading of is useful if you want to have a message
- * printed directly before the output of the child process.
+ * error channel if desired. Preloading is useful if you want to
+ * have a message printed directly before the output of the child process.
+ * pp_cb is the callback cookie as passed to run_processes_parallel.
+ * You can store a child process specific callback cookie in pp_task_cb.
+ *
  * You MUST set stdout_to_stderr.
  *
+ * Even after returning 0 to indicate that there are no more processes,
+ * this function will be called again until there are no more running
+ * child processes.
+ *
  * Return 1 if the next child is ready to run.
- * Return 0 if there are no more tasks to be processed.
+ * Return 0 if there are currently no more tasks to be processed.
  */
-typedef int (*get_next_task_fn)(void *data,
+typedef int (*get_next_task_fn)(void **pp_task_cb,
 				struct child_process *cp,
-				struct strbuf *err);
+				struct strbuf *err,
+				void *pp_cb);
 
 /**
  * This callback is called whenever there are problems starting
@@ -140,28 +148,35 @@ typedef int (*get_next_task_fn)(void *data,
  * message to the strbuf err instead, which will be printed without
  * messing up the output of the other parallel processes.
  *
+ * pp_cb is the callback cookie as passed into run_processes_parallel,
+ * pp_task_cb is the callback cookie as passed into get_next_task_fn.
+ *
  * Return 0 to continue the parallel processing. To abort gracefully,
  * return non zero.
  */
-typedef int (*start_failure_fn)(void *data,
-				struct child_process *cp,
-				struct strbuf *err);
+typedef int (*start_failure_fn)(struct child_process *cp,
+				struct strbuf *err,
+				void *pp_cb,
+				void *pp_task_cb);
 
 /**
- * This callback is called on every there are problems starting
- * a new process.
+ * This callback is called on every child process that finished processing.
  *
  * You must not write to stdout or stderr in this function. Add your
  * message to the strbuf err instead, which will be printed without
  * messing up the output of the other parallel processes.
  *
+ * pp_cb is the callback cookie as passed into run_processes_parallel,
+ * pp_task_cb is the callback cookie as passed into get_next_task_fn.
+ *
  * Return 0 to continue the parallel processing. To abort gracefully,
  * return non zero.
  */
-typedef int (*return_value_fn)(void *data,
-			       struct child_process *cp,
-			       struct strbuf *err,
-			       int result);
+typedef int (*task_finished_fn)(int result,
+				struct child_process *cp,
+				struct strbuf *err,
+				void *pp_cb,
+				void *pp_task_cb);
 
 /**
  * Runs up to n processes at the same time. Whenever a process can be
@@ -176,10 +191,10 @@ typedef int (*return_value_fn)(void *data,
  * will be used. The default handlers will print an error message on
  * error without issuing an emergency stop.
  */
-
-int run_processes_parallel(int n, void *data,
+int run_processes_parallel(int n,
 			   get_next_task_fn,
 			   start_failure_fn,
-			   return_value_fn);
+			   task_finished_fn,
+			   void *pp_cb);
 
 #endif
diff --git a/sigchain.c b/sigchain.c
index 9262307..2ac43bb 100644
--- a/sigchain.c
+++ b/sigchain.c
@@ -53,9 +53,9 @@ void sigchain_push_common(sigchain_fun f)
 
 void sigchain_pop_common(void)
 {
-	sigchain_pop(SIGINT);
-	sigchain_pop(SIGHUP);
-	sigchain_pop(SIGTERM);
-	sigchain_pop(SIGQUIT);
 	sigchain_pop(SIGPIPE);
+	sigchain_pop(SIGQUIT);
+	sigchain_pop(SIGTERM);
+	sigchain_pop(SIGHUP);
+	sigchain_pop(SIGINT);
 }
diff --git a/submodule.c b/submodule.c
index 7ab89f4..cf8bf5d 100644
--- a/submodule.c
+++ b/submodule.c
@@ -627,23 +627,24 @@ struct submodule_parallel_fetch {
 };
 #define SPF_INIT {0, ARGV_ARRAY_INIT, NULL, NULL, 0, 0, 0}
 
-int get_next_submodule(void *data, struct child_process *cp,
-		       struct strbuf *err);
+static int get_next_submodule(void **task_cb, struct child_process *cp,
+			      struct strbuf *err, void *data);
 
-static int fetch_start_failure(void *data, struct child_process *cp,
-			       struct strbuf *err)
+static int fetch_start_failure(struct child_process *cp,
+			       struct strbuf *err,
+			       void *cb, void *task_cb)
 {
-	struct submodule_parallel_fetch *spf = data;
+	struct submodule_parallel_fetch *spf = cb;
 
 	spf->result = 1;
 
 	return 0;
 }
 
-static int fetch_finish(void *data, struct child_process *cp,
-			struct strbuf *err, int retvalue)
+static int fetch_finish(int retvalue, struct child_process *cp,
+			struct strbuf *err, void *cb, void *task_cb)
 {
-	struct submodule_parallel_fetch *spf = data;
+	struct submodule_parallel_fetch *spf = cb;
 
 	if (retvalue)
 		spf->result = 1;
@@ -676,10 +677,11 @@ int fetch_populated_submodules(const struct argv_array *options,
 	/* default value, "--submodule-prefix" and its value are added later */
 
 	calculate_changed_submodule_paths();
-	run_processes_parallel(max_parallel_jobs, &spf,
+	run_processes_parallel(max_parallel_jobs,
 			       get_next_submodule,
 			       fetch_start_failure,
-			       fetch_finish);
+			       fetch_finish,
+			       &spf);
 
 	argv_array_clear(&spf.args);
 out:
@@ -687,8 +689,8 @@ out:
 	return spf.result;
 }
 
-int get_next_submodule(void *data, struct child_process *cp,
-		       struct strbuf *err)
+static int get_next_submodule(void **task_cb, struct child_process *cp,
+			      struct strbuf *err, void *data)
 {
 	int ret = 0;
 	struct submodule_parallel_fetch *spf = data;
diff --git a/test-run-command.c b/test-run-command.c
index 2555791..699d9e9 100644
--- a/test-run-command.c
+++ b/test-run-command.c
@@ -16,11 +16,12 @@
 #include <errno.h>
 
 static int number_callbacks;
-static int parallel_next(void *data,
+static int parallel_next(void** task_cb,
 			 struct child_process *cp,
-			 struct strbuf *err)
+			 struct strbuf *err,
+			 void *cb)
 {
-	struct child_process *d = data;
+	struct child_process *d = cb;
 	if (number_callbacks >= 4)
 		return 0;
 
@@ -51,8 +52,8 @@ int main(int argc, char **argv)
 		exit(run_command(&proc));
 
 	if (!strcmp(argv[1], "run-command-parallel-4"))
-		exit(run_processes_parallel(4, &proc, parallel_next,
-					 NULL, NULL));
+		exit(run_processes_parallel(4, parallel_next,
+					    NULL, NULL, &proc));
 
 	fprintf(stderr, "check usage\n");
 	return 1;

-- 
2.5.0.275.gf20166c.dirty


* [PATCHv6 1/8] submodule.c: write "Fetching submodule <foo>" to stderr
  2015-10-01  1:54 [PATCHv6 0/8] fetch submodules in parallel Stefan Beller
@ 2015-10-01  1:54 ` Stefan Beller
  2015-10-01  1:54 ` [PATCHv6 2/8] xread: poll on non blocking fds Stefan Beller
                   ` (8 subsequent siblings)
  9 siblings, 0 replies; 12+ messages in thread
From: Stefan Beller @ 2015-10-01  1:54 UTC
  To: gitster, git
  Cc: Jonathan Nieder, ramsay, jacob.keller, peff, johannes.schindelin,
	Jens.Lehmann, ericsunshine, Stefan Beller

From: Jonathan Nieder <jrnieder@gmail.com>

The "Pushing submodule <foo>" progress output correctly goes to
stderr, but "Fetching submodule <foo>" is going to stdout by
mistake.  Fix it to write to stderr.

Noticed while trying to implement a parallel submodule fetch.  When
this particular output line went to a different file descriptor, it
was buffered separately, resulting in wrongly interleaved output if
we copied it to the terminal naively.
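
One way to picture the ordering argument: lines written to the same stream
keep their relative order, whereas lines split across two descriptors have no
defined order once a parent collects them separately. The sketch below only
shows the first half of that statement. `announce_fetch` and
`ordered_on_one_stream` are hypothetical helpers, and the path in the second
line is made up for illustration.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical helper: writes the progress line to the given stream. */
static int announce_fetch(FILE *out, const char *prefix, const char *name)
{
	return fprintf(out, "Fetching submodule %s%s\n", prefix, name);
}

/* Returns 1 if both lines land in `log` in the order they were written. */
static int ordered_on_one_stream(FILE *log)
{
	char buf[128];
	size_t n;

	announce_fetch(log, "", "submodule");
	fprintf(log, "From /path/to/submodule\n");	/* made-up follow-up line */
	fflush(log);
	rewind(log);
	n = fread(buf, 1, sizeof(buf) - 1, log);
	buf[n] = '\0';
	return !strcmp(buf,
		       "Fetching submodule submodule\nFrom /path/to/submodule\n");
}
```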

Signed-off-by: Jonathan Nieder <jrnieder@gmail.com>
Signed-off-by: Stefan Beller <sbeller@google.com>
Signed-off-by: Junio C Hamano <gitster@pobox.com>
---
 submodule.c                 |  2 +-
 t/t5526-fetch-submodules.sh | 51 +++++++++++++++++++++++----------------------
 2 files changed, 27 insertions(+), 26 deletions(-)

diff --git a/submodule.c b/submodule.c
index 9fcc86f..1d64e57 100644
--- a/submodule.c
+++ b/submodule.c
@@ -694,7 +694,7 @@ int fetch_populated_submodules(const struct argv_array *options,
 			git_dir = submodule_git_dir.buf;
 		if (is_directory(git_dir)) {
 			if (!quiet)
-				printf("Fetching submodule %s%s\n", prefix, ce->name);
+				fprintf(stderr, "Fetching submodule %s%s\n", prefix, ce->name);
 			cp.dir = submodule_path.buf;
 			argv_array_push(&argv, default_argv);
 			argv_array_push(&argv, "--submodule-prefix");
diff --git a/t/t5526-fetch-submodules.sh b/t/t5526-fetch-submodules.sh
index a4532b0..17759b1 100755
--- a/t/t5526-fetch-submodules.sh
+++ b/t/t5526-fetch-submodules.sh
@@ -16,7 +16,8 @@ add_upstream_commit() {
 		git add subfile &&
 		git commit -m new subfile &&
 		head2=$(git rev-parse --short HEAD) &&
-		echo "From $pwd/submodule" > ../expect.err &&
+		echo "Fetching submodule submodule" > ../expect.err &&
+		echo "From $pwd/submodule" >> ../expect.err &&
 		echo "   $head1..$head2  master     -> origin/master" >> ../expect.err
 	) &&
 	(
@@ -27,6 +28,7 @@ add_upstream_commit() {
 		git add deepsubfile &&
 		git commit -m new deepsubfile &&
 		head2=$(git rev-parse --short HEAD) &&
+		echo "Fetching submodule submodule/subdir/deepsubmodule" >> ../expect.err &&
 		echo "From $pwd/deepsubmodule" >> ../expect.err &&
 		echo "   $head1..$head2  master     -> origin/master" >> ../expect.err
 	)
@@ -56,9 +58,7 @@ test_expect_success setup '
 	(
 		cd downstream &&
 		git submodule update --init --recursive
-	) &&
-	echo "Fetching submodule submodule" > expect.out &&
-	echo "Fetching submodule submodule/subdir/deepsubmodule" >> expect.out
+	)
 '
 
 test_expect_success "fetch --recurse-submodules recurses into submodules" '
@@ -67,7 +67,7 @@ test_expect_success "fetch --recurse-submodules recurses into submodules" '
 		cd downstream &&
 		git fetch --recurse-submodules >../actual.out 2>../actual.err
 	) &&
-	test_i18ncmp expect.out actual.out &&
+	test_must_be_empty actual.out &&
 	test_i18ncmp expect.err actual.err
 '
 
@@ -96,7 +96,7 @@ test_expect_success "using fetchRecurseSubmodules=true in .gitmodules recurses i
 		git config -f .gitmodules submodule.submodule.fetchRecurseSubmodules true &&
 		git fetch >../actual.out 2>../actual.err
 	) &&
-	test_i18ncmp expect.out actual.out &&
+	test_must_be_empty actual.out &&
 	test_i18ncmp expect.err actual.err
 '
 
@@ -127,7 +127,7 @@ test_expect_success "--recurse-submodules overrides fetchRecurseSubmodules setti
 		git config --unset -f .gitmodules submodule.submodule.fetchRecurseSubmodules &&
 		git config --unset submodule.submodule.fetchRecurseSubmodules
 	) &&
-	test_i18ncmp expect.out actual.out &&
+	test_must_be_empty actual.out &&
 	test_i18ncmp expect.err actual.err
 '
 
@@ -146,7 +146,7 @@ test_expect_success "--dry-run propagates to submodules" '
 		cd downstream &&
 		git fetch --recurse-submodules --dry-run >../actual.out 2>../actual.err
 	) &&
-	test_i18ncmp expect.out actual.out &&
+	test_must_be_empty actual.out &&
 	test_i18ncmp expect.err actual.err
 '
 
@@ -155,7 +155,7 @@ test_expect_success "Without --dry-run propagates to submodules" '
 		cd downstream &&
 		git fetch --recurse-submodules >../actual.out 2>../actual.err
 	) &&
-	test_i18ncmp expect.out actual.out &&
+	test_must_be_empty actual.out &&
 	test_i18ncmp expect.err actual.err
 '
 
@@ -166,7 +166,7 @@ test_expect_success "recurseSubmodules=true propagates into submodules" '
 		git config fetch.recurseSubmodules true
 		git fetch >../actual.out 2>../actual.err
 	) &&
-	test_i18ncmp expect.out actual.out &&
+	test_must_be_empty actual.out &&
 	test_i18ncmp expect.err actual.err
 '
 
@@ -180,7 +180,7 @@ test_expect_success "--recurse-submodules overrides config in submodule" '
 		) &&
 		git fetch --recurse-submodules >../actual.out 2>../actual.err
 	) &&
-	test_i18ncmp expect.out actual.out &&
+	test_must_be_empty actual.out &&
 	test_i18ncmp expect.err actual.err
 '
 
@@ -214,16 +214,15 @@ test_expect_success "Recursion stops when no new submodule commits are fetched"
 	git add submodule &&
 	git commit -m "new submodule" &&
 	head2=$(git rev-parse --short HEAD) &&
-	echo "Fetching submodule submodule" > expect.out.sub &&
 	echo "From $pwd/." > expect.err.sub &&
 	echo "   $head1..$head2  master     -> origin/master" >>expect.err.sub &&
-	head -2 expect.err >> expect.err.sub &&
+	head -3 expect.err >> expect.err.sub &&
 	(
 		cd downstream &&
 		git fetch >../actual.out 2>../actual.err
 	) &&
 	test_i18ncmp expect.err.sub actual.err &&
-	test_i18ncmp expect.out.sub actual.out
+	test_must_be_empty actual.out
 '
 
 test_expect_success "Recursion doesn't happen when new superproject commits don't change any submodules" '
@@ -269,7 +268,7 @@ test_expect_success "Recursion picks up config in submodule" '
 		)
 	) &&
 	test_i18ncmp expect.err.sub actual.err &&
-	test_i18ncmp expect.out actual.out
+	test_must_be_empty actual.out
 '
 
 test_expect_success "Recursion picks up all submodules when necessary" '
@@ -285,7 +284,8 @@ test_expect_success "Recursion picks up all submodules when necessary" '
 		git add subdir/deepsubmodule &&
 		git commit -m "new deepsubmodule"
 		head2=$(git rev-parse --short HEAD) &&
-		echo "From $pwd/submodule" > ../expect.err.sub &&
+		echo "Fetching submodule submodule" > ../expect.err.sub &&
+		echo "From $pwd/submodule" >> ../expect.err.sub &&
 		echo "   $head1..$head2  master     -> origin/master" >> ../expect.err.sub
 	) &&
 	head1=$(git rev-parse --short HEAD) &&
@@ -295,13 +295,13 @@ test_expect_success "Recursion picks up all submodules when necessary" '
 	echo "From $pwd/." > expect.err.2 &&
 	echo "   $head1..$head2  master     -> origin/master" >> expect.err.2 &&
 	cat expect.err.sub >> expect.err.2 &&
-	tail -2 expect.err >> expect.err.2 &&
+	tail -3 expect.err >> expect.err.2 &&
 	(
 		cd downstream &&
 		git fetch >../actual.out 2>../actual.err
 	) &&
 	test_i18ncmp expect.err.2 actual.err &&
-	test_i18ncmp expect.out actual.out
+	test_must_be_empty actual.out
 '
 
 test_expect_success "'--recurse-submodules=on-demand' doesn't recurse when no new commits are fetched in the superproject (and ignores config)" '
@@ -317,7 +317,8 @@ test_expect_success "'--recurse-submodules=on-demand' doesn't recurse when no ne
 		git add subdir/deepsubmodule &&
 		git commit -m "new deepsubmodule" &&
 		head2=$(git rev-parse --short HEAD) &&
-		echo "From $pwd/submodule" > ../expect.err.sub &&
+		echo Fetching submodule submodule > ../expect.err.sub &&
+		echo "From $pwd/submodule" >> ../expect.err.sub &&
 		echo "   $head1..$head2  master     -> origin/master" >> ../expect.err.sub
 	) &&
 	(
@@ -335,7 +336,7 @@ test_expect_success "'--recurse-submodules=on-demand' recurses as deep as necess
 	git add submodule &&
 	git commit -m "new submodule" &&
 	head2=$(git rev-parse --short HEAD) &&
-	tail -2 expect.err > expect.err.deepsub &&
+	tail -3 expect.err > expect.err.deepsub &&
 	echo "From $pwd/." > expect.err &&
 	echo "   $head1..$head2  master     -> origin/master" >>expect.err &&
 	cat expect.err.sub >> expect.err &&
@@ -354,7 +355,7 @@ test_expect_success "'--recurse-submodules=on-demand' recurses as deep as necess
 			git config --unset -f .gitmodules submodule.subdir/deepsubmodule.fetchRecursive
 		)
 	) &&
-	test_i18ncmp expect.out actual.out &&
+	test_must_be_empty actual.out &&
 	test_i18ncmp expect.err actual.err
 '
 
@@ -388,7 +389,7 @@ test_expect_success "'fetch.recurseSubmodules=on-demand' overrides global config
 	head2=$(git rev-parse --short HEAD) &&
 	echo "From $pwd/." > expect.err.2 &&
 	echo "   $head1..$head2  master     -> origin/master" >>expect.err.2 &&
-	head -2 expect.err >> expect.err.2 &&
+	head -3 expect.err >> expect.err.2 &&
 	(
 		cd downstream &&
 		git config fetch.recurseSubmodules on-demand &&
@@ -399,7 +400,7 @@ test_expect_success "'fetch.recurseSubmodules=on-demand' overrides global config
 		cd downstream &&
 		git config --unset fetch.recurseSubmodules
 	) &&
-	test_i18ncmp expect.out.sub actual.out &&
+	test_must_be_empty actual.out &&
 	test_i18ncmp expect.err.2 actual.err
 '
 
@@ -416,7 +417,7 @@ test_expect_success "'submodule.<sub>.fetchRecurseSubmodules=on-demand' override
 	head2=$(git rev-parse --short HEAD) &&
 	echo "From $pwd/." > expect.err.2 &&
 	echo "   $head1..$head2  master     -> origin/master" >>expect.err.2 &&
-	head -2 expect.err >> expect.err.2 &&
+	head -3 expect.err >> expect.err.2 &&
 	(
 		cd downstream &&
 		git config submodule.submodule.fetchRecurseSubmodules on-demand &&
@@ -427,7 +428,7 @@ test_expect_success "'submodule.<sub>.fetchRecurseSubmodules=on-demand' override
 		cd downstream &&
 		git config --unset submodule.submodule.fetchRecurseSubmodules
 	) &&
-	test_i18ncmp expect.out.sub actual.out &&
+	test_must_be_empty actual.out &&
 	test_i18ncmp expect.err.2 actual.err
 '
 
-- 
2.5.0.275.gf20166c.dirty


* [PATCHv6 2/8] xread: poll on non blocking fds
  2015-10-01  1:54 [PATCHv6 0/8] fetch submodules in parallel Stefan Beller
  2015-10-01  1:54 ` [PATCHv6 1/8] submodule.c: write "Fetching submodule <foo>" to stderr Stefan Beller
@ 2015-10-01  1:54 ` Stefan Beller
  2015-10-01  1:54 ` [PATCHv6 3/8] xread_nonblock: add functionality to read from fds without blocking Stefan Beller
                   ` (7 subsequent siblings)
  9 siblings, 0 replies; 12+ messages in thread
From: Stefan Beller @ 2015-10-01  1:54 UTC
  To: gitster, git
  Cc: Stefan Beller, ramsay, jacob.keller, peff, jrnieder,
	johannes.schindelin, Jens.Lehmann, ericsunshine

From the man page:
EAGAIN The file descriptor fd refers to a file other than a socket
       and has been marked nonblocking (O_NONBLOCK), and the read
       would block.

EAGAIN or EWOULDBLOCK
       The file descriptor fd refers to a socket and has been marked
       nonblocking (O_NONBLOCK), and the read would block.  POSIX.1-2001
       allows either error to be returned for this case, and does not
       require these constants to have the same value, so a portable
       application should check for both possibilities.

If we get EAGAIN or EWOULDBLOCK, the fd must have O_NONBLOCK set.
As the intent of xread is to read as much as possible, either until the
fd reaches EOF or an actual error occurs, we can go easy on the feeder of
the fd by waiting for it politely in poll() instead of spinning in a
busy loop.

We should not care if the call to poll failed, as we're in an infinite
loop and can only get out with the correct read().
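
The behavior described above can be sketched as a standalone function. This is
an illustration under assumptions, not a copy of the hunk in this patch:
`read_politely` is a hypothetical name, and the sketch assumes the loop goes
back to read() after poll() returns, which is how the last paragraph reads.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <poll.h>
#include <unistd.h>

/*
 * Hypothetical stand-in: retry on EINTR, and on EAGAIN/EWOULDBLOCK
 * wait in poll() instead of spinning, then try read() again.
 */
static ssize_t read_politely(int fd, void *buf, size_t len)
{
	ssize_t nr;

	while (1) {
		nr = read(fd, buf, len);
		if (nr < 0) {
			if (errno == EINTR)
				continue;
			if (errno == EAGAIN || errno == EWOULDBLOCK) {
				struct pollfd pfd;

				pfd.fd = fd;
				pfd.events = POLLIN;
				/* A failed poll() is fine; the next read() decides. */
				poll(&pfd, 1, -1);
				continue;
			}
		}
		return nr;
	}
}
```

With this shape the function returns only on data, EOF, or an error other
than EINTR, EAGAIN, or EWOULDBLOCK.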

Signed-off-by: Stefan Beller <sbeller@google.com>
Signed-off-by: Junio C Hamano <gitster@pobox.com>
---
 wrapper.c | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)

diff --git a/wrapper.c b/wrapper.c
index ff49807..5517928 100644
--- a/wrapper.c
+++ b/wrapper.c
@@ -201,8 +201,17 @@ ssize_t xread(int fd, void *buf, size_t len)
 	    len = MAX_IO_SIZE;
 	while (1) {
 		nr = read(fd, buf, len);
-		if ((nr < 0) && (errno == EAGAIN || errno == EINTR))
-			continue;
+		if (nr < 0) {
+			if (errno == EINTR)
+				continue;
+			if (errno == EAGAIN || errno == EWOULDBLOCK) {
+				struct pollfd pfd;
+				pfd.events = POLLIN;
+				pfd.fd = fd;
+				/* We deliberately ignore the return value */
+				poll(&pfd, 1, -1);
+			}
+		}
 		return nr;
 	}
 }
-- 
2.5.0.275.gf20166c.dirty


* [PATCHv6 3/8] xread_nonblock: add functionality to read from fds without blocking
  2015-10-01  1:54 [PATCHv6 0/8] fetch submodules in parallel Stefan Beller
  2015-10-01  1:54 ` [PATCHv6 1/8] submodule.c: write "Fetching submodule <foo>" to stderr Stefan Beller
  2015-10-01  1:54 ` [PATCHv6 2/8] xread: poll on non blocking fds Stefan Beller
@ 2015-10-01  1:54 ` Stefan Beller
  2015-10-01  1:54 ` [PATCHv6 4/8] strbuf: add strbuf_read_once to read " Stefan Beller
                   ` (6 subsequent siblings)
  9 siblings, 0 replies; 12+ messages in thread
From: Stefan Beller @ 2015-10-01  1:54 UTC
  To: gitster, git
  Cc: Stefan Beller, ramsay, jacob.keller, peff, jrnieder,
	johannes.schindelin, Jens.Lehmann, ericsunshine

Provide a wrapper to read(), similar to xread(), that restarts on
EINTR but not EAGAIN (or EWOULDBLOCK). This enables the caller to
handle polling itself, possibly polling multiple sockets or performing
some other action.
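
A caller-side sketch of what this enables, under stated assumptions:
`read_nonblock` mirrors the wrapper described above under a hypothetical name,
and `drain_ready` is a made-up helper that tries several non-blocking fds once
each and skips the ones that are not ready, leaving any polling to its caller.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>

/* Hypothetical stand-in: restart on EINTR, report "would block" as EAGAIN. */
static ssize_t read_nonblock(int fd, void *buf, size_t len)
{
	ssize_t nr;

	while (1) {
		nr = read(fd, buf, len);
		if (nr < 0) {
			if (errno == EINTR)
				continue;
			if (errno == EWOULDBLOCK)
				errno = EAGAIN;
		}
		return nr;
	}
}

/*
 * Try each fd once and skip those with nothing to offer yet.
 * Returns the total number of bytes read, or -1 on a real error.
 */
static ssize_t drain_ready(const int *fds, int nr_fds, char *out, size_t cap)
{
	size_t total = 0;
	int i;

	for (i = 0; i < nr_fds && total < cap; i++) {
		ssize_t nr = read_nonblock(fds[i], out + total, cap - total);

		if (nr < 0) {
			if (errno == EAGAIN)
				continue;	/* not ready; the caller may poll() later */
			return -1;
		}
		total += (size_t)nr;
	}
	return (ssize_t)total;
}
```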

Helped-by: Jacob Keller <jacob.keller@gmail.com>
Helped-by: Jeff King <peff@peff.net>
Helped-by: Junio C Hamano <gitster@pobox.com>
Signed-off-by: Stefan Beller <sbeller@google.com>
Signed-off-by: Junio C Hamano <gitster@pobox.com>
---
 git-compat-util.h |  1 +
 wrapper.c         | 22 ++++++++++++++++++++++
 2 files changed, 23 insertions(+)

diff --git a/git-compat-util.h b/git-compat-util.h
index c6d391f..9ccea85 100644
--- a/git-compat-util.h
+++ b/git-compat-util.h
@@ -718,6 +718,7 @@ extern void *xcalloc(size_t nmemb, size_t size);
 extern void *xmmap(void *start, size_t length, int prot, int flags, int fd, off_t offset);
 extern void *xmmap_gently(void *start, size_t length, int prot, int flags, int fd, off_t offset);
 extern ssize_t xread(int fd, void *buf, size_t len);
+extern ssize_t xread_nonblock(int fd, void *buf, size_t len);
 extern ssize_t xwrite(int fd, const void *buf, size_t len);
 extern ssize_t xpread(int fd, void *buf, size_t len, off_t offset);
 extern int xdup(int fd);
diff --git a/wrapper.c b/wrapper.c
index 5517928..41a21e1 100644
--- a/wrapper.c
+++ b/wrapper.c
@@ -217,6 +217,28 @@ ssize_t xread(int fd, void *buf, size_t len)
 }
 
 /*
+ * xread_nonblock() is the same as read(), but it automatically restarts
+ * read() operations interrupted by a signal (EINTR). xread_nonblock() DOES
+ * NOT GUARANTEE that "len" bytes are read. EWOULDBLOCK is turned into EAGAIN.
+ */
+ssize_t xread_nonblock(int fd, void *buf, size_t len)
+{
+	ssize_t nr;
+	if (len > MAX_IO_SIZE)
+		len = MAX_IO_SIZE;
+	while (1) {
+		nr = read(fd, buf, len);
+		if (nr < 0) {
+			if (errno == EINTR)
+				continue;
+			if (errno == EWOULDBLOCK)
+				errno = EAGAIN;
+		}
+		return nr;
+	}
+}
+
+/*
  * xwrite() is the same a write(), but it automatically restarts write()
  * operations with a recoverable error (EAGAIN and EINTR). xwrite() DOES NOT
  * GUARANTEE that "len" bytes is written even if the operation is successful.
-- 
2.5.0.275.gf20166c.dirty


* [PATCHv6 4/8] strbuf: add strbuf_read_once to read without blocking
  2015-10-01  1:54 [PATCHv6 0/8] fetch submodules in parallel Stefan Beller
                   ` (2 preceding siblings ...)
  2015-10-01  1:54 ` [PATCHv6 3/8] xread_nonblock: add functionality to read from fds without blocking Stefan Beller
@ 2015-10-01  1:54 ` Stefan Beller
  2015-10-01  1:54 ` [PATCHv6 5/8] sigchain: add command to pop all common signals Stefan Beller
                   ` (5 subsequent siblings)
  9 siblings, 0 replies; 12+ messages in thread
From: Stefan Beller @ 2015-10-01  1:54 UTC
  To: gitster, git
  Cc: Stefan Beller, ramsay, jacob.keller, peff, jrnieder,
	johannes.schindelin, Jens.Lehmann, ericsunshine

The new call will read from a file descriptor into a strbuf once. The
underlying call xread_nonblock is meant to execute without blocking if
the file descriptor is set to O_NONBLOCK. It is a bug to call
strbuf_read_once on a file descriptor which would block.
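
The "read once into a growable buffer" idea can be sketched without the real
strbuf API. Everything here is hypothetical and simplified: `struct toy_buf`,
`toy_buf_grow`, and `toy_buf_read_once` are illustration-only names, and the
sketch calls read() directly rather than the wrapper from the previous patch.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

/* Hypothetical, much-simplified stand-in for a growable string buffer. */
struct toy_buf {
	char *buf;
	size_t len, alloc;
};

/* Make room for at least `extra` more bytes plus a trailing NUL. */
static int toy_buf_grow(struct toy_buf *tb, size_t extra)
{
	size_t need = tb->len + extra + 1;
	char *p;

	if (need <= tb->alloc)
		return 0;
	p = realloc(tb->buf, need);
	if (!p)
		return -1;
	tb->buf = p;
	tb->alloc = need;
	return 0;
}

/* One read attempt; appends whatever arrived and keeps the buffer NUL-terminated. */
static ssize_t toy_buf_read_once(struct toy_buf *tb, int fd, size_t hint)
{
	ssize_t cnt;

	if (toy_buf_grow(tb, hint ? hint : 8192) < 0)
		return -1;
	do {
		cnt = read(fd, tb->buf + tb->len, tb->alloc - tb->len - 1);
	} while (cnt < 0 && errno == EINTR);
	if (cnt > 0) {
		tb->len += (size_t)cnt;
		tb->buf[tb->len] = '\0';
	}
	return cnt;
}
```

On a non-blocking fd with no data pending, the call returns a negative value
and leaves the buffer untouched, so the caller can check errno and poll.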

Signed-off-by: Stefan Beller <sbeller@google.com>
Signed-off-by: Junio C Hamano <gitster@pobox.com>
---
 strbuf.c | 11 +++++++++++
 strbuf.h |  9 +++++++++
 2 files changed, 20 insertions(+)
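
A minimal standalone sketch of the read-once behaviour described above,
using only POSIX calls. The names read_once_nonblock and make_nonblocking
are hypothetical stand-ins for illustration; they are not the functions
added by this series, and the strbuf handling is left out:

```c
#include <errno.h>
#include <fcntl.h>
#include <sys/types.h>
#include <unistd.h>

/*
 * Hypothetical stand-in for xread_nonblock(): perform a single read(),
 * retrying only when a signal interrupted the call.
 */
static ssize_t read_once_nonblock(int fd, void *buf, size_t len)
{
	ssize_t nr;

	for (;;) {
		nr = read(fd, buf, len);
		if (nr < 0 && errno == EINTR)
			continue;	/* interrupted by a signal: retry */
		if (nr < 0 && errno == EWOULDBLOCK)
			errno = EAGAIN;	/* report one spelling to callers */
		return nr;
	}
}

/* Returns 0 on success, -1 if the flags could not be read or set. */
static int make_nonblocking(int fd)
{
	int flags = fcntl(fd, F_GETFL);

	if (flags < 0)
		return -1;
	return fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0 ? -1 : 0;
}
```

A caller would treat a negative return with errno == EAGAIN as "no data
yet" and any other negative return as a real error.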

diff --git a/strbuf.c b/strbuf.c
index cce5eed..35e71b8 100644
--- a/strbuf.c
+++ b/strbuf.c
@@ -384,6 +384,17 @@ ssize_t strbuf_read(struct strbuf *sb, int fd, size_t hint)
 	return sb->len - oldlen;
 }
 
+ssize_t strbuf_read_once(struct strbuf *sb, int fd, size_t hint)
+{
+	ssize_t cnt;
+
+	strbuf_grow(sb, hint ? hint : 8192);
+	cnt = xread_nonblock(fd, sb->buf + sb->len, sb->alloc - sb->len - 1);
+	if (cnt > 0)
+		strbuf_setlen(sb, sb->len + cnt);
+	return cnt;
+}
+
 #define STRBUF_MAXLINK (2*PATH_MAX)
 
 int strbuf_readlink(struct strbuf *sb, const char *path, size_t hint)
diff --git a/strbuf.h b/strbuf.h
index aef2794..ea69665 100644
--- a/strbuf.h
+++ b/strbuf.h
@@ -367,6 +367,15 @@ extern size_t strbuf_fread(struct strbuf *, size_t, FILE *);
 extern ssize_t strbuf_read(struct strbuf *, int fd, size_t hint);
 
 /**
+ * Read from a file descriptor that is marked as O_NONBLOCK without
+ * blocking.  Returns the number of new bytes appended to the sb.
+ * Negative return value signals there was an error returned from
+ * underlying read(2), in which case the caller should check errno.
+ * e.g. errno == EAGAIN when the read may have blocked.
+ */
+extern ssize_t strbuf_read_once(struct strbuf *, int fd, size_t hint);
+
+/**
  * Read the contents of a file, specified by its path. The third argument
  * can be used to give a hint about the file size, to avoid reallocs.
  */
-- 
2.5.0.275.gf20166c.dirty

* [PATCHv6 5/8] sigchain: add command to pop all common signals
  2015-10-01  1:54 [PATCHv6 0/8] fetch submodules in parallel Stefan Beller
                   ` (3 preceding siblings ...)
  2015-10-01  1:54 ` [PATCHv6 4/8] strbuf: add strbuf_read_once to read " Stefan Beller
@ 2015-10-01  1:54 ` Stefan Beller
  2015-10-01  1:54 ` [PATCHv6 6/8] run-command: add an asynchronous parallel child processor Stefan Beller
                   ` (4 subsequent siblings)
  9 siblings, 0 replies; 12+ messages in thread
From: Stefan Beller @ 2015-10-01  1:54 UTC (permalink / raw)
  To: gitster, git
  Cc: Stefan Beller, ramsay, jacob.keller, peff, jrnieder,
	johannes.schindelin, Jens.Lehmann, ericsunshine

The new function removes all common signal handlers that were installed
by sigchain_push_common, popping them in the reverse order of the pushes.

CC: Jeff King <peff@peff.net>
Signed-off-by: Stefan Beller <sbeller@google.com>
---
 sigchain.c | 9 +++++++++
 sigchain.h | 1 +
 2 files changed, 10 insertions(+)
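
To illustrate the push/pop pairing, here is a simplified standalone model
of a per-signal handler stack. It is only a sketch: the chain_* names,
the fixed-size arrays and the use of signal() are assumptions made for
this example and do not mirror the real sigchain.c implementation:

```c
#include <signal.h>
#include <stddef.h>

typedef void (*handler_fn)(int);

#define MAX_SIG 64	/* assumed upper bound on signal numbers */
#define MAX_DEPTH 8	/* assumed maximum nesting per signal */

static handler_fn saved[MAX_SIG][MAX_DEPTH];
static int depth[MAX_SIG];

/* Install f for sig and remember the handler it replaced. */
static int chain_push(int sig, handler_fn f)
{
	handler_fn old;

	if (sig < 0 || sig >= MAX_SIG || depth[sig] >= MAX_DEPTH)
		return -1;
	old = signal(sig, f);
	if (old == SIG_ERR)
		return -1;
	saved[sig][depth[sig]++] = old;
	return 0;
}

/* Restore the handler that was active before the matching push. */
static int chain_pop(int sig)
{
	if (sig < 0 || sig >= MAX_SIG || depth[sig] < 1)
		return 0;
	if (signal(sig, saved[sig][--depth[sig]]) == SIG_ERR)
		return -1;
	return 0;
}

static const int common[] = { SIGINT, SIGHUP, SIGTERM, SIGQUIT, SIGPIPE };
#define NR_COMMON (sizeof(common) / sizeof(common[0]))

static void chain_push_common(handler_fn f)
{
	size_t i;

	for (i = 0; i < NR_COMMON; i++)
		chain_push(common[i], f);
}

/* Undo chain_push_common(), walking the list in reverse. */
static void chain_pop_common(void)
{
	size_t i;

	for (i = NR_COMMON; i > 0; i--)
		chain_pop(common[i - 1]);
}
```

Code that installs a cleanup handler would bracket its critical section
with chain_push_common() and chain_pop_common(), which is how pp_init()
and pp_cleanup() use the real functions later in this series.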

diff --git a/sigchain.c b/sigchain.c
index faa375d..2ac43bb 100644
--- a/sigchain.c
+++ b/sigchain.c
@@ -50,3 +50,12 @@ void sigchain_push_common(sigchain_fun f)
 	sigchain_push(SIGQUIT, f);
 	sigchain_push(SIGPIPE, f);
 }
+
+void sigchain_pop_common(void)
+{
+	sigchain_pop(SIGPIPE);
+	sigchain_pop(SIGQUIT);
+	sigchain_pop(SIGTERM);
+	sigchain_pop(SIGHUP);
+	sigchain_pop(SIGINT);
+}
diff --git a/sigchain.h b/sigchain.h
index 618083b..138b20f 100644
--- a/sigchain.h
+++ b/sigchain.h
@@ -7,5 +7,6 @@ int sigchain_push(int sig, sigchain_fun f);
 int sigchain_pop(int sig);
 
 void sigchain_push_common(sigchain_fun f);
+void sigchain_pop_common(void);
 
 #endif /* SIGCHAIN_H */
-- 
2.5.0.275.gf20166c.dirty

* [PATCHv6 6/8] run-command: add an asynchronous parallel child processor
  2015-10-01  1:54 [PATCHv6 0/8] fetch submodules in parallel Stefan Beller
                   ` (4 preceding siblings ...)
  2015-10-01  1:54 ` [PATCHv6 5/8] sigchain: add command to pop all common signals Stefan Beller
@ 2015-10-01  1:54 ` Stefan Beller
  2015-10-01  1:54 ` [PATCHv6 7/8] fetch_populated_submodules: use new parallel job processing Stefan Beller
                   ` (3 subsequent siblings)
  9 siblings, 0 replies; 12+ messages in thread
From: Stefan Beller @ 2015-10-01  1:54 UTC (permalink / raw)
  To: gitster, git
  Cc: Stefan Beller, ramsay, jacob.keller, peff, jrnieder,
	johannes.schindelin, Jens.Lehmann, ericsunshine

This allows running external commands in parallel with ordered output
on stderr.

If we run external commands in parallel we cannot pipe the output directly
to our stdout/err as it would mix up. So each process's output will
flow through a pipe, which we buffer. One subprocess can be directly
piped to our stdout/err for low-latency feedback to the user.

Example:
Let's assume we have 5 submodules A,B,C,D,E and each fetch takes a
different amount of time as the different submodules vary in size, then
the output of fetches in sequential order might look like this:

 time -->
 output: |---A---| |-B-| |-------C-------| |-D-| |-E-|

When we schedule these submodules into at most two parallel processes,
a schedule and sample output over time may look like this:

process 1: |---A---| |-D-| |-E-|

process 2: |-B-| |-------C-------|

output:    |---A---|B|---C-------|DE

So A will be perceived as if it ran normally in the single-child
version. As B has finished by the time A is done, we can dump its whole
progress buffer on stderr, such that it looks like it finished in no
time. Once that is done, C is determined to be the visible child and
its progress will be reported in real time.

So this way of output is really good for human consumption, as it only
changes the timing, not the actual output.

For machine consumption the output needs to be prepared in the tasks,
by either having a prefix per line or per block to indicate which task's
output is displayed, because the output order may not follow the
original sequential ordering:

 |----A----| |--B--| |-C-|

will be scheduled to be all parallel:

process 1: |----A----|
process 2: |--B--|
process 3: |-C-|
output:    |----A----|CB

This happens because C finished before B did, so it will be queued for
output before B.

Signed-off-by: Stefan Beller <sbeller@google.com>
Signed-off-by: Junio C Hamano <gitster@pobox.com>
---
 run-command.c          | 350 +++++++++++++++++++++++++++++++++++++++++++++++++
 run-command.h          |  78 +++++++++++
 t/t0061-run-command.sh |  20 +++
 test-run-command.c     |  25 ++++
 4 files changed, 473 insertions(+)
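
The ordering rule from the commit message can be checked with a small
standalone simulation. This is only a model under stated assumptions: the
tick durations are invented, ties are broken by slot index, leftover
buffered output is flushed at the end, and simulate_output_order is a
hypothetical helper that does not exist in run-command.c:

```c
#include <string.h>

#define MAX_SLOTS 8	/* assumed cap on parallel slots in this model */

struct task {
	char name;
	int duration;	/* assumed run time in arbitrary ticks */
};

/*
 * Write the task names into `order` (NUL-terminated) in the sequence
 * their output would reach stderr. Assumes fewer than 64 tasks and
 * room for nr_tasks + 1 bytes in `order`.
 */
static void simulate_output_order(const struct task *tasks, int nr_tasks,
				  int max_processes, char *order)
{
	int slot_task[MAX_SLOTS];	/* task index per slot, -1 if free */
	int slot_end[MAX_SLOTS];	/* tick at which the slot finishes */
	char buffered[64] = "";		/* finished but not yet printed */
	int next = 0, running = 0, owner = 0, now = 0, done, i;
	size_t out = 0, len;

	if (max_processes < 1)
		max_processes = 1;
	if (max_processes > MAX_SLOTS)
		max_processes = MAX_SLOTS;
	for (i = 0; i < max_processes; i++)
		slot_task[i] = -1;

	for (;;) {
		/* Fill every free slot, lowest index first. */
		for (i = 0; i < max_processes && next < nr_tasks; i++) {
			if (slot_task[i] >= 0)
				continue;
			slot_task[i] = next;
			slot_end[i] = now + tasks[next].duration;
			next++;
			running++;
		}
		if (!running)
			break;

		/* Advance to the task that finishes first. */
		done = -1;
		for (i = 0; i < max_processes; i++)
			if (slot_task[i] >= 0 &&
			    (done < 0 || slot_end[i] < slot_end[done]))
				done = i;
		now = slot_end[done];
		len = strlen(buffered);

		if (done != owner) {
			/* Not the visible child: queue its output. */
			buffered[len] = tasks[slot_task[done]].name;
			buffered[len + 1] = '\0';
		} else {
			/* Visible child: print it, then flush the queue. */
			order[out++] = tasks[slot_task[done]].name;
			memcpy(order + out, buffered, len);
			out += len;
			buffered[0] = '\0';
		}
		slot_task[done] = -1;
		running--;

		if (done == owner) {
			/* Round robin to the next slot still in use. */
			for (i = 0; i < max_processes; i++)
				if (slot_task[(owner + i) % max_processes] >= 0)
					break;
			owner = (owner + i) % max_processes;
		}
	}

	/* Flush whatever is still queued (an assumption of this model). */
	len = strlen(buffered);
	memcpy(order + out, buffered, len);
	order[out + len] = '\0';
}
```

With two slots and assumed durations A=7, B=3, C=15, D=3, E=3 the model
yields the order A, B, C, D, E; with three slots and A=10, B=6, C=3 it
yields A, C, B, matching the two diagrams in the commit message.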

diff --git a/run-command.c b/run-command.c
index 28e1d55..28048a7 100644
--- a/run-command.c
+++ b/run-command.c
@@ -3,6 +3,8 @@
 #include "exec_cmd.h"
 #include "sigchain.h"
 #include "argv-array.h"
+#include "thread-utils.h"
+#include "strbuf.h"
 
 void child_process_init(struct child_process *child)
 {
@@ -852,3 +854,351 @@ int capture_command(struct child_process *cmd, struct strbuf *buf, size_t hint)
 	close(cmd->out);
 	return finish_command(cmd);
 }
+
+struct parallel_processes {
+	void *data;
+
+	int max_processes;
+	int nr_processes;
+
+	get_next_task_fn get_next_task;
+	start_failure_fn start_failure;
+	task_finished_fn task_finished;
+
+	struct {
+		unsigned in_use : 1;
+		struct child_process process;
+		struct strbuf err;
+		void *data;
+	} *children;
+	/*
+	 * The struct pollfd is logically part of *children,
+	 * but the system call expects it as its own array.
+	 */
+	struct pollfd *pfd;
+
+	unsigned shutdown : 1;
+
+	int output_owner;
+	struct strbuf buffered_output; /* of finished children */
+} parallel_processes_struct;
+
+static int default_start_failure(struct child_process *cp,
+				 struct strbuf *err,
+				 void *pp_cb,
+				 void *pp_task_cb)
+{
+	int i;
+
+	strbuf_addstr(err, "Starting a child failed:");
+	for (i = 0; cp->argv[i]; i++)
+		strbuf_addf(err, " %s", cp->argv[i]);
+
+	return 0;
+}
+
+static int default_task_finished(int result,
+				 struct child_process *cp,
+				 struct strbuf *err,
+				 void *pp_cb,
+				 void *pp_task_cb)
+{
+	int i;
+
+	if (!result)
+		return 0;
+
+	strbuf_addf(err, "A child failed with return code %d:", result);
+	for (i = 0; cp->argv[i]; i++)
+		strbuf_addf(err, " %s", cp->argv[i]);
+
+	return 0;
+}
+
+static void kill_children(struct parallel_processes *pp, int signo)
+{
+	int i, n = pp->max_processes;
+
+	for (i = 0; i < n; i++)
+		if (pp->children[i].in_use)
+			kill(pp->children[i].process.pid, signo);
+}
+
+static void handle_children_on_signal(int signo)
+{
+	struct parallel_processes *pp = &parallel_processes_struct;
+
+	kill_children(pp, signo);
+	sigchain_pop(signo);
+	raise(signo);
+}
+
+static struct parallel_processes *pp_init(int n,
+					  get_next_task_fn get_next_task,
+					  start_failure_fn start_failure,
+					  task_finished_fn task_finished,
+					  void *data)
+{
+	int i;
+	struct parallel_processes *pp = &parallel_processes_struct;
+
+	if (n < 1)
+		n = online_cpus();
+
+	pp->max_processes = n;
+	pp->data = data;
+	if (!get_next_task)
+		die("BUG: you need to specify a get_next_task function");
+	pp->get_next_task = get_next_task;
+
+	pp->start_failure = start_failure ? start_failure : default_start_failure;
+	pp->task_finished = task_finished ? task_finished : default_task_finished;
+
+	pp->nr_processes = 0;
+	pp->output_owner = 0;
+	pp->children = xcalloc(n, sizeof(*pp->children));
+	pp->pfd = xcalloc(n, sizeof(*pp->pfd));
+	strbuf_init(&pp->buffered_output, 0);
+
+	for (i = 0; i < n; i++) {
+		strbuf_init(&pp->children[i].err, 0);
+		pp->pfd[i].events = POLLIN;
+		pp->pfd[i].fd = -1;
+	}
+	sigchain_push_common(handle_children_on_signal);
+	return pp;
+}
+
+static void pp_cleanup(struct parallel_processes *pp)
+{
+	int i;
+
+	for (i = 0; i < pp->max_processes; i++)
+		strbuf_release(&pp->children[i].err);
+
+	free(pp->children);
+	free(pp->pfd);
+	strbuf_release(&pp->buffered_output);
+
+	sigchain_pop_common();
+}
+
+static void set_nonblocking(int fd)
+{
+	int flags = fcntl(fd, F_GETFL);
+	if (flags < 0)
+		warning("Could not get file status flags, "
+			"output will be degraded");
+	else if (fcntl(fd, F_SETFL, flags | O_NONBLOCK))
+		warning("Could not set file status flags, "
+			"output will be degraded");
+}
+
+/* returns
+ *  0 if a new task was started.
+ *  1 if no new job was started (get_next_task ran out of work, or there
+ *    was a non-critical problem with starting a new command)
+ * -1 if no new job was started because the user wishes to shut down early.
+ */
+static int pp_start_one(struct parallel_processes *pp)
+{
+	int i;
+
+	for (i = 0; i < pp->max_processes; i++)
+		if (!pp->children[i].in_use)
+			break;
+	if (i == pp->max_processes)
+		die("BUG: bookkeeping is hard");
+
+	if (!pp->get_next_task(&pp->children[i].data,
+			       &pp->children[i].process,
+			       &pp->children[i].err,
+			       pp->data))
+		return 1;
+
+	if (start_command(&pp->children[i].process)) {
+		int code = pp->start_failure(&pp->children[i].process,
+					     &pp->children[i].err,
+					     pp->data,
+					     &pp->children[i].data);
+		strbuf_addbuf(&pp->buffered_output, &pp->children[i].err);
+		strbuf_reset(&pp->children[i].err);
+		return code ? -1 : 1;
+	}
+
+	set_nonblocking(pp->children[i].process.err);
+
+	pp->nr_processes++;
+	pp->children[i].in_use = 1;
+	pp->pfd[i].fd = pp->children[i].process.err;
+	return 0;
+}
+
+static void pp_buffer_stderr(struct parallel_processes *pp, int output_timeout)
+{
+	int i;
+
+	while ((i = poll(pp->pfd, pp->max_processes, output_timeout)) < 0) {
+		if (errno == EINTR)
+			continue;
+		pp_cleanup(pp);
+		die_errno("poll");
+	}
+
+	/* Buffer output from all pipes. */
+	for (i = 0; i < pp->max_processes; i++) {
+		if (pp->children[i].in_use &&
+		    pp->pfd[i].revents & POLLIN)
+			if (strbuf_read_once(&pp->children[i].err,
+					     pp->children[i].process.err, 0) < 0)
+				if (errno != EAGAIN)
+					die_errno("read");
+	}
+}
+
+static void pp_output(struct parallel_processes *pp)
+{
+	int i = pp->output_owner;
+	if (pp->children[i].in_use &&
+	    pp->children[i].err.len) {
+		fputs(pp->children[i].err.buf, stderr);
+		strbuf_reset(&pp->children[i].err);
+	}
+}
+
+static int pp_collect_finished(struct parallel_processes *pp)
+{
+	int i = 0;
+	pid_t pid;
+	int wait_status, code;
+	int n = pp->max_processes;
+	int result = 0;
+
+	while (pp->nr_processes > 0) {
+		pid = waitpid(-1, &wait_status, WNOHANG);
+		if (pid == 0)
+			return 0;
+
+		if (pid < 0)
+			die_errno("wait");
+
+		for (i = 0; i < pp->max_processes; i++)
+			if (pp->children[i].in_use &&
+			    pid == pp->children[i].process.pid)
+				break;
+		if (i == pp->max_processes)
+			die("BUG: found a child process we were not aware of");
+
+		if (strbuf_read(&pp->children[i].err,
+				pp->children[i].process.err, 0) < 0)
+			die_errno("strbuf_read");
+
+		if (WIFSIGNALED(wait_status)) {
+			code = WTERMSIG(wait_status);
+			if (!pp->shutdown &&
+			    code != SIGINT && code != SIGQUIT)
+				strbuf_addf(&pp->children[i].err,
+					    "%s died of signal %d",
+					    pp->children[i].process.argv[0],
+					    code);
+			/*
+			 * This return value is chosen so that code & 0xff
+			 * mimics the exit code that a POSIX shell would report for
+			 * a program that died from this signal.
+			 */
+			code += 128;
+		} else if (WIFEXITED(wait_status)) {
+			code = WEXITSTATUS(wait_status);
+			/*
+			 * Convert special exit code when execvp failed.
+			 */
+			if (code == 127) {
+				code = -1;
+				errno = ENOENT;
+			}
+		} else {
+			strbuf_addf(&pp->children[i].err,
+				    "waitpid is confused (%s)",
+				    pp->children[i].process.argv[0]);
+			code = -1;
+		}
+
+		if (pp->task_finished(code, &pp->children[i].process,
+				      &pp->children[i].err, pp->data,
+				      &pp->children[i].data))
+			result = 1;
+
+		argv_array_clear(&pp->children[i].process.args);
+		argv_array_clear(&pp->children[i].process.env_array);
+
+		pp->nr_processes--;
+		pp->children[i].in_use = 0;
+		pp->pfd[i].fd = -1;
+
+		if (i != pp->output_owner) {
+			strbuf_addbuf(&pp->buffered_output, &pp->children[i].err);
+			strbuf_reset(&pp->children[i].err);
+		} else {
+			fputs(pp->children[i].err.buf, stderr);
+			strbuf_reset(&pp->children[i].err);
+
+			/* Output all other finished child processes */
+			fputs(pp->buffered_output.buf, stderr);
+			strbuf_reset(&pp->buffered_output);
+
+			/*
+			 * Pick next process to output live.
+			 * NEEDSWORK:
+			 * For now we pick it arbitrarily by doing a round
+			 * robin. Later we may want to pick the one with
+			 * the most output or the longest or shortest
+			 * running process time.
+			 */
+			for (i = 0; i < n; i++)
+				if (pp->children[(pp->output_owner + i) % n].in_use)
+					break;
+			pp->output_owner = (pp->output_owner + i) % n;
+		}
+	}
+	return result;
+}
+
+int run_processes_parallel(int n,
+			   get_next_task_fn get_next_task,
+			   start_failure_fn start_failure,
+			   task_finished_fn task_finished,
+			   void *pp_cb)
+{
+	int i;
+	int output_timeout = 100;
+	int spawn_cap = 4;
+	struct parallel_processes *pp;
+
+	pp = pp_init(n, get_next_task, start_failure, task_finished, pp_cb);
+	while (1) {
+		for (i = 0;
+		    i < spawn_cap && !pp->shutdown &&
+		    pp->nr_processes < pp->max_processes;
+		    i++) {
+			int code = pp_start_one(pp);
+			if (!code)
+				continue;
+			if (code < 0) {
+				pp->shutdown = 1;
+				kill_children(pp, SIGTERM);
+			}
+			break;
+		}
+		if (!pp->nr_processes)
+			break;
+		pp_buffer_stderr(pp, output_timeout);
+		pp_output(pp);
+		if (pp_collect_finished(pp)) {
+			kill_children(pp, SIGTERM);
+			pp->shutdown = 1;
+		}
+	}
+
+	pp_cleanup(pp);
+	return 0;
+}
diff --git a/run-command.h b/run-command.h
index 5b4425a..c24aa54 100644
--- a/run-command.h
+++ b/run-command.h
@@ -119,4 +119,82 @@ struct async {
 int start_async(struct async *async);
 int finish_async(struct async *async);
 
+/**
+ * This callback should initialize the child process and preload the
+ * error channel if desired. Preloading is useful if you want to
+ * have a message printed directly before the output of the child process.
+ * pp_cb is the callback cookie as passed to run_processes_parallel.
+ * You can store a child process specific callback cookie in pp_task_cb.
+ *
+ * You MUST set stdout_to_stderr.
+ *
+ * Even after returning 0 to indicate that there are no more processes,
+ * this function will be called again until there are no more running
+ * child processes.
+ *
+ * Return 1 if the next child is ready to run.
+ * Return 0 if there are currently no more tasks to be processed.
+ */
+typedef int (*get_next_task_fn)(void **pp_task_cb,
+				struct child_process *cp,
+				struct strbuf *err,
+				void *pp_cb);
+
+/**
+ * This callback is called whenever there are problems starting
+ * a new process.
+ *
+ * You must not write to stdout or stderr in this function. Add your
+ * message to the strbuf err instead, which will be printed without
+ * messing up the output of the other parallel processes.
+ *
+ * pp_cb is the callback cookie as passed into run_processes_parallel,
+ * pp_task_cb is the callback cookie as passed into get_next_task_fn.
+ *
+ * Return 0 to continue the parallel processing. To abort gracefully,
+ * return non zero.
+ */
+typedef int (*start_failure_fn)(struct child_process *cp,
+				struct strbuf *err,
+				void *pp_cb,
+				void *pp_task_cb);
+
+/**
+ * This callback is called on every child process that finished processing.
+ *
+ * You must not write to stdout or stderr in this function. Add your
+ * message to the strbuf err instead, which will be printed without
+ * messing up the output of the other parallel processes.
+ *
+ * pp_cb is the callback cookie as passed into run_processes_parallel,
+ * pp_task_cb is the callback cookie as passed into get_next_task_fn.
+ *
+ * Return 0 to continue the parallel processing. To abort gracefully,
+ * return non zero.
+ */
+typedef int (*task_finished_fn)(int result,
+				struct child_process *cp,
+				struct strbuf *err,
+				void *pp_cb,
+				void *pp_task_cb);
+
+/**
+ * Runs up to n processes at the same time. Whenever a process can be
+ * started, the callback `get_next_task` is called to obtain the data
+ * fed to the child process.
+ *
+ * The children started via this function run in parallel and their output
+ * to stderr is buffered, while one of the children will directly output
+ * to stderr.
+ *
+ * If start_failure_fn and task_finished_fn are NULL, default handlers
+ * will be used. The default handlers will print an error message on
+ * error without issuing an emergency stop.
+ */
+int run_processes_parallel(int n,
+			   get_next_task_fn,
+			   start_failure_fn,
+			   task_finished_fn,
+			   void *pp_cb);
+
 #endif
diff --git a/t/t0061-run-command.sh b/t/t0061-run-command.sh
index 9acf628..49aa3db 100755
--- a/t/t0061-run-command.sh
+++ b/t/t0061-run-command.sh
@@ -47,4 +47,24 @@ test_expect_success POSIXPERM,SANITY 'unreadable directory in PATH' '
 	test_cmp expect actual
 '
 
+cat >expect <<-EOF
+preloaded output of a child
+Hello
+World
+preloaded output of a child
+Hello
+World
+preloaded output of a child
+Hello
+World
+preloaded output of a child
+Hello
+World
+EOF
+
+test_expect_success 'run_command runs in parallel' '
+	test-run-command run-command-parallel-4 sh -c "printf \"%s\n%s\n\" Hello World" 2>actual &&
+	test_cmp expect actual
+'
+
 test_done
diff --git a/test-run-command.c b/test-run-command.c
index 89c7de2..699d9e9 100644
--- a/test-run-command.c
+++ b/test-run-command.c
@@ -10,9 +10,30 @@
 
 #include "git-compat-util.h"
 #include "run-command.h"
+#include "argv-array.h"
+#include "strbuf.h"
 #include <string.h>
 #include <errno.h>
 
+static int number_callbacks;
+static int parallel_next(void** task_cb,
+			 struct child_process *cp,
+			 struct strbuf *err,
+			 void *cb)
+{
+	struct child_process *d = cb;
+	if (number_callbacks >= 4)
+		return 0;
+
+	argv_array_pushv(&cp->args, d->argv);
+	cp->stdout_to_stderr = 1;
+	cp->no_stdin = 1;
+	cp->err = -1;
+	strbuf_addf(err, "preloaded output of a child\n");
+	number_callbacks++;
+	return 1;
+}
+
 int main(int argc, char **argv)
 {
 	struct child_process proc = CHILD_PROCESS_INIT;
@@ -30,6 +51,10 @@ int main(int argc, char **argv)
 	if (!strcmp(argv[1], "run-command"))
 		exit(run_command(&proc));
 
+	if (!strcmp(argv[1], "run-command-parallel-4"))
+		exit(run_processes_parallel(4, parallel_next,
+					    NULL, NULL, &proc));
+
 	fprintf(stderr, "check usage\n");
 	return 1;
 }
-- 
2.5.0.275.gf20166c.dirty

* [PATCHv6 7/8] fetch_populated_submodules: use new parallel job processing
  2015-10-01  1:54 [PATCHv6 0/8] fetch submodules in parallel Stefan Beller
                   ` (5 preceding siblings ...)
  2015-10-01  1:54 ` [PATCHv6 6/8] run-command: add an asynchronous parallel child processor Stefan Beller
@ 2015-10-01  1:54 ` Stefan Beller
  2015-10-01  1:54 ` [PATCHv6 8/8] submodules: allow parallel fetching, add tests and documentation Stefan Beller
                   ` (2 subsequent siblings)
  9 siblings, 0 replies; 12+ messages in thread
From: Stefan Beller @ 2015-10-01  1:54 UTC (permalink / raw)
  To: gitster, git
  Cc: Stefan Beller, ramsay, jacob.keller, peff, jrnieder,
	johannes.schindelin, Jens.Lehmann, ericsunshine

A later patch enables parallel processing of submodules; this one
only adds the possibility for it. So this change should not alter
any user-facing behavior.

Signed-off-by: Stefan Beller <sbeller@google.com>
Signed-off-by: Junio C Hamano <gitster@pobox.com>
---
 submodule.c | 128 ++++++++++++++++++++++++++++++++++++++++++++----------------
 1 file changed, 94 insertions(+), 34 deletions(-)
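
The core of this conversion is turning the old for loop into a callback
that resumes where the previous call stopped, with the position kept in
the callback cookie (spf->count in the patch). A standalone sketch of
that pattern follows; struct scan_state, get_next_selected and the
"skip odd numbers" filter are hypothetical and only stand in for the
submodule checks:

```c
#include <stddef.h>

struct scan_state {
	const int *items;
	size_t nr;
	size_t count;	/* next index to inspect, like spf->count */
};

/*
 * Return 1 and store the next selected item in *out, or return 0 once
 * the list is exhausted. Repeated calls after exhaustion keep returning 0.
 */
static int get_next_selected(int *out, void *cb)
{
	struct scan_state *st = cb;

	for (; st->count < st->nr; st->count++) {
		int item = st->items[st->count];

		if (item % 2)
			continue;	/* skipped entry, like an unpopulated submodule */
		*out = item;
		st->count++;	/* resume after this entry on the next call */
		return 1;
	}
	return 0;
}
```

Because the cursor lives in the cookie rather than in a local variable,
the parallel engine can ask for one task at a time whenever a slot
becomes free.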

diff --git a/submodule.c b/submodule.c
index 1d64e57..ff5bc32 100644
--- a/submodule.c
+++ b/submodule.c
@@ -12,6 +12,7 @@
 #include "sha1-array.h"
 #include "argv-array.h"
 #include "blob.h"
+#include "thread-utils.h"
 
 static int config_fetch_recurse_submodules = RECURSE_SUBMODULES_ON_DEMAND;
 static struct string_list changed_submodule_paths;
@@ -615,37 +616,91 @@ static void calculate_changed_submodule_paths(void)
 	initialized_fetch_ref_tips = 0;
 }
 
+struct submodule_parallel_fetch {
+	int count;
+	struct argv_array args;
+	const char *work_tree;
+	const char *prefix;
+	int command_line_option;
+	int quiet;
+	int result;
+};
+#define SPF_INIT {0, ARGV_ARRAY_INIT, NULL, NULL, 0, 0, 0}
+
+static int get_next_submodule(void **task_cb, struct child_process *cp,
+			      struct strbuf *err, void *data);
+
+static int fetch_start_failure(struct child_process *cp,
+			       struct strbuf *err,
+			       void *cb, void *task_cb)
+{
+	struct submodule_parallel_fetch *spf = cb;
+
+	spf->result = 1;
+
+	return 0;
+}
+
+static int fetch_finish(int retvalue, struct child_process *cp,
+			struct strbuf *err, void *cb, void *task_cb)
+{
+	struct submodule_parallel_fetch *spf = cb;
+
+	if (retvalue)
+		spf->result = 1;
+
+	return 0;
+}
+
 int fetch_populated_submodules(const struct argv_array *options,
 			       const char *prefix, int command_line_option,
 			       int quiet)
 {
-	int i, result = 0;
-	struct child_process cp = CHILD_PROCESS_INIT;
-	struct argv_array argv = ARGV_ARRAY_INIT;
-	const char *work_tree = get_git_work_tree();
-	if (!work_tree)
+	int i;
+	int max_parallel_jobs = 1;
+	struct submodule_parallel_fetch spf = SPF_INIT;
+
+	spf.work_tree = get_git_work_tree();
+	spf.command_line_option = command_line_option;
+	spf.quiet = quiet;
+	spf.prefix = prefix;
+
+	if (!spf.work_tree)
 		goto out;
 
 	if (read_cache() < 0)
 		die("index file corrupt");
 
-	argv_array_push(&argv, "fetch");
+	argv_array_push(&spf.args, "fetch");
 	for (i = 0; i < options->argc; i++)
-		argv_array_push(&argv, options->argv[i]);
-	argv_array_push(&argv, "--recurse-submodules-default");
+		argv_array_push(&spf.args, options->argv[i]);
+	argv_array_push(&spf.args, "--recurse-submodules-default");
 	/* default value, "--submodule-prefix" and its value are added later */
 
-	cp.env = local_repo_env;
-	cp.git_cmd = 1;
-	cp.no_stdin = 1;
-
 	calculate_changed_submodule_paths();
+	run_processes_parallel(max_parallel_jobs,
+			       get_next_submodule,
+			       fetch_start_failure,
+			       fetch_finish,
+			       &spf);
+
+	argv_array_clear(&spf.args);
+out:
+	string_list_clear(&changed_submodule_paths, 1);
+	return spf.result;
+}
 
-	for (i = 0; i < active_nr; i++) {
+static int get_next_submodule(void **task_cb, struct child_process *cp,
+			      struct strbuf *err, void *data)
+{
+	int ret = 0;
+	struct submodule_parallel_fetch *spf = data;
+
+	for ( ; spf->count < active_nr; spf->count++) {
 		struct strbuf submodule_path = STRBUF_INIT;
 		struct strbuf submodule_git_dir = STRBUF_INIT;
 		struct strbuf submodule_prefix = STRBUF_INIT;
-		const struct cache_entry *ce = active_cache[i];
+		const struct cache_entry *ce = active_cache[spf->count];
 		const char *git_dir, *default_argv;
 		const struct submodule *submodule;
 
@@ -657,7 +712,7 @@ int fetch_populated_submodules(const struct argv_array *options,
 			submodule = submodule_from_name(null_sha1, ce->name);
 
 		default_argv = "yes";
-		if (command_line_option == RECURSE_SUBMODULES_DEFAULT) {
+		if (spf->command_line_option == RECURSE_SUBMODULES_DEFAULT) {
 			if (submodule &&
 			    submodule->fetch_recurse !=
 						RECURSE_SUBMODULES_NONE) {
@@ -680,40 +735,45 @@ int fetch_populated_submodules(const struct argv_array *options,
 					default_argv = "on-demand";
 				}
 			}
-		} else if (command_line_option == RECURSE_SUBMODULES_ON_DEMAND) {
+		} else if (spf->command_line_option == RECURSE_SUBMODULES_ON_DEMAND) {
 			if (!unsorted_string_list_lookup(&changed_submodule_paths, ce->name))
 				continue;
 			default_argv = "on-demand";
 		}
 
-		strbuf_addf(&submodule_path, "%s/%s", work_tree, ce->name);
+		strbuf_addf(&submodule_path, "%s/%s", spf->work_tree, ce->name);
 		strbuf_addf(&submodule_git_dir, "%s/.git", submodule_path.buf);
-		strbuf_addf(&submodule_prefix, "%s%s/", prefix, ce->name);
+		strbuf_addf(&submodule_prefix, "%s%s/", spf->prefix, ce->name);
 		git_dir = read_gitfile(submodule_git_dir.buf);
 		if (!git_dir)
 			git_dir = submodule_git_dir.buf;
 		if (is_directory(git_dir)) {
-			if (!quiet)
-				fprintf(stderr, "Fetching submodule %s%s\n", prefix, ce->name);
-			cp.dir = submodule_path.buf;
-			argv_array_push(&argv, default_argv);
-			argv_array_push(&argv, "--submodule-prefix");
-			argv_array_push(&argv, submodule_prefix.buf);
-			cp.argv = argv.argv;
-			if (run_command(&cp))
-				result = 1;
-			argv_array_pop(&argv);
-			argv_array_pop(&argv);
-			argv_array_pop(&argv);
+			child_process_init(cp);
+			cp->dir = strbuf_detach(&submodule_path, NULL);
+			cp->env = local_repo_env;
+			cp->git_cmd = 1;
+			cp->no_stdin = 1;
+			cp->stdout_to_stderr = 1;
+			cp->err = -1;
+			if (!spf->quiet)
+				strbuf_addf(err, "Fetching submodule %s%s\n",
+					    spf->prefix, ce->name);
+			argv_array_init(&cp->args);
+			argv_array_pushv(&cp->args, spf->args.argv);
+			argv_array_push(&cp->args, default_argv);
+			argv_array_push(&cp->args, "--submodule-prefix");
+			argv_array_push(&cp->args, submodule_prefix.buf);
+			ret = 1;
 		}
 		strbuf_release(&submodule_path);
 		strbuf_release(&submodule_git_dir);
 		strbuf_release(&submodule_prefix);
+		if (ret) {
+			spf->count++;
+			return 1;
+		}
 	}
-	argv_array_clear(&argv);
-out:
-	string_list_clear(&changed_submodule_paths, 1);
-	return result;
+	return 0;
 }
 
 unsigned is_submodule_modified(const char *path, int ignore_untracked)
-- 
2.5.0.275.gf20166c.dirty

* [PATCHv6 8/8] submodules: allow parallel fetching, add tests and documentation
  2015-10-01  1:54 [PATCHv6 0/8] fetch submodules in parallel Stefan Beller
                   ` (6 preceding siblings ...)
  2015-10-01  1:54 ` [PATCHv6 7/8] fetch_populated_submodules: use new parallel job processing Stefan Beller
@ 2015-10-01  1:54 ` Stefan Beller
  2015-10-01 18:55 ` [PATCHv6 0/8] fetch submodules in parallel Ramsay Jones
  2015-10-02 18:48 ` Junio C Hamano
  9 siblings, 0 replies; 12+ messages in thread
From: Stefan Beller @ 2015-10-01  1:54 UTC (permalink / raw)
  To: gitster, git
  Cc: Stefan Beller, ramsay, jacob.keller, peff, jrnieder,
	johannes.schindelin, Jens.Lehmann, ericsunshine

This enables the work of the previous patches.

Signed-off-by: Stefan Beller <sbeller@google.com>
Signed-off-by: Junio C Hamano <gitster@pobox.com>
---
 Documentation/fetch-options.txt |  7 +++++++
 builtin/fetch.c                 |  6 +++++-
 builtin/pull.c                  |  6 ++++++
 submodule.c                     |  3 +--
 submodule.h                     |  2 +-
 t/t5526-fetch-submodules.sh     | 19 +++++++++++++++++++
 6 files changed, 39 insertions(+), 4 deletions(-)

diff --git a/Documentation/fetch-options.txt b/Documentation/fetch-options.txt
index 45583d8..6b109f6 100644
--- a/Documentation/fetch-options.txt
+++ b/Documentation/fetch-options.txt
@@ -100,6 +100,13 @@ ifndef::git-pull[]
 	reference to a commit that isn't already in the local submodule
 	clone.
 
+-j::
+--jobs=<n>::
+	Number of parallel children to be used for fetching submodules.
+	Each will fetch from different submodules, such that fetching many
+	submodules will be faster. By default submodules will be fetched
+	one at a time.
+
 --no-recurse-submodules::
 	Disable recursive fetching of submodules (this has the same effect as
 	using the '--recurse-submodules=no' option).
diff --git a/builtin/fetch.c b/builtin/fetch.c
index ee1f1a9..f28eac6 100644
--- a/builtin/fetch.c
+++ b/builtin/fetch.c
@@ -37,6 +37,7 @@ static int prune = -1; /* unspecified */
 static int all, append, dry_run, force, keep, multiple, update_head_ok, verbosity;
 static int progress = -1, recurse_submodules = RECURSE_SUBMODULES_DEFAULT;
 static int tags = TAGS_DEFAULT, unshallow, update_shallow;
+static int max_children = 1;
 static const char *depth;
 static const char *upload_pack;
 static struct strbuf default_rla = STRBUF_INIT;
@@ -99,6 +100,8 @@ static struct option builtin_fetch_options[] = {
 		    N_("fetch all tags and associated objects"), TAGS_SET),
 	OPT_SET_INT('n', NULL, &tags,
 		    N_("do not fetch all tags (--no-tags)"), TAGS_UNSET),
+	OPT_INTEGER('j', "jobs", &max_children,
+		    N_("number of submodules fetched in parallel")),
 	OPT_BOOL('p', "prune", &prune,
 		 N_("prune remote-tracking branches no longer on remote")),
 	{ OPTION_CALLBACK, 0, "recurse-submodules", NULL, N_("on-demand"),
@@ -1217,7 +1220,8 @@ int cmd_fetch(int argc, const char **argv, const char *prefix)
 		result = fetch_populated_submodules(&options,
 						    submodule_prefix,
 						    recurse_submodules,
-						    verbosity < 0);
+						    verbosity < 0,
+						    max_children);
 		argv_array_clear(&options);
 	}
 
diff --git a/builtin/pull.c b/builtin/pull.c
index 722a83c..f0af196 100644
--- a/builtin/pull.c
+++ b/builtin/pull.c
@@ -94,6 +94,7 @@ static int opt_force;
 static char *opt_tags;
 static char *opt_prune;
 static char *opt_recurse_submodules;
+static char *max_children;
 static int opt_dry_run;
 static char *opt_keep;
 static char *opt_depth;
@@ -177,6 +178,9 @@ static struct option pull_options[] = {
 		N_("on-demand"),
 		N_("control recursive fetching of submodules"),
 		PARSE_OPT_OPTARG),
+	OPT_PASSTHRU('j', "jobs", &max_children, N_("n"),
+		N_("number of submodules pulled in parallel"),
+		PARSE_OPT_OPTARG),
 	OPT_BOOL(0, "dry-run", &opt_dry_run,
 		N_("dry run")),
 	OPT_PASSTHRU('k', "keep", &opt_keep, NULL,
@@ -524,6 +528,8 @@ static int run_fetch(const char *repo, const char **refspecs)
 		argv_array_push(&args, opt_prune);
 	if (opt_recurse_submodules)
 		argv_array_push(&args, opt_recurse_submodules);
+	if (max_children)
+		argv_array_push(&args, max_children);
 	if (opt_dry_run)
 		argv_array_push(&args, "--dry-run");
 	if (opt_keep)
diff --git a/submodule.c b/submodule.c
index ff5bc32..cf8bf5d 100644
--- a/submodule.c
+++ b/submodule.c
@@ -654,10 +654,9 @@ static int fetch_finish(int retvalue, struct child_process *cp,
 
 int fetch_populated_submodules(const struct argv_array *options,
 			       const char *prefix, int command_line_option,
-			       int quiet)
+			       int quiet, int max_parallel_jobs)
 {
 	int i;
-	int max_parallel_jobs = 1;
 	struct submodule_parallel_fetch spf = SPF_INIT;
 
 	spf.work_tree = get_git_work_tree();
diff --git a/submodule.h b/submodule.h
index 5507c3d..cbc0003 100644
--- a/submodule.h
+++ b/submodule.h
@@ -31,7 +31,7 @@ void set_config_fetch_recurse_submodules(int value);
 void check_for_new_submodule_commits(unsigned char new_sha1[20]);
 int fetch_populated_submodules(const struct argv_array *options,
 			       const char *prefix, int command_line_option,
-			       int quiet);
+			       int quiet, int max_parallel_jobs);
 unsigned is_submodule_modified(const char *path, int ignore_untracked);
 int submodule_uses_gitfile(const char *path);
 int ok_to_remove_submodule(const char *path);
diff --git a/t/t5526-fetch-submodules.sh b/t/t5526-fetch-submodules.sh
index 17759b1..1b4ce69 100755
--- a/t/t5526-fetch-submodules.sh
+++ b/t/t5526-fetch-submodules.sh
@@ -71,6 +71,16 @@ test_expect_success "fetch --recurse-submodules recurses into submodules" '
 	test_i18ncmp expect.err actual.err
 '
 
+test_expect_success "fetch --recurse-submodules -j2 has the same output behaviour" '
+	add_upstream_commit &&
+	(
+		cd downstream &&
+		git fetch --recurse-submodules -j2 2>../actual.err
+	) &&
+	test_must_be_empty actual.out &&
+	test_i18ncmp expect.err actual.err
+'
+
 test_expect_success "fetch alone only fetches superproject" '
 	add_upstream_commit &&
 	(
@@ -140,6 +150,15 @@ test_expect_success "--quiet propagates to submodules" '
 	! test -s actual.err
 '
 
+test_expect_success "--quiet propagates to parallel submodules" '
+	(
+		cd downstream &&
+		git fetch --recurse-submodules -j 2 --quiet  >../actual.out 2>../actual.err
+	) &&
+	! test -s actual.out &&
+	! test -s actual.err
+'
+
 test_expect_success "--dry-run propagates to submodules" '
 	add_upstream_commit &&
 	(
-- 
2.5.0.275.gf20166c.dirty

* Re: [PATCHv6 0/8] fetch submodules in parallel
  2015-10-01  1:54 [PATCHv6 0/8] fetch submodules in parallel Stefan Beller
                   ` (7 preceding siblings ...)
  2015-10-01  1:54 ` [PATCHv6 8/8] submodules: allow parallel fetching, add tests and documentation Stefan Beller
@ 2015-10-01 18:55 ` Ramsay Jones
  2015-10-01 19:03   ` Stefan Beller
  2015-10-02 18:48 ` Junio C Hamano
  9 siblings, 1 reply; 12+ messages in thread
From: Ramsay Jones @ 2015-10-01 18:55 UTC (permalink / raw)
  To: Stefan Beller, gitster, git
  Cc: jacob.keller, peff, jrnieder, johannes.schindelin, Jens.Lehmann,
	ericsunshine

Hi Stefan,

On 01/10/15 02:54, Stefan Beller wrote:
[snip]

While skimming the interdiff for this series, ...

> diff --git a/run-command.c b/run-command.c
> index df84985..28048a7 100644
> --- a/run-command.c
> +++ b/run-command.c
> @@ -863,12 +863,13 @@ struct parallel_processes {
>  
>  	get_next_task_fn get_next_task;
>  	start_failure_fn start_failure;
> -	return_value_fn return_value;
> +	task_finished_fn task_finished;
>  
>  	struct {
>  		unsigned in_use : 1;
>  		struct child_process process;
>  		struct strbuf err;
> +		void *data;
>  	} *children;
>  	/*
>  	 * The struct pollfd is logically part of *children,
> @@ -882,9 +883,10 @@ struct parallel_processes {
>  	struct strbuf buffered_output; /* of finished children */
>  } parallel_processes_struct;
>  
> -static int default_start_failure(void *data,
> -				 struct child_process *cp,
> -				 struct strbuf *err)
> +static int default_start_failure(struct child_process *cp,
> +				 struct strbuf *err,
> +				 void *pp_cb,
> +				 void *pp_task_cb)
>  {
>  	int i;
>  
> @@ -895,10 +897,11 @@ static int default_start_failure(void *data,
>  	return 0;
>  }
>  
> -static int default_return_value(void *data,
> -				struct child_process *cp,
> -				struct strbuf *err,
> -				int result)
> +static int default_task_finished(int result,
> +				 struct child_process *cp,
> +				 struct strbuf *err,
> +				 void *pp_cb,
> +				 void *pp_task_cb)
>  {
>  	int i;
>  
> @@ -930,10 +933,11 @@ static void handle_children_on_signal(int signo)
>  	raise(signo);
>  }
>  
> -static struct parallel_processes *pp_init(int n, void *data,
> +static struct parallel_processes *pp_init(int n,
>  					  get_next_task_fn get_next_task,
>  					  start_failure_fn start_failure,
> -					  return_value_fn return_value)
> +					  task_finished_fn task_finished,
> +					  void *data)
>  {
>  	int i;
>  	struct parallel_processes *pp = &parallel_processes_struct;
> @@ -948,7 +952,7 @@ static struct parallel_processes *pp_init(int n, void *data,
>  	pp->get_next_task = get_next_task;
>  
>  	pp->start_failure = start_failure ? start_failure : default_start_failure;
> -	pp->return_value = return_value ? return_value : default_return_value;
> +	pp->task_finished = task_finished ? task_finished : default_task_finished;
>  
>  	pp->nr_processes = 0;
>  	pp->output_owner = 0;
> @@ -1006,15 +1010,17 @@ static int pp_start_one(struct parallel_processes *pp)
>  	if (i == pp->max_processes)
>  		die("BUG: bookkeeping is hard");
>  
> -	if (!pp->get_next_task(pp->data,
> +	if (!pp->get_next_task(&pp->children[i].data,
>  			       &pp->children[i].process,
> -			       &pp->children[i].err))
> +			       &pp->children[i].err,
> +			       pp->data))
>  		return 1;

... the above hunk caught my eye. I don't know that it matters that
much, but since you have reordered parameters on some functions, should
pp->get_next_task() take the 'task_cb' as the last parameter, rather than
the first?

I have not looked at the final result yet (just the interdiff), so please
just ignore the above if I've missed something obvious. :-D

ATB,
Ramsay Jones

* Re: [PATCHv6 0/8] fetch submodules in parallel
  2015-10-01 18:55 ` [PATCHv6 0/8] fetch submodules in parallel Ramsay Jones
@ 2015-10-01 19:03   ` Stefan Beller
  0 siblings, 0 replies; 12+ messages in thread
From: Stefan Beller @ 2015-10-01 19:03 UTC (permalink / raw)
  To: Ramsay Jones
  Cc: Junio C Hamano, git@vger.kernel.org, Jacob Keller, Jeff King,
	Jonathan Nieder, Johannes Schindelin, Jens Lehmann, Eric Sunshine

On Thu, Oct 1, 2015 at 11:55 AM, Ramsay Jones
<ramsay@ramsayjones.plus.com> wrote:
> Hi Stefan,
>
>>
>> -     if (!pp->get_next_task(pp->data,
>> +     if (!pp->get_next_task(&pp->children[i].data,
>>                              &pp->children[i].process,
>> -                            &pp->children[i].err))
>> +                            &pp->children[i].err,
>> +                            pp->data))
>>               return 1;
>
> ... the above hunk caught my eye. I don't know that it matters that
> much, but since you have reordered parameters on some functions, should
> pp->get_next_task() take the 'task_cb' as the last parameter, rather than
> the first?
>
> I have not looked at the final result yet (just the interdiff), so please
> just ignore the above if I've missed something obvious. :-D

Well, I reordered the arguments so that "passive" ones come last, i.e. the
cookies for consumption. In this specific case we ask get_next_task
to fill in the cookie if desired. Unlike all the other cookie passing,
this is a double void pointer, so even syntactically it differs from
the other places where a cookie is passed around.

If you look at the function definitions in the header, you find the arguments
ordered as

  (Active/unique arguments for that function, child process,
   error buffer, cookies for consumption)
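
As a rough illustration of that ordering, here is a minimal sketch. The
parameter order follows the interdiff quoted earlier in the thread; the
stand-in struct definitions, struct demo_state and the demo_* functions
are hypothetical and exist only so the sketch compiles on its own.

```c
#include <stddef.h>

/* Stand-in types so the sketch compiles alone; git defines the real ones. */
struct child_process { const char *path; };
struct strbuf { const char *buf; };

/*
 * Ordering: active/unique arguments, then the child process, then the
 * error buffer, then the cookies for consumption.
 */
typedef int (*get_next_task_fn)(void **pp_task_cb,	/* active: filled in */
				struct child_process *cp,
				struct strbuf *err,
				void *pp_cb);		/* passive cookie */

typedef int (*task_finished_fn)(int result,		/* active */
				struct child_process *cp,
				struct strbuf *err,
				void *pp_cb,		/* passive cookies */
				void *pp_task_cb);

/* Hypothetical caller state: a NULL-terminated list of submodule paths. */
struct demo_state {
	const char **paths;
	int next;
	int failures;
};

/* Returns 0 when there is no further task, non-zero otherwise. */
static int demo_next_task(void **pp_task_cb, struct child_process *cp,
			  struct strbuf *err, void *pp_cb)
{
	struct demo_state *st = pp_cb;

	(void)err;
	if (!st->paths[st->next])
		return 0;
	cp->path = st->paths[st->next++];
	*pp_task_cb = (void *)cp->path;	/* tag the task for later */
	return 1;
}

/* Counts failed tasks; the return value is not interpreted in this sketch. */
static int demo_task_finished(int result, struct child_process *cp,
			      struct strbuf *err, void *pp_cb, void *pp_task_cb)
{
	struct demo_state *st = pp_cb;

	(void)cp;
	(void)err;
	(void)pp_task_cb;
	if (result)
		st->failures++;
	return 0;
}
```

Only get_next_task takes the task cookie as a void ** up front, because it
is the one callback that writes it; everywhere else the cookie is read-only
and trails the argument list.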

That said, I found a few things I need to improve.
pp->children[i].data should probably be initialized to NULL before we
ask get_next_task to fill in a cookie. If get_next_task decides not to,
we have a clear default.
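
A small sketch of that NULL default, under simplifying assumptions:
struct child_slot, start_slot and the two next_task_* callbacks are
made-up names, and the callback signature omits the child process and
error buffer to keep the example short.

```c
#include <stddef.h>

/* Hypothetical slot layout; only the per-task cookie matters here. */
struct child_slot {
	unsigned in_use : 1;
	void *data;	/* per-task cookie, optionally set by get_next_task */
};

/* A callback that chooses not to set a task cookie. */
static int next_task_without_cookie(void **pp_task_cb, void *pp_cb)
{
	(void)pp_task_cb;
	(void)pp_cb;
	return 1;
}

static int a_cookie;

/* A callback that does set a task cookie. */
static int next_task_with_cookie(void **pp_task_cb, void *pp_cb)
{
	(void)pp_cb;
	*pp_task_cb = &a_cookie;
	return 1;
}

/* Returns 1 when no task was available, 0 when the slot was filled. */
static int start_slot(struct child_slot *slot,
		      int (*get_next_task)(void **, void *), void *pp_cb)
{
	slot->data = NULL;	/* clear default before asking for a task */
	if (!get_next_task(&slot->data, pp_cb))
		return 1;
	slot->in_use = 1;
	return 0;
}
```

Without the reset, a reused slot would hand the previous task's cookie to
the finish callback whenever get_next_task leaves it untouched.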

The call to run_processes_parallel may be reordered to its original order again,
as we pass in a cookie actively: (int n, void *cb, callbacks...)

>
> ATB,
> Ramsay Jones
>

* Re: [PATCHv6 0/8] fetch submodules in parallel
  2015-10-01  1:54 [PATCHv6 0/8] fetch submodules in parallel Stefan Beller
                   ` (8 preceding siblings ...)
  2015-10-01 18:55 ` [PATCHv6 0/8] fetch submodules in parallel Ramsay Jones
@ 2015-10-02 18:48 ` Junio C Hamano
  9 siblings, 0 replies; 12+ messages in thread
From: Junio C Hamano @ 2015-10-02 18:48 UTC (permalink / raw)
  To: Stefan Beller
  Cc: git, ramsay, jacob.keller, peff, jrnieder, johannes.schindelin,
	Jens.Lehmann, ericsunshine

Stefan Beller <sbeller@google.com> writes:

> * renamed return_value_fn to task_finished_fn

It made interdiff noisier but I think it gives us a good end result.

> * the main loop of the parallel processing was first adapted to Junios suggestion,
>   but Jonathan pointed out more improvements.  We can get rid of `no_more_task`
>   completely as `if (!pp->nr_processes)` as the exit condition is sufficient.
>   (pp->nr_processes is modified only when starting or reaping a child, so we will
>   capture the whole output of each subprocess even in case of a quick shutdown)

Interesting.  The original motivation for "no-more-task" check was
that even when we are no longer running anything (i.e. everybody
finished) we may get a new task from next_task(), and the condition
to "break" out of the loop could be placed anywhere in that loop
(e.g. after we wait and cull the finished tasks, or even in the
outermost while(1) condition).

But you can take advantage of the specific placement of the check;
it is after the part that spawns new tasks and before the part that
culls the existing tasks, so not having any running task at that
point is a sufficient condition.
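
The placement argument can be illustrated with a toy loop. This is a
sketch under assumptions, not the code from run-command.c: struct
pp_sketch, start_one, reap_one and run_sketch are made-up names, and
children are simulated by counters rather than real processes.

```c
/* Hypothetical stand-in for the parallel engine state; not git's struct. */
struct pp_sketch {
	int max_processes;	/* parallelism limit, e.g. from -j */
	int nr_processes;	/* children currently running */
	int tasks_left;		/* tasks that could still be handed out */
	int finished;		/* children reaped so far */
};

/* Try to start one child; returns 1 when there is no further task. */
static int start_one(struct pp_sketch *pp)
{
	if (!pp->tasks_left)
		return 1;
	pp->tasks_left--;
	pp->nr_processes++;
	return 0;
}

/* Pretend one running child exited and reap it. */
static void reap_one(struct pp_sketch *pp)
{
	pp->nr_processes--;
	pp->finished++;
}

/* Runs all tasks with at most n in flight; returns how many were reaped. */
static int run_sketch(int n, int tasks)
{
	struct pp_sketch pp = { n, 0, tasks, 0 };

	while (1) {
		/* 1. Spawn until the limit is hit or tasks run out. */
		while (pp.nr_processes < pp.max_processes)
			if (start_one(&pp))
				break;
		/* 2. Nothing running right after spawning: all done. */
		if (!pp.nr_processes)
			break;
		/* 3. Cull; the next iteration may start new tasks. */
		reap_one(&pp);
	}
	return pp.finished;
}
```

Because the check sits directly after the spawning step, an empty set of
running children can only mean that no task could be started either, so
no separate "no more tasks" flag is needed in this sketch.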

Will replace what was queued.

Thanks.

Thread overview: 12+ messages
2015-10-01  1:54 [PATCHv6 0/8] fetch submodules in parallel Stefan Beller
2015-10-01  1:54 ` [PATCHv6 1/8] submodule.c: write "Fetching submodule <foo>" to stderr Stefan Beller
2015-10-01  1:54 ` [PATCHv6 2/8] xread: poll on non blocking fds Stefan Beller
2015-10-01  1:54 ` [PATCHv6 3/8] xread_nonblock: add functionality to read from fds without blocking Stefan Beller
2015-10-01  1:54 ` [PATCHv6 4/8] strbuf: add strbuf_read_once to read " Stefan Beller
2015-10-01  1:54 ` [PATCHv6 5/8] sigchain: add command to pop all common signals Stefan Beller
2015-10-01  1:54 ` [PATCHv6 6/8] run-command: add an asynchronous parallel child processor Stefan Beller
2015-10-01  1:54 ` [PATCHv6 7/8] fetch_populated_submodules: use new parallel job processing Stefan Beller
2015-10-01  1:54 ` [PATCHv6 8/8] submodules: allow parallel fetching, add tests and documentation Stefan Beller
2015-10-01 18:55 ` [PATCHv6 0/8] fetch submodules in parallel Ramsay Jones
2015-10-01 19:03   ` Stefan Beller
2015-10-02 18:48 ` Junio C Hamano
