Browse Source

Merge branch 'dev-caching' into writing

master
Constantin Fürst 11 months ago
parent
commit
cc452fb1a4
  1. 2
      .gitignore
  2. 20
      offloading-cacher/CMakeLists.txt
  3. 727
      offloading-cacher/cache.hpp
  4. 43
      offloading-cacher/cmake/modules/FindNUMA.cmake
  5. 252
      offloading-cacher/main.cpp
  6. 104
      qdp_project/.gitignore
  7. 82
      qdp_project/CMakeLists.txt
  8. 5
      qdp_project/README.md
  9. 9
      qdp_project/bench_max.sh
  10. 0
      qdp_project/src/.gitkeep
  11. 316
      qdp_project/src/algorithm/operators/aggregation.h
  12. 170
      qdp_project/src/algorithm/operators/filter.h
  13. 284
      qdp_project/src/benchmark/MAX_benchmark.cpp
  14. 420
      qdp_project/src/benchmark/pipelines/MAX_scan_filter_pipe.h
  15. 80
      qdp_project/src/utils/array_utils.h
  16. 73
      qdp_project/src/utils/barrier_utils.h
  17. 33
      qdp_project/src/utils/const.h
  18. 82
      qdp_project/src/utils/cpu_set_utils.h
  19. 104
      qdp_project/src/utils/execution_modes.h
  20. 76
      qdp_project/src/utils/file_output.h
  21. 208
      qdp_project/src/utils/iterable_range.h
  22. 152
      qdp_project/src/utils/measurement_utils.h
  23. 45
      qdp_project/src/utils/memory_literals.h
  24. 6
      qdp_project/src/utils/pcm.h
  25. 80
      qdp_project/src/utils/timer_utils.h
  26. 93
      qdp_project/src/utils/vector_loader.h

2
.gitignore

@ -13,6 +13,8 @@
*.fls
*/.vscode/*
*/.idea/*
*/cmake-build-*/*
# ---> C++
# Prerequisites

20
offloading-cacher/CMakeLists.txt

@ -0,0 +1,20 @@
cmake_minimum_required(VERSION 3.18)
project(offloading-cacher LANGUAGES CXX)
set(CMAKE_CXX_STANDARD 20)
list(APPEND CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}/cmake/modules")
find_package(NUMA REQUIRED)
find_package(OpenMP REQUIRED)
set(DML_SOURCE_DIR "../../DML/include/")
set(SOURCES main.cpp)
add_executable(offloading-cacher ${SOURCES})
target_include_directories(offloading-cacher PRIVATE ${CMAKE_SOURCE_DIR} ${NUMA_INCLUDE_DIRS} ${DML_SOURCE_DIR})
target_link_libraries(offloading-cacher PRIVATE libdml.a pthread ${CMAKE_DL_LIBS} ${NUMA_LIBRARY} OpenMP::OpenMP_CXX)
install(TARGETS offloading-cacher DESTINATION ${CMAKE_INSTALL_PREFIX})

727
offloading-cacher/cache.hpp

@ -0,0 +1,727 @@
#pragma once
#include <iostream>
#include <unordered_map>
#include <shared_mutex>
#include <mutex>
#include <memory>
#include <sched.h>
#include <numa.h>
#include <numaif.h>
#include <dml/dml.hpp>
namespace dml {
inline const std::string StatusCodeToString(const dml::status_code code) {
switch (code) {
case dml::status_code::ok: return "ok";
case dml::status_code::false_predicate: return "false predicate";
case dml::status_code::partial_completion: return "partial completion";
case dml::status_code::nullptr_error: return "nullptr error";
case dml::status_code::bad_size: return "bad size";
case dml::status_code::bad_length: return "bad length";
case dml::status_code::inconsistent_size: return "inconsistent size";
case dml::status_code::dualcast_bad_padding: return "dualcast bad padding";
case dml::status_code::bad_alignment: return "bad alignment";
case dml::status_code::buffers_overlapping: return "buffers overlapping";
case dml::status_code::delta_delta_empty: return "delta delta empty";
case dml::status_code::batch_overflow: return "batch overflow";
case dml::status_code::execution_failed: return "execution failed";
case dml::status_code::unsupported_operation: return "unsupported operation";
case dml::status_code::queue_busy: return "queue busy";
case dml::status_code::error: return "unknown error";
case dml::status_code::config_error: return "config error";
default: return "unhandled error";
}
}
}
namespace dsacache {
class Cache;
/*
* Class Description:
* Holds all required information on one cache entry and is used
* both internally by the Cache and externally by the user.
*
* Important Usage Notes:
* The pointer is only updated in WaitOnCompletion() which
* therefore must be called by the user at some point in order
* to use the cached data. Using this class as T for
* std::shared_ptr<T> is not recommended as references are
* already counted internally.
*
* Cache Lifetime:
* As long as the instance is referenced, the pointer it stores
* is guaranteed to be either nullptr or pointing to a valid copy.
*
* Implementation Detail:
* Performs self-reference counting with a shared atomic integer.
* Therefore on creating a copy the reference count is increased
* and with the destructor it is deacresed. If the last copy is
* destroyed the actual underlying data is freed and all shared
* variables deleted.
*
* Notes on Thread Safety:
* Class is thread safe in any possible state and performs
* reference counting and deallocation itself entirely atomically.
*/
class CacheData {
public:
using dml_handler = dml::handler<dml::mem_copy_operation, std::allocator<uint8_t>>;
private:
// data source and size of the block
uint8_t* src_;
size_t size_;
// global reference counting object
std::atomic<int32_t>* active_;
// global cache-location pointer
std::atomic<uint8_t*>* cache_;
// object-local incomplete cache location pointer
// which is only available in the first instance
uint8_t* incomplete_cache_;
// dml handler vector pointer which is only
// available in the first instance
std::unique_ptr<std::vector<dml_handler>> handlers_;
// deallocates the global cache-location
// and invalidates it
void Deallocate();
// checks whether there are at least two
// valid references to this object which
// is done as the cache always has one
// internally to any living instance
bool Active() const;
friend Cache;
public:
CacheData(uint8_t* data, const size_t size);
CacheData(const CacheData& other);
~CacheData();
// waits on completion of caching operations
// for this task and is safe to be called in
// any state of the object
void WaitOnCompletion();
// returns the cache data location for this
// instance which is valid as long as the
// instance is alive - !!! this may also
// yield a nullptr !!!
uint8_t* GetDataLocation() const;
};
/*
* Class Description:
* Class will handle access to data through internal copies.
* These are obtained via work submission to the Intel DSA which takes
* care of asynchronously duplicating the data. The user will define
* where these copies lie and which system nodes will perform the copy.
* This is done through policy functions set during initialization.
*
* Placement Policy:
* The Placement Policy Function decides on which node a particular
* entry is to be placed, given the current executing node and the
* data source node and data size. This in turn means that for one
* datum, multiple cached copies may exist at one time.
*
* Cache Lifetime:
* When accessing the cache, a CacheData-object will be returned.
* As long as this object lives, the pointer which it holds is
* guaranteed to be either nullptr or a valid copy. When destroyed
* the entry is marked for deletion which is only carried out
* when system memory pressure drives an automated cache flush.
*
* Restrictions:
* - Overlapping Pointers may lead to undefined behaviour during
* manual cache invalidation which should not be used if you
* intend to have these types of pointers
* - Cache Invalidation may only be performed manually and gives
* no ordering guarantees. Therefore, it is the users responsibility
* to ensure that results after invalidation have been generated
* using the latest state of data. The cache is best suited
* to static data.
*
* Notes on Thread Safety:
* - Cache is completely thread-safe after initialization
* - CacheData-class will handle deallocation of data itself by
* performing self-reference-counting atomically and only
* deallocating if the last reference is destroyed
* - The internal cache state has one lock which is either
* acquired shared for reading the state (upon accessing an already
* cached element) or unique (accessing a new element, flushing, invalidating)
* - Waiting on copy completion is done over an atomic-wait in copies
* of the original CacheData-instance
* - Overall this class may experience performance issues due to the use
* of locking (in any configuration), lock contention (worsens with higher
* core count, node count and utilization) and atomics (worse in the same
* situations as lock contention)
*
* Improving Performance:
* When data is never shared between threads or memory size for the cache is
* not an issue you may consider having one Cache-instance per thread and removing
* the lock in Cache and modifying the reference counting and waiting mechanisms
* of CacheData accordingly (although this is high effort and will yield little due
* to the atomics not being shared among cores/nodes).
* Otherwise, one Cache-instance per node could also be considered. This will allow
* the placement policy function to be barebones and reduces the lock contention and
* synchronization impact of the atomic variables.
*/
class Cache {
public:
// cache policy is defined as a type here to allow flexible usage of the cacher
// given a numa destination node (where the data will be needed), the numa source
// node (current location of the data) and the data size, this function should
// return optimal cache placement
// dst node and returned value can differ if the system, for example, has HBM
// attached accessible directly to node n under a different node id m
typedef int (CachePolicy)(const int numa_dst_node, const int numa_src_node, const size_t data_size);
// copy policy specifies the copy-executing nodes for a given task
// which allows flexibility in assignment for optimizing raw throughput
// or choosing a conservative usage policy
typedef std::vector<int> (CopyPolicy)(const int numa_dst_node, const int numa_src_node, const size_t data_size);
private:
// mutex for accessing the cache state map
std::shared_mutex cache_mutex_;
// map from [dst-numa-node,map2]
// map2 from [data-ptr,cache-structure]
std::unordered_map<uint8_t, std::unordered_map<uint8_t*, CacheData>> cache_state_;
CachePolicy* cache_policy_function_ = nullptr;
CopyPolicy* copy_policy_function_ = nullptr;
// function used to submit a copy task on a specific node to the dml
// engine on that node - will change the current threads node assignment
// to achieve this so take care to restore this
dml::handler<dml::mem_copy_operation, std::allocator<uint8_t>> ExecuteCopy(
const uint8_t* src, uint8_t* dst, const size_t size, const int node
) const;
// allocates the required memory on the destination node
// and then submits task to the dml library for processing
// and attaches the handlers to the cache data structure
void SubmitTask(CacheData* task, const int dst_node, const int src_node);
// querries the policy functions for the given data and size
// to obtain destination cache node, also returns the datas
// source node for further usage
// output may depend on the calling threads node assignment
// as this is set as the "optimal placement" node
void GetCacheNode(uint8_t* src, const size_t size, int* OUT_DST_NODE, int* OUT_SRC_NODE) const;
// allocates memory of size "size" on the numa node "node"
// and returns nullptr if this is not possible, also may
// try to flush the cache of the requested node to
// alleviate encountered shortage
uint8_t* AllocOnNode(const size_t size, const int node);
// checks whether the cache contains an entry for
// the given data in the given memory node and
// returns it, otherwise returns nullptr
std::unique_ptr<CacheData> GetFromCache(uint8_t* src, const size_t size, const int dst_node);
public:
Cache() = default;
Cache(const Cache& other) = delete;
// initializes the cache with the two policy functions
// only after this is it safe to use in a threaded environment
void Init(CachePolicy* cache_policy_function, CopyPolicy* copy_policy_function);
// function to perform data access through the cache
std::unique_ptr<CacheData> Access(uint8_t* data, const size_t size);
// flushes the cache of inactive entries
// if node is -1 then the whole cache is
// checked and otherwise the specified
// node - no checks on node validity
void Flush(const int node = -1);
// forces out all entries from the
// cache and therefore will also "forget"
// still-in-use entries, these will still
// be properly deleted, but the cache
// will be fresh - use for testing
void Clear();
void Invalidate(uint8_t* data);
};
}
inline void dsacache::Cache::Clear() {
std::unique_lock<std::shared_mutex> lock(cache_mutex_);
cache_state_.clear();
Init(cache_policy_function_, copy_policy_function_);
}
inline void dsacache::Cache::Init(CachePolicy* cache_policy_function, CopyPolicy* copy_policy_function) {
cache_policy_function_ = cache_policy_function;
copy_policy_function_ = copy_policy_function;
// initialize numa library
numa_available();
// obtain all available nodes
// and those we may allocate
// memory on
const int nodes_max = numa_num_configured_nodes();
const bitmask* valid_nodes = numa_get_mems_allowed();
// prepare the cache state with entries
// for all given nodes
for (int node = 0; node < nodes_max; node++) {
if (numa_bitmask_isbitset(valid_nodes, node)) {
cache_state_.insert({node,{}});
}
}
}
inline std::unique_ptr<dsacache::CacheData> dsacache::Cache::Access(uint8_t* data, const size_t size) {
// get destination numa node for the cache
int dst_node = -1;
int src_node = -1;
GetCacheNode(data, size, &dst_node, &src_node);
// TODO: at this point it could be beneficial to check whether
// TODO: the given destination node is present as an entry
// TODO: in the cache state to see if it is valid
// check whether the data is already cached
std::unique_ptr<CacheData> task = GetFromCache(data, size, dst_node);
if (task != nullptr) {
return std::move(task);
}
// at this point the requested data is not present in cache
// and we create a caching task for it
task = std::make_unique<CacheData>(data, size);
{
std::unique_lock<std::shared_mutex> lock(cache_mutex_);
const auto state = cache_state_[dst_node].emplace(task->src_, *task);
// if state.second is false then no insertion took place
// which means that concurrently whith this thread
// some other thread must have accessed the same
// resource in which case we return the other
// threads data cache structure
if (!state.second) {
std::cout << "[!] Found another cache instance for 0x" << std::hex << (uint64_t)task->src_ << std::dec << std::endl;
return std::move(std::make_unique<CacheData>(state.first->second));
}
}
SubmitTask(task.get(), dst_node, src_node);
return std::move(task);
}
inline uint8_t* dsacache::Cache::AllocOnNode(const size_t size, const int node) {
// allocate data on this node and flush the unused parts of the
// cache if the operation fails and retry once
// TODO: smarter flush strategy could keep some stuff cached
// check currently free memory to see if the data fits
long long int free_space = 0;
numa_node_size64(node, &free_space);
if (free_space < size) {
std::cout << "[!] Memory shortage when allocating " << size << "B on node " << node << std::endl;
// dst node lacks memory space so we flush the cache for this
// node hoping to free enough currently unused entries to make
// the second allocation attempt successful
Flush(node);
// re-test by getting the free space and checking again
numa_node_size64(node, &free_space);
if (free_space < size) {
std::cout << "[x] Memory shortage after flush when allocating " << size << "B on node " << node << std::endl;
return nullptr;
}
}
uint8_t* dst = reinterpret_cast<uint8_t*>(numa_alloc_onnode(size, node));
if (dst == nullptr) {
std::cout << "[x] Allocation try failed for " << size << "B on node " << node << std::endl;
return nullptr;
}
return dst;
}
inline void dsacache::Cache::SubmitTask(CacheData* task, const int dst_node, const int src_node) {
uint8_t* dst = AllocOnNode(task->size_, dst_node);
if (dst == nullptr) {
std::cout << "[x] Allocation failed so we can not cache" << std::endl;
return;
}
task->incomplete_cache_ = dst;
// querry copy policy function for the nodes to use for the copy
const std::vector<int> executing_nodes = copy_policy_function_(dst_node, src_node, task->size_);
const size_t task_count = executing_nodes.size();
// each task will copy one fair part of the total size
// and in case the total size is not a factor of the
// given task count the last node must copy the remainder
const size_t size = task->size_ / task_count;
const size_t last_size = size + task->size_ % task_count;
// save the current numa node mask to restore later
// as executing the copy task will place this thread
// on a different node
bitmask* nodemask = numa_get_run_node_mask();
for (uint32_t i = 0; i < task_count; i++) {
const size_t local_size = i + 1 == task_count ? size : last_size;
const size_t local_offset = i * size;
const uint8_t* local_src = task->src_ + local_offset;
uint8_t* local_dst = dst + local_offset;
task->handlers_->emplace_back(ExecuteCopy(local_src, local_dst, local_size, executing_nodes[i]));
}
// restore the previous nodemask
numa_run_on_node_mask(nodemask);
numa_free_nodemask(nodemask);
}
inline dml::handler<dml::mem_copy_operation, std::allocator<uint8_t>> dsacache::Cache::ExecuteCopy(
const uint8_t* src, uint8_t* dst, const size_t size, const int node
) const {
dml::const_data_view srcv = dml::make_view(src, size);
dml::data_view dstv = dml::make_view(dst, size);
numa_run_on_node(node);
return dml::submit<dml::automatic>(dml::mem_copy.block_on_fault(), srcv, dstv);
}
inline void dsacache::Cache::GetCacheNode(uint8_t* src, const size_t size, int* OUT_DST_NODE, int* OUT_SRC_NODE) const {
// obtain numa node of current thread to determine where the data is needed
const int current_cpu = sched_getcpu();
const int current_node = numa_node_of_cpu(current_cpu);
// obtain node that the given data pointer is allocated on
*OUT_SRC_NODE = -1;
get_mempolicy(OUT_SRC_NODE, NULL, 0, (void*)src, MPOL_F_NODE | MPOL_F_ADDR);
// querry cache policy function for the destination numa node
*OUT_DST_NODE = cache_policy_function_(current_node, *OUT_SRC_NODE, size);
}
inline void dsacache::Cache::Flush(const int node) {
// this lambda is used because below we have two code paths that
// flush nodes, either one single or all successively
const auto FlushNode = [](std::unordered_map<uint8_t*,CacheData>& map) {
// begin at the front of the map
auto it = map.begin();
// loop until we reach the end of the map
while (it != map.end()) {
// if the iterator points to an inactive element
// then we may erase it
if (it->second.Active() == false) {
// erase the iterator from the map
map.erase(it);
// as the erasure invalidated out iterator
// we must start at the beginning again
it = map.begin();
}
else {
// if element is active just move over to the next one
it++;
}
}
};
{
// we require exclusive lock as we modify the cache state
std::unique_lock<std::shared_mutex> lock(cache_mutex_);
// node == -1 means that cache on all nodes should be flushed
if (node == -1) {
for (auto& nc : cache_state_) {
FlushNode(nc.second);
}
}
else {
FlushNode(cache_state_[node]);
}
}
}
inline std::unique_ptr<dsacache::CacheData> dsacache::Cache::GetFromCache(uint8_t* src, const size_t size, const int dst_node) {
// the best situation is if this data is already cached
// which we check in an unnamed block in which the cache
// is locked for reading to prevent another thread
// from marking the element we may find as unused and
// clearing it
// lock the cache state in shared-mode because we read
std::shared_lock<std::shared_mutex> lock(cache_mutex_);
// search for the data in our cache state structure at the given node
const auto search = cache_state_[dst_node].find(src);
// if the data is in our structure we continue
if (search != cache_state_[dst_node].end()) {
// now check whether the sizes match
if (search->second.size_ >= size) {
// return a unique copy of the entry which uses the object
// lifetime and destructor to safely handle deallocation
return std::move(std::make_unique<CacheData>(search->second));
}
else {
// if the sizes missmatch then we clear the current entry from cache
// which will cause its deletion only after the last possible outside
// reference is also destroyed
cache_state_[dst_node].erase(search);
}
}
return nullptr;
}
void dsacache::Cache::Invalidate(uint8_t* data) {
// as the cache is modified we must obtain a unique writers lock
std::unique_lock<std::shared_mutex> lock(cache_mutex_);
// loop through all per-node-caches available
for (auto node : cache_state_) {
// search for an entry for the given data pointer
auto search = node.second.find(data);
if (search != node.second.end()) {
// if the data is represented in-cache
// then it will be erased to re-trigger
// caching on next access
node.second.erase(search);
}
}
}
inline dsacache::CacheData::CacheData(uint8_t* data, const size_t size) {
src_ = data;
size_ = size;
active_ = new std::atomic<int32_t>(1);
cache_ = new std::atomic<uint8_t*>();
incomplete_cache_ = nullptr;
handlers_ = std::make_unique<std::vector<dml_handler>>();
}
inline dsacache::CacheData::CacheData(const dsacache::CacheData& other) {
// we copy the ptr to the global atomic reference counter
// and increase the amount of active references
active_ = other.active_;
const int current_active = active_->fetch_add(1);
// source and size will be copied too
// as well as the reference to the global
// atomic cache pointer
src_ = other.src_;
size_ = other.size_;
cache_ = other.cache_;
// incomplete cache and handlers will not
// be copied because only the first instance
// will wait on the completion of handlers
incomplete_cache_ = nullptr;
handlers_ = nullptr;
}
inline dsacache::CacheData::~CacheData() {
// if this is the first instance of this cache structure
// and it has not been waited on and is now being destroyed
// we must wait on completion here to ensure the cache
// remains in a valid state
if (handlers_ != nullptr) {
WaitOnCompletion();
}
// due to fetch_sub returning the preivously held value
// we must subtract one locally to get the current value
const int32_t v = active_->fetch_sub(1) - 1;
// if the returned value is zero or lower
// then we must execute proper deletion
// as this was the last reference
if (v <= 0) {
Deallocate();
delete active_;
delete cache_;
}
}
inline void dsacache::CacheData::Deallocate() {
// although deallocate should only be called from
// a safe context to do so, it can not hurt to
// defensively perform the operation atomically
uint8_t* cache_local = cache_->exchange(nullptr);
if (cache_local != nullptr) numa_free(cache_local, size_);
// if the cache was never waited for then incomplete_cache_
// may still contain a valid pointer which has to be freed
if (incomplete_cache_ != nullptr) numa_free(incomplete_cache_, size_);
}
inline uint8_t* dsacache::CacheData::GetDataLocation() const {
return cache_->load();
}
inline bool dsacache::CacheData::Active() const {
// this entry is active if more than one
// reference exists to it, as the Cache
// will always keep one internally until
// the entry is cleared from cache
return active_->load() > 1;
}
inline void dsacache::CacheData::WaitOnCompletion() {
// the cache data entry can be in two states
// either it is the original one which has not
// been waited for in which case the handlers
// are non-null or it is not
if (handlers_ == nullptr) {
// when no handlers are attached to this cache entry we wait on a
// value change for the cache structure from nullptr to non-null
// which will either go through immediately if the cache is valid
// already or wait until the handler-owning thread notifies us
cache_->wait(nullptr);
}
else {
// when the handlers are non-null there are some DSA task handlers
// available on which we must wait here
// abort is set if any operation encountered an error
bool abort = false;
for (auto& handler : *handlers_) {
auto result = handler.get();
if (result.status != dml::status_code::ok) {
std::cerr << "[x] Encountered bad status code for operation: " << dml::StatusCodeToString(result.status) << std::endl;
// if one of the copy tasks failed we abort the whole task
// after all operations are completed on it
abort = true;
}
}
// the handlers are cleared after all have completed
handlers_ = nullptr;
// now we act depending on whether an abort has been
// called for which signals operation incomplete
if (abort) {
// store nullptr in the cache location
cache_->store(nullptr);
// then free the now incomplete cache
// TODO: it would be possible to salvage the
// TODO: operation at this point but this
// TODO: is quite complicated so we just abort
numa_free(incomplete_cache_, size_);
}
else {
// incomplete cache is now safe to use and therefore we
// swap it with the global cache state of this entry
// and notify potentially waiting threads
cache_->store(incomplete_cache_);
}
// as a last step all waiting threads must
// be notified (copies of this will wait on value
// change of the cache) and the incomplete cache
// is cleared to nullptr as it is not incomplete
cache_->notify_all();
incomplete_cache_ = nullptr;
}
}

