From 46de3151a2634dbf7eeb75556eb89ee0ff2f669e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Constantin=20F=C3=BCrst?= Date: Wed, 10 Jan 2024 12:29:31 +0100 Subject: [PATCH] add a lot of comments to the code, also handle errors in the dml handlers gracefully --- offloading-cacher/cache-data.hpp | 131 +++++++++++++++++++++++--- offloading-cacher/cache.hpp | 35 ++++++- offloading-cacher/util/dml-helper.hpp | 60 ++++++++---- 3 files changed, 192 insertions(+), 34 deletions(-) diff --git a/offloading-cacher/cache-data.hpp b/offloading-cacher/cache-data.hpp index 4028597..4de6138 100644 --- a/offloading-cacher/cache-data.hpp +++ b/offloading-cacher/cache-data.hpp @@ -8,6 +8,8 @@ #include +#include "util/dml-helper.hpp" + namespace dsacache { class Cache; @@ -23,57 +25,130 @@ namespace dsacache { using dml_handler = dml::handler>; private: + // data source and size of the block uint8_t* src_; size_t size_; + // global reference counting object std::atomic* active_; - protected: + // global cache-location pointer std::atomic* cache_; + // object-local incomplete cache location pointer + // which is only available in the first instance uint8_t* incomplete_cache_; + // dml handler vector pointer which is only + // available in the first instance std::unique_ptr> handlers_; - friend Cache; + // deallocates the global cache-location + // and invalidates it + void Deallocate(); + + // checks whether there are at least two + // valid references to this object which + // is done as the cache always has one + // internally to any living instance + bool Active() const; + friend Cache; public: CacheData(uint8_t* data, const size_t size); CacheData(const CacheData& other); ~CacheData(); - void Deallocate(); - + // waits on completion of caching operations + // for this task and is safe to be called in + // any state of the object void WaitOnCompletion(); + // returns the cache data location for this + // instance which is valid as long as the + // instance is alive uint8_t* 
GetDataLocation() const; - - bool Active() const; }; } inline void dsacache::CacheData::WaitOnCompletion() { + // the cache data entry can be in two states + // either it is the original one which has not + // been waited for in which case the handlers + // are non-null or it is not + if (handlers_ == nullptr) { std::cout << "[-] Waiting on cache-var-update for CacheData 0x" << std::hex << (uint64_t)src_ << std::dec << std::endl; + // when no handlers are attached to this cache entry we wait on a + // value change for the cache structure from nullptr to non-null + // which will either go through immediately if the cache is valid + // already or wait until the handler-owning thread notifies us + cache_->wait(nullptr); std::cout << "[+] Finished waiting on cache-var-update for CacheData 0x" << std::hex << (uint64_t)src_ << std::dec << std::endl; } else { + // when the handlers are non-null there are some DSA task handlers + // available on which we must wait here + std::cout << "[-] Waiting on handlers for CacheData 0x" << std::hex << (uint64_t)src_ << std::dec << std::endl; + // abort is set if any operation encountered an error + + bool abort = false; + for (auto& handler : *handlers_) { auto result = handler.get(); - // TODO: handle the returned status code + + if (result.status != dml::status_code::ok) { + std::cerr << "[x] Encountered bad status code for operation: " << dml::StatusCodeToString(result.status) << std::endl; + + // if one of the copy tasks failed we abort the whole task + // after all operations are completed on it + + abort = true; + } } + // the handlers are cleared after all have completed + handlers_ = nullptr; - std::cout << "[+] Finished waiting on handlers for CacheData 0x" << std::hex << (uint64_t)src_ << std::dec << std::endl; + // now we act depending on whether an abort has been + // called for which signals operation incomplete + + if (abort) { + // store nullptr in the cache location + + cache_->store(nullptr); + + // then free the now 
incomplete cache + + // TODO: it would be possible to salvage the + // TODO: operation at this point but this + // TODO: is quite complicated so we just abort + + numa_free(incomplete_cache_, size_); + } + else { + std::cout << "[+] Finished waiting on handlers for CacheData 0x" << std::hex << (uint64_t)src_ << std::dec << std::endl; + + // incomplete cache is now safe to use and therefore we + // swap it with the global cache state of this entry + // and notify potentially waiting threads + + cache_->store(incomplete_cache_); + } + + // as a last step all waiting threads must + // be notified (copies of this will wait on value + // change of the cache) and the incomplete cache + // is cleared to nullptr as it is not incomplete - cache_->store(incomplete_cache_); cache_->notify_all(); + incomplete_cache_ = nullptr; } } @@ -91,12 +166,24 @@ dsacache::CacheData::CacheData(uint8_t* data, const size_t size) { dsacache::CacheData::CacheData(const dsacache::CacheData& other) { std::cout << "[-] Copy Created for CacheData 0x" << std::hex << (uint64_t)other.src_ << std::dec << std::endl; + // we copy the ptr to the global atomic reference counter + // and increase the amount of active references + active_ = other.active_; const int current_active = active_->fetch_add(1); + // source and size will be copied too + // as well as the reference to the global + // atomic cache pointer + src_ = other.src_; size_ = other.size_; cache_ = other.cache_; + + // incomplete cache and handlers will not + // be copied because only the first instance + // will wait on the completion of handlers + incomplete_cache_ = nullptr; handlers_ = nullptr; } @@ -104,6 +191,15 @@ dsacache::CacheData::CacheData(const dsacache::CacheData& other) { dsacache::CacheData::~CacheData() { std::cout << "[-] Destructor for CacheData 0x" << std::hex << (uint64_t)src_ << std::dec << std::endl; + // if this is the first instance of this cache structure + // and it has not been waited on and is now being destroyed 
+ // we must wait on completion here to ensure the cache + // remains in a valid state + + if (handlers_ != nullptr) { + WaitOnCompletion(); + } + // due to fetch_sub returning the preivously held value // we must subtract one locally to get the current value @@ -117,6 +213,7 @@ dsacache::CacheData::~CacheData() { std::cout << "[!] Full Destructor for CacheData 0x" << std::hex << (uint64_t)src_ << std::dec << std::endl; Deallocate(); + delete active_; delete cache_; } @@ -125,9 +222,12 @@ dsacache::CacheData::~CacheData() { void dsacache::CacheData::Deallocate() { std::cout << "[!] Deallocating for CacheData 0x" << std::hex << (uint64_t)src_ << std::dec << std::endl; - numa_free(cache_, size_); - cache_ = nullptr; - incomplete_cache_ = nullptr; + // although deallocate should only be called from + // a safe context to do so, it can not hurt to + // defensively perform the operation atomically + + uint8_t* cache_local = cache_->exchange(nullptr); + if (cache_local != nullptr) numa_free(cache_local, size_); } uint8_t* dsacache::CacheData::GetDataLocation() const { @@ -135,5 +235,10 @@ uint8_t* dsacache::CacheData::GetDataLocation() const { } bool dsacache::CacheData::Active() const { - return active_->load() > 0; + // this entry is active if more than one + // reference exists to it, as the Cache + // will always keep one internally until + // the entry is cleared from cache + + return active_->load() > 1; } \ No newline at end of file diff --git a/offloading-cacher/cache.hpp b/offloading-cacher/cache.hpp index 0081a04..f3ef90d 100644 --- a/offloading-cacher/cache.hpp +++ b/offloading-cacher/cache.hpp @@ -46,22 +46,42 @@ namespace dsacache { CachePolicy* cache_policy_function_ = nullptr; CopyPolicy* copy_policy_function_ = nullptr; + // function used to submit a copy task on a specific node to the dml + // engine on that node - will change the current threads node assignment + // to achieve this so take care to restore this dml::handler> ExecuteCopy( const uint8_t* 
src, uint8_t* dst, const size_t size, const int node ) const; + // allocates the required memory on the destination node + // and then submits task to the dml library for processing + // and attaches the handlers to the cache data structure void SubmitTask(CacheData* task, const int dst_node, const int src_node); + // queries the policy functions for the given data and size + // to obtain destination cache node, also returns the data's + // source node for further usage + // output may depend on the calling thread's node assignment + // as this is set as the "optimal placement" node void GetCacheNode(uint8_t* src, const size_t size, int* OUT_DST_NODE, int* OUT_SRC_NODE) const; + // checks whether the cache contains an entry for + // the given data in the given memory node and + // returns it, otherwise returns nullptr std::unique_ptr GetFromCache(uint8_t* src, const size_t size, const int dst_node); public: + // initializes the cache with the two policy functions + // only after this is it safe to use in a threaded environment void Init(CachePolicy* cache_policy_function, CopyPolicy* copy_policy_function); // function to perform data access through the cache std::unique_ptr Access(uint8_t* data, const size_t size); + // flushes the cache of inactive entries + // if node is -1 then the whole cache is + // checked and otherwise the specified + // node - no checks on node validity void Flush(const int node = -1); }; } @@ -71,11 +91,19 @@ inline void dsacache::Cache::Init(CachePolicy* cache_policy_function, CopyPolicy copy_policy_function_ = copy_policy_function; // initialize numa library + numa_available(); + // obtain all available nodes + // and those we may allocate + // memory on + const int nodes_max = numa_num_configured_nodes(); const bitmask* valid_nodes = numa_get_mems_allowed(); + // prepare the cache state with entries + // for all given nodes + for (int node = 0; node < nodes_max; node++) { if (numa_bitmask_isbitset(valid_nodes, node)) { 
cache_state_.insert({node,{}}); @@ -93,6 +121,10 @@ inline std::unique_ptr dsacache::Cache::Access(uint8_t* dat GetCacheNode(data, size, &dst_node, &src_node); + // TODO: at this point it could be beneficial to check whether + // TODO: the given destination node is present as an entry + // TODO: in the cache state to see if it is valid + // check whether the data is already cached std::unique_ptr task = GetFromCache(data, size, dst_node); @@ -149,7 +181,7 @@ inline void dsacache::Cache::SubmitTask(CacheData* task, const int dst_node, con dst = reinterpret_cast(numa_alloc_onnode(task->size_, dst_node)); if (dst == nullptr) { - std::cout << "[x] Second allocation try failed for " << task->size_ << "B on node " << dst_node << std::endl; + std::cerr << "[x] Second allocation try failed for " << task->size_ << "B on node " << dst_node << std::endl; return; } } @@ -188,6 +220,7 @@ inline void dsacache::Cache::SubmitTask(CacheData* task, const int dst_node, con // restore the previous nodemask numa_run_on_node_mask(nodemask); + numa_free_nodemask(nodemask); } inline dml::handler> dsacache::Cache::ExecuteCopy( diff --git a/offloading-cacher/util/dml-helper.hpp b/offloading-cacher/util/dml-helper.hpp index 1686fd1..de92bb7 100644 --- a/offloading-cacher/util/dml-helper.hpp +++ b/offloading-cacher/util/dml-helper.hpp @@ -2,25 +2,45 @@ #include -inline const std::string StatusCodeToString(const dml::status_code code) { - switch(code) { - case dml::status_code::ok: return "ok"; - case dml::status_code::false_predicate: return "false predicate"; - case dml::status_code::partial_completion: return "partial completion"; - case dml::status_code::nullptr_error: return "nullptr error"; - case dml::status_code::bad_size: return "bad size"; - case dml::status_code::bad_length: return "bad length"; - case dml::status_code::inconsistent_size: return "inconsistent size"; - case dml::status_code::dualcast_bad_padding: return "dualcast bad padding"; - case dml::status_code::bad_alignment: 
return "bad alignment"; - case dml::status_code::buffers_overlapping: return "buffers overlapping"; - case dml::status_code::delta_delta_empty: return "delta delta empty"; - case dml::status_code::batch_overflow: return "batch overflow"; - case dml::status_code::execution_failed: return "execution failed"; - case dml::status_code::unsupported_operation: return "unsupported operation"; - case dml::status_code::queue_busy: return "queue busy"; - case dml::status_code::error: return "unknown error"; - case dml::status_code::config_error: return "config error"; - default: return "unhandled error"; +namespace dml { + inline const std::string StatusCodeToString(const dml::status_code code) { + switch (code) { + case dml::status_code::ok: + return "ok"; + case dml::status_code::false_predicate: + return "false predicate"; + case dml::status_code::partial_completion: + return "partial completion"; + case dml::status_code::nullptr_error: + return "nullptr error"; + case dml::status_code::bad_size: + return "bad size"; + case dml::status_code::bad_length: + return "bad length"; + case dml::status_code::inconsistent_size: + return "inconsistent size"; + case dml::status_code::dualcast_bad_padding: + return "dualcast bad padding"; + case dml::status_code::bad_alignment: + return "bad alignment"; + case dml::status_code::buffers_overlapping: + return "buffers overlapping"; + case dml::status_code::delta_delta_empty: + return "delta delta empty"; + case dml::status_code::batch_overflow: + return "batch overflow"; + case dml::status_code::execution_failed: + return "execution failed"; + case dml::status_code::unsupported_operation: + return "unsupported operation"; + case dml::status_code::queue_busy: + return "queue busy"; + case dml::status_code::error: + return "unknown error"; + case dml::status_code::config_error: + return "config error"; + default: + return "unhandled error"; + } } } \ No newline at end of file