git@vger.kernel.org mailing list mirror (one of many)
 help / color / mirror / code / Atom feed
From: Stefan Beller <sbeller@google.com>
To: git@vger.kernel.org
Cc: gitster@pobox.com, hvoigt@hvoigt.net, Jens.Lehmann@web.de,
	Stefan Beller <sbeller@google.com>
Subject: [RFC PATCH 2/4] Add a workdispatcher to get work done in parallel
Date: Thu,  6 Aug 2015 10:35:22 -0700	[thread overview]
Message-ID: <1438882524-21215-3-git-send-email-sbeller@google.com> (raw)
In-Reply-To: <1438882524-21215-1-git-send-email-sbeller@google.com>

This adds infrastructure code to work through a set of tasks using a
thread pool.

The whole life cycle of such a thread pool looks like this:

    struct workdispatcher *wd;
    struct return_values *rv;
    
    wd = create_workdispatcher(&command_for_task, max_parallel_jobs);
    for (...) {
        prepare(pointer_to_task);
        add_task(wd, pointer_to_task);
    }
    rv = wait_workdispatcher(wd);

Signed-off-by: Stefan Beller <sbeller@google.com>
---
 Makefile         |   1 +
 workdispatcher.c | 184 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
 workdispatcher.h |  29 +++++++++
 3 files changed, 214 insertions(+)
 create mode 100644 workdispatcher.c
 create mode 100644 workdispatcher.h

diff --git a/Makefile b/Makefile
index 6fb7484..2d8803c 100644
--- a/Makefile
+++ b/Makefile
@@ -805,6 +805,7 @@ LIB_OBJS += version.o
 LIB_OBJS += versioncmp.o
 LIB_OBJS += walker.o
 LIB_OBJS += wildmatch.o
+LIB_OBJS += workdispatcher.o
 LIB_OBJS += wrapper.o
 LIB_OBJS += write_or_die.o
 LIB_OBJS += ws.o