43
offloading-cacher/cmake/modules/FindNUMA.cmake

@ -0,0 +1,43 @@
# Module for locating libnuma
#
# Read-only variables:
# NUMA_FOUND
# Indicates that the library has been found.
#
# NUMA_INCLUDE_DIR
# Points to the libnuma include directory.
#
# NUMA_LIBRARY_DIR
# Points to the directory that contains the libraries.
# The content of this variable can be passed to link_directories.
#
# NUMA_LIBRARY
# Points to the libnuma that can be passed to target_link_libararies.
#
# Copyright (c) 2013-2020 MulticoreWare, Inc
include(FindPackageHandleStandardArgs)
find_path(NUMA_ROOT_DIR
NAMES include/numa.h
PATHS ENV NUMA_ROOT
DOC "NUMA root directory")
find_path(NUMA_INCLUDE_DIR
NAMES numa.h
HINTS ${NUMA_ROOT_DIR}
PATH_SUFFIXES include
DOC "NUMA include directory")
find_library(NUMA_LIBRARY
NAMES numa
HINTS ${NUMA_ROOT_DIR}
DOC "NUMA library")
if (NUMA_LIBRARY)
get_filename_component(NUMA_LIBRARY_DIR ${NUMA_LIBRARY} PATH)
endif()
mark_as_advanced(NUMA_INCLUDE_DIR NUMA_LIBRARY_DIR NUMA_LIBRARY)
find_package_handle_standard_args(NUMA REQUIRED_VARS NUMA_ROOT_DIR NUMA_INCLUDE_DIR NUMA_LIBRARY)

252
offloading-cacher/main.cpp

@ -0,0 +1,252 @@
#include <iostream>
#include <random>
#include <vector>
#include <string>
#include <omp.h>
#include "cache.hpp"
static constexpr size_t SIZE_64_MIB = 64 * 1024 * 1024;
dsacache::Cache CACHE;
void InitCache(const std::string& device) {
if (device == "default") {
auto cache_policy = [](const int numa_dst_node, const int numa_src_node, const size_t data_size) {
return numa_dst_node;
};
auto copy_policy = [](const int numa_dst_node, const int numa_src_node, const size_t data_size) {
return std::vector<int>{ numa_dst_node };
};
CACHE.Init(cache_policy,copy_policy);
}
else if (device == "xeonmax") {
auto cache_policy = [](const int numa_dst_node, const int numa_src_node, const size_t data_size) {
// xeon max is configured to have hbm on node ids that are +8
return numa_dst_node < 8 ? numa_dst_node + 8 : numa_dst_node;
};
auto copy_policy = [](const int numa_dst_node, const int numa_src_node, const size_t data_size) {
if (data_size < SIZE_64_MIB) {
// if the data size is small then the copy will just be carried
// out by the destination node which does not require setting numa
// thread affinity as the selected dsa engine is already the one
// present on the calling thread
return std::vector<int>{ (numa_dst_node >= 8 ? numa_dst_node - 8 : numa_dst_node) };
}
else {
// for sufficiently large data, smart copy is used which will utilize
// all four engines for intra-socket copy operations and cross copy on
// the source and destination nodes for inter-socket copy
const bool same_socket = ((numa_dst_node ^ numa_src_node) & 4) == 0;
if (same_socket) {
const bool socket_number = numa_dst_node >> 2;
if (socket_number == 0) return std::vector<int>{ 0, 1, 2, 3 };
else return std::vector<int>{ 4, 5, 6, 7 };
}
else {
return std::vector<int>{
(numa_src_node >= 8 ? numa_src_node - 8 : numa_src_node),
(numa_dst_node >= 8 ? numa_dst_node - 8 : numa_dst_node)
};
}
}
};
CACHE.Init(cache_policy,copy_policy);
}
else {
std::cerr << "Given device '" << device << "' not supported!" << std::endl;
exit(-1);
}
}
uint8_t* GetRandomArray(const size_t size) {
uint8_t* array = new uint8_t[size];
std::uniform_int_distribution<uint8_t> unif(std::numeric_limits<uint8_t>::min(), std::numeric_limits<uint8_t>::max());
std::default_random_engine re;
for (size_t i = 0; i < size; i++) {
array[i] = unif(re);
}
return array;
}
bool IsEqual(const uint8_t* a, const uint8_t* b, const size_t size) {
for (size_t i = 0; i < size; i++) {
try {
if (a[i] != b[i]) return false;
}
catch (...) {
return false;
}
}
return true;
}
std::unique_ptr<dsacache::CacheData> PerformAccessAndTest(uint8_t* src, const size_t size, const int tid) {
std::unique_ptr<dsacache::CacheData> data_cache = CACHE.Access(
reinterpret_cast<uint8_t *>(src),
size * sizeof(uint8_t)
);
uint8_t* cached_imm = reinterpret_cast<uint8_t *>(data_cache->GetDataLocation());
// check the value immediately just to see if ram or cache was returned
if (src == cached_imm) {
std::cout << "[" << tid << "] Caching did not immediately yield different data location." << std::endl;
}
else if (cached_imm == nullptr) {
std::cout << "[" << tid << "] Immediately got nullptr." << std::endl;
}
else {
std::cout << "[" << tid << "] Immediately got different data location." << std::endl;
}
// waits for the completion of the asynchronous caching operation
data_cache->WaitOnCompletion();
// gets the cache-data-location from the struct
uint8_t* cached = reinterpret_cast<uint8_t *>(data_cache->GetDataLocation());
// tests on the resulting value
if (src == cached) {
std::cout << "[" << tid << "] Caching did not affect data location." << std::endl;
}
else if (cached == nullptr) {
std::cerr << "[" << tid << "] Got nullptr from cache." << std::endl;
}
else {
std::cout << "[" << tid << "] Got different data location from cache." << std::endl;
}
if (IsEqual(src,cached,size)) {
std::cout << "[" << tid << "] Cached data is correct." << std::endl;
}
else {
std::cerr << "[" << tid << "] Cached data is wrong." << std::endl;
}
return std::move(data_cache);
}
void RunTestST(const size_t size) {
uint8_t* data = GetRandomArray(size);
static constexpr int tid = 0;
std::cout << "[" << tid << "] first access --- " << std::endl;
PerformAccessAndTest(data, size, tid);
std::cout << "[" << tid << "] second access --- " << std::endl;
PerformAccessAndTest(data, size, tid);
std::cout << "[" << tid << "] end of application --- " << std::endl;
}
void RunTestMT(const size_t size) {
uint8_t* data = GetRandomArray(size);
#pragma omp parallel
{
const int tid = omp_get_thread_num();
std::cout << "[" << tid << "] first access --- " << std::endl;
PerformAccessAndTest(data, size, tid);
std::cout << "[" << tid << "] second access --- " << std::endl;
PerformAccessAndTest(data, size, tid);
std::cout << "[" << tid << "] end of block --- " << std::endl;
}
}
void RunTestFlush(const size_t size) {
uint8_t* data1 = GetRandomArray(size);
uint8_t* data2 = GetRandomArray(size);
uint8_t* data3 = GetRandomArray(size);
static constexpr int tid = 0;
std::cout << "[" << tid << "] first access to data d1 and keepalive --- " << std::endl;
const auto c1 = PerformAccessAndTest(data1, size, tid);
std::cout << "[" << tid << "] second access to d2 lets d2 vanish --- " << std::endl;
PerformAccessAndTest(data2, size, tid);
std::cout << "[" << tid << "] third access to d3 should clear d2 --- " << std::endl;
PerformAccessAndTest(data3, size, tid);
std::cout << "[" << tid << "] end of block and test d1 == cache1 --- " << std::endl;
if (IsEqual(data1, c1->GetDataLocation(), size)) {
std::cout << "[" << tid << "] Cached d1 is still correct." << std::endl;
}
else {
std::cerr << "[" << tid << "] Cached d1 is bad." << std::endl;
}
}
int main(int argc, char **argv) {
if (argc != 4) {
std::cerr << "This application requires three parameters!" << std::endl;
std::cout << "Please provide the following positional arguments: [device] [mode] [size]" << std::endl;
std::cout << "[device] from { default, xeonmax } which influences cache and execution placement" << std::endl;
std::cout << "[mode] from { st, mt, flt } or single and multi threaded and flushtest respectively" << std::endl;
std::cout << "[size] positive integral number, amount of bytes in data array" << std::endl;
std::cout << "for flushtest the given size should be 1/3 of the available cache size" << std::endl;
exit(-1);
}
const std::string device = argv[1];
const std::string mode = argv[2];
const std::string size_s = argv[3];
uint32_t size = 0;
try {
size = std::stoul(size_s);
}
catch (...) {
std::cerr << "Given Size '" << size_s << "' caused error during conversion to number!" << std::endl;
}
InitCache(device);
if (mode == "st") {
RunTestST(size);
}
else if (mode == "mt") {
RunTestMT(size);
}
else if (mode == "flt") {
RunTestFlush(size);
}
else {
std::cerr << "Given Mode '" << mode << "' not supported!" << std::endl;
exit(-1);
}
}

