git@vger.kernel.org mailing list mirror (one of many)
 help / color / mirror / code / Atom feed
From: Stefan Beller <sbeller@google.com>
To: peff@peff.net
Cc: git@vger.kernel.org, jrnieder@gmail.com, gitster@pobox.com,
	Stefan Beller <sbeller@google.com>
Subject: [PATCH 2/5] thread-utils: add a threaded task queue
Date: Tue, 25 Aug 2015 10:28:23 -0700	[thread overview]
Message-ID: <1440523706-23041-3-git-send-email-sbeller@google.com> (raw)
In-Reply-To: <1440523706-23041-1-git-send-email-sbeller@google.com>

This adds functionality to do work in a parallel, threaded
fashion, while the boilerplate code for setting up threads
and tearing them down, as well as for queuing up tasks, is hidden
behind the new API.

Signed-off-by: Stefan Beller <sbeller@google.com>
---
 run-command.c  |  29 ++++---
 thread-utils.c | 237 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 thread-utils.h |  40 ++++++++++
 3 files changed, 294 insertions(+), 12 deletions(-)

diff --git a/run-command.c b/run-command.c
index 28e1d55..cb15cd9 100644
--- a/run-command.c
+++ b/run-command.c
@@ -668,6 +668,22 @@ int git_atexit(void (*handler)(void))
 
 #endif
 
+/*
+ * One-time initialization: records the calling thread as the main
+ * thread, creates the thread-specific keys and installs die_async and
+ * async_die_is_recursing as the die handlers. Later calls are no-ops.
+ */
+void setup_main_thread(void)
+{
+	if (main_thread_set)
+		return;
+
+	/*
+	 * The first caller is assumed to be the main thread.
+	 */
+	main_thread_set = 1;
+	main_thread = pthread_self();
+	pthread_key_create(&async_key, NULL);
+	pthread_key_create(&async_die_counter, NULL);
+	set_die_routine(die_async);
+	set_die_is_recursing_routine(async_die_is_recursing);
+}
+
 int start_async(struct async *async)
 {
 	int need_in, need_out;
@@ -740,18 +756,7 @@ int start_async(struct async *async)
 	else if (async->out)
 		close(async->out);
 #else
-	if (!main_thread_set) {
-		/*
-		 * We assume that the first time that start_async is called
-		 * it is from the main thread.
-		 */
-		main_thread_set = 1;
-		main_thread = pthread_self();
-		pthread_key_create(&async_key, NULL);
-		pthread_key_create(&async_die_counter, NULL);
-		set_die_routine(die_async);
-		set_die_is_recursing_routine(async_die_is_recursing);
-	}
+	setup_main_thread();
 
 	if (proc_in >= 0)
 		set_cloexec(proc_in);
diff --git a/thread-utils.c b/thread-utils.c
index a2135e0..936b3672 100644
--- a/thread-utils.c
+++ b/thread-utils.c
@@ -1,5 +1,7 @@
 #include "cache.h"
 #include "thread-utils.h"
+#include "run-command.h"
+#include "git-compat-util.h"
 
 #if defined(hpux) || defined(__hpux) || defined(_hpux)
 #  include <sys/pstat.h>
@@ -75,3 +77,238 @@ int init_recursive_mutex(pthread_mutex_t *m)
 	}
 	return ret;
 }
