From 3c90e24bc1a925d50182b2d66449d3ac63b9f133 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Constantin=20F=C3=BCrst?= Date: Tue, 30 Jan 2024 14:14:59 +0100 Subject: [PATCH] revert the cacher to allow load balancing control through the copy placement policy function which now selects on how many nodes the task is split again, and not just which nodes the task MAY run on (which was done experimentally) --- offloading-cacher/cache.hpp | 91 +++++++++++++++++++++---------------- 1 file changed, 52 insertions(+), 39 deletions(-) diff --git a/offloading-cacher/cache.hpp b/offloading-cacher/cache.hpp index 2980a28..939a76e 100644 --- a/offloading-cacher/cache.hpp +++ b/offloading-cacher/cache.hpp @@ -113,7 +113,7 @@ namespace dsacache { // dml handler vector pointer which is used // to wait on caching task completion - std::atomic* handler_; + std::atomic*>* handlers_; // deallocates the global cache-location // and invalidates it @@ -122,8 +122,8 @@ namespace dsacache { size_t GetSize() const { return size_; } uint8_t* GetSource() const { return src_; } int32_t GetRefCount() const { return active_->load(); } - void SetTaskHandlerAndCache(uint8_t* cache, dml_handler* handler); void SetCacheToSource() { cache_->store(src_); delete_ = false; } + void SetTaskHandlersAndCache(uint8_t* cache, std::vector* handlers); // initializes the class after which it is thread safe // but may only be destroyed safely after setting handlers @@ -445,34 +445,40 @@ inline uint8_t* dsacache::Cache::AllocOnNode(const size_t size, const int node) } inline void dsacache::Cache::SubmitTask(CacheData* task, const int dst_node, const int src_node) { - static thread_local int last_node_index = -1; - - // stores the last node used for the local thread so we can achieve some - // load balancing which locally might look like round robin, but considering - // that one source thread may see different results for "executing_nodes" with - // different sizes, and that multiple threads will submit, in reality we - // achieve a "wild-west-style" load balance here - uint8_t* dst = AllocOnNode(task->GetSize(), dst_node); if (dst == nullptr) { return; } - // querry copy policy function for the nodes available to use for the copy + // querry copy policy function for the nodes to use for the copy const std::vector executing_nodes = copy_policy_function_(dst_node, src_node, task->GetSize()); + const size_t task_count = executing_nodes.size(); + + // each task will copy one fair part of the total size + // and in case the total size is not a factor of the + // given task count the last node must copy the remainder - // use our load balancing method and determine node for this task + const size_t size = task->GetSize() / task_count; + const size_t last_size = size + task->GetSize() % task_count; - last_node_index = ++last_node_index % executing_nodes.size(); - const int node = executing_nodes[last_node_index]; + // save the current numa node mask to restore later + // as executing the copy task will place this thread + // on a different node - // submit the copy and attach it to the task entry + auto handlers = new std::vector(); + + for (uint32_t i = 0; i < task_count; i++) { + const size_t local_size = i + 1 == task_count ? size : last_size; + const size_t local_offset = i * size; + const uint8_t* local_src = task->GetSource() + local_offset; + uint8_t* local_dst = dst + local_offset; + + handlers->emplace_back(ExecuteCopy(local_src, local_dst, local_size, executing_nodes[i])); + } - auto* handler = new CacheData::dml_handler(); - *handler = ExecuteCopy(task->GetSource(), dst, task->GetSize(), node); - task->SetTaskHandlerAndCache(dst, handler); + task->SetTaskHandlersAndCache(dst, handlers); } inline dml::handler> dsacache::Cache::ExecuteCopy( @@ -634,7 +640,7 @@ inline dsacache::CacheData::CacheData(uint8_t* data, const size_t size) { delete_ = false; active_ = new std::atomic(1); cache_ = new std::atomic(data); - handler_ = new std::atomic(nullptr); + handlers_ = new std::atomic*>(); incomplete_cache_ = new uint8_t*(nullptr); } @@ -651,7 +657,7 @@ inline dsacache::CacheData::CacheData(const dsacache::CacheData& other) { flags_ = other.flags_; incomplete_cache_ = other.incomplete_cache_; - handler_ = other.handler_; + handlers_ = other.handlers_; } inline dsacache::CacheData::~CacheData() { @@ -676,7 +682,7 @@ inline dsacache::CacheData::~CacheData() { delete active_; delete cache_; - delete handler_; + delete handlers_; delete incomplete_cache_; } } @@ -704,7 +710,7 @@ inline void dsacache::CacheData::WaitOnCompletion() { // then check if the handlers are available - handler_->wait(nullptr); + handlers_->wait(nullptr); // exchange the global handlers pointer with nullptr to have a local // copy - this signals that this thread is the sole owner and therefore @@ -712,37 +718,44 @@ inline void dsacache::CacheData::WaitOnCompletion() { // set to maximum of 64-bit in order to prevent deadlocks from the above // waiting construct - dml_handler* local_handler = handler_->exchange(reinterpret_cast(maxptr)); + std::vector* local_handlers = handlers_->exchange(reinterpret_cast*>(maxptr)); // ensure that no other thread snatched the handlers before us // and in case one did, wait again and then return - if (local_handler == nullptr || local_handler == reinterpret_cast(maxptr)) { + if (local_handlers == nullptr || local_handlers == reinterpret_cast*>(maxptr)) { cache_->wait(nullptr); return; } // at this point we are responsible for waiting for the handlers // and handling any error that comes through them gracefully + + bool error = false; - if (CheckFlag(flags_, FLAG_WAIT_WEAK) && !local_handler->is_finished()) { - handler_->store(local_handler); - return; - } + for (auto& handler : *local_handlers) { + if (CheckFlag(flags_, FLAG_WAIT_WEAK) && !handler.is_finished()) { + handlers_->store(local_handlers); + return; + } - // perform the wait + auto result = handler.get(); - auto result = local_handler->get(); + if (result.status != dml::status_code::ok) { + // if one of the copy tasks failed we abort the whole task + // after all operations are completed on it + error = true; + } + } - // at this point handlers has been waited for + // at this point all handlers have been waited for // and therefore may be decomissioned - delete local_handler; + delete local_handlers; - // if the copy tasks failed we abort the whole task - // otherwise the cache will be set to valid now + // handle errors now by aborting the cache - if (result.status != dml::status_code::ok) { + if (error) { cache_->store(src_); numa_free(*incomplete_cache_, size_); delete_ = false; @@ -755,13 +768,13 @@ inline void dsacache::CacheData::WaitOnCompletion() { // notify all waiting threads so they wake up quickly cache_->notify_all(); - handler_->notify_all(); + handlers_->notify_all(); } -void dsacache::CacheData::SetTaskHandlerAndCache(uint8_t* cache, dml_handler* handler) { +void dsacache::CacheData::SetTaskHandlersAndCache(uint8_t* cache, std::vector* handlers) { *incomplete_cache_ = cache; - handler_->store(handler); - handler_->notify_one(); + handlers_->store(handlers); + handlers_->notify_one(); } void dsacache::CacheData::Init() {