104
qdp_project/.gitignore

@ -0,0 +1,104 @@
bin/
# CMake building files
CMakeLists.txt.user
CMakeCache.txt
CMakeFiles
CMakeScripts
Testing
Makefile
cmake_install.cmake
install_manifest.txt
compile_commands.json
CTestTestfile.cmake
_deps
.cmake
# Prerequisites
*.d
# Object files
*.o
*.ko
*.obj
*.elf
# Linker output
*.ilk
*.map
*.exp
# Precompiled Headers
*.gch
*.pch
# Libraries
*.lib
*.a
*.la
*.lo
# Shared objects (inc. Windows DLLs)
*.dll
*.so
*.so.*
*.dylib
# Executables
*.exe
*.out
*.app
*.i*86
*.x86_64
*.hex
# Debug files
*.dSYM/
*.su
*.idb
*.pdb
# Kernel Module Compile Results
*.mod*
*.cmd
.tmp_versions/
modules.order
Module.symvers
Mkfile.old
dkms.conf
# Prerequisites
*.d
# Compiled Object files
*.slo
*.lo
*.o
*.obj
# Precompiled Headers
*.gch
*.pch
# Compiled Dynamic libraries
*.so
*.dylib
*.dll
# Fortran module files
*.mod
*.smod
# Compiled Static libraries
*.lai
*.la
*.a
*.lib
# Executables
*.exe
*.out
*.app

82
qdp_project/CMakeLists.txt

@ -0,0 +1,82 @@
cmake_minimum_required(VERSION 3.18)
# set the project name
project(NUMA_Slow_Fast_Datamigration_Test VERSION 0.1)
# specify the C standard
set(CMAKE_CXX_STANDARD 20)
set(CMAKE_CXX_STANDARD_REQUIRED True)
#set flags on need cross compile for sapphirerapids architecture
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -march=sapphirerapids")
#set flags on need cross compile for skylake micro architecture
#set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -march=skylake-avx512")
#set flags on need cross compile for knights landing micro architecture (for debugging)
#set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mavx512f -mavx512cd -mavx512er -mavx512pf")
#suppress selected! warnigs that are not very important to resolve. This is to keep the compileation output clean
set(SUPPRESS_WARNINGS "-Wno-literal-suffix -Wno-volatile")
set(DEBUG_FLAGS "-g3" "-ggdb")
set(RELEASE_FLAGS "-O3")
#set flags used for Release and Debug build type
add_compile_options(
"$<$<CONFIG:Release>:${RELEASE_FLAGS}>"
"$<$<CONFIG:Debug>:${DEBUG_FLAGS}>"
)
# evaluate custom variables
function(eval vname vvalid vdefault)
# is variable is set to the below value if its not already defined from the comand line
set(VALID ${vvalid} CACHE INTERNAL "Possible values for ${vname}")
set(${vname} ${vdefault} CACHE STRING "The barrier mode")
# command for GUI shenanigans
set_property(CACHE ${vname} PROPERTY STRINGS VALID)
if(${vname} IN_LIST VALID)
message(STATUS "Variable ${vname} = ${${vname}}")
else()
message(STATUS "Variable ${vname} has invalid value ${${vname}}")
# set the fallback value for use in parent function
unset(${vname} CACHE)
message(STATUS "Fallback to default: ${vname} = ${vdefault}")
set(${vname} ${vdefault} PARENT_SCOPE)
endif()
endfunction()
eval(WSUPPRESS "suppress;show" "show")
if($<STREQUAL:${BUFFER_LIMIT},suppress> EQUAL 1)
add_compile_options("${SUPPRESS_WARNINGS}")
endif()
eval(BARRIER_MODE "global;local" "global")
add_definitions(-DBARRIER_MODE="${BARRIER_MODE}")
eval(BUFFER_LIMIT "unlimited;limited" "unlimited")
add_definitions(-DBUFFER_LIMIT=$<STREQUAL:${BUFFER_LIMIT},limited>)
eval(QUERY "simple;complex" "simple")
add_definitions(-DQUERY=$<STREQUAL:${QUERY},simple>)
eval(THREAD_FACTOR "1;2;3;4;5;6;7;8;9;10" "4")
add_definitions(-DTHREAD_GROUP_MULTIPLIER=${THREAD_FACTOR})
eval(PINNING "cpu;numa" "cpu")
add_definitions(-DPINNING=$<STREQUAL:${PINNING},cpu>)
# build directory
set(CMAKE_BINARY_DIR "../bin") #relative to inside build
set(EXECUTABLE_OUTPUT_PATH ${CMAKE_BINARY_DIR})
# include directories
include_directories(src/utils)
include_directories(src/algorithm)
include_directories(src/algorithm/operators)
# link libraries
link_libraries(-lnuma -lpthread -l:libdml.a)
# Add targets only below
# specify build targets
add_executable(MAXBench src/benchmark/MAX_benchmark.cpp)

5
qdp_project/README.md

@ -0,0 +1,5 @@
This is a copy of the Query Driven Prefetching Repository
https://os.inf.tu-dresden.de/repo/gitbox/andre.berthold/Query-driven_Prefetching/src/branch/qdp_minimal/code
Original Authors: André Berthold and Anna Bartuschka

9
qdp_project/bench_max.sh

@ -0,0 +1,9 @@
#!/bin/bash
current_date_time=$(date)
echo "Benchmark start at: $current_date_time"
sudo numactl --cpunodebind=2 -- taskset -c 0,1,2,3,4,5 ../bin/MAXBench
current_date_time=$(date)
echo "Benchmark end at: $current_date_time"

0
qdp_project/src/.gitkeep

316
qdp_project/src/algorithm/operators/aggregation.h

