
handle memory allocation outside of the cache; pre-allocate in the benchmark and memset to hopefully guarantee that no page faults will be encountered

master
Constantin Fürst, 11 months ago
commit e99bf619c2
  1. offloading-cacher/cache.hpp (118 changed lines)
  2. qdp_project/src/Benchmark.cpp (68 changed lines)
  3. qdp_project/src/Configuration.hpp (4 changed lines)
  4. qdp_project/src/utils/BenchmarkHelpers.cpp (36 changed lines)

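In short, the commit moves buffer allocation out of the cache and behind two user-supplied callbacks. Their signatures, copied here from the cache.hpp hunk below for orientation:

typedef uint8_t* (MemoryAllocator_Allocate)(const int numa_node, const size_t size);
typedef void (MemoryAllocator_Free)(uint8_t* pointer, const size_t size);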
offloading-cacher/cache.hpp (118 changed lines)

@@ -57,6 +57,28 @@ namespace dsacache {
class Cache;
// cache policy is defined as a type here to allow flexible usage of the cacher
// given a numa destination node (where the data will be needed), the numa source
// node (current location of the data) and the data size, this function should
// return optimal cache placement
// dst node and returned value can differ if the system, for example, has HBM
// attached accessible directly to node n under a different node id m
typedef int (CachePolicy)(const int numa_dst_node, const int numa_src_node, const size_t data_size);
// copy policy specifies the copy-executing nodes for a given task
// which allows flexibility in assignment for optimizing raw throughput
// or choosing a conservative usage policy
typedef std::vector<int> (CopyPolicy)(const int numa_dst_node, const int numa_src_node, const size_t data_size);
// memory allocation is a complex topic but can have big performance
// impact, we therefore do not handle it in the cache. must return
// pointer to a block of at least the given size, cache will also
// not handle deallocation but signal that the block is free
typedef uint8_t* (MemoryAllocator_Allocate)(const int numa_node, const size_t size);
// signals that the given memory block will not be used by cache anymore
typedef void (MemoryAllocator_Free)(uint8_t* pointer, const size_t size);
/*
* Class Description:
* Holds all required information on one cache entry and is used
@@ -93,6 +115,8 @@ namespace dsacache {
// set to false if we do not own the cache pointer
bool delete_ = false;
MemoryAllocator_Free* memory_free_function_;
// data source and size of the block
uint8_t* src_;
size_t size_;
@@ -135,7 +159,7 @@ namespace dsacache {
friend Cache;
public:
CacheData(uint8_t* data, const size_t size);
CacheData(uint8_t* data, const size_t size, MemoryAllocator_Free* free);
CacheData(const CacheData& other);
~CacheData();
@@ -225,20 +249,6 @@ namespace dsacache {
*/
class Cache {
public:
// cache policy is defined as a type here to allow flexible usage of the cacher
// given a numa destination node (where the data will be needed), the numa source
// node (current location of the data) and the data size, this function should
// return optimal cache placement
// dst node and returned value can differ if the system, for example, has HBM
// attached accessible directly to node n under a different node id m
typedef int (CachePolicy)(const int numa_dst_node, const int numa_src_node, const size_t data_size);
// copy policy specifies the copy-executing nodes for a given task
// which allows flexibility in assignment for optimizing raw throughput
// or choosing a conservative usage policy
typedef std::vector<int> (CopyPolicy)(const int numa_dst_node, const int numa_src_node, const size_t data_size);
private:
// flags to store options duh
@@ -261,6 +271,8 @@ namespace dsacache {
CachePolicy* cache_policy_function_ = nullptr;
CopyPolicy* copy_policy_function_ = nullptr;
MemoryAllocator_Allocate* memory_allocate_function_ = nullptr;
MemoryAllocator_Free* memory_free_function_ = nullptr;
// function used to submit a copy task on a specific node to the dml
// engine on that node - will change the current threads node assignment
@@ -281,12 +293,6 @@ namespace dsacache {
// as this is set as the "optimal placement" node
void GetCacheNode(uint8_t* src, const size_t size, int* OUT_DST_NODE, int* OUT_SRC_NODE) const;
// allocates memory of size "size" on the numa node "node"
// and returns nullptr if this is not possible, also may
// try to flush the cache of the requested node to
// alleviate encountered shortage
uint8_t* AllocOnNode(const size_t size, const int node);
// checks whether the cache contains an entry for
// the given data in the given memory node and
// returns it, otherwise returns nullptr
@@ -299,7 +305,11 @@ namespace dsacache {
// initializes the cache with the two policy functions
// only after this is it safe to use in a threaded environment
void Init(CachePolicy* cache_policy_function, CopyPolicy* copy_policy_function);
void Init(
CachePolicy* cache_policy_function, CopyPolicy* copy_policy_function,
MemoryAllocator_Allocate* memory_allocate_function,
MemoryAllocator_Free* memory_free_function
);
// function to perform data access through the cache, behaviour depends
// on flags, by default will also perform prefetch, otherwise with
@@ -336,9 +346,15 @@ inline void dsacache::Cache::Clear() {
}
}
inline void dsacache::Cache::Init(CachePolicy* cache_policy_function, CopyPolicy* copy_policy_function) {
inline void dsacache::Cache::Init(
CachePolicy* cache_policy_function, CopyPolicy* copy_policy_function,
MemoryAllocator_Allocate* memory_allocate_function,
MemoryAllocator_Free* memory_free_function
) {
cache_policy_function_ = cache_policy_function;
copy_policy_function_ = copy_policy_function;
memory_allocate_function_ = memory_allocate_function;
memory_free_function_ = memory_free_function;
// initialize numa library
@@ -382,7 +398,7 @@ inline std::unique_ptr<dsacache::CacheData> dsacache::Cache::Access(uint8_t* dat
// at this point the requested data is not present in cache
// and we create a caching task for it, copying our current flags
task = std::make_unique<CacheData>(data, size);
task = std::make_unique<CacheData>(data, size, memory_free_function_);
task->SetFlags(flags_);
// when the ACCESS_WEAK flag is set for the flags parameter (!)
@@ -434,51 +450,8 @@ inline std::unique_ptr<dsacache::CacheData> dsacache::Cache::Access(uint8_t* dat
return std::move(task);
}
inline uint8_t* dsacache::Cache::AllocOnNode(const size_t size, const int node) {
// allocate data on this node and flush the unused parts of the
// cache if the operation fails and retry once
// TODO: smarter flush strategy could keep some stuff cached
// check currently free memory to see if the data fits
long long int free_space = 0;
numa_node_size64(node, &free_space);
if (free_space < size) {
// dst node lacks memory space so we flush the cache for this
// node hoping to free enough currently unused entries to make
// the second allocation attempt successful
Flush(node);
// re-test by getting the free space and checking again
numa_node_size64(node, &free_space);
if (free_space < size) {
return nullptr;
}
}
uint8_t* dst = reinterpret_cast<uint8_t*>(numa_alloc_onnode(size, node));
if (dst == nullptr) {
return nullptr;
}
if (CheckFlag(flags_, FLAG_FORCE_MAP_PAGES)) {
static const size_t page_size_b = getpagesize();
for (size_t i = 0; i < size; i += page_size_b) {
dst[i] = 0;
}
}
return dst;
}
inline void dsacache::Cache::SubmitTask(CacheData* task, const int dst_node, const int src_node) {
uint8_t* dst = AllocOnNode(task->GetSize(), dst_node);
uint8_t* dst = memory_allocate_function_(dst_node, task->GetSize());
if (dst == nullptr) {
return;
@@ -667,10 +640,11 @@ inline dsacache::Cache::~Cache() {
}
}
inline dsacache::CacheData::CacheData(uint8_t* data, const size_t size) {
inline dsacache::CacheData::CacheData(uint8_t* data, const size_t size, MemoryAllocator_Free* free) {
src_ = data;
size_ = size;
delete_ = false;
memory_free_function_ = free;
active_ = new std::atomic<int32_t>(1);
cache_ = new std::atomic<uint8_t*>(data);
handlers_ = new std::atomic<std::vector<dml_handler>*>();
@@ -689,6 +663,8 @@ inline dsacache::CacheData::CacheData(const dsacache::CacheData& other) {
cache_ = other.cache_;
flags_ = other.flags_;
memory_free_function_ = other.memory_free_function_;
incomplete_cache_ = other.incomplete_cache_;
handlers_ = other.handlers_;
invalid_handlers_ = other.invalid_handlers_;
@@ -733,8 +709,8 @@ inline void dsacache::CacheData::Deallocate() {
// takes place for the retrieved local cache
uint8_t* cache_local = cache_->exchange(nullptr);
if (cache_local != nullptr && delete_) numa_free(cache_local, size_);
else if (*incomplete_cache_ != nullptr) numa_free(*incomplete_cache_, size_);
if (cache_local != nullptr && delete_) memory_free_function_(cache_local, size_);
else if (*incomplete_cache_ != nullptr) memory_free_function_(*incomplete_cache_, size_);
else;
}

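For callers that do not want a pre-allocated pool, the new four-argument Init can be fed with plain libnuma callbacks that roughly reproduce what the removed AllocOnNode did: allocate on the requested node and touch the pages up front. A minimal sketch, assuming cache.hpp and libnuma are available; the names PlainAllocate and PlainFree are illustrative and not part of the commit:

#include <cstring>
#include <numa.h>

#include "cache.hpp"

// allocate on the requested node and write every byte once so the
// page mapping exists before the DSA engines or worker threads touch it
uint8_t* PlainAllocate(const int numa_node, const size_t size) {
    uint8_t* ptr = reinterpret_cast<uint8_t*>(numa_alloc_onnode(size, numa_node));
    if (ptr != nullptr) std::memset(ptr, 0, size);
    return ptr;
}

void PlainFree(uint8_t* ptr, const size_t size) {
    numa_free(ptr, size);
}

// cache.Init(CachePlacementPolicy, CopyMethodPolicy, PlainAllocate, PlainFree);

Returning nullptr on allocation failure is acceptable: SubmitTask checks the returned pointer and simply skips the caching attempt, as the hunk above shows.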
qdp_project/src/Benchmark.cpp (68 changed lines)

@@ -38,47 +38,12 @@ void scan_b(size_t gid, size_t tid) {
THREAD_TIMING_[SCANB_TIMING_INDEX][UniqueIndex(gid,tid)][0][TIME_STAMP_BEGIN] = std::chrono::steady_clock::now();
constexpr size_t VIRT_TID_INCREMENT = TC_SCANB / TC_AGGRJ;
constexpr size_t SUBCHUNK_THREAD_RATIO = TC_AGGRJ / (TC_SCANB == 0 ? 1 : TC_SCANB);
constexpr bool CACHE_SUBCHUNKING = SUBCHUNK_THREAD_RATIO > 1;
constexpr bool CACHE_OVERCHUNKING = VIRT_TID_INCREMENT > 1;
if constexpr (PERFORM_CACHING) {
if constexpr (CACHE_SUBCHUNKING) {
constexpr size_t SUBCHUNK_COUNT = SUBCHUNK_THREAD_RATIO > 0 ? SUBCHUNK_THREAD_RATIO : 1;
constexpr size_t SUBCHUNK_SIZE_B = CHUNK_SIZE_B / SUBCHUNK_COUNT;
constexpr size_t SUBCHUNK_SIZE_ELEMENTS = CHUNK_SIZE_ELEMENTS / SUBCHUNK_COUNT;
for (size_t i = 0; i < RUN_COUNT; i++) {
const size_t chunk_index = get_chunk_index(gid, i);
uint64_t* chunk_ptr = get_chunk<TC_SCANB>(DATA_B_, chunk_index, tid);
for (size_t j = 0; j < SUBCHUNK_COUNT; j++) {
uint64_t* sub_chunk_ptr = &chunk_ptr[j * SUBCHUNK_SIZE_ELEMENTS];
for (size_t i = 0; i < RUN_COUNT; i++) {
const size_t chunk_index = get_chunk_index(gid, i);
uint64_t* chunk_ptr = get_chunk<TC_AGGRJ>(DATA_B_, chunk_index, tid);
CACHE_.Access(reinterpret_cast<uint8_t*>(sub_chunk_ptr), SUBCHUNK_SIZE_B);
}
}
}
else if constexpr (CACHE_OVERCHUNKING) {
for (size_t tid_virt = tid; tid_virt < TC_AGGRJ; tid_virt += VIRT_TID_INCREMENT) {
for (size_t i = 0; i < RUN_COUNT; i++) {
const size_t chunk_index = get_chunk_index(gid, i);
uint64_t* chunk_ptr = get_chunk<TC_AGGRJ>(DATA_B_, chunk_index, tid_virt);
CACHE_.Access(reinterpret_cast<uint8_t*>(chunk_ptr), CHUNK_SIZE_B);
}
}
}
else {
constexpr size_t SUBCHUNK_SIZE_B = CHUNK_SIZE_B / TC_AGGRJ;
for (size_t i = 0; i < RUN_COUNT; i++) {
const size_t chunk_index = get_chunk_index(gid, i);
uint64_t* chunk_ptr = get_chunk<TC_AGGRJ>(DATA_B_, chunk_index, tid);
CACHE_.Access(reinterpret_cast<uint8_t*>(chunk_ptr), SUBCHUNK_SIZE_B);
}
CACHE_.Access(reinterpret_cast<uint8_t*>(chunk_ptr), AGGRJ_CHUNK_SIZE_B);
}
}
@@ -102,7 +67,7 @@ void scan_a(size_t gid, size_t tid) {
uint64_t* chunk_ptr = get_chunk<TC_SCANA>(DATA_A_, chunk_index, tid);
uint16_t* mask_ptr = get_mask<TC_SCANA>(MASK_A_, chunk_index, tid);
filter::apply_same(mask_ptr, nullptr, chunk_ptr, CMP_A, CHUNK_SIZE_B / TC_SCANA);
filter::apply_same(mask_ptr, nullptr, chunk_ptr, CMP_A, SCANA_CHUNK_SIZE_B);
}
THREAD_TIMING_[SCANA_TIMING_INDEX][UniqueIndex(gid,tid)][0][TIME_STAMP_WAIT] = std::chrono::steady_clock::now();
@@ -113,8 +78,6 @@ void scan_a(size_t gid, size_t tid) {
}
void aggr_j(size_t gid, size_t tid) {
constexpr size_t SUBCHUNK_SIZE_B = CHUNK_SIZE_B / TC_AGGRJ;
CACHE_HITS_[UniqueIndex(gid,tid)] = 0;
THREAD_TIMING_[AGGRJ_TIMING_INDEX][UniqueIndex(gid,tid)].clear();
@@ -139,7 +102,7 @@ void aggr_j(size_t gid, size_t tid) {
uint64_t* data_ptr;
if constexpr (PERFORM_CACHING) {
data = CACHE_.Access(reinterpret_cast<uint8_t *>(chunk_ptr), SUBCHUNK_SIZE_B, dsacache::FLAG_ACCESS_WEAK);
data = CACHE_.Access(reinterpret_cast<uint8_t *>(chunk_ptr), AGGRJ_CHUNK_SIZE_B, dsacache::FLAG_ACCESS_WEAK);
data->WaitOnCompletion();
data_ptr = reinterpret_cast<uint64_t*>(data->GetDataLocation());
@@ -157,8 +120,7 @@ void aggr_j(size_t gid, size_t tid) {
data_ptr = chunk_ptr;
}
uint64_t tmp = _mm512_reduce_add_epi64(aggregator);
aggregator = aggregation::apply_masked(aggregator, data_ptr, mask_ptr_a, SUBCHUNK_SIZE_B);
aggregator = aggregation::apply_masked(aggregator, data_ptr, mask_ptr_a, AGGRJ_CHUNK_SIZE_B);
}
THREAD_TIMING_[AGGRJ_TIMING_INDEX][UniqueIndex(gid,tid)][0][TIME_STAMP_END] = std::chrono::steady_clock::now();
@@ -189,6 +151,8 @@ int main() {
MASK_A_ = (uint16_t*) numa_alloc_onnode(WL_SIZE_ELEMENTS, MEM_NODE_HBM);
DATA_DST_ = (uint64_t*) numa_alloc_onnode(TC_AGGRJ * GROUP_COUNT * sizeof(uint64_t), MEM_NODE_HBM);
std::vector<void*> cache_memory_locations;
if constexpr (PERFORM_CACHING) {
// cache will be configured to wait weak by default
// it will also not handle page faults which cause delay
@@ -196,9 +160,15 @@ int main() {
// which is configured for xeonmax with smart assignment
uint64_t cache_flags = 0;
cache_flags |= dsacache::FLAG_WAIT_WEAK;
cache_flags |= dsacache::FLAG_FORCE_MAP_PAGES;
CACHE_.SetFlags(cache_flags);
CACHE_.Init(CachePlacementPolicy, CopyMethodPolicy);
CACHE_.Init(CachePlacementPolicy, CopyMethodPolicy, MemoryAllocator_Allocate, MemoryAllocator_Free);
for (uint32_t i = 0; i < GROUP_COUNT * TC_AGGRJ; i++) {
void* ptr = numa_alloc_onnode(AGGRJ_CHUNK_SIZE_B, MEM_NODE_HBM);
cache_memory_locations.emplace_back(ptr);
CACHE_LOCATIONS_.push(reinterpret_cast<uint8_t*>(ptr));
memset(ptr, 0xAB, AGGRJ_CHUNK_SIZE_B);
}
}
fill_mt<uint64_t>(DATA_A_, WL_SIZE_B, 0, 100, 42);
@@ -275,5 +245,9 @@ int main() {
numa_free(MASK_A_, WL_SIZE_ELEMENTS);
numa_free(DATA_DST_, TC_AGGRJ * GROUP_COUNT * sizeof(uint64_t));
for (void* ptr : cache_memory_locations) {
numa_free(ptr, AGGRJ_CHUNK_SIZE_B);
}
return 0;
}

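The footprint of the pool reserved above follows directly from the configuration constants: one block per (group, aggregation-thread) pair, each AGGRJ_CHUNK_SIZE_B bytes. A rough sketch; the helper constants are hypothetical and not part of the commit:

constexpr size_t CACHE_POOL_BLOCKS = GROUP_COUNT * TC_AGGRJ;
constexpr size_t CACHE_POOL_SIZE_B = CACHE_POOL_BLOCKS * AGGRJ_CHUNK_SIZE_B;
// with AGGRJ_CHUNK_SIZE_B == CHUNK_SIZE_B / TC_AGGRJ this is roughly
// GROUP_COUNT * CHUNK_SIZE_B of HBM held for the lifetime of the benchmark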
qdp_project/src/Configuration.hpp (4 changed lines)

@@ -55,3 +55,7 @@ constexpr size_t RUN_COUNT = CHUNK_COUNT / GROUP_COUNT;
static_assert(RUN_COUNT > 0);
static_assert(WL_SIZE_B % 16 == 0);
static_assert(CHUNK_SIZE_B % 16 == 0);
static_assert(PERFORM_CACHING ? TC_SCANB == TC_AGGRJ : true);
constexpr size_t SCANA_CHUNK_SIZE_B = CHUNK_SIZE_B / TC_SCANA;
constexpr size_t AGGRJ_CHUNK_SIZE_B = CHUNK_SIZE_B / TC_AGGRJ;

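Both new constants are integer divisions and would silently truncate if CHUNK_SIZE_B is not a multiple of the respective thread count; two additional static_asserts in the same style as the existing ones would make that assumption explicit (a suggestion, not part of the commit):

static_assert(CHUNK_SIZE_B % TC_SCANA == 0, "SCANA_CHUNK_SIZE_B would truncate");
static_assert(CHUNK_SIZE_B % TC_AGGRJ == 0, "AGGRJ_CHUNK_SIZE_B would truncate");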
qdp_project/src/utils/BenchmarkHelpers.cpp (36 changed lines)

@@ -1,4 +1,6 @@
#include <vector>
#include <queue>
#include <mutex>
#include "../Configuration.hpp"
@@ -15,6 +17,9 @@ std::array<std::vector<std::vector<std::array<std::chrono::steady_clock::time_po
std::array<uint32_t, GROUP_COUNT * TC_AGGRJ> CACHE_HITS_;
std::mutex CACHE_LOCATIONS_MTX_;
std::queue<uint8_t*> CACHE_LOCATIONS_;
inline size_t UniqueIndex(const uint32_t gid, const uint32_t tid) {
return tid * GROUP_COUNT + gid;
}
@@ -27,26 +32,25 @@ uint64_t sum_check(uint64_t compare_value, uint64_t* row_A, uint64_t* row_B, siz
return sum;
}
uint8_t* MemoryAllocator_Allocate(const int node, const size_t size) {
std::lock_guard<std::mutex> lock(CACHE_LOCATIONS_MTX_);
uint8_t* ptr = CACHE_LOCATIONS_.front();
CACHE_LOCATIONS_.pop();
return ptr;
}
void MemoryAllocator_Free(uint8_t* ptr, const size_t size) {
return;
}
int CachePlacementPolicy(const int numa_dst_node, const int numa_src_node, const size_t data_size) {
return numa_dst_node < 8 ? numa_dst_node + 8 : numa_dst_node;
return MEM_NODE_HBM;
}
std::vector<int> CopyMethodPolicy(const int numa_dst_node, const int numa_src_node, const size_t data_size) {
// we always run on n0 and can cut the amount of code here therefore
// for small data it is more efficient to run on only one node
// which causes less submissions and therefore completes faster
// as submission cost matters for low transfer size
if (data_size < 16_MiB) {
static thread_local int last_node = 0;
const int node = last_node++ % 4;
return std::vector<int>{ node };
}
else {
static thread_local int last_group = 0;
const int group = last_group++ % 2;
return group == 0 ? std::vector<int>{ 0, 1 } : std::vector<int>{ 2, 3 };
}
static thread_local int last_node = 0;
const int node = last_node++ % 4;
return std::vector<int>{ node };
}
struct NopStruct {

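Note that MemoryAllocator_Allocate above assumes the pre-allocated pool never runs dry; calling front() on an empty std::queue is undefined behaviour. Because the cache already treats a nullptr result as "skip caching" (see the SubmitTask hunk in cache.hpp), a slightly more defensive variant could look like the following sketch; the _Checked name is illustrative and not part of the commit:

uint8_t* MemoryAllocator_Allocate_Checked(const int node, const size_t size) {
    std::lock_guard<std::mutex> lock(CACHE_LOCATIONS_MTX_);

    // an exhausted pool turns the request into a no-op on the cache side
    if (CACHE_LOCATIONS_.empty()) {
        return nullptr;
    }

    uint8_t* ptr = CACHE_LOCATIONS_.front();
    CACHE_LOCATIONS_.pop();
    return ptr;
}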