diff --git a/offloading-cacher/cache.hpp b/offloading-cacher/cache.hpp
index 2980a28..939a76e 100644
--- a/offloading-cacher/cache.hpp
+++ b/offloading-cacher/cache.hpp
@@ -113,7 +113,7 @@ namespace dsacache {
         // dml handler vector pointer which is used
         // to wait on caching task completion
 
-        std::atomic<dml_handler*>* handler_;
+        std::atomic<std::vector<dml_handler>*>* handlers_;
 
         // deallocates the global cache-location
         // and invalidates it
@@ -122,8 +122,8 @@ namespace dsacache {
         size_t GetSize() const { return size_; }
         uint8_t* GetSource() const { return src_; }
         int32_t GetRefCount() const { return active_->load(); }
-        void SetTaskHandlerAndCache(uint8_t* cache, dml_handler* handler);
         void SetCacheToSource() { cache_->store(src_); delete_ = false; }
+        void SetTaskHandlersAndCache(uint8_t* cache, std::vector<dml_handler>* handlers);
 
         // initializes the class after which it is thread safe
         // but may only be destroyed safely after setting handlers
@@ -445,34 +445,40 @@ inline uint8_t* dsacache::Cache::AllocOnNode(const size_t size, const int node)
 }
 
 inline void dsacache::Cache::SubmitTask(CacheData* task, const int dst_node, const int src_node) {
-    static thread_local int last_node_index = -1;
-
-    // stores the last node used for the local thread so we can achieve some
-    // load balancing which locally might look like round robin, but considering
-    // that one source thread may see different results for "executing_nodes" with
-    // different sizes, and that multiple threads will submit, in reality we
-    // achieve a "wild-west-style" load balance here
-
     uint8_t* dst = AllocOnNode(task->GetSize(), dst_node);
 
     if (dst == nullptr) {
        return;
     }
 
-    // querry copy policy function for the nodes available to use for the copy
+    // query copy policy function for the nodes to use for the copy
 
     const std::vector<int> executing_nodes = copy_policy_function_(dst_node, src_node, task->GetSize());
 
+    const size_t task_count = executing_nodes.size();
+
+    // each task will copy one fair part of the total size
+    // and in case the total size is not a factor of the
+    // given task count the last node must copy the remainder
-    // use our load balancing method and determine node for this task
 
+    const size_t size = task->GetSize() / task_count;
+    const size_t last_size = size + task->GetSize() % task_count;
-    last_node_index = ++last_node_index % executing_nodes.size();
-    const int node = executing_nodes[last_node_index];
 
+    // one copy task is submitted per executing node and the
+    // resulting handlers are collected so that the cache data
+    // entry can wait on their completion
-    // submit the copy and attach it to the task entry
 
+    auto handlers = new std::vector<dml_handler>();
+
+    for (uint32_t i = 0; i < task_count; i++) {
+        const size_t local_size = i + 1 == task_count ? last_size : size;
+        const size_t local_offset = i * size;
+        const uint8_t* local_src = task->GetSource() + local_offset;
+        uint8_t* local_dst = dst + local_offset;
+
+        handlers->emplace_back(ExecuteCopy(local_src, local_dst, local_size, executing_nodes[i]));
+    }
-    auto* handler = new CacheData::dml_handler();
-    *handler = ExecuteCopy(task->GetSource(), dst, task->GetSize(), node);
-    task->SetTaskHandlerAndCache(dst, handler);
+    task->SetTaskHandlersAndCache(dst, handlers);
 }
 
 inline dml::handler<dml::mem_copy_operation, std::allocator<uint8_t>> dsacache::Cache::ExecuteCopy(
@@ -634,7 +640,7 @@ inline dsacache::CacheData::CacheData(uint8_t* data, const size_t size) {
     delete_ = false;
     active_ = new std::atomic<int32_t>(1);
     cache_ = new std::atomic<uint8_t*>(data);
-    handler_ = new std::atomic<dml_handler*>(nullptr);
+    handlers_ = new std::atomic<std::vector<dml_handler>*>();
     incomplete_cache_ = new uint8_t*(nullptr);
 }
 
@@ -651,7 +657,7 @@ inline dsacache::CacheData::CacheData(const dsacache::CacheData& other) {
     flags_ = other.flags_;
     incomplete_cache_ = other.incomplete_cache_;
 
-    handler_ = other.handler_;
+    handlers_ = other.handlers_;
 }
 
@@ -676,7 +682,7 @@ inline dsacache::CacheData::~CacheData() {
 
         delete active_;
        delete cache_;
-        delete handler_;
+        delete handlers_;
         delete incomplete_cache_;
     }
 }
@@ -704,7 +710,7 @@ inline void dsacache::CacheData::WaitOnCompletion() {
 
     // then check if the handlers are available
 
-    handler_->wait(nullptr);
+    handlers_->wait(nullptr);
 
     // exchange the global handlers pointer with nullptr to have a local
     // copy - this signals that this thread is the sole owner and therefore
@@ -712,37 +718,52 @@
     // set to maximum of 64-bit in order to prevent deadlocks from the above
     // waiting construct
 
-    dml_handler* local_handler = handler_->exchange(reinterpret_cast<dml_handler*>(maxptr));
+    std::vector<dml_handler>* local_handlers = handlers_->exchange(reinterpret_cast<std::vector<dml_handler>*>(maxptr));
 
     // ensure that no other thread snatched the handlers before us
     // and in case one did, wait again and then return
 
-    if (local_handler == nullptr || local_handler == reinterpret_cast<dml_handler*>(maxptr)) {
+    if (local_handlers == nullptr || local_handlers == reinterpret_cast<std::vector<dml_handler>*>(maxptr)) {
         cache_->wait(nullptr);
         return;
     }
 
     // at this point we are responsible for waiting for the handlers
     // and handling any error that comes through them gracefully
+
+    // a weak wait must be resolved before consuming any handler,
+    // as waiting on a handler is destructive - the handlers are
+    // therefore only processed once all of them have finished
+
+    if (CheckFlag(flags_, FLAG_WAIT_WEAK)) {
+        for (auto& handler : *local_handlers) {
+            if (!handler.is_finished()) {
+                handlers_->store(local_handlers);
+                return;
+            }
+        }
+    }
+
+    bool error = false;
 
-    if (CheckFlag(flags_, FLAG_WAIT_WEAK) && !local_handler->is_finished()) {
-        handler_->store(local_handler);
-        return;
-    }
+    for (auto& handler : *local_handlers) {
+        auto result = handler.get();
 
-    // perform the wait
-
-    auto result = local_handler->get();
+        if (result.status != dml::status_code::ok) {
+            // if one of the copy tasks failed we abort the whole task
+            // after all operations are completed on it
+            error = true;
+        }
+    }
 
-    // at this point handlers has been waited for
+    // at this point all handlers have been waited for
     // and therefore may be decomissioned
 
-    delete local_handler;
+    delete local_handlers;
 
-    // if the copy tasks failed we abort the whole task
-    // otherwise the cache will be set to valid now
+    // handle errors now by aborting the cache
 
-    if (result.status != dml::status_code::ok) {
+    if (error) {
         cache_->store(src_);
         numa_free(*incomplete_cache_, size_);
         delete_ = false;
@@ -755,13 +776,13 @@ inline void dsacache::CacheData::WaitOnCompletion() {
 
     // notify all waiting threads so they wake up quickly
 
     cache_->notify_all();
-    handler_->notify_all();
+    handlers_->notify_all();
 }
 
-void dsacache::CacheData::SetTaskHandlerAndCache(uint8_t* cache, dml_handler* handler) {
+void dsacache::CacheData::SetTaskHandlersAndCache(uint8_t* cache, std::vector<dml_handler>* handlers) {
     *incomplete_cache_ = cache;
 
-    handler_->store(handler);
-    handler_->notify_one();
+    handlers_->store(handlers);
+    handlers_->notify_one();
 }
 
 void dsacache::CacheData::Init() {
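
The chunking rule introduced in SubmitTask splits one copy into task_count slices of GetSize() / task_count bytes each, with the last slice absorbing the division remainder. The following is a minimal standalone sketch of just that arithmetic, not part of cache.hpp; the variable names mirror the diff, the concrete numbers are made up for illustration:

#include <cstddef>
#include <cstdio>

int main() {
    const size_t total = 1000;    // stand-in for task->GetSize()
    const size_t task_count = 3;  // stand-in for executing_nodes.size()

    const size_t size = total / task_count;             // 333
    const size_t last_size = size + total % task_count; // 334

    for (size_t i = 0; i < task_count; i++) {
        // the last slice takes the remainder, all others are equal
        const size_t local_size = i + 1 == task_count ? last_size : size;
        std::printf("slice %zu: offset %zu, size %zu\n", i, i * size, local_size);
    }

    return 0;
}

With these numbers the slices are 333, 333 and 334 bytes at offsets 0, 333 and 666, so they tile the 1000-byte buffer without gaps or overlap.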
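The wait path relies on a single atomic exchange to elect exactly one owner of the handler vector: every waiter blocks until the pointer becomes non-null, then swaps in a sentinel, and only the thread that receives the real pointer may wait on and delete the handlers; everyone else falls back to waiting on cache_. Below is a reduced sketch of this ownership pattern, assuming C++20 atomic wait support; all names are invented for illustration and the sentinel value is defined locally rather than taken from cache.hpp:

#include <atomic>
#include <cstdint>

// sentinel marking the payload as already claimed by another thread
static constexpr uint64_t sentinel = 0xffffffffffffffff;

template <typename T>
T* TryClaim(std::atomic<T*>& slot) {
    // block until some thread has published a payload
    slot.wait(nullptr);

    // claim the payload atomically, leaving the sentinel behind
    T* local = slot.exchange(reinterpret_cast<T*>(sentinel));

    // nullptr or sentinel means another thread won the race
    if (local == nullptr || local == reinterpret_cast<T*>(sentinel)) {
        return nullptr;
    }

    return local;
}

Threads that lose the race must then wait on a second signal (the cache_ pointer in the diff) which the winning thread publishes once the copies have completed, since the slot itself only ever holds the sentinel from this point on.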