From aaaaa16e94b8649b1fe888519a97a420355df813 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Constantin=20F=C3=BCrst?= Date: Wed, 24 Jan 2024 04:26:30 +0100 Subject: [PATCH] switch dsa cache to job api to use efficient wait --- offloading-cacher/cache.hpp | 109 ++++++++++++++++++++---------------- 1 file changed, 62 insertions(+), 47 deletions(-) diff --git a/offloading-cacher/cache.hpp b/offloading-cacher/cache.hpp index 7db8739..d392167 100644 --- a/offloading-cacher/cache.hpp +++ b/offloading-cacher/cache.hpp @@ -6,37 +6,13 @@ #include #include #include +#include #include #include #include -#include - -namespace dml { - inline const std::string StatusCodeToString(const dml::status_code code) { - switch (code) { - case dml::status_code::ok: return "ok"; - case dml::status_code::false_predicate: return "false predicate"; - case dml::status_code::partial_completion: return "partial completion"; - case dml::status_code::nullptr_error: return "nullptr error"; - case dml::status_code::bad_size: return "bad size"; - case dml::status_code::bad_length: return "bad length"; - case dml::status_code::inconsistent_size: return "inconsistent size"; - case dml::status_code::dualcast_bad_padding: return "dualcast bad padding"; - case dml::status_code::bad_alignment: return "bad alignment"; - case dml::status_code::buffers_overlapping: return "buffers overlapping"; - case dml::status_code::delta_delta_empty: return "delta delta empty"; - case dml::status_code::batch_overflow: return "batch overflow"; - case dml::status_code::execution_failed: return "execution failed"; - case dml::status_code::unsupported_operation: return "unsupported operation"; - case dml::status_code::queue_busy: return "queue busy"; - case dml::status_code::error: return "unknown error"; - case dml::status_code::config_error: return "config error"; - default: return "unhandled error"; - } - } -} +#include namespace dsacache { class Cache; @@ -70,9 +46,6 @@ namespace dsacache { */ class CacheData { - public: - using dml_handler = dml::handler>; - private: static constexpr uint64_t maxptr = 0xffff'ffff'ffff'ffff; @@ -95,7 +68,7 @@ namespace dsacache { // dml handler vector pointer which is used // to wait on caching task completion - std::atomic*>* handlers_; + std::atomic*>* handlers_; // deallocates the global cache-location // and invalidates it @@ -104,7 +77,7 @@ namespace dsacache { size_t GetSize() const { return size_; } uint8_t* GetSource() const { return src_; } int32_t GetRefCount() const { return active_->load(); } - void SetTaskHandlersAndCache(uint8_t* cache, std::vector* handlers); + void SetTaskHandlersAndCache(uint8_t* cache, std::vector* handlers); // initializes the class after which it is thread safe // but may only be destroyed safely after setting handlers @@ -221,7 +194,7 @@ namespace dsacache { // function used to submit a copy task on a specific node to the dml // engine on that node - will change the current threads node assignment // to achieve this so take care to restore this - dml::handler> ExecuteCopy( + dml_job_t* ExecuteCopy( const uint8_t* src, uint8_t* dst, const size_t size, const int node ) const; @@ -433,7 +406,7 @@ inline void dsacache::Cache::SubmitTask(CacheData* task, const int dst_node, con bitmask* nodemask = numa_get_run_node_mask(); - auto handlers = new std::vector(); + auto handlers = new std::vector(); for (uint32_t i = 0; i < task_count; i++) { const size_t local_size = i + 1 == task_count ? size : last_size; @@ -452,15 +425,43 @@ inline void dsacache::Cache::SubmitTask(CacheData* task, const int dst_node, con numa_free_nodemask(nodemask); } -inline dml::handler> dsacache::Cache::ExecuteCopy( +inline dml_job_t* dsacache::Cache::ExecuteCopy( const uint8_t* src, uint8_t* dst, const size_t size, const int node ) const { numa_run_on_node(node); - dml::const_data_view srcv = dml::make_view(src, size); - dml::data_view dstv = dml::make_view(dst, size); + uint32_t job_size = 0; + dml_status_t status = dml_get_job_size(DML_PATH_HW, &job_size); + + if (status != DML_STATUS_OK) { + std::cerr << "[x] Error querying job size!" << std::endl; + return nullptr; + } + + dml_job_t* job = (dml_job_t*)malloc(job_size); + status = dml_init_job(DML_PATH_HW, job); + + if (status != DML_STATUS_OK) { + std::cerr << "[x] Error initializing job!" << std::endl; + delete job; + return nullptr; + } + + job->operation = DML_OP_MEM_MOVE; + job->source_first_ptr = (uint8_t*)src; + job->destination_first_ptr = dst; + job->source_length = size; + job->flags |= DML_FLAG_BLOCK_ON_FAULT | DML_FLAG_COPY_ONLY; - return dml::submit(dml::mem_copy.block_on_fault(), srcv, dstv); + status = dml_submit_job(job); + + if (status != DML_STATUS_OK) { + std::cerr << "[x] Error submitting job!" << std::endl; + delete job; + return nullptr; + } + + return job; } inline void dsacache::Cache::GetCacheNode(uint8_t* src, const size_t size, int* OUT_DST_NODE, int* OUT_SRC_NODE) const { @@ -602,7 +603,7 @@ inline dsacache::CacheData::CacheData(uint8_t* data, const size_t size) { delete_ = false; active_ = new std::atomic(1); cache_ = new std::atomic(data); - handlers_ = new std::atomic*>(); + handlers_ = new std::atomic*>(); incomplete_cache_ = new uint8_t*(nullptr); } @@ -641,6 +642,10 @@ inline dsacache::CacheData::~CacheData() { Deallocate(); + for (dml_job_t* job : *handlers_->load()) { + if (job != nullptr) delete job; + } + delete active_; delete cache_; delete handlers_; @@ -679,12 +684,12 @@ inline void dsacache::CacheData::WaitOnCompletion() { // set to maximum of 64-bit in order to prevent deadlocks from the above // waiting construct - std::vector* local_handlers = handlers_->exchange(reinterpret_cast*>(maxptr)); + std::vector* local_handlers = handlers_->exchange(reinterpret_cast*>(maxptr)); // ensure that no other thread snatched the handlers before us // and in case one did, wait again and then return - if (local_handlers == nullptr || local_handlers == reinterpret_cast*>(maxptr)) { + if (local_handlers == nullptr || local_handlers == reinterpret_cast*>(maxptr)) { cache_->wait(nullptr); return; } @@ -694,17 +699,27 @@ inline void dsacache::CacheData::WaitOnCompletion() { bool error = false; - for (auto& handler : *local_handlers) { - while (!handler.is_finished()) sched_yield(); - auto result = handler.get(); - - if (result.status != dml::status_code::ok) { - std::cerr << "[x] Encountered bad status code for operation: " << dml::StatusCodeToString(result.status) << std::endl; + for (dml_job_t* job : *local_handlers) { + if (job == nullptr) { + std::cerr << "[x] Got nullptr-job!" << std::endl; // if one of the copy tasks failed we abort the whole task // after all operations are completed on it error = true; } + else { + const dml_status_t status = dml_wait_job(job, DML_WAIT_MODE_UMWAIT); + + if (status != DML_STATUS_OK) { + std::cerr << "[x] Operation Failed!" << std::endl; + + // if one of the copy tasks failed we abort the whole task + // after all operations are completed on it + error = true; + } + } + + delete job; } // at this point all handlers have been waited for @@ -730,7 +745,7 @@ inline void dsacache::CacheData::WaitOnCompletion() { handlers_->notify_all(); } -void dsacache::CacheData::SetTaskHandlersAndCache(uint8_t* cache, std::vector* handlers) { +void dsacache::CacheData::SetTaskHandlersAndCache(uint8_t* cache, std::vector* handlers) { *incomplete_cache_ = cache; handlers_->store(handlers); handlers_->notify_one();