refactor the cacher to reduce complexity: removes the access guarantees (relaxed, immediate, ...); uses the fact that other tasks will wait on an atomic value change of the cache pointer while it is nullptr, so an entry can be added to the cache structure earlier, reducing the cost of two threads accessing a new entry at the same time; splits the offloading-cache.hpp file into two, with one containing the data class (representing a cache entry and task) and the other containing the cacher itself
master
Constantin Fürst
12 months ago
4 changed files with 432 additions and 491 deletions
139  offloading-cacher/cache-data.hpp
280  offloading-cacher/cache.hpp
30   offloading-cacher/main.cpp
474  offloading-cacher/offloading-cache.hpp
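Taken together, the two new headers are used roughly as follows. This is a minimal sketch, not code from this commit: the trivial policy functions, the node choice and the buffer are assumptions for illustration, and error handling is omitted.

#include <cstddef>
#include <cstdint>
#include <vector>

#include "offloading-cacher/cache.hpp"

// hypothetical policies: cache on the requesting node and
// execute the copy on the destination node only
int cache_policy(const int dst_node, const int src_node, const size_t size) { return dst_node; }
std::vector<int> copy_policy(const int dst_node, const int src_node) { return { dst_node }; }

int main() {
    dsacache::Cache cache;
    cache.Init(cache_policy, copy_policy);

    std::vector<uint8_t> data(1 << 20);

    // inserts the entry into the cache structure and submits the copy task
    auto entry = cache.Access(data.data(), data.size());

    // blocks until this task's handlers (or another thread's copy) have completed
    entry->WaitOnCompletion();

    // afterwards points to the cached copy on the policy-chosen node
    uint8_t* cached = entry->GetDataLocation();
    (void)cached;

    return 0;
}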
offloading-cacher/cache-data.hpp
@@ -0,0 +1,139 @@
#pragma once

#include <iostream>

#include <atomic>
#include <memory>
#include <vector>

#include <numa.h>

#include <dml/dml.hpp>

namespace dsacache {
    class Cache;

    // the cache data structure is used to submit and control
    // a cache element, while providing the source pointer and
    // size in bytes for submission
    //
    // a thread holding a copy of an entry may wait on the atomic
    // cache pointer, which is published and notified once the
    // copy handlers have completed, after which it points to the
    // cached copy of the data

    class CacheData {
    public:
        using dml_handler = dml::handler<dml::mem_copy_operation, std::allocator<uint8_t>>;

    private:
        uint8_t* src_;
        size_t size_;

        // reference count shared by all copies of this entry

        std::atomic<int32_t>* active_;

    protected:
        // pointer to the cached copy, shared by all copies of this
        // entry; remains nullptr until the caching operation completed

        std::atomic<uint8_t*>* cache_;

        // pointer to the cache memory while the copy is still in flight

        uint8_t* incomplete_cache_;

        // handlers for the in-flight copy operations, only
        // present in the instance that submitted the task

        std::unique_ptr<std::vector<dml_handler>> handlers_;

        friend Cache;

    public:
        CacheData(uint8_t* data, const size_t size);
        CacheData(const CacheData& other);
        ~CacheData();

        void Deallocate();

        void WaitOnCompletion();

        uint8_t* GetDataLocation() const;

        bool Active() const;
    };
}

inline void dsacache::CacheData::WaitOnCompletion() {
    if (handlers_ == nullptr) {
        std::cout << "[-] Waiting on cache-var-update for CacheData 0x" << std::hex << (uint64_t)src_ << std::dec << std::endl;

        // this instance does not own the handlers, so it waits
        // until the submitting thread publishes the cache pointer

        cache_->wait(nullptr);

        std::cout << "[+] Finished waiting on cache-var-update for CacheData 0x" << std::hex << (uint64_t)src_ << std::dec << std::endl;
    }
    else {
        std::cout << "[-] Waiting on handlers for CacheData 0x" << std::hex << (uint64_t)src_ << std::dec << std::endl;

        for (auto& handler : *handlers_) {
            auto result = handler.get();
            // TODO: handle the returned status code
        }

        handlers_ = nullptr;

        std::cout << "[+] Finished waiting on handlers for CacheData 0x" << std::hex << (uint64_t)src_ << std::dec << std::endl;

        // publish the completed cache location and wake all
        // threads blocked on the cache pointer above

        cache_->store(incomplete_cache_);
        cache_->notify_all();
    }
}

inline dsacache::CacheData::CacheData(uint8_t* data, const size_t size) {
    std::cout << "[-] New CacheData 0x" << std::hex << (uint64_t)data << std::dec << std::endl;

    src_ = data;
    size_ = size;
    active_ = new std::atomic<int32_t>(1);
    cache_ = new std::atomic<uint8_t*>();
    incomplete_cache_ = nullptr;
    handlers_ = std::make_unique<std::vector<dml_handler>>();
}

inline dsacache::CacheData::CacheData(const dsacache::CacheData& other) {
    std::cout << "[-] Copy Created for CacheData 0x" << std::hex << (uint64_t)other.src_ << std::dec << std::endl;

    // copies share the reference counter and the cache pointer,
    // but not the handlers, which stay with the submitting instance

    active_ = other.active_;
    active_->fetch_add(1);

    src_ = other.src_;
    size_ = other.size_;
    cache_ = other.cache_;
    incomplete_cache_ = nullptr;
    handlers_ = nullptr;
}

inline dsacache::CacheData::~CacheData() {
    std::cout << "[-] Destructor for CacheData 0x" << std::hex << (uint64_t)src_ << std::dec << std::endl;

    // due to fetch_sub returning the previously held value
    // we must subtract one locally to get the current value

    const int32_t v = active_->fetch_sub(1) - 1;

    // if the resulting value is zero or lower
    // then we must execute proper deletion
    // as this was the last reference

    if (v <= 0) {
        std::cout << "[!] Full Destructor for CacheData 0x" << std::hex << (uint64_t)src_ << std::dec << std::endl;

        Deallocate();

        delete active_;
        delete cache_;
    }
}

inline void dsacache::CacheData::Deallocate() {
    std::cout << "[!] Deallocating for CacheData 0x" << std::hex << (uint64_t)src_ << std::dec << std::endl;

    // free the cached copy itself, not the atomic holding the
    // pointer to it, and leave the entry marked as empty

    uint8_t* cache_local = cache_->exchange(nullptr);
    if (cache_local != nullptr) numa_free(cache_local, size_);

    incomplete_cache_ = nullptr;
}

inline uint8_t* dsacache::CacheData::GetDataLocation() const {
    return cache_->load();
}

inline bool dsacache::CacheData::Active() const {
    return active_->load() > 0;
}
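The hand-off that WaitOnCompletion relies on is the C++20 atomic wait/notify mechanism: a copy of an entry carries no handlers and blocks while the shared cache pointer still holds nullptr, until the thread that submitted the copy publishes the finished buffer and notifies. This is also why the commit can insert the entry into the cache structure before the copy completes; a second thread that finds the fresh entry waits on the pointer instead of starting its own copy. A standalone sketch of just this pattern, with illustrative names not taken from the commit:

#include <atomic>
#include <cstdint>
#include <thread>

int main() {
    std::atomic<uint8_t*> cache{ nullptr };
    static uint8_t buffer[64];

    // waiter: equivalent to a CacheData copy without handlers
    std::thread waiter([&cache]() {
        cache.wait(nullptr);           // blocks while the value is still nullptr
        uint8_t* ready = cache.load(); // non-null after the wait returns
        (void)ready;
    });

    // worker: equivalent to the submitting instance after its handlers finished
    cache.store(buffer);
    cache.notify_all();

    waiter.join();
    return 0;
}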
offloading-cacher/cache.hpp
@@ -0,0 +1,280 @@
#pragma once

#include <iostream>

#include <unordered_map>
#include <shared_mutex>
#include <mutex>
#include <memory>

#include <sched.h>
#include <numa.h>
#include <numaif.h>

#include <dml/dml.hpp>

#include "cache-data.hpp"

namespace dsacache {
    // holds the cache state and is the place
    // where caching work will be submitted

    class Cache {
    public:
        // the cache policy is defined as a type here to allow flexible usage of the cacher:
        // given a numa destination node (where the data will be needed), the numa source
        // node (current location of the data) and the data size, this function should
        // return the optimal cache placement;
        // the dst node and the returned value can differ if the system, for example, has
        // HBM attached to node n that is accessible directly under a different node id m

        typedef int (CachePolicy)(const int numa_dst_node, const int numa_src_node, const size_t data_size);

        // the copy policy specifies the copy-executing nodes for a given task,
        // which allows flexibility in assignment for optimizing raw throughput
        // or for choosing a conservative usage policy

        typedef std::vector<int> (CopyPolicy)(const int numa_dst_node, const int numa_src_node);

    private:
        // mutex for accessing the cache state map

        std::shared_mutex cache_mutex_;

        // map from [dst-numa-node, map2]
        // map2 from [data-ptr, cache-structure]

        std::unordered_map<uint8_t, std::unordered_map<uint8_t*, CacheData>> cache_state_;

        CachePolicy* cache_policy_function_ = nullptr;
        CopyPolicy* copy_policy_function_ = nullptr;

        dml::handler<dml::mem_copy_operation, std::allocator<uint8_t>> ExecuteCopy(
            const uint8_t* src, uint8_t* dst, const size_t size, const int node
        ) const;

        void SubmitTask(CacheData* task, const int dst_node, const int src_node);

        void GetCacheNode(uint8_t* src, const size_t size, int* OUT_DST_NODE, int* OUT_SRC_NODE) const;

        std::unique_ptr<CacheData> GetFromCache(uint8_t* src, const size_t size, const int dst_node);

    public:
        void Init(CachePolicy* cache_policy_function, CopyPolicy* copy_policy_function);

        // function to perform data access through the cache

        std::unique_ptr<CacheData> Access(uint8_t* data, const size_t size);

        void Flush(const int node = -1);
    };
}
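// to make the two policy types declared above concrete, a pair of
// sketched implementations follows; these are illustrative
// assumptions only (including the HBM node mapping) and are
// not functions shipped in this commit

#include <cstddef>
#include <vector>

int HbmCachePolicy(const int numa_dst_node, const int numa_src_node, const size_t data_size) {
    // assume each cpu node n exposes its HBM as a memory-only
    // numa node n + 4 and redirect the cached copy there
    return numa_dst_node + 4;
}

std::vector<int> SplitCopyPolicy(const int numa_dst_node, const int numa_src_node) {
    // engage the engines of both endpoints for cross-node
    // copies, otherwise only the local node
    if (numa_src_node == numa_dst_node) return { numa_dst_node };
    return { numa_src_node, numa_dst_node };
}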

inline void dsacache::Cache::Init(CachePolicy* cache_policy_function, CopyPolicy* copy_policy_function) {
    cache_policy_function_ = cache_policy_function;
    copy_policy_function_ = copy_policy_function;

    // initialize the numa library

    numa_available();

    const int nodes_max = numa_num_configured_nodes();
    const bitmask* valid_nodes = numa_get_mems_allowed();

    for (int node = 0; node < nodes_max; node++) {
        if (numa_bitmask_isbitset(valid_nodes, node)) {
            cache_state_.insert({static_cast<uint8_t>(node), {}});
        }
    }

    std::cout << "[-] Cache Initialized" << std::endl;
}

inline std::unique_ptr<dsacache::CacheData> dsacache::Cache::Access(uint8_t* data, const size_t size) {
    // get destination and source numa node for the cache

    int dst_node = -1;
    int src_node = -1;

    GetCacheNode(data, size, &dst_node, &src_node);

    // check whether the data is already cached

    std::unique_ptr<CacheData> task = GetFromCache(data, size, dst_node);

    if (task != nullptr) {
        return task;
    }

    // at this point the requested data is not present in cache
    // and we create a caching task for it

    task = std::make_unique<CacheData>(data, size);

    {
        std::unique_lock<std::shared_mutex> lock(cache_mutex_);

        const auto state = cache_state_[dst_node].emplace(task->src_, *task);

        // if state.second is false then no insertion took place,
        // which means that, concurrently with this thread, some
        // other thread has accessed the same resource, in which
        // case we return that other thread's cache structure

        if (!state.second) {
            std::cout << "[!] Found another cache instance for 0x" << std::hex << (uint64_t)task->src_ << std::dec << std::endl;

            return std::make_unique<CacheData>(state.first->second);
        }
    }

    SubmitTask(task.get(), dst_node, src_node);

    return task;
}

inline void dsacache::Cache::SubmitTask(CacheData* task, const int dst_node, const int src_node) {
    std::cout << "[+] Allocating " << task->size_ << "B on node " << dst_node << " for 0x" << std::hex << (uint64_t)task->src_ << std::dec << std::endl;

    // allocate data on the destination node and, if the operation
    // fails, flush the unused parts of the cache and retry once
    // TODO: a smarter flush strategy could keep some entries cached

    uint8_t* dst = reinterpret_cast<uint8_t*>(numa_alloc_onnode(task->size_, dst_node));

    if (dst == nullptr) {
        std::cout << "[!] First allocation try failed for " << task->size_ << "B on node " << dst_node << std::endl;

        // allocation on dst_node failed, so we flush the cache for this
        // node hoping to free enough currently unused entries to make
        // the second allocation attempt successful

        Flush(dst_node);

        dst = reinterpret_cast<uint8_t*>(numa_alloc_onnode(task->size_, dst_node));

        if (dst == nullptr) {
            std::cout << "[x] Second allocation try failed for " << task->size_ << "B on node " << dst_node << std::endl;
            return;
        }
    }

    task->incomplete_cache_ = dst;

    // query the copy policy function for the nodes to use for the copy

    const std::vector<int> executing_nodes = copy_policy_function_(dst_node, src_node);
    const size_t task_count = executing_nodes.size();

    // each task will copy one fair part of the total size
    // and in case the total size is not a multiple of the
    // given task count the last node must copy the remainder

    const size_t size = task->size_ / task_count;
    const size_t last_size = size + task->size_ % task_count;

    std::cout << "[-] Splitting Copy into " << task_count << " tasks of " << size << "B for 0x" << std::hex << (uint64_t)task->src_ << std::dec << std::endl;

    // save the current numa run mask to restore it later,
    // as executing the copy tasks will place this thread
    // on the executing nodes

    bitmask* nodemask = numa_get_run_node_mask();

    for (uint32_t i = 0; i < task_count; i++) {
        // the last task copies the remainder, all others the fair share

        const size_t local_size = i + 1 == task_count ? last_size : size;
        const size_t local_offset = i * size;
        const uint8_t* local_src = task->src_ + local_offset;
        uint8_t* local_dst = dst + local_offset;

        task->handlers_->emplace_back(ExecuteCopy(local_src, local_dst, local_size, executing_nodes[i]));
    }

    // restore the previous run mask and release the saved mask

    numa_run_on_node_mask(nodemask);
    numa_bitmask_free(nodemask);
}
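// the fair-split arithmetic above can be checked in isolation;
// the following standalone sketch is illustrative and not part
// of the commit, verifying that the chunks tile the whole buffer
// when the last task takes the remainder

#include <cassert>
#include <cstddef>

int main() {
    const size_t total = 1000;    // bytes to copy
    const size_t task_count = 3;  // nodes returned by the copy policy

    const size_t size = total / task_count;             // 333
    const size_t last_size = size + total % task_count; // 334

    size_t covered = 0;
    for (size_t i = 0; i < task_count; i++) {
        covered += (i + 1 == task_count) ? last_size : size;
    }

    assert(covered == total);  // 333 + 333 + 334 == 1000
    return 0;
}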

inline dml::handler<dml::mem_copy_operation, std::allocator<uint8_t>> dsacache::Cache::ExecuteCopy(
    const uint8_t* src, uint8_t* dst, const size_t size, const int node
) const {
    dml::const_data_view srcv = dml::make_view(src, size);
    dml::data_view dstv = dml::make_view(dst, size);

    // move this thread to the executing node so that the
    // operation is submitted to that node's accelerator

    numa_run_on_node(node);

    return dml::submit<dml::automatic>(dml::mem_copy.block_on_fault(), srcv, dstv);
}

inline void dsacache::Cache::GetCacheNode(uint8_t* src, const size_t size, int* OUT_DST_NODE, int* OUT_SRC_NODE) const {
    // obtain the numa node of the current thread to determine where the data is needed

    const int current_cpu = sched_getcpu();
    const int current_node = numa_node_of_cpu(current_cpu);

    // obtain the node that the given data pointer is allocated on

    *OUT_SRC_NODE = -1;
    get_mempolicy(OUT_SRC_NODE, NULL, 0, (void*)src, MPOL_F_NODE | MPOL_F_ADDR);

    // query the cache policy function for the destination numa node

    *OUT_DST_NODE = cache_policy_function_(current_node, *OUT_SRC_NODE, size);
}

inline void dsacache::Cache::Flush(const int node) {
    std::cout << "[-] Flushing Cache for " << (node == -1 ? "all nodes" : "node " + std::to_string(node)) << std::endl;

    // erase all entries of the given node map that
    // are not currently in use by any thread

    const auto FlushNode = [](std::unordered_map<uint8_t*, CacheData>& map) {
        auto it = map.begin();

        while (it != map.end()) {
            if (it->second.Active() == false) {
                // erase returns the iterator of the next element,
                // avoiding a rescan of the map from the beginning

                it = map.erase(it);
            }
            else {
                it++;
            }
        }
    };

    {
        std::unique_lock<std::shared_mutex> lock(cache_mutex_);

        if (node == -1) {
            for (auto& nc : cache_state_) {
                FlushNode(nc.second);
            }
        }
        else {
            FlushNode(cache_state_[node]);
        }
    }
}

inline std::unique_ptr<dsacache::CacheData> dsacache::Cache::GetFromCache(uint8_t* src, const size_t size, const int dst_node) {
    // the best situation is if this data is already cached,
    // which we check while holding the cache mutex for reading
    // to prevent another thread from marking the element we
    // may find as unused and clearing it;
    // dst_node is expected to be one of the nodes inserted during Init

    std::shared_lock<std::shared_mutex> lock(cache_mutex_);

    const auto search = cache_state_[dst_node].find(src);

    if (search != cache_state_[dst_node].end()) {
        if (search->second.size_ == size) {
            std::cout << "[+] Found Cached version for 0x" << std::hex << (uint64_t)src << std::dec << std::endl;

            // the copy constructor increments the shared reference
            // count, keeping the entry alive for the caller

            return std::make_unique<CacheData>(search->second);
        }
        else {
            std::cout << "[!] Found Cached version with size mismatch for 0x" << std::hex << (uint64_t)src << std::dec << std::endl;

            // erasing modifies the map and therefore requires
            // exclusive access, so the read lock is exchanged
            // for a write lock before removing the stale entry

            lock.unlock();

            std::unique_lock<std::shared_mutex> write_lock(cache_mutex_);
            cache_state_[dst_node].erase(src);
        }
    }

    return nullptr;
}
offloading-cacher/offloading-cache.hpp
@@ -1,474 +0,0 @@
#pragma once

#include <iostream>

#include <atomic>
#include <vector>
#include <thread>
#include <unordered_map>
#include <shared_mutex>
#include <mutex>
#include <memory>

#include <semaphore.h>

#include <sched.h>
#include <numa.h>
#include <numaif.h>

#include <dml/dml.hpp>

namespace offcache {
    // the execution policy selects in which way the data is supposed to be cached
    // and returned, with the following behaviour guaranteed in addition to the
    // returned value being valid:
    // Immediate: return as fast as possible,
    //     may return cached data, can return data in RAM,
    //     will trigger caching of the data provided
    // ImmediateNoCache: return as fast as possible and never trigger caching,
    //     otherwise the same as Immediate
    // Relaxed: no rapid return needed, take time,
    //     will trigger caching and may only return
    //     once the caching is successful, but can still
    //     provide data in RAM

    enum class ExecutionPolicy {
        Relaxed, Immediate, ImmediateNoCache
    };

    class Cache;

    // the cache task structure will be used to submit and
    // control a cache element, while providing source pointer
    // and size in bytes for submission
    //
    // the submitting thread may then wait on the atomic "result",
    // which will be notified by the cache worker upon processing,
    // after which the atomic-bool-ptr active will also become valid

    class CacheData {
    public:
        using dml_handler = dml::handler<dml::mem_copy_operation, std::allocator<uint8_t>>;

    private:
        uint8_t* src_;
        size_t size_;

        std::atomic<int32_t>* active_;

    protected:
        std::atomic<uint8_t*>* cache_;

        uint8_t* incomplete_cache_;

        std::unique_ptr<std::vector<dml_handler>> handlers_;

        friend Cache;

    public:
        CacheData(uint8_t* data, const size_t size);
        CacheData(const CacheData& other);
        ~CacheData();

        void Deallocate();
        void WaitOnCompletion();

        uint8_t* GetDataLocation() const;

        bool Active() const;
    };

    // singleton which holds the cache workers
    // and is the place where work will be submitted

    class Cache {
    public:
        // the cache policy is defined as a type here to allow flexible usage of the cacher:
        // given a numa destination node (where the data will be needed), the numa source
        // node (current location of the data) and the data size, this function should
        // return the optimal cache placement;
        // the dst node and the returned value can differ if the system, for example, has
        // HBM attached to node n that is accessible directly under a different node id m

        typedef int (CachePolicy)(const int numa_dst_node, const int numa_src_node, const size_t data_size);

        // the copy policy specifies the copy-executing nodes for a given task,
        // which allows flexibility in assignment for optimizing raw throughput
        // or for choosing a conservative usage policy

        typedef std::vector<int> (CopyPolicy)(const int numa_dst_node, const int numa_src_node);

    private:
        // mutex for accessing the cache state map

        std::shared_mutex cache_mutex_;

        // map from [dst-numa-node, map2]
        // map2 from [data-ptr, cache-structure]

        std::unordered_map<uint8_t, std::unordered_map<uint8_t*, CacheData>> cache_state_;

        CachePolicy* cache_policy_function_ = nullptr;
        CopyPolicy* copy_policy_function_ = nullptr;

        dml::handler<dml::mem_copy_operation, std::allocator<uint8_t>> ExecuteCopy(const uint8_t* src, uint8_t* dst, const size_t size, const int node) const;

        void SubmitTask(CacheData* task);

        void GetCacheNode(uint8_t* src, const size_t size, int* OUT_DST_NODE, int* OUT_SRC_NODE) const;

        void AbortTask(CacheData* task) const;

        std::unique_ptr<CacheData> GetFromCache(uint8_t* src, const size_t size);

    public:
        void Init(CachePolicy* cache_policy_function, CopyPolicy* copy_policy_function);

        // function to perform data access through the cache;
        // behaviour depends on the chosen execution policy:
        // Immediate and ImmediateNoCache return a cache task
        // with a guaranteed-valid result value, where the
        // Relaxed policy does not come with this guarantee

        std::unique_ptr<CacheData> Access(uint8_t* data, const size_t size, const ExecutionPolicy policy);

        void Flush();
    };
}

inline void offcache::Cache::Init(CachePolicy* cache_policy_function, CopyPolicy* copy_policy_function) {
    cache_policy_function_ = cache_policy_function;
    copy_policy_function_ = copy_policy_function;

    // initialize the numa library

    numa_available();

    const int nodes_max = numa_num_configured_nodes();
    const bitmask* valid_nodes = numa_get_mems_allowed();

    for (int node = 0; node < nodes_max; node++) {
        if (numa_bitmask_isbitset(valid_nodes, node)) {
            cache_state_.insert({node, {}});
        }
    }

    std::cout << "[-] Cache Initialized" << std::endl;
}

inline std::unique_ptr<offcache::CacheData> offcache::Cache::Access(uint8_t* data, const size_t size, const ExecutionPolicy policy) {
    std::unique_ptr<CacheData> task = GetFromCache(data, size);

    if (task != nullptr) {
        return std::move(task);
    }

    // at this point the requested data is not present in cache
    // and we create a caching task for it

    task = std::make_unique<CacheData>(data, size);

    if (policy == ExecutionPolicy::Immediate) {
        // in immediate mode the returned task object
        // is guaranteed to be valid and therefore its
        // resulting location must be validated, after
        // which we submit the task;
        // maybe_result is then set by submit

        task->cache_->store(data);
        SubmitTask(task.get());
        return std::move(task);
    }
    else if (policy == ExecutionPolicy::ImmediateNoCache) {
        // for immediatenocache we just validate
        // the generated task and return it;
        // we must also set maybe_result in case
        // someone waits on this

        task->cache_->store(data);
        task->incomplete_cache_ = data;
        return std::move(task);
    }
    else if (policy == ExecutionPolicy::Relaxed) {
        // for relaxed no valid task must be returned
        // and therefore we just submit and then give
        // the possibly invalid task back with only
        // maybe_result set by submission

        SubmitTask(task.get());
        return std::move(task);
    }
    else {
        // this should not be reached

        return nullptr;
    }
}

inline void offcache::Cache::SubmitTask(CacheData* task) {
    // get destination numa node for the cache

    int dst_node = -1;
    int src_node = -1;

    GetCacheNode(task->src_, task->size_, &dst_node, &src_node);

    std::cout << "[+] Allocating " << task->size_ << "B on node " << dst_node << " for 0x" << std::hex << (uint64_t)task->src_ << std::dec << std::endl;

    // allocate data on this node and flush the unused parts of the
    // cache if the operation fails and retry once
    // TODO: a smarter flush strategy could keep some entries cached

    uint8_t* dst = reinterpret_cast<uint8_t*>(numa_alloc_onnode(task->size_, dst_node));

    if (dst == nullptr) {
        std::cout << "[!] First allocation try failed for " << task->size_ << "B on node " << dst_node << std::endl;

        Flush();

        dst = reinterpret_cast<uint8_t*>(numa_alloc_onnode(task->size_, dst_node));

        if (dst == nullptr) {
            std::cout << "[x] Second allocation try failed for " << task->size_ << "B on node " << dst_node << std::endl;
            return;
        }
    }

    task->incomplete_cache_ = dst;

    // query the copy policy function for the nodes to use for the copy

    const std::vector<int> executing_nodes = copy_policy_function_(dst_node, src_node);
    const size_t task_count = executing_nodes.size();

    // each task will copy one fair part of the total size
    // and in case the total size is not a multiple of the
    // given task count the last node must copy the remainder

    const size_t size = task->size_ / task_count;
    const size_t last_size = size + task->size_ % task_count;

    std::cout << "[-] Splitting Copy into " << task_count << " tasks of " << size << "B for 0x" << std::hex << (uint64_t)task->src_ << std::dec << std::endl;

    // save the current numa node mask to restore later,
    // as executing the copy task will place this thread
    // on a different node

    bitmask* nodemask = numa_get_run_node_mask();

    for (uint32_t i = 0; i < task_count; i++) {
        const size_t local_size = i + 1 == task_count ? size : last_size;
        const size_t local_offset = i * size;
        const uint8_t* local_src = task->src_ + local_offset;
        uint8_t* local_dst = dst + local_offset;

        task->handlers_->emplace_back(ExecuteCopy(local_src, local_dst, local_size, executing_nodes[i]));
    }

    // only at this point may the task be added to the control structure,
    // because adding it earlier could cause it to be returned for an
    // access request while the handler-vector is not fully populated,
    // which could cause the wait-function to return prematurely
    // TODO: this can be optimized because the abort is quite expensive

    {
        std::unique_lock<std::shared_mutex> lock(cache_mutex_);

        const auto state = cache_state_[dst_node].emplace(task->src_, *task);

        // if state.second is false then no insertion took place,
        // which means that, concurrently with this thread, some
        // other thread must have accessed the same resource,
        // in which case we must perform an abort
        // TODO: abort is not the only way to handle this situation

        if (!state.second) {
            std::cout << "[x] Found another cache instance for 0x" << std::hex << (uint64_t)task->src_ << std::dec << std::endl;

            AbortTask(task);

            return;
        }
    }

    // restore the previous nodemask

    numa_run_on_node_mask(nodemask);
}

inline dml::handler<dml::mem_copy_operation, std::allocator<uint8_t>> offcache::Cache::ExecuteCopy(const uint8_t* src, uint8_t* dst, const size_t size, const int node) const {
    dml::const_data_view srcv = dml::make_view(src, size);
    dml::data_view dstv = dml::make_view(dst, size);

    numa_run_on_node(node);

    return dml::submit<dml::automatic>(dml::mem_copy.block_on_fault(), srcv, dstv);
}

inline void offcache::CacheData::WaitOnCompletion() {
    if (handlers_ == nullptr) {
        std::cout << "[-] Waiting on cache-var-update for CacheData 0x" << std::hex << (uint64_t)src_ << std::dec << std::endl;

        cache_->wait(nullptr);

        std::cout << "[+] Finished waiting on cache-var-update for CacheData 0x" << std::hex << (uint64_t)src_ << std::dec << std::endl;
    }
    else {
        std::cout << "[-] Waiting on handlers for CacheData 0x" << std::hex << (uint64_t)src_ << std::dec << std::endl;

        for (auto& handler : *handlers_) {
            auto result = handler.get();
            // TODO: handle the returned status code
        }

        handlers_ = nullptr;

        std::cout << "[+] Finished waiting on handlers for CacheData 0x" << std::hex << (uint64_t)src_ << std::dec << std::endl;

        cache_->store(incomplete_cache_);
        cache_->notify_all();
    }
}

offcache::CacheData::CacheData(uint8_t* data, const size_t size) {
    std::cout << "[-] New CacheData 0x" << std::hex << (uint64_t)data << std::dec << std::endl;

    src_ = data;
    size_ = size;
    active_ = new std::atomic<int32_t>(1);
    cache_ = new std::atomic<uint8_t*>();
    incomplete_cache_ = nullptr;
    handlers_ = std::make_unique<std::vector<dml_handler>>();
}

offcache::CacheData::CacheData(const offcache::CacheData& other) {
    std::cout << "[-] Copy Created for CacheData 0x" << std::hex << (uint64_t)other.src_ << std::dec << std::endl;

    active_ = other.active_;
    const int current_active = active_->fetch_add(1);

    src_ = other.src_;
    size_ = other.size_;
    cache_ = other.cache_;
    incomplete_cache_ = nullptr;
    handlers_ = nullptr;
}

offcache::CacheData::~CacheData() {
    std::cout << "[-] Destructor for CacheData 0x" << std::hex << (uint64_t)src_ << std::dec << std::endl;

    // due to fetch_sub returning the previously held value
    // we must subtract one locally to get the current value

    const int32_t v = active_->fetch_sub(1) - 1;

    // if the returned value is zero or lower
    // then we must execute proper deletion
    // as this was the last reference

    if (v <= 0) {
        std::cout << "[!] Full Destructor for CacheData 0x" << std::hex << (uint64_t)src_ << std::dec << std::endl;

        Deallocate();

        delete active_;
        delete cache_;
    }
}

void offcache::CacheData::Deallocate() {
    std::cout << "[!] Deallocating for CacheData 0x" << std::hex << (uint64_t)src_ << std::dec << std::endl;

    numa_free(cache_, size_);
    cache_ = nullptr;
    incomplete_cache_ = nullptr;
}

void offcache::Cache::GetCacheNode(uint8_t* src, const size_t size, int* OUT_DST_NODE, int* OUT_SRC_NODE) const {
    // obtain the numa node of the current thread to determine where the data is needed

    const int current_cpu = sched_getcpu();
    const int current_node = numa_node_of_cpu(current_cpu);

    // obtain the node that the given data pointer is allocated on

    *OUT_SRC_NODE = -1;
    get_mempolicy(OUT_SRC_NODE, NULL, 0, (void*)src, MPOL_F_NODE | MPOL_F_ADDR);

    // query the cache policy function for the destination numa node

    *OUT_DST_NODE = cache_policy_function_(current_node, *OUT_SRC_NODE, size);
}

uint8_t* offcache::CacheData::GetDataLocation() const {
    return cache_->load();
}

bool offcache::CacheData::Active() const {
    return active_->load() > 0;
}

inline void offcache::Cache::Flush() {
    std::cout << "[-] Flushing Cache" << std::endl;

    // TODO: there is a better way to implement this flush

    {
        std::unique_lock<std::shared_mutex> lock(cache_mutex_);

        for (auto& nc : cache_state_) {
            auto it = nc.second.begin();

            while (it != nc.second.end()) {
                if (it->second.Active() == false) {
                    nc.second.erase(it);
                    it = nc.second.begin();
                }
                else {
                    it++;
                }
            }
        }
    }
}

void offcache::Cache::AbortTask(offcache::CacheData* task) const {
    // first wait on all copy operations to be completed

    task->WaitOnCompletion();

    // abort by doing the following steps:
    // (1) free the allocated memory, (2) remove the "maybe result" as
    // we will not run the caching operation, (3) clear the sub-tasks
    // for the very same reason, (4) set the result to the RAM location

    numa_free(task->incomplete_cache_, task->size_);
    task->incomplete_cache_ = nullptr;
    task->cache_->store(task->src_);

    std::cout << "[-] Abort completed for 0x" << std::hex << (uint64_t)task->src_ << std::dec << std::endl;
}

std::unique_ptr<offcache::CacheData> offcache::Cache::GetFromCache(uint8_t* src, const size_t size) {
    // the best situation is if this data is already cached,
    // which we check in an unnamed block in which the cache
    // is locked for reading to prevent another thread
    // from marking the element we may find as unused and
    // clearing it

    int dst_node = -1;
    int src_node = -1;

    GetCacheNode(src, size, &dst_node, &src_node);

    std::shared_lock<std::shared_mutex> lock(cache_mutex_);

    const auto search = cache_state_[dst_node].find(src);

    if (search != cache_state_[dst_node].end()) {
        if (search->second.size_ == size) {
            search->second.active_->store(true);

            std::cout << "[+] Found Cached version for 0x" << std::hex << (uint64_t)src << std::dec << std::endl;

            return std::move(std::make_unique<CacheData>(search->second));
        }
        else {
            std::cout << "[!] Found Cached version with size mismatch for 0x" << std::hex << (uint64_t)src << std::dec << std::endl;

            cache_state_[dst_node].erase(search);
        }
    }

    return nullptr;
}