Browse Source

distribute multiple per-node locks for the cache state and allocate them on the respective numa node for fastest possible access

master
Constantin Fürst 11 months ago
parent
commit
7e48c96828
  1. 68
      offloading-cacher/cache.hpp

68
offloading-cacher/cache.hpp

@ -196,12 +196,16 @@ namespace dsacache {
private: private:
// mutex for accessing the cache state map // mutex for accessing the cache state map
std::shared_mutex cache_mutex_;
// map from [dst-numa-node,map2] // map from [dst-numa-node,map2]
// map2 from [data-ptr,cache-structure] // map2 from [data-ptr,cache-structure]
std::unordered_map<uint8_t, std::unordered_map<uint8_t*, CacheData>> cache_state_;
struct LockedNodeCacheState {
std::shared_mutex cache_mutex_;
std::unordered_map<uint8_t*, CacheData> node_cache_state_;
};
std::unordered_map<uint8_t, LockedNodeCacheState*> cache_state_;
CachePolicy* cache_policy_function_ = nullptr; CachePolicy* cache_policy_function_ = nullptr;
CopyPolicy* copy_policy_function_ = nullptr; CopyPolicy* copy_policy_function_ = nullptr;
@ -237,6 +241,7 @@ namespace dsacache {
std::unique_ptr<CacheData> GetFromCache(uint8_t* src, const size_t size, const int dst_node); std::unique_ptr<CacheData> GetFromCache(uint8_t* src, const size_t size, const int dst_node);
public: public:
~Cache();
Cache() = default; Cache() = default;
Cache(const Cache& other) = delete; Cache(const Cache& other) = delete;
@ -265,11 +270,10 @@ namespace dsacache {
} }
inline void dsacache::Cache::Clear() { inline void dsacache::Cache::Clear() {
std::unique_lock<std::shared_mutex> lock(cache_mutex_);
cache_state_.clear();
Init(cache_policy_function_, copy_policy_function_);
for (auto& nc : cache_state_) {
std::unique_lock<std::shared_mutex> lock(nc.second->cache_mutex_);
nc.second->node_cache_state_.clear();
}
} }
inline void dsacache::Cache::Init(CachePolicy* cache_policy_function, CopyPolicy* copy_policy_function) { inline void dsacache::Cache::Init(CachePolicy* cache_policy_function, CopyPolicy* copy_policy_function) {
@ -292,7 +296,9 @@ inline void dsacache::Cache::Init(CachePolicy* cache_policy_function, CopyPolicy
for (int node = 0; node < nodes_max; node++) { for (int node = 0; node < nodes_max; node++) {
if (numa_bitmask_isbitset(valid_nodes, node)) { if (numa_bitmask_isbitset(valid_nodes, node)) {
cache_state_.insert({node,{}});
void* block = numa_alloc_onnode(sizeof(LockedNodeCacheState), node);
auto* state = new(block)LockedNodeCacheState;
cache_state_.insert({node,state});
} }
} }
} }
@ -323,9 +329,11 @@ inline std::unique_ptr<dsacache::CacheData> dsacache::Cache::Access(uint8_t* dat
task = std::make_unique<CacheData>(data, size); task = std::make_unique<CacheData>(data, size);
{ {
std::unique_lock<std::shared_mutex> lock(cache_mutex_);
LockedNodeCacheState* local_cache_state = cache_state_[dst_node];
std::unique_lock<std::shared_mutex> lock(local_cache_state->cache_mutex_);
const auto state = cache_state_[dst_node].emplace(task->src_, *task);
const auto state = local_cache_state->node_cache_state_.emplace(task->src_, *task);
// if state.second is false then no insertion took place // if state.second is false then no insertion took place
// which means that concurrently whith this thread // which means that concurrently whith this thread
@ -488,21 +496,18 @@ inline void dsacache::Cache::Flush(const int node) {
} }
}; };
{
// we require exclusive lock as we modify the cache state // we require exclusive lock as we modify the cache state
std::unique_lock<std::shared_mutex> lock(cache_mutex_);
// node == -1 means that cache on all nodes should be flushed // node == -1 means that cache on all nodes should be flushed
if (node == -1) { if (node == -1) {
for (auto& nc : cache_state_) { for (auto& nc : cache_state_) {
FlushNode(nc.second);
std::unique_lock<std::shared_mutex> lock(nc.second->cache_mutex_);
FlushNode(nc.second->node_cache_state_);
} }
} }
else { else {
FlushNode(cache_state_[node]);
}
std::unique_lock<std::shared_mutex> lock(cache_state_[node]->cache_mutex_);
FlushNode(cache_state_[node]->node_cache_state_);
} }
} }
@ -513,17 +518,19 @@ inline std::unique_ptr<dsacache::CacheData> dsacache::Cache::GetFromCache(uint8_
// from marking the element we may find as unused and // from marking the element we may find as unused and
// clearing it // clearing it
LockedNodeCacheState* local_cache_state = cache_state_[dst_node];
// lock the cache state in shared-mode because we read // lock the cache state in shared-mode because we read
std::shared_lock<std::shared_mutex> lock(cache_mutex_);
std::shared_lock<std::shared_mutex> lock(local_cache_state->cache_mutex_);
// search for the data in our cache state structure at the given node // search for the data in our cache state structure at the given node
const auto search = cache_state_[dst_node].find(src);
const auto search = local_cache_state->node_cache_state_.find(src);
// if the data is in our structure we continue // if the data is in our structure we continue
if (search != cache_state_[dst_node].end()) {
if (search != local_cache_state->node_cache_state_.end()) {
// now check whether the sizes match // now check whether the sizes match
@ -538,7 +545,7 @@ inline std::unique_ptr<dsacache::CacheData> dsacache::Cache::GetFromCache(uint8_
// which will cause its deletion only after the last possible outside // which will cause its deletion only after the last possible outside
// reference is also destroyed // reference is also destroyed
cache_state_[dst_node].erase(search);
local_cache_state->node_cache_state_.erase(search);
} }
} }
@ -547,23 +554,30 @@ inline std::unique_ptr<dsacache::CacheData> dsacache::Cache::GetFromCache(uint8_
void dsacache::Cache::Invalidate(uint8_t* data) { void dsacache::Cache::Invalidate(uint8_t* data) {
// as the cache is modified we must obtain a unique writers lock // as the cache is modified we must obtain a unique writers lock
std::unique_lock<std::shared_mutex> lock(cache_mutex_);
// loop through all per-node-caches available // loop through all per-node-caches available
for (auto node : cache_state_) { for (auto node : cache_state_) {
std::unique_lock<std::shared_mutex> lock(node.second->cache_mutex_);
// search for an entry for the given data pointer // search for an entry for the given data pointer
auto search = node.second.find(data);
auto search = node.second->node_cache_state_.find(data);
if (search != node.second.end()) {
if (search != node.second->node_cache_state_.end()) {
// if the data is represented in-cache // if the data is represented in-cache
// then it will be erased to re-trigger // then it will be erased to re-trigger
// caching on next access // caching on next access
node.second.erase(search);
node.second->node_cache_state_.erase(search);
}
}
} }
inline dsacache::Cache::~Cache() {
for (auto node : cache_state_) {
std::unique_lock<std::shared_mutex> lock(node.second->cache_mutex_);
node.second->~LockedNodeCacheState();
numa_free(reinterpret_cast<void*>(node.second), sizeof(LockedNodeCacheState));
} }
} }

Loading…
Cancel
Save