diff --git a/offloading-cacher/cache.hpp b/offloading-cacher/cache.hpp index 058a1e1..9a198e2 100644 --- a/offloading-cacher/cache.hpp +++ b/offloading-cacher/cache.hpp @@ -196,12 +196,16 @@ namespace dsacache { private: // mutex for accessing the cache state map - std::shared_mutex cache_mutex_; // map from [dst-numa-node,map2] // map2 from [data-ptr,cache-structure] - std::unordered_map> cache_state_; + struct LockedNodeCacheState { + std::shared_mutex cache_mutex_; + std::unordered_map node_cache_state_; + }; + + std::unordered_map cache_state_; CachePolicy* cache_policy_function_ = nullptr; CopyPolicy* copy_policy_function_ = nullptr; @@ -237,6 +241,7 @@ namespace dsacache { std::unique_ptr GetFromCache(uint8_t* src, const size_t size, const int dst_node); public: + ~Cache(); Cache() = default; Cache(const Cache& other) = delete; @@ -265,11 +270,10 @@ namespace dsacache { } inline void dsacache::Cache::Clear() { - std::unique_lock lock(cache_mutex_); - - cache_state_.clear(); - - Init(cache_policy_function_, copy_policy_function_); + for (auto& nc : cache_state_) { + std::unique_lock lock(nc.second->cache_mutex_); + nc.second->node_cache_state_.clear(); + } } inline void dsacache::Cache::Init(CachePolicy* cache_policy_function, CopyPolicy* copy_policy_function) { @@ -292,7 +296,9 @@ inline void dsacache::Cache::Init(CachePolicy* cache_policy_function, CopyPolicy for (int node = 0; node < nodes_max; node++) { if (numa_bitmask_isbitset(valid_nodes, node)) { - cache_state_.insert({node,{}}); + void* block = numa_alloc_onnode(sizeof(LockedNodeCacheState), node); + auto* state = new(block)LockedNodeCacheState; + cache_state_.insert({node,state}); } } } @@ -323,9 +329,11 @@ inline std::unique_ptr dsacache::Cache::Access(uint8_t* dat task = std::make_unique(data, size); { - std::unique_lock lock(cache_mutex_); + LockedNodeCacheState* local_cache_state = cache_state_[dst_node]; + + std::unique_lock lock(local_cache_state->cache_mutex_); - const auto state = cache_state_[dst_node].emplace(task->src_, *task); + const auto state = local_cache_state->node_cache_state_.emplace(task->src_, *task); // if state.second is false then no insertion took place // which means that concurrently whith this thread @@ -488,22 +496,19 @@ inline void dsacache::Cache::Flush(const int node) { } }; - { - // we require exclusive lock as we modify the cache state + // we require exclusive lock as we modify the cache state + // node == -1 means that cache on all nodes should be flushed - std::unique_lock lock(cache_mutex_); - - // node == -1 means that cache on all nodes should be flushed - - if (node == -1) { - for (auto& nc : cache_state_) { - FlushNode(nc.second); - } - } - else { - FlushNode(cache_state_[node]); + if (node == -1) { + for (auto& nc : cache_state_) { + std::unique_lock lock(nc.second->cache_mutex_); + FlushNode(nc.second->node_cache_state_); } } + else { + std::unique_lock lock(cache_state_[node]->cache_mutex_); + FlushNode(cache_state_[node]->node_cache_state_); + } } inline std::unique_ptr dsacache::Cache::GetFromCache(uint8_t* src, const size_t size, const int dst_node) { @@ -513,17 +518,19 @@ inline std::unique_ptr dsacache::Cache::GetFromCache(uint8_ // from marking the element we may find as unused and // clearing it + LockedNodeCacheState* local_cache_state = cache_state_[dst_node]; + // lock the cache state in shared-mode because we read - std::shared_lock lock(cache_mutex_); + std::shared_lock lock(local_cache_state->cache_mutex_); // search for the data in our cache state structure at the given node - const auto search = cache_state_[dst_node].find(src); + const auto search = local_cache_state->node_cache_state_.find(src); // if the data is in our structure we continue - if (search != cache_state_[dst_node].end()) { + if (search != local_cache_state->node_cache_state_.end()) { // now check whether the sizes match @@ -538,7 +545,7 @@ inline std::unique_ptr dsacache::Cache::GetFromCache(uint8_ // which will cause its deletion only after the last possible outside // reference is also destroyed - cache_state_[dst_node].erase(search); + local_cache_state->node_cache_state_.erase(search); } } @@ -547,26 +554,33 @@ inline std::unique_ptr dsacache::Cache::GetFromCache(uint8_ void dsacache::Cache::Invalidate(uint8_t* data) { // as the cache is modified we must obtain a unique writers lock - - std::unique_lock lock(cache_mutex_); - // loop through all per-node-caches available for (auto node : cache_state_) { + std::unique_lock lock(node.second->cache_mutex_); + // search for an entry for the given data pointer - auto search = node.second.find(data); + auto search = node.second->node_cache_state_.find(data); - if (search != node.second.end()) { + if (search != node.second->node_cache_state_.end()) { // if the data is represented in-cache // then it will be erased to re-trigger // caching on next access - node.second.erase(search); + node.second->node_cache_state_.erase(search); } } } +inline dsacache::Cache::~Cache() { + for (auto node : cache_state_) { + std::unique_lock lock(node.second->cache_mutex_); + node.second->~LockedNodeCacheState(); + numa_free(reinterpret_cast(node.second), sizeof(LockedNodeCacheState)); + } +} + inline dsacache::CacheData::CacheData(uint8_t* data, const size_t size) { src_ = data; size_ = size;