From c01eafedaea03fb70f2c2ae0421e5f2a4b7b2f96 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Constantin=20F=C3=BCrst?= Date: Wed, 10 Jan 2024 11:45:29 +0100 Subject: [PATCH] refactor the cacher to reduce complexity, removes the access guarantees (relaxed,immediate,...), uses the fact that other tasks will wait on atomic value change for the cache-pointer if it is nullptr to add the entry to cache structure earlier reducing cost of two threads accessing new entry at the same time, splits the offloading-cache.hpp file into two with one containing the data-class (represents a cache entry and task) and the other containing the cacher itself --- offloading-cacher/cache-data.hpp | 139 ++++++++ offloading-cacher/cache.hpp | 280 +++++++++++++++ offloading-cacher/main.cpp | 30 +- offloading-cacher/offloading-cache.hpp | 474 ------------------------- 4 files changed, 432 insertions(+), 491 deletions(-) create mode 100644 offloading-cacher/cache-data.hpp create mode 100644 offloading-cacher/cache.hpp delete mode 100644 offloading-cacher/offloading-cache.hpp diff --git a/offloading-cacher/cache-data.hpp b/offloading-cacher/cache-data.hpp new file mode 100644 index 0000000..4028597 --- /dev/null +++ b/offloading-cacher/cache-data.hpp @@ -0,0 +1,139 @@ +#pragma once + +#include + +#include +#include +#include + +#include + +namespace dsacache { + class Cache; + + // the cache task structure will be used to submit and + // control a cache element, while providing source pointer + // and size in bytes for submission + // + // then the submitting thread may wait on the atomic "result" + // which will be notified by the cache worker upon processing + // after which the atomic-bool-ptr active will also become valid + class CacheData { + public: + using dml_handler = dml::handler>; + + private: + uint8_t* src_; + size_t size_; + + std::atomic* active_; + + protected: + std::atomic* cache_; + + uint8_t* incomplete_cache_; + + std::unique_ptr> handlers_; + + friend Cache; + + public: + 
CacheData(uint8_t* data, const size_t size); + CacheData(const CacheData& other); + ~CacheData(); + + void Deallocate(); + + void WaitOnCompletion(); + + uint8_t* GetDataLocation() const; + + bool Active() const; + }; +} + +inline void dsacache::CacheData::WaitOnCompletion() { + if (handlers_ == nullptr) { + std::cout << "[-] Waiting on cache-var-update for CacheData 0x" << std::hex << (uint64_t)src_ << std::dec << std::endl; + + cache_->wait(nullptr); + + std::cout << "[+] Finished waiting on cache-var-update for CacheData 0x" << std::hex << (uint64_t)src_ << std::dec << std::endl; + } + else { + std::cout << "[-] Waiting on handlers for CacheData 0x" << std::hex << (uint64_t)src_ << std::dec << std::endl; + + for (auto& handler : *handlers_) { + auto result = handler.get(); + // TODO: handle the returned status code + } + + handlers_ = nullptr; + + std::cout << "[+] Finished waiting on handlers for CacheData 0x" << std::hex << (uint64_t)src_ << std::dec << std::endl; + + cache_->store(incomplete_cache_); + cache_->notify_all(); + } +} + +dsacache::CacheData::CacheData(uint8_t* data, const size_t size) { + std::cout << "[-] New CacheData 0x" << std::hex << (uint64_t)data << std::dec << std::endl; + + src_ = data; + size_ = size; + active_ = new std::atomic(1); + cache_ = new std::atomic(); + incomplete_cache_ = nullptr; + handlers_ = std::make_unique>(); +} + +dsacache::CacheData::CacheData(const dsacache::CacheData& other) { + std::cout << "[-] Copy Created for CacheData 0x" << std::hex << (uint64_t)other.src_ << std::dec << std::endl; + + active_ = other.active_; + const int current_active = active_->fetch_add(1); + + src_ = other.src_; + size_ = other.size_; + cache_ = other.cache_; + incomplete_cache_ = nullptr; + handlers_ = nullptr; +} + +dsacache::CacheData::~CacheData() { + std::cout << "[-] Destructor for CacheData 0x" << std::hex << (uint64_t)src_ << std::dec << std::endl; + + // due to fetch_sub returning the preivously held value + // we must subtract 
one locally to get the current value

    const int32_t v = active_->fetch_sub(1) - 1;

    // if the returned value is zero or lower
    // then we must execute proper deletion
    // as this was the last reference

    if (v <= 0) {
        std::cout << "[!] Full Destructor for CacheData 0x" << std::hex << (uint64_t)src_ << std::dec << std::endl;

        Deallocate();
        delete active_;
        delete cache_;
    }
}

void dsacache::CacheData::Deallocate() {
    std::cout << "[!] Deallocating for CacheData 0x" << std::hex << (uint64_t)src_ << std::dec << std::endl;

    // fixed: numa_free was previously called on cache_ itself, i.e. on the
    // new-allocated atomic control object, instead of the numa allocation it
    // points to - that corrupts the heap and leaks the cached data; we free
    // the pointed-to data and keep cache_ alive so the destructor can delete it

    uint8_t* cache_location = cache_->load();

    if (cache_location != nullptr) {
        numa_free(cache_location, size_);
    }
    else if (incomplete_cache_ != nullptr) {
        // the copy was submitted but never waited upon, so the
        // allocation is only reachable through incomplete_cache_ here
        numa_free(incomplete_cache_, size_);
    }

    cache_->store(nullptr);
    incomplete_cache_ = nullptr;
}

uint8_t* dsacache::CacheData::GetDataLocation() const {
    return cache_->load();
}

bool dsacache::CacheData::Active() const {
    return active_->load() > 0;
}
\ No newline at end of file
diff --git a/offloading-cacher/cache.hpp b/offloading-cacher/cache.hpp
new file mode 100644
index 0000000..0081a04
--- /dev/null
+++ b/offloading-cacher/cache.hpp
@@ -0,0 +1,280 @@
#pragma once

#include <iostream>

#include <unordered_map>
#include <shared_mutex>
#include <memory>
#include <vector>

#include <sched.h>
#include <numa.h>
#include <numaif.h>

#include <dml/dml.hpp>

#include "cache-data.hpp"

namespace dsacache {
    // singleton which holds the cache workers
    // and is the place where work will be submitted
    class Cache {
    public:
        // cache policy is defined as a type here to allow flexible usage of the cacher
        // given a numa destination node (where the data will be needed), the numa source
        // node (current location of the data) and the data size, this function should
        // return optimal cache placement
        // dst node and returned value can differ if the system, for example, has HBM
        // attached accessible directly to node n under a different node id m
        typedef int (CachePolicy)(const int numa_dst_node, const int numa_src_node, const size_t data_size);

        // copy policy specifies the copy-executing nodes for a given task
        // which allows flexibility in assignment for optimizing raw throughput
        // or choosing a 
conservative usage policy + typedef std::vector (CopyPolicy)(const int numa_dst_node, const int numa_src_node); + + private: + // mutex for accessing the cache state map + + std::shared_mutex cache_mutex_; + + // map from [dst-numa-node,map2] + // map2 from [data-ptr,cache-structure] + + std::unordered_map> cache_state_; + + CachePolicy* cache_policy_function_ = nullptr; + CopyPolicy* copy_policy_function_ = nullptr; + + dml::handler> ExecuteCopy( + const uint8_t* src, uint8_t* dst, const size_t size, const int node + ) const; + + void SubmitTask(CacheData* task, const int dst_node, const int src_node); + + void GetCacheNode(uint8_t* src, const size_t size, int* OUT_DST_NODE, int* OUT_SRC_NODE) const; + + std::unique_ptr GetFromCache(uint8_t* src, const size_t size, const int dst_node); + + public: + void Init(CachePolicy* cache_policy_function, CopyPolicy* copy_policy_function); + + // function to perform data access through the cache + std::unique_ptr Access(uint8_t* data, const size_t size); + + void Flush(const int node = -1); + }; +} + +inline void dsacache::Cache::Init(CachePolicy* cache_policy_function, CopyPolicy* copy_policy_function) { + cache_policy_function_ = cache_policy_function; + copy_policy_function_ = copy_policy_function; + + // initialize numa library + numa_available(); + + const int nodes_max = numa_num_configured_nodes(); + const bitmask* valid_nodes = numa_get_mems_allowed(); + + for (int node = 0; node < nodes_max; node++) { + if (numa_bitmask_isbitset(valid_nodes, node)) { + cache_state_.insert({node,{}}); + } + } + + std::cout << "[-] Cache Initialized" << std::endl; +} + +inline std::unique_ptr dsacache::Cache::Access(uint8_t* data, const size_t size) { + // get destination numa node for the cache + + int dst_node = -1; + int src_node = -1; + + GetCacheNode(data, size, &dst_node, &src_node); + + // check whether the data is already cached + + std::unique_ptr task = GetFromCache(data, size, dst_node); + + if (task != nullptr) { + return 
std::move(task); + } + + // at this point the requested data is not present in cache + // and we create a caching task for it + + task = std::make_unique(data, size); + + { + std::unique_lock lock(cache_mutex_); + + const auto state = cache_state_[dst_node].emplace(task->src_, *task); + + // if state.second is false then no insertion took place + // which means that concurrently whith this thread + // some other thread must have accessed the same + // resource in which case we return the other + // threads data cache structure + + if (!state.second) { + std::cout << "[!] Found another cache instance for 0x" << std::hex << (uint64_t)task->src_ << std::dec << std::endl; + return std::move(std::make_unique(state.first->second)); + } + } + + SubmitTask(task.get(), dst_node, src_node); + + return std::move(task); +} + +inline void dsacache::Cache::SubmitTask(CacheData* task, const int dst_node, const int src_node) { + std::cout << "[+] Allocating " << task->size_ << "B on node " << dst_node << " for " << std::hex << (uint64_t)task->src_ << std::dec << std::endl; + + // allocate data on this node and flush the unused parts of the + // cache if the operation fails and retry once + // TODO: smarter flush strategy could keep some stuff cached + + uint8_t* dst = reinterpret_cast(numa_alloc_onnode(task->size_, dst_node)); + + if (dst == nullptr) { + std::cout << "[!] 
First allocation try failed for " << task->size_ << "B on node " << dst_node << std::endl;

        // allocation on dst_node failed so we flush the cache for this
        // node hoping to free enough currently unused entries to make
        // the second allocation attempt successful

        Flush(dst_node);

        dst = reinterpret_cast<uint8_t*>(numa_alloc_onnode(task->size_, dst_node));

        if (dst == nullptr) {
            std::cout << "[x] Second allocation try failed for " << task->size_ << "B on node " << dst_node << std::endl;
            return;
        }
    }

    task->incomplete_cache_ = dst;

    // query copy policy function for the nodes to use for the copy

    const std::vector<int> executing_nodes = copy_policy_function_(dst_node, src_node);
    const size_t task_count = executing_nodes.size();

    // each task will copy one fair part of the total size
    // and in case the total size is not a factor of the
    // given task count the last node must copy the remainder

    const size_t size = task->size_ / task_count;
    const size_t last_size = size + task->size_ % task_count;

    std::cout << "[-] Splitting Copy into " << task_count << " tasks of " << size << "B 0x" << std::hex << (uint64_t)task->src_ << std::dec << std::endl;

    // save the current numa node mask to restore later
    // as executing the copy task will place this thread
    // on a different node

    bitmask* nodemask = numa_get_run_node_mask();

    for (uint32_t i = 0; i < task_count; i++) {
        // every task but the last copies an equal share; the last task also
        // copies the remainder (fixed: the condition was inverted and handed
        // the remainder to every task except the last)
        const size_t local_size = i + 1 != task_count ? 
size : last_size; + const size_t local_offset = i * size; + const uint8_t* local_src = task->src_ + local_offset; + uint8_t* local_dst = dst + local_offset; + + task->handlers_->emplace_back(ExecuteCopy(local_src, local_dst, local_size, executing_nodes[i])); + } + + // restore the previous nodemask + + numa_run_on_node_mask(nodemask); +} + +inline dml::handler> dsacache::Cache::ExecuteCopy( + const uint8_t* src, uint8_t* dst, const size_t size, const int node +) const { + dml::const_data_view srcv = dml::make_view(src, size); + dml::data_view dstv = dml::make_view(dst, size); + + numa_run_on_node(node); + + return dml::submit(dml::mem_copy.block_on_fault(), srcv, dstv); +} + + +void dsacache::Cache::GetCacheNode(uint8_t* src, const size_t size, int* OUT_DST_NODE, int* OUT_SRC_NODE) const { + // obtain numa node of current thread to determine where the data is needed + + const int current_cpu = sched_getcpu(); + const int current_node = numa_node_of_cpu(current_cpu); + + // obtain node that the given data pointer is allocated on + + *OUT_SRC_NODE = -1; + get_mempolicy(OUT_SRC_NODE, NULL, 0, (void*)src, MPOL_F_NODE | MPOL_F_ADDR); + + // querry cache policy function for the destination numa node + + *OUT_DST_NODE = cache_policy_function_(current_node, *OUT_SRC_NODE, size); +} + + +inline void dsacache::Cache::Flush(const int node) { + std::cout << "[-] Flushing Cache for " << (node == -1 ? 
"all nodes" : "node " + std::to_string(node)) << std::endl;

    const auto FlushNode = [](std::unordered_map<uint8_t*, CacheData>& map) {
        auto it = map.begin();

        while (it != map.end()) {
            if (it->second.Active() == false) {
                // erase returns the iterator following the removed element,
                // avoiding the previous restart-from-begin which made the
                // flush quadratic in the number of entries
                it = map.erase(it);
            }
            else {
                it++;
            }
        }
    };

    {
        std::unique_lock<std::shared_mutex> lock(cache_mutex_);

        if (node == -1) {
            for (auto& nc : cache_state_) {
                FlushNode(nc.second);
            }
        }
        else {
            FlushNode(cache_state_[node]);
        }
    }
}

