From: Duy Nguyen
Date: Mon, 1 Oct 2018 19:09:46 +0200
Subject: Re: [PATCH v7 7/7] read-cache: load cache entries on worker threads
To: Ben Peart
Cc: Git Mailing List, Junio C Hamano, Ben Peart

On Mon, Oct 1, 2018 at 3:46 PM Ben Peart wrote:
> +/*
> + * A helper function that will load the specified range of cache entries
> + * from the memory mapped file and add them to the given index.
> + */
> +static unsigned long load_cache_entry_block(struct index_state *istate,
> +			struct mem_pool *ce_mem_pool, int offset, int nr, const char *mmap,

Please use unsigned long for offset (here and in the thread_data
struct). We should use off_t instead, but that's out of scope. At
least keep the offset type consistent here.
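For example, the prototype would become something like this (an
untested sketch, only the offset type changes):

	static unsigned long load_cache_entry_block(struct index_state *istate,
				struct mem_pool *ce_mem_pool, unsigned long offset,
				int nr, const char *mmap, unsigned long start_offset,
				const struct cache_entry *previous_ce);

with the matching "offset" field in load_cache_entries_thread_data
changed from int to unsigned long as well.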
> +			unsigned long start_offset, const struct cache_entry *previous_ce)

I don't think you want to pass previous_ce in. You always pass NULL
anyway. And if this function is about loading a block (i.e. at a block
boundary), then the initial previous_ce _must_ be NULL or things break
horribly.

> @@ -1959,20 +2007,125 @@ static void *load_index_extensions(void *_data)
>
>  #define THREAD_COST		(10000)
>
> +struct load_cache_entries_thread_data
> +{
> +	pthread_t pthread;
> +	struct index_state *istate;
> +	struct mem_pool *ce_mem_pool;
> +	int offset;
> +	const char *mmap;
> +	struct index_entry_offset_table *ieot;
> +	int ieot_offset;	/* starting index into the ieot array */

If it's an index, maybe just name it ieot_index and we can get rid of
the comment.

> +	int ieot_work;		/* count of ieot entries to process */

Maybe instead of saving the whole "ieot" table here, add

	struct index_entry_offset *blocks;

which points to the starting block for this thread, and rename that
mysterious (to me) ieot_work to nr_blocks. The thread then has access
to blocks[0] through blocks[nr_blocks - 1] (a sketch follows below).

> +	unsigned long consumed;	/* return # of bytes in index file processed */
> +};
> +
> +/*
> + * A thread proc to run the load_cache_entries() computation
> + * across multiple background threads.
> + */
> +static void *load_cache_entries_thread(void *_data)
> +{
> +	struct load_cache_entries_thread_data *p = _data;
> +	int i;
> +
> +	/* iterate across all ieot blocks assigned to this thread */
> +	for (i = p->ieot_offset; i < p->ieot_offset + p->ieot_work; i++) {
> +		p->consumed += load_cache_entry_block(p->istate, p->ce_mem_pool, p->offset, p->ieot->entries[i].nr, p->mmap, p->ieot->entries[i].offset, NULL);

Please wrap this long line.

> +		p->offset += p->ieot->entries[i].nr;
> +	}
> +	return NULL;
> +}
> +
> +static unsigned long load_cache_entries_threaded(struct index_state *istate, const char *mmap, size_t mmap_size,
> +	unsigned long src_offset, int nr_threads, struct index_entry_offset_table *ieot)
> +{
> +	int i, offset, ieot_work, ieot_offset, err;
> +	struct load_cache_entries_thread_data *data;
> +	unsigned long consumed = 0;
> +	int nr;
> +
> +	/* a little sanity checking */
> +	if (istate->name_hash_initialized)
> +		BUG("the name hash isn't thread safe");
> +
> +	mem_pool_init(&istate->ce_mem_pool, 0);
> +	data = xcalloc(nr_threads, sizeof(struct load_cache_entries_thread_data));

We normally use sizeof(*data) instead of sizeof(struct ...).

> +
> +	/* ensure we have no more threads than we have blocks to process */
> +	if (nr_threads > ieot->nr)
> +		nr_threads = ieot->nr;
> +	data = xcalloc(nr_threads, sizeof(struct load_cache_entries_thread_data));

eh.. reallocate the same "data"?

> +
> +	offset = ieot_offset = 0;
> +	ieot_work = DIV_ROUND_UP(ieot->nr, nr_threads);
> +	for (i = 0; i < nr_threads; i++) {
> +		struct load_cache_entries_thread_data *p = &data[i];
> +		int j;
> +
> +		if (ieot_offset + ieot_work > ieot->nr)
> +			ieot_work = ieot->nr - ieot_offset;
> +
> +		p->istate = istate;
> +		p->offset = offset;
> +		p->mmap = mmap;
> +		p->ieot = ieot;
> +		p->ieot_offset = ieot_offset;
> +		p->ieot_work = ieot_work;
> +
> +		/* create a mem_pool for each thread */
> +		nr = 0;

Since nr is only used in this for loop, declare it in this scope
instead of for the whole function.
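Putting the blocks/nr_blocks rename and the nr scoping together, the
setup loop could look roughly like this (an untested sketch; blocks
and nr_blocks are the struct members suggested above, and the
mem_pool_init()/pthread_create() part and the offset bookkeeping stay
as they are in your patch):

	offset = ieot_offset = 0;
	ieot_work = DIV_ROUND_UP(ieot->nr, nr_threads);
	for (i = 0; i < nr_threads; i++) {
		struct load_cache_entries_thread_data *p = &data[i];
		int j, nr = 0;	/* nr is only needed in this scope */

		if (ieot_offset + ieot_work > ieot->nr)
			ieot_work = ieot->nr - ieot_offset;

		p->istate = istate;
		p->offset = offset;
		p->mmap = mmap;
		p->blocks = ieot->entries + ieot_offset;
		p->nr_blocks = ieot_work;

		/* count the cache entries in this thread's blocks */
		for (j = 0; j < p->nr_blocks; j++)
			nr += p->blocks[j].nr;

		/* mem_pool_init()/pthread_create() as before, using nr */
	}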
> +		for (j = p->ieot_offset; j < p->ieot_offset + p->ieot_work; j++)
> +			nr += p->ieot->entries[j].nr;
> +		if (istate->version == 4) {
> +			mem_pool_init(&p->ce_mem_pool,
> +				estimate_cache_size_from_compressed(nr));
> +		}
> +		else {
> +			mem_pool_init(&p->ce_mem_pool,
> +				estimate_cache_size(mmap_size, nr));
> +		}

Maybe keep this mem_pool_init code inside load_cache_entries_thread(),
similar to how you do it for load_all_cache_entries(). It's mostly to
keep this loop shorter and easier to understand; of course,
parallelizing mem_pool_init() itself is just noise.

> +
> +		err = pthread_create(&p->pthread, NULL, load_cache_entries_thread, p);
> +		if (err)
> +			die(_("unable to create load_cache_entries thread: %s"), strerror(err));
> +
> +		/* increment by the number of cache entries in the ieot block being processed */
> +		for (j = 0; j < ieot_work; j++)
> +			offset += ieot->entries[ieot_offset + j].nr;

I wonder if it makes things simpler if you store the cache_entry
_index_ in the entries[] array instead of storing the number of
entries. You can then easily calculate nr by doing
entries[i].index - entries[i-1].index, and you can count multiple
blocks the same way, without looping like this.

> +		ieot_offset += ieot_work;
> +	}
> +
> +	for (i = 0; i < nr_threads; i++) {
> +		struct load_cache_entries_thread_data *p = &data[i];
> +
> +		err = pthread_join(p->pthread, NULL);
> +		if (err)
> +			die(_("unable to join load_cache_entries thread: %s"), strerror(err));
> +		mem_pool_combine(istate->ce_mem_pool, p->ce_mem_pool);
> +		consumed += p->consumed;
> +	}
> +
> +	free(data);
> +
> +	return consumed;
> +}
> +#endif
> +
>  /* remember to discard_cache() before reading a different cache! */
>  int do_read_index(struct index_state *istate, const char *path, int must_exist)
>  {
> -	int fd, i;
> +	int fd;
>  	struct stat st;
>  	unsigned long src_offset;
>  	const struct cache_header *hdr;
>  	const char *mmap;
>  	size_t mmap_size;
> -	const struct cache_entry *previous_ce = NULL;
>  	struct load_index_extensions p;
>  	size_t extension_offset = 0;
>  #ifndef NO_PTHREADS
> -	int nr_threads;
> +	int nr_threads, cpus;
> +	struct index_entry_offset_table *ieot = NULL;
>  #endif
>
>  	if (istate->initialized)
> @@ -2014,10 +2167,18 @@ int do_read_index(struct index_state *istate, const char *path, int must_exist)
>  	p.mmap = mmap;
>  	p.mmap_size = mmap_size;
>
> +	src_offset = sizeof(*hdr);

OK, we've been doing this since forever; sizeof(struct cache_header)
probably does not have extra padding on any supported platform.

> @@ -2032,29 +2193,22 @@ int do_read_index(struct index_state *istate, const char *path, int must_exist)
>  			nr_threads--;
>  		}
>  	}
> -#endif
> -
> -	if (istate->version == 4) {
> -		mem_pool_init(&istate->ce_mem_pool,
> -			estimate_cache_size_from_compressed(istate->cache_nr));
> -	} else {
> -		mem_pool_init(&istate->ce_mem_pool,
> -			estimate_cache_size(mmap_size, istate->cache_nr));
> -	}
>
> -	src_offset = sizeof(*hdr);
> -	for (i = 0; i < istate->cache_nr; i++) {
> -		struct ondisk_cache_entry *disk_ce;
> -		struct cache_entry *ce;
> -		unsigned long consumed;
> +	/*
> +	 * Locate and read the index entry offset table so that we can use it
> +	 * to multi-thread the reading of the cache entries.
> +	 */
> +	if (extension_offset && nr_threads > 1)
> +		ieot = read_ieot_extension(mmap, mmap_size, extension_offset);

You need to free ieot at some point.
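For example (a sketch, assuming read_ieot_extension() returns a
heap-allocated table and nothing keeps a pointer into it once the
threads are joined):

	if (ieot) {
		src_offset += load_cache_entries_threaded(istate, mmap, mmap_size,
							  src_offset, nr_threads, ieot);
		free(ieot);	/* done with the offset table */
	} else {
		src_offset += load_all_cache_entries(istate, mmap, mmap_size,
						     src_offset);
	}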
>
> -		disk_ce = (struct ondisk_cache_entry *)(mmap + src_offset);
> -		ce = create_from_disk(istate, disk_ce, &consumed, previous_ce);
> -		set_index_entry(istate, i, ce);
> +	if (ieot)
> +		src_offset += load_cache_entries_threaded(istate, mmap, mmap_size, src_offset, nr_threads, ieot);
> +	else
> +		src_offset += load_all_cache_entries(istate, mmap, mmap_size, src_offset);
> +#else
> +	src_offset += load_all_cache_entries(istate, mmap, mmap_size, src_offset);
> +#endif
>
> -		src_offset += consumed;
> -		previous_ce = ce;
> -	}
>  	istate->timestamp.sec = st.st_mtime;
>  	istate->timestamp.nsec = ST_MTIME_NSEC(st);
>
> --
> 2.18.0.windows.1
>
--
Duy