diff --git a/offloading-cacher/cache.hpp b/offloading-cacher/cache.hpp index 07253cc..df6fdef 100644 --- a/offloading-cacher/cache.hpp +++ b/offloading-cacher/cache.hpp @@ -6,13 +6,37 @@ #include #include #include -#include #include #include #include -#include +#include + +namespace dml { + inline const std::string StatusCodeToString(const dml::status_code code) { + switch (code) { + case dml::status_code::ok: return "ok"; + case dml::status_code::false_predicate: return "false predicate"; + case dml::status_code::partial_completion: return "partial completion"; + case dml::status_code::nullptr_error: return "nullptr error"; + case dml::status_code::bad_size: return "bad size"; + case dml::status_code::bad_length: return "bad length"; + case dml::status_code::inconsistent_size: return "inconsistent size"; + case dml::status_code::dualcast_bad_padding: return "dualcast bad padding"; + case dml::status_code::bad_alignment: return "bad alignment"; + case dml::status_code::buffers_overlapping: return "buffers overlapping"; + case dml::status_code::delta_delta_empty: return "delta delta empty"; + case dml::status_code::batch_overflow: return "batch overflow"; + case dml::status_code::execution_failed: return "execution failed"; + case dml::status_code::unsupported_operation: return "unsupported operation"; + case dml::status_code::queue_busy: return "queue busy"; + case dml::status_code::error: return "unknown error"; + case dml::status_code::config_error: return "config error"; + default: return "unhandled error"; + } + } +} namespace dsacache { class Cache; @@ -46,6 +70,9 @@ namespace dsacache { */ class CacheData { + public: + using dml_handler = dml::handler>; + private: static constexpr uint64_t maxptr = 0xffff'ffff'ffff'ffff; @@ -68,7 +95,7 @@ namespace dsacache { // dml handler vector pointer which is used // to wait on caching task completion - std::atomic*>* handlers_; + std::atomic*>* handlers_; // deallocates the global cache-location // and invalidates it @@ -77,7 +104,7 @@ namespace dsacache { size_t GetSize() const { return size_; } uint8_t* GetSource() const { return src_; } int32_t GetRefCount() const { return active_->load(); } - void SetTaskHandlersAndCache(uint8_t* cache, std::vector* handlers); + void SetTaskHandlersAndCache(uint8_t* cache, std::vector* handlers); // initializes the class after which it is thread safe // but may only be destroyed safely after setting handlers @@ -194,7 +221,7 @@ namespace dsacache { // function used to submit a copy task on a specific node to the dml // engine on that node - will change the current threads node assignment // to achieve this so take care to restore this - dml_job_t* ExecuteCopy( + dml::handler> ExecuteCopy( const uint8_t* src, uint8_t* dst, const size_t size, const int node ) const; @@ -400,7 +427,11 @@ inline void dsacache::Cache::SubmitTask(CacheData* task, const int dst_node, con const size_t size = task->GetSize() / task_count; const size_t last_size = size + task->GetSize() % task_count; - auto handlers = new std::vector(); + // save the current numa node mask to restore later + // as executing the copy task will place this thread + // on a different node + + auto handlers = new std::vector(); for (uint32_t i = 0; i < task_count; i++) { const size_t local_size = i + 1 == task_count ? size : last_size; @@ -414,42 +445,16 @@ inline void dsacache::Cache::SubmitTask(CacheData* task, const int dst_node, con task->SetTaskHandlersAndCache(dst, handlers); } -inline dml_job_t* dsacache::Cache::ExecuteCopy( +inline dml::handler> dsacache::Cache::ExecuteCopy( const uint8_t* src, uint8_t* dst, const size_t size, const int node ) const { - uint32_t job_size = 0; - dml_status_t status = dml_get_job_size(DML_PATH_HW, &job_size); - - if (status != DML_STATUS_OK) { - std::cerr << "[x] Error querying job size!" << std::endl; - return nullptr; - } - - dml_job_t* job = (dml_job_t*)malloc(job_size); - status = dml_init_job(DML_PATH_HW, job); - - if (status != DML_STATUS_OK) { - std::cerr << "[x] Error initializing job!" << std::endl; - delete job; - return nullptr; - } - - job->operation = DML_OP_MEM_MOVE; - job->source_first_ptr = (uint8_t*)src; - job->destination_first_ptr = dst; - job->source_length = size; - job->flags |= DML_FLAG_BLOCK_ON_FAULT | DML_FLAG_COPY_ONLY; - job->numa_id = node; + dml::const_data_view srcv = dml::make_view(src, size); + dml::data_view dstv = dml::make_view(dst, size); - status = dml_submit_job(job); - - if (status != DML_STATUS_OK) { - std::cerr << "[x] Error submitting job!" << std::endl; - delete job; - return nullptr; - } - - return job; + return dml::submit( + dml::mem_copy.block_on_fault(), srcv, dstv, + dml::execution_interface>(), node + ); } inline void dsacache::Cache::GetCacheNode(uint8_t* src, const size_t size, int* OUT_DST_NODE, int* OUT_SRC_NODE) const { @@ -591,7 +596,7 @@ inline dsacache::CacheData::CacheData(uint8_t* data, const size_t size) { delete_ = false; active_ = new std::atomic(1); cache_ = new std::atomic(data); - handlers_ = new std::atomic*>(); + handlers_ = new std::atomic*>(); incomplete_cache_ = new uint8_t*(nullptr); } @@ -630,14 +635,6 @@ inline dsacache::CacheData::~CacheData() { Deallocate(); - std::vector* handlers = handlers_->load(); - - if (handlers != nullptr && handlers != reinterpret_cast*>(maxptr)) { - for (dml_job_t* job : *handlers_->load()) { - if (job != nullptr) delete job; - } - } - delete active_; delete cache_; delete handlers_; @@ -676,12 +673,12 @@ inline void dsacache::CacheData::WaitOnCompletion() { // set to maximum of 64-bit in order to prevent deadlocks from the above // waiting construct - std::vector* local_handlers = handlers_->exchange(reinterpret_cast*>(maxptr)); + std::vector* local_handlers = handlers_->exchange(reinterpret_cast*>(maxptr)); // ensure that no other thread snatched the handlers before us // and in case one did, wait again and then return - if (local_handlers == nullptr || local_handlers == reinterpret_cast*>(maxptr)) { + if (local_handlers == nullptr || local_handlers == reinterpret_cast*>(maxptr)) { cache_->wait(nullptr); return; } @@ -691,27 +688,16 @@ inline void dsacache::CacheData::WaitOnCompletion() { bool error = false; - for (dml_job_t* job : *local_handlers) { - if (job == nullptr) { - std::cerr << "[x] Got nullptr-job!" << std::endl; + for (auto& handler : *local_handlers) { + auto result = handler.get(); + + if (result.status != dml::status_code::ok) { + std::cerr << "[x] Encountered bad status code for operation: " << dml::StatusCodeToString(result.status) << std::endl; // if one of the copy tasks failed we abort the whole task // after all operations are completed on it error = true; } - else { - const dml_status_t status = dml_wait_job(job, DML_WAIT_MODE_UMWAIT); - - if (status != DML_STATUS_OK) { - std::cerr << "[x] Operation Failed!" << std::endl; - - // if one of the copy tasks failed we abort the whole task - // after all operations are completed on it - error = true; - } - } - - delete job; } // at this point all handlers have been waited for @@ -737,7 +723,7 @@ inline void dsacache::CacheData::WaitOnCompletion() { handlers_->notify_all(); } -void dsacache::CacheData::SetTaskHandlersAndCache(uint8_t* cache, std::vector* handlers) { +void dsacache::CacheData::SetTaskHandlersAndCache(uint8_t* cache, std::vector* handlers) { *incomplete_cache_ = cache; handlers_->store(handlers); handlers_->notify_one();