From 20b7820e61482f5bc5d5c1a50c4a707fa6672081 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Constantin=20F=C3=BCrst?= Date: Wed, 24 Jan 2024 22:43:16 +0100 Subject: [PATCH] implement different load balancing technique suited for smaller task sizes maybe --- offloading-cacher/cache.hpp | 92 ++++++++++++++++--------------------- 1 file changed, 40 insertions(+), 52 deletions(-) diff --git a/offloading-cacher/cache.hpp b/offloading-cacher/cache.hpp index 82ce9c2..6a75305 100644 --- a/offloading-cacher/cache.hpp +++ b/offloading-cacher/cache.hpp @@ -97,7 +97,7 @@ namespace dsacache { // dml handler vector pointer which is used // to wait on caching task completion - std::atomic*>* handlers_; + std::atomic* handler_; // deallocates the global cache-location // and invalidates it @@ -106,7 +106,7 @@ namespace dsacache { size_t GetSize() const { return size_; } uint8_t* GetSource() const { return src_; } int32_t GetRefCount() const { return active_->load(); } - void SetTaskHandlersAndCache(uint8_t* cache, std::vector* handlers); + void SetTaskHandlerAndCache(uint8_t* cache, dml_handler* handler); // initializes the class after which it is thread safe // but may only be destroyed safely after setting handlers @@ -403,40 +403,34 @@ inline uint8_t* dsacache::Cache::AllocOnNode(const size_t size, const int node) } inline void dsacache::Cache::SubmitTask(CacheData* task, const int dst_node, const int src_node) { + static thread_local int last_node_index = -1; + + // stores the last node used for the local thread so we can achieve some + // load balancing which locally might look like round robin, but considering + // that one source thread may see different results for "executing_nodes" with + // different sizes, and that multiple threads will submit, in reality we + // achieve a "wild-west-style" load balance here + uint8_t* dst = AllocOnNode(task->GetSize(), dst_node); if (dst == nullptr) { return; } - // querry copy policy function for the nodes to use for the copy + // querry copy policy function for the nodes available to use for the copy const std::vector executing_nodes = copy_policy_function_(dst_node, src_node, task->GetSize()); - const size_t task_count = executing_nodes.size(); - - // each task will copy one fair part of the total size - // and in case the total size is not a factor of the - // given task count the last node must copy the remainder - - const size_t size = task->GetSize() / task_count; - const size_t last_size = size + task->GetSize() % task_count; - // save the current numa node mask to restore later - // as executing the copy task will place this thread - // on a different node + // use our load balancing method and determine node for this task - auto handlers = new std::vector(); + last_node_index = ++last_node_index % executing_nodes.size(); + const int node = executing_nodes[last_node_index]; - for (uint32_t i = 0; i < task_count; i++) { - const size_t local_size = i + 1 == task_count ? size : last_size; - const size_t local_offset = i * size; - const uint8_t* local_src = task->GetSource() + local_offset; - uint8_t* local_dst = dst + local_offset; + // submit the copy and attach it to the task entry - handlers->emplace_back(ExecuteCopy(local_src, local_dst, local_size, executing_nodes[i])); - } - - task->SetTaskHandlersAndCache(dst, handlers); + auto* handler = new CacheData::dml_handler(); + *handler = ExecuteCopy(task->GetSource(), dst, task->GetSize(), node); + task->SetTaskHandlerAndCache(dst, handler); } inline dml::handler> dsacache::Cache::ExecuteCopy( @@ -590,7 +584,7 @@ inline dsacache::CacheData::CacheData(uint8_t* data, const size_t size) { delete_ = false; active_ = new std::atomic(1); cache_ = new std::atomic(data); - handlers_ = new std::atomic*>(); + handler_ = new std::atomic(nullptr); incomplete_cache_ = new uint8_t*(nullptr); } @@ -606,7 +600,7 @@ inline dsacache::CacheData::CacheData(const dsacache::CacheData& other) { cache_ = other.cache_; incomplete_cache_ = other.incomplete_cache_; - handlers_ = other.handlers_; + handler_ = other.handler_; } inline dsacache::CacheData::~CacheData() { @@ -631,7 +625,7 @@ inline dsacache::CacheData::~CacheData() { delete active_; delete cache_; - delete handlers_; + delete handler_; delete incomplete_cache_; } } @@ -659,7 +653,7 @@ inline void dsacache::CacheData::WaitOnCompletion(const bool weak) { // then check if the handlers are available - handlers_->wait(nullptr); + handler_->wait(nullptr); // exchange the global handlers pointer with nullptr to have a local // copy - this signals that this thread is the sole owner and therefore @@ -667,12 +661,12 @@ inline void dsacache::CacheData::WaitOnCompletion(const bool weak) { // set to maximum of 64-bit in order to prevent deadlocks from the above // waiting construct - std::vector* local_handlers = handlers_->exchange(reinterpret_cast*>(maxptr)); + dml_handler* local_handler = handler_->exchange(reinterpret_cast(maxptr)); // ensure that no other thread snatched the handlers before us // and in case one did, wait again and then return - if (local_handlers == nullptr || local_handlers == reinterpret_cast*>(maxptr)) { + if (local_handler == nullptr || local_handler == reinterpret_cast(maxptr)) { cache_->wait(nullptr); return; } @@ -680,31 +674,25 @@ inline void dsacache::CacheData::WaitOnCompletion(const bool weak) { // at this point we are responsible for waiting for the handlers // and handling any error that comes through them gracefully - bool error = false; - - for (auto& handler : *local_handlers) { - if (weak && !handler.is_finished()) { - handlers_->store(local_handlers); - return; - } + if (weak && !local_handler->is_finished()) { + handler_->store(local_handler); + return; + } - auto result = handler.get(); + // perform the wait - if (result.status != dml::status_code::ok) { - // if one of the copy tasks failed we abort the whole task - // after all operations are completed on it - error = true; - } - } + auto result = local_handler->get(); - // at this point all handlers have been waited for + // at this point handlers has been waited for // and therefore may be decomissioned - delete local_handlers; + delete local_handler; + + // if the copy tasks failed we abort the whole task + // otherwise the cache will be set to valid now - // handle errors now by aborting the cache + if (result.status != dml::status_code::ok) { - if (error) { cache_->store(src_); numa_free(*incomplete_cache_, size_); delete_ = false; @@ -717,13 +705,13 @@ inline void dsacache::CacheData::WaitOnCompletion(const bool weak) { // notify all waiting threads so they wake up quickly cache_->notify_all(); - handlers_->notify_all(); + handler_->notify_all(); } -void dsacache::CacheData::SetTaskHandlersAndCache(uint8_t* cache, std::vector* handlers) { +void dsacache::CacheData::SetTaskHandlerAndCache(uint8_t* cache, dml_handler* handler) { *incomplete_cache_ = cache; - handlers_->store(handlers); - handlers_->notify_one(); + handler_->store(handler); + handler_->notify_one(); } void dsacache::CacheData::Init() {