Browse Source

add a lot of comments to the code, also handle errors in the dml handlers gracefully

master
Constantin Fürst 12 months ago
parent
commit
46de3151a2
  1. 131
      offloading-cacher/cache-data.hpp
  2. 35
      offloading-cacher/cache.hpp
  3. 60
      offloading-cacher/util/dml-helper.hpp

131
offloading-cacher/cache-data.hpp

@ -8,6 +8,8 @@
#include <dml/dml.hpp>
#include "util/dml-helper.hpp"
namespace dsacache {
class Cache;
@ -23,57 +25,130 @@ namespace dsacache {
using dml_handler = dml::handler<dml::mem_copy_operation, std::allocator<uint8_t>>;
private:
// data source and size of the block
uint8_t* src_;
size_t size_;
// global reference counting object
std::atomic<int32_t>* active_;
protected:
// global cache-location pointer
std::atomic<uint8_t*>* cache_;
// object-local incomplete cache location pointer
// which is only available in the first instance
uint8_t* incomplete_cache_;
// dml handler vector pointer which is only
// available in the first instance
std::unique_ptr<std::vector<dml_handler>> handlers_;
friend Cache;
// deallocates the global cache-location
// and invalidates it
void Deallocate();
// checks whether there are at least two
// valid references to this object which
// is done as the cache always has one
// internally to any living instance
bool Active() const;
friend Cache;
public:
CacheData(uint8_t* data, const size_t size);
CacheData(const CacheData& other);
~CacheData();
void Deallocate();
// waits on completion of caching operations
// for this task and is safe to be called in
// any state of the object
void WaitOnCompletion();
// returns the cache data location for this
// instance which is valid as long as the
// instance is alive
uint8_t* GetDataLocation() const;
bool Active() const;
};
}
inline void dsacache::CacheData::WaitOnCompletion() {
// the cache data entry can be in two states
// either it is the original one which has not
// been waited for in which case the handlers
// are non-null or it is not
if (handlers_ == nullptr) {
std::cout << "[-] Waiting on cache-var-update for CacheData 0x" << std::hex << (uint64_t)src_ << std::dec << std::endl;
// when no handlers are attached to this cache entry we wait on a
// value change for the cache structure from nullptr to non-null
// which will either go through immediately if the cache is valid
// already or wait until the handler-owning thread notifies us
cache_->wait(nullptr);
std::cout << "[+] Finished waiting on cache-var-update for CacheData 0x" << std::hex << (uint64_t)src_ << std::dec << std::endl;
}
else {
// when the handlers are non-null there are some DSA task handlers
// available on which we must wait here
std::cout << "[-] Waiting on handlers for CacheData 0x" << std::hex << (uint64_t)src_ << std::dec << std::endl;
// abort is set if any operation encountered an error
bool abort = false;
for (auto& handler : *handlers_) {
auto result = handler.get();
// TODO: handle the returned status code
if (result.status != dml::status_code::ok) {
std::cerr << "[x] Encountered bad status code for operation: " << dml::StatusCodeToString(result.status) << std::endl;
// if one of the copy tasks failed we abort the whole task
// after all operations are completed on it
abort = true;
}
}
// the handlers are cleared after all have completed
handlers_ = nullptr;
std::cout << "[+] Finished waiting on handlers for CacheData 0x" << std::hex << (uint64_t)src_ << std::dec << std::endl;
// now we act depending on whether an abort has been
// called for which signals operation incomplete
if (abort) {
// store nullptr in the cache location
cache_->store(nullptr);
// then free the now incomplete cache
// TODO: it would be possible to salvage the
// TODO: operation at this point but this
// TODO: is quite complicated so we just abort
numa_free(incomplete_cache_, size_);
}
else {
std::cout << "[+] Finished waiting on handlers for CacheData 0x" << std::hex << (uint64_t)src_ << std::dec << std::endl;
// incomplete cache is now safe to use and therefore we
// swap it with the global cache state of this entry
// and notify potentially waiting threads
cache_->store(incomplete_cache_);
}
// as a last step all waiting threads must
// be notified (copies of this will wait on value
// change of the cache) and the incomplete cache
// is cleared to nullptr as it is not incomplete
cache_->store(incomplete_cache_);
cache_->notify_all();
incomplete_cache_ = nullptr;
}
}
@ -91,12 +166,24 @@ dsacache::CacheData::CacheData(uint8_t* data, const size_t size) {
dsacache::CacheData::CacheData(const dsacache::CacheData& other) {
std::cout << "[-] Copy Created for CacheData 0x" << std::hex << (uint64_t)other.src_ << std::dec << std::endl;
// we copy the ptr to the global atomic reference counter
// and increase the amount of active references
active_ = other.active_;
const int current_active = active_->fetch_add(1);
// source and size will be copied too
// as well as the reference to the global
// atomic cache pointer
src_ = other.src_;
size_ = other.size_;
cache_ = other.cache_;
// incomplete cache and handlers will not
// be copied because only the first instance
// will wait on the completion of handlers
incomplete_cache_ = nullptr;
handlers_ = nullptr;
}
@ -104,6 +191,15 @@ dsacache::CacheData::CacheData(const dsacache::CacheData& other) {
dsacache::CacheData::~CacheData() {
std::cout << "[-] Destructor for CacheData 0x" << std::hex << (uint64_t)src_ << std::dec << std::endl;
// if this is the first instance of this cache structure
// and it has not been waited on and is now being destroyed
// we must wait on completion here to ensure the cache
// remains in a valid state
if (handlers_ != nullptr) {
WaitOnCompletion();
}
// due to fetch_sub returning the preivously held value
// we must subtract one locally to get the current value
@ -117,6 +213,7 @@ dsacache::CacheData::~CacheData() {
std::cout << "[!] Full Destructor for CacheData 0x" << std::hex << (uint64_t)src_ << std::dec << std::endl;
Deallocate();
delete active_;
delete cache_;
}
@ -125,9 +222,12 @@ dsacache::CacheData::~CacheData() {
void dsacache::CacheData::Deallocate() {
std::cout << "[!] Deallocating for CacheData 0x" << std::hex << (uint64_t)src_ << std::dec << std::endl;
numa_free(cache_, size_);
cache_ = nullptr;
incomplete_cache_ = nullptr;
// although deallocate should only be called from
// a safe context to do so, it can not hurt to
// defensively perform the operation atomically
uint8_t* cache_local = cache_->exchange(nullptr);
if (cache_local != nullptr) numa_free(cache_local, size_);
}
uint8_t* dsacache::CacheData::GetDataLocation() const {
@ -135,5 +235,10 @@ uint8_t* dsacache::CacheData::GetDataLocation() const {
}
bool dsacache::CacheData::Active() const {
return active_->load() > 0;
// this entry is active if more than one
// reference exists to it, as the Cache
// will always keep one internally until
// the entry is cleared from cache
return active_->load() > 1;
}

35
offloading-cacher/cache.hpp

@ -46,22 +46,42 @@ namespace dsacache {
CachePolicy* cache_policy_function_ = nullptr;
CopyPolicy* copy_policy_function_ = nullptr;
// function used to submit a copy task on a specific node to the dml
// engine on that node - will change the current threads node assignment
// to achieve this so take care to restore this
dml::handler<dml::mem_copy_operation, std::allocator<uint8_t>> ExecuteCopy(
const uint8_t* src, uint8_t* dst, const size_t size, const int node
) const;
// allocates the required memory on the destination node
// and then submits task to the dml library for processing
// and attaches the handlers to the cache data structure
void SubmitTask(CacheData* task, const int dst_node, const int src_node);
// querries the policy functions for the given data and size
// to obtain destination cache node, also returns the datas
// source node for further usage
// output may depend on the calling threads node assignment
// as this is set as the "optimal placement" node
void GetCacheNode(uint8_t* src, const size_t size, int* OUT_DST_NODE, int* OUT_SRC_NODE) const;
// checks whether the cache contains an entry for
// the given data in the given memory node and
// returns it, otherwise returns nullptr
std::unique_ptr<CacheData> GetFromCache(uint8_t* src, const size_t size, const int dst_node);
public:
// initializes the cache with the two policy functions
// only after this is it safe to use in a threaded environment
void Init(CachePolicy* cache_policy_function, CopyPolicy* copy_policy_function);
// function to perform data access through the cache
std::unique_ptr<CacheData> Access(uint8_t* data, const size_t size);
// flushes the cache of inactive entries
// if node is -1 then the whole cache is
// checked and otherwise the specified
// node - no checks on node validity
void Flush(const int node = -1);
};
}
@ -71,11 +91,19 @@ inline void dsacache::Cache::Init(CachePolicy* cache_policy_function, CopyPolicy
copy_policy_function_ = copy_policy_function;
// initialize numa library
numa_available();
// obtain all available nodes
// and those we may allocate
// memory on
const int nodes_max = numa_num_configured_nodes();
const bitmask* valid_nodes = numa_get_mems_allowed();
// prepare the cache state with entries
// for all given nodes
for (int node = 0; node < nodes_max; node++) {
if (numa_bitmask_isbitset(valid_nodes, node)) {
cache_state_.insert({node,{}});
@ -93,6 +121,10 @@ inline std::unique_ptr<dsacache::CacheData> dsacache::Cache::Access(uint8_t* dat
GetCacheNode(data, size, &dst_node, &src_node);
// TODO: at this point it could be beneficial to check whether
// TODO: the given destination node is present as an entry
// TODO: in the cache state to see if it is valid
// check whether the data is already cached
std::unique_ptr<CacheData> task = GetFromCache(data, size, dst_node);
@ -149,7 +181,7 @@ inline void dsacache::Cache::SubmitTask(CacheData* task, const int dst_node, con
dst = reinterpret_cast<uint8_t*>(numa_alloc_onnode(task->size_, dst_node));
if (dst == nullptr) {
std::cout << "[x] Second allocation try failed for " << task->size_ << "B on node " << dst_node << std::endl;
std::cerr << "[x] Second allocation try failed for " << task->size_ << "B on node " << dst_node << std::endl;
return;
}
}
@ -188,6 +220,7 @@ inline void dsacache::Cache::SubmitTask(CacheData* task, const int dst_node, con
// restore the previous nodemask
numa_run_on_node_mask(nodemask);
numa_free_nodemask(nodemask);
}
inline dml::handler<dml::mem_copy_operation, std::allocator<uint8_t>> dsacache::Cache::ExecuteCopy(

60
offloading-cacher/util/dml-helper.hpp

@ -2,25 +2,45 @@
#include <dml/dml.hpp>
inline const std::string StatusCodeToString(const dml::status_code code) {
switch(code) {
case dml::status_code::ok: return "ok";
case dml::status_code::false_predicate: return "false predicate";
case dml::status_code::partial_completion: return "partial completion";
case dml::status_code::nullptr_error: return "nullptr error";
case dml::status_code::bad_size: return "bad size";
case dml::status_code::bad_length: return "bad length";
case dml::status_code::inconsistent_size: return "inconsistent size";
case dml::status_code::dualcast_bad_padding: return "dualcast bad padding";
case dml::status_code::bad_alignment: return "bad alignment";
case dml::status_code::buffers_overlapping: return "buffers overlapping";
case dml::status_code::delta_delta_empty: return "delta delta empty";
case dml::status_code::batch_overflow: return "batch overflow";
case dml::status_code::execution_failed: return "execution failed";
case dml::status_code::unsupported_operation: return "unsupported operation";
case dml::status_code::queue_busy: return "queue busy";
case dml::status_code::error: return "unknown error";
case dml::status_code::config_error: return "config error";
default: return "unhandled error";
namespace dml {
inline const std::string StatusCodeToString(const dml::status_code code) {
switch (code) {
case dml::status_code::ok:
return "ok";
case dml::status_code::false_predicate:
return "false predicate";
case dml::status_code::partial_completion:
return "partial completion";
case dml::status_code::nullptr_error:
return "nullptr error";
case dml::status_code::bad_size:
return "bad size";
case dml::status_code::bad_length:
return "bad length";
case dml::status_code::inconsistent_size:
return "inconsistent size";
case dml::status_code::dualcast_bad_padding:
return "dualcast bad padding";
case dml::status_code::bad_alignment:
return "bad alignment";
case dml::status_code::buffers_overlapping:
return "buffers overlapping";
case dml::status_code::delta_delta_empty:
return "delta delta empty";
case dml::status_code::batch_overflow:
return "batch overflow";
case dml::status_code::execution_failed:
return "execution failed";
case dml::status_code::unsupported_operation:
return "unsupported operation";
case dml::status_code::queue_busy:
return "queue busy";
case dml::status_code::error:
return "unknown error";
case dml::status_code::config_error:
return "config error";
default:
return "unhandled error";
}
}
}
Loading…
Cancel
Save