git@vger.kernel.org mailing list mirror (one of many)
 help / color / mirror / code / Atom feed
From: Junio C Hamano <gitster@pobox.com>
To: Stefan Beller <sbeller@google.com>
Cc: git@vger.kernel.org, jrnieder@gmail.com,
	Johannes Sixt <j6t@kdbg.org>, Jeff King <peff@peff.net>,
	hvoigt@hvoigt.net, jens.lehmann@web.de
Subject: Re: [RFC PATCH 2/3] run-commands: add an async queue processor
Date: Fri, 21 Aug 2015 12:05:13 -0700	[thread overview]
Message-ID: <xmqqegiw5uom.fsf@gitster.dls.corp.google.com> (raw)
In-Reply-To: <1440121237-24576-2-git-send-email-sbeller@google.com> (Stefan Beller's message of "Thu, 20 Aug 2015 18:40:36 -0700")

Stefan Beller <sbeller@google.com> writes:

> This adds functionality to do work in parallel.
>
> The whole life cycle of such a thread pool would look like
>
>     struct task_queue * tq = create_task_queue(32); // no of threads
>     for (...)
>         add_task(tq, process_one_item_function, item); // non blocking
>     ...
>     int ret = finish_task_queue(tq); // blocks until all tasks are done
>     if (!tq)
>         die ("Not all items were processed");
>
> The caller must take care of handling the output.
>
> Signed-off-by: Stefan Beller <sbeller@google.com>
> ---
>
> I sent this a while ago to the list, no comments on it :(

The primary reason, I suspect, is that you sent it to the wrong set of
people.  Submodule folks have largely been working on the scripted
commands, and may not necessarily be the ones who are most familiar with
the run-command infrastructure.

"shortlog --no-merges" tells me that the obvious suspects are j6t
and peff.

> The core functionality stayed the same, but I hope to have improved the
> naming and location of the code.
>
> The WIP is only for the NO_PTHREADS case.

>  run-command.c | 212 ++++++++++++++++++++++++++++++++++++++++++++++++++++++----
>  run-command.h |  30 +++++++++
>  2 files changed, 230 insertions(+), 12 deletions(-)
>
> diff --git a/run-command.c b/run-command.c
> index 28e1d55..4029011 100644
> --- a/run-command.c
> +++ b/run-command.c
> @@ -4,6 +4,21 @@
>  #include "sigchain.h"
>  #include "argv-array.h"
>  
> +#ifdef NO_PTHREADS
> +
> +#else
> +
> +#include "thread-utils.h"
> +
> +#include <pthread.h>
> +#include <semaphore.h>
> +#include <stdio.h>
> +#include <unistd.h>
> +
> +#endif
> +
> +#include "git-compat-util.h"
> +

This goes against the way we have been organizing the header files.

http://thread.gmane.org/gmane.comp.version-control.git/276241/focus=276265

> @@ -668,6 +683,22 @@ int git_atexit(void (*handler)(void))
>  
>  #endif
>  
> +void setup_main_thread()

void setup_main_thread(void)

> @@ -852,3 +872,171 @@ int capture_command(struct child_process *cmd, struct strbuf *buf, size_t hint)
>  	close(cmd->out);
>  	return finish_command(cmd);
>  }
> +
> +#ifndef NO_PTHREADS
> +struct job_list {
> +	int (*fct)(struct task_queue *aq, void *task);
> +	void *task;
> +	struct job_list *next;
> +};
> +#endif
> +
> +struct task_queue {
> +#ifndef NO_PTHREADS
> +	/*
> +	 * To avoid deadlocks always aquire the semaphores with lowest priority

acquire.

> +	 * first, priorities are in descending order as listed.
> +	 *
> +	 * The `mutex` is a general purpose lock for modifying data in the async
> +	 * queue, such as adding a new task or adding a return value from
> +	 * an already run task.
> +	 *
> +	 * `workingcount` and `freecount` are opposing semaphores, the sum of
> +	 * their values should equal `max_threads` at any time while the `mutex`
> +	 * is available.
> +	 */
> +	sem_t mutex;
> +	sem_t workingcount;
> +	sem_t freecount;
> +
> +	pthread_t *threads;
> +	unsigned max_threads;
> +
> +	struct job_list *first;
> +	struct job_list *last;
> +#endif
> +	int early_return;
> +};
> +
> +#ifndef NO_PTHREADS
> +
> +static void get_task(struct task_queue *aq,
> +		     int (**fct)(struct task_queue *aq, void *task),
> +		     void **task,
> +		     int *early_return)
> +{
> +	struct job_list *job;
> +
> +	sem_wait(&aq->workingcount);
> +	sem_wait(&aq->mutex);
> +
> +	if (!aq->first)
> +		die("BUG: internal error with dequeuing jobs for threads");
> +	job = aq->first;
> +	*fct = job->fct;
> +	*task = job->task;
> +	aq->early_return |= *early_return;
> +	*early_return = aq->early_return;
> +	aq->first = job->next;
> +	if (!aq->first)
> +		aq->last = NULL;
> +
> +	sem_post(&aq->freecount);
> +	sem_post(&aq->mutex);
> +
> +	free(job);
> +}
> +
> +static void* dispatcher(void *args)

static void *dispatcher(....)

> +{
> +	void *task;
> +	int (*fct)(struct task_queue *aq, void *data);

s/data/task/?

> +	int early_return = 0;
> +	struct task_queue *aq = args;
> +
> +	get_task(aq, &fct, &task, &early_return);
> +	while (fct || early_return != 0) {
> +		early_return = fct(aq, task);
> +		get_task(aq, &fct, &task, &early_return);
> +	}

If the func said "we are done, you may stop dispatching now", do you
still want to do another get_task()?

> +	pthread_exit(0);
> +}
> +#endif
> +
> +struct task_queue *create_task_queue(unsigned max_threads)
> +{
> +	struct task_queue *aq = xmalloc(sizeof(*aq));
> +
> +#ifndef NO_PTHREADS
> +	int i;
> +	if (!max_threads)
> +		aq->max_threads = online_cpus();
> +	else
> +		aq->max_threads = max_threads;
> +
> +	sem_init(&aq->mutex, 0, 1);
> +	sem_init(&aq->workingcount, 0, 0);
> +	sem_init(&aq->freecount, 0, aq->max_threads);
> +	aq->threads = xmalloc(aq->max_threads * sizeof(pthread_t));
> +
> +	for (i = 0; i < aq->max_threads; i++)
> +		pthread_create(&aq->threads[i], 0, &dispatcher, aq);
> +
> +	aq->first = NULL;
> +	aq->last = NULL;


Shouldn't these be initialized before letting threads call into
dispatcher?  The workingcount semaphore that is initialized to 0 may
prevent them from peeking into these pointers and barfing, but still...

> +
> +	setup_main_thread();
> +#endif
> +	aq->early_return = 0;
> +
> +	return aq;
> +}

  reply	other threads:[~2015-08-21 19:05 UTC|newest]

Thread overview: 16+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2015-08-21  1:40 [PATCH 1/3] submodule: implement `module_clone` as a builtin helper Stefan Beller
2015-08-21  1:40 ` [RFC PATCH 2/3] run-commands: add an async queue processor Stefan Beller
2015-08-21 19:05   ` Junio C Hamano [this message]
2015-08-21 19:44     ` Jeff King
2015-08-21 19:48       ` Stefan Beller
2015-08-21 19:51         ` Jeff King
2015-08-21 20:12           ` Stefan Beller
2015-08-21 20:41       ` Junio C Hamano
2015-08-21 23:40       ` Stefan Beller
2015-08-24 21:22         ` Junio C Hamano
2015-08-21 19:45     ` Stefan Beller
2015-08-21 20:47       ` Junio C Hamano
2015-08-21 20:56         ` Stefan Beller
2015-08-21  1:40 ` [WIP/PATCH 3/3] submodule: helper to run foreach in parallel Stefan Beller
2015-08-21 19:23   ` Junio C Hamano
2015-08-21 20:21     ` Stefan Beller

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

  List information: http://vger.kernel.org/majordomo-info.html

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=xmqqegiw5uom.fsf@gitster.dls.corp.google.com \
    --to=gitster@pobox.com \
    --cc=git@vger.kernel.org \
    --cc=hvoigt@hvoigt.net \
    --cc=j6t@kdbg.org \
    --cc=jens.lehmann@web.de \
    --cc=jrnieder@gmail.com \
    --cc=peff@peff.net \
    --cc=sbeller@google.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
Code repositories for project(s) associated with this public inbox

	https://80x24.org/mirrors/git.git

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).