#pragma once #include #include #include #include #include #include #include #include #include #include "cache-data.hpp" namespace dsacache { // cache class will handle access to data through the cache // by managing the cache through work submission, it sticks // to user-defined caching and copy policies, is thread // safe after initialization and returns copies of // cache data class to the user class Cache { public: // cache policy is defined as a type here to allow flexible usage of the cacher // given a numa destination node (where the data will be needed), the numa source // node (current location of the data) and the data size, this function should // return optimal cache placement // dst node and returned value can differ if the system, for example, has HBM // attached accessible directly to node n under a different node id m typedef int (CachePolicy)(const int numa_dst_node, const int numa_src_node, const size_t data_size); // copy policy specifies the copy-executing nodes for a given task // which allows flexibility in assignment for optimizing raw throughput // or choosing a conservative usage policy typedef std::vector (CopyPolicy)(const int numa_dst_node, const int numa_src_node); private: // mutex for accessing the cache state map std::shared_mutex cache_mutex_; // map from [dst-numa-node,map2] // map2 from [data-ptr,cache-structure] std::unordered_map> cache_state_; CachePolicy* cache_policy_function_ = nullptr; CopyPolicy* copy_policy_function_ = nullptr; // function used to submit a copy task on a specific node to the dml // engine on that node - will change the current threads node assignment // to achieve this so take care to restore this dml::handler> ExecuteCopy( const uint8_t* src, uint8_t* dst, const size_t size, const int node ) const; // allocates the required memory on the destination node // and then submits task to the dml library for processing // and attaches the handlers to the cache data structure void SubmitTask(CacheData* task, const int dst_node, const int src_node); // querries the policy functions for the given data and size // to obtain destination cache node, also returns the datas // source node for further usage // output may depend on the calling threads node assignment // as this is set as the "optimal placement" node void GetCacheNode(uint8_t* src, const size_t size, int* OUT_DST_NODE, int* OUT_SRC_NODE) const; // checks whether the cache contains an entry for // the given data in the given memory node and // returns it, otherwise returns nullptr std::unique_ptr GetFromCache(uint8_t* src, const size_t size, const int dst_node); public: // initializes the cache with the two policy functions // only after this is it safe to use in a threaded environment void Init(CachePolicy* cache_policy_function, CopyPolicy* copy_policy_function); // function to perform data access through the cache std::unique_ptr Access(uint8_t* data, const size_t size); // flushes the cache of inactive entries // if node is -1 then the whole cache is // checked and otherwise the specified // node - no checks on node validity void Flush(const int node = -1); }; } inline void dsacache::Cache::Init(CachePolicy* cache_policy_function, CopyPolicy* copy_policy_function) { cache_policy_function_ = cache_policy_function; copy_policy_function_ = copy_policy_function; // initialize numa library numa_available(); // obtain all available nodes // and those we may allocate // memory on const int nodes_max = numa_num_configured_nodes(); const bitmask* valid_nodes = numa_get_mems_allowed(); // prepare the cache state with entries // for all given nodes for (int node = 0; node < nodes_max; node++) { if (numa_bitmask_isbitset(valid_nodes, node)) { cache_state_.insert({node,{}}); } } std::cout << "[-] Cache Initialized" << std::endl; } inline std::unique_ptr dsacache::Cache::Access(uint8_t* data, const size_t size) { // get destination numa node for the cache int dst_node = -1; int src_node = -1; GetCacheNode(data, size, &dst_node, &src_node); // TODO: at this point it could be beneficial to check whether // TODO: the given destination node is present as an entry // TODO: in the cache state to see if it is valid // check whether the data is already cached std::unique_ptr task = GetFromCache(data, size, dst_node); if (task != nullptr) { return std::move(task); } // at this point the requested data is not present in cache // and we create a caching task for it task = std::make_unique(data, size); { std::unique_lock lock(cache_mutex_); const auto state = cache_state_[dst_node].emplace(task->src_, *task); // if state.second is false then no insertion took place // which means that concurrently whith this thread // some other thread must have accessed the same // resource in which case we return the other // threads data cache structure if (!state.second) { std::cout << "[!] Found another cache instance for 0x" << std::hex << (uint64_t)task->src_ << std::dec << std::endl; return std::move(std::make_unique(state.first->second)); } } SubmitTask(task.get(), dst_node, src_node); return std::move(task); } inline void dsacache::Cache::SubmitTask(CacheData* task, const int dst_node, const int src_node) { std::cout << "[+] Allocating " << task->size_ << "B on node " << dst_node << " for " << std::hex << (uint64_t)task->src_ << std::dec << std::endl; // allocate data on this node and flush the unused parts of the // cache if the operation fails and retry once // TODO: smarter flush strategy could keep some stuff cached uint8_t* dst = reinterpret_cast(numa_alloc_onnode(task->size_, dst_node)); if (dst == nullptr) { std::cout << "[!] First allocation try failed for " << task->size_ << "B on node " << dst_node << std::endl; // allocation on dst_node failed so we flush the cache for this // node hoping to free enough currently unused entries to make // the second allocation attempt successful Flush(dst_node); dst = reinterpret_cast(numa_alloc_onnode(task->size_, dst_node)); if (dst == nullptr) { std::cerr << "[x] Second allocation try failed for " << task->size_ << "B on node " << dst_node << std::endl; return; } } task->incomplete_cache_ = dst; // querry copy policy function for the nodes to use for the copy const std::vector executing_nodes = copy_policy_function_(dst_node, src_node); const size_t task_count = executing_nodes.size(); // each task will copy one fair part of the total size // and in case the total size is not a factor of the // given task count the last node must copy the remainder const size_t size = task->size_ / task_count; const size_t last_size = size + task->size_ % task_count; std::cout << "[-] Splitting Copy into " << task_count << " tasks of " << size << "B 0x" << std::hex << (uint64_t)task->src_ << std::dec << std::endl; // save the current numa node mask to restore later // as executing the copy task will place this thread // on a different node bitmask* nodemask = numa_get_run_node_mask(); for (uint32_t i = 0; i < task_count; i++) { const size_t local_size = i + 1 == task_count ? size : last_size; const size_t local_offset = i * size; const uint8_t* local_src = task->src_ + local_offset; uint8_t* local_dst = dst + local_offset; task->handlers_->emplace_back(ExecuteCopy(local_src, local_dst, local_size, executing_nodes[i])); } // restore the previous nodemask numa_run_on_node_mask(nodemask); numa_free_nodemask(nodemask); } inline dml::handler> dsacache::Cache::ExecuteCopy( const uint8_t* src, uint8_t* dst, const size_t size, const int node ) const { dml::const_data_view srcv = dml::make_view(src, size); dml::data_view dstv = dml::make_view(dst, size); numa_run_on_node(node); return dml::submit(dml::mem_copy.block_on_fault(), srcv, dstv); } inline void dsacache::Cache::GetCacheNode(uint8_t* src, const size_t size, int* OUT_DST_NODE, int* OUT_SRC_NODE) const { // obtain numa node of current thread to determine where the data is needed const int current_cpu = sched_getcpu(); const int current_node = numa_node_of_cpu(current_cpu); // obtain node that the given data pointer is allocated on *OUT_SRC_NODE = -1; get_mempolicy(OUT_SRC_NODE, NULL, 0, (void*)src, MPOL_F_NODE | MPOL_F_ADDR); // querry cache policy function for the destination numa node *OUT_DST_NODE = cache_policy_function_(current_node, *OUT_SRC_NODE, size); } inline void dsacache::Cache::Flush(const int node) { std::cout << "[-] Flushing Cache for " << (node == -1 ? "all nodes" : "node " + std::to_string(node)) << std::endl; // this lambda is used because below we have two code paths that // flush nodes, either one single or all successively const auto FlushNode = [](std::unordered_map& map) { // begin at the front of the map auto it = map.begin(); // loop until we reach the end of the map while (it != map.end()) { // if the iterator points to an inactive element // then we may erase it if (it->second.Active() == false) { // erase the iterator from the map map.erase(it); // as the erasure invalidated out iterator // we must start at the beginning again it = map.begin(); } else { // if element is active just move over to the next one it++; } } }; { // we require exclusive lock as we modify the cache state std::unique_lock lock(cache_mutex_); // node == -1 means that cache on all nodes should be flushed if (node == -1) { for (auto& nc : cache_state_) { FlushNode(nc.second); } } else { FlushNode(cache_state_[node]); } } } inline std::unique_ptr dsacache::Cache::GetFromCache(uint8_t* src, const size_t size, const int dst_node) { // the best situation is if this data is already cached // which we check in an unnamed block in which the cache // is locked for reading to prevent another thread // from marking the element we may find as unused and // clearing it // lock the cache state in shared-mode because we read std::shared_lock lock(cache_mutex_); // search for the data in our cache state structure at the given node const auto search = cache_state_[dst_node].find(src); // if the data is in our structure we continue if (search != cache_state_[dst_node].end()) { // now check whether the sizes match // TODO: second.size_ >= size would also work if (search->second.size_ == size) { std::cout << "[+] Found Cached version for 0x" << std::hex << (uint64_t)src << std::dec << std::endl; // return a unique copy of the entry which uses the object // lifetime and destructor to safely handle deallocation return std::move(std::make_unique(search->second)); } else { std::cout << "[!] Found Cached version with size missmatch for 0x" << std::hex << (uint64_t)src << std::dec << std::endl; // if the sizes missmatch then we clear the current entry from cache // which will cause its deletion only after the last possible outside // reference is also destroyed cache_state_[dst_node].erase(search); } } return nullptr; }