diff --git a/.gitignore b/.gitignore index ab3553e..55c6836 100644 --- a/.gitignore +++ b/.gitignore @@ -13,6 +13,8 @@ *.fls */.vscode/* +*/.idea/* +*/cmake-build-*/* # ---> C++ # Prerequisites diff --git a/offloading-cacher/CMakeLists.txt b/offloading-cacher/CMakeLists.txt new file mode 100755 index 0000000..19ddbdd --- /dev/null +++ b/offloading-cacher/CMakeLists.txt @@ -0,0 +1,20 @@ +cmake_minimum_required(VERSION 3.18) + +project(offloading-cacher LANGUAGES CXX) + +set(CMAKE_CXX_STANDARD 20) + +list(APPEND CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}/cmake/modules") + +find_package(NUMA REQUIRED) +find_package(OpenMP REQUIRED) + +set(DML_SOURCE_DIR "../../DML/include/") +set(SOURCES main.cpp) + +add_executable(offloading-cacher ${SOURCES}) + +target_include_directories(offloading-cacher PRIVATE ${CMAKE_SOURCE_DIR} ${NUMA_INCLUDE_DIRS} ${DML_SOURCE_DIR}) +target_link_libraries(offloading-cacher PRIVATE libdml.a pthread ${CMAKE_DL_LIBS} ${NUMA_LIBRARY} OpenMP::OpenMP_CXX) + +install(TARGETS offloading-cacher DESTINATION ${CMAKE_INSTALL_PREFIX}) diff --git a/offloading-cacher/cache.hpp b/offloading-cacher/cache.hpp new file mode 100644 index 0000000..058a1e1 --- /dev/null +++ b/offloading-cacher/cache.hpp @@ -0,0 +1,727 @@ +#pragma once + +#include + +#include +#include +#include +#include + +#include +#include +#include + +#include + +namespace dml { + inline const std::string StatusCodeToString(const dml::status_code code) { + switch (code) { + case dml::status_code::ok: return "ok"; + case dml::status_code::false_predicate: return "false predicate"; + case dml::status_code::partial_completion: return "partial completion"; + case dml::status_code::nullptr_error: return "nullptr error"; + case dml::status_code::bad_size: return "bad size"; + case dml::status_code::bad_length: return "bad length"; + case dml::status_code::inconsistent_size: return "inconsistent size"; + case dml::status_code::dualcast_bad_padding: return "dualcast bad padding"; + case dml::status_code::bad_alignment: return "bad alignment"; + case dml::status_code::buffers_overlapping: return "buffers overlapping"; + case dml::status_code::delta_delta_empty: return "delta delta empty"; + case dml::status_code::batch_overflow: return "batch overflow"; + case dml::status_code::execution_failed: return "execution failed"; + case dml::status_code::unsupported_operation: return "unsupported operation"; + case dml::status_code::queue_busy: return "queue busy"; + case dml::status_code::error: return "unknown error"; + case dml::status_code::config_error: return "config error"; + default: return "unhandled error"; + } + } +} + +namespace dsacache { + class Cache; + + /* + * Class Description: + * Holds all required information on one cache entry and is used + * both internally by the Cache and externally by the user. + * + * Important Usage Notes: + * The pointer is only updated in WaitOnCompletion() which + * therefore must be called by the user at some point in order + * to use the cached data. Using this class as T for + * std::shared_ptr is not recommended as references are + * already counted internally. + * + * Cache Lifetime: + * As long as the instance is referenced, the pointer it stores + * is guaranteed to be either nullptr or pointing to a valid copy. + * + * Implementation Detail: + * Performs self-reference counting with a shared atomic integer. + * Therefore on creating a copy the reference count is increased + * and with the destructor it is deacresed. 
If the last copy is + * destroyed the actual underlying data is freed and all shared + * variables deleted. + * + * Notes on Thread Safety: + * Class is thread safe in any possible state and performs + * reference counting and deallocation itself entirely atomically. + */ + + class CacheData { + public: + using dml_handler = dml::handler>; + + private: + // data source and size of the block + uint8_t* src_; + size_t size_; + + // global reference counting object + std::atomic* active_; + + // global cache-location pointer + std::atomic* cache_; + + // object-local incomplete cache location pointer + // which is only available in the first instance + uint8_t* incomplete_cache_; + + // dml handler vector pointer which is only + // available in the first instance + std::unique_ptr> handlers_; + + // deallocates the global cache-location + // and invalidates it + void Deallocate(); + + // checks whether there are at least two + // valid references to this object which + // is done as the cache always has one + // internally to any living instance + bool Active() const; + + friend Cache; + + public: + CacheData(uint8_t* data, const size_t size); + CacheData(const CacheData& other); + ~CacheData(); + + // waits on completion of caching operations + // for this task and is safe to be called in + // any state of the object + void WaitOnCompletion(); + + // returns the cache data location for this + // instance which is valid as long as the + // instance is alive - !!! this may also + // yield a nullptr !!! + uint8_t* GetDataLocation() const; + }; + + /* + * Class Description: + * Class will handle access to data through internal copies. + * These are obtained via work submission to the Intel DSA which takes + * care of asynchronously duplicating the data. The user will define + * where these copies lie and which system nodes will perform the copy. + * This is done through policy functions set during initialization. + * + * Placement Policy: + * The Placement Policy Function decides on which node a particular + * entry is to be placed, given the current executing node and the + * data source node and data size. This in turn means that for one + * datum, multiple cached copies may exist at one time. + * + * Cache Lifetime: + * When accessing the cache, a CacheData-object will be returned. + * As long as this object lives, the pointer which it holds is + * guaranteed to be either nullptr or a valid copy. When destroyed + * the entry is marked for deletion which is only carried out + * when system memory pressure drives an automated cache flush. + * + * Restrictions: + * - Overlapping Pointers may lead to undefined behaviour during + * manual cache invalidation which should not be used if you + * intend to have these types of pointers + * - Cache Invalidation may only be performed manually and gives + * no ordering guarantees. Therefore, it is the users responsibility + * to ensure that results after invalidation have been generated + * using the latest state of data. The cache is best suited + * to static data. 
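+     *
+     * Typical Usage (as exercised in main.cpp):
+     *  - Init() the cache once with a cache- and a copy-policy function
+     *  - Access() returns a CacheData handle for the requested data
+     *  - call WaitOnCompletion() on the handle before using the copy
+     *  - GetDataLocation() then yields the cached copy or nullptr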
+ * + * Notes on Thread Safety: + * - Cache is completely thread-safe after initialization + * - CacheData-class will handle deallocation of data itself by + * performing self-reference-counting atomically and only + * deallocating if the last reference is destroyed + * - The internal cache state has one lock which is either + * acquired shared for reading the state (upon accessing an already + * cached element) or unique (accessing a new element, flushing, invalidating) + * - Waiting on copy completion is done over an atomic-wait in copies + * of the original CacheData-instance + * - Overall this class may experience performance issues due to the use + * of locking (in any configuration), lock contention (worsens with higher + * core count, node count and utilization) and atomics (worse in the same + * situations as lock contention) + * + * Improving Performance: + * When data is never shared between threads or memory size for the cache is + * not an issue you may consider having one Cache-instance per thread and removing + * the lock in Cache and modifying the reference counting and waiting mechanisms + * of CacheData accordingly (although this is high effort and will yield little due + * to the atomics not being shared among cores/nodes). + * Otherwise, one Cache-instance per node could also be considered. This will allow + * the placement policy function to be barebones and reduces the lock contention and + * synchronization impact of the atomic variables. + */ + + class Cache { + public: + // cache policy is defined as a type here to allow flexible usage of the cacher + // given a numa destination node (where the data will be needed), the numa source + // node (current location of the data) and the data size, this function should + // return optimal cache placement + // dst node and returned value can differ if the system, for example, has HBM + // attached accessible directly to node n under a different node id m + typedef int (CachePolicy)(const int numa_dst_node, const int numa_src_node, const size_t data_size); + + // copy policy specifies the copy-executing nodes for a given task + // which allows flexibility in assignment for optimizing raw throughput + // or choosing a conservative usage policy + typedef std::vector (CopyPolicy)(const int numa_dst_node, const int numa_src_node, const size_t data_size); + + private: + // mutex for accessing the cache state map + + std::shared_mutex cache_mutex_; + + // map from [dst-numa-node,map2] + // map2 from [data-ptr,cache-structure] + + std::unordered_map> cache_state_; + + CachePolicy* cache_policy_function_ = nullptr; + CopyPolicy* copy_policy_function_ = nullptr; + + // function used to submit a copy task on a specific node to the dml + // engine on that node - will change the current threads node assignment + // to achieve this so take care to restore this + dml::handler> ExecuteCopy( + const uint8_t* src, uint8_t* dst, const size_t size, const int node + ) const; + + // allocates the required memory on the destination node + // and then submits task to the dml library for processing + // and attaches the handlers to the cache data structure + void SubmitTask(CacheData* task, const int dst_node, const int src_node); + + // querries the policy functions for the given data and size + // to obtain destination cache node, also returns the datas + // source node for further usage + // output may depend on the calling threads node assignment + // as this is set as the "optimal placement" node + void GetCacheNode(uint8_t* src, const size_t 
size, int* OUT_DST_NODE, int* OUT_SRC_NODE) const; + + // allocates memory of size "size" on the numa node "node" + // and returns nullptr if this is not possible, also may + // try to flush the cache of the requested node to + // alleviate encountered shortage + uint8_t* AllocOnNode(const size_t size, const int node); + + // checks whether the cache contains an entry for + // the given data in the given memory node and + // returns it, otherwise returns nullptr + std::unique_ptr GetFromCache(uint8_t* src, const size_t size, const int dst_node); + + public: + Cache() = default; + Cache(const Cache& other) = delete; + + // initializes the cache with the two policy functions + // only after this is it safe to use in a threaded environment + void Init(CachePolicy* cache_policy_function, CopyPolicy* copy_policy_function); + + // function to perform data access through the cache + std::unique_ptr Access(uint8_t* data, const size_t size); + + // flushes the cache of inactive entries + // if node is -1 then the whole cache is + // checked and otherwise the specified + // node - no checks on node validity + void Flush(const int node = -1); + + // forces out all entries from the + // cache and therefore will also "forget" + // still-in-use entries, these will still + // be properly deleted, but the cache + // will be fresh - use for testing + void Clear(); + + void Invalidate(uint8_t* data); + }; +} + +inline void dsacache::Cache::Clear() { + std::unique_lock lock(cache_mutex_); + + cache_state_.clear(); + + Init(cache_policy_function_, copy_policy_function_); +} + +inline void dsacache::Cache::Init(CachePolicy* cache_policy_function, CopyPolicy* copy_policy_function) { + cache_policy_function_ = cache_policy_function; + copy_policy_function_ = copy_policy_function; + + // initialize numa library + + numa_available(); + + // obtain all available nodes + // and those we may allocate + // memory on + + const int nodes_max = numa_num_configured_nodes(); + const bitmask* valid_nodes = numa_get_mems_allowed(); + + // prepare the cache state with entries + // for all given nodes + + for (int node = 0; node < nodes_max; node++) { + if (numa_bitmask_isbitset(valid_nodes, node)) { + cache_state_.insert({node,{}}); + } + } +} + +inline std::unique_ptr dsacache::Cache::Access(uint8_t* data, const size_t size) { + // get destination numa node for the cache + + int dst_node = -1; + int src_node = -1; + + GetCacheNode(data, size, &dst_node, &src_node); + + // TODO: at this point it could be beneficial to check whether + // TODO: the given destination node is present as an entry + // TODO: in the cache state to see if it is valid + + // check whether the data is already cached + + std::unique_ptr task = GetFromCache(data, size, dst_node); + + if (task != nullptr) { + return std::move(task); + } + + // at this point the requested data is not present in cache + // and we create a caching task for it + + task = std::make_unique(data, size); + + { + std::unique_lock lock(cache_mutex_); + + const auto state = cache_state_[dst_node].emplace(task->src_, *task); + + // if state.second is false then no insertion took place + // which means that concurrently whith this thread + // some other thread must have accessed the same + // resource in which case we return the other + // threads data cache structure + + if (!state.second) { + std::cout << "[!] 
Found another cache instance for 0x" << std::hex << (uint64_t)task->src_ << std::dec << std::endl; + return std::move(std::make_unique(state.first->second)); + } + } + + SubmitTask(task.get(), dst_node, src_node); + + return std::move(task); +} + +inline uint8_t* dsacache::Cache::AllocOnNode(const size_t size, const int node) { + // allocate data on this node and flush the unused parts of the + // cache if the operation fails and retry once + // TODO: smarter flush strategy could keep some stuff cached + + // check currently free memory to see if the data fits + + long long int free_space = 0; + numa_node_size64(node, &free_space); + + if (free_space < size) { + std::cout << "[!] Memory shortage when allocating " << size << "B on node " << node << std::endl; + + // dst node lacks memory space so we flush the cache for this + // node hoping to free enough currently unused entries to make + // the second allocation attempt successful + + Flush(node); + + // re-test by getting the free space and checking again + + numa_node_size64(node, &free_space); + + if (free_space < size) { + std::cout << "[x] Memory shortage after flush when allocating " << size << "B on node " << node << std::endl; + + return nullptr; + } + } + + uint8_t* dst = reinterpret_cast(numa_alloc_onnode(size, node)); + + if (dst == nullptr) { + std::cout << "[x] Allocation try failed for " << size << "B on node " << node << std::endl; + + return nullptr; + } + + return dst; +} + +inline void dsacache::Cache::SubmitTask(CacheData* task, const int dst_node, const int src_node) { + uint8_t* dst = AllocOnNode(task->size_, dst_node); + + if (dst == nullptr) { + std::cout << "[x] Allocation failed so we can not cache" << std::endl; + return; + } + + task->incomplete_cache_ = dst; + + // querry copy policy function for the nodes to use for the copy + + const std::vector executing_nodes = copy_policy_function_(dst_node, src_node, task->size_); + const size_t task_count = executing_nodes.size(); + + // each task will copy one fair part of the total size + // and in case the total size is not a factor of the + // given task count the last node must copy the remainder + + const size_t size = task->size_ / task_count; + const size_t last_size = size + task->size_ % task_count; + + // save the current numa node mask to restore later + // as executing the copy task will place this thread + // on a different node + + bitmask* nodemask = numa_get_run_node_mask(); + + for (uint32_t i = 0; i < task_count; i++) { + const size_t local_size = i + 1 == task_count ? 
size : last_size; + const size_t local_offset = i * size; + const uint8_t* local_src = task->src_ + local_offset; + uint8_t* local_dst = dst + local_offset; + + task->handlers_->emplace_back(ExecuteCopy(local_src, local_dst, local_size, executing_nodes[i])); + } + + // restore the previous nodemask + + numa_run_on_node_mask(nodemask); + numa_free_nodemask(nodemask); +} + +inline dml::handler> dsacache::Cache::ExecuteCopy( + const uint8_t* src, uint8_t* dst, const size_t size, const int node +) const { + dml::const_data_view srcv = dml::make_view(src, size); + dml::data_view dstv = dml::make_view(dst, size); + + numa_run_on_node(node); + + return dml::submit(dml::mem_copy.block_on_fault(), srcv, dstv); +} + +inline void dsacache::Cache::GetCacheNode(uint8_t* src, const size_t size, int* OUT_DST_NODE, int* OUT_SRC_NODE) const { + // obtain numa node of current thread to determine where the data is needed + + const int current_cpu = sched_getcpu(); + const int current_node = numa_node_of_cpu(current_cpu); + + // obtain node that the given data pointer is allocated on + + *OUT_SRC_NODE = -1; + get_mempolicy(OUT_SRC_NODE, NULL, 0, (void*)src, MPOL_F_NODE | MPOL_F_ADDR); + + // querry cache policy function for the destination numa node + + *OUT_DST_NODE = cache_policy_function_(current_node, *OUT_SRC_NODE, size); +} + +inline void dsacache::Cache::Flush(const int node) { + // this lambda is used because below we have two code paths that + // flush nodes, either one single or all successively + + const auto FlushNode = [](std::unordered_map& map) { + // begin at the front of the map + + auto it = map.begin(); + + // loop until we reach the end of the map + + while (it != map.end()) { + // if the iterator points to an inactive element + // then we may erase it + + if (it->second.Active() == false) { + // erase the iterator from the map + + map.erase(it); + + // as the erasure invalidated out iterator + // we must start at the beginning again + + it = map.begin(); + } + else { + // if element is active just move over to the next one + + it++; + } + } + }; + + { + // we require exclusive lock as we modify the cache state + + std::unique_lock lock(cache_mutex_); + + // node == -1 means that cache on all nodes should be flushed + + if (node == -1) { + for (auto& nc : cache_state_) { + FlushNode(nc.second); + } + } + else { + FlushNode(cache_state_[node]); + } + } +} + +inline std::unique_ptr dsacache::Cache::GetFromCache(uint8_t* src, const size_t size, const int dst_node) { + // the best situation is if this data is already cached + // which we check in an unnamed block in which the cache + // is locked for reading to prevent another thread + // from marking the element we may find as unused and + // clearing it + + // lock the cache state in shared-mode because we read + + std::shared_lock lock(cache_mutex_); + + // search for the data in our cache state structure at the given node + + const auto search = cache_state_[dst_node].find(src); + + // if the data is in our structure we continue + + if (search != cache_state_[dst_node].end()) { + + // now check whether the sizes match + + if (search->second.size_ >= size) { + // return a unique copy of the entry which uses the object + // lifetime and destructor to safely handle deallocation + + return std::move(std::make_unique(search->second)); + } + else { + // if the sizes missmatch then we clear the current entry from cache + // which will cause its deletion only after the last possible outside + // reference is also destroyed + + 
cache_state_[dst_node].erase(search); + } + } + + return nullptr; +} + +void dsacache::Cache::Invalidate(uint8_t* data) { + // as the cache is modified we must obtain a unique writers lock + + std::unique_lock lock(cache_mutex_); + + // loop through all per-node-caches available + + for (auto node : cache_state_) { + // search for an entry for the given data pointer + + auto search = node.second.find(data); + + if (search != node.second.end()) { + // if the data is represented in-cache + // then it will be erased to re-trigger + // caching on next access + + node.second.erase(search); + } + } +} + +inline dsacache::CacheData::CacheData(uint8_t* data, const size_t size) { + src_ = data; + size_ = size; + active_ = new std::atomic(1); + cache_ = new std::atomic(); + incomplete_cache_ = nullptr; + handlers_ = std::make_unique>(); +} + +inline dsacache::CacheData::CacheData(const dsacache::CacheData& other) { + // we copy the ptr to the global atomic reference counter + // and increase the amount of active references + + active_ = other.active_; + const int current_active = active_->fetch_add(1); + + // source and size will be copied too + // as well as the reference to the global + // atomic cache pointer + + src_ = other.src_; + size_ = other.size_; + cache_ = other.cache_; + + // incomplete cache and handlers will not + // be copied because only the first instance + // will wait on the completion of handlers + + incomplete_cache_ = nullptr; + handlers_ = nullptr; +} + +inline dsacache::CacheData::~CacheData() { + // if this is the first instance of this cache structure + // and it has not been waited on and is now being destroyed + // we must wait on completion here to ensure the cache + // remains in a valid state + + if (handlers_ != nullptr) { + WaitOnCompletion(); + } + + // due to fetch_sub returning the preivously held value + // we must subtract one locally to get the current value + + const int32_t v = active_->fetch_sub(1) - 1; + + // if the returned value is zero or lower + // then we must execute proper deletion + // as this was the last reference + + if (v <= 0) { + Deallocate(); + + delete active_; + delete cache_; + } +} + +inline void dsacache::CacheData::Deallocate() { + // although deallocate should only be called from + // a safe context to do so, it can not hurt to + // defensively perform the operation atomically + + uint8_t* cache_local = cache_->exchange(nullptr); + if (cache_local != nullptr) numa_free(cache_local, size_); + + // if the cache was never waited for then incomplete_cache_ + // may still contain a valid pointer which has to be freed + + if (incomplete_cache_ != nullptr) numa_free(incomplete_cache_, size_); +} + +inline uint8_t* dsacache::CacheData::GetDataLocation() const { + return cache_->load(); +} + +inline bool dsacache::CacheData::Active() const { + // this entry is active if more than one + // reference exists to it, as the Cache + // will always keep one internally until + // the entry is cleared from cache + + return active_->load() > 1; +} + +inline void dsacache::CacheData::WaitOnCompletion() { + // the cache data entry can be in two states + // either it is the original one which has not + // been waited for in which case the handlers + // are non-null or it is not + + if (handlers_ == nullptr) { + // when no handlers are attached to this cache entry we wait on a + // value change for the cache structure from nullptr to non-null + // which will either go through immediately if the cache is valid + // already or wait until the handler-owning 
thread notifies us + + cache_->wait(nullptr); + } + else { + // when the handlers are non-null there are some DSA task handlers + // available on which we must wait here + + // abort is set if any operation encountered an error + + bool abort = false; + + for (auto& handler : *handlers_) { + auto result = handler.get(); + + if (result.status != dml::status_code::ok) { + std::cerr << "[x] Encountered bad status code for operation: " << dml::StatusCodeToString(result.status) << std::endl; + + // if one of the copy tasks failed we abort the whole task + // after all operations are completed on it + + abort = true; + } + } + + // the handlers are cleared after all have completed + + handlers_ = nullptr; + + // now we act depending on whether an abort has been + // called for which signals operation incomplete + + if (abort) { + // store nullptr in the cache location + + cache_->store(nullptr); + + // then free the now incomplete cache + + // TODO: it would be possible to salvage the + // TODO: operation at this point but this + // TODO: is quite complicated so we just abort + + numa_free(incomplete_cache_, size_); + } + else { + // incomplete cache is now safe to use and therefore we + // swap it with the global cache state of this entry + // and notify potentially waiting threads + + cache_->store(incomplete_cache_); + } + + // as a last step all waiting threads must + // be notified (copies of this will wait on value + // change of the cache) and the incomplete cache + // is cleared to nullptr as it is not incomplete + + cache_->notify_all(); + incomplete_cache_ = nullptr; + } +} diff --git a/offloading-cacher/cmake/modules/FindNUMA.cmake b/offloading-cacher/cmake/modules/FindNUMA.cmake new file mode 100644 index 0000000..94b23c8 --- /dev/null +++ b/offloading-cacher/cmake/modules/FindNUMA.cmake @@ -0,0 +1,43 @@ +# Module for locating libnuma +# +# Read-only variables: +# NUMA_FOUND +# Indicates that the library has been found. +# +# NUMA_INCLUDE_DIR +# Points to the libnuma include directory. +# +# NUMA_LIBRARY_DIR +# Points to the directory that contains the libraries. +# The content of this variable can be passed to link_directories. +# +# NUMA_LIBRARY +# Points to the libnuma that can be passed to target_link_libararies. 
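+#
+# Example usage (with a hypothetical target "mytarget"):
+#   find_package(NUMA REQUIRED)
+#   target_include_directories(mytarget PRIVATE ${NUMA_INCLUDE_DIR})
+#   target_link_libraries(mytarget PRIVATE ${NUMA_LIBRARY})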
+# +# Copyright (c) 2013-2020 MulticoreWare, Inc + +include(FindPackageHandleStandardArgs) + +find_path(NUMA_ROOT_DIR + NAMES include/numa.h + PATHS ENV NUMA_ROOT + DOC "NUMA root directory") + +find_path(NUMA_INCLUDE_DIR + NAMES numa.h + HINTS ${NUMA_ROOT_DIR} + PATH_SUFFIXES include + DOC "NUMA include directory") + +find_library(NUMA_LIBRARY + NAMES numa + HINTS ${NUMA_ROOT_DIR} + DOC "NUMA library") + +if (NUMA_LIBRARY) + get_filename_component(NUMA_LIBRARY_DIR ${NUMA_LIBRARY} PATH) +endif() + +mark_as_advanced(NUMA_INCLUDE_DIR NUMA_LIBRARY_DIR NUMA_LIBRARY) + +find_package_handle_standard_args(NUMA REQUIRED_VARS NUMA_ROOT_DIR NUMA_INCLUDE_DIR NUMA_LIBRARY) \ No newline at end of file diff --git a/offloading-cacher/main.cpp b/offloading-cacher/main.cpp new file mode 100644 index 0000000..8193f5a --- /dev/null +++ b/offloading-cacher/main.cpp @@ -0,0 +1,252 @@ +#include +#include +#include +#include + +#include + +#include "cache.hpp" + +static constexpr size_t SIZE_64_MIB = 64 * 1024 * 1024; + +dsacache::Cache CACHE; + +void InitCache(const std::string& device) { + if (device == "default") { + auto cache_policy = [](const int numa_dst_node, const int numa_src_node, const size_t data_size) { + return numa_dst_node; + }; + + auto copy_policy = [](const int numa_dst_node, const int numa_src_node, const size_t data_size) { + return std::vector{ numa_dst_node }; + }; + + CACHE.Init(cache_policy,copy_policy); + } + else if (device == "xeonmax") { + auto cache_policy = [](const int numa_dst_node, const int numa_src_node, const size_t data_size) { + // xeon max is configured to have hbm on node ids that are +8 + + return numa_dst_node < 8 ? numa_dst_node + 8 : numa_dst_node; + }; + + auto copy_policy = [](const int numa_dst_node, const int numa_src_node, const size_t data_size) { + if (data_size < SIZE_64_MIB) { + // if the data size is small then the copy will just be carried + // out by the destination node which does not require setting numa + // thread affinity as the selected dsa engine is already the one + // present on the calling thread + + return std::vector{ (numa_dst_node >= 8 ? numa_dst_node - 8 : numa_dst_node) }; + } + else { + // for sufficiently large data, smart copy is used which will utilize + // all four engines for intra-socket copy operations and cross copy on + // the source and destination nodes for inter-socket copy + + const bool same_socket = ((numa_dst_node ^ numa_src_node) & 4) == 0; + + if (same_socket) { + const bool socket_number = numa_dst_node >> 2; + if (socket_number == 0) return std::vector{ 0, 1, 2, 3 }; + else return std::vector{ 4, 5, 6, 7 }; + } + else { + return std::vector{ + (numa_src_node >= 8 ? numa_src_node - 8 : numa_src_node), + (numa_dst_node >= 8 ? numa_dst_node - 8 : numa_dst_node) + }; + } + } + }; + + CACHE.Init(cache_policy,copy_policy); + } + else { + std::cerr << "Given device '" << device << "' not supported!" << std::endl; + exit(-1); + } +} + +uint8_t* GetRandomArray(const size_t size) { + uint8_t* array = new uint8_t[size]; + + std::uniform_int_distribution unif(std::numeric_limits::min(), std::numeric_limits::max()); + std::default_random_engine re; + + for (size_t i = 0; i < size; i++) { + array[i] = unif(re); + } + + return array; +} + +bool IsEqual(const uint8_t* a, const uint8_t* b, const size_t size) { + for (size_t i = 0; i < size; i++) { + try { + if (a[i] != b[i]) return false; + } + catch (...) 
{ + return false; + } + } + + return true; +} + +std::unique_ptr PerformAccessAndTest(uint8_t* src, const size_t size, const int tid) { + std::unique_ptr data_cache = CACHE.Access( + reinterpret_cast(src), + size * sizeof(uint8_t) + ); + + uint8_t* cached_imm = reinterpret_cast(data_cache->GetDataLocation()); + + // check the value immediately just to see if ram or cache was returned + + if (src == cached_imm) { + std::cout << "[" << tid << "] Caching did not immediately yield different data location." << std::endl; + } + else if (cached_imm == nullptr) { + std::cout << "[" << tid << "] Immediately got nullptr." << std::endl; + } + else { + std::cout << "[" << tid << "] Immediately got different data location." << std::endl; + } + + // waits for the completion of the asynchronous caching operation + + data_cache->WaitOnCompletion(); + + // gets the cache-data-location from the struct + + uint8_t* cached = reinterpret_cast(data_cache->GetDataLocation()); + + // tests on the resulting value + + if (src == cached) { + std::cout << "[" << tid << "] Caching did not affect data location." << std::endl; + } + else if (cached == nullptr) { + std::cerr << "[" << tid << "] Got nullptr from cache." << std::endl; + } + else { + std::cout << "[" << tid << "] Got different data location from cache." << std::endl; + } + + if (IsEqual(src,cached,size)) { + std::cout << "[" << tid << "] Cached data is correct." << std::endl; + } + else { + std::cerr << "[" << tid << "] Cached data is wrong." << std::endl; + } + + return std::move(data_cache); +} + +void RunTestST(const size_t size) { + uint8_t* data = GetRandomArray(size); + + static constexpr int tid = 0; + + std::cout << "[" << tid << "] first access --- " << std::endl; + + PerformAccessAndTest(data, size, tid); + + std::cout << "[" << tid << "] second access --- " << std::endl; + + PerformAccessAndTest(data, size, tid); + + std::cout << "[" << tid << "] end of application --- " << std::endl; +} + +void RunTestMT(const size_t size) { + uint8_t* data = GetRandomArray(size); + + #pragma omp parallel + { + const int tid = omp_get_thread_num(); + + std::cout << "[" << tid << "] first access --- " << std::endl; + + PerformAccessAndTest(data, size, tid); + + std::cout << "[" << tid << "] second access --- " << std::endl; + + PerformAccessAndTest(data, size, tid); + + std::cout << "[" << tid << "] end of block --- " << std::endl; + } +} + +void RunTestFlush(const size_t size) { + uint8_t* data1 = GetRandomArray(size); + uint8_t* data2 = GetRandomArray(size); + uint8_t* data3 = GetRandomArray(size); + + static constexpr int tid = 0; + + std::cout << "[" << tid << "] first access to data d1 and keepalive --- " << std::endl; + + const auto c1 = PerformAccessAndTest(data1, size, tid); + + std::cout << "[" << tid << "] second access to d2 lets d2 vanish --- " << std::endl; + + PerformAccessAndTest(data2, size, tid); + + std::cout << "[" << tid << "] third access to d3 should clear d2 --- " << std::endl; + + PerformAccessAndTest(data3, size, tid); + + std::cout << "[" << tid << "] end of block and test d1 == cache1 --- " << std::endl; + + if (IsEqual(data1, c1->GetDataLocation(), size)) { + std::cout << "[" << tid << "] Cached d1 is still correct." << std::endl; + } + else { + std::cerr << "[" << tid << "] Cached d1 is bad." << std::endl; + } +} + +int main(int argc, char **argv) { + if (argc != 4) { + std::cerr << "This application requires three parameters!" 
<< std::endl; + + std::cout << "Please provide the following positional arguments: [device] [mode] [size]" << std::endl; + std::cout << "[device] from { default, xeonmax } which influences cache and execution placement" << std::endl; + std::cout << "[mode] from { st, mt, flt } or single and multi threaded and flushtest respectively" << std::endl; + std::cout << "[size] positive integral number, amount of bytes in data array" << std::endl; + std::cout << "for flushtest the given size should be 1/3 of the available cache size" << std::endl; + + exit(-1); + } + + const std::string device = argv[1]; + const std::string mode = argv[2]; + const std::string size_s = argv[3]; + + uint32_t size = 0; + + try { + size = std::stoul(size_s); + } + catch (...) { + std::cerr << "Given Size '" << size_s << "' caused error during conversion to number!" << std::endl; + } + + InitCache(device); + + if (mode == "st") { + RunTestST(size); + } + else if (mode == "mt") { + RunTestMT(size); + } + else if (mode == "flt") { + RunTestFlush(size); + } + else { + std::cerr << "Given Mode '" << mode << "' not supported!" << std::endl; + exit(-1); + } +} diff --git a/qdp_project/.gitignore b/qdp_project/.gitignore new file mode 100644 index 0000000..1a8b920 --- /dev/null +++ b/qdp_project/.gitignore @@ -0,0 +1,104 @@ + + +bin/ + + +# CMake building files +CMakeLists.txt.user +CMakeCache.txt +CMakeFiles +CMakeScripts +Testing +Makefile +cmake_install.cmake +install_manifest.txt +compile_commands.json +CTestTestfile.cmake +_deps +.cmake + +# Prerequisites +*.d + +# Object files +*.o +*.ko +*.obj +*.elf + +# Linker output +*.ilk +*.map +*.exp + +# Precompiled Headers +*.gch +*.pch + +# Libraries +*.lib +*.a +*.la +*.lo + +# Shared objects (inc. Windows DLLs) +*.dll +*.so +*.so.* +*.dylib + +# Executables +*.exe +*.out +*.app +*.i*86 +*.x86_64 +*.hex + +# Debug files +*.dSYM/ +*.su +*.idb +*.pdb + +# Kernel Module Compile Results +*.mod* +*.cmd +.tmp_versions/ +modules.order +Module.symvers +Mkfile.old +dkms.conf + +# Prerequisites +*.d + +# Compiled Object files +*.slo +*.lo +*.o +*.obj + +# Precompiled Headers +*.gch +*.pch + +# Compiled Dynamic libraries +*.so +*.dylib +*.dll + +# Fortran module files +*.mod +*.smod + +# Compiled Static libraries +*.lai +*.la +*.a +*.lib + +# Executables +*.exe +*.out +*.app diff --git a/qdp_project/CMakeLists.txt b/qdp_project/CMakeLists.txt new file mode 100644 index 0000000..97c1915 --- /dev/null +++ b/qdp_project/CMakeLists.txt @@ -0,0 +1,82 @@ +cmake_minimum_required(VERSION 3.18) + +# set the project name +project(NUMA_Slow_Fast_Datamigration_Test VERSION 0.1) + +# specify the C standard +set(CMAKE_CXX_STANDARD 20) +set(CMAKE_CXX_STANDARD_REQUIRED True) + +#set flags on need cross compile for sapphirerapids architecture +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -march=sapphirerapids") +#set flags on need cross compile for skylake micro architecture +#set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -march=skylake-avx512") +#set flags on need cross compile for knights landing micro architecture (for debugging) +#set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mavx512f -mavx512cd -mavx512er -mavx512pf") + +#suppress selected! warnigs that are not very important to resolve. 
This is to keep the compileation output clean +set(SUPPRESS_WARNINGS "-Wno-literal-suffix -Wno-volatile") + +set(DEBUG_FLAGS "-g3" "-ggdb") +set(RELEASE_FLAGS "-O3") + +#set flags used for Release and Debug build type +add_compile_options( + "$<$:${RELEASE_FLAGS}>" + "$<$:${DEBUG_FLAGS}>" +) + +# evaluate custom variables +function(eval vname vvalid vdefault) + # is variable is set to the below value if its not already defined from the comand line + set(VALID ${vvalid} CACHE INTERNAL "Possible values for ${vname}") + set(${vname} ${vdefault} CACHE STRING "The barrier mode") + # command for GUI shenanigans + set_property(CACHE ${vname} PROPERTY STRINGS VALID) + + if(${vname} IN_LIST VALID) + message(STATUS "Variable ${vname} = ${${vname}}") + else() + message(STATUS "Variable ${vname} has invalid value ${${vname}}") + # set the fallback value for use in parent function + unset(${vname} CACHE) + message(STATUS "Fallback to default: ${vname} = ${vdefault}") + set(${vname} ${vdefault} PARENT_SCOPE) + endif() +endfunction() + +eval(WSUPPRESS "suppress;show" "show") +if($ EQUAL 1) + add_compile_options("${SUPPRESS_WARNINGS}") +endif() + +eval(BARRIER_MODE "global;local" "global") +add_definitions(-DBARRIER_MODE="${BARRIER_MODE}") + +eval(BUFFER_LIMIT "unlimited;limited" "unlimited") +add_definitions(-DBUFFER_LIMIT=$) + +eval(QUERY "simple;complex" "simple") +add_definitions(-DQUERY=$) + +eval(THREAD_FACTOR "1;2;3;4;5;6;7;8;9;10" "4") +add_definitions(-DTHREAD_GROUP_MULTIPLIER=${THREAD_FACTOR}) + +eval(PINNING "cpu;numa" "cpu") +add_definitions(-DPINNING=$) + +# build directory +set(CMAKE_BINARY_DIR "../bin") #relative to inside build +set(EXECUTABLE_OUTPUT_PATH ${CMAKE_BINARY_DIR}) + +# include directories +include_directories(src/utils) +include_directories(src/algorithm) +include_directories(src/algorithm/operators) + +# link libraries +link_libraries(-lnuma -lpthread -l:libdml.a) + +# Add targets only below +# specify build targets +add_executable(MAXBench src/benchmark/MAX_benchmark.cpp) \ No newline at end of file diff --git a/qdp_project/README.md b/qdp_project/README.md new file mode 100644 index 0000000..7b774b4 --- /dev/null +++ b/qdp_project/README.md @@ -0,0 +1,5 @@ +This is a copy of the Query Driven Prefetching Repository + +https://os.inf.tu-dresden.de/repo/gitbox/andre.berthold/Query-driven_Prefetching/src/branch/qdp_minimal/code + +Original Authors: André Berthold and Anna Bartuschka \ No newline at end of file diff --git a/qdp_project/bench_max.sh b/qdp_project/bench_max.sh new file mode 100644 index 0000000..e49275b --- /dev/null +++ b/qdp_project/bench_max.sh @@ -0,0 +1,9 @@ +#!/bin/bash + +current_date_time=$(date) +echo "Benchmark start at: $current_date_time" + +sudo numactl --cpunodebind=2 -- taskset -c 0,1,2,3,4,5 ../bin/MAXBench + +current_date_time=$(date) +echo "Benchmark end at: $current_date_time" \ No newline at end of file diff --git a/qdp_project/src/.gitkeep b/qdp_project/src/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/qdp_project/src/algorithm/operators/aggregation.h b/qdp_project/src/algorithm/operators/aggregation.h new file mode 100644 index 0000000..119ab14 --- /dev/null +++ b/qdp_project/src/algorithm/operators/aggregation.h @@ -0,0 +1,316 @@ +#pragma once + +#include +#include +#include +#include + +#include "vector_loader.h" +#include "const.h" + + +/** + * @brief Super Class for all Aggregation functions. Guards Sub Classes from having an non integral base type. 
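+ * Concrete implementations below (Sum, Max, Min) wrap the matching AVX-512 intrinsics.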
+ * + * @tparam T + */ +template +class AggFunction { + static_assert(std::is_integral::value, "The base type of an AggFunction must be an integral"); +}; + +/** + * @brief Template class that implements methods used for Summation. It wraps the corresponding vector intrinsics + * + * @tparam T base datatype for the implemented methods + */ +template +class Sum : public AggFunction { +public: + static inline __m512i simd_agg(__m512i aggregator, __m512i vector) { + if constexpr (sizeof(T) == 4) return _mm512_add_epi32(aggregator, vector); + else if constexpr (sizeof(T) == 8) return _mm512_add_epi64(aggregator, vector); + static_assert(sizeof(T) == 4 || sizeof(T) == 8, "Sum is only implemented for 32 and 64 wide integers"); + }; + + static inline __m512i simd_agg(__m512i aggregator, __mmask16 mask, __m512i vector) { + if constexpr (sizeof(T) == 4) return _mm512_mask_add_epi32(aggregator, mask, aggregator, vector); + else if constexpr (sizeof(T) == 8) return _mm512_mask_add_epi64(aggregator, mask, aggregator, vector); + static_assert(sizeof(T) == 4 || sizeof(T) == 8, "Sum is only implemented for 32 and 64 wide integers"); + }; + + static inline T simd_reduce(__m512i vector) { + if constexpr (sizeof(T) == 4) return _mm512_reduce_add_epi32(vector); + else if constexpr (sizeof(T) == 8) return _mm512_reduce_add_epi64(vector); + static_assert(sizeof(T) == 4 || sizeof(T) == 8, "Sum is only implemented for 32 and 64 wide integers"); + }; + + static inline T scalar_agg(T aggregator, T scalar) { return aggregator + scalar; }; + + static inline __m512i zero() { return _mm512_set1_epi32(0); }; +}; + + +/** + * @brief Template class that implements methods used for Maximum determination. It wraps the corresponding vector intrinsics + * + * @tparam T base datatype for the implemented methods + * + */ +template +class Max : public AggFunction { +public: + static inline __m512i simd_agg(__m512i aggregator, __m512i vector) { + if constexpr (sizeof(T) == 4) return _mm512_max_epi32(aggregator, vector); + else if constexpr (sizeof(T) == 8) return _mm512_max_epi64(aggregator, vector); + static_assert(sizeof(T) == 4 || sizeof(T) == 8, "Max is only implemented for 32 and 64 wide integers"); + } + + static inline __m512i simd_agg(__m512i aggregator, __mmask16 mask, __m512i vector) { + if constexpr (sizeof(T) == 4) return _mm512_mask_max_epi32(aggregator, mask, aggregator, vector); + else if constexpr (sizeof(T) == 8) return _mm512_mask_max_epi64(aggregator, mask, aggregator, vector); + static_assert(sizeof(T) == 4 || sizeof(T) == 8, "Max is only implemented for 32 and 64 wide integers"); + } + + static inline T simd_reduce(__m512i vector) { + if constexpr (sizeof(T) == 4) return _mm512_reduce_max_epi32(vector); + else if constexpr (sizeof(T) == 8) return _mm512_reduce_max_epi64(vector); + static_assert(sizeof(T) == 4 || sizeof(T) == 8, "Max is only implemented for 32 and 64 wide integers"); + } + + static inline T scalar_agg(T aggregator, T scalar) { return std::max(aggregator, scalar); } + + static inline __m512i zero() { + if constexpr (sizeof(T) == 4) { + if constexpr (std::is_signed::value) return _mm512_set1_epi32(0xFFFFFFFF); + else return _mm512_set1_epi32(0x0); + } + else if constexpr (sizeof(T) == 8) { + if constexpr (std::is_signed::value) return _mm512_set1_epi32(0xFFFFFFFFFFFFFFFF); + else return _mm512_set1_epi32(0x0); + } + static_assert(sizeof(T) == 4 || sizeof(T) == 8, "Max is only implemented for 32 and 64 wide integers"); + } +}; + +/** + * @brief Template class that implements methods used for 
Minimum determination. It wraps the corresponding vector intrinsics + * + * @tparam T base datatype for the implemented methods + * + */ +template +class Min : public AggFunction { +public: + static inline __m512i simd_agg(__m512i aggregator, __m512i vector) { + if constexpr (sizeof(T) == 4) return _mm512_min_epi32(aggregator, vector); + else if constexpr (sizeof(T) == 8) return _mm512_min_epi64(aggregator, vector); + static_assert(sizeof(T) == 4 || sizeof(T) == 8, "Min is only implemented for 32 and 64 wide integers"); + } + + static inline __m512i simd_agg(__m512i aggregator, __mmask16 mask, __m512i vector) { + if constexpr (sizeof(T) == 4) return _mm512_mask_min_epi32(aggregator, mask, aggregator, vector); + else if constexpr (sizeof(T) == 8) return _mm512_mask_min_epi64(aggregator, mask, aggregator, vector); + static_assert(sizeof(T) == 4 || sizeof(T) == 8, "Min is only implemented for 32 and 64 wide integers"); + } + + static inline T simd_reduce(__m512i vector) { + if constexpr (sizeof(T) == 4) return _mm512_reduce_min_epi32(vector); + else if constexpr (sizeof(T) == 8) return _mm512_reduce_min_epi64(vector); + static_assert(sizeof(T) == 4 || sizeof(T) == 8, "Min is only implemented for 32 and 64 wide integers"); + } + + static inline T scalar_agg(T aggregator, T scalar) { return std::min(aggregator, scalar); } + + static inline __m512i zero() { + if constexpr (sizeof(T) == 4) { + if constexpr (std::is_signed::value) return _mm512_set1_epi32(0xEFFFFFFF); + else return _mm512_set1_epi32(0xFFFFFFFF); + } + else if constexpr (sizeof(T) == 8) { + if constexpr (std::is_signed::value) return _mm512_set1_epi32(0xEFFFFFFFFFFFFFFF); + else return _mm512_set1_epi32(0xFFFFFFFFFFFFFFFF); + } + static_assert(sizeof(T) == 4 || sizeof(T) == 8, "Min is only implemented for 32 and 64 wide integers"); + } +}; + +/** + * @brief Template Class that implements an aggregation operation. + * + * @tparam base_t Base type of the values for aggregation + * @tparam func + * @tparam load_mode + */ +template class func, load_mode load_mode> +class Aggregation{ +public: + + static_assert(std::is_same_v, "Enforce unsigned 64 bit ints."); + + using OP = func; + /** + * @brief Calculates the memory maximal needed to store a chunk's processing result. + * + * @param chunk_size_b Size of the chunk in byte + * @return size_t Size of the chunk's processing result in byte + */ + static size_t result_bytes_per_chunk(size_t chunk_size_b) { + // aggregation returns a single value of type base_t + return sizeof(base_t); + } + + /** + * @brief Applies the aggregation function on the chunk starting at *src* and spanning *chunk_size_b* bytes. + * The result is written to main memory. + * + * @param dest Pointer to the start of the result chunk + * @param src Pointer to the start of the source chunk + * @param chunk_size_b Size of the source chunk in bytes + * @return true When the aggregation is done + * @return false Never + */ + static bool apply (base_t *dest, base_t *src, size_t chunk_size_b) { + constexpr size_t lanes = VECTOR_SIZE(); + size_t value_count = chunk_size_b / sizeof(base_t); + __m512i agg_vec = func::zero(); + size_t i = 0; + base_t result = 0; + // stop before! running out of space + if(value_count >= lanes) {// keep in mind value_count is unsigned so if it becomes negative, it doesn't. 
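+            // process full vector widths here; any remaining tail values are handled by the scalar loop below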
+ for(; i <= value_count - lanes; i += lanes) { + __m512i vec = Vector_Loader::load(src + i); + + agg_vec = func::simd_agg(agg_vec, vec); + } + result = func::simd_reduce(agg_vec); + } + + for(; i < value_count; ++i) { + result = func::scalar_agg(result, src[i]); + } + *dest = result; + + return true; + } + + /** + * @brief Applies the aggregation function on the chunk starting at *src* and spanning *chunk_size_b* bytes, + * while applying the bit string stored in *masks*. The result is written to main memory. + * + * @param dest Pointer to the start of the result chunk + * @param src Pointer to the start of the source chunk + * @param masks Pointer the bitstring that marks the values that should be aggregated + * @param chunk_size_b Size of the source chunk in bytes + * @return true When the aggregation is done + * @return false Never + */ + static bool apply_masked (base_t *dest, base_t *src, uint16_t* msks, size_t chunk_size_b) { + constexpr size_t lanes = VECTOR_SIZE(); + uint8_t* masks = (uint8_t *)msks; + size_t value_count = chunk_size_b / sizeof(base_t); + __m512i agg_vec = func::zero(); + size_t i = 0; + // stop before! running out of space + if(value_count >= lanes) // keep in mind size_w is unsigned so if it becomes negative, it doesn't. + for(; i <= value_count - lanes; i += lanes) { + __m512i vec = Vector_Loader::load(src + i); + __mmask8 mask = _mm512_int2mask(masks[i / lanes]); + + agg_vec = func::simd_mask_agg(agg_vec, mask, vec); + } + *dest = func::simd_reduce(agg_vec); + + for(; i < value_count; ++i) { + uint8_t mask = masks[i / lanes]; + if(mask & (0b1 << (i % lanes))){ + *dest = func::scalar_agg(*dest, src[i]); + } + } + + return true; + } + + /** + * @brief Applies the aggregation function on the chunk starting at *src* and spanning *chunk_size_b* bytes, + * while applying the bit string stored in *masks*. The values are agggegated in the register *dest* without + * clearing beforehand. + * + * NOTE! This function only works correctly if the the chunk_size_b is a multiple of 64 byte + * + * @param dest Vector register used for storing and passing the result around + * @param src Pointer to the start of the source chunk + * @param masks Pointer the bitstring that marks the values that should be aggregated + * @param chunk_size_b Size of the source chunk in bytes + * @return __m512i Vector register holding the aggregation result + */ + static __m512i apply_masked (__m512i dest, base_t *src, uint16_t* msks, size_t chunk_size_b) { + constexpr size_t lanes = VECTOR_SIZE(); + uint8_t* masks = (uint8_t*) msks; + //TODO this function does not work if value_count % lanes != 0 + size_t value_count = chunk_size_b / sizeof(base_t); + size_t i = 0; + // stop before! running out of space + if(value_count >= lanes) // keep in mind size_w is unsigned so if it becomes negative, it doesn't. + for(; i <= value_count - lanes; i += lanes) { + __m512i vec = Vector_Loader::load(src + i); + __mmask8 mask = _mm512_int2mask(masks[i / lanes]); + dest = func::simd_agg(dest, mask, vec); + } + + return dest; + } + + /** + * @brief Applies the aggregation function on the chunk starting at *src* and spanning *chunk_size_b* bytes, + * while applying two bit strings stored in *masks_0* and *masks_1*. The values are aggregated in the register + * *dest* without clearing beforehand. + * + * NOTE! 
This function only works correctly if the the chunk_size_b is a multiple of 64 byte + * + * @param dest Vector register used for storing and passing the result around + * @param src Pointer to the start of the source chunk + * @param masks_0 Pointer the bitstring that marks the values that should be aggregated + * @param masks_1 Pointer the bitstring that marks the values that should be aggregated + * @param chunk_size_b Size of the source chunk in bytes + * @return __m512i Vector register holding the aggregation result + */ + static __m512i apply_masked (__m512i dest, base_t *src, uint16_t* msks0, uint16_t* msks1, size_t chunk_size_b) { + constexpr size_t lanes = VECTOR_SIZE(); + uint8_t* masks0 = (uint8_t*) msks0; + uint8_t* masks1 = (uint8_t*) msks1; + //TODO this function does not work if value_count % lanes != 0 + size_t value_count = chunk_size_b / sizeof(base_t); + size_t i = 0; + // stop before! running out of space + if(value_count >= lanes) // keep in mind value_count is unsigned so if it becomes negative, it doesn't. + for(; i <= value_count - lanes; i += lanes) { + __m512i vec = Vector_Loader::load(src + i); + __mmask8 mask0 = _mm512_int2mask(masks0[i / lanes]); + __mmask8 mask1 = _mm512_int2mask(masks1[i / lanes]); + + mask0 = _kand_mask8(mask0, mask1); + dest = func::simd_agg(dest, mask0, vec); + } + + return dest; + } + + /** + * @brief Reduces a vector by applying the aggregation function horizontally. + * + * @param dest Result of the horizontal aggregation + * @param src Vector as source for the horizontal aggregation + * @return true When the operation is done + * @return false Never + */ + static bool happly (base_t *dest, __m512i src) { + *dest = func::simd_reduce(src); + + return true; + } + + static __m512i get_zero() { + return func::zero(); + } +}; \ No newline at end of file diff --git a/qdp_project/src/algorithm/operators/filter.h b/qdp_project/src/algorithm/operators/filter.h new file mode 100644 index 0000000..a58a761 --- /dev/null +++ b/qdp_project/src/algorithm/operators/filter.h @@ -0,0 +1,170 @@ +#pragma once + +#include +#include + +#include + +#include "vector_loader.h" + +/** + * @brief Super Class for all Aggregation functions. Guards Sub Classes from having an non integral base type. + * + * @tparam T An integral datatype + */ +template +class FilterFunction { + static_assert(std::is_integral::value, "The base type of a FilterFunction must be an integeral."); +}; + +/** + * @brief Template class that implements methods used for finding values that are not equal to the compare value. + * It wraps the corresponding vector intrinsics. 
+ * + * @tparam T base datatype for the implemented methods + */ +template +class NEQ : public FilterFunction { +public: + static inline __mmask16 simd_filter(__m512i vector, __m512i comp) { + if constexpr (sizeof(T) == 4) return _mm512_cmpneq_epi32_mask(vector, comp); + else if constexpr (sizeof(T) == 8) return _mm512_cmpneq_epi64_mask(vector, comp); + static_assert(sizeof(T) == 4 || sizeof(T) == 8, "NEQ is only implemented for 32 and 64 wide integers"); + } + + static inline bool scalar_filter(T scalar, T comp) { return scalar != comp; } +}; + +template +class EQ : public FilterFunction { +public: + static inline __mmask16 simd_filter(__m512i vector, __m512i comp) { + if constexpr (sizeof(T) == 4) return _mm512_cmpeq_epi32_mask(vector, comp); + else if constexpr (sizeof(T) == 8) return _mm512_cmpeq_epi64_mask(vector, comp); + static_assert(sizeof(T) == 4 || sizeof(T) == 8, "EQ is only implemented for 32 and 64 wide integers"); + } + + static inline bool scalar_filter(T scalar, T comp) { return scalar == comp; } +}; + +template +class LT : public FilterFunction { +public: + static inline __mmask16 simd_filter(__m512i vector, __m512i comp) { + if constexpr (sizeof(T) == 4) return _mm512_cmplt_epi32_mask(vector, comp); + else if constexpr (sizeof(T) == 8) return _mm512_cmplt_epi64_mask(vector, comp); + static_assert(sizeof(T) == 4 || sizeof(T) == 8, "LT is only implemented for 32 and 64 wide integers"); + } + + static inline bool scalar_filter(T scalar, T comp) { return scalar < comp; } +}; + +template +class LEQ : public FilterFunction { +public: + static inline __mmask16 simd_filter(__m512i vector, __m512i comp) { + if constexpr (sizeof(T) == 4) return _mm512_cmple_epi32_mask(vector, comp); + else if constexpr (sizeof(T) == 8) return _mm512_cmple_epi64_mask(vector, comp); + static_assert(sizeof(T) == 4 || sizeof(T) == 8, "LEQ is only implemented for 32 and 64 wide integers"); + } + + static inline bool scalar_filter(T scalar, T comp) { return scalar <= comp; } +}; + +template +class GT : public FilterFunction { +public: + static inline __mmask16 simd_filter(__m512i vector, __m512i comp) { + if constexpr (sizeof(T) == 4) return _mm512_cmpgt_epi32_mask(vector, comp); + else if constexpr (sizeof(T) == 8) return _mm512_cmpgt_epi64_mask(vector, comp); + static_assert(sizeof(T) == 4 || sizeof(T) == 8, "GT is only implemented for 32 and 64 wide integers"); + } + + static inline bool scalar_filter(T scalar, T comp) { return scalar > comp; } +}; + +template +class GEQ : public FilterFunction { +public: + static inline __mmask16 simd_filter(__m512i vector, __m512i comp) { + if constexpr (sizeof(T) == 4) return _mm512_cmpge_epi32_mask(vector, comp); + else if constexpr (sizeof(T) == 8) return _mm512_cmpge_epi64_mask(vector, comp); + static_assert(sizeof(T) == 4 || sizeof(T) == 8, "GEQ is only implemented for 32 and 64 wide integers"); + } + + static inline bool scalar_filter(T scalar, T comp) { return scalar >= comp; } +}; + + +template class func, load_mode load_mode, bool copy> +class Filter { +public: + + static_assert(std::is_same_v, "We enforce 64 bit integer"); + + /** + * @brief Calculates the memory maximal needed to store a chunk's processing result. 
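+     * The result is a bit string with one bit per input value, rounded up to whole bytes.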
+ * + * @param chunk_size_b Size of the chunk in byte + * @return size_t Size of the chunk's processing result in byte + */ + static size_t result_bytes_per_chunk(size_t chunk_size_b) { + // + 7 to enshure that we have enougth bytes -> / 8 -> rounds down + // if we had 17 / 8 = 2 but (17 + 7) / 8 = 3 + // if we hat 16 / 8 = 2 is right, as well as, 16 + 7 / 8 = 2 + return (chunk_size_b / sizeof(base_t) + 7) / 8; + } + + + /** + * @brief Applies the filter function on the chunk starting at *src* and spanning *chunk_size_b* bytes, while comparing with he same value every time. + * The resulting bit string is written to main memory. + * + * @param dest Pointer to the start of the result chunk + * @param src Pointer to the start of the source chunk + * @param cmp_value Comparision value to compare the values from source to + * @param chunk_size_b Size of the source chunk in bytes + * @return true When the filter operation is done + * @return false Never + */ + // we only need this impl. yet, as all filter are at the end of a pipeline + static bool apply_same (uint16_t *dst, base_t *buffer, base_t *src, base_t cmp_value, size_t chunk_size_b) { + constexpr uint32_t lanes = VECTOR_SIZE(); + uint8_t* dest = (uint8_t*) dst; + size_t value_count = chunk_size_b / sizeof(base_t); + __m512i cmp_vec = _mm512_set1_epi64(cmp_value); + size_t i = 0; + // this weird implementetion is neccessary, see analogous impl in aggregation for explaination + if(value_count > lanes) { + for(; (i < value_count - lanes); i += lanes) { + __m512i vec = Vector_Loader::load(src + i); + __mmask8 bitmask = func::simd_filter(vec, cmp_vec); + + uint8_t int_mask = (uint8_t) _mm512_mask2int(bitmask); + + dest[i / lanes] = int_mask; + if constexpr(copy){ + Vector_Loader::store(buffer + i, vec); + } + } + } + + auto dest_pos = i / lanes; + uint8_t int_mask = 0; + for(; i < value_count; ++i) { + base_t val = src[i]; + + uint8_t result = func::scalar_filter(val, cmp_value); + + int_mask |= (result << (i % lanes)); + + if constexpr(copy){ + buffer[i] = val; + } + } + dest[dest_pos] = int_mask; + + return true; + } + +}; \ No newline at end of file diff --git a/qdp_project/src/benchmark/MAX_benchmark.cpp b/qdp_project/src/benchmark/MAX_benchmark.cpp new file mode 100644 index 0000000..0414e29 --- /dev/null +++ b/qdp_project/src/benchmark/MAX_benchmark.cpp @@ -0,0 +1,284 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#ifndef THREAD_GROUP_MULTIPLIER +#define THREAD_GROUP_MULTIPLIER 2 +#endif + +#ifndef QUERY +#define QUERY 1 +#endif + +#ifndef BARRIER_MODE +#define BARRIER_MODE "global" +#endif + +#ifndef BUFFER_LIMIT +#define BUFFER_LIMIT 1 +#endif + +#ifndef PINNING +#define PINNING 1 +#endif + +#ifndef PCM_M +#define PCM_M 0 +#endif + +#if PCM_M == 1 +#include "pcm.h" +#endif + +#include "const.h" + +#include "file_output.h" +#include "array_utils.h" +#include "timer_utils.h" +#include "barrier_utils.h" +#include "measurement_utils.h" +#include "cpu_set_utils.h" +#include "iterable_range.h" +#include "memory_literals.h" +#include "pipelines/MAX_scan_filter_pipe.h" + +#include "aggregation.h" +#include "filter.h" + +using base_t = uint64_t; + +base_t sum_check(base_t compare_value, base_t* row_A, base_t* row_B, size_t row_size) { + base_t sum = 0; + for(int i = 0; i < row_size / sizeof(base_t); ++i) { + sum += (row_A[i] < compare_value) * row_B[i]; + } + return sum; +} + +base_t sum_check_complex(base_t 
compare_value_a, base_t compare_value_b, base_t* row_A, base_t* row_B, size_t row_size) { + base_t sum = 0; + for(int i = 0; i < row_size / sizeof(base_t); ++i) { + sum += (row_A[i] < compare_value_a && row_B[i] < compare_value_b) * row_B[i]; + } + return sum; +} + +int main(int argc, char** argv) { +#if PCM == 1 + pcm::PCM *pcm = pcm::PCM::getInstance(); + //and check for errors + auto error_code = pcm->program(); + if(error_code != pcm::PCM::Success) { + std::cerr << "PCM couldn't start" << std::endl; + std::cerr << "Error code: " << error_code << std::endl; + std::cerr << "Try to execute 'sudo modprobe msr' and execute this program with root privigeges."; + return 1; + } +#endif + + // set constants + constexpr size_t workload_b = 2_GiB; + constexpr base_t compare_value_a = 50; + constexpr base_t compare_value_b = 42; + constexpr bool simple_query = (QUERY == 1); + constexpr bool cache_a = false; + constexpr bool wait_b = false; + + constexpr size_t chunk_min = 1_MiB; + constexpr size_t chunk_max = 8_MiB + 1; + constexpr size_t chunk_incr = 128_kiB; + + // thread count is 12 here but as the default measurement uses 6 + // we must restrict the core assignment of these 12 threads to + // 6 physical cpu cores on the executing node + + constexpr size_t thread_count = 12; + + std::ofstream out_file; + + out_file.open("../results/max_" + "q-" + (std::string)(simple_query == true ? "simple" : "complex") + + "_bm-" + (std::string) BARRIER_MODE + + "_bl-" + (std::string)(BUFFER_LIMIT == 1 ? "limited" : "unlimited") + + "_tc-" + std::to_string(thread_count) + "1MiB-2MiB.csv"); + + // set benchmark parameter + Linear_Int_Range run("run"); + Linear_Int_Range chunk_size("chunk_size"); + Range mode("mode"); + + print_to_file(out_file, generateHead(run, chunk_size, mode), "thread_group", "time", + #ifdef THREAD_TIMINGS + "scan_a", "scan_b", "aggr_j", + #endif + #ifdef BARRIER_TIMINGS + "wait_scan_a", "wait_scan_b", "wait_aggr_j", + #endif + #if PCM == 1 + pcm_value_collector::getHead("scan_a"), + pcm_value_collector::getHead("scan_b"), + pcm_value_collector::getHead("aggr_j"), + #endif + "result"); + + /*** alloc data and buffers ************************************************/ + + base_t* data_a = (base_t*) numa_alloc_local(workload_b); + base_t* data_b = (base_t*) numa_alloc_local(workload_b); + base_t* results = (base_t*) numa_alloc_local(thread_count * sizeof(base_t)); + + fill_mt(data_a, workload_b, 0, 100, 42); + fill_mt(data_b, workload_b, 0, 100, 420); + + + std::ofstream check_file; + check_file.open("../results/max_" + "q-" + (std::string)(simple_query == true ? "simple" : "complex") + + "_bm-" + (std::string) BARRIER_MODE + + "_bl-" + (std::string)(BUFFER_LIMIT == 1 ? "limited" : "unlimited") + + "_tc-" + std::to_string(thread_count) + ".checksum"); + if constexpr (QUERY == 1) { + //calculate simple checksum if QUERY == 1 -> simple query is applied + check_file << sum_check(compare_value_a, data_a, data_b, workload_b); + } else { + check_file << sum_check_complex(compare_value_a, compare_value_b, data_a, data_b, workload_b); + } + check_file.close(); + + std::string iteration("init"); + Query_Wrapper* qw = nullptr; + + while(iteration != "false") { + + std::promise p; + std::shared_future ready_future(p.get_future()); + + if(iteration != "run") { + if(qw != nullptr) { + delete qw; + } + + uint8_t tc_filter = new_mode_manager::thread_count(simple_query ? SIMPLE_Q : COMPLEX_Q, mode.current, SCAN_A); + uint8_t tc_copy = new_mode_manager::thread_count(simple_query ? 
SIMPLE_Q : COMPLEX_Q, mode.current, SCAN_B); + uint8_t tc_agg = new_mode_manager::thread_count(simple_query ? SIMPLE_Q : COMPLEX_Q, mode.current, AGGR_J); + + switch(mode.current) { + case NewPMode::Prefetch: + qw = new Query_Wrapper( + &ready_future, workload_b, chunk_size.current, + data_a, data_b, results, tc_filter, tc_copy, tc_agg, + mode.current, 50, 42 + ); + + break; + default: + std::cerr << "[x] Unsupported Execution Mode by this build." << std::endl; + exit(-1); + } + } + + qw->ready_future = &ready_future; + qw->clear_buffers(); + + auto filter_lambda = [&qw](uint32_t gid, uint32_t gcnt, uint32_t tid) { qw->scan_a(gid, gcnt, tid); }; + auto copy_lambda = [&qw](uint32_t gid, uint32_t gcnt, uint32_t tid) { qw->scan_b(gid, gcnt, tid); }; + auto aggregation_lambda = [&qw](uint32_t gid, uint32_t gcnt, uint32_t tid) { qw->aggr_j(gid, gcnt, tid); }; + + std::vector filter_pool; + std::vector copy_pool; + std::vector agg_pool; + + uint8_t tc_filter = new_mode_manager::thread_count(simple_query ? SIMPLE_Q : COMPLEX_Q, mode.current, SCAN_A); + uint8_t tc_copy = new_mode_manager::thread_count(simple_query ? SIMPLE_Q : COMPLEX_Q, mode.current, SCAN_B); + uint8_t tc_agg = new_mode_manager::thread_count(simple_query ? SIMPLE_Q : COMPLEX_Q, mode.current, AGGR_J); + + int thread_id = 0; + // std::vector> pinning_ranges {std::make_pair(28, 42), std::make_pair(84, 98)}; // node 2 heacboehm II + //std::vector> pinning_ranges {std::make_pair(32, 48), std::make_pair(96, 112)}; // node 2 heacboehm + std::vector> pinning_ranges {std::make_pair(24, 36), std::make_pair(120, 132)}; // node 2 sapphire rapids + //std::vector> pinning_ranges {std::make_pair(24, 48)}; // node 2+3 sapphire rapids + //std::vector> pinning_ranges {std::make_pair(0, 48)}; // node 0-3 sapphire rapids + + for(uint32_t gid = 0; gid < THREAD_GROUP_MULTIPLIER; ++gid) { + + for(uint32_t tid = 0; tid < tc_filter; ++tid) { + filter_pool.emplace_back(filter_lambda, gid, THREAD_GROUP_MULTIPLIER, tid); +#if PINNING + pin_thread_in_range(filter_pool.back(), thread_id++, pinning_ranges); +#else + pin_thread_in_range(filter_pool.back(), pinning_ranges); +#endif + } + + // if tc_copy == 0 this loop is skipped + for(uint32_t tid = 0; tid < tc_copy; ++tid) { + copy_pool.emplace_back(copy_lambda, gid, THREAD_GROUP_MULTIPLIER, tid); +#if PINNING + pin_thread_in_range(copy_pool.back(), thread_id++, pinning_ranges); +#else + pin_thread_in_range(copy_pool.back(), pinning_ranges); +#endif + } + + for(uint32_t tid = 0; tid < tc_agg; ++tid) { + agg_pool.emplace_back(aggregation_lambda, gid, THREAD_GROUP_MULTIPLIER, tid); +#if PINNING + pin_thread_in_range(agg_pool.back(), thread_id++, pinning_ranges); +#else + pin_thread_in_range(agg_pool.back(), pinning_ranges); +#endif + } + } + + auto start = std::chrono::steady_clock::now(); + p.set_value(); + + for(std::thread& t : filter_pool) { t.join(); } + for(std::thread& t : copy_pool) { t.join(); } + for(std::thread& t : agg_pool) { t.join(); } + + Aggregation::apply(results, results, sizeof(base_t) * tc_agg * THREAD_GROUP_MULTIPLIER); + auto end = std::chrono::steady_clock::now(); + + constexpr double nanos_per_second = ((double)1000) * 1000 * 1000; + uint64_t nanos = std::chrono::duration_cast(end - start).count(); + double seconds = (double)(nanos) / nanos_per_second; + + + + print_to_file(out_file, run, chunk_size, new_mode_manager::string(mode.current), THREAD_GROUP_MULTIPLIER, seconds, + #ifdef THREAD_TIMINGS + qw->trt->summarize_time(0), qw->trt->summarize_time(1), qw->trt->summarize_time(2), + 
#endif + #ifdef BARRIER_TIMINGS + qw->bt->summarize_time(0), qw->bt->summarize_time(1), qw->bt->summarize_time(2), + #endif + #if PCM == 1 + qw->pvc->summarize_as_string("scan_a"), + qw->pvc->summarize_as_string("scan_b"), + qw->pvc->summarize_as_string("aggr_j"), + #endif + results[0]); + + iteration = IterateOnce(run, chunk_size, mode); + } + + numa_free(data_a, workload_b); + numa_free(data_b, workload_b); + numa_free(results, THREAD_GROUP_MULTIPLIER * thread_count * sizeof(base_t)); +} \ No newline at end of file diff --git a/qdp_project/src/benchmark/pipelines/MAX_scan_filter_pipe.h b/qdp_project/src/benchmark/pipelines/MAX_scan_filter_pipe.h new file mode 100644 index 0000000..e224391 --- /dev/null +++ b/qdp_project/src/benchmark/pipelines/MAX_scan_filter_pipe.h @@ -0,0 +1,420 @@ + +#include +#include +#include +#include +#include + +#include + +#include "filter.h" +#include "aggregation.h" +#include "vector_loader.h" +#include "timer_utils.h" +#include "barrier_utils.h" +#include "measurement_utils.h" +#include "execution_modes.h" + +#include "../../../../offloading-cacher/cache.hpp" + +template +class Query_Wrapper { +public: + // sync + std::shared_future* ready_future; + + thread_runtime_timing* trt; + barrier_timing* bt; + pcm_value_collector* pvc; + +private: + static constexpr size_t COPY_POLICY_MIN_SIZE = 64 * 1024 * 1024; + + dsacache::Cache cache_; + + // data + size_t size_b; + size_t chunk_size_b; + size_t chunk_size_w; + size_t chunk_cnt; + base_t* data_a; + base_t* data_b; + base_t* dest; + + // ratios + uint32_t thread_count_fc; + uint32_t thread_count_fi; + uint32_t thread_count_ag; + uint32_t thread_count; + + // done bits + volatile uint8_t* ready_flag_a; + volatile uint8_t* ready_flag_b; + + // buffer + uint16_t* mask_a; + uint16_t* mask_b; + + // params + base_t cmp_a; + base_t cmp_b; + NewPMode mode; + + // sync + std::unique_ptr*>> sync_barrier; + std::string barrier_mode = BARRIER_MODE; + + using filterCopy = Filter; + using filterNoCopy = Filter; + using filter = Filter; + using aggregation = Aggregation; + + static int CachePlacementPolicy(const int numa_dst_node, const int numa_src_node, const size_t data_size) { + return numa_dst_node < 8 ? numa_dst_node + 8 : numa_dst_node; + } + + static std::vector CopyMethodPolicy(const int numa_dst_node, const int numa_src_node, const size_t data_size) { + if (data_size < COPY_POLICY_MIN_SIZE) { + // if the data size is small then the copy will just be carried + // out by the destination node which does not require setting numa + // thread affinity as the selected dsa engine is already the one + // present on the calling thread + + return std::vector{ (numa_dst_node >= 8 ? numa_dst_node - 8 : numa_dst_node) }; + } + else { + // for sufficiently large data, smart copy is used which will utilize + // all four engines for intra-socket copy operations and cross copy on + // the source and destination nodes for inter-socket copy + + const bool same_socket = ((numa_dst_node ^ numa_src_node) & 4) == 0; + + if (same_socket) { + const bool socket_number = numa_dst_node >> 2; + if (socket_number == 0) return std::vector{ 0, 1, 2, 3 }; + else return std::vector{ 4, 5, 6, 7 }; + } + else { + return std::vector{ + (numa_src_node >= 8 ? numa_src_node - 8 : numa_src_node), + (numa_dst_node >= 8 ? 
numa_dst_node - 8 : numa_dst_node) + }; + } + } + } + +public: + Query_Wrapper(std::shared_future* rdy_fut, size_t workload_b, size_t chunk_size_b, base_t* data_a, + base_t* data_b, base_t* dest, uint32_t tc_fi, uint32_t tc_fc, uint32_t tc_ag, + NewPMode mode, base_t cmp_a = 50, base_t cmp_b = 42) : + ready_future(rdy_fut), size_b(workload_b), chunk_size_b(chunk_size_b), data_a(data_a), data_b(data_b), + dest(dest), mode(mode), cmp_a(cmp_a), cmp_b(cmp_b) { + + const int current_cpu = sched_getcpu(); + const int current_node = numa_node_of_cpu(current_cpu); + const int cache_node = CachePlacementPolicy(current_node, current_node, 0); + + chunk_size_w = chunk_size_b / sizeof(base_t); + chunk_cnt = size_b / chunk_size_b; + + thread_count_fi = tc_fi; + thread_count_fc = tc_fc; + thread_count_ag = tc_ag; + + thread_count = tc_fi + tc_fc + tc_ag; + + ready_flag_a = (volatile uint8_t *) numa_alloc_onnode( chunk_cnt * thread_count_fi / 8 + ((chunk_cnt * thread_count_fi % 8) != 0), cache_node); + ready_flag_b = (volatile uint8_t *) numa_alloc_onnode( chunk_cnt * thread_count_fc / 8 + ((chunk_cnt * thread_count_fc % 8) != 0), cache_node); + + mask_a = (uint16_t *) numa_alloc_onnode(size_b / sizeof(base_t), cache_node); + mask_b = (uint16_t *) numa_alloc_onnode(size_b / sizeof(base_t), cache_node); + + cache_.Init(CachePlacementPolicy, CopyMethodPolicy); + + size_t measurement_space = std::max(std::max(tc_fi, tc_fc), tc_ag); + trt = new thread_runtime_timing(3, measurement_space, current_node); + bt = new barrier_timing(3, measurement_space, current_node); + pvc = new pcm_value_collector({"scan_a", "scan_b", "aggr_j"}, measurement_space, current_node); + reset_barriers(); + }; + + void reset_barriers(){ + if(sync_barrier != nullptr) { + for(auto& barrier : *sync_barrier) { + delete barrier; + } + sync_barrier.reset(); + } + + sync_barrier = std::make_unique*>>(thread_count); + uint32_t thread_count_sum = thread_count_ag + thread_count_fi + thread_count_fc; + uint32_t barrier_count = barrier_mode.compare("global") == 0 ? 1 : thread_count; + uint32_t barrier_thread_count; + + if constexpr(simple){ + barrier_thread_count = (thread_count / barrier_count) * (mode == NewPMode::Prefetch ? 
thread_count_sum : (thread_count_ag + thread_count_fi)); + } else { + barrier_thread_count = (thread_count / barrier_count) * thread_count_sum; + } + for(uint32_t i = 0; i < barrier_count; ++i) { + (*sync_barrier)[i] = new std::barrier(barrier_thread_count); + } + } + + void clear_buffers () { + std::memset((void*)ready_flag_a, 0x00, chunk_cnt * thread_count_fi / 8 + ((chunk_cnt * thread_count_fi % 8) != 0)); + std::memset((void*)ready_flag_b, 0x00, chunk_cnt * thread_count_fc / 8 + ((chunk_cnt * thread_count_fc % 8) != 0)); + + std::memset(mask_a, 0x00, size_b / sizeof(base_t)); + std::memset(mask_b, 0x00, size_b / sizeof(base_t)); + + cache_.Clear(); + + trt->reset_accumulator(); + bt->reset_accumulator(); + pvc->reset(); + reset_barriers(); + }; + + ~Query_Wrapper() { + numa_free((void*)ready_flag_a, chunk_cnt * thread_count_fi / 8 + ((chunk_cnt * thread_count_fi % 8) != 0)); + numa_free((void*)ready_flag_b, chunk_cnt * thread_count_fc / 8 + ((chunk_cnt * thread_count_fc % 8) != 0)); + + numa_free(mask_a, size_b / sizeof(base_t)); + numa_free(mask_b, size_b / sizeof(base_t)); + + delete trt; + for(auto& barrier : *sync_barrier) { + delete barrier; + } + delete bt; + delete pvc; + }; + + //this can be set without need to change allocations + void set_thread_group_count(uint32_t value) { + this->thread_group = value; + }; + +private: + static inline base_t* get_sub_chunk_ptr(base_t* base_ptr, size_t chunk_id, size_t chunk_size_w, size_t tid, size_t tcnt) { + base_t* chunk_ptr = base_ptr + chunk_id * chunk_size_w; + return chunk_ptr + tid * (chunk_size_w / tcnt); + } + + static inline uint16_t* get_sub_mask_ptr(uint16_t* base_ptr, size_t chunk_id, size_t chunk_size_w, size_t tid, size_t tcnt) { + // 16 integer are addressed with one uint16_t in mask buffer + size_t offset = chunk_id * chunk_size_w + tid * (chunk_size_w / tcnt); + return base_ptr + (offset / 16); + } + + static bool bit_at(volatile uint8_t* bitmap, uint32_t bitpos) { + uint8_t value = bitmap[bitpos / 8]; + switch(bitpos % 8) { + case 0: return value & 0b00000001; + case 1: return value & 0b00000010; + case 2: return value & 0b00000100; + case 3: return value & 0b00001000; + case 4: return value & 0b00010000; + case 5: return value & 0b00100000; + case 6: return value & 0b01000000; + case 7: return value & 0b10000000; + default: return false; + } + } + + static void set_bit_at(volatile uint8_t* bitmap, std::mutex& mutex, uint32_t bitpos) { + mutex.lock(); + switch(bitpos % 8) { + case 0: bitmap[bitpos / 8] |= 0b00000001;break; + case 1: bitmap[bitpos / 8] |= 0b00000010;break; + case 2: bitmap[bitpos / 8] |= 0b00000100;break; + case 3: bitmap[bitpos / 8] |= 0b00001000;break; + case 4: bitmap[bitpos / 8] |= 0b00010000;break; + case 5: bitmap[bitpos / 8] |= 0b00100000;break; + case 6: bitmap[bitpos / 8] |= 0b01000000;break; + case 7: bitmap[bitpos / 8] |= 0b10000000;break; + } + mutex.unlock(); + } + +public: + void scan_b(size_t gid, size_t gcnt, size_t tid) { + size_t tcnt = thread_count_fc; + assert(chunk_size_w % tcnt == 0); + assert(chunk_size_w % 16 == 0); + assert(chunk_size_w % tcnt * 16 == 0); + + // wait till everyone can start + ready_future->wait(); + + // the lower gids run once more if the chunks are not evenly distributable + uint32_t runs = chunk_cnt / gcnt + (chunk_cnt % gcnt > gid); + uint32_t barrier_idx = barrier_mode.compare("global") == 0 ? 
0 : gid; + + for(uint32_t i = 0; i < runs; ++i) { + trt->start_timer(1, tid * gcnt + gid); + pvc->start("scan_b", tid * gcnt + gid); + + // calculate pointers + size_t chunk_id = gid + gcnt * i; + base_t* chunk_ptr = get_sub_chunk_ptr(data_b, chunk_id, chunk_size_w, tid, tcnt); + uint16_t* mask_ptr = get_sub_mask_ptr(mask_b, chunk_id, chunk_size_w, tid, tcnt); + + if constexpr(simple){ + cache_.Access(reinterpret_cast(chunk_ptr), chunk_size_b / tcnt); + } else { + const auto data = cache_.Access(reinterpret_cast(chunk_ptr), chunk_size_b / tcnt); + + if constexpr(wait_b) { + // wait on copy to complete - during this time other threads may + // continue with their calculation which leads to little impact + // and we will be faster if the cache is used + + data->WaitOnCompletion(); + + // obtain the data location from the cache entry + + base_t* data_ptr = reinterpret_cast(data->GetDataLocation()); + + // nullptr is still a legal return value for CacheData::GetLocation() + // even after waiting, so this must be checked + + if (data_ptr == nullptr) { + std::cerr << "[!] Cache Miss in ScanB" << std::endl; + data_ptr = chunk_ptr; + } + + filterNoCopy::apply_same(mask_ptr, nullptr, data_ptr, cmp_b, chunk_size_b / tcnt); + } + else { + // obtain the data location from the cache entry + + base_t* data_ptr = reinterpret_cast(data->GetDataLocation()); + + // nullptr is still a legal return value for CacheData::GetLocation() + // even after waiting, so this must be checked + + if (data_ptr == nullptr) { + data_ptr = chunk_ptr; + } + + filterNoCopy::apply_same(mask_ptr, nullptr, data_ptr, cmp_b, chunk_size_b / tcnt); + } + } + + pvc->stop("scan_b", tid * gcnt + gid); + trt->stop_timer(1, tid * gcnt + gid); + + bt->timed_wait(*(*sync_barrier)[barrier_idx], 1, tid * gcnt + gid); + } + (*(*sync_barrier)[barrier_idx]).arrive_and_drop(); + } + + void scan_a(size_t gid, size_t gcnt, size_t tid) { + size_t tcnt = thread_count_fi; + assert(chunk_size_w % tcnt == 0); + assert(chunk_size_w % 16 == 0); + assert(chunk_size_w % tcnt * 16 == 0); + + // wait till everyone can start + ready_future->wait(); + + // the lower gids run once more if the chunks are not evenly distributable + uint32_t runs = chunk_cnt / gcnt + (chunk_cnt % gcnt > gid); + uint32_t barrier_idx = barrier_mode.compare("global") == 0 ? 0 : gid; + + for(uint32_t i = 0; i < runs; ++i) { + trt->start_timer(0, tid * gcnt + gid); + pvc->start("scan_a", tid * gcnt + gid); + // calculate pointers + size_t chunk_id = gid + gcnt * i; + base_t* chunk_ptr = get_sub_chunk_ptr(data_a, chunk_id, chunk_size_w, tid, tcnt); + uint16_t* mask_ptr = get_sub_mask_ptr (mask_a, chunk_id, chunk_size_w, tid, tcnt); + + if constexpr (cache_a) { + const auto data = cache_.Access(reinterpret_cast(chunk_ptr), chunk_size_b / tcnt); + data->WaitOnCompletion(); + base_t* data_ptr = reinterpret_cast(data->GetDataLocation()); + + if (data_ptr == nullptr) { + std::cerr << "[!] 
Cache Miss in ScanA" << std::endl; + data_ptr = chunk_ptr; + } + + filter::apply_same(mask_ptr, nullptr, data_ptr, cmp_a, chunk_size_b / tcnt); + } + else { + filter::apply_same(mask_ptr, nullptr, chunk_ptr, cmp_a, chunk_size_b / tcnt); + } + + pvc->stop("scan_a", tid * gcnt + gid); + trt->stop_timer(0, tid * gcnt + gid); + bt->timed_wait(*(*sync_barrier)[barrier_idx], 0, tid * gcnt + gid); + } + (*(*sync_barrier)[barrier_idx]).arrive_and_drop(); + } + + void aggr_j(size_t gid, size_t gcnt, size_t tid) { + size_t tcnt = thread_count_ag; + // wait till everyone can start + ready_future->wait(); + + // calculate values + __m512i aggregator = aggregation::OP::zero(); + // the lower gids run once more if the chunks are not evenly distributable + uint32_t runs = chunk_cnt / gcnt + (chunk_cnt % gcnt > gid); + uint32_t barrier_idx = barrier_mode.compare("global") == 0 ? 0 : gid; + + for(uint32_t i = 0; i < runs; ++i) { + bt->timed_wait(*(*sync_barrier)[barrier_idx], 2, tid * gcnt + gid); + trt->start_timer(2, tid * gcnt + gid); + pvc->start("aggr_j", tid * gcnt + gid); + + // calculate pointers + size_t chunk_id = gid + gcnt * i; + base_t* chunk_ptr = get_sub_chunk_ptr(data_b, chunk_id, chunk_size_w, tid, tcnt); + + // access the cache for the given chunk which will have been accessed in scan_b + + const auto data = cache_.Access(reinterpret_cast(chunk_ptr), chunk_size_b / tcnt); + + // wait on the caching task to complete, this will give time for other processes + // to make progress here which will therefore not hurt performance + + data->WaitOnCompletion(); + + // after the copy task has finished we obtain the pointer to the cached + // copy of data_b which is then used from now on + + base_t* data_ptr = reinterpret_cast(data->GetDataLocation()); + + // nullptr is still a legal return value for CacheData::GetLocation() + // even after waiting, so this must be checked + + if (data_ptr == nullptr) { + data_ptr = chunk_ptr; + std::cerr << "[!] Cache Miss in AggrJ" << std::endl; + } + + uint16_t* mask_ptr_a = get_sub_mask_ptr (mask_a, chunk_id, chunk_size_w, tid, tcnt); + uint16_t* mask_ptr_b = get_sub_mask_ptr (mask_b, chunk_id, chunk_size_w, tid, tcnt); + + base_t tmp = _mm512_reduce_add_epi64(aggregator); + + if constexpr(simple){ + aggregator = aggregation::apply_masked(aggregator, data_ptr, mask_ptr_a, chunk_size_b / tcnt); + } else { + aggregator = aggregation::apply_masked(aggregator, data_ptr, mask_ptr_a, mask_ptr_b, chunk_size_b / tcnt); + } + + pvc->stop("aggr_j", tid * gcnt + gid); + trt->stop_timer(2, tid * gcnt + gid); + } + + // so threads with more runs dont wait for alerady finished threads + (*(*sync_barrier)[barrier_idx]).arrive_and_drop(); + + aggregation::happly(dest + (tid * gcnt + gid), aggregator); + } +}; \ No newline at end of file diff --git a/qdp_project/src/utils/array_utils.h b/qdp_project/src/utils/array_utils.h new file mode 100644 index 0000000..52eba76 --- /dev/null +++ b/qdp_project/src/utils/array_utils.h @@ -0,0 +1,80 @@ +#pragma once +#include +#include +#include +#include +#include +#include + +#include + +/// @brief Fills a given array with random generated integers. 
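+/// Worked example (buffer name assumed): fill<uint64_t>(buf, 1_GiB, 0, 100)
+/// writes 1_GiB / sizeof(uint64_t) = 134217728 values, i.e. *size* is a byte
+/// count rather than an element count.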
+/// @tparam base_t Datatype of the array +/// @param dest Pointer to the array +/// @param size Size of the array +/// @param min Minumum value of the generated integers +/// @param max Maximum value of the generated integers +template +void fill(base_t * dest, uint64_t size, base_t min, base_t max) { + std::srand(std::time(nullptr)); + for(uint64_t i = 0; i < size/sizeof(base_t); ++i) { + dest[i] = (std::rand() % (max - min)) + min; + } +} + +/// @brief Fills a given array with random generated integers using the mersenne twister engine (type std::mt19937). +/// @tparam base_t Datatype of the array +/// @param dest Pointer to the array +/// @param size Size of the array +/// @param min Minumum value of the generated integers +/// @param max Maximum value of the generated integers +template +void fill_mt(T* array, uint64_t size, T min, T max, uint64_t int_seed = 0) { + static_assert(std::is_integral::value, "Data type is not integral."); + + size = size / sizeof(T); + + std::mt19937::result_type seed; + if (int_seed == 0) { + std::random_device rd; + seed = rd() ^ ( + (std::mt19937::result_type) std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()).count() + + (std::mt19937::result_type) std::chrono::duration_cast( + std::chrono::high_resolution_clock::now().time_since_epoch()).count()); + } else seed = int_seed; + + std::mt19937 gen(seed); + std::uniform_int_distribution distrib(min, max); + + for (uint64_t j = 0; j < size; ++j) { + array[j] = distrib(gen); + } + +} + +/** + * @brief Checks if two arrays of the integral type *T* contain the same values + * + * @tparam T Integral type of *array0* and *array1* + * @param array0 Array 0 to check + * @param array1 Array 1 to check + * @param size_b Size of the two arrays in byte + * @param verbose Decides if outputs are verbose of not (print every not matching numbers with their index) + * @return bool Weathor or not the content is equal or not + */ +template +typename std::enable_if::value, bool>::type + check_same(T* array0, T* array1, size_t size_b, bool verbose) { + for(uint64_t i = 0; i <= size_b / sizeof(T); i += 64 / sizeof(T)) { + __m512i vec0 = _mm512_stream_load_si512(array0 + i); + __m512i vec1 = _mm512_stream_load_si512(array1 + i); + + __mmask8 res = _mm512_cmpeq_epi64_mask(vec0, vec1); + } + + //TODO complete function + + return false; +} + diff --git a/qdp_project/src/utils/barrier_utils.h b/qdp_project/src/utils/barrier_utils.h new file mode 100644 index 0000000..a68f801 --- /dev/null +++ b/qdp_project/src/utils/barrier_utils.h @@ -0,0 +1,73 @@ +#pragma once + +#include +#include +#include +#include + +#define BARRIER_TIMINGS 1 + + +struct barrier_completion_function { + inline void operator() () { + return; + } +}; + +struct barrier_timing { + + uint32_t time_points, time_threads; + double** time_accumulator; + + barrier_timing(uint32_t timing_points, uint32_t timing_threads, uint32_t memory_node) { +#ifdef BARRIER_TIMINGS + time_points = timing_points; + time_threads = timing_threads; + time_accumulator = (double**) numa_alloc_onnode(timing_points * sizeof(double*), memory_node); + for(uint32_t i = 0; i < timing_points; ++i) { + time_accumulator[i] = (double*) numa_alloc_onnode(timing_threads * sizeof(double), memory_node); + } +#endif + } + + ~barrier_timing() { +#ifdef BARRIER_TIMINGS + for(uint32_t i = 0; i < time_points; ++i) { + numa_free(time_accumulator[i], time_threads * sizeof(double)); + } + numa_free(time_accumulator, time_points * sizeof(double*)); +#endif + } + + void 
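+    // zeroes every accumulated barrier wait time; the query wrapper invokes this
+    // through clear_buffers() before each measured run (see MAX_scan_filter_pipe.h)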
reset_accumulator() { +#ifdef BARRIER_TIMINGS + for(uint32_t i = 0; i < time_points; ++i){ + for(uint32_t j = 0; j < time_threads; ++j){ + time_accumulator[i][j] = 0.0; + }} +#endif + } + + double summarize_time(uint32_t time_point) { +#ifdef BARRIER_TIMINGS + double sum = 0.0; + for(uint32_t i = 0; i < time_threads; ++i) { + sum += time_accumulator[time_point][i]; + } + return sum; +#endif + } + + void timed_wait(std::barrier& barrier, uint32_t point_id, uint32_t thread_id) { +#ifdef BARRIER_TIMINGS + auto before_barrier = std::chrono::steady_clock::now(); +#endif + barrier.arrive_and_wait(); +#ifdef BARRIER_TIMINGS + auto after_barrier = std::chrono::steady_clock::now(); + uint64_t barrier_wait_time = std::chrono::duration_cast(after_barrier - before_barrier).count(); + double seconds = barrier_wait_time / (1000.0 * 1000.0 * 1000.0); + time_accumulator[point_id][thread_id] += seconds; +#endif + } +}; \ No newline at end of file diff --git a/qdp_project/src/utils/const.h b/qdp_project/src/utils/const.h new file mode 100644 index 0000000..fde4b55 --- /dev/null +++ b/qdp_project/src/utils/const.h @@ -0,0 +1,33 @@ +/** + * @file const.h + * @author André Berthold + * @brief Defines handy constants. + * @version 0.1 + * @date 2023-05-25 + * + * @copyright Copyright (c) 2023 + * + */ + +#pragma once + +#include +#include + +constexpr size_t VECTOR_SIZE_I = 512; +constexpr size_t VECTOR_SIZE_B = VECTOR_SIZE_I / 8; +constexpr size_t VECTOR_SIZE_H = VECTOR_SIZE_B / sizeof(uint32_t); +constexpr size_t VECTOR_SIZE_W = VECTOR_SIZE_B / sizeof(uint64_t); + +template +constexpr size_t VECTOR_SIZE() { + return VECTOR_SIZE_B / sizeof(T); +} + +template +constexpr size_t V_MASK_SIZE() { + return VECTOR_SIZE() / 8; +} + + +const __mmask16 full_m16 = _mm512_int2mask(0xFFFF); \ No newline at end of file diff --git a/qdp_project/src/utils/cpu_set_utils.h b/qdp_project/src/utils/cpu_set_utils.h new file mode 100644 index 0000000..ba82604 --- /dev/null +++ b/qdp_project/src/utils/cpu_set_utils.h @@ -0,0 +1,82 @@ +#pragma once + +#include +#include +#include +#include +#include +#include + +/** Sets all bits in a given cpu_set_t between L and H (condition L <= H)*/ +#define CPU_BETWEEN(L, H, SET) assert(L <= H); for(; L < H; ++L) {CPU_SET(L, SET);} + +/** + * Applies the affinity defined in set to the thread, through pthread library + * calls. If it fails it wites the problem to stderr and terminated the program. +*/ +inline void pin_thread(std::thread& thread, cpu_set_t* set) { + int error_code = pthread_setaffinity_np(thread.native_handle(), sizeof(cpu_set_t), set); + if (error_code != 0) { + std::cerr << "Error calling pthread_setaffinity_np in copy_pool assignment: " << error_code << std::endl; + exit(-1); + } +} + +/** + * Returns the cpu id of the thread_id-th cpu in a given (multi)range. Thread_id + * greater than the number of cpus in the (multi)range are valid. In this case + * the (thread_id % #cpus in the range)-th cpu in the range is returned. 
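+ *
+ * Worked example (using the Sapphire Rapids pinning ranges from
+ * MAX_benchmark.cpp, i.e. {(24,36), (120,132)}): thread_id 5 maps to cpu 29,
+ * thread_id 13 maps to cpu 121 and thread_id 25 wraps around to cpu 25.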
+*/ +int get_cpu_id(int thread_id, const std::vector>& range) { + int subrange_size = range[0].second - range[0].first; + + int i = 0; + while(subrange_size <= thread_id) { + thread_id -= subrange_size; + i = (i + 1) % range.size(); + subrange_size = range[i].second - range[i].first; + } + return thread_id + range[i].first; +} + +/*inline void cpu_set_between(cpu_set_t* set, uint32_t low, uint32_t high) { + assert(low != high); + if (low > high) std::swap(low, high); + + for(; low < high; ++low) { + CPU_SET(low, set); + } +}*/ + +/** + * Pins the given thread to the thread_id-th cpu in the given range. +*/ +void pin_thread_in_range(std::thread& thread, int thread_id, std::vector>& range) { + cpu_set_t set; + CPU_ZERO(&set); + CPU_SET(get_cpu_id(thread_id, range), &set); + + pin_thread(thread, &set); +} + +/** + * Pins the given thread to all cpus in the given range. +*/ +void pin_thread_in_range(std::thread& thread, std::vector>& range) { + cpu_set_t set; + CPU_ZERO(&set); + for(auto r : range) { CPU_BETWEEN(r.first, r.second, &set); } + + pin_thread(thread, &set); +} + +/** + * Pins the given thread to all cpu ids between low (incl.) and high (excl.). +*/ +inline void pin_thread_between(std::thread& thread, uint32_t low, uint32_t high) { + cpu_set_t set; + CPU_ZERO(&set); + CPU_BETWEEN(low, high, &set); + + pin_thread(thread, &set); +} \ No newline at end of file diff --git a/qdp_project/src/utils/execution_modes.h b/qdp_project/src/utils/execution_modes.h new file mode 100644 index 0000000..b494fab --- /dev/null +++ b/qdp_project/src/utils/execution_modes.h @@ -0,0 +1,104 @@ +#include + +enum PMode{no_copy = 0, hbm = 1, expl_copy = 2}; +struct mode_manager { + static inline PMode inc(PMode value) { + return static_cast(value + 1); + }; + static inline bool pred(PMode value) { + return no_copy <= value && value <= expl_copy; + }; + static std::string string(PMode value) { + switch(value) { + case no_copy: return "no_copy"; + case hbm: return "hbm_pre"; + case expl_copy:return "expl_co"; + } return "no_copy"; + }; +}; + +#define SIMPLE_Q 0 +#define COMPLEX_Q 1 + +#define SCAN_A 0 +#define SCAN_B 1 +#define AGGR_J 2 + +enum NewPMode{DRAM_base = 0, HBM_base = 1, Mixed_base = 2, Prefetch = 3}; +struct new_mode_manager { + /*constexpr static int thread_counts[2][4][3] = { + //simple query + //scan_a, scan_b, aggr_j + {{3, 0, 3}, // DRAM_base + {3, 0, 3}, // HBM_base + {3, 0, 3}, // Mixed_base + {1, 4, 1}},// Prefetching + //complex query + {{1, 4, 1}, // DRAM_base + {1, 4, 1}, // HBM_base + {1, 4, 1}, // Mixed_base + {1, 4, 1}},// Prefetching + };*/ + + /*constexpr static int thread_counts[2][4][3] = { + //simple query + //scan_a, scan_b, aggr_j + {{2, 0, 4}, // DRAM_base + {2, 0, 4}, // HBM_base + {2, 0, 4}, // Mixed_base + {1, 4, 1}},// Prefetching + //complex query + {{1, 4, 1}, // DRAM_base + {1, 4, 1}, // HBM_base + {1, 4, 1}, // Mixed_base + {1, 4, 1}},// Prefetching + };*/ + + constexpr static int thread_counts[2][4][3] = { + // thread counts for both simple and complex querry + // inner layout: { scan_a, scan_b, aggr_j } + + //simple query + { + {4, 0, 2}, // DRAM_base + {4, 0, 2}, // HBM_base + {4, 0, 2}, // Mixed_base + {4, 4, 4} // Prefetching + }, + + //complex query + { + {1, 4, 1}, // DRAM_base + {1, 4, 1}, // HBM_base + {1, 4, 1}, // Mixed_base + {4, 4, 4} // Prefetching + } + }; + + static inline NewPMode inc(NewPMode value) { + return static_cast(value + 1); + }; + static inline bool pred(NewPMode value) { + return DRAM_base <= value && value <= Prefetch; + }; + static int 
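+    // e.g. with the table above thread_count(SIMPLE_Q, NewPMode::Prefetch, SCAN_B)
+    // yields 4 while thread_count(SIMPLE_Q, NewPMode::DRAM_base, SCAN_B) yields 0,
+    // which is why the copy thread pool may stay empty in the baseline modes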
thread_count(uint8_t query_type, NewPMode mode, uint8_t thread_type){ + if(query_type > 1) query_type = 1; + if(thread_type > 2) thread_type = 2; + return (thread_counts[query_type][mode][thread_type]); + }; + static std::string string(NewPMode value) { + switch(value) { + case DRAM_base: + return "DRAM_Baseline"; + case HBM_base: + return "HBM_Baseline"; + case Mixed_base: + return "DRAM_HBM_Baseline"; + case Prefetch: + return "Q-d_Prefetching"; + default: + std::cerr << "[x] Unknown Processing Mode" << std::endl; + exit(-1); + } + }; +}; \ No newline at end of file diff --git a/qdp_project/src/utils/file_output.h b/qdp_project/src/utils/file_output.h new file mode 100644 index 0000000..1dd85ba --- /dev/null +++ b/qdp_project/src/utils/file_output.h @@ -0,0 +1,76 @@ +/** + * @file file_output.h + * @author André Berthold + * @brief Implements a template-function that accepts an arbitrary number of parameters that should be printed + * @version 0.1 + * @date 2023-05-25 + * + * @copyright Copyright (c) 2023 + * + */ +#pragma once + +#include +#include +#include + +#include "iterable_range.h" + +template +inline constexpr bool is_numeric_v = std::disjunction< + std::is_integral, + std::is_floating_point>::value; + +/** + * @brief Converts a parameter to a string by either using it directly or its member current (if it is of type Labeled) + * as parameter to the std::string-Constructor. + * + * @tparam T Type of the parameter + * @param value Parameter to be converted + * @return std::string The converted parameter + */ +template +inline std::string to_string(T value) { + if constexpr(std::is_base_of::value){ + // integrals cannot be use in the string constructor and must be translated by the std::to_string-function + if constexpr (is_numeric_v) { + return std::to_string(value.current); + } else { + return std::string(value.current); + } + } else { + // integrals cannot be use in the string constructor and must be translated by the std::to_string-function + if constexpr (is_numeric_v) { + return std::to_string(value); + } else { + return std::string(value); + } + } +} + +/** + * @brief This function wites the content of *val* to *file*. Terminates terecursive function definition. + * + * @tparam type Type of the paramter *val* (is usually implicitly defeined) + * @param file File that is written to + * @param val Value that is translated to a char stream and written to the file + */ +template +inline void print_to_file(std::ofstream &file, type val) { + file << to_string(val) << std::endl; +} + +/** + * @brief This function wites the content of *val* and that content if *vals* to *file*. + * + * @tparam type Type of the paramter *val* (is usually implicitly defeined) + * @tparam types Parameter pack that describes the types of *vals* + * @param file File that is written to + * @param val Value that is translated to a char stream and written to the file + * @param vals Paramater pack of values that are gonna be printed to the file + */ +template +inline void print_to_file(std::ofstream &file, type val, types ... 
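+    // illustrative call (stream name assumed): print_to_file(file, run, 42, 3.5)
+    // emits "<run.current>,42,3.500000" and lets the single-value overload above
+    // terminate the line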
vals) { + file << to_string(val) << ","; + print_to_file(file, vals...); +} \ No newline at end of file diff --git a/qdp_project/src/utils/iterable_range.h b/qdp_project/src/utils/iterable_range.h new file mode 100644 index 0000000..95fc57e --- /dev/null +++ b/qdp_project/src/utils/iterable_range.h @@ -0,0 +1,208 @@ + #pragma once + +#include +#include +#include + + +constexpr auto NO_NEXT = "false"; + +/** + * @brief Class that adds an label member-parameter to a sub-class + * + */ +class Labeled { +public: + std::string label; +public: + Labeled(std::string str) : label(str) {}; + Labeled(const char* str) { this->label = std::string(str); }; +}; + +/** + * @brief Converts a parameter to a string by either reading the member label (if it is of type Labeled) or using it + * as parameter to the std::string-Constructor. + * + * @tparam T Type of the parameter + * @param value Parameter to be converted + * @return std::string The converted parameter + */ +template +inline std::string generateHead(T value) { + if constexpr(std::is_base_of::value){ + return value.label; + } else { + return std::string(value); + } +} + +/** + * @brief Converts a parameter-pack to a string calling genarateHead(T) on every parameter and concatenatin the results. + * + * @tparam T Type of the first parameter + * @tparam Ts Parameter pack specifying the preceeding parameters' types + * @param value Parameter to be transformed + * @param values Parameter-pack of the next prameters to be transformed + * @return std::string Comma-separated concatenation of all parameters string representation + */ +template +inline std::string generateHead(T value, Ts... values) { + return generateHead(value) + ',' + generateHead(values...); +} + + +/** + * @brief Takes a single Range object and calls its next function. + * + * @tparam T Specific type of the Range object + * @param t Instance of the Range object + * @return std::string Label of the Range object or "false" if the Range reaced its end and was reset + */ +template +std::string IterateOnce(T& t) { + if(t.next()) return t.label; + else t.reset(); + return std::string(NO_NEXT); //the string signalises that the iteration has to be terminiated. +} + +/** + * @brief Takes a number of Range objects and recusively increments them till the first Range does not reach its end + * upon incrementing. It tarts at the first Range object given. Every Range object that reached its end is reset to + * its start value. + * + * @tparam T Specific type of the first Range object + * @tparam Ts Types to the following Range objects + * @param t First instance of the Range object + * @param ts Parameter pack of the following Range objects + * @return std::string Label of the highest index Range object that was altered, or "false" if the last Range object + * reache its end and was reset + */ +template +std::string IterateOnce(T& t , Ts&... ts) { + if(t.next()) return t.label; + else t.reset(); + return IterateOnce(ts...); +} + + +/** + * @brief Class that provides a convenient interface for iteratin throug a parameter range. It stores a public value + * that can be altered by the classes' methods. 
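+ *
+ * Sketch of an instantiation (template arguments assumed, not taken from the
+ * benchmark source): new_mode_manager already provides the required static
+ * pred()/inc() pair, so the processing mode can be iterated as
+ *   Range<NewPMode, NewPMode::DRAM_base, new_mode_manager, new_mode_manager> mode("mode");
+ * and advanced together with the other ranges via IterateOnce(run, chunk_size, mode).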
+ * + * @tparam T Base type of the parameter + * @tparam INIT Initial value of the current pointer + * @tparam PRED Struct providing an apply function testing if the current value is in range or not + * @tparam INC Struct providing an apply function setting the current value to the value following the current value + */ +template +class Range : public Labeled { +public: + /** + * @brief Current value of the parameter + */ + T current = INIT; + + /** + * @brief Resets current to its initial value + */ + void reset() {current = INIT; }; + + /** + * @brief Sets current to its next value (according to INC::inc) and returns if the range Reached its end + * (accordingt to PRED::pred). + * + * @return true The newly assigned value of current is in the range + * @return false Otherwise + */ + bool next() { + current = INC::inc(current); + return PRED::pred(current); + }; + + /** + * @brief Checks if current is in the Range (according to PRED). + * + * @return true PRED returns true + * @return false Otherwise + */ + bool valid() { return PRED::apply(current); }; +}; + +/** + * @brief Class that is in contrast to Range specialized for integral values. + * + * @tparam T Integral base type of the Range + * @tparam INIT Initial value of the parameter + * @tparam MAX Maximal value of the parameter + * @tparam INC Struct providing an apply function setting the current value to the value following the current value + */ +template +class Int_Range : public Labeled { +static_assert(std::is_integral::value, "Int_Range requires an integral base type"); + +public: + const T max = MAX; + T current = INIT; + + void reset() {current = INIT; }; + + bool next() { + current = INC::inc(current); + return current < MAX; + }; + + bool valid() { return current < MAX; }; + +}; + +/** + * @brief Class that is in contrast to Int_Range specialized for integrals that grow linearly. + * + * @tparam T Integral base type of the Range + * @tparam INIT Initial value of the parameter + * @tparam MAX Maximal value of the parameter + * @tparam STEP Increase of the value per next()-call + */ +template +class Linear_Int_Range : public Labeled { +static_assert(std::is_integral::value, "Linear_Int_Range requires an integral base type"); + +public: + const T max = MAX; + T current = INIT; + + void reset() {current = INIT; }; + + bool next() { + current += STEP; + return current < MAX; + }; + + bool valid() { return current < MAX; }; +}; + +/** + * @brief Class that is in contrast to Int_Range specialized for integrals that grow exponetially. 
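+ *
+ * Illustration (values assumed): Exp_Int_Range<size_t, 1_kiB, 1_MiB + 1, 2>
+ * starts at 1 kiB and doubles on every next() call, yielding false once the
+ * value exceeds 1 MiB, after which reset() restarts it at 1 kiB.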
+ * + * @tparam T Integral base type of the Range + * @tparam INIT Initial value of the parameter + * @tparam MAX Maximal value of the parameter + * @tparam FACTOR Multiplicative Increase of the value per next()-call + */ +template +class Exp_Int_Range : public Labeled { +static_assert(std::is_integral::value, "Exp_Int_Range requires an integral base type"); + +public: + const T max = MAX; + T current = INIT; + + void reset() {current = INIT; }; + + bool next() { + current *= FACTOR; + return current < MAX; + }; + + bool valid() { return current < MAX; }; +}; \ No newline at end of file diff --git a/qdp_project/src/utils/measurement_utils.h b/qdp_project/src/utils/measurement_utils.h new file mode 100644 index 0000000..f403de0 --- /dev/null +++ b/qdp_project/src/utils/measurement_utils.h @@ -0,0 +1,152 @@ +#pragma once + +#include +#include +#include +#include +#include + +#include + + +#if PCM_M == 1 +#define PCM_MEASURE 1 +#include "pcm.h" +#endif + + + +struct pcm_value_collector { + const uint32_t value_count = 6; + + uint32_t threads; + std::vector points; +#ifdef PCM_MEASURE + pcm::SystemCounterState** states; +#endif + uint64_t** collection; + + pcm_value_collector(const std::vector& in_points, uint32_t threads, uint32_t memory_node) : threads(threads) { +#ifdef PCM_MEASURE + points = std::vector(in_points); + + collection = (uint64_t**) numa_alloc_onnode(threads * sizeof(uint64_t*), memory_node); + states = (pcm::SystemCounterState**) numa_alloc_onnode(threads * sizeof(pcm::SystemCounterState*), memory_node); + for(int i = 0; i < threads; ++i) { + collection[i] = (uint64_t*) numa_alloc_onnode(points.size() * value_count * sizeof(uint64_t), memory_node); + states[i] = (pcm::SystemCounterState*) numa_alloc_onnode(points.size() * sizeof(pcm::SystemCounterState), memory_node); + } +#endif + } + + ~pcm_value_collector() { +#ifdef PCM_MEASURE + for(int i = 0; i < threads; ++i) { + numa_free(collection[threads], points.size() * value_count * sizeof(uint64_t)); + } + numa_free(collection, threads * sizeof(uint64_t*)); + numa_free(states, threads * sizeof(pcm::SystemCounterState)); +#endif + } + + void reset() { +#ifdef PCM_MEASURE + for(int i = 0; i < threads; ++i) + for(uint32_t j = 0; j < points.size() * value_count; ++j){ + collection[i][j] = 0; + } +#endif + } + + int64_t point_index(const std::string& value) { + auto it = std::find(points.begin(), points.end(), value); + + if(it == points.end()) return -1; + else return it - points.begin(); + } + + std::vector summarize(const std::string &point) { +#ifdef PCM_MEASURE + std::vector sums(value_count); + int64_t idx = point_index(point); + if(idx < 0) return sums; + + for(uint32_t v = 0; v < value_count; ++v) { + for(uint32_t i = 0; i < threads; ++i) { + sums[v] += collection[i][static_cast(idx) + points.size() * v]; + } + } + return sums; +#endif + return std::vector {0}; + } + + std::string summarize_as_string(const std::string &point) { +#ifdef PCM_MEASURE + auto summary = summarize(point); + auto it = summary.begin(); + auto end = summary.end(); + + if(it >= end) return ""; + + std::string result(""); + result += std::to_string(*it); + ++it; + + while(it < end) { + result += ","; + result += std::to_string(*it); + ++it; + } + return result; +#endif + return ""; + } + + void start(const std::string& point, uint32_t thread) { +#ifdef PCM_MEASURE + int64_t idx = point_index(point); + if(idx < 0) { + std::cerr << "Invalid 'point' given. Ignored!" 
<< std::endl; + return; + } + + states[thread][static_cast(idx)] = pcm::getSystemCounterState(); +#endif + } + + static std::string getHead(const std::string& point) { + return point + "_l2h," + + point + "_l2m," + + point + "_l3h," + + point + "_l3hns," + + point + "_l3m," + + point + "_mc"; + } + +#ifdef PCM_MEASURE + void read_values(uint32_t point_idx, uint32_t thread, pcm::SystemCounterState& start, pcm::SystemCounterState& end) { + collection[thread][point_idx + points.size() * 0] += getL2CacheHits(start, end); + collection[thread][point_idx + points.size() * 1] += getL2CacheMisses(start, end); + collection[thread][point_idx + points.size() * 2] += getL3CacheHits(start, end); + collection[thread][point_idx + points.size() * 3] += getL3CacheHitsNoSnoop(start, end); + collection[thread][point_idx + points.size() * 4] += getL3CacheMisses(start, end); + collection[thread][point_idx + points.size() * 5] += getBytesReadFromMC(start, end); + } +#endif + + void stop(const std::string& point, uint32_t thread) { +#ifdef PCM_MEASURE + auto state = pcm::getSystemCounterState(); + + int64_t idx = point_index(point); + if(idx < 0) { + std::cerr << "Invalid 'point' given. Ignored!" << std::endl; + return; + } + + auto start = states[thread][static_cast(idx)]; + read_values(static_cast(idx), thread, start, state); +#endif + } +}; diff --git a/qdp_project/src/utils/memory_literals.h b/qdp_project/src/utils/memory_literals.h new file mode 100644 index 0000000..bcf6395 --- /dev/null +++ b/qdp_project/src/utils/memory_literals.h @@ -0,0 +1,45 @@ +/** + * @file memory_literals.h + * @author André Berthold + * @brief Defines some operators that ease to define a certain size of memory. + * e.g. to alloc 3 Gib (Gibibit = 2^30 bit) of memory one can now simply write: "std::malloc(3_Gib)" + * to alloc 512 MB (Megabyte = 10^2 byte) of memory one can now simply write: "std::malloc(512_MB)" + * @version 0.1 + * @date 2023-05-25 + * + * @copyright Copyright (c) 2023 + * + */ +#pragma once + +#include + +typedef const unsigned long long int ull_int; +//***************************************************************************// +// Bit **********************************************************************// +//***************************************************************************// +constexpr size_t operator ""_b(ull_int value) { + // one byte is 8 bit + one byte if bit is no multiple of 8 + return value / 8 + value % 8; +} +constexpr size_t operator ""_kb (ull_int value) { return value * 1000 / 8; } +constexpr size_t operator ""_kib(ull_int value) { return value * 1024 / 8; } +constexpr size_t operator ""_Mb (ull_int value) { return value * 1000 * 1000 / 8; } +constexpr size_t operator ""_Mib(ull_int value) { return value * 1024 * 1024 / 8; } +constexpr size_t operator ""_Gb (ull_int value) { return value * 1000 * 1000 * 1000 / 8; } +constexpr size_t operator ""_Gib(ull_int value) { return value * 1024 * 1024 * 1024 / 8; } +constexpr size_t operator ""_Tb (ull_int value) { return value * 1000 * 1000 * 1000 * 1000 / 8; } +constexpr size_t operator ""_Tib(ull_int value) { return value * 1024 * 1024 * 1024 * 1024 / 8; } + +//***************************************************************************// +// Byte *********************************************************************// +//***************************************************************************// +constexpr size_t operator ""_B (ull_int value) { return value; } +constexpr size_t operator ""_kB (ull_int value) { return value * 1000; } +constexpr 
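+// worked examples (for illustration): 1_kiB == 8_kib == 1024 bytes,
+// 8_Mib == 1_MiB == 1048576 bytes and 512_MB == 512000000 bytes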
size_t operator ""_kiB(ull_int value) { return value * 1024; } +constexpr size_t operator ""_MB (ull_int value) { return value * 1000 * 1000; } +constexpr size_t operator ""_MiB(ull_int value) { return value * 1024 * 1024; } +constexpr size_t operator ""_GB (ull_int value) { return value * 1000 * 1000 * 1000; } +constexpr size_t operator ""_GiB(ull_int value) { return value * 1024 * 1024 * 1024; } +constexpr size_t operator ""_TB (ull_int value) { return value * 1000 * 1000 * 1000 * 1000; } +constexpr size_t operator ""_TiB(ull_int value) { return value * 1024 * 1024 * 1024 * 1024; } \ No newline at end of file diff --git a/qdp_project/src/utils/pcm.h b/qdp_project/src/utils/pcm.h new file mode 100644 index 0000000..91a19e0 --- /dev/null +++ b/qdp_project/src/utils/pcm.h @@ -0,0 +1,6 @@ +#pragma once +//this file includes all important header from the pcm repository +#include "cpucounters.h" +#include "msr.h" +#include "pci.h" +#include "mutex.h" diff --git a/qdp_project/src/utils/timer_utils.h b/qdp_project/src/utils/timer_utils.h new file mode 100644 index 0000000..b6ec54f --- /dev/null +++ b/qdp_project/src/utils/timer_utils.h @@ -0,0 +1,80 @@ +#pragma once + +#include +#include +#include + +#include + +#define THREAD_TIMINGS 1 + + + +struct thread_runtime_timing { + using time_point_t = std::chrono::time_point; + + uint32_t time_points, time_threads; + time_point_t** start_times; + double** time_accumulator; + + thread_runtime_timing(uint32_t timing_points, uint32_t timing_threads, uint32_t memory_node) { +#ifdef THREAD_TIMINGS + time_points = timing_points; + time_threads = timing_threads; + start_times = (time_point_t**) numa_alloc_onnode(timing_points * sizeof(time_point_t*), memory_node); + time_accumulator = (double**) numa_alloc_onnode(timing_points * sizeof(double*), memory_node); + for(uint32_t i = 0; i < timing_points; ++i) { + start_times[i] = (time_point_t*) numa_alloc_onnode(timing_threads * sizeof(time_point_t), memory_node); + time_accumulator[i] = (double*) numa_alloc_onnode(timing_threads * sizeof(double), memory_node); + } +#endif + } + + ~thread_runtime_timing() { +#ifdef THREAD_TIMINGS + for(uint32_t i = 0; i < time_points; ++i) { + numa_free(start_times[i], time_threads * sizeof(time_point_t)); + numa_free(time_accumulator[i], time_threads * sizeof(double)); + } + numa_free(start_times, time_points * sizeof(time_point_t*)); + numa_free(time_accumulator, time_points * sizeof(double*)); +#endif + } + + void reset_accumulator() { +#ifdef THREAD_TIMINGS + for(uint32_t i = 0; i < time_points; ++i){ + for(uint32_t j = 0; j < time_threads; ++j){ + time_accumulator[i][j] = 0.0; + }} +#endif + } + + double summarize_time(uint32_t time_point) { +#ifdef THREAD_TIMINGS + double sum = 0.0; + for(uint32_t i = 0; i < time_threads; ++i) { + sum += time_accumulator[time_point][i]; + } + return sum; +#endif + } + + void stop_timer(uint32_t point_id, uint32_t thread_id) { +#ifdef THREAD_TIMINGS + auto end_time = std::chrono::steady_clock::now(); + auto start_time = start_times[point_id][thread_id]; + + uint64_t time = std::chrono::duration_cast(end_time - start_time).count(); + double seconds = time / (1000.0 * 1000.0 * 1000.0); + time_accumulator[point_id][thread_id] += seconds; +#endif + } + + void start_timer(uint32_t point_id, uint32_t thread_id) { +#ifdef THREAD_TIMINGS + start_times[point_id][thread_id] = std::chrono::steady_clock::now(); +#endif + } + +}; diff --git a/qdp_project/src/utils/vector_loader.h b/qdp_project/src/utils/vector_loader.h new file mode 100644 index 
0000000..ceab169 --- /dev/null +++ b/qdp_project/src/utils/vector_loader.h @@ -0,0 +1,93 @@ +/** + * @file vector_loader.h + * @author André Berthold + * @brief Provides an interface to easily excange vector loading strategies + * @version 0.1 + * @date 2023-05-25 + * + * @copyright Copyright (c) 2023 + * + */ + +#pragma once + +#include +#include + +#include + +enum load_mode {Unaligned = 0, Aligned = 1, Stream = 2}; + +/** + * @brief A class template that provides functions for loading and storing data of type *base_t* into/from vectors using the stretegy *mode*. + * + * @tparam base_t Base type of the data + * @tparam mode Strategy for loading the vector + */ +template +class Vector_Loader {}; + +/** + * @brief Template specialization for Vector_Loader with base_t = uint32_t. + * + * @tparam mode Strategy for loading the vector + */ +template +class Vector_Loader { + using base_t = uint32_t; + using mask_t = __mmask16; + using mask_base_t = uint8_t; +public: + + /** + * @brief Loads 512 bit of data into a vector register + * + * @param src Pointer to the data to load + * @return __m512i The vector register with the loaded data + */ + static inline __m512i load(base_t* src) { + if constexpr (mode == load_mode::Unaligned) return _mm512_loadu_epi32(src); + else if constexpr (mode == load_mode::Aligned) return _mm512_load_epi32 (src); + else if constexpr (mode == load_mode::Stream) return _mm512_stream_load_si512(src); + }; + + /** + * @brief Stroes data from a given vector register to a destination pointer + * + * @param dst Pointer to the data destination + * @param vector Vector register containing the data to store + */ + static inline void store(base_t* dst, __m512i vector) { + if constexpr (mode == load_mode::Unaligned) _mm512_storeu_epi32(dst, vector); + else if constexpr (mode == load_mode::Aligned) _mm512_store_epi32 (dst, vector); + else if constexpr (mode == load_mode::Stream) _mm512_stream_si512((__m512i*)(dst), vector); + }; +}; + +/** + * @brief Template specialization for Vector_Loader with base_t = uint64_t. + * + * @tparam mode Strategy for loading the vector + */ +template +class Vector_Loader { + using base_t = uint64_t; + using mask_t = __mmask8; + using mask_base_t = uint8_t; +public: + + + + static inline __m512i load(base_t* src) { + if constexpr (mode == load_mode::Unaligned) return _mm512_loadu_epi64(src); + else if constexpr (mode == load_mode::Aligned) return _mm512_load_epi64 (src); + else if constexpr (mode == load_mode::Stream) return _mm512_stream_load_si512(src); + }; + + static inline void store(base_t* dst, __m512i vector) { + if constexpr (mode == load_mode::Unaligned) _mm512_storeu_epi64(dst, vector); + else if constexpr (mode == load_mode::Aligned) _mm512_store_epi64 (dst, vector); + else if constexpr (mode == load_mode::Stream) _mm512_stream_si512((__m512i*)(dst), vector); + }; + +};
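+
+// Minimal usage sketch (illustrative only, not referenced by the pipeline): moves
+// one 512 bit block, i.e. eight uint64_t values, with the streaming strategy that
+// filter.h selects through its load_mode template parameter. Assumes the
+// <base_t, mode> parameter order documented above and 64 byte aligned pointers,
+// which streaming loads and stores require.
+inline void copy_block_streamed(uint64_t* dst, uint64_t* src) {
+    __m512i block = Vector_Loader<uint64_t, load_mode::Stream>::load(src);
+    Vector_Loader<uint64_t, load_mode::Stream>::store(dst, block);
+}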