std::unique_ptr<dsacache::CacheData> dsacache::Cache::GetFromCache(uint8_t* src, const size_t size, const int dst_node) {
    // the best situation is if this data is already cached
    // which we check in an unnamed block in which the cache
    // is locked for reading to prevent another thread
    // from marking the element we may find as unused and
    // clearing it

    std::shared_lock<std::shared_mutex> lock(cache_mutex_);

    const auto search = cache_state_[dst_node].find(src);

    if (search != cache_state_[dst_node].end()) {
        if (search->second.size_ == size) {
            // note: the former active_->store(true) was removed here as it
            // overwrote the atomic reference count with 1, which corrupts the
            // count when multiple copies are outstanding and can lead to
            // premature deallocation; the CacheData copy constructor invoked
            // below performs the correct increment
            std::cout << "[+] Found Cached version for 0x" << std::hex << (uint64_t)src << std::dec << std::endl;

            return std::move(std::make_unique<CacheData>(search->second));
        }
        else {
            std::cout << "[!] 
Found Cached version with size missmatch for 0x" << std::hex << (uint64_t)src << std::dec << std::endl; + + cache_state_[dst_node].erase(search); + } + } + + return nullptr; +} diff --git a/offloading-cacher/main.cpp b/offloading-cacher/main.cpp index 726033b..e67eb22 100644 --- a/offloading-cacher/main.cpp +++ b/offloading-cacher/main.cpp @@ -1,9 +1,9 @@ #include #include -#include "offloading-cache.hpp" +#include "cache.hpp" -offcache::Cache CACHE; +dsacache::Cache CACHE; double* GetRandomArray(const size_t size) { double* array = new double[size]; @@ -32,22 +32,9 @@ bool IsEqual(const double* a, const double* b, const size_t size) { } void PerformAccessAndTest(double* src, const size_t size) { - // this is the function that any cache access will go through - // execution policy picks between three options: - // Relaxed may return an invalid (but not nullptr) CacheData - // which can then be validated with WaitOnCompletion() - // Immediate never returns an invalid CacheData structure - // however it may return just the pointer to source - // WaitOnCompletion() will then ensure that the data - // is actually in cache - // ImmediateNoCache behaves the same as Immediate but does never perform - // caching itself so only returns cached version if - // previously cached is available - - std::unique_ptr data_cache = CACHE.Access( + std::unique_ptr data_cache = CACHE.Access( reinterpret_cast(src), - size * sizeof(double), - offcache::ExecutionPolicy::Immediate + size * sizeof(double) ); double* cached_imm = reinterpret_cast(data_cache->GetDataLocation()); @@ -57,6 +44,9 @@ void PerformAccessAndTest(double* src, const size_t size) { if (src == cached_imm) { std::cout << "Caching did not immediately yield different data location." << std::endl; } + else if (cached_imm == nullptr) { + std::cout << "Immediately got nullptr." << std::endl; + } else { std::cout << "Immediately got different data location." 
<< std::endl; } @@ -74,6 +64,12 @@ void PerformAccessAndTest(double* src, const size_t size) { if (src == cached) { std::cout << "Caching did not affect data location." << std::endl; } + else if (cached == nullptr) { + std::cout << "Got nullptr from cache." << std::endl; + } + else { + std::cout << "Got different data location from cache." << std::endl; + } if (IsEqual(src,cached,size)) { std::cout << "Cached data is correct." << std::endl; diff --git a/offloading-cacher/offloading-cache.hpp b/offloading-cacher/offloading-cache.hpp deleted file mode 100644 index e265665..0000000 --- a/offloading-cacher/offloading-cache.hpp +++ /dev/null @@ -1,474 +0,0 @@ -#pragma once - -#include - -#include -#include -#include -#include -#include -#include -#include - -#include - -#include -#include -#include - -#include - -namespace offcache { - // execution policy selects in which way the data is supposed to be cached - // and returned with the following behaviour is guaranteed in addition to the - // returned value being valid: - // Immediate: return as fast as possible - // may return cached data, can return data in RAM - // will trigger caching of the data provided - // ImmediateNoCache: return as fast as possible and never trigger caching - // same as Immediate but will not trigger caching - // Relaxed: no rapid return needed, take time - // will trigger caching and may only return - // once the caching is successful but can still - // provide data in RAM - enum class ExecutionPolicy { - Relaxed, Immediate, ImmediateNoCache - }; - - class Cache; - - // the cache task structure will be used to submit and - // control a cache element, while providing source pointer - // and size in bytes for submission - // - // then the submitting thread may wait on the atomic "result" - // which will be notified by the cache worker upon processing - // after which the atomic-bool-ptr active will also become valid - class CacheData { - public: - using dml_handler = dml::handler>; - - private: 
- uint8_t* src_; - size_t size_; - - std::atomic* active_; - - protected: - std::atomic* cache_; - - uint8_t* incomplete_cache_; - - std::unique_ptr> handlers_; - - friend Cache; - - public: - CacheData(uint8_t* data, const size_t size); - CacheData(const CacheData& other); - ~CacheData(); - - void Deallocate(); - void WaitOnCompletion(); - - uint8_t* GetDataLocation() const; - - bool Active() const; - }; - - // singleton which holds the cache workers - // and is the place where work will be submited - class Cache { - public: - // cache policy is defined as a type here to allow flexible usage of the cacher - // given a numa destination node (where the data will be needed), the numa source - // node (current location of the data) and the data size, this function should - // return optimal cache placement - // dst node and returned value can differ if the system, for example, has HBM - // attached accessible directly to node n under a different node id m - typedef int (CachePolicy)(const int numa_dst_node, const int numa_src_node, const size_t data_size); - - // copy policy specifies the copy-executing nodes for a given task - // which allows flexibility in assignment for optimizing raw throughput - // or choosing a conservative usage policy - typedef std::vector (CopyPolicy)(const int numa_dst_node, const int numa_src_node); - - private: - // mutex for accessing the cache state map - - std::shared_mutex cache_mutex_; - - // map from [dst-numa-node,map2] - // map2 from [data-ptr,cache-structure] - - std::unordered_map> cache_state_; - - CachePolicy* cache_policy_function_ = nullptr; - CopyPolicy* copy_policy_function_ = nullptr; - - dml::handler> ExecuteCopy(const uint8_t* src, uint8_t* dst, const size_t size, const int node) const; - - void SubmitTask(CacheData* task); - - void GetCacheNode(uint8_t* src, const size_t size, int* OUT_DST_NODE, int* OUT_SRC_NODE) const; - - void AbortTask(CacheData* task) const; - - std::unique_ptr GetFromCache(uint8_t* src, const 
size_t size); - - public: - void Init(CachePolicy* cache_policy_function, CopyPolicy* copy_policy_function); - - // function to perform data access through the cache - // behaviour depends on the chosen execution policy - // Immediate and ImmediateNoCache return a cache task - // with guaranteed-valid result value where Relaxed - // policy does not come with this guarantee. - std::unique_ptr Access(uint8_t* data, const size_t size, const ExecutionPolicy policy); - - void Flush(); - }; -} - -inline void offcache::Cache::Init(CachePolicy* cache_policy_function, CopyPolicy* copy_policy_function) { - cache_policy_function_ = cache_policy_function; - copy_policy_function_ = copy_policy_function; - - // initialize numa library - numa_available(); - - const int nodes_max = numa_num_configured_nodes(); - const bitmask* valid_nodes = numa_get_mems_allowed(); - - for (int node = 0; node < nodes_max; node++) { - if (numa_bitmask_isbitset(valid_nodes, node)) { - cache_state_.insert({node,{}}); - } - } - - std::cout << "[-] Cache Initialized" << std::endl; -} - -inline std::unique_ptr offcache::Cache::Access(uint8_t* data, const size_t size, const ExecutionPolicy policy) { - std::unique_ptr task = GetFromCache(data, size); - - if (task != nullptr) { - return std::move(task); - } - - // at this point the requested data is not present in cache - // and we create a caching task for it - - task = std::make_unique(data, size); - - if (policy == ExecutionPolicy::Immediate) { - // in intermediate mode the returned task - // object is guaranteed to be valid and therefore - // its resulting location must be validated - // after which we submit the task - // maybe_result is then set by submit - - task->cache_->store(data); - SubmitTask(task.get()); - return std::move(task); - } - else if (policy == ExecutionPolicy::ImmediateNoCache) { - // for immediatenocache we just validate - // the generated task and return it - // we must also set maybe_result in case - // someone waits on this - - 
task->cache_->store(data); - task->incomplete_cache_ = data; - return std::move(task); - } - else if (policy == ExecutionPolicy::Relaxed) { - // for relaxed no valid task must be returned - // and therefore we just submit and then give - // the possible invalid task back with only - // maybe_result set by submission - - SubmitTask(task.get()); - return std::move(task); - } - else { - // this should not be reached - } -} - -inline void offcache::Cache::SubmitTask(CacheData* task) { - // get destination numa node for the cache - - int dst_node = -1; - int src_node = -1; - - GetCacheNode(task->src_, task->size_, &dst_node, &src_node); - - std::cout << "[+] Allocating " << task->size_ << "B on node " << dst_node << " for " << std::hex << (uint64_t)task->src_ << std::dec << std::endl; - - // allocate data on this node and flush the unused parts of the - // cache if the operation fails and retry once - // TODO: smarter flush strategy could keep some stuff cached - - uint8_t* dst = reinterpret_cast(numa_alloc_onnode(task->size_, dst_node)); - - if (dst == nullptr) { - std::cout << "[!] 
First allocation try failed for " << task->size_ << "B on node " << dst_node << std::endl; - - Flush(); - - dst = reinterpret_cast(numa_alloc_onnode(task->size_, dst_node)); - - if (dst == nullptr) { - std::cout << "[x] Second allocation try failed for " << task->size_ << "B on node " << dst_node << std::endl; - return; - } - } - - task->incomplete_cache_ = dst; - - // querry copy policy function for the nodes to use for the copy - - const std::vector executing_nodes = copy_policy_function_(dst_node, src_node); - const size_t task_count = executing_nodes.size(); - - // each task will copy one fair part of the total size - // and in case the total size is not a factor of the - // given task count the last node must copy the remainder - - const size_t size = task->size_ / task_count; - const size_t last_size = size + task->size_ % task_count; - - std::cout << "[-] Splitting Copy into " << task_count << " tasks of " << size << "B 0x" << std::hex << (uint64_t)task->src_ << std::dec << std::endl; - - // save the current numa node mask to restore later - // as executing the copy task will place this thread - // on a different node - - bitmask* nodemask = numa_get_run_node_mask(); - - for (uint32_t i = 0; i < task_count; i++) { - const size_t local_size = i + 1 == task_count ? 
size : last_size; - const size_t local_offset = i * size; - const uint8_t* local_src = task->src_ + local_offset; - uint8_t* local_dst = dst + local_offset; - - task->handlers_->emplace_back(ExecuteCopy(local_src, local_dst, local_size, executing_nodes[i])); - } - - // only at this point may the task be added to the control structure - // because adding it earlier could cause it to be returned for an - // access request while the handler-vector is not fully populated - // which could cause the wait-function to return prematurely - // TODO: this can be optimized because the abort is quite expensive - - { - std::unique_lock lock(cache_mutex_); - - const auto state = cache_state_[dst_node].emplace(task->src_, *task); - - // if state.second is false then no insertion took place - // which means that concurrently whith this thread - // some other thread must have accessed the same - // resource in which case we must perform an abort - // TODO: abort is not the only way to handle this situation - - if (!state.second) { - std::cout << "[x] Found another cache instance for 0x" << std::hex << (uint64_t)task->src_ << std::dec << std::endl; - - AbortTask(task); - - return; - } - } - - // restore the previous nodemask - - numa_run_on_node_mask(nodemask); -} - -inline dml::handler> offcache::Cache::ExecuteCopy(const uint8_t* src, uint8_t* dst, const size_t size, const int node) const { - dml::const_data_view srcv = dml::make_view(src, size); - dml::data_view dstv = dml::make_view(dst, size); - - numa_run_on_node(node); - - return dml::submit(dml::mem_copy.block_on_fault(), srcv, dstv); -} - -inline void offcache::CacheData::WaitOnCompletion() { - if (handlers_ == nullptr) { - std::cout << "[-] Waiting on cache-var-update for CacheData 0x" << std::hex << (uint64_t)src_ << std::dec << std::endl; - - cache_->wait(nullptr); - - std::cout << "[+] Finished waiting on cache-var-update for CacheData 0x" << std::hex << (uint64_t)src_ << std::dec << std::endl; - } - else { - std::cout << 
"[-] Waiting on handlers for CacheData 0x" << std::hex << (uint64_t)src_ << std::dec << std::endl; - - for (auto& handler : *handlers_) { - auto result = handler.get(); - // TODO: handle the returned status code - } - - handlers_ = nullptr; - - std::cout << "[+] Finished waiting on handlers for CacheData 0x" << std::hex << (uint64_t)src_ << std::dec << std::endl; - - cache_->store(incomplete_cache_); - cache_->notify_all(); - } -} - -offcache::CacheData::CacheData(uint8_t* data, const size_t size) { - std::cout << "[-] New CacheData 0x" << std::hex << (uint64_t)data << std::dec << std::endl; - - src_ = data; - size_ = size; - active_ = new std::atomic(1); - cache_ = new std::atomic(); - incomplete_cache_ = nullptr; - handlers_ = std::make_unique>(); -} - -offcache::CacheData::CacheData(const offcache::CacheData& other) { - std::cout << "[-] Copy Created for CacheData 0x" << std::hex << (uint64_t)other.src_ << std::dec << std::endl; - - active_ = other.active_; - const int current_active = active_->fetch_add(1); - - src_ = other.src_; - size_ = other.size_; - cache_ = other.cache_; - incomplete_cache_ = nullptr; - handlers_ = nullptr; -} - -offcache::CacheData::~CacheData() { - std::cout << "[-] Destructor for CacheData 0x" << std::hex << (uint64_t)src_ << std::dec << std::endl; - - // due to fetch_sub returning the preivously held value - // we must subtract one locally to get the current value - - const int32_t v = active_->fetch_sub(1) - 1; - - // if the returned value is zero or lower - // then we must execute proper deletion - // as this was the last reference - - if (v <= 0) { - std::cout << "[!] Full Destructor for CacheData 0x" << std::hex << (uint64_t)src_ << std::dec << std::endl; - - Deallocate(); - delete active_; - delete cache_; - } -} - -void offcache::CacheData::Deallocate() { - std::cout << "[!] 
Deallocating for CacheData 0x" << std::hex << (uint64_t)src_ << std::dec << std::endl; - - numa_free(cache_, size_); - cache_ = nullptr; - incomplete_cache_ = nullptr; -} - -void offcache::Cache::GetCacheNode(uint8_t* src, const size_t size, int* OUT_DST_NODE, int* OUT_SRC_NODE) const { - // obtain numa node of current thread to determine where the data is needed - - const int current_cpu = sched_getcpu(); - const int current_node = numa_node_of_cpu(current_cpu); - - // obtain node that the given data pointer is allocated on - - *OUT_SRC_NODE = -1; - get_mempolicy(OUT_SRC_NODE, NULL, 0, (void*)src, MPOL_F_NODE | MPOL_F_ADDR); - - // querry cache policy function for the destination numa node - - *OUT_DST_NODE = cache_policy_function_(current_node, *OUT_SRC_NODE, size); -} - -uint8_t* offcache::CacheData::GetDataLocation() const { - return cache_->load(); -} - -bool offcache::CacheData::Active() const { - return active_->load() > 0; -} - -inline void offcache::Cache::Flush() { - std::cout << "[-] Flushing Cache" << std::endl; - - // TODO: there is a better way to implement this flush - - { - std::unique_lock lock(cache_mutex_); - - for (auto& nc : cache_state_) { - auto it = nc.second.begin(); - - while (it != nc.second.end()) { - if (it->second.Active() == false) { - nc.second.erase(it); - it = nc.second.begin(); - } - else { - it++; - } - } - } - } -} - -void offcache::Cache::AbortTask(offcache::CacheData *task) const { - // first wait on all copy operations to be completed - - task->WaitOnCompletion(); - - // abort by doing the following steps - // (1) free the allocated memory, (2) remove the "maybe result" as - // we will not run the caching operation, (3) clear the sub tasks - // for the very same reason, (4) set the result to the RAM-location - - numa_free(task->incomplete_cache_, task->size_); - task->incomplete_cache_ = nullptr; - task->cache_->store(task->src_); - - std::cout << "[-] Abort completed for 0x" << std::hex << (uint64_t)task->src_ << std::dec << 
std::endl; -} - -std::unique_ptr offcache::Cache::GetFromCache(uint8_t* src, const size_t size) { - // the best situation is if this data is already cached - // which we check in an unnamed block in which the cache - // is locked for reading to prevent another thread - // from marking the element we may find as unused and - // clearing it - - int dst_node = -1; - int src_node = -1; - - GetCacheNode(src, size, &dst_node, &src_node); - - std::shared_lock lock(cache_mutex_); - - const auto search = cache_state_[dst_node].find(src); - - if (search != cache_state_[dst_node].end()) { - if (search->second.size_ == size) { - search->second.active_->store(true); - - std::cout << "[+] Found Cached version for 0x" << std::hex << (uint64_t)src << std::dec << std::endl; - - return std::move(std::make_unique(search->second)); - } - else { - std::cout << "[!] Found Cached version with size missmatch for 0x" << std::hex << (uint64_t)src << std::dec << std::endl; - - cache_state_[dst_node].erase(search); - } - } - - return nullptr; -}