@@ -94,9 +94,14 @@ namespace offcache {
 
         typedef std::vector<int> (CopyPolicy)(const int numa_dst_node, const int numa_src_node);
 
     private:
         // mutex for accessing the cache state map
         std::shared_mutex cache_mutex_;
 
-        std::unordered_map<uint8_t*, CacheData> cache_state_;
+        // map from [dst-numa-node,map2]
+        // map2 from [data-ptr,cache-structure]
+        std::unordered_map<uint8_t, std::unordered_map<uint8_t*, CacheData>> cache_state_;
 
         CachePolicy* cache_policy_function_ = nullptr;
         CopyPolicy* copy_policy_function_ = nullptr;
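
The CopyPolicy typedef above selects the node(s) whose engines execute a copy; the companion CachePolicy typedef is not visible in this hunk, so the parameter list below is inferred from the call site in SubmitTask. A minimal sketch of the two callbacks an application might register via Init; names and bodies are hypothetical, not part of the patch:

    // Hypothetical CachePolicy: picks the node the cached copy should live on.
    // Signature inferred from cache_policy_function_(current_node, src_node, size).
    int CacheOnAccessingNode(const int numa_dst_node, const int numa_src_node, const size_t data_size) {
        (void)numa_src_node; (void)data_size;
        return numa_dst_node; // place the copy on the node of the accessing thread
    }

    // Matches the CopyPolicy typedef: selects the engine node(s) for the copy.
    std::vector<int> CopyOnSourceNode(const int numa_dst_node, const int numa_src_node) {
        (void)numa_dst_node;
        return { numa_src_node }; // "push" configuration: one engine, co-located with the data
    }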
@@ -105,6 +110,12 @@
 
         void SubmitTask(CacheData* task);
 
+        void GetCacheNode(uint8_t* src, const size_t size, int* OUT_DST_NODE, int* OUT_SRC_NODE) const;
+
+        void AbortTask(CacheData* task) const;
+
+        std::unique_ptr<CacheData> GetFromCache(uint8_t* src, const size_t size);
+
     public:
 
         void Init(CachePolicy* cache_policy_function, CopyPolicy* copy_policy_function);
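
These three private helpers pull node selection, abort handling and cache lookup out of Access and SubmitTask; the hunks below replace the previously inline blocks with calls to GetCacheNode, AbortTask and GetFromCache so both code paths make consistent decisions.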
@@ -126,40 +137,29 @@ inline void offcache::Cache::Init(CachePolicy* cache_policy_function, CopyPolicy
 
     // initialize numa library
     numa_available();
 
+    const int nodes_max = numa_num_configured_nodes();
+    const bitmask* valid_nodes = numa_get_mems_allowed();
+
+    for (int node = 0; node < nodes_max; node++) {
+        if (numa_bitmask_isbitset(valid_nodes, node)) {
+            cache_state_.insert({node,{}});
+        }
+    }
+
     std::cout << "[-] Cache Initialized" << std::endl;
 }
 
 inline std::unique_ptr<offcache::CacheData> offcache::Cache::Access(uint8_t* data, const size_t size, const ExecutionPolicy policy) {
-    // the best situation is if this data is already cached
-    // which we check in an unnamed block in which the cache
-    // is locked for reading to prevent another thread
-    // from marking the element we may find as unused and
-    // clearing it
-    {
-        std::shared_lock<std::shared_mutex> lock(cache_mutex_);
-
-        const auto search = cache_state_.find(data);
-
-        if (search != cache_state_.end()) {
-            if (search->second.size_ == size) {
-                search->second.active_->store(true);
-
-                std::cout << "[+] Found Cached version for 0x" << std::hex << (uint64_t)data << std::dec << std::endl;
-
-                return std::move(std::make_unique<CacheData>(search->second));
-            }
-            else {
-                std::cout << "[!] Found Cached version with size missmatch for 0x" << std::hex << (uint64_t)data << std::dec << std::endl;
-
-                cache_state_.erase(search);
-            }
-        }
-    }
+    std::unique_ptr<CacheData> task = GetFromCache(data, size);
+
+    if (task != nullptr) {
+        return std::move(task);
+    }
 
     // at this point the requested data is not present in cache
     // and we create a caching task for it
-    auto task = std::make_unique<CacheData>(data, size);
+    task = std::make_unique<CacheData>(data, size);
 
     if (policy == ExecutionPolicy::Immediate) {
         // in immediate mode the returned task
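
Taken together, the new Init pre-creates one empty inner map per node in numa_get_mems_allowed(), and Access now consults GetFromCache before constructing a fresh CacheData. A minimal usage sketch, assuming the hypothetical policy functions from above and that Immediate mode presumably leaves waiting to the caller; illustration only:

    // Illustration only; Cache, Access, CacheData and ExecutionPolicy::Immediate
    // are from this header, the two policies are the sketches above.
    void Example(uint8_t* data, const size_t size) {
        offcache::Cache cache;
        cache.Init(CacheOnAccessingNode, CopyOnSourceNode);

        std::unique_ptr<offcache::CacheData> task = cache.Access(data, size, offcache::ExecutionPolicy::Immediate);

        task->WaitOnCompletion();                    // wait for all submitted sub-copies
        uint8_t* location = task->GetDataLocation(); // cached copy, or data itself after an abort
    }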
@@ -197,19 +197,12 @@ inline std::unique_ptr<offcache::CacheData> offcache::Cache::Access(uint8_t* dat
 
 }
 
 inline void offcache::Cache::SubmitTask(CacheData* task) {
-    // obtain numa node of current thread to determine where the data is needed
-    const int current_cpu = sched_getcpu();
-    const int current_node = numa_node_of_cpu(current_cpu);
-
-    // obtain node that the given data pointer is allocated on
-    int data_node = -1;
-    get_mempolicy(&data_node, NULL, 0, (void*)task->src_, MPOL_F_NODE | MPOL_F_ADDR);
-
-    // querry cache policy function for the destination numa node
-    const int dst_node = cache_policy_function_(current_node, data_node, task->size_);
+    // get destination numa node for the cache
+    int dst_node = -1;
+    int src_node = -1;
+
+    GetCacheNode(task->src_, task->size_, &dst_node, &src_node);
 
     std::cout << "[+] Allocating " << task->size_ << "B on node " << dst_node << " for " << std::hex << (uint64_t)task->src_ << std::dec << std::endl;
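
With this refactor SubmitTask no longer calls sched_getcpu and get_mempolicy itself; both the destination and the source node come out of GetCacheNode (defined further down), so SubmitTask and GetFromCache cannot drift apart in how they pick nodes.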
@@ -236,7 +229,7 @@ inline void offcache::Cache::SubmitTask(CacheData* task) {
 
     // query copy policy function for the nodes to use for the copy
-    const std::vector<int> executing_nodes = copy_policy_function_(dst_node, data_node);
+    const std::vector<int> executing_nodes = copy_policy_function_(dst_node, src_node);
     const size_t task_count = executing_nodes.size();
 
     // each task will copy one fair part of the total size
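
The hunk ends before the split itself; a sketch of what "one fair part" plausibly looks like, with the division remainder assigned to the final chunk. Everything beyond task_count is hypothetical:

    // Hypothetical fair split of task->size_ over task_count engines.
    const size_t size_per_task = task->size_ / task_count;
    const size_t last_size = size_per_task + task->size_ % task_count;

    for (size_t i = 0; i < task_count; i++) {
        const size_t chunk_size = (i == task_count - 1) ? last_size : size_per_task;
        const size_t offset = i * size_per_task;
        // submit a dml copy of [src_ + offset, src_ + offset + chunk_size)
        // to the engine on executing_nodes[i]
    }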
@@ -272,7 +265,7 @@ inline void offcache::Cache::SubmitTask(CacheData* task) {
 
     {
         std::unique_lock<std::shared_mutex> lock(cache_mutex_);
 
-        const auto state = cache_state_.insert({task->src_, *task});
+        const auto state = cache_state_[dst_node].emplace(task->src_, *task);
 
         // if state.second is false then no insertion took place
         // which means that concurrently with this thread
@@ -283,20 +276,7 @@ inline void offcache::Cache::SubmitTask(CacheData* task) {
 
         if (!state.second) {
             std::cout << "[x] Found another cache instance for 0x" << std::hex << (uint64_t)task->src_ << std::dec << std::endl;
 
-            // first wait on all copy operations to be completed
-            task->WaitOnCompletion();
-
-            // abort by doing the following steps
-            // (1) free the allocated memory, (2) remove the "maybe result" as
-            // we will not run the caching operation, (3) clear the sub tasks
-            // for the very same reason, (4) set the result to the RAM-location
-            numa_free(dst, task->size_);
-            task->incomplete_cache_ = nullptr;
-            task->cache_->store(task->src_);
-
-            std::cout << "[-] Abort completed for 0x" << std::hex << (uint64_t)task->src_ << std::dec << std::endl;
+            AbortTask(task);
 
             return;
         }
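
Note the ordering preserved inside AbortTask (defined below): the losing thread first waits on its already-submitted copy operations, since numa_free on memory the copy engines may still be writing to would be unsafe; only afterwards is cache_ pointed back at the unmodified RAM location.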
@@ -346,7 +326,7 @@ offcache::CacheData::CacheData(uint8_t* data, const size_t size) {
 
     src_ = data;
     size_ = size;
-    active_ = new std::atomic<int32_t>();
+    active_ = new std::atomic<int32_t>(1);
     cache_ = new std::atomic<uint8_t*>();
     incomplete_cache_ = nullptr;
     handlers_ = std::make_unique<std::vector<dml_handler>>();
@@ -355,21 +335,25 @@ offcache::CacheData::CacheData(uint8_t* data, const size_t size) {
 
 offcache::CacheData::CacheData(const offcache::CacheData& other) {
     std::cout << "[-] Copy Created for CacheData 0x" << std::hex << (uint64_t)other.src_ << std::dec << std::endl;
 
+    active_ = other.active_;
+    const int current_active = active_->fetch_add(1);
+
     src_ = other.src_;
     size_ = other.size_;
     cache_ = other.cache_;
-    active_ = other.active_;
     incomplete_cache_ = nullptr;
     handlers_ = nullptr;
-
-    active_->fetch_add(1);
 }
 
 offcache::CacheData::~CacheData() {
     std::cout << "[-] Destructor for CacheData 0x" << std::hex << (uint64_t)src_ << std::dec << std::endl;
 
-    const int32_t v = active_->fetch_sub(1);
+    // due to fetch_sub returning the previously held value
+    // we must subtract one locally to get the current value
+    const int32_t v = active_->fetch_sub(1) - 1;
 
-    // if the returned value is non-positive
+    // if the resulting value is zero or lower
     // then we must execute proper deletion
     // as this was the last reference
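
The counter semantics after this change: the constructor starts active_ at 1 so the creating thread itself holds a reference, every copy increments, every destructor decrements, and the thread that brings the count to zero performs the cleanup, presumably via Deallocate. A hypothetical walk-through, not part of the patch:

    offcache::CacheData a(ptr, 1024);  // active_ == 1, creator holds a reference
    {
        offcache::CacheData b(a);      // copy ctor: fetch_add -> active_ == 2
    }                                  // ~b: fetch_sub returns 2, v == 1, no deletion
    // ~a: fetch_sub returns 1, v == 0 -> last reference, cleanup runs

Moving the fetch_add to the top of the copy constructor also takes the reference before any other member is read, which narrows the window in which a concurrent destructor could tear down the source object mid-copy; the as-yet unused current_active value is presumably consumed by lines outside this hunk.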
@@ -390,6 +374,22 @@ void offcache::CacheData::Deallocate() {
 
     incomplete_cache_ = nullptr;
 }
 
+void offcache::Cache::GetCacheNode(uint8_t* src, const size_t size, int* OUT_DST_NODE, int* OUT_SRC_NODE) const {
+    // obtain numa node of current thread to determine where the data is needed
+    const int current_cpu = sched_getcpu();
+    const int current_node = numa_node_of_cpu(current_cpu);
+
+    // obtain node that the given data pointer is allocated on
+    *OUT_SRC_NODE = -1;
+    get_mempolicy(OUT_SRC_NODE, NULL, 0, (void*)src, MPOL_F_NODE | MPOL_F_ADDR);
+
+    // query cache policy function for the destination numa node
+    *OUT_DST_NODE = cache_policy_function_(current_node, *OUT_SRC_NODE, size);
+}
+
 uint8_t* offcache::CacheData::GetDataLocation() const {
     return cache_->load();
 }
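
get_mempolicy with MPOL_F_NODE | MPOL_F_ADDR is the standard way to ask the kernel which node backs a given address, which is what makes the source-node detection above work for pages that are already allocated. A self-contained sketch of just that query; illustration, not part of the patch:

    // Query the NUMA node backing an address; requires <numaif.h> and -lnuma.
    #include <numaif.h>
    #include <cstdio>

    int NodeOfAddress(void* ptr) {
        int node = -1;
        // MPOL_F_NODE | MPOL_F_ADDR: store the node id of the page at ptr in node
        if (get_mempolicy(&node, NULL, 0, ptr, MPOL_F_NODE | MPOL_F_ADDR) != 0) {
            perror("get_mempolicy");
            return -1;
        }
        return node;
    }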
@@ -406,12 +406,13 @@ inline void offcache::Cache::Flush() {
 
     {
         std::unique_lock<std::shared_mutex> lock(cache_mutex_);
 
-        auto it = cache_state_.begin();
+        for (auto& nc : cache_state_) {
+            auto it = nc.second.begin();
 
-        while (it != cache_state_.end()) {
+            while (it != nc.second.end()) {
             if (it->second.Active() == false) {
-                cache_state_.erase(it);
-                it = cache_state_.begin();
+                    nc.second.erase(it);
+                    it = nc.second.begin();
             }
             else {
                 it++;
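
Restarting the scan from begin() after every erase keeps the iterator valid but makes Flush quadratic in the number of stale entries. Since C++11, unordered_map::erase returns the iterator to the next element, so an equivalent single pass is possible; a suggestion, not part of the patch:

    for (auto& nc : cache_state_) {
        for (auto it = nc.second.begin(); it != nc.second.end(); ) {
            if (it->second.Active() == false) {
                it = nc.second.erase(it); // erase returns the next valid iterator
            }
            else {
                ++it;
            }
        }
    }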
@@ -419,3 +420,55 @@ inline void offcache::Cache::Flush() {
         }
+        }
     }
 }
 
+void offcache::Cache::AbortTask(offcache::CacheData *task) const {
+    // first wait on all copy operations to be completed
+    task->WaitOnCompletion();
+
+    // abort by doing the following steps
+    // (1) free the allocated memory, (2) remove the "maybe result" as
+    // we will not run the caching operation, (3) clear the sub tasks
+    // for the very same reason, (4) set the result to the RAM-location
+    numa_free(task->incomplete_cache_, task->size_);
+    task->incomplete_cache_ = nullptr;
+    task->cache_->store(task->src_);
+
+    std::cout << "[-] Abort completed for 0x" << std::hex << (uint64_t)task->src_ << std::dec << std::endl;
+}
+
+std::unique_ptr<offcache::CacheData> offcache::Cache::GetFromCache(uint8_t* src, const size_t size) {
+    // the best case is that the data is already cached,
+    // which we check while holding the cache mutex for reading
+    // to prevent another thread from marking the element we
+    // may find as unused and clearing it
+    int dst_node = -1;
+    int src_node = -1;
+
+    GetCacheNode(src, size, &dst_node, &src_node);
+
+    std::shared_lock<std::shared_mutex> lock(cache_mutex_);
+
+    const auto search = cache_state_[dst_node].find(src);
+
+    if (search != cache_state_[dst_node].end()) {
+        if (search->second.size_ == size) {
+            search->second.active_->store(true);
+
+            std::cout << "[+] Found Cached version for 0x" << std::hex << (uint64_t)src << std::dec << std::endl;
+
+            return std::move(std::make_unique<CacheData>(search->second));
+        }
+        else {
+            std::cout << "[!] Found Cached version with size mismatch for 0x" << std::hex << (uint64_t)src << std::dec << std::endl;
+
+            cache_state_[dst_node].erase(search);
+        }
+    }
+
+    return nullptr;
+}
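
Two things in GetFromCache deserve a second look. First, active_->store(true) writes the integer 1 into the shared reference counter, which would clobber the count if another thread still holds references; relying on the copy constructor's fetch_add alone may be safer. Second, cache_state_[dst_node] (operator[] inserts a missing node bucket) and the erase in the mismatch branch both mutate the map while only a std::shared_lock is held. A sketch of a non-mutating lookup under the shared lock; hypothetical rework, not part of the patch:

    std::shared_lock<std::shared_mutex> lock(cache_mutex_);

    const auto node_it = cache_state_.find(dst_node);   // avoids operator[] insertion
    if (node_it == cache_state_.end()) return nullptr;

    const auto search = node_it->second.find(src);
    if (search != node_it->second.end() && search->second.size_ == size) {
        return std::make_unique<CacheData>(search->second); // copy ctor takes the reference
    }

    return nullptr; // defer erasing stale entries to a path holding a unique_lock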