2 Commits

  1. 19
      offloading-cacher/cache.hpp
  2. 41
      offloading-cacher/main.cpp

19
offloading-cacher/cache.hpp

@ -149,11 +149,13 @@ namespace dsacache {
// returns the stored size_ value for this entry
size_t GetSize() const { return size_; }
// returns the stored source pointer src_
uint8_t* GetSource() const { return src_; }
// returns the value currently loaded from the active_ counter
// NOTE(review): active_ is presumably a shared atomic reference count - confirm
int32_t GetRefCount() const { return active_->load(); }
// points the cache location at the source location and clears delete_
// NOTE(review): clearing delete_ presumably prevents the source memory
// from being freed as if it were a cache allocation - confirm in destructor
void SetCacheToSource() { cache_->store(src_); delete_ = false; }
void SetInvalidHandlersAndCacheToSource();
void SetTaskHandlersAndCache(uint8_t* cache, std::vector<dml_handler>* handlers);
// initializes the class, after which it is thread safe,
// but it may only be destroyed safely after setting handlers;
// afterwards either SetTaskHandlersAndCache or
// SetCacheToSource must be called to prevent deadlocks
void Init(std::vector<dml_handler>* invalid_handlers);
friend Cache;
@ -291,6 +293,9 @@ namespace dsacache {
// source node for further usage
// output may depend on the calling thread's node assignment
// as this is set as the "optimal placement" node
// TODO: it would be better to not handle any decisions regarding nodes in the cache
// TODO: and leave this entirely to the user, however, this idea came to me 3 days before
// TODO: submission date and there are more important things to do
void GetCacheNode(uint8_t* src, const size_t size, int* OUT_DST_NODE, int* OUT_SRC_NODE) const;
// checks whether the cache contains an entry for
@ -409,7 +414,7 @@ inline std::unique_ptr<dsacache::CacheData> dsacache::Cache::Access(uint8_t* dat
// data source location
if (CheckFlag(flags, FLAG_ACCESS_WEAK)) {
task->SetCacheToSource();
task->SetInvalidHandlersAndCacheToSource();
return std::move(task);
}
@ -454,6 +459,9 @@ inline void dsacache::Cache::SubmitTask(CacheData* task, const int dst_node, con
uint8_t* dst = memory_allocate_function_(dst_node, task->GetSize());
if (dst == nullptr) {
// allocation failure encountered, therefore submission is aborted
// which necessitates making the CacheData instance safe for usage
task->SetInvalidHandlersAndCacheToSource();
return;
}
@ -794,6 +802,13 @@ void dsacache::CacheData::SetTaskHandlersAndCache(uint8_t* cache, std::vector<dm
handlers_->notify_one();
}
// resolves this instance without an actual copy having been performed:
// the cache location is set to the source location, the handlers are
// set to invalid_handlers_ and all waiters on handlers_ are notified
// called where no copy task gets submitted, i.e. for weak access
// and when allocating the destination memory failed
void dsacache::CacheData::SetInvalidHandlersAndCacheToSource() {
// cache now aliases the source memory
cache_->store(src_);
// NOTE(review): presumably stops the destructor from freeing the
// source memory as if it were a cache allocation - confirm
delete_ = false;
// publish the handlers only after cache_ and delete_ are set
// NOTE(review): invalid_handlers_ is presumably the vector passed to Init - confirm
handlers_->store(invalid_handlers_);
// wake every thread blocked waiting for handlers_ to be set
handlers_->notify_all();
}
void dsacache::CacheData::Init(std::vector<dml_handler>* invalid_handlers) {
cache_->store(nullptr);
delete_ = true;

41
offloading-cacher/main.cpp

@ -21,46 +21,15 @@ void InitCache(const std::string& device) {
return std::vector<int>{ numa_dst_node };
};
CACHE.Init(cache_policy,copy_policy);
}
else if (device == "xeonmax") {
auto cache_policy = [](const int numa_dst_node, const int numa_src_node, const size_t data_size) {
// xeon max is configured to have hbm on node ids that are +8
return numa_dst_node < 8 ? numa_dst_node + 8 : numa_dst_node;
auto mem_alloc = [](const int numa_node, const size_t data_size) {
return reinterpret_cast<uint8_t*>(malloc(data_size));
};
auto copy_policy = [](const int numa_dst_node, const int numa_src_node, const size_t data_size) {
if (data_size < SIZE_64_MIB) {
// if the data size is small then the copy will just be carried
// out by the destination node which does not require setting numa
// thread affinity as the selected dsa engine is already the one
// present on the calling thread
return std::vector<int>{ (numa_dst_node >= 8 ? numa_dst_node - 8 : numa_dst_node) };
}
else {
// for sufficiently large data, smart copy is used which will utilize
// all four engines for intra-socket copy operations and cross copy on
// the source and destination nodes for inter-socket copy
const bool same_socket = ((numa_dst_node ^ numa_src_node) & 4) == 0;
if (same_socket) {
const bool socket_number = numa_dst_node >> 2;
if (socket_number == 0) return std::vector<int>{ 0, 1, 2, 3 };
else return std::vector<int>{ 4, 5, 6, 7 };
}
else {
return std::vector<int>{
(numa_src_node >= 8 ? numa_src_node - 8 : numa_src_node),
(numa_dst_node >= 8 ? numa_dst_node - 8 : numa_dst_node)
};
}
}
auto mem_free = [](uint8_t* ptr, const size_t data_size) {
free(ptr);
};
CACHE.Init(cache_policy,copy_policy);
CACHE.Init(cache_policy,copy_policy,mem_alloc,mem_free);
}
else {
std::cerr << "Given device '" << device << "' not supported!" << std::endl;

Loading…
Cancel
Save