+
+#ifndef NO_PTHREADS
+struct job_list {	/* one queued task; nodes are chained into a singly linked list */
+	int (*fct)(struct task_queue *tq, void *task);	/* callback to run; finish_task_queue() enqueues NULL here */
+	void *task;	/* opaque argument handed to fct */
+	struct job_list *next;	/* following job, or NULL for the tail */
+};
+
+static pthread_t main_thread;	/* thread that first ran setup_main_thread() */
+static int main_thread_set;	/* nonzero once setup_main_thread() has run */
+static pthread_key_t async_key;	/* created in setup_main_thread(); not otherwise used in the code shown here */
+static pthread_key_t async_die_counter;	/* per-thread marker set by async_die_is_recursing() */
+
+/*
+ * Die routine installed by setup_main_thread(): reports the message
+ * with a "fatal: " prefix, then terminates only the calling thread when
+ * it is not the main thread, or the whole process otherwise. Both paths
+ * use 128 as the exit value.
+ */
+static NORETURN void die_async(const char *err, va_list params)
+{
+	int in_main_thread;
+
+	vreportf("fatal: ", err, params);
+
+	in_main_thread = pthread_equal(main_thread, pthread_self());
+	if (in_main_thread)
+		exit(128);
+
+	pthread_exit((void *)128);
+}
+
+/*
+ * Reports whether the calling thread has been through here before:
+ * returns nonzero if the thread-specific `async_die_counter` value was
+ * already set, and marks it as set in any case.
+ */
+static int async_die_is_recursing(void)
+{
+	void *seen_before = pthread_getspecific(async_die_counter);
+
+	pthread_setspecific(async_die_counter, (void *)1);
+	return seen_before ? 1 : 0;
+}
+
+/*
+ * FIXME: deduplicate this code with run-command.c
+ *
+ * One-time initialization: records the calling thread as the main
+ * thread, creates the thread-specific keys and installs die_async and
+ * async_die_is_recursing as the die handlers. Later calls are no-ops.
+ */
+static void setup_main_thread(void)
+{
+	if (main_thread_set)
+		return;
+
+	main_thread_set = 1;
+	main_thread = pthread_self();
+	pthread_key_create(&async_key, NULL);
+	pthread_key_create(&async_die_counter, NULL);
+	set_die_routine(die_async);
+	set_die_is_recursing_routine(async_die_is_recursing);
+}
+
+struct task_queue {
+	/*
+	 * To avoid deadlocks, always acquire the semaphore with the lowest
+	 * priority first; the semaphores below are listed in descending
+	 * order of priority, so a counting semaphore is taken before
+	 * `mutex` (as next_task() and add_task() do).
+	 *
+	 * The `mutex` is a general purpose lock for modifying data in the
+	 * queue, such as adding a new task or recording a return value from
+	 * a task that has already run.
+	 *
+	 * `workingcount` and `freecount` are opposing semaphores; the sum of
+	 * their values should equal `max_threads` at any time while the
+	 * `mutex` is available.
+	 */
+	sem_t mutex;
+	sem_t workingcount;	/* posted by add_task(), waited on by next_task() */
+	sem_t freecount;	/* posted by next_task(), waited on by add_task() */
+
+	pthread_t *threads;	/* array of max_threads handles, allocated in create_task_queue() */
+	unsigned max_threads;	/* number of worker threads */
+
+	struct job_list *first;	/* head of the pending-job list, NULL when empty */
+	struct job_list *last;	/* tail of the pending-job list, NULL when empty */
+
+	void (*finish_function)(struct task_queue *tq);	/* set by finish_task_queue(); each worker calls it before exiting if non-NULL */
+	int early_return;	/* OR of the nonzero task results recorded by next_task() */
+};
+
+static void next_task(struct task_queue *tq,	/* hands the calling worker its next job, blocking until one is queued */
+		      int (**fct)(struct task_queue *tq, void *task),	/* out: job callback, or NULL when the worker should stop */
+		      void **task,	/* out: argument for *fct */
+		      int *early_return)	/* in: result of the worker's previous task (0 before the first) */
+{
+	struct job_list *job = NULL;
+
+	sem_wait(&tq->workingcount);	/* wait until add_task() has queued something */
+	sem_wait(&tq->mutex);
+
+	if (*early_return) {
+		tq->early_return |= *early_return;	/* record the failure; nothing is dequeued on this path */
+		*fct = NULL;
+		*task = NULL;
+	} else {
+		if (!tq->first)
+			die("BUG: internal error with dequeuing jobs for threads");	/* NOTE(review): reached with `mutex` still held */
+
+		job = tq->first;	/* pop the head of the FIFO */
+		*fct = job->fct;
+		*task = job->task;
+
+		tq->first = job->next;
+		if (!tq->first)
+			tq->last = NULL;	/* list became empty */
+	}
+
+	sem_post(&tq->freecount);	/* one more slot for add_task() */
+	sem_post(&tq->mutex);
+
+	free(job);	/* still NULL on the early-return path, where this is a no-op */
+}
+
+/*
+ * Thread entry point for the workers started by create_task_queue().
+ * `args` is the `struct task_queue` the worker serves.
+ *
+ * The worker keeps fetching jobs with next_task() and running them.
+ * next_task() hands back a NULL function either for the shutdown marker
+ * enqueued by finish_task_queue() or after the previous task returned
+ * non-zero (which it records in the queue); in both cases the worker
+ * stops. Before exiting it calls the queue's finish function, if any.
+ */
+static void *dispatcher(void *args)
+{
+	void *task;
+	int (*fct)(struct task_queue *tq, void *task);
+	int early_return = 0;
+	struct task_queue *tq = args;
+
+	next_task(tq, &fct, &task, &early_return);
+	/*
+	 * Loop on `fct` alone: also continuing while `early_return` is set
+	 * would call through the NULL pointer that next_task() hands back
+	 * after a task has failed.
+	 */
+	while (fct) {
+		early_return = fct(tq, task);
+		next_task(tq, &fct, &task, &early_return);
+	}
+
+	if (tq->finish_function)
+		tq->finish_function(tq);
+
+	pthread_exit(NULL);
+}
+
+/*
+ * Allocates a task queue and starts its worker threads.
+ *
+ * `max_threads` is the number of workers; zero means online_cpus().
+ * Dies if a thread cannot be created. The returned queue is released
+ * by finish_task_queue().
+ */
+struct task_queue *create_task_queue(unsigned max_threads)
+{
+	struct task_queue *tq = xmalloc(sizeof(*tq));
+	unsigned i;
+	int ret;
+
+	if (!max_threads)
+		tq->max_threads = online_cpus();
+	else
+		tq->max_threads = max_threads;
+
+	/*
+	 * Initialize every field before the first worker starts: the
+	 * memory from xmalloc() is not zeroed, and the workers read
+	 * `finish_function` and update `early_return`.
+	 */
+	tq->first = NULL;
+	tq->last = NULL;
+	tq->finish_function = NULL;
+	tq->early_return = 0;
+
+	sem_init(&tq->mutex, 0, 1);
+	sem_init(&tq->workingcount, 0, 0);
+	sem_init(&tq->freecount, 0, tq->max_threads);
+	tq->threads = xmalloc(tq->max_threads * sizeof(*tq->threads));
+
+	setup_main_thread();
+
+	for (i = 0; i < tq->max_threads; i++) {
+		ret = pthread_create(&tq->threads[i], NULL, dispatcher, tq);
+		if (ret)
+			die("unable to create thread: %s", strerror(ret));
+	}
+
+	return tq;
+}
+
+/*
+ * Appends a job (`fct` with argument `task`) to the tail of the queue.
+ * Blocks in sem_wait() until `freecount` has a free slot, then posts
+ * `workingcount` so that a worker can pick the job up.
+ */
+void add_task(struct task_queue *tq,
+	      int (*fct)(struct task_queue *tq, void *task),
+	      void *task)
+{
+	struct job_list *job = xmalloc(sizeof(*job));
+
+	job->fct = fct;
+	job->task = task;
+	job->next = NULL;
+
+	sem_wait(&tq->freecount);
+	sem_wait(&tq->mutex);
+
+	/* Link the new node behind the current tail, or start the list. */
+	if (tq->last)
+		tq->last->next = job;
+	else
+		tq->first = job;
+	tq->last = job;
+
+	sem_post(&tq->workingcount);
+	sem_post(&tq->mutex);
+}
+
+/*
+ * Shuts the queue down and frees it.
+ *
+ * `fct` (may be NULL) is stored as the finish function that every
+ * worker calls before it exits. One NULL job per worker is then
+ * enqueued as a shutdown marker, the workers are joined and all
+ * resources are released.
+ *
+ * Returns the accumulated `early_return` value: zero unless some task
+ * returned non-zero. Dies if jobs are still queued after all workers
+ * have exited.
+ */
+int finish_task_queue(struct task_queue *tq, void (*fct)(struct task_queue *tq))
+{
+	int ret;
+	unsigned i;	/* same type as max_threads: no signed/unsigned comparison */
+
+	tq->finish_function = fct;
+
+	for (i = 0; i < tq->max_threads; i++)
+		add_task(tq, NULL, NULL);
+
+	for (i = 0; i < tq->max_threads; i++)
+		pthread_join(tq->threads[i], NULL);
+
+	sem_destroy(&tq->mutex);
+	sem_destroy(&tq->workingcount);
+	sem_destroy(&tq->freecount);
+
+	if (tq->first)
+		die("BUG: internal error with queuing jobs for threads");
+
+	free(tq->threads);
+	ret = tq->early_return;
+
+	free(tq);
+	return ret;
+}
+#else /* NO_PTHREADS */
+
+struct task_queue {	/* NO_PTHREADS variant: tasks run directly in add_task(), so only the result is kept */
+	int early_return;	/* OR of the return values of the tasks run so far */
+};
+
+/*
+ * NO_PTHREADS variant: no threads are started, so `max_threads` is
+ * ignored and only the result accumulator is set up. The returned
+ * queue is released by finish_task_queue().
+ */
+struct task_queue *create_task_queue(unsigned max_threads)
+{
+	struct task_queue *tq = xmalloc(sizeof(*tq));
+
+	tq->early_return = 0;
+
+	return tq;
+}
+
+/*
+ * NO_PTHREADS variant: runs `fct(tq, task)` right away in the caller
+ * and ORs its return value into `early_return`. Once a task has
+ * returned non-zero, later tasks are skipped.
+ */
+void add_task(struct task_queue *tq,
+	      int (*fct)(struct task_queue *tq, void *task),
+	      void *task)
+{
+	if (!tq->early_return)
+		tq->early_return |= fct(tq, task);
+}
+
+/*
+ * NO_PTHREADS variant: every task already ran inside add_task(), so
+ * there is nothing to wait for. `fct` (may be NULL) is called once from
+ * the calling thread, which is the thread that processed the tasks,
+ * before the queue is freed. Returns the accumulated `early_return`.
+ */
+int finish_task_queue(struct task_queue *tq, void (*fct)(struct task_queue *tq))
+{
+	int ret;
+
+	if (fct)
+		fct(tq);
+
+	ret = tq->early_return;
+	free(tq);
+	return ret;
+}
+#endif
diff --git a/thread-utils.h b/thread-utils.h
index d9a769d..977d37b 100644
--- a/thread-utils.h
+++ b/thread-utils.h
@@ -7,9 +7,49 @@
 extern int online_cpus(void);
 extern int init_recursive_mutex(pthread_mutex_t*);
 
