diff --git a/offloading-cacher/cache.hpp b/offloading-cacher/cache.hpp index 9a198e2..a8186e4 100644 --- a/offloading-cacher/cache.hpp +++ b/offloading-cacher/cache.hpp @@ -39,8 +39,6 @@ namespace dml { } namespace dsacache { - class Cache; - /* * Class Description: * Holds all required information on one cache entry and is used @@ -85,25 +83,17 @@ namespace dsacache { std::atomic* cache_; // object-local incomplete cache location pointer - // which is only available in the first instance + // contract: only access when being in sole posession of handlers uint8_t* incomplete_cache_; - // dml handler vector pointer which is only - // available in the first instance - std::unique_ptr> handlers_; + // dml handler vector pointer which is used + // to wait on caching task completion + std::atomic*>* handlers_; // deallocates the global cache-location // and invalidates it void Deallocate(); - // checks whether there are at least two - // valid references to this object which - // is done as the cache always has one - // internally to any living instance - bool Active() const; - - friend Cache; - public: CacheData(uint8_t* data, const size_t size); CacheData(const CacheData& other); @@ -118,7 +108,13 @@ namespace dsacache { // instance which is valid as long as the // instance is alive - !!! this may also // yield a nullptr !!! - uint8_t* GetDataLocation() const; + + void SetTaskHanldersAndCache(uint8_t* cache, std::vector* handlers); + + uint8_t* GetDataLocation() const { return cache_->load(); } + size_t GetSize() const { return size_; } + uint8_t* GetSource() const { return src_; } + int32_t GetRefCount() const { return active_->load(); } }; /* @@ -333,7 +329,7 @@ inline std::unique_ptr dsacache::Cache::Access(uint8_t* dat std::unique_lock lock(local_cache_state->cache_mutex_); - const auto state = local_cache_state->node_cache_state_.emplace(task->src_, *task); + const auto state = local_cache_state->node_cache_state_.emplace(task->GetSource(), *task); // if state.second is false then no insertion took place // which means that concurrently whith this thread @@ -342,7 +338,7 @@ inline std::unique_ptr dsacache::Cache::Access(uint8_t* dat // threads data cache structure if (!state.second) { - std::cout << "[!] Found another cache instance for 0x" << std::hex << (uint64_t)task->src_ << std::dec << std::endl; + std::cout << "[!] Found another cache instance for 0x" << std::hex << (uint64_t)task->GetSource() << std::dec << std::endl; return std::move(std::make_unique(state.first->second)); } } @@ -394,26 +390,24 @@ inline uint8_t* dsacache::Cache::AllocOnNode(const size_t size, const int node) } inline void dsacache::Cache::SubmitTask(CacheData* task, const int dst_node, const int src_node) { - uint8_t* dst = AllocOnNode(task->size_, dst_node); + uint8_t* dst = AllocOnNode(task->GetSize(), dst_node); if (dst == nullptr) { std::cout << "[x] Allocation failed so we can not cache" << std::endl; return; } - task->incomplete_cache_ = dst; - // querry copy policy function for the nodes to use for the copy - const std::vector executing_nodes = copy_policy_function_(dst_node, src_node, task->size_); + const std::vector executing_nodes = copy_policy_function_(dst_node, src_node, task->GetSize()); const size_t task_count = executing_nodes.size(); // each task will copy one fair part of the total size // and in case the total size is not a factor of the // given task count the last node must copy the remainder - const size_t size = task->size_ / task_count; - const size_t last_size = size + task->size_ % task_count; + const size_t size = task->GetSize() / task_count; + const size_t last_size = size + task->GetSize() % task_count; // save the current numa node mask to restore later // as executing the copy task will place this thread @@ -421,15 +415,19 @@ inline void dsacache::Cache::SubmitTask(CacheData* task, const int dst_node, con bitmask* nodemask = numa_get_run_node_mask(); + auto handlers = new std::vector(); + for (uint32_t i = 0; i < task_count; i++) { const size_t local_size = i + 1 == task_count ? size : last_size; const size_t local_offset = i * size; - const uint8_t* local_src = task->src_ + local_offset; + const uint8_t* local_src = task->GetSource() + local_offset; uint8_t* local_dst = dst + local_offset; - task->handlers_->emplace_back(ExecuteCopy(local_src, local_dst, local_size, executing_nodes[i])); + handlers->emplace_back(ExecuteCopy(local_src, local_dst, local_size, executing_nodes[i])); } + task->SetTaskHanldersAndCache(dst, handlers); + // restore the previous nodemask numa_run_on_node_mask(nodemask); @@ -478,7 +476,7 @@ inline void dsacache::Cache::Flush(const int node) { // if the iterator points to an inactive element // then we may erase it - if (it->second.Active() == false) { + if (it->second.GetRefCount() <= 1) { // erase the iterator from the map map.erase(it); @@ -534,7 +532,7 @@ inline std::unique_ptr dsacache::Cache::GetFromCache(uint8_ // now check whether the sizes match - if (search->second.size_ >= size) { + if (search->second.GetSize() >= size) { // return a unique copy of the entry which uses the object // lifetime and destructor to safely handle deallocation @@ -575,7 +573,6 @@ void dsacache::Cache::Invalidate(uint8_t* data) { inline dsacache::Cache::~Cache() { for (auto node : cache_state_) { - std::unique_lock lock(node.second->cache_mutex_); node.second->~LockedNodeCacheState(); numa_free(reinterpret_cast(node.second), sizeof(LockedNodeCacheState)); } @@ -586,8 +583,8 @@ inline dsacache::CacheData::CacheData(uint8_t* data, const size_t size) { size_ = size; active_ = new std::atomic(1); cache_ = new std::atomic(); + handlers_ = new std::atomic*>(); incomplete_cache_ = nullptr; - handlers_ = std::make_unique>(); } inline dsacache::CacheData::CacheData(const dsacache::CacheData& other) { @@ -597,32 +594,15 @@ inline dsacache::CacheData::CacheData(const dsacache::CacheData& other) { active_ = other.active_; const int current_active = active_->fetch_add(1); - // source and size will be copied too - // as well as the reference to the global - // atomic cache pointer - src_ = other.src_; size_ = other.size_; cache_ = other.cache_; - // incomplete cache and handlers will not - // be copied because only the first instance - // will wait on the completion of handlers - - incomplete_cache_ = nullptr; - handlers_ = nullptr; + incomplete_cache_ = other.incomplete_cache_; + handlers_ = other.handlers_; } inline dsacache::CacheData::~CacheData() { - // if this is the first instance of this cache structure - // and it has not been waited on and is now being destroyed - // we must wait on completion here to ensure the cache - // remains in a valid state - - if (handlers_ != nullptr) { - WaitOnCompletion(); - } - // due to fetch_sub returning the preivously held value // we must subtract one locally to get the current value @@ -633,10 +613,18 @@ inline dsacache::CacheData::~CacheData() { // as this was the last reference if (v <= 0) { + // on deletion we must ensure that all offloaded + // operations have completed successfully + + WaitOnCompletion(); + + // only then can we deallocate the memory + Deallocate(); delete active_; delete cache_; + delete handlers_; } } @@ -644,98 +632,81 @@ inline void dsacache::CacheData::Deallocate() { // although deallocate should only be called from // a safe context to do so, it can not hurt to // defensively perform the operation atomically + // and check for incomplete cache if no deallocation + // takes place for the retrieved local cache uint8_t* cache_local = cache_->exchange(nullptr); if (cache_local != nullptr) numa_free(cache_local, size_); - - // if the cache was never waited for then incomplete_cache_ - // may still contain a valid pointer which has to be freed - - if (incomplete_cache_ != nullptr) numa_free(incomplete_cache_, size_); -} - -inline uint8_t* dsacache::CacheData::GetDataLocation() const { - return cache_->load(); -} - -inline bool dsacache::CacheData::Active() const { - // this entry is active if more than one - // reference exists to it, as the Cache - // will always keep one internally until - // the entry is cleared from cache - - return active_->load() > 1; + else if (incomplete_cache_ != nullptr) numa_free(incomplete_cache_, size_); + else; } inline void dsacache::CacheData::WaitOnCompletion() { - // the cache data entry can be in two states - // either it is the original one which has not - // been waited for in which case the handlers - // are non-null or it is not - - if (handlers_ == nullptr) { - // when no handlers are attached to this cache entry we wait on a - // value change for the cache structure from nullptr to non-null - // which will either go through immediately if the cache is valid - // already or wait until the handler-owning thread notifies us - - cache_->wait(nullptr); + // first check if waiting is even neccessary as a valid + // cache pointer signals that no waiting is to be performed + + if (cache_->load() != nullptr) { + return; } - else { - // when the handlers are non-null there are some DSA task handlers - // available on which we must wait here - // abort is set if any operation encountered an error + // then check if the handlers are available - bool abort = false; + handlers_->wait(nullptr); - for (auto& handler : *handlers_) { - auto result = handler.get(); + // exchange the global handlers pointer with nullptr to have a local + // copy - this signals that this thread is the sole owner and therefore + // responsible for waiting for them - if (result.status != dml::status_code::ok) { - std::cerr << "[x] Encountered bad status code for operation: " << dml::StatusCodeToString(result.status) << std::endl; + std::vector* local_handlers = handlers_->exchange(nullptr); - // if one of the copy tasks failed we abort the whole task - // after all operations are completed on it + // ensure that no other thread snatched the handlers before us + // and in case one did, wait again and then return - abort = true; - } - } + if (local_handlers == nullptr) { + WaitOnCompletion(); + return; + } - // the handlers are cleared after all have completed + // at this point we are responsible for waiting for the handlers + // and handling any error that comes through them gracefully - handlers_ = nullptr; + bool error = false; - // now we act depending on whether an abort has been - // called for which signals operation incomplete + for (auto& handler : *local_handlers) { + auto result = handler.get(); - if (abort) { - // store nullptr in the cache location + if (result.status != dml::status_code::ok) { + std::cerr << "[x] Encountered bad status code for operation: " << dml::StatusCodeToString(result.status) << std::endl; - cache_->store(nullptr); + // if one of the copy tasks failed we abort the whole task + // after all operations are completed on it + error = true; + } + } - // then free the now incomplete cache + // at this point all handlers have been waited for + // and therefore may be decomissioned - // TODO: it would be possible to salvage the - // TODO: operation at this point but this - // TODO: is quite complicated so we just abort + delete local_handlers; - numa_free(incomplete_cache_, size_); - } - else { - // incomplete cache is now safe to use and therefore we - // swap it with the global cache state of this entry - // and notify potentially waiting threads + // handle errors now by aborting the cache - cache_->store(incomplete_cache_); - } + if (error) { + cache_->store(nullptr); + numa_free(incomplete_cache_, size_); + } + else { + cache_->store(incomplete_cache_); + } - // as a last step all waiting threads must - // be notified (copies of this will wait on value - // change of the cache) and the incomplete cache - // is cleared to nullptr as it is not incomplete + // notify all waiting threads so they wake up quickly - cache_->notify_all(); - incomplete_cache_ = nullptr; - } + cache_->notify_all(); + handlers_->notify_all(); +} + +void dsacache::CacheData::SetTaskHanldersAndCache(uint8_t* cache, std::vector* handlers) { + incomplete_cache_ = cache; + handlers_->store(handlers); + handlers_->notify_one(); }