Browse Source

implement different load balancing technique suited for smaller task sizes maybe

master
Constantin Fürst 11 months ago
parent
commit
20b7820e61
  1. 92
      offloading-cacher/cache.hpp

92
offloading-cacher/cache.hpp

@ -97,7 +97,7 @@ namespace dsacache {
// dml handler vector pointer which is used
// to wait on caching task completion
std::atomic<std::vector<dml_handler>*>* handlers_;
std::atomic<dml_handler*>* handler_;
// deallocates the global cache-location
// and invalidates it
@ -106,7 +106,7 @@ namespace dsacache {
size_t GetSize() const { return size_; }
uint8_t* GetSource() const { return src_; }
int32_t GetRefCount() const { return active_->load(); }
void SetTaskHandlersAndCache(uint8_t* cache, std::vector<dml_handler>* handlers);
void SetTaskHandlerAndCache(uint8_t* cache, dml_handler* handler);
// initializes the class after which it is thread safe
// but may only be destroyed safely after setting handlers
@ -403,40 +403,34 @@ inline uint8_t* dsacache::Cache::AllocOnNode(const size_t size, const int node)
}
inline void dsacache::Cache::SubmitTask(CacheData* task, const int dst_node, const int src_node) {
static thread_local int last_node_index = -1;
// stores the last node used for the local thread so we can achieve some
// load balancing which locally might look like round robin, but considering
// that one source thread may see different results for "executing_nodes" with
// different sizes, and that multiple threads will submit, in reality we
// achieve a "wild-west-style" load balance here
uint8_t* dst = AllocOnNode(task->GetSize(), dst_node);
if (dst == nullptr) {
return;
}
// querry copy policy function for the nodes to use for the copy
// querry copy policy function for the nodes available to use for the copy
const std::vector<int> executing_nodes = copy_policy_function_(dst_node, src_node, task->GetSize());
const size_t task_count = executing_nodes.size();
// each task will copy one fair part of the total size
// and in case the total size is not a factor of the
// given task count the last node must copy the remainder
const size_t size = task->GetSize() / task_count;
const size_t last_size = size + task->GetSize() % task_count;
// save the current numa node mask to restore later
// as executing the copy task will place this thread
// on a different node
// use our load balancing method and determine node for this task
auto handlers = new std::vector<CacheData::dml_handler>();
last_node_index = ++last_node_index % executing_nodes.size();
const int node = executing_nodes[last_node_index];
for (uint32_t i = 0; i < task_count; i++) {
const size_t local_size = i + 1 == task_count ? size : last_size;
const size_t local_offset = i * size;
const uint8_t* local_src = task->GetSource() + local_offset;
uint8_t* local_dst = dst + local_offset;
// submit the copy and attach it to the task entry
handlers->emplace_back(ExecuteCopy(local_src, local_dst, local_size, executing_nodes[i]));
}
task->SetTaskHandlersAndCache(dst, handlers);
auto* handler = new CacheData::dml_handler();
*handler = ExecuteCopy(task->GetSource(), dst, task->GetSize(), node);
task->SetTaskHandlerAndCache(dst, handler);
}
inline dml::handler<dml::mem_copy_operation, std::allocator<uint8_t>> dsacache::Cache::ExecuteCopy(
@ -590,7 +584,7 @@ inline dsacache::CacheData::CacheData(uint8_t* data, const size_t size) {
delete_ = false;
active_ = new std::atomic<int32_t>(1);
cache_ = new std::atomic<uint8_t*>(data);
handlers_ = new std::atomic<std::vector<dml_handler>*>();
handler_ = new std::atomic<dml_handler*>(nullptr);
incomplete_cache_ = new uint8_t*(nullptr);
}
@ -606,7 +600,7 @@ inline dsacache::CacheData::CacheData(const dsacache::CacheData& other) {
cache_ = other.cache_;
incomplete_cache_ = other.incomplete_cache_;
handlers_ = other.handlers_;
handler_ = other.handler_;
}
inline dsacache::CacheData::~CacheData() {
@ -631,7 +625,7 @@ inline dsacache::CacheData::~CacheData() {
delete active_;
delete cache_;
delete handlers_;
delete handler_;
delete incomplete_cache_;
}
}
@ -659,7 +653,7 @@ inline void dsacache::CacheData::WaitOnCompletion(const bool weak) {
// then check if the handlers are available
handlers_->wait(nullptr);
handler_->wait(nullptr);
// exchange the global handlers pointer with nullptr to have a local
// copy - this signals that this thread is the sole owner and therefore
@ -667,12 +661,12 @@ inline void dsacache::CacheData::WaitOnCompletion(const bool weak) {
// set to maximum of 64-bit in order to prevent deadlocks from the above
// waiting construct
std::vector<dml_handler>* local_handlers = handlers_->exchange(reinterpret_cast<std::vector<dml_handler>*>(maxptr));
dml_handler* local_handler = handler_->exchange(reinterpret_cast<dml_handler*>(maxptr));
// ensure that no other thread snatched the handlers before us
// and in case one did, wait again and then return
if (local_handlers == nullptr || local_handlers == reinterpret_cast<std::vector<dml_handler>*>(maxptr)) {
if (local_handler == nullptr || local_handler == reinterpret_cast<dml_handler*>(maxptr)) {
cache_->wait(nullptr);
return;
}
@ -680,31 +674,25 @@ inline void dsacache::CacheData::WaitOnCompletion(const bool weak) {
// at this point we are responsible for waiting for the handlers
// and handling any error that comes through them gracefully
bool error = false;
for (auto& handler : *local_handlers) {
if (weak && !handler.is_finished()) {
handlers_->store(local_handlers);
return;
}
if (weak && !local_handler->is_finished()) {
handler_->store(local_handler);
return;
}
auto result = handler.get();
// perform the wait
if (result.status != dml::status_code::ok) {
// if one of the copy tasks failed we abort the whole task
// after all operations are completed on it
error = true;
}
}
auto result = local_handler->get();
// at this point all handlers have been waited for
// at this point handlers has been waited for
// and therefore may be decomissioned
delete local_handlers;
delete local_handler;
// if the copy tasks failed we abort the whole task
// otherwise the cache will be set to valid now
// handle errors now by aborting the cache
if (result.status != dml::status_code::ok) {
if (error) {
cache_->store(src_);
numa_free(*incomplete_cache_, size_);
delete_ = false;
@ -717,13 +705,13 @@ inline void dsacache::CacheData::WaitOnCompletion(const bool weak) {
// notify all waiting threads so they wake up quickly
cache_->notify_all();
handlers_->notify_all();
handler_->notify_all();
}
void dsacache::CacheData::SetTaskHandlersAndCache(uint8_t* cache, std::vector<dml_handler>* handlers) {
void dsacache::CacheData::SetTaskHandlerAndCache(uint8_t* cache, dml_handler* handler) {
*incomplete_cache_ = cache;
handlers_->store(handlers);
handlers_->notify_one();
handler_->store(handler);
handler_->notify_one();
}
void dsacache::CacheData::Init() {

Loading…
Cancel
Save