diff --git a/workdispatcher.c b/workdispatcher.c
new file mode 100644
index 0000000..adfedd9
--- /dev/null
+++ b/workdispatcher.c
@@ -0,0 +1,184 @@
+#include "cache.h"
+#include "workdispatcher.h"
+
+#ifndef NO_PTHREADS
+#include <pthread.h>
+#include <semaphore.h>
+#include <stdio.h>
+#include <unistd.h>
+
+#include "git-compat-util.h"
+/*
+ * One node of the singly linked job queue kept in `struct workdispatcher`
+ * (head in `first`, tail in `last`). Nodes are allocated in add_task() and
+ * freed in get_task().
+ */
+struct job_list {
+	/* Task pointer as passed to add_task(); NULL makes a worker leave its loop. */
+	void *item;
+	/* Following node in the queue, or NULL for the last node. */
+	struct job_list *next;
+};
+#endif
+
+/*
+ * State of one work dispatcher: the task function, the collected return
+ * values and, when pthreads are available, the worker threads together with
+ * the job queue they consume.
+ */
+struct workdispatcher {
+#ifndef NO_PTHREADS
+	/*
+	 * To avoid deadlocks always acquire the semaphores with lowest priority
+	 * first; priorities are in descending order as listed.
+	 *
+	 * The `mutex` is a general purpose lock for modifying data in the work
+	 * dispatcher, such as adding a new task or adding a return value from
+	 * an already run task.
+	 *
+	 * `workingcount` and `freecount` are opposing semaphores:
+	 * `workingcount` is posted once per queued job and waited on by
+	 * get_task(), `freecount` starts at `max_threads` and is waited on by
+	 * add_task(), so at most `max_threads` jobs are queued at a time.
+	 * The sum of their values equals `max_threads` except while
+	 * add_task() or get_task() is between its sem_wait() on one of them
+	 * and its sem_post() on the other.
+	 */
+	sem_t mutex;
+	sem_t workingcount;
+	sem_t freecount;
+
+	/* Array of `max_threads` worker threads, each running dispatcher(). */
+	pthread_t *threads;
+	unsigned max_threads;
+
+	/* Head and tail of the job queue; both NULL while the queue is empty. */
+	struct job_list *first;
+	struct job_list *last;
+#endif
+	/* Called once per task with the pointer that was given to add_task(). */
+	void *(*function)(void*);
+	/* Growing array of the values returned by `function`. */
+	struct return_values *ret;
+};
+
+#ifndef NO_PTHREADS
+/*
+ * Return the number of processors currently online as reported by
+ * sysconf(_SC_NPROCESSORS_ONLN). A result below one is reported on stderr
+ * and replaced by a single core.
+ */
+static unsigned number_cores(void)
+{
+	int online = sysconf(_SC_NPROCESSORS_ONLN);
+
+	if (online >= 1)
+		return online;
+
+	fprintf(stderr, "Number of CPUs online reported %d. "
+		"Using one core.\n", online);
+	return 1;
+}
+
+/*
+ * Take the oldest queued item off the job list and return it.
+ *
+ * Blocks until add_task() has queued something. The returned pointer is
+ * whatever was passed to add_task(); NULL is the shutdown marker queued by
+ * wait_workdispatcher(). The list node is freed here, the item is not.
+ * Dies if the queue is empty although `workingcount` said otherwise.
+ *
+ * NOTE(review): not declared in workdispatcher.h and only called from this
+ * file; consider making it static.
+ */
+void *get_task(struct workdispatcher *wd)
+{
+	void *ret;
+	struct job_list *job;
+
+	/*
+	 * sem_wait() returns -1 with errno set to EINTR when a signal handler
+	 * interrupts it. Carrying on in that case would touch the queue
+	 * without holding the semaphore, so retry until it really succeeds.
+	 */
+	while (sem_wait(&wd->workingcount) < 0) {
+		if (errno != EINTR)
+			die_errno("sem_wait");
+	}
+	while (sem_wait(&wd->mutex) < 0) {
+		if (errno != EINTR)
+			die_errno("sem_wait");
+	}
+
+	if (!wd->first)
+		die("BUG: internal error with dequeuing jobs for threads");
+	job = wd->first;
+	ret = job->item;
+	wd->first = job->next;
+	if (!wd->first)
+		wd->last = NULL;
+
+	/* One queue slot became free again; let a blocked add_task() proceed. */
+	sem_post(&wd->freecount);
+	sem_post(&wd->mutex);
+
+	free(job);
+	return ret;
+}
+
+/*
+ * Thread entry point of a worker; `args` is the `struct workdispatcher`.
+ *
+ * Repeatedly fetches a task with get_task(), runs `wd->function` on it and
+ * appends the result to `wd->ret` while holding `wd->mutex`. A NULL task
+ * ends the loop and with it the thread. Always returns NULL.
+ *
+ * NOTE(review): not declared in workdispatcher.h and only used from this
+ * file; consider making it static.
+ */
+void *dispatcher(void *args)
+{
+	struct workdispatcher *wd = args;
+	void *job = get_task(wd);
+
+	while (job) {
+		struct return_values *rv;
+		void *retvalue = wd->function(job);
+
+		/* Retry on EINTR so the results array is only touched under the lock. */
+		while (sem_wait(&wd->mutex) < 0) {
+			if (errno != EINTR)
+				die_errno("sem_wait");
+		}
+		rv = wd->ret;
+		ALLOC_GROW(rv->ret, rv->count + 1, rv->alloc);
+		rv->ret[rv->count++] = retvalue;
+		sem_post(&wd->mutex);
+
+		job = get_task(wd);
+	}
+
+	/* Returning from the start routine ends the thread like pthread_exit(). */
+	return NULL;
+}
+#endif
+
+/*
+ * Allocate and set up a work dispatcher that runs `function` on every task
+ * handed to add_task().
+ *
+ * With pthreads, `max_threads` worker threads are started (the number of
+ * online cores if `max_threads` is zero); dies if a semaphore or a thread
+ * cannot be created. Without pthreads `max_threads` is ignored.
+ *
+ * The returned object is released by wait_workdispatcher().
+ */
+struct workdispatcher *create_workdispatcher(void *function(void*),
+					     unsigned max_threads)
+{
+	struct workdispatcher *wd = xmalloc(sizeof(*wd));
+#ifndef NO_PTHREADS
+	unsigned i;
+#endif
+
+	/* Fill in everything the workers read before any of them is started. */
+	wd->function = function;
+	wd->ret = xmalloc(sizeof(*wd->ret));
+	wd->ret->ret = NULL;
+	wd->ret->count = 0;
+	wd->ret->alloc = 0;
+
+#ifndef NO_PTHREADS
+	wd->first = NULL;
+	wd->last = NULL;
+	wd->max_threads = max_threads ? max_threads : number_cores();
+
+	if (sem_init(&wd->mutex, 0, 1) ||
+	    sem_init(&wd->workingcount, 0, 0) ||
+	    sem_init(&wd->freecount, 0, wd->max_threads))
+		die_errno("sem_init");
+	wd->threads = xmalloc(wd->max_threads * sizeof(*wd->threads));
+
+	for (i = 0; i < wd->max_threads; i++) {
+		/* pthread_create() reports failure via its return value, not errno. */
+		int err = pthread_create(&wd->threads[i], NULL, &dispatcher, wd);
+		if (err)
+			die("unable to create thread: %s", strerror(err));
+	}
+#endif
+
+	return wd;
+}
+
+/*
+ * Hand one task to the dispatcher.
+ *
+ * With pthreads the task is appended to the job queue; the call blocks
+ * while `max_threads` tasks are already queued and not yet picked up.
+ * A NULL `job` makes the worker that dequeues it terminate.
+ *
+ * Without pthreads the task function is run right here and its return
+ * value is appended to the results.
+ */
+void add_task(struct workdispatcher *wd, void *job)
+{
+#ifndef NO_PTHREADS
+	struct job_list *job_list;
+
+	job_list = xmalloc(sizeof(*job_list));
+	job_list->item = job;
+	job_list->next = NULL;
+
+	/*
+	 * sem_wait() may fail with EINTR when interrupted by a signal handler;
+	 * retry so the queue is never modified without holding the semaphores.
+	 */
+	while (sem_wait(&wd->freecount) < 0) {
+		if (errno != EINTR)
+			die_errno("sem_wait");
+	}
+	while (sem_wait(&wd->mutex) < 0) {
+		if (errno != EINTR)
+			die_errno("sem_wait");
+	}
+
+	if (!wd->last) {
+		wd->last = job_list;
+		wd->first = wd->last;
+	} else {
+		wd->last->next = job_list;
+		wd->last = wd->last->next;
+	}
+
+	/* Wake one worker blocked in get_task(). */
+	sem_post(&wd->workingcount);
+	sem_post(&wd->mutex);
+#else
+	ALLOC_GROW(wd->ret->ret, wd->ret->count + 1, wd->ret->alloc);
+	wd->ret->ret[wd->ret->count++] = wd->function(job);
+#endif
+}
+
+/*
+ * Finish all queued work, tear the dispatcher down and return the results.
+ *
+ * With pthreads one NULL task per worker is queued as a shutdown marker,
+ * then every worker is joined before the semaphores are destroyed. Dies if
+ * a thread cannot be joined or if jobs are left in the queue afterwards.
+ *
+ * `wd` and its thread array are freed; the returned `struct return_values`
+ * and its `ret` array are not freed here.
+ */
+struct return_values *wait_workdispatcher(struct workdispatcher *wd)
+{
+	struct return_values *ret;
+#ifndef NO_PTHREADS
+	unsigned i;
+
+	for (i = 0; i < wd->max_threads; i++)
+		add_task(wd, NULL);
+
+	for (i = 0; i < wd->max_threads; i++) {
+		/* A worker that was not joined may still use the semaphores. */
+		int err = pthread_join(wd->threads[i], NULL);
+		if (err)
+			die("unable to join thread: %s", strerror(err));
+	}
+
+	sem_destroy(&wd->mutex);
+	sem_destroy(&wd->workingcount);
+	sem_destroy(&wd->freecount);
+
+	if (wd->first)
+		die("BUG: internal error with queuing jobs for threads");
+
+	free(wd->threads);
+#endif
+	ret = wd->ret;
+
+	free(wd);
+	return ret;
+}
diff --git a/workdispatcher.h b/workdispatcher.h
new file mode 100644
index 0000000..9f78124
--- /dev/null
+++ b/workdispatcher.h
@@ -0,0 +1,29 @@
+#ifndef WORKDISPATCHER
+#define WORKDISPATCHER
+
+/*
+ * Results collected by a work dispatcher and handed out by
+ * wait_workdispatcher().
+ */
+struct return_values {
+	/*
+	 * ret[0] .. ret[count - 1] hold the pointers returned by the task
+	 * function, in the order in which they were recorded; with threads
+	 * this presumably need not match the order of the add_task() calls.
+	 */
+	void **ret;
+	/* Number of used entries and allocated capacity of `ret` (ALLOC_GROW). */
+	int count, alloc;
+};
+
+/*
+ * Creates a struct workdispatcher, which holds a job list and assigns the
+ * jobs to be processed to a number of threads `maxthreads`.
+ * Within the threads the function `fct` is called with the pointer as
+ * given in add_task; the pointer it returns is collected and later made
+ * available through wait_workdispatcher.
+ *
+ * If `maxthreads` is zero the number of cores available will be used.
+ *
+ * Currently this only works in environments with pthreads, in other
+ * environments, the task will be processed directly in `add_task` and
+ * `maxthreads` is ignored.
+ */
+struct workdispatcher *create_workdispatcher(void *fct(void*),
+					     unsigned maxthreads);
+
+/*
+ * Waits for all tasks to be done and frees the object.
+ *
+ * Returns the collected return values of the task function. The returned
+ * struct and its `ret` array are not freed by the dispatcher.
+ */
+struct return_values *wait_workdispatcher(struct workdispatcher *wd);
+
+/*
+ * Queues `task` for processing by the function given to
+ * create_workdispatcher.
+ *
+ * `task` must not be NULL, as that's used internally to signal shutdown.
+ * With pthreads this may block while the job queue is full.
+ */
+void add_task(struct workdispatcher *wd, void *task);
+
+#endif
-- 
2.5.0.239.g9728e1d.dirty

  parent reply	other threads:[~2015-08-06 17:35 UTC|newest]

