From: Stefan Beller <sbeller@google.com>
To: git@vger.kernel.org
Cc: peff@peff.net, jrnieder@gmail.com, gitster@pobox.com,
johannes.schindelin@gmx.de, Stefan Beller <sbeller@google.com>
Subject: [PATCH 4/9] thread-utils: add a threaded task queue
Date: Thu, 27 Aug 2015 18:14:50 -0700 [thread overview]
Message-ID: <1440724495-708-5-git-send-email-sbeller@google.com> (raw)
In-Reply-To: <1440724495-708-1-git-send-email-sbeller@google.com>
This adds a task queue that processes tasks in parallel on a pool of
threads. The boilerplate code for setting up and tearing down the
threads, as well as for queuing up tasks, is hidden behind the new API.
Signed-off-by: Stefan Beller <sbeller@google.com>
---
run-command.c | 39 +++++++-----
run-command.h | 3 +
thread-utils.c | 192 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
thread-utils.h | 35 +++++++++++
4 files changed, 253 insertions(+), 16 deletions(-)
diff --git a/run-command.c b/run-command.c
index 28e1d55..3d37f8c 100644
--- a/run-command.c
+++ b/run-command.c
@@ -610,10 +610,12 @@ static NORETURN void die_async(const char *err, va_list params)
if (!pthread_equal(main_thread, pthread_self())) {
struct async *async = pthread_getspecific(async_key);
- if (async->proc_in >= 0)
- close(async->proc_in);
- if (async->proc_out >= 0)
- close(async->proc_out);
+ if (async) {
+ if (async->proc_in >= 0)
+ close(async->proc_in);
+ if (async->proc_out >= 0)
+ close(async->proc_out);
+ }
pthread_exit((void *)128);
}
@@ -668,6 +670,22 @@ int git_atexit(void (*handler)(void))
#endif
+void setup_main_thread(void)
+{
+ if (!main_thread_set) {
+ /*
+ * We assume that the first time that start_async is called
+ * it is from the main thread.
+ */
+ main_thread_set = 1;
+ main_thread = pthread_self();
+ pthread_key_create(&async_key, NULL);
+ pthread_key_create(&async_die_counter, NULL);
+ set_die_routine(die_async);
+ set_die_is_recursing_routine(async_die_is_recursing);
+ }
+}
+
int start_async(struct async *async)
{
int need_in, need_out;
@@ -740,18 +758,7 @@ int start_async(struct async *async)
else if (async->out)
close(async->out);
#else
- if (!main_thread_set) {
- /*
- * We assume that the first time that start_async is called
- * it is from the main thread.
- */
- main_thread_set = 1;
- main_thread = pthread_self();
- pthread_key_create(&async_key, NULL);
- pthread_key_create(&async_die_counter, NULL);
- set_die_routine(die_async);
- set_die_is_recursing_routine(async_die_is_recursing);
- }
+ setup_main_thread();
if (proc_in >= 0)
set_cloexec(proc_in);
diff --git a/run-command.h b/run-command.h
index 5b4425a..176a5b2 100644
--- a/run-command.h
+++ b/run-command.h
@@ -119,4 +119,7 @@ struct async {
int start_async(struct async *async);
int finish_async(struct async *async);
+/*
+ * Marks the calling thread as the main thread and installs die handlers,
+ * so that die() called from any other thread exits only that thread
+ * (via pthread_exit()) instead of the whole process.  Only the first
+ * call has an effect; it must be made from the main thread.
+ */
+void setup_main_thread(void);
+
#endif
diff --git a/thread-utils.c b/thread-utils.c
index a2135e0..30ccd79 100644
--- a/thread-utils.c
+++ b/thread-utils.c
@@ -1,5 +1,7 @@
#include "cache.h"
#include "thread-utils.h"
+#include "run-command.h"
+#include "git-compat-util.h"
#if defined(hpux) || defined(__hpux) || defined(_hpux)
# include <sys/pstat.h>
@@ -75,3 +77,193 @@ int init_recursive_mutex(pthread_mutex_t *m)
}
return ret;
}
+
+#ifndef NO_PTHREADS
+/*
+ * One queued unit of work: a worker thread calls fct(tq, task).
+ * A NULL fct is the stop marker queued by finish_task_queue().
+ */
+struct job_list {
+ int (*fct)(struct task_queue *tq, void *task);
+ void *task;
+ struct job_list *next; /* next job in FIFO order, NULL at the tail */
+};
+
+/*
+ * A FIFO of jobs consumed by a fixed set of worker threads.
+ */
+struct task_queue {
+ pthread_mutex_t mutex; /* taken around all queue/early_return updates */
+ pthread_cond_t cond_non_empty; /* signalled by add_task() once per job */
+
+ int queued_tasks; /* number of jobs currently in the list */
+ struct job_list *first; /* oldest job, handed out next */
+ struct job_list *last; /* newest job, where add_task() appends */
+
+ pthread_t *threads; /* max_threads worker thread ids */
+ unsigned max_threads; /* number of workers started */
+ unsigned max_tasks; /* NOTE(review): never read in this patch */
+
+ /* optional per-thread cleanup run by each worker as it stops */
+ void (*finish_function)(struct task_queue *tq);
+ /* OR of nonzero task results; once set, no further jobs are handed out */
+ int early_return;
+};
+
+/*
+ * Block until the queue is non-empty, then hand out the oldest job.
+ *
+ * A nonzero *early_return (the caller's last task result) is folded
+ * into the queue-wide flag.  Once that flag is set no job is dequeued
+ * and *fct / *task are both set to NULL, telling the caller to stop.
+ * The dequeued job's list node is freed here.
+ */
+static void next_task(struct task_queue *tq,
+		      int (**fct)(struct task_queue *tq, void *task),
+		      void **task,
+		      int *early_return)
+{
+	struct job_list *picked = NULL;
+
+	pthread_mutex_lock(&tq->mutex);
+	while (!tq->queued_tasks)
+		pthread_cond_wait(&tq->cond_non_empty, &tq->mutex);
+
+	tq->early_return |= *early_return;
+	if (tq->early_return == 0) {
+		picked = tq->first;
+		tq->first = picked->next;
+		tq->queued_tasks--;
+		if (tq->first == NULL)
+			tq->last = NULL;
+	}
+	pthread_mutex_unlock(&tq->mutex);
+
+	*fct = picked ? picked->fct : NULL;
+	*task = picked ? picked->task : NULL;
+	free(picked);
+}
+
+/*
+ * Worker thread body: run jobs until next_task() hands out a stop
+ * marker or a task returns nonzero, then run the per-thread finish
+ * function, if one is set, and exit.
+ */
+static void *dispatcher(void *args)
+{
+	struct task_queue *tq = args;
+	int (*fct)(struct task_queue *tq, void *task);
+	void (*finish)(struct task_queue *tq);
+	void *task;
+	int early_return = 0;
+
+	next_task(tq, &fct, &task, &early_return);
+	while (fct && !early_return) {
+		early_return = fct(tq, task);
+		next_task(tq, &fct, &task, &early_return);
+	}
+
+	/*
+	 * A worker that stops because of an early return can get here
+	 * while the main thread is still queuing work, i.e. concurrently
+	 * with finish_task_queue() storing finish_function; read it under
+	 * the queue mutex instead of racing with that store.
+	 */
+	pthread_mutex_lock(&tq->mutex);
+	finish = tq->finish_function;
+	pthread_mutex_unlock(&tq->mutex);
+	if (finish)
+		finish(tq);
+
+	pthread_exit(NULL);
+}
+
+/*
+ * Allocate a task queue and start its worker threads.  A `max_threads`
+ * of 0 means one worker per online CPU.  Dies if a worker thread cannot
+ * be created.
+ */
+struct task_queue *create_task_queue(unsigned max_threads)
+{
+	struct task_queue *tq = xmalloc(sizeof(*tq));
+	unsigned i;
+
+	tq->max_threads = max_threads ? max_threads : online_cpus();
+	tq->max_tasks = 0; /* NOTE(review): not used anywhere yet */
+	tq->queued_tasks = 0;
+	tq->first = NULL;
+	tq->last = NULL;
+	tq->finish_function = NULL;
+	tq->early_return = 0;
+	/* xcalloc checks the count * size multiplication for overflow */
+	tq->threads = xcalloc(tq->max_threads, sizeof(*tq->threads));
+
+	pthread_mutex_init(&tq->mutex, NULL);
+	pthread_cond_init(&tq->cond_non_empty, NULL);
+
+	/* make die() in a worker terminate only that worker */
+	setup_main_thread();
+
+	/*
+	 * Every field is initialized before the first worker starts: a
+	 * worker that stops early reads finish_function, and workers read
+	 * early_return, with no further handshake with this thread.
+	 */
+	for (i = 0; i < tq->max_threads; i++) {
+		int ret = pthread_create(&tq->threads[i], NULL, dispatcher, tq);
+		if (ret)
+			die("unable to create thread: %s", strerror(ret));
+	}
+
+	return tq;
+}
+
+/*
+ * Append (fct, task) at the tail of the queue and wake one waiting
+ * worker.  The list node is allocated here and freed by the worker
+ * that dequeues it.
+ */
+void add_task(struct task_queue *tq,
+	      int (*fct)(struct task_queue *tq, void *task),
+	      void *task)
+{
+	struct job_list *job = xmalloc(sizeof(*job));
+
+	job->fct = fct;
+	job->task = task;
+	job->next = NULL;
+
+	pthread_mutex_lock(&tq->mutex);
+	if (tq->last)
+		tq->last->next = job;
+	else
+		tq->first = job;
+	tq->last = job;
+	tq->queued_tasks++;
+	pthread_mutex_unlock(&tq->mutex);
+
+	pthread_cond_signal(&tq->cond_non_empty);
+}
+
+/*
+ * Stop all workers, wait for them, and free the queue.  `fct`, if not
+ * NULL, is run by each worker as it stops.  Returns the queue's
+ * early_return flag: zero if every queued task ran, nonzero if some
+ * task returned nonzero and the remaining tasks were skipped.
+ */
+int finish_task_queue(struct task_queue *tq, void (*fct)(struct task_queue *tq))
+{
+	struct job_list *job;
+	unsigned i;
+	int ret;
+
+	/* workers may already be reading this after an early return */
+	pthread_mutex_lock(&tq->mutex);
+	tq->finish_function = fct;
+	pthread_mutex_unlock(&tq->mutex);
+
+	/* one stop marker (NULL fct) per worker */
+	for (i = 0; i < tq->max_threads; i++)
+		add_task(tq, NULL, NULL);
+
+	for (i = 0; i < tq->max_threads; i++)
+		pthread_join(tq->threads[i], NULL);
+
+	ret = tq->early_return;
+
+	/*
+	 * After an early return the workers stop dequeuing, so skipped
+	 * jobs and unconsumed stop markers are expected here and are just
+	 * freed.  Otherwise every job, including each stop marker, must
+	 * have been consumed by exactly one worker.
+	 */
+	if (tq->first && !ret)
+		die("BUG: internal error with queuing jobs for threads");
+	while ((job = tq->first)) {
+		tq->first = job->next;
+		free(job);
+	}
+	tq->last = NULL;
+	tq->queued_tasks = 0;
+
+	pthread_mutex_destroy(&tq->mutex);
+	pthread_cond_destroy(&tq->cond_non_empty);
+
+	free(tq->threads);
+	free(tq);
+	return ret;
+}
+#else /* NO_PTHREADS */
+
+/*
+ * Without pthreads tasks run synchronously in add_task(), so only the
+ * early-return flag needs to be tracked.
+ */
+struct task_queue {
+ int early_return; /* OR of nonzero task results; stops later tasks */
+};
+
+/*
+ * Without pthreads no workers are started; `max_threads` is ignored and
+ * each task runs synchronously inside add_task().
+ */
+struct task_queue *create_task_queue(unsigned max_threads)
+{
+	struct task_queue *tq = xmalloc(sizeof(*tq));
+
+	tq->early_return = 0;
+	return tq;
+}
+
+/*
+ * Run the task right away in the calling thread, unless an earlier
+ * task already returned nonzero.  A nonzero result is ORed into
+ * tq->early_return, which makes all later calls skip their task.
+ */
+void add_task(struct task_queue *tq,
+	      int (*fct)(struct task_queue *tq, void *task),
+	      void *task)
+{
+	if (!tq->early_return)
+		tq->early_return |= fct(tq, task);
+}
+
+/*
+ * All tasks already ran inside add_task().  The calling thread was the
+ * only "worker", so its cleanup function `fct` (if not NULL) is run
+ * once here, matching the threaded implementation, which runs it in
+ * each worker.  Frees the queue and returns nonzero if any task
+ * returned nonzero.
+ */
+int finish_task_queue(struct task_queue *tq, void (*fct)(struct task_queue *tq))
+{
+	int ret;
+
+	if (fct)
+		fct(tq);
+	ret = tq->early_return;
+	free(tq);
+	return ret;
+}
+#endif
diff --git a/thread-utils.h b/thread-utils.h
index d9a769d..f41cfb1 100644
--- a/thread-utils.h
+++ b/thread-utils.h
@@ -12,4 +12,39 @@ extern int init_recursive_mutex(pthread_mutex_t*);
#define online_cpus() 1
#endif
+
+/*
+ * Creates a `struct task_queue` and starts `max_threads` worker threads,
+ * which process the enqueued tasks in first-in, first-out order.
+ *
+ * If `max_threads` is zero, the number of online CPUs (online_cpus())
+ * is used.
+ *
+ * Currently this only works in environments with pthreads.  Without
+ * pthreads (NO_PTHREADS) no threads are started, and each task is run
+ * sequentially inside `add_task` instead.
+ */
+struct task_queue *create_task_queue(unsigned max_threads);
+
+/*
+ * Appends the function `fct` and its argument `task` to the task queue;
+ * `fct(tq, task)` is later called by a worker thread.
+ *
+ * `fct` must not be NULL, because a NULL function is used internally by
+ * `finish_task_queue` to signal shutdown.  If `fct` returns nonzero, the
+ * queue stops early: tasks that are already running are allowed to
+ * finish, but no further queued tasks are started.
+ */
+void add_task(struct task_queue *tq,
+ int (*fct)(struct task_queue *tq, void *task),
+ void *task);
+
+/*
+ * Waits for all worker threads to stop and frees the queue.  The return
+ * value is zero if all enqueued tasks were processed, and nonzero if a
+ * task returned nonzero and thereby stopped the queue early.
+ *
+ * The function `fct` is called once in each thread after the last task
+ * for that thread was processed, for thread-local cleanup.  If no such
+ * cleanup needs to be performed, pass NULL.
+ */
+int finish_task_queue(struct task_queue *tq, void (*fct)(struct task_queue *tq));
+
#endif /* THREAD_COMPAT_H */
--
2.5.0.264.g5e52b0d
next prev parent reply other threads:[~2015-08-28 1:15 UTC|newest]
Thread overview: 33+ messages / expand[flat|nested] mbox.gz Atom feed top
2015-08-28 1:14 [PATCH 0/9] Progress with git submodule Stefan Beller
2015-08-28 1:14 ` [PATCH 1/9] submodule: implement `module_list` as a builtin helper Stefan Beller
2015-08-28 1:14 ` [PATCH 2/9] submodule: implement `module_name` " Stefan Beller
2015-08-28 1:14 ` [PATCH 3/9] submodule: implement `module_clone` " Stefan Beller
2015-08-31 18:53 ` Junio C Hamano
2015-08-28 1:14 ` Stefan Beller [this message]
2015-08-28 1:14 ` [PATCH 5/9] run-command: add synced output Stefan Beller
2015-08-28 1:14 ` [PATCH 6/9] submodule: helper to run foreach in parallel Stefan Beller
2015-08-28 17:08 ` Stefan Beller
2015-08-28 1:14 ` [PATCH 7/9] fetch: fetch submodules " Stefan Beller
2015-08-28 17:00 ` Stefan Beller
2015-08-28 17:01 ` Jonathan Nieder
2015-08-28 17:12 ` Junio C Hamano
2015-08-28 17:45 ` Stefan Beller
2015-08-28 18:20 ` Jonathan Nieder
2015-08-28 18:27 ` Junio C Hamano
2015-08-28 18:35 ` Jeff King
2015-08-28 18:41 ` Junio C Hamano
2015-08-28 18:41 ` Stefan Beller
2015-08-28 18:44 ` Jeff King
2015-08-28 18:50 ` Jonathan Nieder
2015-08-28 18:53 ` Jeff King
2015-08-28 19:02 ` Stefan Beller
2015-08-28 18:59 ` Stefan Beller
2015-08-28 18:44 ` Jonathan Nieder
2015-08-28 18:36 ` Stefan Beller
2015-08-28 18:42 ` Jonathan Nieder
2015-08-31 18:56 ` Junio C Hamano
2015-08-31 19:05 ` Jeff King
2015-08-28 1:14 ` [PATCH 8/9] index-pack: Use the new worker pool Stefan Beller
2015-08-28 1:14 ` [PATCH 9/9] pack-objects: Use " Stefan Beller
2015-08-28 10:09 ` [PATCH 0/9] Progress with git submodule Johannes Schindelin
2015-08-28 16:35 ` Stefan Beller
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
List information: http://vger.kernel.org/majordomo-info.html
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=1440724495-708-5-git-send-email-sbeller@google.com \
--to=sbeller@google.com \
--cc=git@vger.kernel.org \
--cc=gitster@pobox.com \
--cc=johannes.schindelin@gmx.de \
--cc=jrnieder@gmail.com \
--cc=peff@peff.net \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
Code repositories for project(s) associated with this public inbox
https://80x24.org/mirrors/git.git
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).