@ -0,0 +1,316 @@
#pragma once
#include <cstdint>
#include <algorithm>
#include <immintrin.h>
#include <type_traits>
#include "vector_loader.h"
#include "const.h"
/**
* @brief Super Class for all Aggregation functions. Guards Sub Classes from having an non integral base type.
*
* @tparam T
*/
template <typename T>
class AggFunction {
static_assert(std::is_integral<T>::value, "The base type of an AggFunction must be an integral");
};
/**
* @brief Template class that implements methods used for Summation. It wraps the corresponding vector intrinsics
*
* @tparam T base datatype for the implemented methods
*/
template<typename T>
class Sum : public AggFunction<T> {
public:
static inline __m512i simd_agg(__m512i aggregator, __m512i vector) {
if constexpr (sizeof(T) == 4) return _mm512_add_epi32(aggregator, vector);
else if constexpr (sizeof(T) == 8) return _mm512_add_epi64(aggregator, vector);
static_assert(sizeof(T) == 4 || sizeof(T) == 8, "Sum is only implemented for 32 and 64 wide integers");
};
static inline __m512i simd_agg(__m512i aggregator, __mmask16 mask, __m512i vector) {
if constexpr (sizeof(T) == 4) return _mm512_mask_add_epi32(aggregator, mask, aggregator, vector);
else if constexpr (sizeof(T) == 8) return _mm512_mask_add_epi64(aggregator, mask, aggregator, vector);
static_assert(sizeof(T) == 4 || sizeof(T) == 8, "Sum is only implemented for 32 and 64 wide integers");
};
static inline T simd_reduce(__m512i vector) {
if constexpr (sizeof(T) == 4) return _mm512_reduce_add_epi32(vector);
else if constexpr (sizeof(T) == 8) return _mm512_reduce_add_epi64(vector);
static_assert(sizeof(T) == 4 || sizeof(T) == 8, "Sum is only implemented for 32 and 64 wide integers");
};
static inline T scalar_agg(T aggregator, T scalar) { return aggregator + scalar; };
static inline __m512i zero() { return _mm512_set1_epi32(0); };
};
/**
* @brief Template class that implements methods used for Maximum determination. It wraps the corresponding vector intrinsics
*
* @tparam T base datatype for the implemented methods
*
*/
template<typename T>
class Max : public AggFunction<T> {
public:
static inline __m512i simd_agg(__m512i aggregator, __m512i vector) {
if constexpr (sizeof(T) == 4) return _mm512_max_epi32(aggregator, vector);
else if constexpr (sizeof(T) == 8) return _mm512_max_epi64(aggregator, vector);
static_assert(sizeof(T) == 4 || sizeof(T) == 8, "Max is only implemented for 32 and 64 wide integers");
}
static inline __m512i simd_agg(__m512i aggregator, __mmask16 mask, __m512i vector) {
if constexpr (sizeof(T) == 4) return _mm512_mask_max_epi32(aggregator, mask, aggregator, vector);
else if constexpr (sizeof(T) == 8) return _mm512_mask_max_epi64(aggregator, mask, aggregator, vector);
static_assert(sizeof(T) == 4 || sizeof(T) == 8, "Max is only implemented for 32 and 64 wide integers");
}
static inline T simd_reduce(__m512i vector) {
if constexpr (sizeof(T) == 4) return _mm512_reduce_max_epi32(vector);
else if constexpr (sizeof(T) == 8) return _mm512_reduce_max_epi64(vector);
static_assert(sizeof(T) == 4 || sizeof(T) == 8, "Max is only implemented for 32 and 64 wide integers");
}
static inline T scalar_agg(T aggregator, T scalar) { return std::max(aggregator, scalar); }
static inline __m512i zero() {
if constexpr (sizeof(T) == 4) {
if constexpr (std::is_signed<T>::value) return _mm512_set1_epi32(0xFFFFFFFF);
else return _mm512_set1_epi32(0x0);
}
else if constexpr (sizeof(T) == 8) {
if constexpr (std::is_signed<T>::value) return _mm512_set1_epi32(0xFFFFFFFFFFFFFFFF);
else return _mm512_set1_epi32(0x0);
}
static_assert(sizeof(T) == 4 || sizeof(T) == 8, "Max is only implemented for 32 and 64 wide integers");
}
};
/**
* @brief Template class that implements methods used for Minimum determination. It wraps the corresponding vector intrinsics
*
* @tparam T base datatype for the implemented methods
*
*/
template<typename T>
class Min : public AggFunction<T> {
public:
static inline __m512i simd_agg(__m512i aggregator, __m512i vector) {
if constexpr (sizeof(T) == 4) return _mm512_min_epi32(aggregator, vector);
else if constexpr (sizeof(T) == 8) return _mm512_min_epi64(aggregator, vector);
static_assert(sizeof(T) == 4 || sizeof(T) == 8, "Min is only implemented for 32 and 64 wide integers");
}
static inline __m512i simd_agg(__m512i aggregator, __mmask16 mask, __m512i vector) {
if constexpr (sizeof(T) == 4) return _mm512_mask_min_epi32(aggregator, mask, aggregator, vector);
else if constexpr (sizeof(T) == 8) return _mm512_mask_min_epi64(aggregator, mask, aggregator, vector);
static_assert(sizeof(T) == 4 || sizeof(T) == 8, "Min is only implemented for 32 and 64 wide integers");
}
static inline T simd_reduce(__m512i vector) {
if constexpr (sizeof(T) == 4) return _mm512_reduce_min_epi32(vector);
else if constexpr (sizeof(T) == 8) return _mm512_reduce_min_epi64(vector);
static_assert(sizeof(T) == 4 || sizeof(T) == 8, "Min is only implemented for 32 and 64 wide integers");
}
static inline T scalar_agg(T aggregator, T scalar) { return std::min(aggregator, scalar); }
static inline __m512i zero() {
if constexpr (sizeof(T) == 4) {
if constexpr (std::is_signed<T>::value) return _mm512_set1_epi32(0xEFFFFFFF);
else return _mm512_set1_epi32(0xFFFFFFFF);
}
else if constexpr (sizeof(T) == 8) {
if constexpr (std::is_signed<T>::value) return _mm512_set1_epi32(0xEFFFFFFFFFFFFFFF);
else return _mm512_set1_epi32(0xFFFFFFFFFFFFFFFF);
}
static_assert(sizeof(T) == 4 || sizeof(T) == 8, "Min is only implemented for 32 and 64 wide integers");
}
};
/**
* @brief Template Class that implements an aggregation operation.
*
* @tparam base_t Base type of the values for aggregation
* @tparam func
* @tparam load_mode
*/
template<typename base_t, template<typename _base_t> class func, load_mode load_mode>
class Aggregation{
public:
static_assert(std::is_same_v<base_t, uint64_t>, "Enforce unsigned 64 bit ints.");
using OP = func<base_t>;
/**
* @brief Calculates the memory maximal needed to store a chunk's processing result.
*
* @param chunk_size_b Size of the chunk in byte
* @return size_t Size of the chunk's processing result in byte
*/
static size_t result_bytes_per_chunk(size_t chunk_size_b) {
// aggregation returns a single value of type base_t
return sizeof(base_t);
}
/**
* @brief Applies the aggregation function on the chunk starting at *src* and spanning *chunk_size_b* bytes.
* The result is written to main memory.
*
* @param dest Pointer to the start of the result chunk
* @param src Pointer to the start of the source chunk
* @param chunk_size_b Size of the source chunk in bytes
* @return true When the aggregation is done
* @return false Never
*/
static bool apply (base_t *dest, base_t *src, size_t chunk_size_b) {
constexpr size_t lanes = VECTOR_SIZE<base_t>();
size_t value_count = chunk_size_b / sizeof(base_t);
__m512i agg_vec = func<base_t>::zero();
size_t i = 0;
base_t result = 0;
// stop before! running out of space
if(value_count >= lanes) {// keep in mind value_count is unsigned so if it becomes negative, it doesn't.
for(; i <= value_count - lanes; i += lanes) {
__m512i vec = Vector_Loader<base_t, load_mode>::load(src + i);
agg_vec = func<base_t>::simd_agg(agg_vec, vec);
}
result = func<base_t>::simd_reduce(agg_vec);
}
for(; i < value_count; ++i) {
result = func<base_t>::scalar_agg(result, src[i]);
}
*dest = result;
return true;
}
/**
* @brief Applies the aggregation function on the chunk starting at *src* and spanning *chunk_size_b* bytes,
* while applying the bit string stored in *masks*. The result is written to main memory.
*
* @param dest Pointer to the start of the result chunk
* @param src Pointer to the start of the source chunk
* @param masks Pointer the bitstring that marks the values that should be aggregated
* @param chunk_size_b Size of the source chunk in bytes
* @return true When the aggregation is done
* @return false Never
*/
static bool apply_masked (base_t *dest, base_t *src, uint16_t* msks, size_t chunk_size_b) {
constexpr size_t lanes = VECTOR_SIZE<base_t>();
uint8_t* masks = (uint8_t *)msks;
size_t value_count = chunk_size_b / sizeof(base_t);
__m512i agg_vec = func<base_t>::zero();
size_t i = 0;
// stop before! running out of space
if(value_count >= lanes) // keep in mind size_w is unsigned so if it becomes negative, it doesn't.
for(; i <= value_count - lanes; i += lanes) {
__m512i vec = Vector_Loader<base_t, load_mode>::load(src + i);
__mmask8 mask = _mm512_int2mask(masks[i / lanes]);
agg_vec = func<base_t>::simd_mask_agg(agg_vec, mask, vec);
}
*dest = func<base_t>::simd_reduce(agg_vec);
for(; i < value_count; ++i) {
uint8_t mask = masks[i / lanes];
if(mask & (0b1 << (i % lanes))){
*dest = func<base_t>::scalar_agg(*dest, src[i]);
}
}
return true;
}
/**
* @brief Applies the aggregation function on the chunk starting at *src* and spanning *chunk_size_b* bytes,
* while applying the bit string stored in *masks*. The values are agggegated in the register *dest* without
* clearing beforehand.
*
* NOTE! This function only works correctly if the the chunk_size_b is a multiple of 64 byte
*
* @param dest Vector register used for storing and passing the result around
* @param src Pointer to the start of the source chunk
* @param masks Pointer the bitstring that marks the values that should be aggregated
* @param chunk_size_b Size of the source chunk in bytes
* @return __m512i Vector register holding the aggregation result
*/
static __m512i apply_masked (__m512i dest, base_t *src, uint16_t* msks, size_t chunk_size_b) {
constexpr size_t lanes = VECTOR_SIZE<base_t>();
uint8_t* masks = (uint8_t*) msks;
//TODO this function does not work if value_count % lanes != 0
size_t value_count = chunk_size_b / sizeof(base_t);
size_t i = 0;
// stop before! running out of space
if(value_count >= lanes) // keep in mind size_w is unsigned so if it becomes negative, it doesn't.
for(; i <= value_count - lanes; i += lanes) {
__m512i vec = Vector_Loader<base_t, load_mode>::load(src + i);
__mmask8 mask = _mm512_int2mask(masks[i / lanes]);
dest = func<base_t>::simd_agg(dest, mask, vec);
}
return dest;
}
/**
* @brief Applies the aggregation function on the chunk starting at *src* and spanning *chunk_size_b* bytes,
* while applying two bit strings stored in *masks_0* and *masks_1*. The values are aggregated in the register
* *dest* without clearing beforehand.
*
* NOTE! This function only works correctly if the the chunk_size_b is a multiple of 64 byte
*
* @param dest Vector register used for storing and passing the result around
* @param src Pointer to the start of the source chunk
* @param masks_0 Pointer the bitstring that marks the values that should be aggregated
* @param masks_1 Pointer the bitstring that marks the values that should be aggregated
* @param chunk_size_b Size of the source chunk in bytes
* @return __m512i Vector register holding the aggregation result
*/
static __m512i apply_masked (__m512i dest, base_t *src, uint16_t* msks0, uint16_t* msks1, size_t chunk_size_b) {
constexpr size_t lanes = VECTOR_SIZE<base_t>();
uint8_t* masks0 = (uint8_t*) msks0;
uint8_t* masks1 = (uint8_t*) msks1;
//TODO this function does not work if value_count % lanes != 0
size_t value_count = chunk_size_b / sizeof(base_t);
size_t i = 0;
// stop before! running out of space
if(value_count >= lanes) // keep in mind value_count is unsigned so if it becomes negative, it doesn't.
for(; i <= value_count - lanes; i += lanes) {
__m512i vec = Vector_Loader<base_t, load_mode>::load(src + i);
__mmask8 mask0 = _mm512_int2mask(masks0[i / lanes]);
__mmask8 mask1 = _mm512_int2mask(masks1[i / lanes]);
mask0 = _kand_mask8(mask0, mask1);
dest = func<base_t>::simd_agg(dest, mask0, vec);
}
return dest;
}
/**
* @brief Reduces a vector by applying the aggregation function horizontally.
*
* @param dest Result of the horizontal aggregation
* @param src Vector as source for the horizontal aggregation
* @return true When the operation is done
* @return false Never
*/
static bool happly (base_t *dest, __m512i src) {
*dest = func<base_t>::simd_reduce(src);
return true;
}
static __m512i get_zero() {
return func<base_t>::zero();
}
};

170
qdp_project/src/algorithm/operators/filter.h

@ -0,0 +1,170 @@
#pragma once
#include<cstdint>
#include<type_traits>
#include <immintrin.h>
#include "vector_loader.h"
/**
* @brief Super Class for all Aggregation functions. Guards Sub Classes from having an non integral base type.
*
* @tparam T An integral datatype
*/
template<typename T>
class FilterFunction {
static_assert(std::is_integral<T>::value, "The base type of a FilterFunction must be an integeral.");
};
/**
* @brief Template class that implements methods used for finding values that are not equal to the compare value.
* It wraps the corresponding vector intrinsics.
*
* @tparam T base datatype for the implemented methods
*/
template<typename T>
class NEQ : public FilterFunction<T> {
public:
static inline __mmask16 simd_filter(__m512i vector, __m512i comp) {
if constexpr (sizeof(T) == 4) return _mm512_cmpneq_epi32_mask(vector, comp);
else if constexpr (sizeof(T) == 8) return _mm512_cmpneq_epi64_mask(vector, comp);
static_assert(sizeof(T) == 4 || sizeof(T) == 8, "NEQ is only implemented for 32 and 64 wide integers");
}
static inline bool scalar_filter(T scalar, T comp) { return scalar != comp; }
};
template<typename T>
class EQ : public FilterFunction<T> {
public:
static inline __mmask16 simd_filter(__m512i vector, __m512i comp) {
if constexpr (sizeof(T) == 4) return _mm512_cmpeq_epi32_mask(vector, comp);
else if constexpr (sizeof(T) == 8) return _mm512_cmpeq_epi64_mask(vector, comp);
static_assert(sizeof(T) == 4 || sizeof(T) == 8, "EQ is only implemented for 32 and 64 wide integers");
}
static inline bool scalar_filter(T scalar, T comp) { return scalar == comp; }
};
template<typename T>
class LT : public FilterFunction<T> {
public:
static inline __mmask16 simd_filter(__m512i vector, __m512i comp) {
if constexpr (sizeof(T) == 4) return _mm512_cmplt_epi32_mask(vector, comp);
else if constexpr (sizeof(T) == 8) return _mm512_cmplt_epi64_mask(vector, comp);
static_assert(sizeof(T) == 4 || sizeof(T) == 8, "LT is only implemented for 32 and 64 wide integers");
}
static inline bool scalar_filter(T scalar, T comp) { return scalar < comp; }
};
template<typename T>
class LEQ : public FilterFunction<T> {
public:
static inline __mmask16 simd_filter(__m512i vector, __m512i comp) {
if constexpr (sizeof(T) == 4) return _mm512_cmple_epi32_mask(vector, comp);
else if constexpr (sizeof(T) == 8) return _mm512_cmple_epi64_mask(vector, comp);
static_assert(sizeof(T) == 4 || sizeof(T) == 8, "LEQ is only implemented for 32 and 64 wide integers");
}
static inline bool scalar_filter(T scalar, T comp) { return scalar <= comp; }
};
template<typename T>
class GT : public FilterFunction<T> {
public:
static inline __mmask16 simd_filter(__m512i vector, __m512i comp) {
if constexpr (sizeof(T) == 4) return _mm512_cmpgt_epi32_mask(vector, comp);
else if constexpr (sizeof(T) == 8) return _mm512_cmpgt_epi64_mask(vector, comp);
static_assert(sizeof(T) == 4 || sizeof(T) == 8, "GT is only implemented for 32 and 64 wide integers");
}
static inline bool scalar_filter(T scalar, T comp) { return scalar > comp; }
};
template<typename T>
class GEQ : public FilterFunction<T> {
public:
static inline __mmask16 simd_filter(__m512i vector, __m512i comp) {
if constexpr (sizeof(T) == 4) return _mm512_cmpge_epi32_mask(vector, comp);
else if constexpr (sizeof(T) == 8) return _mm512_cmpge_epi64_mask(vector, comp);
static_assert(sizeof(T) == 4 || sizeof(T) == 8, "GEQ is only implemented for 32 and 64 wide integers");
}
static inline bool scalar_filter(T scalar, T comp) { return scalar >= comp; }
};
template<typename base_t, template<typename _base_t> class func, load_mode load_mode, bool copy>
class Filter {
public:
static_assert(std::is_same_v<base_t, uint64_t>, "We enforce 64 bit integer");
/**
* @brief Calculates the memory maximal needed to store a chunk's processing result.
*
* @param chunk_size_b Size of the chunk in byte
* @return size_t Size of the chunk's processing result in byte
*/
static size_t result_bytes_per_chunk(size_t chunk_size_b) {
// + 7 to enshure that we have enougth bytes -> / 8 -> rounds down
// if we had 17 / 8 = 2 but (17 + 7) / 8 = 3
// if we hat 16 / 8 = 2 is right, as well as, 16 + 7 / 8 = 2
return (chunk_size_b / sizeof(base_t) + 7) / 8;
}
/**
* @brief Applies the filter function on the chunk starting at *src* and spanning *chunk_size_b* bytes, while comparing with he same value every time.
* The resulting bit string is written to main memory.
*
* @param dest Pointer to the start of the result chunk
* @param src Pointer to the start of the source chunk
* @param cmp_value Comparision value to compare the values from source to
* @param chunk_size_b Size of the source chunk in bytes
* @return true When the filter operation is done
* @return false Never
*/
// we only need this impl. yet, as all filter are at the end of a pipeline
static bool apply_same (uint16_t *dst, base_t *buffer, base_t *src, base_t cmp_value, size_t chunk_size_b) {
constexpr uint32_t lanes = VECTOR_SIZE<base_t>();
uint8_t* dest = (uint8_t*) dst;
size_t value_count = chunk_size_b / sizeof(base_t);
__m512i cmp_vec = _mm512_set1_epi64(cmp_value);
size_t i = 0;
// this weird implementetion is neccessary, see analogous impl in aggregation for explaination
if(value_count > lanes) {
for(; (i < value_count - lanes); i += lanes) {
__m512i vec = Vector_Loader<base_t, load_mode>::load(src + i);
__mmask8 bitmask = func<base_t>::simd_filter(vec, cmp_vec);
uint8_t int_mask = (uint8_t) _mm512_mask2int(bitmask);
dest[i / lanes] = int_mask;
if constexpr(copy){
Vector_Loader<base_t, load_mode>::store(buffer + i, vec);
}
}
}
auto dest_pos = i / lanes;
uint8_t int_mask = 0;
for(; i < value_count; ++i) {
base_t val = src[i];
uint8_t result = func<base_t>::scalar_filter(val, cmp_value);
int_mask |= (result << (i % lanes));
if constexpr(copy){
buffer[i] = val;
}
}
dest[dest_pos] = int_mask;
return true;
}
};

284
qdp_project/src/benchmark/MAX_benchmark.cpp

