Browse Source

prevent another type of deadlock that could arise from destroying a CacheData instance that never had tasks assigned to it due to waiting infinitely on it

master
Constantin Fürst 11 months ago
parent
commit
bbdb2d9588
  1. 71
      offloading-cacher/cache.hpp

71
offloading-cacher/cache.hpp

@ -39,6 +39,8 @@ namespace dml {
} }
namespace dsacache { namespace dsacache {
class Cache;
/* /*
* Class Description: * Class Description:
* Holds all required information on one cache entry and is used * Holds all required information on one cache entry and is used
@ -72,6 +74,11 @@ namespace dsacache {
using dml_handler = dml::handler<dml::mem_copy_operation, std::allocator<uint8_t>>; using dml_handler = dml::handler<dml::mem_copy_operation, std::allocator<uint8_t>>;
private: private:
static constexpr uint64_t maxptr = 0xffff'ffff'ffff'ffff;
// set to false if we do not own the cache pointer
bool delete_ = false;
// data source and size of the block // data source and size of the block
uint8_t* src_; uint8_t* src_;
size_t size_; size_t size_;
@ -84,7 +91,7 @@ namespace dsacache {
// object-local incomplete cache location pointer // object-local incomplete cache location pointer
// contract: only access when being in sole posession of handlers // contract: only access when being in sole posession of handlers
uint8_t* incomplete_cache_;
uint8_t** incomplete_cache_;
// dml handler vector pointer which is used // dml handler vector pointer which is used
// to wait on caching task completion // to wait on caching task completion
@ -94,6 +101,17 @@ namespace dsacache {
// and invalidates it // and invalidates it
void Deallocate(); void Deallocate();
size_t GetSize() const { return size_; }
uint8_t* GetSource() const { return src_; }
int32_t GetRefCount() const { return active_->load(); }
void SetTaskHandlersAndCache(uint8_t* cache, std::vector<dml_handler>* handlers);
// initializes the class after which it is thread safe
// but may only be destroyed safely after setting handlers
void Init();
friend Cache;
public: public:
CacheData(uint8_t* data, const size_t size); CacheData(uint8_t* data, const size_t size);
CacheData(const CacheData& other); CacheData(const CacheData& other);
@ -108,13 +126,7 @@ namespace dsacache {
// instance which is valid as long as the // instance which is valid as long as the
// instance is alive - !!! this may also // instance is alive - !!! this may also
// yield a nullptr !!! // yield a nullptr !!!
void SetTaskHanldersAndCache(uint8_t* cache, std::vector<dml_handler>* handlers);
uint8_t* GetDataLocation() const { return cache_->load(); } uint8_t* GetDataLocation() const { return cache_->load(); }
size_t GetSize() const { return size_; }
uint8_t* GetSource() const { return src_; }
int32_t GetRefCount() const { return active_->load(); }
}; };
/* /*
@ -341,6 +353,12 @@ inline std::unique_ptr<dsacache::CacheData> dsacache::Cache::Access(uint8_t* dat
std::cout << "[!] Found another cache instance for 0x" << std::hex << (uint64_t)task->GetSource() << std::dec << std::endl; std::cout << "[!] Found another cache instance for 0x" << std::hex << (uint64_t)task->GetSource() << std::dec << std::endl;
return std::move(std::make_unique<CacheData>(state.first->second)); return std::move(std::make_unique<CacheData>(state.first->second));
} }
// initialize the task now for thread safety
// as we are now sure that we will submit work
// to it and will not delete it beforehand
task->Init();
} }
SubmitTask(task.get(), dst_node, src_node); SubmitTask(task.get(), dst_node, src_node);
@ -426,7 +444,7 @@ inline void dsacache::Cache::SubmitTask(CacheData* task, const int dst_node, con
handlers->emplace_back(ExecuteCopy(local_src, local_dst, local_size, executing_nodes[i])); handlers->emplace_back(ExecuteCopy(local_src, local_dst, local_size, executing_nodes[i]));
} }
task->SetTaskHanldersAndCache(dst, handlers);
task->SetTaskHandlersAndCache(dst, handlers);
// restore the previous nodemask // restore the previous nodemask
@ -581,10 +599,11 @@ inline dsacache::Cache::~Cache() {
inline dsacache::CacheData::CacheData(uint8_t* data, const size_t size) { inline dsacache::CacheData::CacheData(uint8_t* data, const size_t size) {
src_ = data; src_ = data;
size_ = size; size_ = size;
delete_ = false;
active_ = new std::atomic<int32_t>(1); active_ = new std::atomic<int32_t>(1);
cache_ = new std::atomic<uint8_t*>();
cache_ = new std::atomic<uint8_t*>(data);
handlers_ = new std::atomic<std::vector<dml_handler>*>(); handlers_ = new std::atomic<std::vector<dml_handler>*>();
incomplete_cache_ = nullptr;
incomplete_cache_ = new uint8_t*(nullptr);
} }
inline dsacache::CacheData::CacheData(const dsacache::CacheData& other) { inline dsacache::CacheData::CacheData(const dsacache::CacheData& other) {
@ -612,7 +631,7 @@ inline dsacache::CacheData::~CacheData() {
// then we must execute proper deletion // then we must execute proper deletion
// as this was the last reference // as this was the last reference
if (v <= 0) {
if (v == 0) {
// on deletion we must ensure that all offloaded // on deletion we must ensure that all offloaded
// operations have completed successfully // operations have completed successfully
@ -625,6 +644,7 @@ inline dsacache::CacheData::~CacheData() {
delete active_; delete active_;
delete cache_; delete cache_;
delete handlers_; delete handlers_;
delete incomplete_cache_;
} }
} }
@ -636,8 +656,8 @@ inline void dsacache::CacheData::Deallocate() {
// takes place for the retrieved local cache // takes place for the retrieved local cache
uint8_t* cache_local = cache_->exchange(nullptr); uint8_t* cache_local = cache_->exchange(nullptr);
if (cache_local != nullptr) numa_free(cache_local, size_);
else if (incomplete_cache_ != nullptr) numa_free(incomplete_cache_, size_);
if (cache_local != nullptr && delete_) numa_free(cache_local, size_);
else if (*incomplete_cache_ != nullptr) numa_free(*incomplete_cache_, size_);
else; else;
} }
@ -655,15 +675,17 @@ inline void dsacache::CacheData::WaitOnCompletion() {
// exchange the global handlers pointer with nullptr to have a local // exchange the global handlers pointer with nullptr to have a local
// copy - this signals that this thread is the sole owner and therefore // copy - this signals that this thread is the sole owner and therefore
// responsible for waiting for them
// responsible for waiting for them. we can not set to nullptr here but
// set to maximum of 64-bit in order to prevent deadlocks from the above
// waiting construct
std::vector<dml_handler>* local_handlers = handlers_->exchange(nullptr);
std::vector<dml_handler>* local_handlers = handlers_->exchange(reinterpret_cast<std::vector<dml_handler>*>(maxptr));
// ensure that no other thread snatched the handlers before us // ensure that no other thread snatched the handlers before us
// and in case one did, wait again and then return // and in case one did, wait again and then return
if (local_handlers == nullptr) {
WaitOnCompletion();
if (local_handlers == nullptr || local_handlers == reinterpret_cast<std::vector<dml_handler>*>(maxptr)) {
cache_->wait(nullptr);
return; return;
} }
@ -693,10 +715,12 @@ inline void dsacache::CacheData::WaitOnCompletion() {
if (error) { if (error) {
cache_->store(src_); cache_->store(src_);
numa_free(incomplete_cache_, size_);
numa_free(*incomplete_cache_, size_);
delete_ = false;
*incomplete_cache_ = nullptr;
} }
else { else {
cache_->store(incomplete_cache_);
cache_->store(*incomplete_cache_);
} }
// notify all waiting threads so they wake up quickly // notify all waiting threads so they wake up quickly
@ -705,8 +729,13 @@ inline void dsacache::CacheData::WaitOnCompletion() {
handlers_->notify_all(); handlers_->notify_all();
} }
void dsacache::CacheData::SetTaskHanldersAndCache(uint8_t* cache, std::vector<dml_handler>* handlers) {
incomplete_cache_ = cache;
void dsacache::CacheData::SetTaskHandlersAndCache(uint8_t* cache, std::vector<dml_handler>* handlers) {
*incomplete_cache_ = cache;
handlers_->store(handlers); handlers_->store(handlers);
handlers_->notify_one(); handlers_->notify_one();
} }
void dsacache::CacheData::Init() {
cache_->store(nullptr);
delete_ = true;
}
Loading…
Cancel
Save