Browse Source

reset cacher to cpp api implementation

master
Constantin Fürst 11 months ago
parent
commit
053bb949ce
  1. 120
      offloading-cacher/cache.hpp

120
offloading-cacher/cache.hpp

@ -6,13 +6,37 @@
#include <shared_mutex>
#include <mutex>
#include <memory>
#include <atomic>
#include <sched.h>
#include <numa.h>
#include <numaif.h>
#include <dml/dml.h>
#include <dml/dml.hpp>
namespace dml {
    // Maps a dml::status_code to a human-readable description, used for
    // logging copy-operation failures. Codes not explicitly listed fall
    // through to "unhandled error".
    //
    // NOTE: the return type is deliberately a non-const std::string —
    // returning a const value would inhibit move semantics at call sites
    // (clang-tidy readability-const-return-type) without adding any safety.
    inline std::string StatusCodeToString(const dml::status_code code) {
        switch (code) {
            case dml::status_code::ok: return "ok";
            case dml::status_code::false_predicate: return "false predicate";
            case dml::status_code::partial_completion: return "partial completion";
            case dml::status_code::nullptr_error: return "nullptr error";
            case dml::status_code::bad_size: return "bad size";
            case dml::status_code::bad_length: return "bad length";
            case dml::status_code::inconsistent_size: return "inconsistent size";
            case dml::status_code::dualcast_bad_padding: return "dualcast bad padding";
            case dml::status_code::bad_alignment: return "bad alignment";
            case dml::status_code::buffers_overlapping: return "buffers overlapping";
            case dml::status_code::delta_delta_empty: return "delta delta empty";
            case dml::status_code::batch_overflow: return "batch overflow";
            case dml::status_code::execution_failed: return "execution failed";
            case dml::status_code::unsupported_operation: return "unsupported operation";
            case dml::status_code::queue_busy: return "queue busy";
            case dml::status_code::error: return "unknown error";
            case dml::status_code::config_error: return "config error";
            default: return "unhandled error";
        }
    }
}
namespace dsacache {
class Cache;
@ -46,6 +70,9 @@ namespace dsacache {
*/
class CacheData {
public:
using dml_handler = dml::handler<dml::mem_copy_operation, std::allocator<uint8_t>>;
private:
static constexpr uint64_t maxptr = 0xffff'ffff'ffff'ffff;
@ -68,7 +95,7 @@ namespace dsacache {
// dml handler vector pointer which is used
// to wait on caching task completion
std::atomic<std::vector<dml_job_t*>*>* handlers_;
std::atomic<std::vector<dml_handler>*>* handlers_;
// deallocates the global cache-location
// and invalidates it
@ -77,7 +104,7 @@ namespace dsacache {
size_t GetSize() const { return size_; }
uint8_t* GetSource() const { return src_; }
int32_t GetRefCount() const { return active_->load(); }
void SetTaskHandlersAndCache(uint8_t* cache, std::vector<dml_job_t*>* handlers);
void SetTaskHandlersAndCache(uint8_t* cache, std::vector<dml_handler>* handlers);
// initializes the class after which it is thread safe
// but may only be destroyed safely after setting handlers
@ -194,7 +221,7 @@ namespace dsacache {
// function used to submit a copy task on a specific node to the dml
// engine on that node - will change the current threads node assignment
// to achieve this so take care to restore this
dml_job_t* ExecuteCopy(
dml::handler<dml::mem_copy_operation, std::allocator<uint8_t>> ExecuteCopy(
const uint8_t* src, uint8_t* dst, const size_t size, const int node
) const;
@ -400,7 +427,11 @@ inline void dsacache::Cache::SubmitTask(CacheData* task, const int dst_node, con
const size_t size = task->GetSize() / task_count;
const size_t last_size = size + task->GetSize() % task_count;
auto handlers = new std::vector<dml_job_t*>();
// save the current numa node mask to restore later
// as executing the copy task will place this thread
// on a different node
auto handlers = new std::vector<CacheData::dml_handler>();
for (uint32_t i = 0; i < task_count; i++) {
const size_t local_size = i + 1 == task_count ? size : last_size;
@ -414,42 +445,16 @@ inline void dsacache::Cache::SubmitTask(CacheData* task, const int dst_node, con
task->SetTaskHandlersAndCache(dst, handlers);
}
inline dml_job_t* dsacache::Cache::ExecuteCopy(
inline dml::handler<dml::mem_copy_operation, std::allocator<uint8_t>> dsacache::Cache::ExecuteCopy(
const uint8_t* src, uint8_t* dst, const size_t size, const int node
) const {
uint32_t job_size = 0;
dml_status_t status = dml_get_job_size(DML_PATH_HW, &job_size);
if (status != DML_STATUS_OK) {
std::cerr << "[x] Error querying job size!" << std::endl;
return nullptr;
}
dml_job_t* job = (dml_job_t*)malloc(job_size);
status = dml_init_job(DML_PATH_HW, job);
if (status != DML_STATUS_OK) {
std::cerr << "[x] Error initializing job!" << std::endl;
delete job;
return nullptr;
}
job->operation = DML_OP_MEM_MOVE;
job->source_first_ptr = (uint8_t*)src;
job->destination_first_ptr = dst;
job->source_length = size;
job->flags |= DML_FLAG_BLOCK_ON_FAULT | DML_FLAG_COPY_ONLY;
job->numa_id = node;
dml::const_data_view srcv = dml::make_view(src, size);
dml::data_view dstv = dml::make_view(dst, size);
status = dml_submit_job(job);
if (status != DML_STATUS_OK) {
std::cerr << "[x] Error submitting job!" << std::endl;
delete job;
return nullptr;
}
return job;
return dml::submit<dml::hardware>(
dml::mem_copy.block_on_fault(), srcv, dstv,
dml::execution_interface<dml::hardware,std::allocator<uint8_t>>(), node
);
}
inline void dsacache::Cache::GetCacheNode(uint8_t* src, const size_t size, int* OUT_DST_NODE, int* OUT_SRC_NODE) const {
@ -591,7 +596,7 @@ inline dsacache::CacheData::CacheData(uint8_t* data, const size_t size) {
delete_ = false;
active_ = new std::atomic<int32_t>(1);
cache_ = new std::atomic<uint8_t*>(data);
handlers_ = new std::atomic<std::vector<dml_job_t*>*>();
handlers_ = new std::atomic<std::vector<dml_handler>*>();
incomplete_cache_ = new uint8_t*(nullptr);
}
@ -630,14 +635,6 @@ inline dsacache::CacheData::~CacheData() {
Deallocate();
std::vector<dml_job_t*>* handlers = handlers_->load();
if (handlers != nullptr && handlers != reinterpret_cast<std::vector<dml_job_t*>*>(maxptr)) {
for (dml_job_t* job : *handlers_->load()) {
if (job != nullptr) delete job;
}
}
delete active_;
delete cache_;
delete handlers_;
@ -676,12 +673,12 @@ inline void dsacache::CacheData::WaitOnCompletion() {
// set to maximum of 64-bit in order to prevent deadlocks from the above
// waiting construct
std::vector<dml_job_t*>* local_handlers = handlers_->exchange(reinterpret_cast<std::vector<dml_job_t*>*>(maxptr));
std::vector<dml_handler>* local_handlers = handlers_->exchange(reinterpret_cast<std::vector<dml_handler>*>(maxptr));
// ensure that no other thread snatched the handlers before us
// and in case one did, wait again and then return
if (local_handlers == nullptr || local_handlers == reinterpret_cast<std::vector<dml_job_t*>*>(maxptr)) {
if (local_handlers == nullptr || local_handlers == reinterpret_cast<std::vector<dml_handler>*>(maxptr)) {
cache_->wait(nullptr);
return;
}
@ -691,27 +688,16 @@ inline void dsacache::CacheData::WaitOnCompletion() {
bool error = false;
for (dml_job_t* job : *local_handlers) {
if (job == nullptr) {
std::cerr << "[x] Got nullptr-job!" << std::endl;
for (auto& handler : *local_handlers) {
auto result = handler.get();
if (result.status != dml::status_code::ok) {
std::cerr << "[x] Encountered bad status code for operation: " << dml::StatusCodeToString(result.status) << std::endl;
// if one of the copy tasks failed we abort the whole task
// after all operations are completed on it
error = true;
}
else {
const dml_status_t status = dml_wait_job(job, DML_WAIT_MODE_UMWAIT);
if (status != DML_STATUS_OK) {
std::cerr << "[x] Operation Failed!" << std::endl;
// if one of the copy tasks failed we abort the whole task
// after all operations are completed on it
error = true;
}
}
delete job;
}
// at this point all handlers have been waited for
@ -737,7 +723,7 @@ inline void dsacache::CacheData::WaitOnCompletion() {
handlers_->notify_all();
}
void dsacache::CacheData::SetTaskHandlersAndCache(uint8_t* cache, std::vector<dml_job_t*>* handlers) {
void dsacache::CacheData::SetTaskHandlersAndCache(uint8_t* cache, std::vector<dml_handler>* handlers) {
*incomplete_cache_ = cache;
handlers_->store(handlers);
handlers_->notify_one();

Loading…
Cancel
Save