@ -0,0 +1,284 @@
#include <atomic>
#include <barrier>
#include <chrono>
#include <condition_variable>
#include <cstdlib>
#include <cstring>
#include <fstream>
#include <future>
#include <iostream>
#include <limits>
#include <list>
#include <mutex>
#include <queue>
#include <thread>
#include <tuple>
#include <utility>
#include <numa.h>
#ifndef THREAD_GROUP_MULTIPLIER
#define THREAD_GROUP_MULTIPLIER 2
#endif
#ifndef QUERY
#define QUERY 1
#endif
#ifndef BARRIER_MODE
#define BARRIER_MODE "global"
#endif
#ifndef BUFFER_LIMIT
#define BUFFER_LIMIT 1
#endif
#ifndef PINNING
#define PINNING 1
#endif
#ifndef PCM_M
#define PCM_M 0
#endif
#if PCM_M == 1
#include "pcm.h"
#endif
#include "const.h"
#include "file_output.h"
#include "array_utils.h"
#include "timer_utils.h"
#include "barrier_utils.h"
#include "measurement_utils.h"
#include "cpu_set_utils.h"
#include "iterable_range.h"
#include "memory_literals.h"
#include "pipelines/MAX_scan_filter_pipe.h"
#include "aggregation.h"
#include "filter.h"
using base_t = uint64_t;
base_t sum_check(base_t compare_value, base_t* row_A, base_t* row_B, size_t row_size) {
base_t sum = 0;
for(int i = 0; i < row_size / sizeof(base_t); ++i) {
sum += (row_A[i] < compare_value) * row_B[i];
}
return sum;
}
base_t sum_check_complex(base_t compare_value_a, base_t compare_value_b, base_t* row_A, base_t* row_B, size_t row_size) {
base_t sum = 0;
for(int i = 0; i < row_size / sizeof(base_t); ++i) {
sum += (row_A[i] < compare_value_a && row_B[i] < compare_value_b) * row_B[i];
}
return sum;
}
int main(int argc, char** argv) {
#if PCM == 1
pcm::PCM *pcm = pcm::PCM::getInstance();
//and check for errors
auto error_code = pcm->program();
if(error_code != pcm::PCM::Success) {
std::cerr << "PCM couldn't start" << std::endl;
std::cerr << "Error code: " << error_code << std::endl;
std::cerr << "Try to execute 'sudo modprobe msr' and execute this program with root privigeges.";
return 1;
}
#endif
// set constants
constexpr size_t workload_b = 2_GiB;
constexpr base_t compare_value_a = 50;
constexpr base_t compare_value_b = 42;
constexpr bool simple_query = (QUERY == 1);
constexpr bool cache_a = false;
constexpr bool wait_b = false;
constexpr size_t chunk_min = 1_MiB;
constexpr size_t chunk_max = 8_MiB + 1;
constexpr size_t chunk_incr = 128_kiB;
// thread count is 12 here but as the default measurement uses 6
// we must restrict the core assignment of these 12 threads to
// 6 physical cpu cores on the executing node
constexpr size_t thread_count = 12;
std::ofstream out_file;
out_file.open("../results/max_"
"q-" + (std::string)(simple_query == true ? "simple" : "complex") +
"_bm-" + (std::string) BARRIER_MODE +
"_bl-" + (std::string)(BUFFER_LIMIT == 1 ? "limited" : "unlimited") +
"_tc-" + std::to_string(thread_count) + "1MiB-2MiB.csv");
// set benchmark parameter
Linear_Int_Range<uint32_t, 0, 30, 1> run("run");
Linear_Int_Range<size_t, chunk_min, chunk_max, chunk_incr> chunk_size("chunk_size");
Range<NewPMode, DRAM_base, new_mode_manager, new_mode_manager> mode("mode");
print_to_file(out_file, generateHead(run, chunk_size, mode), "thread_group", "time",
#ifdef THREAD_TIMINGS
"scan_a", "scan_b", "aggr_j",
#endif
#ifdef BARRIER_TIMINGS
"wait_scan_a", "wait_scan_b", "wait_aggr_j",
#endif
#if PCM == 1
pcm_value_collector::getHead("scan_a"),
pcm_value_collector::getHead("scan_b"),
pcm_value_collector::getHead("aggr_j"),
#endif
"result");
/*** alloc data and buffers ************************************************/
base_t* data_a = (base_t*) numa_alloc_local(workload_b);
base_t* data_b = (base_t*) numa_alloc_local(workload_b);
base_t* results = (base_t*) numa_alloc_local(thread_count * sizeof(base_t));
fill_mt<base_t>(data_a, workload_b, 0, 100, 42);
fill_mt<base_t>(data_b, workload_b, 0, 100, 420);
std::ofstream check_file;
check_file.open("../results/max_"
"q-" + (std::string)(simple_query == true ? "simple" : "complex") +
"_bm-" + (std::string) BARRIER_MODE +
"_bl-" + (std::string)(BUFFER_LIMIT == 1 ? "limited" : "unlimited") +
"_tc-" + std::to_string(thread_count) + ".checksum");
if constexpr (QUERY == 1) {
//calculate simple checksum if QUERY == 1 -> simple query is applied
check_file << sum_check(compare_value_a, data_a, data_b, workload_b);
} else {
check_file << sum_check_complex(compare_value_a, compare_value_b, data_a, data_b, workload_b);
}
check_file.close();
std::string iteration("init");
Query_Wrapper<base_t, simple_query, cache_a, wait_b>* qw = nullptr;
while(iteration != "false") {
std::promise<void> p;
std::shared_future<void> ready_future(p.get_future());
if(iteration != "run") {
if(qw != nullptr) {
delete qw;
}
uint8_t tc_filter = new_mode_manager::thread_count(simple_query ? SIMPLE_Q : COMPLEX_Q, mode.current, SCAN_A);
uint8_t tc_copy = new_mode_manager::thread_count(simple_query ? SIMPLE_Q : COMPLEX_Q, mode.current, SCAN_B);
uint8_t tc_agg = new_mode_manager::thread_count(simple_query ? SIMPLE_Q : COMPLEX_Q, mode.current, AGGR_J);
switch(mode.current) {
case NewPMode::Prefetch:
qw = new Query_Wrapper<base_t, simple_query, cache_a, wait_b>(
&ready_future, workload_b, chunk_size.current,
data_a, data_b, results, tc_filter, tc_copy, tc_agg,
mode.current, 50, 42
);
break;
default:
std::cerr << "[x] Unsupported Execution Mode by this build." << std::endl;
exit(-1);
}
}
qw->ready_future = &ready_future;
qw->clear_buffers();
auto filter_lambda = [&qw](uint32_t gid, uint32_t gcnt, uint32_t tid) { qw->scan_a(gid, gcnt, tid); };
auto copy_lambda = [&qw](uint32_t gid, uint32_t gcnt, uint32_t tid) { qw->scan_b(gid, gcnt, tid); };
auto aggregation_lambda = [&qw](uint32_t gid, uint32_t gcnt, uint32_t tid) { qw->aggr_j(gid, gcnt, tid); };
std::vector<std::thread> filter_pool;
std::vector<std::thread> copy_pool;
std::vector<std::thread> agg_pool;
uint8_t tc_filter = new_mode_manager::thread_count(simple_query ? SIMPLE_Q : COMPLEX_Q, mode.current, SCAN_A);
uint8_t tc_copy = new_mode_manager::thread_count(simple_query ? SIMPLE_Q : COMPLEX_Q, mode.current, SCAN_B);
uint8_t tc_agg = new_mode_manager::thread_count(simple_query ? SIMPLE_Q : COMPLEX_Q, mode.current, AGGR_J);
int thread_id = 0;
// std::vector<std::pair<int, int>> pinning_ranges {std::make_pair(28, 42), std::make_pair(84, 98)}; // node 2 heacboehm II
//std::vector<std::pair<int, int>> pinning_ranges {std::make_pair(32, 48), std::make_pair(96, 112)}; // node 2 heacboehm
std::vector<std::pair<int, int>> pinning_ranges {std::make_pair(24, 36), std::make_pair(120, 132)}; // node 2 sapphire rapids
//std::vector<std::pair<int, int>> pinning_ranges {std::make_pair(24, 48)}; // node 2+3 sapphire rapids
//std::vector<std::pair<int, int>> pinning_ranges {std::make_pair(0, 48)}; // node 0-3 sapphire rapids
for(uint32_t gid = 0; gid < THREAD_GROUP_MULTIPLIER; ++gid) {
for(uint32_t tid = 0; tid < tc_filter; ++tid) {
filter_pool.emplace_back(filter_lambda, gid, THREAD_GROUP_MULTIPLIER, tid);
#if PINNING
pin_thread_in_range(filter_pool.back(), thread_id++, pinning_ranges);
#else
pin_thread_in_range(filter_pool.back(), pinning_ranges);
#endif
}
// if tc_copy == 0 this loop is skipped
for(uint32_t tid = 0; tid < tc_copy; ++tid) {
copy_pool.emplace_back(copy_lambda, gid, THREAD_GROUP_MULTIPLIER, tid);
#if PINNING
pin_thread_in_range(copy_pool.back(), thread_id++, pinning_ranges);
#else
pin_thread_in_range(copy_pool.back(), pinning_ranges);
#endif
}
for(uint32_t tid = 0; tid < tc_agg; ++tid) {
agg_pool.emplace_back(aggregation_lambda, gid, THREAD_GROUP_MULTIPLIER, tid);
#if PINNING
pin_thread_in_range(agg_pool.back(), thread_id++, pinning_ranges);
#else
pin_thread_in_range(agg_pool.back(), pinning_ranges);
#endif
}
}
auto start = std::chrono::steady_clock::now();
p.set_value();
for(std::thread& t : filter_pool) { t.join(); }
for(std::thread& t : copy_pool) { t.join(); }
for(std::thread& t : agg_pool) { t.join(); }
Aggregation<base_t, Sum, load_mode::Aligned>::apply(results, results, sizeof(base_t) * tc_agg * THREAD_GROUP_MULTIPLIER);
auto end = std::chrono::steady_clock::now();
constexpr double nanos_per_second = ((double)1000) * 1000 * 1000;
uint64_t nanos = std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count();
double seconds = (double)(nanos) / nanos_per_second;
print_to_file(out_file, run, chunk_size, new_mode_manager::string(mode.current), THREAD_GROUP_MULTIPLIER, seconds,
#ifdef THREAD_TIMINGS
qw->trt->summarize_time(0), qw->trt->summarize_time(1), qw->trt->summarize_time(2),
#endif
#ifdef BARRIER_TIMINGS
qw->bt->summarize_time(0), qw->bt->summarize_time(1), qw->bt->summarize_time(2),
#endif
#if PCM == 1
qw->pvc->summarize_as_string("scan_a"),
qw->pvc->summarize_as_string("scan_b"),
qw->pvc->summarize_as_string("aggr_j"),
#endif
results[0]);
iteration = IterateOnce(run, chunk_size, mode);
}
numa_free(data_a, workload_b);
numa_free(data_b, workload_b);
numa_free(results, THREAD_GROUP_MULTIPLIER * thread_count * sizeof(base_t));
}

420
qdp_project/src/benchmark/pipelines/MAX_scan_filter_pipe.h