+#include <pthread.h>
+#include <semaphore.h>
+#include <stdio.h>
+#include <unistd.h>
+
 #else
 
 #define online_cpus() 1
 
 #endif
+
+/*
+ * Creates a struct `task_queue`, which holds a list of tasks. Up to
+ * `max_threads` threads are active to process the enqueued tasks;
+ * tasks are dequeued in first-in, first-out order.
+ *
+ * If `max_threads` is zero, the number of available cores is used.
+ *
+ * Threads are only used in environments with pthreads; in other
+ * environments each task is processed sequentially inside `add_task`.
+ */
+struct task_queue *create_task_queue(unsigned max_threads);
+
+/*
+ * Puts the function `fct` and its argument `task` into the task queue.
+ *
+ * `fct` must not be NULL, as NULL is used internally by
+ * `finish_task_queue` to signal shutdown. If `fct` returns a nonzero
+ * value, processing of further tasks stops eventually; tasks that are
+ * currently running in parallel are flushed out first.
+ */
+void add_task(struct task_queue *tq,
+	      int (*fct)(struct task_queue *tq, void *task),
+	      void *task);
+
+/*
+ * Waits for all tasks to be done and frees the object. The return code
+ * is zero if all enqueued tasks were processed, and nonzero if a task
+ * returned a nonzero value.
+ *
+ * The function `fct` is called once in each thread after the last task
+ * for that thread was processed. If no thread-local cleanup needs to be
+ * performed, pass NULL.
+ */
+int finish_task_queue(struct task_queue *tq, void (*fct)(struct task_queue *tq));
+
 #endif /* THREAD_COMPAT_H */
