* [RFC PATCH 0/4] add parallel unlink
@ 2023-12-03 13:39 Han Young
From: Han Young @ 2023-12-03 13:39 UTC
  To: git; +Cc: Han Young

We have had the parallel_checkout option since 04155bdad, but unlinking is still performed single-threaded.
In a very large repository, a directory rename or reorganization can produce a large number of entries to unlink.
In some instances, the unlink phase can take longer than the parallel checkout itself.

This series of patches introduces basic support for parallel unlink. The removal of individual files
is easily multithreaded, but removing empty directories is a little tricky:
when one thread decides to remove a directory, the directory may still contain files that another
thread has yet to delete. I use a mutex-guarded hashset to collect these 'raced' directories
and remove them after all threads have been joined. Perhaps there are ways to do this
without a mutex and a hashmap? A minimal sketch of the overall scheme follows.
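
To make the shape of the scheme easier to see, here is a minimal, self-contained
sketch in plain C. The paths, thread count, and helpers are invented for
illustration; the real series works on index entries and uses git's hashmap and
strbuf-based removal caches instead of the fixed arrays here, and walks further
up the directory tree than this sketch does:

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#define NTHREADS 2

/* Stand-ins for the index entries marked CE_WT_REMOVE. */
static const char *paths[] = { "a/b/1", "a/b/2", "a/c/3", "a/c/4" };
#define NPATHS 4

/* Directories whose rmdir() raced with another worker's unlink(). */
static pthread_mutex_t raced_mutex = PTHREAD_MUTEX_INITIALIZER;
static char *raced_dirs[NPATHS];
static int raced_nr;

static void record_raced_dir(const char *dir)
{
	pthread_mutex_lock(&raced_mutex);
	raced_dirs[raced_nr++] = strdup(dir);
	pthread_mutex_unlock(&raced_mutex);
}

static void *unlink_worker(void *arg)
{
	int i, start = *(int *)arg;

	/* Interleaved walk: worker k handles entries k, k+N, k+2N, ... */
	for (i = start; i < NPATHS; i += NTHREADS) {
		char dir[64], *slash;

		unlink(paths[i]);
		snprintf(dir, sizeof(dir), "%s", paths[i]);
		slash = strrchr(dir, '/');
		if (!slash)
			continue;
		*slash = '\0';
		/*
		 * Try to prune the parent; it may still hold files owned
		 * by another worker, so park it on failure.
		 */
		if (rmdir(dir))
			record_raced_dir(dir);
	}
	return NULL;
}

int main(void)
{
	pthread_t th[NTHREADS];
	int i, starts[NTHREADS];

	for (i = 0; i < NTHREADS; i++) {
		starts[i] = i;
		pthread_create(&th[i], NULL, unlink_worker, &starts[i]);
	}
	for (i = 0; i < NTHREADS; i++)
		pthread_join(th[i], NULL);

	/* Single-threaded cleanup pass over the raced directories. */
	for (i = 0; i < raced_nr; i++) {
		rmdir(raced_dirs[i]);
		free(raced_dirs[i]);
	}
	return 0;
}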

The speed of unlinking files seems to vary from system to system. I did some tests with a private repo:
when checking out a commit with 15000 moved files on a Linux machine with btrfs, parallel_unlink yields
a 10% speed-up, but on an Intel MacBook Pro with APFS the speed-up is over 100%. This makes it
difficult to choose a default threshold for parallel_unlink.

This series is by no means complete. Many functions contain duplicated code, and there are some
memory leaks. I want to hear the community's opinion on whether this is worth doing before proceeding further.

Han Young (4):
  symlinks: add and export threaded rmdir variants
  entry: add threaded_unlink_entry function
  parallel-checkout: add parallel_unlink
  unpack-trees: introduce parallel_unlink

 entry.c             |  16 ++++++
 entry.h             |   3 ++
 parallel-checkout.c |  80 +++++++++++++++++++++++++++++
 parallel-checkout.h |  25 +++++++++
 symlinks.c          | 120 ++++++++++++++++++++++++++++++++++++++++++--
 symlinks.h          |   6 +++
 unpack-trees.c      |  15 +-----
 7 files changed, 249 insertions(+), 16 deletions(-)

-- 
2.43.0




* [RFC PATCH 1/4] symlinks: add and export threaded rmdir variants
@ 2023-12-03 13:39 Han Young
From: Han Young @ 2023-12-03 13:39 UTC
  To: git; +Cc: Han Young

From: Han Young <hanyang.tony@bytedance.com>

Add and export threaded variants of the directory-removal functions; these functions will be used by parallel unlink.
---
Most of the code in threaded_schedule_dir_for_removal and threaded_do_remove_scheduled_dirs is duplicated.
We could remove the duplication either by breaking the functions into smaller helpers or by passing the cache
as a parameter. If we choose to pass the cache explicitly, the default caches in both entry.c and symlinks.c
would probably need to move to unpack-trees.c (see the sketch below). I'm not satisfied with using a
mutex-guarded hashset to ensure every directory is removed, but I can't come up with a better way.
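
For comparison, the explicit-cache alternative might look something like this
(hypothetical signatures, assuming git's strbuf; none of this is in the patch):

/* unpack-trees.c would own the removal cache and hand it down... */
void schedule_dir_for_removal_in(struct strbuf *removal,
				 const char *name, int len);
void remove_scheduled_dirs_in(struct strbuf *removal);

/* ...and the current entry points become thin wrappers: */
static struct strbuf default_removal = STRBUF_INIT;

void schedule_dir_for_removal(const char *name, int len)
{
	schedule_dir_for_removal_in(&default_removal, name, len);
}

void remove_scheduled_dirs(void)
{
	remove_scheduled_dirs_in(&default_removal);
}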

 symlinks.c | 120 +++++++++++++++++++++++++++++++++++++++++++++++++++--
 symlinks.h |   6 +++
 2 files changed, 123 insertions(+), 3 deletions(-)

diff --git a/symlinks.c b/symlinks.c
index b29e340c2d..c8cb0a7eb7 100644
--- a/symlinks.c
+++ b/symlinks.c
@@ -2,9 +2,9 @@
 #include "gettext.h"
 #include "setup.h"
 #include "symlinks.h"
+#include "hashmap.h"
+#include "pthread.h"
 
-static int threaded_check_leading_path(struct cache_def *cache, const char *name,
-				       int len, int warn_on_lstat_err);
 static int threaded_has_dirs_only_path(struct cache_def *cache, const char *name, int len, int prefix_len);
 
 /*
@@ -229,7 +229,7 @@ int check_leading_path(const char *name, int len, int warn_on_lstat_err)
  * directory, or if we were unable to lstat() it. If warn_on_lstat_err is true,
  * also emit a warning for this error.
  */
-static int threaded_check_leading_path(struct cache_def *cache, const char *name,
+int threaded_check_leading_path(struct cache_def *cache, const char *name,
 				       int len, int warn_on_lstat_err)
 {
 	int flags;
@@ -277,6 +277,51 @@ static int threaded_has_dirs_only_path(struct cache_def *cache, const char *name
 }
 
 static struct strbuf removal = STRBUF_INIT;
+static struct hashmap dir_set;
+pthread_mutex_t dir_set_mutex = PTHREAD_MUTEX_INITIALIZER;
+struct rmdir_hash_entry {
+	struct hashmap_entry hash;
+	char *dir;
+	size_t dirlen;
+};
+
+/* rmdir_hashmap comparison function */
+static int rmdir_hash_entry_cmp(const void *cmp_data UNUSED,
+			       const struct hashmap_entry *eptr,
+			       const struct hashmap_entry *entry_or_key,
+			       const void *keydata)
+{
+	const struct rmdir_hash_entry *a, *b;
+
+	a = container_of(eptr, const struct rmdir_hash_entry, hash);
+	if (keydata)
+		return strcmp(a->dir, keydata);
+	b = container_of(entry_or_key, const struct rmdir_hash_entry, hash);
+	return strcmp(a->dir, b->dir);
+}
+
+void threaded_init_remove_scheduled_dirs(void)
+{
+	hashmap_init(&dir_set, rmdir_hash_entry_cmp, NULL, 0);
+}
+
+static void add_dir_to_rmdir_hash(char *dir, size_t dirlen)
+{
+	struct rmdir_hash_entry *e;
+	struct hashmap_entry *ent;
+	unsigned int hash = strhash(dir);
+
+	pthread_mutex_lock(&dir_set_mutex);
+	ent = hashmap_get_from_hash(&dir_set, hash, dir);
+	if (!ent) {
+		e = xmalloc(sizeof(*e));
+		hashmap_entry_init(&e->hash, hash);
+		e->dir = xmemdupz(dir, dirlen);
+		e->dirlen = dirlen;
+		hashmap_put_entry(&dir_set, e, hash);
+	}
+	pthread_mutex_unlock(&dir_set_mutex);
+}
 
 static void do_remove_scheduled_dirs(int new_len)
 {
@@ -294,6 +339,26 @@ static void do_remove_scheduled_dirs(int new_len)
 	removal.len = new_len;
 }
 
+
+/*
+ * Like do_remove_scheduled_dirs(), but operates on a caller-provided
+ * removal cache; directories that cannot be removed yet (they may still
+ * contain files owned by another worker) are parked in dir_set for the
+ * post-join cleanup pass.
+ */
+static void threaded_do_remove_scheduled_dirs(int new_len, struct strbuf *removal)
+{
+	while (removal->len > new_len) {
+		removal->buf[removal->len] = '\0';
+		if (startup_info->original_cwd &&
+		     !strcmp(removal->buf, startup_info->original_cwd))
+			 break;
+		if (rmdir(removal->buf)) {
+			add_dir_to_rmdir_hash(removal->buf, removal->len);
+			break;
+		}
+		do {
+			removal->len--;
+		} while (removal->len > new_len &&
+			 removal->buf[removal->len] != '/');
+	}
+	removal->len = new_len;
+}
+
 void schedule_dir_for_removal(const char *name, int len)
 {
 	int match_len, last_slash, i, previous_slash;
@@ -327,11 +392,60 @@ void schedule_dir_for_removal(const char *name, int len)
 		strbuf_add(&removal, &name[match_len], last_slash - match_len);
 }
 
+/*
+ * Like schedule_dir_for_removal(), but works on a caller-provided
+ * removal cache so that each worker thread can schedule independently.
+ */
+void threaded_schedule_dir_for_removal(const char *name, int len, struct strbuf *removal_cache)
+{
+	int match_len, last_slash, i, previous_slash;
+
+	if (startup_info->original_cwd &&
+	    !strcmp(name, startup_info->original_cwd))
+		return;	/* Do not remove the current working directory */
+
+	match_len = last_slash = i =
+		longest_path_match(name, len, removal_cache->buf, removal_cache->len,
+				   &previous_slash);
+	/* Find last slash inside 'name' */
+	while (i < len) {
+		if (name[i] == '/')
+			last_slash = i;
+		i++;
+	}
+
+	/*
+	 * If we are about to go down the directory tree, we check if
+	 * we must first go upwards the tree, such that we then can
+	 * remove possible empty directories as we go upwards.
+	 */
+	if (match_len < last_slash && match_len < removal_cache->len)
+		threaded_do_remove_scheduled_dirs(match_len, removal_cache);
+	/*
+	 * If we go deeper down the directory tree, we only need to
+	 * save the new path components as we go down.
+	 */
+	if (match_len < last_slash)
+		strbuf_add(removal_cache, &name[match_len], last_slash - match_len);
+}
+
 void remove_scheduled_dirs(void)
 {
 	do_remove_scheduled_dirs(0);
 }
 
+/*
+ * Single-threaded post-pass: re-schedule the directories whose removal
+ * raced with another worker and remove whatever is empty by now.
+ */
+void threaded_remove_scheduled_dirs_clean_up(void)
+{
+	struct hashmap_iter iter;
+	const struct rmdir_hash_entry *entry;
+
+	hashmap_for_each_entry(&dir_set, &iter, entry, hash /* member name */) {
+		schedule_dir_for_removal(entry->dir, entry->dirlen);
+	}
+	remove_scheduled_dirs();
+}
+
+void threaded_remove_scheduled_dirs(struct strbuf *removal_cache)
+{
+	threaded_do_remove_scheduled_dirs(0, removal_cache);
+}
+
 void invalidate_lstat_cache(void)
 {
 	reset_lstat_cache(&default_cache);
diff --git a/symlinks.h b/symlinks.h
index 7ae3d5b856..7898eae941 100644
--- a/symlinks.h
+++ b/symlinks.h
@@ -20,9 +20,15 @@ static inline void cache_def_clear(struct cache_def *cache)
 int has_symlink_leading_path(const char *name, int len);
 int threaded_has_symlink_leading_path(struct cache_def *, const char *, int);
 int check_leading_path(const char *name, int len, int warn_on_lstat_err);
+int threaded_check_leading_path(struct cache_def *cache, const char *name,
+				       int len, int warn_on_lstat_err);
 int has_dirs_only_path(const char *name, int len, int prefix_len);
 void invalidate_lstat_cache(void);
 void schedule_dir_for_removal(const char *name, int len);
+void threaded_schedule_dir_for_removal(const char *name, int len, struct strbuf *removal_cache);
 void remove_scheduled_dirs(void);
+void threaded_remove_scheduled_dirs(struct strbuf *removal_cache);
+void threaded_init_remove_scheduled_dirs(void);
+void threaded_remove_scheduled_dirs_clean_up(void);
 
 #endif /* SYMLINKS_H */
-- 
2.43.0




* [RFC PATCH 2/4] entry: add threaded_unlink_entry function
@ 2023-12-03 13:39 Han Young
From: Han Young @ 2023-12-03 13:39 UTC
  To: git; +Cc: Han Young

From: Han Young <hanyang.tony@bytedance.com>

Add a threaded_unlink_entry function. The threaded variant uses the caches passed in as arguments instead of the default ones, and calls the threaded variant of schedule_dir_for_removal so that directories are correctly removed during a multithreaded unlink.
---
Another duplicated function. Because the default removal cache and the default lstat cache live in different
source files, the threaded variants of check_leading_path and schedule_dir_for_removal must be called here,
rather than letting each callee choose between an explicit and a default cache. The intended call pattern
is sketched below.
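
Roughly, each worker owns one lstat cache and one removal cache for the whole
walk. This is a sketch only; the argument struct here is hypothetical (the real
one is introduced in patch 3/4, which also flushes the removal caches from the
main thread after joining rather than inside the worker):

static void *worker(void *arg)
{
	struct unlink_worker_args *w = arg;	/* hypothetical */
	struct cache_def lstat_cache = CACHE_DEF_INIT;
	struct strbuf removal_cache = STRBUF_INIT;
	size_t i;

	for (i = w->start; i < w->nr; i += w->step)
		threaded_unlink_entry(w->entries[i], w->super_prefix,
				      &removal_cache, &lstat_cache);

	/* Flush this worker's still-scheduled directories. */
	threaded_remove_scheduled_dirs(&removal_cache);
	strbuf_release(&removal_cache);
	return NULL;
}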

 entry.c | 16 ++++++++++++++++
 entry.h |  3 +++
 2 files changed, 19 insertions(+)

diff --git a/entry.c b/entry.c
index 076e97eb89..04440beb2b 100644
--- a/entry.c
+++ b/entry.c
@@ -567,6 +567,22 @@ int checkout_entry_ca(struct cache_entry *ce, struct conv_attrs *ca,
 	return write_entry(ce, path.buf, ca, state, 0, nr_checkouts);
 }
 
+/*
+ * Like unlink_entry(), but uses the caller-provided lstat and removal
+ * caches so that it is safe to call from multiple threads at once.
+ */
+void threaded_unlink_entry(const struct cache_entry *ce, const char *super_prefix,
+			   struct strbuf *removal, struct cache_def *cache)
+{
+	const struct submodule *sub = submodule_from_ce(ce);
+	if (sub) {
+		/* state.force is set at the caller. */
+		submodule_move_head(ce->name, super_prefix, "HEAD", NULL,
+				    SUBMODULE_MOVE_HEAD_FORCE);
+	}
+	if (threaded_check_leading_path(cache, ce->name, ce_namelen(ce), 1) >= 0)
+		return;
+	if (remove_or_warn(ce->ce_mode, ce->name))
+		return;
+	threaded_schedule_dir_for_removal(ce->name, ce_namelen(ce), removal);
+}
+
 void unlink_entry(const struct cache_entry *ce, const char *super_prefix)
 {
 	const struct submodule *sub = submodule_from_ce(ce);
diff --git a/entry.h b/entry.h
index ca3ed35bc0..413ca3822d 100644
--- a/entry.h
+++ b/entry.h
@@ -2,6 +2,7 @@
 #define ENTRY_H
 
 #include "convert.h"
+#include "symlinks.h"
 
 struct cache_entry;
 struct index_state;
@@ -56,6 +57,8 @@ int finish_delayed_checkout(struct checkout *state, int show_progress);
  * down from "read-tree" et al.
  */
 void unlink_entry(const struct cache_entry *ce, const char *super_prefix);
+void threaded_unlink_entry(const struct cache_entry *ce, const char *super_prefix,
+			   struct strbuf *removal, struct cache_def *cache);
 
 void *read_blob_entry(const struct cache_entry *ce, size_t *size);
 int fstat_checkout_output(int fd, const struct checkout *state, struct stat *st);
-- 
2.43.0




* [RFC PATCH 3/4] parallel-checkout: add parallel_unlink
@ 2023-12-03 13:39 Han Young
From: Han Young @ 2023-12-03 13:39 UTC
  To: git; +Cc: Han Young

From: Han Young <hanyang.tony@bytedance.com>

Add parallel_unlink to parallel-checkout. parallel_unlink uses multiple threads to unlink entries. Because the paths to be removed are sorted, each thread iterates through the entry list in an interleaved fashion to distribute the workload as evenly as possible. Due to the multithreaded nature, it is not possible to remove all the directories in one pass: a directory one thread is about to remove may still contain items that are being removed by another thread. Whenever we fail to remove a directory, we save it in a hashset; once every thread has finished, we remove all the entries in the hashset.
---
Note that we display progress after the threads are joined, so the progress count is updated once per thread
instead of once per path. During testing the threads tended to finish at around the same time, which makes the
progress jump abruptly. We could guard per-path progress updates with a mutex (sketched below), but that would
nullify the optimization on systems with fast file deletion.
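
For reference, the rejected per-path variant would look roughly like this inside
the worker loop (a sketch; progress_mutex and shared_cnt are hypothetical
additions, needed since display_progress() is not designed for concurrent
callers):

static pthread_mutex_t progress_mutex = PTHREAD_MUTEX_INITIALIZER;

	if (ce->ce_flags & CE_WT_REMOVE) {
		threaded_unlink_entry(ce, data->super_prefix,
				      data->removal_cache, &cache);
		pthread_mutex_lock(&progress_mutex);
		display_progress(progress, ++shared_cnt);
		pthread_mutex_unlock(&progress_mutex);
	}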

 parallel-checkout.c | 80 +++++++++++++++++++++++++++++++++++++++++++++
 parallel-checkout.h | 25 ++++++++++++++
 2 files changed, 105 insertions(+)

diff --git a/parallel-checkout.c b/parallel-checkout.c
index b5a714c711..6e62e044d8 100644
--- a/parallel-checkout.c
+++ b/parallel-checkout.c
@@ -328,6 +328,24 @@ static int close_and_clear(int *fd)
 	return ret;
 }
 
+/*
+ * Worker body: walk the index interleaved (start, start+step, ...) and
+ * unlink every entry marked CE_WT_REMOVE, using per-thread caches.
+ */
+void *parallel_unlink_proc(void *_data)
+{
+	struct parallel_unlink_data *data = _data;
+	struct cache_def cache = CACHE_DEF_INIT;
+	int i = data->start;
+	data->cnt = 0;
+
+	while (i < data->len) {
+		const struct cache_entry *ce = data->cache[i];
+		if (ce->ce_flags & CE_WT_REMOVE) {
+			++data->cnt;
+			threaded_unlink_entry(ce, data->super_prefix, data->removal_cache, &cache);
+		}
+		i += data->step;
+	}
+	return &data->cnt;
+}
+
 void write_pc_item(struct parallel_checkout_item *pc_item,
 		   struct checkout *state)
 {
@@ -678,3 +696,65 @@ int run_parallel_checkout(struct checkout *state, int num_workers, int threshold
 	finish_parallel_checkout();
 	return ret;
 }
+
+unsigned run_parallel_unlink(struct index_state *index,
+			  struct progress *progress,
+			  const char *super_prefix, int num_workers, int threshold,
+			  unsigned cnt)
+{
+	int i, use_parallel = 0, errs = 0;
+	if (num_workers > 1 && index->cache_nr >= threshold) {
+		int unlink_cnt = 0;
+		for (i = 0; i < index->cache_nr; i++) {
+			const struct cache_entry *ce = index->cache[i];
+			if (ce->ce_flags & CE_WT_REMOVE) {
+				unlink_cnt++;
+			}
+		}
+		if (unlink_cnt >= threshold) {
+			use_parallel = 1;
+		}
+	}
+	if (use_parallel) {
+		struct parallel_unlink_data *unlink_data;
+		struct strbuf *removal_caches;
+
+		CALLOC_ARRAY(unlink_data, num_workers);
+		CALLOC_ARRAY(removal_caches, num_workers);
+		threaded_init_remove_scheduled_dirs();
+		for (i = 0; i < num_workers; i++) {
+			struct parallel_unlink_data *data = &unlink_data[i];
+			strbuf_init(&removal_caches[i], 50);
+			data->start = i;
+			data->cache = index->cache;
+			data->len = index->cache_nr;
+			data->step = num_workers;
+			data->super_prefix = super_prefix;
+			data->removal_cache = &removal_caches[i];
+			errs = pthread_create(&data->pthread, NULL, parallel_unlink_proc, data);
+			if (errs)
+				die(_("unable to create parallel_unlink thread: %s"), strerror(errs));
+		}
+		for (i = 0; i < num_workers; i++) {
+			void *t_cnt;
+			if (pthread_join(unlink_data[i].pthread, &t_cnt))
+				die("unable to join parallel_unlink thread");
+			cnt += *((unsigned *)t_cnt);
+			display_progress(progress, cnt);
+		}
+		/*
+		 * Flush the workers' leftover scheduled directories first;
+		 * anything that still cannot be removed has been parked in
+		 * the raced-directory hashset and is handled by the
+		 * single-threaded cleanup pass below.
+		 */
+		for (i = 0; i < num_workers; i++) {
+			threaded_remove_scheduled_dirs(&removal_caches[i]);
+			strbuf_release(&removal_caches[i]);
+		}
+		threaded_remove_scheduled_dirs_clean_up();
+		free(removal_caches);
+		free(unlink_data);
+		remove_marked_cache_entries(index, 0);
+	} else {
+		for (i = 0; i < index->cache_nr; i++) {
+			const struct cache_entry *ce = index->cache[i];
+			if (ce->ce_flags & CE_WT_REMOVE) {
+				display_progress(progress, ++cnt);
+				unlink_entry(ce, super_prefix);
+			}
+		}
+		remove_marked_cache_entries(index, 0);
+		remove_scheduled_dirs();
+	}
+	return cnt;
+}
diff --git a/parallel-checkout.h b/parallel-checkout.h
index c575284005..e851b773d9 100644
--- a/parallel-checkout.h
+++ b/parallel-checkout.h
@@ -43,6 +43,18 @@ size_t pc_queue_size(void);
 int run_parallel_checkout(struct checkout *state, int num_workers, int threshold,
 			  struct progress *progress, unsigned int *progress_cnt);
 
+/*
+ * Unlink all index entries marked with CE_WT_REMOVE, returning the
+ * number of entries unlinked plus the original value of cnt. If the
+ * number of entries to remove is smaller than the given threshold,
+ * the operation is performed sequentially.
+ */
+unsigned run_parallel_unlink(struct index_state *index,
+			  struct progress *progress,
+			  const char *super_prefix,
+			  int num_workers, int threshold,
+			  unsigned cnt);
+
 /****************************************************************
  * Interface with checkout--worker
  ****************************************************************/
@@ -76,6 +88,19 @@ struct parallel_checkout_item {
 	struct stat st;
 };
 
+struct parallel_unlink_data {
+	pthread_t pthread;
+	struct cache_entry **cache;	/* the index's cache array */
+	struct strbuf *removal_cache;	/* per-worker scheduled-dir cache */
+	size_t len;			/* number of entries in cache */
+	int start;			/* first index this worker handles */
+	size_t step;			/* stride between handled indices */
+	unsigned cnt;			/* entries unlinked by this worker */
+	const char *super_prefix;
+};
+
+void *parallel_unlink_proc(void *_data);
+
 /*
  * The fixed-size portion of `struct parallel_checkout_item` that is sent to the
  * workers. Following this will be 2 strings: ca.working_tree_encoding and
-- 
2.43.0




* [RFC PATCH 4/4] unpack-trees: introduce parallel_unlink
@ 2023-12-03 13:39 Han Young
From: Han Young @ 2023-12-03 13:39 UTC
  To: git; +Cc: Han Young

From: Han Young <hanyang.tony@bytedance.com>

We have had the parallel_checkout option since 04155bdad, but unlinking is still executed single-threaded. On a very large repo, checking out across a directory rename or restructuring commit can lead to a large number of entries to unlink. In some instances, the unlink operation can be slower than the parallel checkout. This commit adds parallel unlink support, which removes entries using multiple threads.
---
The unlink operation by itself is much faster than checkout, so the default threshold should be much higher
than the parallel_checkout one. I hardcoded the threshold to 100 times the checkout threshold; we probably
need to introduce a new config option with a sensible default (a possible shape is sketched below).
Discovering how many entries need to be removed requires iterating over index->cache, but that is fast
compared to the filesystem operations, even for a large number of entries.
I think we can reuse checkout.workers as the main switch for parallel_unlink, since unlinking is also part
of the checkout process.
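
If we add a config knob, it might look something like this
(checkout.unlinkThreshold is a made-up name, not an existing option):

/* parallel-checkout.c, next to get_parallel_checkout_configs(): */
static int get_parallel_unlink_threshold(int checkout_threshold)
{
	int threshold;

	if (!git_config_get_int("checkout.unlinkthreshold", &threshold))
		return threshold;
	/* Fall back to a multiple of the parallel-checkout threshold. */
	return checkout_threshold * 100;
}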

 unpack-trees.c | 15 ++-------------
 1 file changed, 2 insertions(+), 13 deletions(-)

diff --git a/unpack-trees.c b/unpack-trees.c
index c2b20b80d5..53589cde8a 100644
--- a/unpack-trees.c
+++ b/unpack-trees.c
@@ -452,17 +452,8 @@ static int check_updates(struct unpack_trees_options *o,
 	if (should_update_submodules())
 		load_gitmodules_file(index, NULL);
 
-	for (i = 0; i < index->cache_nr; i++) {
-		const struct cache_entry *ce = index->cache[i];
-
-		if (ce->ce_flags & CE_WT_REMOVE) {
-			display_progress(progress, ++cnt);
-			unlink_entry(ce, o->super_prefix);
-		}
-	}
-
-	remove_marked_cache_entries(index, 0);
-	remove_scheduled_dirs();
+	get_parallel_checkout_configs(&pc_workers, &pc_threshold);
+	cnt = run_parallel_unlink(index, progress, o->super_prefix,
+				  pc_workers, pc_threshold * 100, cnt);
 
 	if (should_update_submodules())
 		load_gitmodules_file(index, &state);
@@ -474,8 +465,6 @@ static int check_updates(struct unpack_trees_options *o,
 		 */
 		prefetch_cache_entries(index, must_checkout);
 
-	get_parallel_checkout_configs(&pc_workers, &pc_threshold);
-
 	enable_delayed_checkout(&state);
 	if (pc_workers > 1)
 		init_parallel_checkout();
-- 
2.43.0