@ -0,0 +1,420 @@
#include <cassert>
#include <mutex>
#include <cstring>
#include <bitset>
#include <algorithm>
#include <numa.h>
#include "filter.h"
#include "aggregation.h"
#include "vector_loader.h"
#include "timer_utils.h"
#include "barrier_utils.h"
#include "measurement_utils.h"
#include "execution_modes.h"
#include "../../../../offloading-cacher/cache.hpp"
template<typename base_t, bool simple, bool cache_a, bool wait_b>
class Query_Wrapper {
public:
// sync
std::shared_future<void>* ready_future;
thread_runtime_timing* trt;
barrier_timing* bt;
pcm_value_collector* pvc;
private:
static constexpr size_t COPY_POLICY_MIN_SIZE = 64 * 1024 * 1024;
dsacache::Cache cache_;
// data
size_t size_b;
size_t chunk_size_b;
size_t chunk_size_w;
size_t chunk_cnt;
base_t* data_a;
base_t* data_b;
base_t* dest;
// ratios
uint32_t thread_count_fc;
uint32_t thread_count_fi;
uint32_t thread_count_ag;
uint32_t thread_count;
// done bits
volatile uint8_t* ready_flag_a;
volatile uint8_t* ready_flag_b;
// buffer
uint16_t* mask_a;
uint16_t* mask_b;
// params
base_t cmp_a;
base_t cmp_b;
NewPMode mode;
// sync
std::unique_ptr<std::vector<std::barrier<barrier_completion_function>*>> sync_barrier;
std::string barrier_mode = BARRIER_MODE;
using filterCopy = Filter<base_t, LT, load_mode::Stream, true>;
using filterNoCopy = Filter<base_t, LT, load_mode::Stream, false>;
using filter = Filter<base_t, LT, load_mode::Stream, false>;
using aggregation = Aggregation<base_t, Sum, load_mode::Stream>;
static int CachePlacementPolicy(const int numa_dst_node, const int numa_src_node, const size_t data_size) {
return numa_dst_node < 8 ? numa_dst_node + 8 : numa_dst_node;
}
static std::vector<int> CopyMethodPolicy(const int numa_dst_node, const int numa_src_node, const size_t data_size) {
if (data_size < COPY_POLICY_MIN_SIZE) {
// if the data size is small then the copy will just be carried
// out by the destination node which does not require setting numa
// thread affinity as the selected dsa engine is already the one
// present on the calling thread
return std::vector<int>{ (numa_dst_node >= 8 ? numa_dst_node - 8 : numa_dst_node) };
}
else {
// for sufficiently large data, smart copy is used which will utilize
// all four engines for intra-socket copy operations and cross copy on
// the source and destination nodes for inter-socket copy
const bool same_socket = ((numa_dst_node ^ numa_src_node) & 4) == 0;
if (same_socket) {
const bool socket_number = numa_dst_node >> 2;
if (socket_number == 0) return std::vector<int>{ 0, 1, 2, 3 };
else return std::vector<int>{ 4, 5, 6, 7 };
}
else {
return std::vector<int>{
(numa_src_node >= 8 ? numa_src_node - 8 : numa_src_node),
(numa_dst_node >= 8 ? numa_dst_node - 8 : numa_dst_node)
};
}
}
}
public:
Query_Wrapper(std::shared_future<void>* rdy_fut, size_t workload_b, size_t chunk_size_b, base_t* data_a,
base_t* data_b, base_t* dest, uint32_t tc_fi, uint32_t tc_fc, uint32_t tc_ag,
NewPMode mode, base_t cmp_a = 50, base_t cmp_b = 42) :
ready_future(rdy_fut), size_b(workload_b), chunk_size_b(chunk_size_b), data_a(data_a), data_b(data_b),
dest(dest), mode(mode), cmp_a(cmp_a), cmp_b(cmp_b) {
const int current_cpu = sched_getcpu();
const int current_node = numa_node_of_cpu(current_cpu);
const int cache_node = CachePlacementPolicy(current_node, current_node, 0);
chunk_size_w = chunk_size_b / sizeof(base_t);
chunk_cnt = size_b / chunk_size_b;
thread_count_fi = tc_fi;
thread_count_fc = tc_fc;
thread_count_ag = tc_ag;
thread_count = tc_fi + tc_fc + tc_ag;
ready_flag_a = (volatile uint8_t *) numa_alloc_onnode( chunk_cnt * thread_count_fi / 8 + ((chunk_cnt * thread_count_fi % 8) != 0), cache_node);
ready_flag_b = (volatile uint8_t *) numa_alloc_onnode( chunk_cnt * thread_count_fc / 8 + ((chunk_cnt * thread_count_fc % 8) != 0), cache_node);
mask_a = (uint16_t *) numa_alloc_onnode(size_b / sizeof(base_t), cache_node);
mask_b = (uint16_t *) numa_alloc_onnode(size_b / sizeof(base_t), cache_node);
cache_.Init(CachePlacementPolicy, CopyMethodPolicy);
size_t measurement_space = std::max(std::max(tc_fi, tc_fc), tc_ag);
trt = new thread_runtime_timing(3, measurement_space, current_node);
bt = new barrier_timing(3, measurement_space, current_node);
pvc = new pcm_value_collector({"scan_a", "scan_b", "aggr_j"}, measurement_space, current_node);
reset_barriers();
};
void reset_barriers(){
if(sync_barrier != nullptr) {
for(auto& barrier : *sync_barrier) {
delete barrier;
}
sync_barrier.reset();
}
sync_barrier = std::make_unique<std::vector<std::barrier<barrier_completion_function>*>>(thread_count);
uint32_t thread_count_sum = thread_count_ag + thread_count_fi + thread_count_fc;
uint32_t barrier_count = barrier_mode.compare("global") == 0 ? 1 : thread_count;
uint32_t barrier_thread_count;
if constexpr(simple){
barrier_thread_count = (thread_count / barrier_count) * (mode == NewPMode::Prefetch ? thread_count_sum : (thread_count_ag + thread_count_fi));
} else {
barrier_thread_count = (thread_count / barrier_count) * thread_count_sum;
}
for(uint32_t i = 0; i < barrier_count; ++i) {
(*sync_barrier)[i] = new std::barrier<barrier_completion_function>(barrier_thread_count);
}
}
void clear_buffers () {
std::memset((void*)ready_flag_a, 0x00, chunk_cnt * thread_count_fi / 8 + ((chunk_cnt * thread_count_fi % 8) != 0));
std::memset((void*)ready_flag_b, 0x00, chunk_cnt * thread_count_fc / 8 + ((chunk_cnt * thread_count_fc % 8) != 0));
std::memset(mask_a, 0x00, size_b / sizeof(base_t));
std::memset(mask_b, 0x00, size_b / sizeof(base_t));
cache_.Clear();
trt->reset_accumulator();
bt->reset_accumulator();
pvc->reset();
reset_barriers();
};
~Query_Wrapper() {
numa_free((void*)ready_flag_a, chunk_cnt * thread_count_fi / 8 + ((chunk_cnt * thread_count_fi % 8) != 0));
numa_free((void*)ready_flag_b, chunk_cnt * thread_count_fc / 8 + ((chunk_cnt * thread_count_fc % 8) != 0));
numa_free(mask_a, size_b / sizeof(base_t));
numa_free(mask_b, size_b / sizeof(base_t));
delete trt;
for(auto& barrier : *sync_barrier) {
delete barrier;
}
delete bt;
delete pvc;
};
//this can be set without need to change allocations
void set_thread_group_count(uint32_t value) {
this->thread_group = value;
};
private:
static inline base_t* get_sub_chunk_ptr(base_t* base_ptr, size_t chunk_id, size_t chunk_size_w, size_t tid, size_t tcnt) {
base_t* chunk_ptr = base_ptr + chunk_id * chunk_size_w;
return chunk_ptr + tid * (chunk_size_w / tcnt);
}
static inline uint16_t* get_sub_mask_ptr(uint16_t* base_ptr, size_t chunk_id, size_t chunk_size_w, size_t tid, size_t tcnt) {
// 16 integer are addressed with one uint16_t in mask buffer
size_t offset = chunk_id * chunk_size_w + tid * (chunk_size_w / tcnt);
return base_ptr + (offset / 16);
}
static bool bit_at(volatile uint8_t* bitmap, uint32_t bitpos) {
uint8_t value = bitmap[bitpos / 8];
switch(bitpos % 8) {
case 0: return value & 0b00000001;
case 1: return value & 0b00000010;
case 2: return value & 0b00000100;
case 3: return value & 0b00001000;
case 4: return value & 0b00010000;
case 5: return value & 0b00100000;
case 6: return value & 0b01000000;
case 7: return value & 0b10000000;
default: return false;
}
}
static void set_bit_at(volatile uint8_t* bitmap, std::mutex& mutex, uint32_t bitpos) {
mutex.lock();
switch(bitpos % 8) {
case 0: bitmap[bitpos / 8] |= 0b00000001;break;
case 1: bitmap[bitpos / 8] |= 0b00000010;break;
case 2: bitmap[bitpos / 8] |= 0b00000100;break;
case 3: bitmap[bitpos / 8] |= 0b00001000;break;
case 4: bitmap[bitpos / 8] |= 0b00010000;break;
case 5: bitmap[bitpos / 8] |= 0b00100000;break;
case 6: bitmap[bitpos / 8] |= 0b01000000;break;
case 7: bitmap[bitpos / 8] |= 0b10000000;break;
}
mutex.unlock();
}
public:
void scan_b(size_t gid, size_t gcnt, size_t tid) {
size_t tcnt = thread_count_fc;
assert(chunk_size_w % tcnt == 0);
assert(chunk_size_w % 16 == 0);
assert(chunk_size_w % tcnt * 16 == 0);
// wait till everyone can start
ready_future->wait();
// the lower gids run once more if the chunks are not evenly distributable
uint32_t runs = chunk_cnt / gcnt + (chunk_cnt % gcnt > gid);
uint32_t barrier_idx = barrier_mode.compare("global") == 0 ? 0 : gid;
for(uint32_t i = 0; i < runs; ++i) {
trt->start_timer(1, tid * gcnt + gid);
pvc->start("scan_b", tid * gcnt + gid);
// calculate pointers
size_t chunk_id = gid + gcnt * i;
base_t* chunk_ptr = get_sub_chunk_ptr(data_b, chunk_id, chunk_size_w, tid, tcnt);
uint16_t* mask_ptr = get_sub_mask_ptr(mask_b, chunk_id, chunk_size_w, tid, tcnt);
if constexpr(simple){
cache_.Access(reinterpret_cast<uint8_t *>(chunk_ptr), chunk_size_b / tcnt);
} else {
const auto data = cache_.Access(reinterpret_cast<uint8_t *>(chunk_ptr), chunk_size_b / tcnt);
if constexpr(wait_b) {
// wait on copy to complete - during this time other threads may
// continue with their calculation which leads to little impact
// and we will be faster if the cache is used
data->WaitOnCompletion();
// obtain the data location from the cache entry
base_t* data_ptr = reinterpret_cast<base_t*>(data->GetDataLocation());
// nullptr is still a legal return value for CacheData::GetLocation()
// even after waiting, so this must be checked
if (data_ptr == nullptr) {
std::cerr << "[!] Cache Miss in ScanB" << std::endl;
data_ptr = chunk_ptr;
}
filterNoCopy::apply_same(mask_ptr, nullptr, data_ptr, cmp_b, chunk_size_b / tcnt);
}
else {
// obtain the data location from the cache entry
base_t* data_ptr = reinterpret_cast<base_t*>(data->GetDataLocation());
// nullptr is still a legal return value for CacheData::GetLocation()
// even after waiting, so this must be checked
if (data_ptr == nullptr) {
data_ptr = chunk_ptr;
}
filterNoCopy::apply_same(mask_ptr, nullptr, data_ptr, cmp_b, chunk_size_b / tcnt);
}
}
pvc->stop("scan_b", tid * gcnt + gid);
trt->stop_timer(1, tid * gcnt + gid);
bt->timed_wait(*(*sync_barrier)[barrier_idx], 1, tid * gcnt + gid);
}
(*(*sync_barrier)[barrier_idx]).arrive_and_drop();
}
void scan_a(size_t gid, size_t gcnt, size_t tid) {
size_t tcnt = thread_count_fi;
assert(chunk_size_w % tcnt == 0);
assert(chunk_size_w % 16 == 0);
assert(chunk_size_w % tcnt * 16 == 0);
// wait till everyone can start
ready_future->wait();
// the lower gids run once more if the chunks are not evenly distributable
uint32_t runs = chunk_cnt / gcnt + (chunk_cnt % gcnt > gid);
uint32_t barrier_idx = barrier_mode.compare("global") == 0 ? 0 : gid;
for(uint32_t i = 0; i < runs; ++i) {
trt->start_timer(0, tid * gcnt + gid);
pvc->start("scan_a", tid * gcnt + gid);
// calculate pointers
size_t chunk_id = gid + gcnt * i;
base_t* chunk_ptr = get_sub_chunk_ptr(data_a, chunk_id, chunk_size_w, tid, tcnt);
uint16_t* mask_ptr = get_sub_mask_ptr (mask_a, chunk_id, chunk_size_w, tid, tcnt);
if constexpr (cache_a) {
const auto data = cache_.Access(reinterpret_cast<uint8_t *>(chunk_ptr), chunk_size_b / tcnt);
data->WaitOnCompletion();
base_t* data_ptr = reinterpret_cast<base_t*>(data->GetDataLocation());
if (data_ptr == nullptr) {
std::cerr << "[!] Cache Miss in ScanA" << std::endl;
data_ptr = chunk_ptr;
}
filter::apply_same(mask_ptr, nullptr, data_ptr, cmp_a, chunk_size_b / tcnt);
}
else {
filter::apply_same(mask_ptr, nullptr, chunk_ptr, cmp_a, chunk_size_b / tcnt);
}
pvc->stop("scan_a", tid * gcnt + gid);
trt->stop_timer(0, tid * gcnt + gid);
bt->timed_wait(*(*sync_barrier)[barrier_idx], 0, tid * gcnt + gid);
}
(*(*sync_barrier)[barrier_idx]).arrive_and_drop();
}
void aggr_j(size_t gid, size_t gcnt, size_t tid) {
size_t tcnt = thread_count_ag;
// wait till everyone can start
ready_future->wait();
// calculate values
__m512i aggregator = aggregation::OP::zero();
// the lower gids run once more if the chunks are not evenly distributable
uint32_t runs = chunk_cnt / gcnt + (chunk_cnt % gcnt > gid);
uint32_t barrier_idx = barrier_mode.compare("global") == 0 ? 0 : gid;
for(uint32_t i = 0; i < runs; ++i) {
bt->timed_wait(*(*sync_barrier)[barrier_idx], 2, tid * gcnt + gid);
trt->start_timer(2, tid * gcnt + gid);
pvc->start("aggr_j", tid * gcnt + gid);
// calculate pointers
size_t chunk_id = gid + gcnt * i;
base_t* chunk_ptr = get_sub_chunk_ptr(data_b, chunk_id, chunk_size_w, tid, tcnt);
// access the cache for the given chunk which will have been accessed in scan_b
const auto data = cache_.Access(reinterpret_cast<uint8_t *>(chunk_ptr), chunk_size_b / tcnt);
// wait on the caching task to complete, this will give time for other processes
// to make progress here which will therefore not hurt performance
data->WaitOnCompletion();
// after the copy task has finished we obtain the pointer to the cached
// copy of data_b which is then used from now on
base_t* data_ptr = reinterpret_cast<base_t*>(data->GetDataLocation());
// nullptr is still a legal return value for CacheData::GetLocation()
// even after waiting, so this must be checked
if (data_ptr == nullptr) {
data_ptr = chunk_ptr;
std::cerr << "[!] Cache Miss in AggrJ" << std::endl;
}
uint16_t* mask_ptr_a = get_sub_mask_ptr (mask_a, chunk_id, chunk_size_w, tid, tcnt);
uint16_t* mask_ptr_b = get_sub_mask_ptr (mask_b, chunk_id, chunk_size_w, tid, tcnt);
base_t tmp = _mm512_reduce_add_epi64(aggregator);
if constexpr(simple){
aggregator = aggregation::apply_masked(aggregator, data_ptr, mask_ptr_a, chunk_size_b / tcnt);
} else {
aggregator = aggregation::apply_masked(aggregator, data_ptr, mask_ptr_a, mask_ptr_b, chunk_size_b / tcnt);
}
pvc->stop("aggr_j", tid * gcnt + gid);
trt->stop_timer(2, tid * gcnt + gid);
}
// so threads with more runs dont wait for alerady finished threads
(*(*sync_barrier)[barrier_idx]).arrive_and_drop();
aggregation::happly(dest + (tid * gcnt + gid), aggregator);
}
};

80
qdp_project/src/utils/array_utils.h

@ -0,0 +1,80 @@
#pragma once
#include <cstdlib>
#include <ctime>
#include <cstdint>
#include <type_traits>
#include <random>
#include <chrono>
#include <immintrin.h>
/// @brief Fills a given array with random generated integers.
/// @tparam base_t Datatype of the array
/// @param dest Pointer to the array
/// @param size Size of the array
/// @param min Minumum value of the generated integers
/// @param max Maximum value of the generated integers
template<typename base_t>
void fill(base_t * dest, uint64_t size, base_t min, base_t max) {
std::srand(std::time(nullptr));
for(uint64_t i = 0; i < size/sizeof(base_t); ++i) {
dest[i] = (std::rand() % (max - min)) + min;
}
}
/// @brief Fills a given array with random generated integers using the mersenne twister engine (type std::mt19937).
/// @tparam base_t Datatype of the array
/// @param dest Pointer to the array
/// @param size Size of the array
/// @param min Minumum value of the generated integers
/// @param max Maximum value of the generated integers
template <typename T>
void fill_mt(T* array, uint64_t size, T min, T max, uint64_t int_seed = 0) {
static_assert(std::is_integral<T>::value, "Data type is not integral.");
size = size / sizeof(T);
std::mt19937::result_type seed;
if (int_seed == 0) {
std::random_device rd;
seed = rd() ^ (
(std::mt19937::result_type) std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::system_clock::now().time_since_epoch()).count() +
(std::mt19937::result_type) std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::high_resolution_clock::now().time_since_epoch()).count());
} else seed = int_seed;
std::mt19937 gen(seed);
std::uniform_int_distribution<T> distrib(min, max);
for (uint64_t j = 0; j < size; ++j) {
array[j] = distrib(gen);
}
}
/**
* @brief Checks if two arrays of the integral type *T* contain the same values
*
* @tparam T Integral type of *array0* and *array1*
* @param array0 Array 0 to check
* @param array1 Array 1 to check
* @param size_b Size of the two arrays in byte
* @param verbose Decides if outputs are verbose of not (print every not matching numbers with their index)
* @return bool Weathor or not the content is equal or not
*/
template <typename T>
typename std::enable_if<std::is_integral<T>::value, bool>::type
check_same(T* array0, T* array1, size_t size_b, bool verbose) {
for(uint64_t i = 0; i <= size_b / sizeof(T); i += 64 / sizeof(T)) {
__m512i vec0 = _mm512_stream_load_si512(array0 + i);
__m512i vec1 = _mm512_stream_load_si512(array1 + i);
__mmask8 res = _mm512_cmpeq_epi64_mask(vec0, vec1);
}
//TODO complete function
return false;
}

73
qdp_project/src/utils/barrier_utils.h

