From: Junio C Hamano <gitster@pobox.com>
To: Stefan Beller <sbeller@google.com>
Cc: git@vger.kernel.org, jrnieder@gmail.com,
Johannes Sixt <j6t@kdbg.org>, Jeff King <peff@peff.net>,
hvoigt@hvoigt.net, jens.lehmann@web.de
Subject: Re: [RFC PATCH 2/3] run-commands: add an async queue processor
Date: Fri, 21 Aug 2015 12:05:13 -0700 [thread overview]
Message-ID: <xmqqegiw5uom.fsf@gitster.dls.corp.google.com> (raw)
In-Reply-To: <1440121237-24576-2-git-send-email-sbeller@google.com> (Stefan Beller's message of "Thu, 20 Aug 2015 18:40:36 -0700")
Stefan Beller <sbeller@google.com> writes:
> This adds functionality to do work in parallel.
>
> The whole life cycle of such a thread pool would look like
>
> struct task_queue * tq = create_task_queue(32); // no of threads
> for (...)
> add_task(tq, process_one_item_function, item); // non blocking
> ...
> int ret = finish_task_queue(tq); // blocks until all tasks are done
> if (!tq)
> die ("Not all items were be processed");
>
> The caller must take care of handling the output.
>
> Signed-off-by: Stefan Beller <sbeller@google.com>
> ---
>
> I sent this a while ago to the list, no comments on it :(
The primary reason I suspect is because you sent to a wrong set of
people. Submodule folks have largely been working in the scripted
ones, and may not necessarily be the ones who are most familiar with
the run-command infrastructure.
"shortlog --no-merges" tells me that the obvious suspects are j6t
and peff.
> The core functionality stayed the same, but I hope to improved naming and
> location of the code.
>
> The WIP is only for the NO_PTHREADS case.
> run-command.c | 212 ++++++++++++++++++++++++++++++++++++++++++++++++++++++----
> run-command.h | 30 +++++++++
> 2 files changed, 230 insertions(+), 12 deletions(-)
>
> diff --git a/run-command.c b/run-command.c
> index 28e1d55..4029011 100644
> --- a/run-command.c
> +++ b/run-command.c
> @@ -4,6 +4,21 @@
> #include "sigchain.h"
> #include "argv-array.h"
>
> +#ifdef NO_PTHREADS
> +
> +#else
> +
> +#include "thread-utils.h"
> +
> +#include <pthread.h>
> +#include <semaphore.h>
> +#include <stdio.h>
> +#include <unistd.h>
> +
> +#endif
> +
> +#include "git-compat-util.h"
> +
This goes against the way we have been organizing the header files.
http://thread.gmane.org/gmane.comp.version-control.git/276241/focus=276265
> @@ -668,6 +683,22 @@ int git_atexit(void (*handler)(void))
>
> #endif
>
> +void setup_main_thread()
void setup_main_thread(void)
> @@ -852,3 +872,171 @@ int capture_command(struct child_process *cmd, struct strbuf *buf, size_t hint)
> close(cmd->out);
> return finish_command(cmd);
> }
> +
> +#ifndef NO_PTHREADS
> +struct job_list {
> + int (*fct)(struct task_queue *aq, void *task);
> + void *task;
> + struct job_list *next;
> +};
> +#endif
> +
> +struct task_queue {
> +#ifndef NO_PTHREADS
> + /*
> + * To avoid deadlocks always aquire the semaphores with lowest priority
acquire.
> + * first, priorites are in descending order as listed.
> + *
> + * The `mutex` is a general purpose lock for modifying data in the async
> + * queue, such as adding a new task or adding a return value from
> + * an already run task.
> + *
> + * `workingcount` and `freecount` are opposing semaphores, the sum of
> + * their values should equal `max_threads` at any time while the `mutex`
> + * is available.
> + */
> + sem_t mutex;
> + sem_t workingcount;
> + sem_t freecount;
> +
> + pthread_t *threads;
> + unsigned max_threads;
> +
> + struct job_list *first;
> + struct job_list *last;
> +#endif
> + int early_return;
> +};
> +
> +#ifndef NO_PTHREADS
> +
> +static void get_task(struct task_queue *aq,
> + int (**fct)(struct task_queue *aq, void *task),
> + void **task,
> + int *early_return)
> +{
> + struct job_list *job;
> +
> + sem_wait(&aq->workingcount);
> + sem_wait(&aq->mutex);
> +
> + if (!aq->first)
> + die("BUG: internal error with dequeuing jobs for threads");
> + job = aq->first;
> + *fct = job->fct;
> + *task = job->task;
> + aq->early_return |= *early_return;
> + *early_return = aq->early_return;
> + aq->first = job->next;
> + if (!aq->first)
> + aq->last = NULL;
> +
> + sem_post(&aq->freecount);
> + sem_post(&aq->mutex);
> +
> + free(job);
> +}
> +
> +static void* dispatcher(void *args)
static void *dispatcher(....)
> +{
> + void *task;
> + int (*fct)(struct task_queue *aq, void *data);
s/data/task/?
> + int early_return = 0;
> + struct task_queue *aq = args;
> +
> + get_task(aq, &fct, &task, &early_return);
> + while (fct || early_return != 0) {
> + early_return = fct(aq, task);
> + get_task(aq, &fct, &task, &early_return);
> + }
If the func said "we are done, you may stop dispatching now", do you
still want to do another get_task()?
> + pthread_exit(0);
> +}
> +#endif
> +
> +struct task_queue *create_task_queue(unsigned max_threads)
> +{
> + struct task_queue *aq = xmalloc(sizeof(*aq));
> +
> +#ifndef NO_PTHREADS
> + int i;
> + if (!max_threads)
> + aq->max_threads = online_cpus();
> + else
> + aq->max_threads = max_threads;
> +
> + sem_init(&aq->mutex, 0, 1);
> + sem_init(&aq->workingcount, 0, 0);
> + sem_init(&aq->freecount, 0, aq->max_threads);
> + aq->threads = xmalloc(aq->max_threads * sizeof(pthread_t));
> +
> + for (i = 0; i < aq->max_threads; i++)
> + pthread_create(&aq->threads[i], 0, &dispatcher, aq);
> +
> + aq->first = NULL;
> + aq->last = NULL;
Shouldn't these be initialized before letting threads call into
dispatcher? The workingcount semaphore that is initialized to 0 may
prevent them from peeking into these pointers and barfing, but still...
> +
> + setup_main_thread();
> +#endif
> + aq->early_return = 0;
> +
> + return aq;
> +}
next prev parent reply other threads:[~2015-08-21 19:05 UTC|newest]
Thread overview: 16+ messages / expand[flat|nested] mbox.gz Atom feed top
2015-08-21 1:40 [PATCH 1/3] submodule: implement `module_clone` as a builtin helper Stefan Beller
2015-08-21 1:40 ` [RFC PATCH 2/3] run-commands: add an async queue processor Stefan Beller
2015-08-21 19:05 ` Junio C Hamano [this message]
2015-08-21 19:44 ` Jeff King
2015-08-21 19:48 ` Stefan Beller
2015-08-21 19:51 ` Jeff King
2015-08-21 20:12 ` Stefan Beller
2015-08-21 20:41 ` Junio C Hamano
2015-08-21 23:40 ` Stefan Beller
2015-08-24 21:22 ` Junio C Hamano
2015-08-21 19:45 ` Stefan Beller
2015-08-21 20:47 ` Junio C Hamano
2015-08-21 20:56 ` Stefan Beller
2015-08-21 1:40 ` [WIP/PATCH 3/3] submodule: helper to run foreach in parallel Stefan Beller
2015-08-21 19:23 ` Junio C Hamano
2015-08-21 20:21 ` Stefan Beller
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
List information: http://vger.kernel.org/majordomo-info.html
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=xmqqegiw5uom.fsf@gitster.dls.corp.google.com \
--to=gitster@pobox.com \
--cc=git@vger.kernel.org \
--cc=hvoigt@hvoigt.net \
--cc=j6t@kdbg.org \
--cc=jens.lehmann@web.de \
--cc=jrnieder@gmail.com \
--cc=peff@peff.net \
--cc=sbeller@google.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
Code repositories for project(s) associated with this public inbox
https://80x24.org/mirrors/git.git
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).