From e99bf619c278cad8f6afc039de7a294942da78a4 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Constantin=20F=C3=BCrst?=
Date: Thu, 8 Feb 2024 16:24:39 +0100
Subject: [PATCH] handle memory allocation outside of the cache, pre-allocate
 in benchmark and memset to hopefully guarantee no pagefaults will be
 encountered

---
 offloading-cacher/cache.hpp                | 118 ++++++++-------
 qdp_project/src/Benchmark.cpp              |  68 ++++--------
 qdp_project/src/Configuration.hpp          |   4 +
 qdp_project/src/utils/BenchmarkHelpers.cpp |  36 ++++---
 4 files changed, 92 insertions(+), 134 deletions(-)

diff --git a/offloading-cacher/cache.hpp b/offloading-cacher/cache.hpp
index c135430..b8d31e3 100755
--- a/offloading-cacher/cache.hpp
+++ b/offloading-cacher/cache.hpp
@@ -57,6 +57,28 @@ namespace dsacache {
 
     class Cache;
 
+    // cache policy is defined as a type here to allow flexible usage of the cacher
+    // given a numa destination node (where the data will be needed), the numa source
+    // node (current location of the data) and the data size, this function should
+    // return optimal cache placement
+    // dst node and returned value can differ if the system, for example, has HBM
+    // attached accessible directly to node n under a different node id m
+    typedef int (CachePolicy)(const int numa_dst_node, const int numa_src_node, const size_t data_size);
+
+    // copy policy specifies the copy-executing nodes for a given task
+    // which allows flexibility in assignment for optimizing raw throughput
+    // or choosing a conservative usage policy
+    typedef std::vector<int> (CopyPolicy)(const int numa_dst_node, const int numa_src_node, const size_t data_size);
+
+    // memory allocation is a complex topic but can have big performance
+    // impact, we therefore do not handle it in the cache. must return
+    // pointer to a block of at least the given size, cache will also
+    // not handle deallocation but signal that the block is free
+    typedef uint8_t* (MemoryAllocator_Allocate)(const int numa_node, const size_t size);
+
+    // signals that the given memory block will not be used by cache anymore
+    typedef void (MemoryAllocator_Free)(uint8_t* pointer, const size_t size);
+
     /*
      * Class Description:
      * Holds all required information on one cache entry and is used
@@ -93,6 +115,8 @@ namespace dsacache {
         // set to false if we do not own the cache pointer
         bool delete_ = false;
 
+        MemoryAllocator_Free* memory_free_function_;
+
         // data source and size of the block
         uint8_t* src_;
         size_t size_;
@@ -135,7 +159,7 @@ namespace dsacache {
         friend Cache;
 
     public:
-        CacheData(uint8_t* data, const size_t size);
+        CacheData(uint8_t* data, const size_t size, MemoryAllocator_Free* free);
         CacheData(const CacheData& other);
         ~CacheData();
@@ -225,20 +249,6 @@ namespace dsacache {
      */
     class Cache {
-    public:
-        // cache policy is defined as a type here to allow flexible usage of the cacher
-        // given a numa destination node (where the data will be needed), the numa source
-        // node (current location of the data) and the data size, this function should
-        // return optimal cache placement
-        // dst node and returned value can differ if the system, for example, has HBM
-        // attached accessible directly to node n under a different node id m
-        typedef int (CachePolicy)(const int numa_dst_node, const int numa_src_node, const size_t data_size);
-
-        // copy policy specifies the copy-executing nodes for a given task
-        // which allows flexibility in assignment for optimizing raw throughput
-        // or choosing a conservative usage policy
-        typedef std::vector<int> (CopyPolicy)(const int numa_dst_node, const int numa_src_node, const size_t data_size);
-
     private:
 
         // flags to store options duh
@@ -261,6 +271,8 @@ namespace dsacache {
 
         CachePolicy* cache_policy_function_ = nullptr;
         CopyPolicy* copy_policy_function_ = nullptr;
+        MemoryAllocator_Allocate* memory_allocate_function_ = nullptr;
+        MemoryAllocator_Free* memory_free_function_ = nullptr;
 
         // function used to submit a copy task on a specific node to the dml
         // engine on that node - will change the current threads node assignment
@@ -281,12 +293,6 @@ namespace dsacache {
         // as this is set as the "optimal placement" node
        void GetCacheNode(uint8_t* src, const size_t size, int* OUT_DST_NODE, int* OUT_SRC_NODE) const;
 
-        // allocates memory of size "size" on the numa node "node"
-        // and returns nullptr if this is not possible, also may
-        // try to flush the cache of the requested node to
-        // alleviate encountered shortage
-        uint8_t* AllocOnNode(const size_t size, const int node);
-
         // checks whether the cache contains an entry for
         // the given data in the given memory node and
         // returns it, otherwise returns nullptr
@@ -299,7 +305,11 @@ namespace dsacache {
 
         // initializes the cache with the two policy functions
         // only after this is it safe to use in a threaded environment
-        void Init(CachePolicy* cache_policy_function, CopyPolicy* copy_policy_function);
+        void Init(
+            CachePolicy* cache_policy_function, CopyPolicy* copy_policy_function,
+            MemoryAllocator_Allocate* memory_allocate_function,
+            MemoryAllocator_Free* memory_free_function
+        );
 
         // function to perform data access through the cache, behaviour depends
         // on flags, by default will also perform prefetch, otherwise with
@@ -336,9 +346,15 @@ inline void dsacache::Cache::Clear() {
     }
 }
 
-inline void dsacache::Cache::Init(CachePolicy* cache_policy_function, CopyPolicy* copy_policy_function) {
+inline void dsacache::Cache::Init(
+    CachePolicy* cache_policy_function, CopyPolicy* copy_policy_function,
+    MemoryAllocator_Allocate* memory_allocate_function,
+    MemoryAllocator_Free* memory_free_function
+) {
     cache_policy_function_ = cache_policy_function;
     copy_policy_function_ = copy_policy_function;
+    memory_allocate_function_ = memory_allocate_function;
+    memory_free_function_ = memory_free_function;
 
     // initialize numa library
@@ -382,7 +398,7 @@ inline std::unique_ptr<dsacache::CacheData> dsacache::Cache::Access(uint8_t* dat
     // at this point the requested data is not present in cache
     // and we create a caching task for it, copying our current flags
 
-    task = std::make_unique<CacheData>(data, size);
+    task = std::make_unique<CacheData>(data, size, memory_free_function_);
     task->SetFlags(flags_);
 
     // when the ACCESS_WEAK flag is set for the flags parameter (!)
@@ -434,51 +450,8 @@ inline std::unique_ptr<dsacache::CacheData> dsacache::Cache::Access(uint8_t* dat
     return std::move(task);
 }
 
-inline uint8_t* dsacache::Cache::AllocOnNode(const size_t size, const int node) {
-    // allocate data on this node and flush the unused parts of the
-    // cache if the operation fails and retry once
-    // TODO: smarter flush strategy could keep some stuff cached
-
-    // check currently free memory to see if the data fits
-
-    long long int free_space = 0;
-    numa_node_size64(node, &free_space);
-
-    if (free_space < size) {
-        // dst node lacks memory space so we flush the cache for this
-        // node hoping to free enough currently unused entries to make
-        // the second allocation attempt successful
-
-        Flush(node);
-
-        // re-test by getting the free space and checking again
-
-        numa_node_size64(node, &free_space);
-
-        if (free_space < size) {
-            return nullptr;
-        }
-    }
-
-    uint8_t* dst = reinterpret_cast<uint8_t*>(numa_alloc_onnode(size, node));
-
-    if (dst == nullptr) {
-        return nullptr;
-    }
-
-    if (CheckFlag(flags_, FLAG_FORCE_MAP_PAGES)) {
-        static const size_t page_size_b = getpagesize();
-
-        for (size_t i = 0; i < size; i += page_size_b) {
-            dst[i] = 0;
-        }
-    }
-
-    return dst;
-}
-
 inline void dsacache::Cache::SubmitTask(CacheData* task, const int dst_node, const int src_node) {
-    uint8_t* dst = AllocOnNode(task->GetSize(), dst_node);
+    uint8_t* dst = memory_allocate_function_(dst_node, task->GetSize());
 
     if (dst == nullptr) {
         return;
@@ -667,10 +640,11 @@ inline dsacache::Cache::~Cache() {
     }
 }
 
-inline dsacache::CacheData::CacheData(uint8_t* data, const size_t size) {
+inline dsacache::CacheData::CacheData(uint8_t* data, const size_t size, MemoryAllocator_Free* free) {
     src_ = data;
     size_ = size;
     delete_ = false;
+    memory_free_function_ = free;
     active_ = new std::atomic<int32_t>(1);
     cache_ = new std::atomic<uint8_t*>(data);
     handlers_ = new std::atomic<std::vector<dml_handler>*>();
@@ -689,6 +663,8 @@ inline dsacache::CacheData::CacheData(const dsacache::CacheData& other) {
     cache_ = other.cache_;
     flags_ = other.flags_;
 
+    memory_free_function_ = other.memory_free_function_;
+
     incomplete_cache_ = other.incomplete_cache_;
     handlers_ = other.handlers_;
     invalid_handlers_ = other.invalid_handlers_;
@@ -733,8 +709,8 @@ inline void dsacache::CacheData::Deallocate() {
     // takes place for the retrieved local cache
 
     uint8_t* cache_local = cache_->exchange(nullptr);
 
-    if (cache_local != nullptr && delete_) numa_free(cache_local, size_);
-    else if (*incomplete_cache_ != nullptr) numa_free(*incomplete_cache_, size_);
+    if (cache_local != nullptr && delete_) memory_free_function_(cache_local, size_);
+    else if (*incomplete_cache_ != nullptr) memory_free_function_(*incomplete_cache_, size_);
     else;
 }
 
diff --git a/qdp_project/src/Benchmark.cpp b/qdp_project/src/Benchmark.cpp
index 214e6c3..0c1cb2a 100644
--- a/qdp_project/src/Benchmark.cpp
+++ b/qdp_project/src/Benchmark.cpp
@@ -38,47 +38,12 @@ void scan_b(size_t gid, size_t tid) {
 
     THREAD_TIMING_[SCANB_TIMING_INDEX][UniqueIndex(gid,tid)][0][TIME_STAMP_BEGIN] = std::chrono::steady_clock::now();
 
-    constexpr size_t VIRT_TID_INCREMENT = TC_SCANB / TC_AGGRJ;
-    constexpr size_t SUBCHUNK_THREAD_RATIO = TC_AGGRJ / (TC_SCANB == 0 ? 1 : TC_SCANB);
-    constexpr bool CACHE_SUBCHUNKING = SUBCHUNK_THREAD_RATIO > 1;
-    constexpr bool CACHE_OVERCHUNKING = VIRT_TID_INCREMENT > 1;
-
     if constexpr (PERFORM_CACHING) {
-        if constexpr (CACHE_SUBCHUNKING) {
-            constexpr size_t SUBCHUNK_COUNT = SUBCHUNK_THREAD_RATIO > 0 ? SUBCHUNK_THREAD_RATIO : 1;
-            constexpr size_t SUBCHUNK_SIZE_B = CHUNK_SIZE_B / SUBCHUNK_COUNT;
-            constexpr size_t SUBCHUNK_SIZE_ELEMENTS = CHUNK_SIZE_ELEMENTS / SUBCHUNK_COUNT;
-
-            for (size_t i = 0; i < RUN_COUNT; i++) {
-                const size_t chunk_index = get_chunk_index(gid, i);
-                uint64_t* chunk_ptr = get_chunk(DATA_B_, chunk_index, tid);
-
-                for (size_t j = 0; j < SUBCHUNK_COUNT; j++) {
-                    uint64_t* sub_chunk_ptr = &chunk_ptr[j * SUBCHUNK_SIZE_ELEMENTS];
+        for (size_t i = 0; i < RUN_COUNT; i++) {
+            const size_t chunk_index = get_chunk_index(gid, i);
+            uint64_t* chunk_ptr = get_chunk(DATA_B_, chunk_index, tid);
 
-                    CACHE_.Access(reinterpret_cast<uint8_t*>(sub_chunk_ptr), SUBCHUNK_SIZE_B);
-                }
-            }
-        }
-        else if constexpr (CACHE_OVERCHUNKING) {
-            for (size_t tid_virt = tid; tid_virt < TC_AGGRJ; tid_virt += VIRT_TID_INCREMENT) {
-                for (size_t i = 0; i < RUN_COUNT; i++) {
-                    const size_t chunk_index = get_chunk_index(gid, i);
-                    uint64_t* chunk_ptr = get_chunk(DATA_B_, chunk_index, tid_virt);
-
-                    CACHE_.Access(reinterpret_cast<uint8_t*>(chunk_ptr), CHUNK_SIZE_B);
-                }
-            }
-        }
-        else {
-            constexpr size_t SUBCHUNK_SIZE_B = CHUNK_SIZE_B / TC_AGGRJ;
-
-            for (size_t i = 0; i < RUN_COUNT; i++) {
-                const size_t chunk_index = get_chunk_index(gid, i);
-                uint64_t* chunk_ptr = get_chunk(DATA_B_, chunk_index, tid);
-
-                CACHE_.Access(reinterpret_cast<uint8_t*>(chunk_ptr), SUBCHUNK_SIZE_B);
-            }
+            CACHE_.Access(reinterpret_cast<uint8_t*>(chunk_ptr), AGGRJ_CHUNK_SIZE_B);
         }
     }
@@ -102,7 +67,7 @@ void scan_a(size_t gid, size_t tid) {
         uint64_t* chunk_ptr = get_chunk(DATA_A_, chunk_index, tid);
         uint16_t* mask_ptr = get_mask(MASK_A_, chunk_index, tid);
 
-        filter::apply_same(mask_ptr, nullptr, chunk_ptr, CMP_A, CHUNK_SIZE_B / TC_SCANA);
+        filter::apply_same(mask_ptr, nullptr, chunk_ptr, CMP_A, SCANA_CHUNK_SIZE_B);
     }
 
     THREAD_TIMING_[SCANA_TIMING_INDEX][UniqueIndex(gid,tid)][0][TIME_STAMP_WAIT] = std::chrono::steady_clock::now();
@@ -113,8 +78,6 @@ void scan_a(size_t gid, size_t tid) {
 }
 
 void aggr_j(size_t gid, size_t tid) {
-    constexpr size_t SUBCHUNK_SIZE_B = CHUNK_SIZE_B / TC_AGGRJ;
-
     CACHE_HITS_[UniqueIndex(gid,tid)] = 0;
 
     THREAD_TIMING_[AGGRJ_TIMING_INDEX][UniqueIndex(gid,tid)].clear();
@@ -139,7 +102,7 @@ void aggr_j(size_t gid, size_t tid) {
         uint64_t* data_ptr;
 
         if constexpr (PERFORM_CACHING) {
-            data = CACHE_.Access(reinterpret_cast<uint8_t*>(chunk_ptr), SUBCHUNK_SIZE_B, dsacache::FLAG_ACCESS_WEAK);
+            data = CACHE_.Access(reinterpret_cast<uint8_t*>(chunk_ptr), AGGRJ_CHUNK_SIZE_B, dsacache::FLAG_ACCESS_WEAK);
             data->WaitOnCompletion();
             data_ptr = reinterpret_cast<uint64_t*>(data->GetDataLocation());
@@ -157,8 +120,7 @@ void aggr_j(size_t gid, size_t tid) {
             data_ptr = chunk_ptr;
         }
 
-        uint64_t tmp = _mm512_reduce_add_epi64(aggregator);
-        aggregator = aggregation::apply_masked(aggregator, data_ptr, mask_ptr_a, SUBCHUNK_SIZE_B);
+        aggregator = aggregation::apply_masked(aggregator, data_ptr, mask_ptr_a, AGGRJ_CHUNK_SIZE_B);
     }
 
     THREAD_TIMING_[AGGRJ_TIMING_INDEX][UniqueIndex(gid,tid)][0][TIME_STAMP_END] = std::chrono::steady_clock::now();
@@ -189,6 +151,8 @@ int main() {
     MASK_A_ = (uint16_t*) numa_alloc_onnode(WL_SIZE_ELEMENTS, MEM_NODE_HBM);
     DATA_DST_ = (uint64_t*) numa_alloc_onnode(TC_AGGRJ * GROUP_COUNT * sizeof(uint64_t), MEM_NODE_HBM);
 
+    std::vector<void*> cache_memory_locations;
+
     if constexpr (PERFORM_CACHING) {
         // cache will be configured to wait weak by default
         // it will also not handle page faults which cause delay
@@ -196,9 +160,15 @@ int main() {
         // which is configured for xeonmax with smart assignment
         uint64_t cache_flags = 0;
         cache_flags |= dsacache::FLAG_WAIT_WEAK;
-        cache_flags |= dsacache::FLAG_FORCE_MAP_PAGES;
         CACHE_.SetFlags(cache_flags);
-        CACHE_.Init(CachePlacementPolicy, CopyMethodPolicy);
+        CACHE_.Init(CachePlacementPolicy, CopyMethodPolicy, MemoryAllocator_Allocate, MemoryAllocator_Free);
+
+        for (uint32_t i = 0; i < GROUP_COUNT * TC_AGGRJ; i++) {
+            void* ptr = numa_alloc_onnode(AGGRJ_CHUNK_SIZE_B, MEM_NODE_HBM);
+            cache_memory_locations.emplace_back(ptr);
+            CACHE_LOCATIONS_.push(reinterpret_cast<uint8_t*>(ptr));
+            memset(ptr, 0xAB, AGGRJ_CHUNK_SIZE_B);
+        }
     }
 
     fill_mt(DATA_A_, WL_SIZE_B, 0, 100, 42);
@@ -275,5 +245,9 @@ int main() {
     numa_free(MASK_A_, WL_SIZE_ELEMENTS);
     numa_free(DATA_DST_, TC_AGGRJ * GROUP_COUNT * sizeof(uint64_t));
 
+    for (void* ptr : cache_memory_locations) {
+        numa_free(ptr, AGGRJ_CHUNK_SIZE_B);
+    }
+
     return 0;
 }
diff --git a/qdp_project/src/Configuration.hpp b/qdp_project/src/Configuration.hpp
index 1183ed6..bb36b08 100644
--- a/qdp_project/src/Configuration.hpp
+++ b/qdp_project/src/Configuration.hpp
@@ -55,3 +55,7 @@ constexpr size_t RUN_COUNT = CHUNK_COUNT / GROUP_COUNT;
 static_assert(RUN_COUNT > 0);
 static_assert(WL_SIZE_B % 16 == 0);
 static_assert(CHUNK_SIZE_B % 16 == 0);
+static_assert(PERFORM_CACHING ? TC_SCANB == TC_AGGRJ : true);
+
+constexpr size_t SCANA_CHUNK_SIZE_B = CHUNK_SIZE_B / TC_SCANA;
+constexpr size_t AGGRJ_CHUNK_SIZE_B = CHUNK_SIZE_B / TC_AGGRJ;
\ No newline at end of file
diff --git a/qdp_project/src/utils/BenchmarkHelpers.cpp b/qdp_project/src/utils/BenchmarkHelpers.cpp
index 903eba5..91a0251 100644
--- a/qdp_project/src/utils/BenchmarkHelpers.cpp
+++ b/qdp_project/src/utils/BenchmarkHelpers.cpp
@@ -1,4 +1,6 @@
 #include
+#include <mutex>
+#include <queue>
 
 #include "../Configuration.hpp"
@@ -15,6 +17,9 @@
 
 std::array CACHE_HITS_;
 
+std::mutex CACHE_LOCATIONS_MTX_;
+std::queue<uint8_t*> CACHE_LOCATIONS_;
+
 inline size_t UniqueIndex(const uint32_t gid, const uint32_t tid) {
     return tid * GROUP_COUNT + gid;
 }
@@ -27,26 +32,25 @@ uint64_t sum_check(uint64_t compare_value, uint64_t* row_A, uint64_t* row_B, siz
     return sum;
 }
 
+uint8_t* MemoryAllocator_Allocate(const int node, const size_t size) {
+    std::lock_guard<std::mutex> lock(CACHE_LOCATIONS_MTX_);
+    uint8_t* ptr = CACHE_LOCATIONS_.front();
+    CACHE_LOCATIONS_.pop();
+    return ptr;
+}
+
+void MemoryAllocator_Free(uint8_t* ptr, const size_t size) {
+    return;
+}
+
 int CachePlacementPolicy(const int numa_dst_node, const int numa_src_node, const size_t data_size) {
-    return numa_dst_node < 8 ? numa_dst_node + 8 : numa_dst_node;
+    return MEM_NODE_HBM;
 }
 
 std::vector<int> CopyMethodPolicy(const int numa_dst_node, const int numa_src_node, const size_t data_size) {
-    // we always run on n0 and can cut the amount of code here therefore
-    // for small data it is more efficient to run on only one node
-    // which causes less submissions and therefore completes faster
-    // as submission cost matters for low transfer size
-
-    if (data_size < 16_MiB) {
-        static thread_local int last_node = 0;
-        const int node = last_node++ % 4;
-        return std::vector<int>{ node };
-    }
-    else {
-        static thread_local int last_group = 0;
-        const int group = last_group++ % 2;
-        return group == 0 ? std::vector<int>{ 0, 1 } : std::vector<int>{ 2, 3 };
-    }
+    static thread_local int last_node = 0;
+    const int node = last_node++ % 4;
+    return std::vector<int>{ node };
 }
 
 struct NopStruct {
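
Note on the new interface: with this patch the cache performs no allocation or deallocation of its own, so both allocator callbacks must be passed to Cache::Init() before the first Access(). Below is a minimal sketch of a numa-backed allocator pair that mirrors the removed in-cache AllocOnNode() path, without its flush-and-retry logic; the NumaAllocator_* names are illustrative and not part of the patch:

    #include <numa.h>
    #include <cstdint>
    #include <cstddef>

    // allocate directly on the requested numa node; returning nullptr on
    // failure is safe, as SubmitTask() then skips caching and the entry
    // keeps pointing at the source data
    uint8_t* NumaAllocator_Allocate(const int numa_node, const size_t size) {
        return reinterpret_cast<uint8_t*>(numa_alloc_onnode(size, numa_node));
    }

    // invoked by the cache once it no longer uses the block
    void NumaAllocator_Free(uint8_t* pointer, const size_t size) {
        numa_free(pointer, size);
    }

    // wiring, analogous to the benchmark setup:
    //   CACHE_.Init(CachePlacementPolicy, CopyMethodPolicy,
    //               NumaAllocator_Allocate, NumaAllocator_Free);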
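
The pool allocator in BenchmarkHelpers.cpp hands out blocks from a queue pre-filled with GROUP_COUNT * TC_AGGRJ entries and never returns them (MemoryAllocator_Free is a no-op), so an exhausted pool would make the front() call undefined behaviour. A defensive variant, assuming the pool can run dry and reusing the nullptr convention that SubmitTask() already handles, could look like this:

    // drop-in replacement for MemoryAllocator_Allocate, relying on the
    // CACHE_LOCATIONS_ queue and mutex declared in BenchmarkHelpers.cpp
    uint8_t* MemoryAllocator_Allocate(const int node, const size_t size) {
        std::lock_guard<std::mutex> lock(CACHE_LOCATIONS_MTX_);

        // pool exhausted: report failure instead of calling
        // front() on an empty queue
        if (CACHE_LOCATIONS_.empty()) {
            return nullptr;
        }

        uint8_t* ptr = CACHE_LOCATIONS_.front();
        CACHE_LOCATIONS_.pop();
        return ptr;
    }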