@ -0,0 +1,73 @@
#pragma once
#include <cstdint>
#include <numa.h>
#include <barrier>
#include <chrono>
#define BARRIER_TIMINGS 1
struct barrier_completion_function {
inline void operator() () {
return;
}
};
struct barrier_timing {
uint32_t time_points, time_threads;
double** time_accumulator;
barrier_timing(uint32_t timing_points, uint32_t timing_threads, uint32_t memory_node) {
#ifdef BARRIER_TIMINGS
time_points = timing_points;
time_threads = timing_threads;
time_accumulator = (double**) numa_alloc_onnode(timing_points * sizeof(double*), memory_node);
for(uint32_t i = 0; i < timing_points; ++i) {
time_accumulator[i] = (double*) numa_alloc_onnode(timing_threads * sizeof(double), memory_node);
}
#endif
}
~barrier_timing() {
#ifdef BARRIER_TIMINGS
for(uint32_t i = 0; i < time_points; ++i) {
numa_free(time_accumulator[i], time_threads * sizeof(double));
}
numa_free(time_accumulator, time_points * sizeof(double*));
#endif
}
void reset_accumulator() {
#ifdef BARRIER_TIMINGS
for(uint32_t i = 0; i < time_points; ++i){
for(uint32_t j = 0; j < time_threads; ++j){
time_accumulator[i][j] = 0.0;
}}
#endif
}
double summarize_time(uint32_t time_point) {
#ifdef BARRIER_TIMINGS
double sum = 0.0;
for(uint32_t i = 0; i < time_threads; ++i) {
sum += time_accumulator[time_point][i];
}
return sum;
#endif
}
void timed_wait(std::barrier<struct barrier_completion_function>& barrier, uint32_t point_id, uint32_t thread_id) {
#ifdef BARRIER_TIMINGS
auto before_barrier = std::chrono::steady_clock::now();
#endif
barrier.arrive_and_wait();
#ifdef BARRIER_TIMINGS
auto after_barrier = std::chrono::steady_clock::now();
uint64_t barrier_wait_time = std::chrono::duration_cast<std::chrono::nanoseconds>(after_barrier - before_barrier).count();
double seconds = barrier_wait_time / (1000.0 * 1000.0 * 1000.0);
time_accumulator[point_id][thread_id] += seconds;
#endif
}
};

33
qdp_project/src/utils/const.h

@ -0,0 +1,33 @@
/**
* @file const.h
* @author André Berthold
* @brief Defines handy constants.
* @version 0.1
* @date 2023-05-25
*
* @copyright Copyright (c) 2023
*
*/
#pragma once
#include <cstdint>
#include <immintrin.h>
constexpr size_t VECTOR_SIZE_I = 512;
constexpr size_t VECTOR_SIZE_B = VECTOR_SIZE_I / 8;
constexpr size_t VECTOR_SIZE_H = VECTOR_SIZE_B / sizeof(uint32_t);
constexpr size_t VECTOR_SIZE_W = VECTOR_SIZE_B / sizeof(uint64_t);
template<typename T>
constexpr size_t VECTOR_SIZE() {
return VECTOR_SIZE_B / sizeof(T);
}
template<typename T>
constexpr size_t V_MASK_SIZE() {
return VECTOR_SIZE<T>() / 8;
}
const __mmask16 full_m16 = _mm512_int2mask(0xFFFF);

82
qdp_project/src/utils/cpu_set_utils.h

@ -0,0 +1,82 @@
#pragma once
#include <cstdint>
#include <thread>
#include <cassert>
#include <iostream>
#include <vector>
#include <utility>
/** Sets all bits in a given cpu_set_t between L and H (condition L <= H)*/
#define CPU_BETWEEN(L, H, SET) assert(L <= H); for(; L < H; ++L) {CPU_SET(L, SET);}
/**
* Applies the affinity defined in set to the thread, through pthread library
* calls. If it fails it wites the problem to stderr and terminated the program.
*/
inline void pin_thread(std::thread& thread, cpu_set_t* set) {
int error_code = pthread_setaffinity_np(thread.native_handle(), sizeof(cpu_set_t), set);
if (error_code != 0) {
std::cerr << "Error calling pthread_setaffinity_np in copy_pool assignment: " << error_code << std::endl;
exit(-1);
}
}
/**
* Returns the cpu id of the thread_id-th cpu in a given (multi)range. Thread_id
* greater than the number of cpus in the (multi)range are valid. In this case
* the (thread_id % #cpus in the range)-th cpu in the range is returned.
*/
int get_cpu_id(int thread_id, const std::vector<std::pair<int, int>>& range) {
int subrange_size = range[0].second - range[0].first;
int i = 0;
while(subrange_size <= thread_id) {
thread_id -= subrange_size;
i = (i + 1) % range.size();
subrange_size = range[i].second - range[i].first;
}
return thread_id + range[i].first;
}
/*inline void cpu_set_between(cpu_set_t* set, uint32_t low, uint32_t high) {
assert(low != high);
if (low > high) std::swap(low, high);
for(; low < high; ++low) {
CPU_SET(low, set);
}
}*/
/**
* Pins the given thread to the thread_id-th cpu in the given range.
*/
void pin_thread_in_range(std::thread& thread, int thread_id, std::vector<std::pair<int, int>>& range) {
cpu_set_t set;
CPU_ZERO(&set);
CPU_SET(get_cpu_id(thread_id, range), &set);
pin_thread(thread, &set);
}
/**
* Pins the given thread to all cpus in the given range.
*/
void pin_thread_in_range(std::thread& thread, std::vector<std::pair<int, int>>& range) {
cpu_set_t set;
CPU_ZERO(&set);
for(auto r : range) { CPU_BETWEEN(r.first, r.second, &set); }
pin_thread(thread, &set);
}
/**
* Pins the given thread to all cpu ids between low (incl.) and high (excl.).
*/
inline void pin_thread_between(std::thread& thread, uint32_t low, uint32_t high) {
cpu_set_t set;
CPU_ZERO(&set);
CPU_BETWEEN(low, high, &set);
pin_thread(thread, &set);
}

104
qdp_project/src/utils/execution_modes.h

@ -0,0 +1,104 @@
#include <string>
enum PMode{no_copy = 0, hbm = 1, expl_copy = 2};
struct mode_manager {
static inline PMode inc(PMode value) {
return static_cast<PMode>(value + 1);
};
static inline bool pred(PMode value) {
return no_copy <= value && value <= expl_copy;
};
static std::string string(PMode value) {
switch(value) {
case no_copy: return "no_copy";
case hbm: return "hbm_pre";
case expl_copy:return "expl_co";
} return "no_copy";
};
};
#define SIMPLE_Q 0
#define COMPLEX_Q 1
#define SCAN_A 0
#define SCAN_B 1
#define AGGR_J 2
enum NewPMode{DRAM_base = 0, HBM_base = 1, Mixed_base = 2, Prefetch = 3};
struct new_mode_manager {
/*constexpr static int thread_counts[2][4][3] = {
//simple query
//scan_a, scan_b, aggr_j
{{3, 0, 3}, // DRAM_base
{3, 0, 3}, // HBM_base
{3, 0, 3}, // Mixed_base
{1, 4, 1}},// Prefetching
//complex query
{{1, 4, 1}, // DRAM_base
{1, 4, 1}, // HBM_base
{1, 4, 1}, // Mixed_base
{1, 4, 1}},// Prefetching
};*/
/*constexpr static int thread_counts[2][4][3] = {
//simple query
//scan_a, scan_b, aggr_j
{{2, 0, 4}, // DRAM_base
{2, 0, 4}, // HBM_base
{2, 0, 4}, // Mixed_base
{1, 4, 1}},// Prefetching
//complex query
{{1, 4, 1}, // DRAM_base
{1, 4, 1}, // HBM_base
{1, 4, 1}, // Mixed_base
{1, 4, 1}},// Prefetching
};*/
constexpr static int thread_counts[2][4][3] = {
// thread counts for both simple and complex querry
// inner layout: { scan_a, scan_b, aggr_j }
//simple query
{
{4, 0, 2}, // DRAM_base
{4, 0, 2}, // HBM_base
{4, 0, 2}, // Mixed_base
{4, 4, 4} // Prefetching
},
//complex query
{
{1, 4, 1}, // DRAM_base
{1, 4, 1}, // HBM_base
{1, 4, 1}, // Mixed_base
{4, 4, 4} // Prefetching
}
};
static inline NewPMode inc(NewPMode value) {
return static_cast<NewPMode>(value + 1);
};
static inline bool pred(NewPMode value) {
return DRAM_base <= value && value <= Prefetch;
};
static int thread_count(uint8_t query_type, NewPMode mode, uint8_t thread_type){
if(query_type > 1) query_type = 1;
if(thread_type > 2) thread_type = 2;
return (thread_counts[query_type][mode][thread_type]);
};
static std::string string(NewPMode value) {
switch(value) {
case DRAM_base:
return "DRAM_Baseline";
case HBM_base:
return "HBM_Baseline";
case Mixed_base:
return "DRAM_HBM_Baseline";
case Prefetch:
return "Q-d_Prefetching";
default:
std::cerr << "[x] Unknown Processing Mode" << std::endl;
exit(-1);
}
};
};

76
qdp_project/src/utils/file_output.h

@ -0,0 +1,76 @@
/**
* @file file_output.h
* @author André Berthold
* @brief Implements a template-function that accepts an arbitrary number of parameters that should be printed
* @version 0.1
* @date 2023-05-25
*
* @copyright Copyright (c) 2023
*
*/
#pragma once
#include <fstream>
#include <string>
#include <type_traits>
#include "iterable_range.h"
template<class T>
inline constexpr bool is_numeric_v = std::disjunction<
std::is_integral<T>,
std::is_floating_point<T>>::value;
/**
* @brief Converts a parameter to a string by either using it directly or its member current (if it is of type Labeled)
* as parameter to the std::string-Constructor.
*
* @tparam T Type of the parameter
* @param value Parameter to be converted
* @return std::string The converted parameter
*/
template<typename T>
inline std::string to_string(T value) {
if constexpr(std::is_base_of<Labeled, T>::value){
// integrals cannot be use in the string constructor and must be translated by the std::to_string-function
if constexpr (is_numeric_v<decltype(value.current)>) {
return std::to_string(value.current);
} else {
return std::string(value.current);
}
} else {
// integrals cannot be use in the string constructor and must be translated by the std::to_string-function
if constexpr (is_numeric_v<decltype(value)>) {
return std::to_string(value);
} else {
return std::string(value);
}
}
}
/**
* @brief This function wites the content of *val* to *file*. Terminates terecursive function definition.
*
* @tparam type Type of the paramter *val* (is usually implicitly defeined)
* @param file File that is written to
* @param val Value that is translated to a char stream and written to the file
*/
template<typename type>
inline void print_to_file(std::ofstream &file, type val) {
file << to_string(val) << std::endl;
}
/**
* @brief This function wites the content of *val* and that content if *vals* to *file*.
*
* @tparam type Type of the paramter *val* (is usually implicitly defeined)
* @tparam types Parameter pack that describes the types of *vals*
* @param file File that is written to
* @param val Value that is translated to a char stream and written to the file
* @param vals Paramater pack of values that are gonna be printed to the file
*/
template<typename type, typename... types>
inline void print_to_file(std::ofstream &file, type val, types ... vals) {
file << to_string(val) << ",";
print_to_file(file, vals...);
}

208
qdp_project/src/utils/iterable_range.h

@ -0,0 +1,208 @@
#pragma once
#include <cstdint>
#include <type_traits>
#include <string>
constexpr auto NO_NEXT = "false";
/**
* @brief Class that adds an label member-parameter to a sub-class
*
*/
class Labeled {
public:
std::string label;
public:
Labeled(std::string str) : label(str) {};
Labeled(const char* str) { this->label = std::string(str); };
};
/**
* @brief Converts a parameter to a string by either reading the member label (if it is of type Labeled) or using it
* as parameter to the std::string-Constructor.
*
* @tparam T Type of the parameter
* @param value Parameter to be converted
* @return std::string The converted parameter
*/
template<typename T>
inline std::string generateHead(T value) {
if constexpr(std::is_base_of<Labeled, T>::value){
return value.label;
} else {
return std::string(value);
}
}
/**
* @brief Converts a parameter-pack to a string calling genarateHead(T) on every parameter and concatenatin the results.
*
* @tparam T Type of the first parameter
* @tparam Ts Parameter pack specifying the preceeding parameters' types
* @param value Parameter to be transformed
* @param values Parameter-pack of the next prameters to be transformed
* @return std::string Comma-separated concatenation of all parameters string representation
*/
template<typename T, typename... Ts>
inline std::string generateHead(T value, Ts... values) {
return generateHead(value) + ',' + generateHead(values...);
}
/**
* @brief Takes a single Range object and calls its next function.
*
* @tparam T Specific type of the Range object
* @param t Instance of the Range object
* @return std::string Label of the Range object or "false" if the Range reaced its end and was reset
*/
template<typename T>
std::string IterateOnce(T& t) {
if(t.next()) return t.label;
else t.reset();
return std::string(NO_NEXT); //the string signalises that the iteration has to be terminiated.
}
/**
* @brief Takes a number of Range objects and recusively increments them till the first Range does not reach its end
* upon incrementing. It tarts at the first Range object given. Every Range object that reached its end is reset to
* its start value.
*
* @tparam T Specific type of the first Range object
* @tparam Ts Types to the following Range objects
* @param t First instance of the Range object
* @param ts Parameter pack of the following Range objects
* @return std::string Label of the highest index Range object that was altered, or "false" if the last Range object
* reache its end and was reset
*/
template<typename T, typename... Ts>
std::string IterateOnce(T& t , Ts&... ts) {
if(t.next()) return t.label;
else t.reset();
return IterateOnce<Ts...>(ts...);
}
/**
* @brief Class that provides a convenient interface for iteratin throug a parameter range. It stores a public value
* that can be altered by the classes' methods.
*
* @tparam T Base type of the parameter
* @tparam INIT Initial value of the current pointer
* @tparam PRED Struct providing an apply function testing if the current value is in range or not
* @tparam INC Struct providing an apply function setting the current value to the value following the current value
*/
template<typename T, T INIT, typename PRED, typename INC>
class Range : public Labeled {
public:
/**
* @brief Current value of the parameter
*/
T current = INIT;
/**
* @brief Resets current to its initial value
*/
void reset() {current = INIT; };
/**
* @brief Sets current to its next value (according to INC::inc) and returns if the range Reached its end
* (accordingt to PRED::pred).
*
* @return true The newly assigned value of current is in the range
* @return false Otherwise
*/
bool next() {
current = INC::inc(current);
return PRED::pred(current);
};
/**
* @brief Checks if current is in the Range (according to PRED).
*
* @return true PRED returns true
* @return false Otherwise
*/
bool valid() { return PRED::apply(current); };
};
/**
* @brief Class that is in contrast to Range specialized for integral values.
*
* @tparam T Integral base type of the Range
* @tparam INIT Initial value of the parameter
* @tparam MAX Maximal value of the parameter
* @tparam INC Struct providing an apply function setting the current value to the value following the current value
*/
template<typename T, T INIT, T MAX, typename INC>
class Int_Range : public Labeled {
static_assert(std::is_integral<T>::value, "Int_Range requires an integral base type");
public:
const T max = MAX;
T current = INIT;
void reset() {current = INIT; };
bool next() {
current = INC::inc(current);
return current < MAX;
};
bool valid() { return current < MAX; };
};
/**
* @brief Class that is in contrast to Int_Range specialized for integrals that grow linearly.
*
* @tparam T Integral base type of the Range
* @tparam INIT Initial value of the parameter
* @tparam MAX Maximal value of the parameter
* @tparam STEP Increase of the value per next()-call
*/
template<typename T, T INIT, T MAX, T STEP = 1>
class Linear_Int_Range : public Labeled {
static_assert(std::is_integral<T>::value, "Linear_Int_Range requires an integral base type");
public:
const T max = MAX;
T current = INIT;
void reset() {current = INIT; };
bool next() {
current += STEP;
return current < MAX;
};
bool valid() { return current < MAX; };
};
/**
* @brief Class that is in contrast to Int_Range specialized for integrals that grow exponetially.
*
* @tparam T Integral base type of the Range
* @tparam INIT Initial value of the parameter
* @tparam MAX Maximal value of the parameter
* @tparam FACTOR Multiplicative Increase of the value per next()-call
*/
template<typename T, T INIT, T MAX, T FACTOR = 2>
class Exp_Int_Range : public Labeled {
static_assert(std::is_integral<T>::value, "Exp_Int_Range requires an integral base type");
public:
const T max = MAX;
T current = INIT;
void reset() {current = INIT; };
bool next() {
current *= FACTOR;
return current < MAX;
};
bool valid() { return current < MAX; };
};

