diff --git a/offloading-cacher/offloading-cache.hpp b/offloading-cacher/offloading-cache.hpp
index 9c2967a..d937fb8 100644
--- a/offloading-cacher/offloading-cache.hpp
+++ b/offloading-cacher/offloading-cache.hpp
@@ -4,9 +4,11 @@
 #include
 #include
 #include
+#include
 #include
+#include
 #include
 #include
@@ -28,13 +30,6 @@ namespace offcache {
         Relaxed, Immediate, ImmediateNoCache
     };
 
-    struct WorkerTask {
-        uint8_t* src_;
-        uint8_t* dst_;
-        size_t size_;
-        std::atomic<bool> completed_ { false };
-    };
-
     // the cache task structure will be used to submit and
     // control a cache element, while providing source pointer
     // and size in bytes for submission
@@ -45,26 +40,11 @@ namespace offcache {
     struct CacheTask {
         uint8_t* data_;
         size_t size_;
-        ExecutionPolicy policy_;
-        uint8_t* result_;
-        std::atomic<bool> active_;
-        std::vector<WorkerTask> sub_tasks_;
-    };
-
-    // worker class, one for each numa node
-    // discovers its node configuration on startup
-    // and keeps track of available memory
-    class CacheWorker {
-    public:
-        uint8_t numa_node_ = 0;
-
-        // this is the mailbox of the worker to which a new task
-        // may be submitted by exchanging nullptr with a valid one
-        // and notifying on the atomic after which ownership
-        // of the CacheTask structure is transferred to the worker
-        std::atomic<CacheTask*>* task_slot_ = nullptr;
-
-        static void run(CacheWorker* this_);
+        uint8_t* result_ = nullptr;
+        uint8_t* maybe_result_ = nullptr;
+        std::atomic<bool> active_ { true };
+        std::atomic<bool> valid_ { false };
+        std::vector<dml::handler<dml::mem_copy_operation, std::allocator<uint8_t>>> handlers_;
     };
 
     // singleton which holds the cache workers
     // and is the place where work will be submitted
@@ -77,71 +57,283 @@ namespace offcache {
         // return optimal cache placement
         // dst node and returned value can differ if the system, for example, has HBM
         // attached accessible directly to node n under a different node id m
-        typedef uint8_t (CachePolicy)(const uint8_t numa_dst_node, const uint8_t numa_src_node, const size_t data_size);
+        typedef int (CachePolicy)(const int numa_dst_node, const int numa_src_node, const size_t data_size);
 
         // copy policy specifies the copy-executing nodes for a given task
         // which allows flexibility in assignment for optimizing raw throughput
         // or choosing a conservative usage policy
-        typedef std::vector<uint8_t> (CopyPolicy)(const uint8_t numa_dst_node, const uint8_t numa_src_node);
+        typedef std::vector<int> (CopyPolicy)(const int numa_dst_node, const int numa_src_node);
 
     private:
-        std::unordered_map<uint8_t, CacheWorker> workers_;
+        std::shared_mutex cache_mutex_;
 
         std::unordered_map<uint8_t*, CacheTask*> cache_state_;
 
         CachePolicy* cache_policy_function_ = nullptr;
         CopyPolicy* copy_policy_function_ = nullptr;
 
+        dml::handler<dml::mem_copy_operation, std::allocator<uint8_t>> ExecuteCopy(const uint8_t* src, uint8_t* dst, const size_t size, const int node) const;
+
+        void SubmitTask(CacheTask* task);
+
+        CacheTask* CreateTask(const uint8_t* data, const size_t size) const;
+
+        void DestroyTask(CacheTask* task) const;
+
     public:
         void Init(CachePolicy* cache_policy_function, CopyPolicy* copy_policy_function);
 
-        // submits the given task and takes ownership of the pointer
-        void SubmitTask(CacheTask* task, const ExecutionPolicy policy) const;
+        // function to perform data access through the cache
+        // behaviour depends on the chosen execution policy
+        // Immediate and ImmediateNoCache return a cache task
+        // with a guaranteed-valid result value, whereas the Relaxed
+        // policy does not come with this guarantee.
+        CacheTask* Access(uint8_t* data, const size_t size, const ExecutionPolicy policy);
 
         // waits upon completion of caching
-        // returns the location of the data
-        static uint8_t* WaitOnCompletion(CacheTask* task);
+        static void WaitOnCompletion(CacheTask* task);
 
         // invalidates the given pointer
+        // afterwards the reference to the
+        // cache task object may be forgotten
         static void SignalDataUnused(CacheTask* task);
-    };
-}
 
-void offcache::CacheWorker::run(CacheWorker* this_) {
+        // returns the location of the cached data
+        // which may or may not be valid
+        static uint8_t* GetDataLocation(CacheTask* task);
 
+        void Flush();
+    };
 }
 
-void offcache::CacheCoordinator::Init(CachePolicy* cache_policy_function, CopyPolicy* copy_policy_function) {
+inline void offcache::CacheCoordinator::Init(CachePolicy* cache_policy_function, CopyPolicy* copy_policy_function) {
     cache_policy_function_ = cache_policy_function;
     copy_policy_function_ = copy_policy_function;
 
     // initialize numa library
     numa_available();
+}
+
+inline offcache::CacheTask* offcache::CacheCoordinator::Access(uint8_t* data, const size_t size, const ExecutionPolicy policy) {
+    // the best situation is if this data is already cached
+    // which we check in an unnamed block in which the cache
+    // is locked for reading to prevent another thread
+    // from marking the element we may find as unused and
+    // clearing it
+    {
+        std::shared_lock lock(cache_mutex_);
 
-    const uint8_t nodes_max = numa_num_configured_nodes();
-    const uint8_t valid_nodes = numa_get_mems_allowed();
+        const auto search = cache_state_.find(data);
 
-    for (uint8_t node = 0; node < nodes_max; node++) {
-        if (numa_bitmask_isbitset(valid_nodes, node)) {
-            workers_.insert({ node, CacheWorker() });
-            workers_[node].numa_node_ = node;
-            std::thread t (CacheWorker::run, &workers_[node]);
-            t.detach();
+        if (search != cache_state_.end()) {
+            if (search->second->size_ == size) {
+                search->second->active_.store(true);
+                // TODO: check for completed status depending on execution policy
+                return search->second;
+            }
+            else {
+                DestroyTask(search->second);
+                cache_state_.erase(search);
+            }
         }
-    }
+    }
+
+    // at this point the requested data is not present in cache
+    // and we create a caching task for it
+
+    CacheTask* task = CreateTask(data, size);
+
+    if (policy == ExecutionPolicy::Immediate) {
+        // in immediate mode the returned task object
+        // is guaranteed to be valid and therefore its
+        // result location must be set to a valid value
+        // before we submit the task
+        // maybe_result is then set by submit
+
+        task->result_ = data;
+        SubmitTask(task);
+        return task;
+    }
+    else if (policy == ExecutionPolicy::ImmediateNoCache) {
+        // for immediatenocache we just validate
+        // the generated task and return it
+        // we must also set maybe_result in case
+        // someone waits on this task
+
+        task->result_ = data;
+        task->maybe_result_ = data;
+        return task;
+    }
+    else if (policy == ExecutionPolicy::Relaxed) {
+        // for relaxed no valid task must be returned
+        // and therefore we just submit and then give
+        // the possibly invalid task back with only
+        // maybe_result set by submission
+
+        SubmitTask(task);
+        return task;
+    }
+    else {
+        // this should not be reached
+        return nullptr;
+    }
 }
 
-void offcache::CacheCoordinator::SubmitTask(CacheTask* task, const ExecutionPolicy policy) const {
+inline void offcache::CacheCoordinator::SubmitTask(CacheTask* task) {
+    // obtain numa node of current thread to determine where the data is needed
+
+    const int current_cpu = sched_getcpu();
+    const int current_node = numa_node_of_cpu(current_cpu);
+
+    // obtain node that the given data pointer is allocated on
+
+    int data_node = -1;
+    get_mempolicy(&data_node, NULL, 0, (void*)task->data_, MPOL_F_NODE | MPOL_F_ADDR);
+
+    // query the cache policy function for the destination numa node
+
+    const uint32_t dst_node = cache_policy_function_(current_node, data_node, task->size_);
+
+    // allocate data on this node and flush the unused parts of the
+    // cache if the operation fails and then retry once
+    // TODO: a smarter flush strategy could keep some data cached
+
+    uint8_t* dst = reinterpret_cast<uint8_t*>(numa_alloc_onnode(task->size_, dst_node));
+
+    if (dst == nullptr) {
+        Flush();
+
+        dst = reinterpret_cast<uint8_t*>(numa_alloc_onnode(task->size_, dst_node));
+
+        if (dst == nullptr) {
+            return;
+        }
+    }
+    task->maybe_result_ = dst;
+
+    // query the copy policy function for the nodes to use for the copy
+
+    const std::vector<int> executing_nodes = copy_policy_function_(dst_node, data_node);
+    const size_t task_count = executing_nodes.size();
+
+    // at this point the task may be added to the cache structure
+    // due to the task being initialized with the valid flag set to false
+
+    {
+        std::unique_lock lock(cache_mutex_);
+
+        const auto state = cache_state_.insert({task->data_, task});
+
+        // if state.second is false then no insertion took place
+        // which means that concurrently with this thread
+        // some other thread must have accessed the same
+        // resource in which case we must perform an abort
+        // TODO: abort is not the only way to handle this situation
+
+        if (!state.second) {
+            // abort by doing the following steps
+            // (1) free the allocated memory, (2) remove the "maybe result" as
+            // we will not run the caching operation, (3) clear the sub tasks
+            // for the very same reason, (4) set the result to the RAM-location
+
+            numa_free(dst, task->size_);
+            task->maybe_result_ = nullptr;
+            task->result_ = task->data_;
+            return;
+        }
+    }
+
+    // each task will copy one fair part of the total size
+    // and in case the total size is not a multiple of the
+    // given task count the last node must copy the remainder
+
+    const size_t size = task->size_ / task_count;
+    const size_t last_size = size + task->size_ % task_count;
+
+    // save the current numa run node mask to restore it later
+    // as executing the copy task will place this thread
+    // on a different node
+
+    struct bitmask* nodemask = numa_get_run_node_mask();
+
+    for (uint32_t i = 0; i < task_count; i++) {
+        const size_t local_size = i + 1 == task_count ? last_size : size;
+        const size_t local_offset = i * size;
+        const uint8_t* local_src = task->data_ + local_offset;
+        uint8_t* local_dst = dst + local_offset;
+
+        auto handler = ExecuteCopy(local_src, local_dst, local_size, executing_nodes[i]);
+        task->handlers_.emplace_back(std::move(handler));
+    }
+
+    // set the valid flag of the task as all handlers
+    // required for the completion signal are registered
+
+    task->valid_.store(true);
+    task->valid_.notify_all();
+
+    // restore the previous nodemask
+
+    numa_run_on_node_mask(nodemask);
+}
+
+inline dml::handler<dml::mem_copy_operation, std::allocator<uint8_t>> offcache::CacheCoordinator::ExecuteCopy(const uint8_t* src, uint8_t* dst, const size_t size, const int node) const {
+    dml::const_data_view srcv = dml::make_view(src, size);
+    dml::data_view dstv = dml::make_view(dst, size);
+
+    numa_run_on_node(node);
+
+    return dml::submit<dml::hardware>(dml::mem_copy.block_on_fault(), srcv, dstv);
+}
+
+inline offcache::CacheTask* offcache::CacheCoordinator::CreateTask(const uint8_t* data, const size_t size) const {
+    CacheTask* task = new CacheTask();
+    task->data_ = const_cast<uint8_t*>(data);
+    task->size_ = size;
+    return task;
+}
+
+inline void offcache::CacheCoordinator::DestroyTask(CacheTask* task) const {
+    numa_free(task->result_, task->size_);
+    delete task;
 }
 
-uint8_t* offcache::CacheCoordinator::WaitOnCompletion(CacheTask* task) {
-    while (!task->sub_tasks_.empty()) {
-        task->sub_tasks_.back().completed_.wait(false);
-        task->sub_tasks_.pop_back();
+inline void offcache::CacheCoordinator::WaitOnCompletion(CacheTask* task) {
+    task->valid_.wait(false);
+
+    for (auto& handler : task->handlers_) {
+        auto result = handler.get();
+        // TODO: handle the returned status code
     }
+
+    task->handlers_.clear();
 }
 
-void offcache::CacheCoordinator::SignalDataUnused(CacheTask* task) {
+inline uint8_t* offcache::CacheCoordinator::GetDataLocation(CacheTask* task) {
+    return task->result_;
+}
+
+inline void offcache::CacheCoordinator::SignalDataUnused(CacheTask* task) {
     task->active_.store(false);
 }
+
+inline void offcache::CacheCoordinator::Flush() {
+    // TODO: there probably is a better way to implement this flush
+
+    {
+        std::unique_lock lock(cache_mutex_);
+
+        auto it = cache_state_.begin();
+
+        while (it != cache_state_.end()) {
+            if (it->second->active_.load() == false) {
+                DestroyTask(it->second);
+                cache_state_.erase(it);
+                it = cache_state_.begin();
+            }
+            else {
+                it++;
+            }
+        }
+    }
+}
\ No newline at end of file
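
Below is a usage sketch that is not part of the diff. It only exercises the interface declared above, with a hypothetical cache policy that places the cached copy on the accessing node and a hypothetical copy policy that lets the destination node execute the whole copy; both follow the CachePolicy and CopyPolicy typedefs. Since the header shows no singleton accessor, the coordinator is simply constructed directly here, and names such as cache_policy, copy_policy and main are illustrative only.

#include <cstdint>
#include <cstdlib>
#include <vector>

#include "offloading-cache.hpp"

// hypothetical cache policy: always place the cached copy on the node that accesses the data
int cache_policy(const int numa_dst_node, const int numa_src_node, const size_t data_size) {
    return numa_dst_node;
}

// hypothetical copy policy: let the destination node perform the entire copy itself
std::vector<int> copy_policy(const int numa_dst_node, const int numa_src_node) {
    return { numa_dst_node };
}

int main() {
    constexpr size_t size = 64 * 1024 * 1024;
    uint8_t* data = static_cast<uint8_t*>(malloc(size));

    offcache::CacheCoordinator cache;
    cache.Init(cache_policy, copy_policy);

    // request caching of the buffer; with Relaxed the returned task may not be valid yet
    offcache::CacheTask* task = cache.Access(data, size, offcache::ExecutionPolicy::Relaxed);

    // block until the task is valid and all copy handlers have completed
    offcache::CacheCoordinator::WaitOnCompletion(task);

    // location of the cached data which, as the header notes, may or may not be valid
    uint8_t* cached = offcache::CacheCoordinator::GetDataLocation(task);

    // ... read from cached (or fall back to data if caching was not possible) ...

    // mark the entry as unused so that a later Flush() may evict and free it
    offcache::CacheCoordinator::SignalDataUnused(task);

    free(data);
    return 0;
}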