Thread overview: 11+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2015-08-06 17:35 [RFC/PATCH 0/4] parallel fetch for submodules Stefan Beller
2015-08-06 17:35 ` [PATCH 1/4] submodule: implement `module_name` as a builtin helper Stefan Beller
2015-08-06 19:49   ` Jens Lehmann
2015-08-06 19:54     ` Jens Lehmann
2015-08-06 17:35 ` Stefan Beller [this message]
2015-08-06 17:35 ` [PATCH 3/4] argv_array: add argv_array_copy Stefan Beller
2015-08-06 18:18   ` Eric Sunshine
2015-08-06 18:52     ` Jeff King
2015-08-06 17:35 ` [RFC PATCH 4/4] submodule: add infrastructure to fetch submodules in parallel Stefan Beller
2015-08-06 20:08 ` [RFC/PATCH 0/4] parallel fetch for submodules Jens Lehmann
2015-08-06 20:44   ` Stefan Beller

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

  List information: http://vger.kernel.org/majordomo-info.html

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=1438882524-21215-3-git-send-email-sbeller@google.com \
    --to=sbeller@google.com \
    --cc=Jens.Lehmann@web.de \
    --cc=git@vger.kernel.org \
    --cc=gitster@pobox.com \
    --cc=hvoigt@hvoigt.net \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
Code repositories for project(s) associated with this public inbox

	https://80x24.org/mirrors/git.git

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).