152
qdp_project/src/utils/measurement_utils.h

@ -0,0 +1,152 @@
#pragma once
#include <cstdint>
#include <chrono>
#include <vector>
#include <string>
#include <algorithm>
#include <numa.h>
#if PCM_M == 1
#define PCM_MEASURE 1
#include "pcm.h"
#endif
struct pcm_value_collector {
const uint32_t value_count = 6;
uint32_t threads;
std::vector<std::string> points;
#ifdef PCM_MEASURE
pcm::SystemCounterState** states;
#endif
uint64_t** collection;
pcm_value_collector(const std::vector<std::string>& in_points, uint32_t threads, uint32_t memory_node) : threads(threads) {
#ifdef PCM_MEASURE
points = std::vector(in_points);
collection = (uint64_t**) numa_alloc_onnode(threads * sizeof(uint64_t*), memory_node);
states = (pcm::SystemCounterState**) numa_alloc_onnode(threads * sizeof(pcm::SystemCounterState*), memory_node);
for(int i = 0; i < threads; ++i) {
collection[i] = (uint64_t*) numa_alloc_onnode(points.size() * value_count * sizeof(uint64_t), memory_node);
states[i] = (pcm::SystemCounterState*) numa_alloc_onnode(points.size() * sizeof(pcm::SystemCounterState), memory_node);
}
#endif
}
~pcm_value_collector() {
#ifdef PCM_MEASURE
for(int i = 0; i < threads; ++i) {
numa_free(collection[threads], points.size() * value_count * sizeof(uint64_t));
}
numa_free(collection, threads * sizeof(uint64_t*));
numa_free(states, threads * sizeof(pcm::SystemCounterState));
#endif
}
void reset() {
#ifdef PCM_MEASURE
for(int i = 0; i < threads; ++i)
for(uint32_t j = 0; j < points.size() * value_count; ++j){
collection[i][j] = 0;
}
#endif
}
int64_t point_index(const std::string& value) {
auto it = std::find(points.begin(), points.end(), value);
if(it == points.end()) return -1;
else return it - points.begin();
}
std::vector<uint64_t> summarize(const std::string &point) {
#ifdef PCM_MEASURE
std::vector<uint64_t> sums(value_count);
int64_t idx = point_index(point);
if(idx < 0) return sums;
for(uint32_t v = 0; v < value_count; ++v) {
for(uint32_t i = 0; i < threads; ++i) {
sums[v] += collection[i][static_cast<uint32_t>(idx) + points.size() * v];
}
}
return sums;
#endif
return std::vector<uint64_t> {0};
}
std::string summarize_as_string(const std::string &point) {
#ifdef PCM_MEASURE
auto summary = summarize(point);
auto it = summary.begin();
auto end = summary.end();
if(it >= end) return "";
std::string result("");
result += std::to_string(*it);
++it;
while(it < end) {
result += ",";
result += std::to_string(*it);
++it;
}
return result;
#endif
return "";
}
void start(const std::string& point, uint32_t thread) {
#ifdef PCM_MEASURE
int64_t idx = point_index(point);
if(idx < 0) {
std::cerr << "Invalid 'point' given. Ignored!" << std::endl;
return;
}
states[thread][static_cast<uint32_t>(idx)] = pcm::getSystemCounterState();
#endif
}
static std::string getHead(const std::string& point) {
return point + "_l2h," +
point + "_l2m," +
point + "_l3h," +
point + "_l3hns," +
point + "_l3m," +
point + "_mc";
}
#ifdef PCM_MEASURE
void read_values(uint32_t point_idx, uint32_t thread, pcm::SystemCounterState& start, pcm::SystemCounterState& end) {
collection[thread][point_idx + points.size() * 0] += getL2CacheHits(start, end);
collection[thread][point_idx + points.size() * 1] += getL2CacheMisses(start, end);
collection[thread][point_idx + points.size() * 2] += getL3CacheHits(start, end);
collection[thread][point_idx + points.size() * 3] += getL3CacheHitsNoSnoop(start, end);
collection[thread][point_idx + points.size() * 4] += getL3CacheMisses(start, end);
collection[thread][point_idx + points.size() * 5] += getBytesReadFromMC(start, end);
}
#endif
void stop(const std::string& point, uint32_t thread) {
#ifdef PCM_MEASURE
auto state = pcm::getSystemCounterState();
int64_t idx = point_index(point);
if(idx < 0) {
std::cerr << "Invalid 'point' given. Ignored!" << std::endl;
return;
}
auto start = states[thread][static_cast<uint32_t>(idx)];
read_values(static_cast<uint32_t>(idx), thread, start, state);
#endif
}
};

45
qdp_project/src/utils/memory_literals.h

@ -0,0 +1,45 @@
/**
* @file memory_literals.h
* @author André Berthold
* @brief Defines some operators that ease to define a certain size of memory.
* e.g. to alloc 3 Gib (Gibibit = 2^30 bit) of memory one can now simply write: "std::malloc(3_Gib)"
* to alloc 512 MB (Megabyte = 10^2 byte) of memory one can now simply write: "std::malloc(512_MB)"
* @version 0.1
* @date 2023-05-25
*
* @copyright Copyright (c) 2023
*
*/
#pragma once
#include <cstdint>
typedef const unsigned long long int ull_int;
//***************************************************************************//
// Bit **********************************************************************//
//***************************************************************************//
constexpr size_t operator ""_b(ull_int value) {
// one byte is 8 bit + one byte if bit is no multiple of 8
return value / 8 + value % 8;
}
constexpr size_t operator ""_kb (ull_int value) { return value * 1000 / 8; }
constexpr size_t operator ""_kib(ull_int value) { return value * 1024 / 8; }
constexpr size_t operator ""_Mb (ull_int value) { return value * 1000 * 1000 / 8; }
constexpr size_t operator ""_Mib(ull_int value) { return value * 1024 * 1024 / 8; }
constexpr size_t operator ""_Gb (ull_int value) { return value * 1000 * 1000 * 1000 / 8; }
constexpr size_t operator ""_Gib(ull_int value) { return value * 1024 * 1024 * 1024 / 8; }
constexpr size_t operator ""_Tb (ull_int value) { return value * 1000 * 1000 * 1000 * 1000 / 8; }
constexpr size_t operator ""_Tib(ull_int value) { return value * 1024 * 1024 * 1024 * 1024 / 8; }
//***************************************************************************//
// Byte *********************************************************************//
//***************************************************************************//
constexpr size_t operator ""_B (ull_int value) { return value; }
constexpr size_t operator ""_kB (ull_int value) { return value * 1000; }
constexpr size_t operator ""_kiB(ull_int value) { return value * 1024; }
constexpr size_t operator ""_MB (ull_int value) { return value * 1000 * 1000; }
constexpr size_t operator ""_MiB(ull_int value) { return value * 1024 * 1024; }
constexpr size_t operator ""_GB (ull_int value) { return value * 1000 * 1000 * 1000; }
constexpr size_t operator ""_GiB(ull_int value) { return value * 1024 * 1024 * 1024; }
constexpr size_t operator ""_TB (ull_int value) { return value * 1000 * 1000 * 1000 * 1000; }
constexpr size_t operator ""_TiB(ull_int value) { return value * 1024 * 1024 * 1024 * 1024; }

6
qdp_project/src/utils/pcm.h

@ -0,0 +1,6 @@
#pragma once
//this file includes all important header from the pcm repository
#include "cpucounters.h"
#include "msr.h"
#include "pci.h"
#include "mutex.h"

80
qdp_project/src/utils/timer_utils.h

@ -0,0 +1,80 @@
#pragma once
#include <cstdint>
#include <chrono>
#include <barrier>
#include <numa.h>
#define THREAD_TIMINGS 1
struct thread_runtime_timing {
using time_point_t = std::chrono::time_point<std::chrono::steady_clock>;
uint32_t time_points, time_threads;
time_point_t** start_times;
double** time_accumulator;
thread_runtime_timing(uint32_t timing_points, uint32_t timing_threads, uint32_t memory_node) {
#ifdef THREAD_TIMINGS
time_points = timing_points;
time_threads = timing_threads;
start_times = (time_point_t**) numa_alloc_onnode(timing_points * sizeof(time_point_t*), memory_node);
time_accumulator = (double**) numa_alloc_onnode(timing_points * sizeof(double*), memory_node);
for(uint32_t i = 0; i < timing_points; ++i) {
start_times[i] = (time_point_t*) numa_alloc_onnode(timing_threads * sizeof(time_point_t), memory_node);
time_accumulator[i] = (double*) numa_alloc_onnode(timing_threads * sizeof(double), memory_node);
}
#endif
}
~thread_runtime_timing() {
#ifdef THREAD_TIMINGS
for(uint32_t i = 0; i < time_points; ++i) {
numa_free(start_times[i], time_threads * sizeof(time_point_t));
numa_free(time_accumulator[i], time_threads * sizeof(double));
}
numa_free(start_times, time_points * sizeof(time_point_t*));
numa_free(time_accumulator, time_points * sizeof(double*));
#endif
}
void reset_accumulator() {
#ifdef THREAD_TIMINGS
for(uint32_t i = 0; i < time_points; ++i){
for(uint32_t j = 0; j < time_threads; ++j){
time_accumulator[i][j] = 0.0;
}}
#endif
}
double summarize_time(uint32_t time_point) {
#ifdef THREAD_TIMINGS
double sum = 0.0;
for(uint32_t i = 0; i < time_threads; ++i) {
sum += time_accumulator[time_point][i];
}
return sum;
#endif
}
void stop_timer(uint32_t point_id, uint32_t thread_id) {
#ifdef THREAD_TIMINGS
auto end_time = std::chrono::steady_clock::now();
auto start_time = start_times[point_id][thread_id];
uint64_t time = std::chrono::duration_cast<std::chrono::nanoseconds>(end_time - start_time).count();
double seconds = time / (1000.0 * 1000.0 * 1000.0);
time_accumulator[point_id][thread_id] += seconds;
#endif
}
void start_timer(uint32_t point_id, uint32_t thread_id) {
#ifdef THREAD_TIMINGS
start_times[point_id][thread_id] = std::chrono::steady_clock::now();
#endif
}
};

93
qdp_project/src/utils/vector_loader.h

@ -0,0 +1,93 @@
/**
* @file vector_loader.h
* @author André Berthold
* @brief Provides an interface to easily excange vector loading strategies
* @version 0.1
* @date 2023-05-25
*
* @copyright Copyright (c) 2023
*
*/
#pragma once
#include <cstdint>
#include <type_traits>
#include <immintrin.h>
enum load_mode {Unaligned = 0, Aligned = 1, Stream = 2};
/**
* @brief A class template that provides functions for loading and storing data of type *base_t* into/from vectors using the stretegy *mode*.
*
* @tparam base_t Base type of the data
* @tparam mode Strategy for loading the vector
*/
template<typename base_t, load_mode mode>
class Vector_Loader {};
/**
* @brief Template specialization for Vector_Loader with base_t = uint32_t.
*
* @tparam mode Strategy for loading the vector
*/
template<load_mode mode>
class Vector_Loader<uint32_t, mode> {
using base_t = uint32_t;
using mask_t = __mmask16;
using mask_base_t = uint8_t;
public:
/**
* @brief Loads 512 bit of data into a vector register
*
* @param src Pointer to the data to load
* @return __m512i The vector register with the loaded data
*/
static inline __m512i load(base_t* src) {
if constexpr (mode == load_mode::Unaligned) return _mm512_loadu_epi32(src);
else if constexpr (mode == load_mode::Aligned) return _mm512_load_epi32 (src);
else if constexpr (mode == load_mode::Stream) return _mm512_stream_load_si512(src);
};
/**
* @brief Stroes data from a given vector register to a destination pointer
*
* @param dst Pointer to the data destination
* @param vector Vector register containing the data to store
*/
static inline void store(base_t* dst, __m512i vector) {
if constexpr (mode == load_mode::Unaligned) _mm512_storeu_epi32(dst, vector);
else if constexpr (mode == load_mode::Aligned) _mm512_store_epi32 (dst, vector);
else if constexpr (mode == load_mode::Stream) _mm512_stream_si512((__m512i*)(dst), vector);
};
};
/**
* @brief Template specialization for Vector_Loader with base_t = uint64_t.
*
* @tparam mode Strategy for loading the vector
*/
template<load_mode mode>
class Vector_Loader<uint64_t, mode> {
using base_t = uint64_t;
using mask_t = __mmask8;
using mask_base_t = uint8_t;
public:
static inline __m512i load(base_t* src) {
if constexpr (mode == load_mode::Unaligned) return _mm512_loadu_epi64(src);
else if constexpr (mode == load_mode::Aligned) return _mm512_load_epi64 (src);
else if constexpr (mode == load_mode::Stream) return _mm512_stream_load_si512(src);
};
static inline void store(base_t* dst, __m512i vector) {
if constexpr (mode == load_mode::Unaligned) _mm512_storeu_epi64(dst, vector);
else if constexpr (mode == load_mode::Aligned) _mm512_store_epi64 (dst, vector);
else if constexpr (mode == load_mode::Stream) _mm512_stream_si512((__m512i*)(dst), vector);
};
};
Loading…
Cancel
Save