Browse Source

switch dsa cache to job api to use efficient wait

master
Constantin Fürst 11 months ago
parent
commit
aaaaa16e94
  1. 109
      offloading-cacher/cache.hpp

109
offloading-cacher/cache.hpp

@ -6,37 +6,13 @@
#include <shared_mutex> #include <shared_mutex>
#include <mutex> #include <mutex>
#include <memory> #include <memory>
#include <atomic>
#include <sched.h> #include <sched.h>
#include <numa.h> #include <numa.h>
#include <numaif.h> #include <numaif.h>
#include <dml/dml.hpp>
namespace dml {
inline const std::string StatusCodeToString(const dml::status_code code) {
switch (code) {
case dml::status_code::ok: return "ok";
case dml::status_code::false_predicate: return "false predicate";
case dml::status_code::partial_completion: return "partial completion";
case dml::status_code::nullptr_error: return "nullptr error";
case dml::status_code::bad_size: return "bad size";
case dml::status_code::bad_length: return "bad length";
case dml::status_code::inconsistent_size: return "inconsistent size";
case dml::status_code::dualcast_bad_padding: return "dualcast bad padding";
case dml::status_code::bad_alignment: return "bad alignment";
case dml::status_code::buffers_overlapping: return "buffers overlapping";
case dml::status_code::delta_delta_empty: return "delta delta empty";
case dml::status_code::batch_overflow: return "batch overflow";
case dml::status_code::execution_failed: return "execution failed";
case dml::status_code::unsupported_operation: return "unsupported operation";
case dml::status_code::queue_busy: return "queue busy";
case dml::status_code::error: return "unknown error";
case dml::status_code::config_error: return "config error";
default: return "unhandled error";
}
}
}
#include <dml/dml.h>
namespace dsacache { namespace dsacache {
class Cache; class Cache;
@ -70,9 +46,6 @@ namespace dsacache {
*/ */
class CacheData { class CacheData {
public:
using dml_handler = dml::handler<dml::mem_copy_operation, std::allocator<uint8_t>>;
private: private:
static constexpr uint64_t maxptr = 0xffff'ffff'ffff'ffff; static constexpr uint64_t maxptr = 0xffff'ffff'ffff'ffff;
@ -95,7 +68,7 @@ namespace dsacache {
// dml handler vector pointer which is used // dml handler vector pointer which is used
// to wait on caching task completion // to wait on caching task completion
std::atomic<std::vector<dml_handler>*>* handlers_;
std::atomic<std::vector<dml_job_t*>*>* handlers_;
// deallocates the global cache-location // deallocates the global cache-location
// and invalidates it // and invalidates it
@ -104,7 +77,7 @@ namespace dsacache {
size_t GetSize() const { return size_; } size_t GetSize() const { return size_; }
uint8_t* GetSource() const { return src_; } uint8_t* GetSource() const { return src_; }
int32_t GetRefCount() const { return active_->load(); } int32_t GetRefCount() const { return active_->load(); }
void SetTaskHandlersAndCache(uint8_t* cache, std::vector<dml_handler>* handlers);
void SetTaskHandlersAndCache(uint8_t* cache, std::vector<dml_job_t*>* handlers);
// initializes the class after which it is thread safe // initializes the class after which it is thread safe
// but may only be destroyed safely after setting handlers // but may only be destroyed safely after setting handlers
@ -221,7 +194,7 @@ namespace dsacache {
// function used to submit a copy task on a specific node to the dml // function used to submit a copy task on a specific node to the dml
// engine on that node - will change the current threads node assignment // engine on that node - will change the current threads node assignment
// to achieve this so take care to restore this // to achieve this so take care to restore this
dml::handler<dml::mem_copy_operation, std::allocator<uint8_t>> ExecuteCopy(
dml_job_t* ExecuteCopy(
const uint8_t* src, uint8_t* dst, const size_t size, const int node const uint8_t* src, uint8_t* dst, const size_t size, const int node
) const; ) const;
@ -433,7 +406,7 @@ inline void dsacache::Cache::SubmitTask(CacheData* task, const int dst_node, con
bitmask* nodemask = numa_get_run_node_mask(); bitmask* nodemask = numa_get_run_node_mask();
auto handlers = new std::vector<CacheData::dml_handler>();
auto handlers = new std::vector<dml_job_t*>();
for (uint32_t i = 0; i < task_count; i++) { for (uint32_t i = 0; i < task_count; i++) {
const size_t local_size = i + 1 == task_count ? size : last_size; const size_t local_size = i + 1 == task_count ? size : last_size;
@ -452,15 +425,43 @@ inline void dsacache::Cache::SubmitTask(CacheData* task, const int dst_node, con
numa_free_nodemask(nodemask); numa_free_nodemask(nodemask);
} }
inline dml::handler<dml::mem_copy_operation, std::allocator<uint8_t>> dsacache::Cache::ExecuteCopy(
inline dml_job_t* dsacache::Cache::ExecuteCopy(
const uint8_t* src, uint8_t* dst, const size_t size, const int node const uint8_t* src, uint8_t* dst, const size_t size, const int node
) const { ) const {
numa_run_on_node(node); numa_run_on_node(node);
dml::const_data_view srcv = dml::make_view(src, size);
dml::data_view dstv = dml::make_view(dst, size);
uint32_t job_size = 0;
dml_status_t status = dml_get_job_size(DML_PATH_HW, &job_size);
if (status != DML_STATUS_OK) {
std::cerr << "[x] Error querying job size!" << std::endl;
return nullptr;
}
dml_job_t* job = (dml_job_t*)malloc(job_size);
status = dml_init_job(DML_PATH_HW, job);
if (status != DML_STATUS_OK) {
std::cerr << "[x] Error initializing job!" << std::endl;
delete job;
return nullptr;
}
job->operation = DML_OP_MEM_MOVE;
job->source_first_ptr = (uint8_t*)src;
job->destination_first_ptr = dst;
job->source_length = size;
job->flags |= DML_FLAG_BLOCK_ON_FAULT | DML_FLAG_COPY_ONLY;
return dml::submit<dml::hardware>(dml::mem_copy.block_on_fault(), srcv, dstv);
status = dml_submit_job(job);
if (status != DML_STATUS_OK) {
std::cerr << "[x] Error submitting job!" << std::endl;
delete job;
return nullptr;
}
return job;
} }
inline void dsacache::Cache::GetCacheNode(uint8_t* src, const size_t size, int* OUT_DST_NODE, int* OUT_SRC_NODE) const { inline void dsacache::Cache::GetCacheNode(uint8_t* src, const size_t size, int* OUT_DST_NODE, int* OUT_SRC_NODE) const {
@ -602,7 +603,7 @@ inline dsacache::CacheData::CacheData(uint8_t* data, const size_t size) {
delete_ = false; delete_ = false;
active_ = new std::atomic<int32_t>(1); active_ = new std::atomic<int32_t>(1);
cache_ = new std::atomic<uint8_t*>(data); cache_ = new std::atomic<uint8_t*>(data);
handlers_ = new std::atomic<std::vector<dml_handler>*>();
handlers_ = new std::atomic<std::vector<dml_job_t*>*>();
incomplete_cache_ = new uint8_t*(nullptr); incomplete_cache_ = new uint8_t*(nullptr);
} }
@ -641,6 +642,10 @@ inline dsacache::CacheData::~CacheData() {
Deallocate(); Deallocate();
for (dml_job_t* job : *handlers_->load()) {
if (job != nullptr) delete job;
}
delete active_; delete active_;
delete cache_; delete cache_;
delete handlers_; delete handlers_;
@ -679,12 +684,12 @@ inline void dsacache::CacheData::WaitOnCompletion() {
// set to maximum of 64-bit in order to prevent deadlocks from the above // set to maximum of 64-bit in order to prevent deadlocks from the above
// waiting construct // waiting construct
std::vector<dml_handler>* local_handlers = handlers_->exchange(reinterpret_cast<std::vector<dml_handler>*>(maxptr));
std::vector<dml_job_t*>* local_handlers = handlers_->exchange(reinterpret_cast<std::vector<dml_job_t*>*>(maxptr));
// ensure that no other thread snatched the handlers before us // ensure that no other thread snatched the handlers before us
// and in case one did, wait again and then return // and in case one did, wait again and then return
if (local_handlers == nullptr || local_handlers == reinterpret_cast<std::vector<dml_handler>*>(maxptr)) {
if (local_handlers == nullptr || local_handlers == reinterpret_cast<std::vector<dml_job_t*>*>(maxptr)) {
cache_->wait(nullptr); cache_->wait(nullptr);
return; return;
} }
@ -694,17 +699,27 @@ inline void dsacache::CacheData::WaitOnCompletion() {
bool error = false; bool error = false;
for (auto& handler : *local_handlers) {
while (!handler.is_finished()) sched_yield();
auto result = handler.get();
if (result.status != dml::status_code::ok) {
std::cerr << "[x] Encountered bad status code for operation: " << dml::StatusCodeToString(result.status) << std::endl;
for (dml_job_t* job : *local_handlers) {
if (job == nullptr) {
std::cerr << "[x] Got nullptr-job!" << std::endl;
// if one of the copy tasks failed we abort the whole task // if one of the copy tasks failed we abort the whole task
// after all operations are completed on it // after all operations are completed on it
error = true; error = true;
} }
else {
const dml_status_t status = dml_wait_job(job, DML_WAIT_MODE_UMWAIT);
if (status != DML_STATUS_OK) {
std::cerr << "[x] Operation Failed!" << std::endl;
// if one of the copy tasks failed we abort the whole task
// after all operations are completed on it
error = true;
}
}
delete job;
} }
// at this point all handlers have been waited for // at this point all handlers have been waited for
@ -730,7 +745,7 @@ inline void dsacache::CacheData::WaitOnCompletion() {
handlers_->notify_all(); handlers_->notify_all();
} }
void dsacache::CacheData::SetTaskHandlersAndCache(uint8_t* cache, std::vector<dml_handler>* handlers) {
void dsacache::CacheData::SetTaskHandlersAndCache(uint8_t* cache, std::vector<dml_job_t*>* handlers) {
*incomplete_cache_ = cache; *incomplete_cache_ = cache;
handlers_->store(handlers); handlers_->store(handlers);
handlers_->notify_one(); handlers_->notify_one();

Loading…
Cancel
Save