-- 
2.5.0.400.gff86faf

  parent reply	other threads:[~2015-08-25 17:28 UTC|newest]

Thread overview: 23+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2015-08-25 17:28 [RFC PATCH 0/5] Demonstrate new parallel threading API Stefan Beller
2015-08-25 17:28 ` [PATCH 1/5] FIXUP submodule: implement `module_clone` as a builtin helper Stefan Beller
2015-08-25 17:28 ` Stefan Beller [this message]
2015-08-25 17:28 ` [PATCH 3/5] submodule: helper to run foreach in parallel Stefan Beller
2015-08-25 21:09   ` Junio C Hamano
2015-08-25 21:42     ` Stefan Beller
2015-08-25 22:23       ` Junio C Hamano
2015-08-25 22:44         ` Junio C Hamano
2015-08-26 17:06   ` Jeff King
2015-08-26 17:21     ` Stefan Beller
2015-08-25 17:28 ` [PATCH 4/5] index-pack: Use the new worker pool Stefan Beller
2015-08-25 19:03   ` Jeff King
2015-08-25 19:23     ` Stefan Beller
2015-08-25 20:41     ` Junio C Hamano
2015-08-25 20:59       ` Stefan Beller
2015-08-25 21:12         ` Junio C Hamano
2015-08-25 22:39           ` Stefan Beller
2015-08-25 22:50             ` Junio C Hamano
2015-08-25 17:28 ` [PATCH 5/5] pack-objects: Use " Stefan Beller
  -- strict thread matches above, loose matches on Subject: below --
2015-08-27  0:52 [RFC PATCH 0/5] Progressing with `git submodule foreach_parallel` Stefan Beller
2015-08-27  0:52 ` [PATCH 2/5] thread-utils: add a threaded task queue Stefan Beller
2015-08-27 12:59   ` Johannes Schindelin
2015-08-27 17:02     ` Stefan Beller
2015-08-28 15:34     ` Junio C Hamano

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

  List information: http://vger.kernel.org/majordomo-info.html

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=1440523706-23041-3-git-send-email-sbeller@google.com \
    --to=sbeller@google.com \
    --cc=git@vger.kernel.org \
    --cc=gitster@pobox.com \
    --cc=jrnieder@gmail.com \
    --cc=peff@peff.net \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
Code repositories for project(s) associated with this public inbox

	https://80x24.org/mirrors/git.git

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).