From 395d3073100110fc9c899c82eee2c568730837ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Constantin=20F=C3=BCrst?= Date: Wed, 10 Jan 2024 00:58:17 +0100 Subject: [PATCH] fix an issue with the freeing of data in the cacher --- offloading-cacher/main.cpp | 85 +++++++++-- offloading-cacher/offloading-cache.hpp | 189 ++++++++++++++++--------- 2 files changed, 192 insertions(+), 82 deletions(-) diff --git a/offloading-cacher/main.cpp b/offloading-cacher/main.cpp index 7aa8ea0..726033b 100644 --- a/offloading-cacher/main.cpp +++ b/offloading-cacher/main.cpp @@ -3,6 +3,8 @@ #include "offloading-cache.hpp" +offcache::Cache CACHE; + double* GetRandomArray(const size_t size) { double* array = new double[size]; @@ -29,36 +31,91 @@ bool IsEqual(const double* a, const double* b, const size_t size) { return true; } +void PerformAccessAndTest(double* src, const size_t size) { + // this is the function that any cache access will go through + // execution policy picks between three options: + // Relaxed may return an invalid (but not nullptr) CacheData + // which can then be validated with WaitOnCompletion() + // Immediate never returns an invalid CacheData structure + // however it may return just the pointer to source + // WaitOnCompletion() will then ensure that the data + // is actually in cache + // ImmediateNoCache behaves the same as Immediate but never performs + // caching itself, so it only returns a cached version if + // the data was previously cached + + std::unique_ptr data_cache = CACHE.Access( + reinterpret_cast(src), + size * sizeof(double), + offcache::ExecutionPolicy::Immediate + ); + + double* cached_imm = reinterpret_cast(data_cache->GetDataLocation()); + + // check the value immediately just to see if ram or cache was returned + + if (src == cached_imm) { + std::cout << "Caching did not immediately yield different data location." << std::endl; + } + else { + std::cout << "Immediately got different data location." 
<< std::endl; + } + + // waits for the completion of the asynchronous caching operation + + data_cache->WaitOnCompletion(); + + // gets the cache-data-location from the struct + + double* cached = reinterpret_cast(data_cache->GetDataLocation()); + + // tests on the resulting value + + if (src == cached) { + std::cout << "Caching did not affect data location." << std::endl; + } + + if (IsEqual(src,cached,size)) { + std::cout << "Cached data is correct." << std::endl; + } + else { + std::cout << "Cached data is wrong." << std::endl; + } +} + int main(int argc, char **argv) { - offcache::Cache cache; + + // given numa destination and source node and the size of the data + // this function decides on which node the data will be placed + // which is used to select the HBM-node for the dst-node if desired auto cache_policy = [](const int numa_dst_node, const int numa_src_node, const size_t data_size) { return numa_dst_node; }; + // this function receives the memory source and destination node + // and then decides on which nodes the copy operation will be split + auto copy_policy = [](const int numa_dst_node, const int numa_src_node) { return std::vector{ numa_src_node, numa_dst_node }; }; - cache.Init(cache_policy,copy_policy); + // initializes the cache with the two policies + + CACHE.Init(cache_policy,copy_policy); + + // generate the test data static constexpr size_t data_size = 1024 * 1024; double* data = GetRandomArray(data_size); - std::unique_ptr data_cache = cache.Access(reinterpret_cast(data), data_size * sizeof(double), offcache::ExecutionPolicy::Relaxed); + std::cout << "--- first access --- " << std::endl; - data_cache->WaitOnCompletion(); + PerformAccessAndTest(data, data_size); - double* cached = reinterpret_cast(data_cache->GetDataLocation()); + std::cout << "--- second access --- " << std::endl; - if (data == cached) { - std::cout << "Caching did not affect data location." 
<< std::endl; - } + PerformAccessAndTest(data, data_size); - if (IsEqual(data,cached,data_size)) { - std::cout << "Cached data is correct." << std::endl; - } - else { - std::cout << "Cached data is wrong." << std::endl; - } + std::cout << "--- end of application --- " << std::endl; } diff --git a/offloading-cacher/offloading-cache.hpp b/offloading-cacher/offloading-cache.hpp index ea91fae..e265665 100644 --- a/offloading-cacher/offloading-cache.hpp +++ b/offloading-cacher/offloading-cache.hpp @@ -94,9 +94,14 @@ namespace offcache { typedef std::vector (CopyPolicy)(const int numa_dst_node, const int numa_src_node); private: + // mutex for accessing the cache state map + std::shared_mutex cache_mutex_; - std::unordered_map cache_state_; + // map from [dst-numa-node,map2] + // map2 from [data-ptr,cache-structure] + + std::unordered_map> cache_state_; CachePolicy* cache_policy_function_ = nullptr; CopyPolicy* copy_policy_function_ = nullptr; @@ -105,6 +110,12 @@ namespace offcache { void SubmitTask(CacheData* task); + void GetCacheNode(uint8_t* src, const size_t size, int* OUT_DST_NODE, int* OUT_SRC_NODE) const; + + void AbortTask(CacheData* task) const; + + std::unique_ptr GetFromCache(uint8_t* src, const size_t size); + public: void Init(CachePolicy* cache_policy_function, CopyPolicy* copy_policy_function); @@ -126,40 +137,29 @@ inline void offcache::Cache::Init(CachePolicy* cache_policy_function, CopyPolicy // initialize numa library numa_available(); + const int nodes_max = numa_num_configured_nodes(); + const bitmask* valid_nodes = numa_get_mems_allowed(); + + for (int node = 0; node < nodes_max; node++) { + if (numa_bitmask_isbitset(valid_nodes, node)) { + cache_state_.insert({node,{}}); + } + } + std::cout << "[-] Cache Initialized" << std::endl; } inline std::unique_ptr offcache::Cache::Access(uint8_t* data, const size_t size, const ExecutionPolicy policy) { - // the best situation is if this data is already cached - // which we check in an unnamed block in 
which the cache - // is locked for reading to prevent another thread - // from marking the element we may find as unused and - // clearing it - { - std::shared_lock lock(cache_mutex_); - - const auto search = cache_state_.find(data); - - if (search != cache_state_.end()) { - if (search->second.size_ == size) { - search->second.active_->store(true); - - std::cout << "[+] Found Cached version for 0x" << std::hex << (uint64_t)data << std::dec << std::endl; - - return std::move(std::make_unique(search->second)); - } - else { - std::cout << "[!] Found Cached version with size missmatch for 0x" << std::hex << (uint64_t)data << std::dec << std::endl; + std::unique_ptr task = GetFromCache(data, size); - cache_state_.erase(search); - } - } + if (task != nullptr) { + return std::move(task); } // at this point the requested data is not present in cache // and we create a caching task for it - auto task = std::make_unique(data, size); + task = std::make_unique(data, size); if (policy == ExecutionPolicy::Immediate) { // in intermediate mode the returned task @@ -197,19 +197,12 @@ inline std::unique_ptr offcache::Cache::Access(uint8_t* dat } inline void offcache::Cache::SubmitTask(CacheData* task) { - // obtain numa node of current thread to determine where the data is needed - - const int current_cpu = sched_getcpu(); - const int current_node = numa_node_of_cpu(current_cpu); - - // obtain node that the given data pointer is allocated on - - int data_node = -1; - get_mempolicy(&data_node, NULL, 0, (void*)task->src_, MPOL_F_NODE | MPOL_F_ADDR); + // get destination numa node for the cache - // querry cache policy function for the destination numa node + int dst_node = -1; + int src_node = -1; - const int dst_node = cache_policy_function_(current_node, data_node, task->size_); + GetCacheNode(task->src_, task->size_, &dst_node, &src_node); std::cout << "[+] Allocating " << task->size_ << "B on node " << dst_node << " for " << std::hex << (uint64_t)task->src_ << std::dec << 
std::endl; @@ -236,7 +229,7 @@ inline void offcache::Cache::SubmitTask(CacheData* task) { // querry copy policy function for the nodes to use for the copy - const std::vector executing_nodes = copy_policy_function_(dst_node, data_node); + const std::vector executing_nodes = copy_policy_function_(dst_node, src_node); const size_t task_count = executing_nodes.size(); // each task will copy one fair part of the total size @@ -272,7 +265,7 @@ inline void offcache::Cache::SubmitTask(CacheData* task) { { std::unique_lock lock(cache_mutex_); - const auto state = cache_state_.insert({task->src_, *task}); + const auto state = cache_state_[dst_node].emplace(task->src_, *task); // if state.second is false then no insertion took place // which means that concurrently whith this thread @@ -283,20 +276,7 @@ inline void offcache::Cache::SubmitTask(CacheData* task) { if (!state.second) { std::cout << "[x] Found another cache instance for 0x" << std::hex << (uint64_t)task->src_ << std::dec << std::endl; - // first wait on all copy operations to be completed - - task->WaitOnCompletion(); - - // abort by doing the following steps - // (1) free the allocated memory, (2) remove the "maybe result" as - // we will not run the caching operation, (3) clear the sub tasks - // for the very same reason, (4) set the result to the RAM-location - - numa_free(dst, task->size_); - task->incomplete_cache_ = nullptr; - task->cache_->store(task->src_); - - std::cout << "[-] Abort completed for 0x" << std::hex << (uint64_t)task->src_ << std::dec << std::endl; + AbortTask(task); return; } @@ -346,7 +326,7 @@ offcache::CacheData::CacheData(uint8_t* data, const size_t size) { src_ = data; size_ = size; - active_ = new std::atomic(); + active_ = new std::atomic(1); cache_ = new std::atomic(); incomplete_cache_ = nullptr; handlers_ = std::make_unique>(); @@ -355,21 +335,25 @@ offcache::CacheData::CacheData(uint8_t* data, const size_t size) { offcache::CacheData::CacheData(const offcache::CacheData& other) 
{ std::cout << "[-] Copy Created for CacheData 0x" << std::hex << (uint64_t)other.src_ << std::dec << std::endl; + active_ = other.active_; + const int current_active = active_->fetch_add(1); + src_ = other.src_; size_ = other.size_; cache_ = other.cache_; - active_ = other.active_; incomplete_cache_ = nullptr; handlers_ = nullptr; - active_->fetch_add(1); } offcache::CacheData::~CacheData() { std::cout << "[-] Destructor for CacheData 0x" << std::hex << (uint64_t)src_ << std::dec << std::endl; - const int32_t v = active_->fetch_sub(1); + // due to fetch_sub returning the previously held value + // we must subtract one locally to get the current value - // if the returned value is non-positive + const int32_t v = active_->fetch_sub(1) - 1; + + // if the current value is zero or lower // then we must execute proper deletion // as this was the last reference @@ -390,7 +374,23 @@ void offcache::CacheData::Deallocate() { incomplete_cache_ = nullptr; } -uint8_t *offcache::CacheData::GetDataLocation() const { +void offcache::Cache::GetCacheNode(uint8_t* src, const size_t size, int* OUT_DST_NODE, int* OUT_SRC_NODE) const { + // obtain numa node of current thread to determine where the data is needed + + const int current_cpu = sched_getcpu(); + const int current_node = numa_node_of_cpu(current_cpu); + + // obtain node that the given data pointer is allocated on + + *OUT_SRC_NODE = -1; + get_mempolicy(OUT_SRC_NODE, NULL, 0, (void*)src, MPOL_F_NODE | MPOL_F_ADDR); + + // query cache policy function for the destination numa node + + *OUT_DST_NODE = cache_policy_function_(current_node, *OUT_SRC_NODE, size); +} + +uint8_t* offcache::CacheData::GetDataLocation() const { return cache_->load(); } @@ -405,17 +405,70 @@ inline void offcache::Cache::Flush() { { std::unique_lock lock(cache_mutex_); - - auto it = cache_state_.begin(); - while (it != cache_state_.end()) { - if (it->second.Active() == false) { - cache_state_.erase(it); - it = cache_state_.begin(); - } - else { - it++; 
+ for (auto& nc : cache_state_) { + auto it = nc.second.begin(); + + while (it != nc.second.end()) { + if (it->second.Active() == false) { + nc.second.erase(it); + it = nc.second.begin(); + } + else { + it++; + } } } } -} \ No newline at end of file +} + +void offcache::Cache::AbortTask(offcache::CacheData *task) const { + // first wait on all copy operations to be completed + + task->WaitOnCompletion(); + + // abort by doing the following steps + // (1) free the allocated memory, (2) remove the "maybe result" as + // we will not run the caching operation, (3) clear the sub tasks + // for the very same reason, (4) set the result to the RAM-location + + numa_free(task->incomplete_cache_, task->size_); + task->incomplete_cache_ = nullptr; + task->cache_->store(task->src_); + + std::cout << "[-] Abort completed for 0x" << std::hex << (uint64_t)task->src_ << std::dec << std::endl; +} + +std::unique_ptr offcache::Cache::GetFromCache(uint8_t* src, const size_t size) { + // the best situation is if this data is already cached + // which we check while the cache + // is locked for reading to prevent another thread + // from marking the element we may find as unused and + // clearing it + + int dst_node = -1; + int src_node = -1; + + GetCacheNode(src, size, &dst_node, &src_node); + + std::shared_lock lock(cache_mutex_); + + const auto search = cache_state_[dst_node].find(src); + + if (search != cache_state_[dst_node].end()) { + if (search->second.size_ == size) { + search->second.active_->store(true); + + std::cout << "[+] Found Cached version for 0x" << std::hex << (uint64_t)src << std::dec << std::endl; + + return std::move(std::make_unique(search->second)); + } + else { + std::cout << "[!] Found Cached version with size missmatch for 0x" << std::hex << (uint64_t)src << std::dec << std::endl; + + cache_state_[dst_node].erase(search); + } + } + + return nullptr; +}