From aa0867aa3ac0b6eac10e575b7a83ce83492a9932 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Constantin=20F=C3=BCrst?= Date: Tue, 23 Jan 2024 21:30:46 +0100 Subject: [PATCH] reworking the qdp benchmark --- qdp_project/CMakeLists.txt | 39 +- qdp_project/src/.gitkeep | 0 qdp_project/src/Benchmark.cpp | 215 +++++++++++ qdp_project/src/benchmark/MAX_benchmark.cpp | 232 ----------- .../pipelines/MAX_scan_filter_pipe.h | 360 ------------------ qdp_project/src/utils/BenchmarkHelpers.cpp | 50 +++ .../operators => utils}/aggregation.h | 0 qdp_project/src/utils/barrier_utils.h | 73 ---- qdp_project/src/utils/cpu_set_utils.h | 82 ---- qdp_project/src/utils/file_output.h | 76 ---- .../{algorithm/operators => utils}/filter.h | 0 qdp_project/src/utils/iterable_range.h | 208 ---------- qdp_project/src/utils/measurement_utils.h | 152 -------- qdp_project/src/utils/pcm.h | 6 - qdp_project/src/utils/timer_utils.h | 80 ---- 15 files changed, 267 insertions(+), 1306 deletions(-) delete mode 100644 qdp_project/src/.gitkeep create mode 100644 qdp_project/src/Benchmark.cpp delete mode 100644 qdp_project/src/benchmark/MAX_benchmark.cpp delete mode 100755 qdp_project/src/benchmark/pipelines/MAX_scan_filter_pipe.h create mode 100644 qdp_project/src/utils/BenchmarkHelpers.cpp rename qdp_project/src/{algorithm/operators => utils}/aggregation.h (100%) delete mode 100644 qdp_project/src/utils/barrier_utils.h delete mode 100644 qdp_project/src/utils/cpu_set_utils.h delete mode 100644 qdp_project/src/utils/file_output.h rename qdp_project/src/{algorithm/operators => utils}/filter.h (100%) delete mode 100644 qdp_project/src/utils/iterable_range.h delete mode 100644 qdp_project/src/utils/measurement_utils.h delete mode 100644 qdp_project/src/utils/pcm.h delete mode 100644 qdp_project/src/utils/timer_utils.h diff --git a/qdp_project/CMakeLists.txt b/qdp_project/CMakeLists.txt index 2c610a2..29385fa 100644 --- a/qdp_project/CMakeLists.txt +++ b/qdp_project/CMakeLists.txt @@ -1,7 +1,7 @@ cmake_minimum_required(VERSION 3.18) # set the project name -project(NUMA_Slow_Fast_Datamigration_Test VERSION 0.1) +project(QDPBench VERSION 0.1) # specify the C standard set(CMAKE_CXX_STANDARD 20) @@ -26,47 +26,12 @@ add_compile_options( "$<$:${DEBUG_FLAGS}>" ) -# evaluate custom variables -function(eval vname vvalid vdefault) - # is variable is set to the below value if its not already defined from the comand line - set(VALID ${vvalid} CACHE INTERNAL "Possible values for ${vname}") - set(${vname} ${vdefault} CACHE STRING "The barrier mode") - # command for GUI shenanigans - set_property(CACHE ${vname} PROPERTY STRINGS VALID) - - if(${vname} IN_LIST VALID) - message(STATUS "Variable ${vname} = ${${vname}}") - else() - message(STATUS "Variable ${vname} has invalid value ${${vname}}") - # set the fallback value for use in parent function - unset(${vname} CACHE) - message(STATUS "Fallback to default: ${vname} = ${vdefault}") - set(${vname} ${vdefault} PARENT_SCOPE) - endif() -endfunction() - -eval(WSUPPRESS "suppress;show" "show") -if($ EQUAL 1) - add_compile_options("${SUPPRESS_WARNINGS}") -endif() - -eval(BARRIER_MODE "global;local" "global") -add_definitions(-DBARRIER_MODE="${BARRIER_MODE}") - -eval(BUFFER_LIMIT "unlimited;limited" "unlimited") -add_definitions(-DBUFFER_LIMIT=$) - -eval(THREAD_FACTOR "1;2;3;4;5;6;7;8;9;10" "1") -add_definitions(-DTHREAD_GROUP_MULTIPLIER=${THREAD_FACTOR}) - # include directories include_directories(src/utils) -include_directories(src/algorithm) -include_directories(src/algorithm/operators) # link libraries 
 link_libraries(-lnuma -lpthread -l:libdml.a)
 # Add targets only below
 # specify build targets
-add_executable(MAXBench src/benchmark/MAX_benchmark.cpp)
+add_executable(QDPBench src/Benchmark.cpp)
diff --git a/qdp_project/src/.gitkeep b/qdp_project/src/.gitkeep
deleted file mode 100644
index e69de29..0000000
diff --git a/qdp_project/src/Benchmark.cpp b/qdp_project/src/Benchmark.cpp
new file mode 100644
index 0000000..63699b5
--- /dev/null
+++ b/qdp_project/src/Benchmark.cpp
@@ -0,0 +1,215 @@
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include "const.h"
+#include "filter.h"
+#include "aggregation.h"
+#include "array_utils.h"
+#include "memory_literals.h"
+
+#include "../../offloading-cacher/cache.hpp"
+
+#include "BenchmarkHelpers.cpp"
+
+////////////////////////////////
+/// BENCHMARK SETUP
+
+constexpr size_t WL_SIZE_B = 4_GiB;
+constexpr size_t CHUNK_SIZE_B = 128_MiB;
+constexpr uint64_t CMP_A = 50;
+constexpr uint32_t WARMUP_ITERATION_COUNT = 5;
+constexpr uint32_t ITERATION_COUNT = 10;
+constexpr size_t GROUP_COUNT = 4;
+constexpr size_t TC_SCANA = 2;
+constexpr size_t TC_SCANB = 2;
+constexpr size_t TC_AGGRJ = 1;
+constexpr bool PERFORM_CACHING = false;
+constexpr bool DATA_IN_HBM = false;
+constexpr char MODE_STRING[] = "DramBase";
+
+/// DO NOT CONFIGURE BEYOND THIS
+////////////////////////////////
+
+constexpr size_t WL_SIZE_ELEMENTS = WL_SIZE_B / sizeof(uint64_t);
+constexpr size_t CHUNK_COUNT = WL_SIZE_B / CHUNK_SIZE_B;
+constexpr size_t CHUNK_SIZE_ELEMENTS = CHUNK_SIZE_B / sizeof(uint64_t);
+
+using filter = Filter;
+using aggregation = Aggregation;
+
+dsacache::Cache CACHE_;
+
+std::vector> BARRIERS_;
+std::shared_future<void> LAUNCH_;
+
+uint64_t* DATA_A_;
+uint64_t* DATA_B_;
+uint16_t* MASK_A_;
+uint64_t* DATA_DST_;
+
+void scan_b(size_t gid, size_t tid) {
+    LAUNCH_.wait();
+
+    uint32_t runs = CHUNK_COUNT / GROUP_COUNT + (CHUNK_COUNT % GROUP_COUNT > gid);
+
+    std::unique_ptr<dsacache::CacheData> data;
+
+    for(uint32_t i = 0; i < runs; ++i) {
+        // calculate pointers
+        size_t chunk_id = gid + GROUP_COUNT * i;
+        uint64_t* chunk_ptr = get_sub_chunk_ptr(DATA_B_, chunk_id, CHUNK_SIZE_ELEMENTS, tid, TC_SCANB);
+
+        if constexpr (PERFORM_CACHING) {
+            data = CACHE_.Access(reinterpret_cast(chunk_ptr), CHUNK_SIZE_B / TC_SCANB);
+            data->WaitOnCompletion();
+        }
+    }
+}
+
+void scan_a(size_t gid, size_t tid) {
+    LAUNCH_.wait();
+
+    uint32_t runs = CHUNK_COUNT / GROUP_COUNT + (CHUNK_COUNT % GROUP_COUNT > gid);
+
+    for(uint32_t i = 0; i < runs; ++i) {
+        // calculate pointers
+        size_t chunk_id = gid + GROUP_COUNT * i;
+        uint64_t* chunk_ptr = get_sub_chunk_ptr(DATA_A_, chunk_id, CHUNK_SIZE_ELEMENTS, tid, TC_SCANA);
+        uint16_t* mask_ptr = get_sub_mask_ptr (MASK_A_, chunk_id, CHUNK_SIZE_ELEMENTS, tid, TC_SCANA);
+
+        filter::apply_same(mask_ptr, nullptr, chunk_ptr, CMP_A, CHUNK_SIZE_B / TC_SCANA);
+    }
+}
+
+void aggr_j(size_t gid, size_t tid) {
+    LAUNCH_.wait();
+
+    // calculate values
+    __m512i aggregator = aggregation::OP::zero();
+    // the lower gids run once more if the chunks are not evenly distributable
+    uint32_t runs = CHUNK_COUNT / GROUP_COUNT + (CHUNK_COUNT % GROUP_COUNT > gid);
+
+    for(uint32_t i = 0; i < runs; ++i) {
+        // calculate pointers
+        size_t chunk_id = gid + GROUP_COUNT * i;
+        uint64_t* chunk_ptr = get_sub_chunk_ptr(DATA_B_, chunk_id, CHUNK_SIZE_ELEMENTS, tid, TC_AGGRJ);
+
+        std::unique_ptr<dsacache::CacheData> data;
+        uint64_t* data_ptr;
+
+        if constexpr (PERFORM_CACHING) {
+            // access the cache for the given chunk which will have been accessed in scan_b
+
+            data = CACHE_.Access(reinterpret_cast(chunk_ptr), CHUNK_SIZE_B / TC_AGGRJ);
+
+            // after the copy task has finished we obtain the pointer to the cached
+            // copy of data_b which is then used from now on
+
+            data_ptr = reinterpret_cast<uint64_t*>(data->GetDataLocation());
+
+            // nullptr is still a legal return value for CacheData::GetLocation()
+            // even after waiting, so this must be checked
+
+            if (data_ptr == nullptr) {
+                std::cerr << "[x] Cache Miss!" << std::endl;
+                exit(-1);
+            }
+        }
+        else {
+            data_ptr = chunk_ptr;
+        }
+
+        uint16_t* mask_ptr_a = get_sub_mask_ptr(MASK_A_, chunk_id, CHUNK_SIZE_ELEMENTS, tid, TC_AGGRJ);
+        uint64_t tmp = _mm512_reduce_add_epi64(aggregator);
+        aggregator = aggregation::apply_masked(aggregator, data_ptr, mask_ptr_a, CHUNK_SIZE_B / TC_AGGRJ);
+    }
+
+    aggregation::happly(DATA_DST_ + (tid * GROUP_COUNT + gid), aggregator);
+}
+
+int main() {
+    const int current_cpu = sched_getcpu();
+    const int current_node = numa_node_of_cpu(current_cpu);
+    const int cache_node = CachePlacementPolicy(current_node, current_node, 0);
+
+    const std::string ofname = "results/qdp-xeonmax-simpleq-" + std::string(MODE_STRING) + "-tca" + std::to_string(TC_SCANA) + "-tcb" + std::to_string(TC_SCANB) + "-tcj" + std::to_string(TC_AGGRJ) + "-tmul" + std::to_string(GROUP_COUNT) + "-wl" + std::to_string(WL_SIZE_B) + "-cs" + std::to_string(CHUNK_SIZE_B) + ".csv";
+    std::ofstream fout(ofname);
+
+    fout << "run;time;" << std::endl;
+
+    if constexpr (DATA_IN_HBM) {
+        DATA_A_ = (uint64_t*) numa_alloc_onnode(WL_SIZE_B, cache_node);
+        DATA_B_ = (uint64_t*) numa_alloc_onnode(WL_SIZE_B, cache_node);
+        MASK_A_ = (uint16_t*) numa_alloc_onnode(WL_SIZE_ELEMENTS, cache_node);
+        DATA_DST_ = (uint64_t*) numa_alloc_onnode(TC_AGGRJ * GROUP_COUNT * sizeof(uint64_t), cache_node);
+    }
+    else {
+        DATA_A_ = (uint64_t*) numa_alloc_local(WL_SIZE_B);
+        DATA_B_ = (uint64_t*) numa_alloc_local(WL_SIZE_B);
+        MASK_A_ = (uint16_t*) numa_alloc_local(WL_SIZE_ELEMENTS);
+        DATA_DST_ = (uint64_t*) numa_alloc_local(TC_AGGRJ * GROUP_COUNT * sizeof(uint64_t));
+    }
+
+    if constexpr (PERFORM_CACHING) {
+        CACHE_.Init(CachePlacementPolicy, CopyMethodPolicy);
+    }
+
+    fill_mt(DATA_A_, WL_SIZE_B, 0, 100, 42);
+    fill_mt(DATA_B_, WL_SIZE_B, 0, 100, 420);
+
+    for (uint32_t i = 0; i < ITERATION_COUNT + WARMUP_ITERATION_COUNT; i++) {
+        CACHE_.Clear();
+
+        std::promise<void> launch_promise;
+        LAUNCH_ = launch_promise.get_future();
+
+        std::vector<std::thread> filter_pool;
+        std::vector<std::thread> copy_pool;
+        std::vector<std::thread> agg_pool;
+
+        for(uint32_t gid = 0; gid < GROUP_COUNT; ++gid) {
+            for(uint32_t tid = 0; tid < TC_SCANA; ++tid) {
+                filter_pool.emplace_back(scan_a, gid, tid);
+            }
+
+            for(uint32_t tid = 0; tid < TC_SCANB; ++tid) {
+                copy_pool.emplace_back(scan_b, gid, tid);
+            }
+
+            for(uint32_t tid = 0; tid < TC_AGGRJ; ++tid) {
+                agg_pool.emplace_back(aggr_j, gid, tid);
+            }
+        }
+
+        const auto time_start = std::chrono::steady_clock::now();
+
+        launch_promise.set_value();
+
+        for(std::thread& t : filter_pool) { t.join(); }
+        for(std::thread& t : copy_pool) { t.join(); }
+        for(std::thread& t : agg_pool) { t.join(); }
+
+        Aggregation::apply(DATA_DST_, DATA_DST_, sizeof(uint64_t) * TC_AGGRJ * GROUP_COUNT);
+
+        const auto time_end = std::chrono::steady_clock::now();
+
+        if (i >= WARMUP_ITERATION_COUNT) {
+            fout << i << ";" << std::chrono::duration_cast(time_end - time_start).count() << std::endl;
+        }
+    }
+
+    numa_free(DATA_A_, WL_SIZE_B);
+    numa_free(DATA_B_, WL_SIZE_B);
+    numa_free(MASK_A_, WL_SIZE_ELEMENTS);
+    numa_free(DATA_DST_, TC_AGGRJ * GROUP_COUNT * sizeof(uint64_t));
+
+    return 0;
+}
\
No newline at end of file diff --git a/qdp_project/src/benchmark/MAX_benchmark.cpp b/qdp_project/src/benchmark/MAX_benchmark.cpp deleted file mode 100644 index ff559cc..0000000 --- a/qdp_project/src/benchmark/MAX_benchmark.cpp +++ /dev/null @@ -1,232 +0,0 @@ -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include - -#ifndef THREAD_GROUP_MULTIPLIER -#define THREAD_GROUP_MULTIPLIER 2 -#endif - -#ifndef BARRIER_MODE -#define BARRIER_MODE "global" -#endif - -#ifndef BUFFER_LIMIT -#define BUFFER_LIMIT 1 -#endif - -#include "const.h" - -#include "file_output.h" -#include "array_utils.h" -#include "timer_utils.h" -#include "barrier_utils.h" -#include "measurement_utils.h" -#include "cpu_set_utils.h" -#include "iterable_range.h" -#include "memory_literals.h" - -#include "pipelines/MAX_scan_filter_pipe.h" - -#include "aggregation.h" -#include "filter.h" - -using base_t = uint64_t; - -static int CachePlacementPolicy(const int numa_dst_node, const int numa_src_node, const size_t data_size) { - return numa_dst_node < 8 ? numa_dst_node + 8 : numa_dst_node; -} - -base_t sum_check(base_t compare_value, base_t* row_A, base_t* row_B, size_t row_size) { - base_t sum = 0; - for(int i = 0; i < row_size / sizeof(base_t); ++i) { - sum += (row_A[i] < compare_value) * row_B[i]; - } - return sum; -} - -base_t sum_check_complex(base_t compare_value_a, base_t compare_value_b, base_t* row_A, base_t* row_B, size_t row_size) { - base_t sum = 0; - for(int i = 0; i < row_size / sizeof(base_t); ++i) { - sum += (row_A[i] < compare_value_a && row_B[i] < compare_value_b) * row_B[i]; - } - return sum; -} - -enum class ExecMode { - DramBaseline, - HbmPeak, - HbmPrefetch -}; - -int main(int argc, char** argv) { - constexpr ExecMode mode = ExecMode::HbmPrefetch; - - constexpr size_t workload_b = 4_GiB; - constexpr base_t compare_value_a = 50; - constexpr base_t compare_value_b = 42; - - constexpr size_t chunk_size = 64_MiB; - - // thread count is 12 here but as the default measurement uses 6 - // we must restrict the core assignment of these 12 threads to - // 6 physical cpu cores on the executing node - - - - /*** alloc data and buffers ************************************************/ - - uint8_t tc_filter; - uint8_t tc_copy; - uint8_t tc_agg; - - if constexpr (mode == ExecMode::HbmPrefetch) { - tc_filter = 8; - tc_copy = 1; - tc_agg = 4; - } - else { - tc_filter = 8; - tc_copy = 0; - tc_agg = 4; - } - - const uint8_t tc_combined = tc_filter + tc_copy + tc_agg; - - base_t* data_a; - base_t* data_b; - base_t* results; - - const int current_cpu = sched_getcpu(); - const int current_node = numa_node_of_cpu(current_cpu); - const int cache_node = CachePlacementPolicy(current_node, current_node, 0); - - std::ofstream out_file; - - std::string mode_string; - if constexpr (mode == ExecMode::HbmPrefetch) { - mode_string = "HbmDsaPrefetch"; - } - else if constexpr (mode == ExecMode::HbmPeak) { - mode_string = "HbmAllocPeak"; - } - else if constexpr (mode == ExecMode::DramBaseline) { - mode_string = "DramBaseline"; - } - else { - mode_string = "Unknown"; - } - - const std::string ofname = "results/qdp-xeonmax-simpleq-" + mode_string + "-tca" + std::to_string(tc_filter) + "-tcb" + std::to_string(tc_copy) + "-tcj" + std::to_string(tc_agg) + "-tmul" + std::to_string(THREAD_GROUP_MULTIPLIER) + "-wl" + std::to_string(workload_b) + "-cs" + std::to_string(chunk_size) + ".csv"; - - out_file.open(ofname); - - if 
(!out_file.is_open()) { - std::cerr << "Failed to open Output File '" << ofname << "'" << std::endl; - } - - print_to_file(out_file, "thread_group", "time", -#ifdef THREAD_TIMINGS - "scan_a", "scan_b", "aggr_j", -#endif -#ifdef BARRIER_TIMINGS - "wait_scan_a", "wait_scan_b", "wait_aggr_j", -#endif - "result"); - - if constexpr (mode == ExecMode::HbmPeak) { - data_a = (base_t*) numa_alloc_onnode(workload_b, cache_node); - data_b = (base_t*) numa_alloc_onnode(workload_b, cache_node); - results = (base_t*) numa_alloc_onnode(tc_combined * sizeof(base_t), cache_node); - } - else { - data_a = (base_t*) numa_alloc_onnode(workload_b, current_node); - data_b = (base_t*) numa_alloc_onnode(workload_b, current_node); - results = (base_t*) numa_alloc_onnode(tc_combined * sizeof(base_t), current_node); - } - - fill_mt(data_a, workload_b, 0, 100, 42); - fill_mt(data_b, workload_b, 0, 100, 420); - - for(uint32_t i = 0; i < 15; i++) { - std::promise p; - std::shared_future ready_future(p.get_future()); - - Query_Wrapper qw ( - &ready_future, workload_b, chunk_size, - data_a, data_b, results, tc_filter, tc_copy, - tc_agg,compare_value_a, compare_value_b - ); - - qw.ready_future = &ready_future; - qw.clear_buffers(); - - auto filter_lambda = [&qw](uint32_t gid, uint32_t gcnt, uint32_t tid) { qw.scan_a(gid, gcnt, tid); }; - auto copy_lambda = [&qw](uint32_t gid, uint32_t gcnt, uint32_t tid) { qw.scan_b(gid, gcnt, tid); }; - auto aggregation_lambda = [&qw](uint32_t gid, uint32_t gcnt, uint32_t tid) { qw.aggr_j(gid, gcnt, tid); }; - - std::vector filter_pool; - std::vector copy_pool; - std::vector agg_pool; - std::vector combined_pool; - - for(uint32_t gid = 0; gid < THREAD_GROUP_MULTIPLIER; ++gid) { - for(uint32_t tid = 0; tid < tc_filter; ++tid) { - filter_pool.emplace_back(filter_lambda, gid, THREAD_GROUP_MULTIPLIER, tid); - } - - // if tc_copy == 0 this loop is skipped - for(uint32_t tid = 0; tid < tc_copy; ++tid) { - copy_pool.emplace_back(copy_lambda, gid, THREAD_GROUP_MULTIPLIER, tid); - } - - for(uint32_t tid = 0; tid < tc_agg; ++tid) { - agg_pool.emplace_back(aggregation_lambda, gid, THREAD_GROUP_MULTIPLIER, tid); - } - } - - auto start = std::chrono::steady_clock::now(); - p.set_value(); - - for(std::thread& t : filter_pool) { t.join(); } - for(std::thread& t : copy_pool) { t.join(); } - for(std::thread& t : agg_pool) { t.join(); } - - Aggregation::apply(results, results, sizeof(base_t) * tc_agg * THREAD_GROUP_MULTIPLIER); - auto end = std::chrono::steady_clock::now(); - - constexpr double nanos_per_second = ((double)1000) * 1000 * 1000; - uint64_t nanos = std::chrono::duration_cast(end - start).count(); - double seconds = (double)(nanos) / nanos_per_second; - - if (i >= 5) { - print_to_file(out_file, THREAD_GROUP_MULTIPLIER, seconds, - #ifdef THREAD_TIMINGS - qw.trt->summarize_time(0), qw.trt->summarize_time(1), qw.trt->summarize_time(2), - #endif - #ifdef BARRIER_TIMINGS - qw.bt->summarize_time(0), qw.bt->summarize_time(1), qw.bt->summarize_time(2), - #endif - results[0]); - out_file << std::endl; - } - } - - numa_free(data_a, workload_b); - numa_free(data_b, workload_b); - numa_free(results, THREAD_GROUP_MULTIPLIER * tc_combined * sizeof(base_t)); -} diff --git a/qdp_project/src/benchmark/pipelines/MAX_scan_filter_pipe.h b/qdp_project/src/benchmark/pipelines/MAX_scan_filter_pipe.h deleted file mode 100755 index 9b7c178..0000000 --- a/qdp_project/src/benchmark/pipelines/MAX_scan_filter_pipe.h +++ /dev/null @@ -1,360 +0,0 @@ - -#include -#include -#include -#include -#include - -#include - -#include 
"filter.h" -#include "aggregation.h" -#include "vector_loader.h" -#include "timer_utils.h" -#include "barrier_utils.h" -#include "measurement_utils.h" - -#include "../../../../offloading-cacher/cache.hpp" - -template -class Query_Wrapper { -public: - // sync - std::shared_future* ready_future; - - thread_runtime_timing* trt; - barrier_timing* bt; - pcm_value_collector* pvc; - -private: - static constexpr size_t COPY_POLICY_MIN_SIZE = 64 * 1024 * 1024; - dsacache::Cache cache_; - - // data - size_t size_b; - size_t chunk_size_b; - size_t chunk_size_w; - size_t chunk_cnt; - base_t* data_a; - base_t* data_b; - base_t* dest; - - // ratios - uint32_t thread_count_fc; - uint32_t thread_count_fi; - uint32_t thread_count_ag; - - // done bits - volatile uint8_t* ready_flag_a; - volatile uint8_t* ready_flag_b; - - // buffer - uint16_t* mask_a; - uint16_t* mask_b; - - // params - base_t cmp_a; - base_t cmp_b; - - // sync - std::unique_ptr*>> sync_barrier; - std::string barrier_mode = BARRIER_MODE; - - using filterCopy = Filter; - using filterNoCopy = Filter; - using filter = Filter; - using aggregation = Aggregation; - - static int CachePlacementPolicy(const int numa_dst_node, const int numa_src_node, const size_t data_size) { - return numa_dst_node < 8 ? numa_dst_node + 8 : numa_dst_node; - } - - static std::vector CopyMethodPolicy(const int numa_dst_node, const int numa_src_node, const size_t data_size) { - if (data_size < COPY_POLICY_MIN_SIZE) { - // if the data size is small then the copy will just be carried - // out by the destination node which does not require setting numa - // thread affinity as the selected dsa engine is already the one - // present on the calling thread - - return std::vector{ (numa_dst_node >= 8 ? numa_dst_node - 8 : numa_dst_node) }; - } - else { - // for sufficiently large data, smart copy is used which will utilize - // all four engines for intra-socket copy operations and cross copy on - // the source and destination nodes for inter-socket copy - - const bool same_socket = ((numa_dst_node ^ numa_src_node) & 4) == 0; - - if (same_socket) { - const bool socket_number = numa_dst_node >> 2; - if (socket_number == 0) return std::vector{ 0, 1, 2, 3 }; - else return std::vector{ 4, 5, 6, 7 }; - } - else { - return std::vector{ - (numa_src_node >= 8 ? numa_src_node - 8 : numa_src_node), - (numa_dst_node >= 8 ? 
numa_dst_node - 8 : numa_dst_node) - }; - } - } - } - -public: - Query_Wrapper(std::shared_future* rdy_fut, size_t workload_b, size_t chunk_size_b, base_t* data_a, - base_t* data_b, base_t* dest, uint32_t tc_fi, uint32_t tc_fc, uint32_t tc_ag, - base_t cmp_a = 50, base_t cmp_b = 42) : - ready_future(rdy_fut), size_b(workload_b), chunk_size_b(chunk_size_b), data_a(data_a), data_b(data_b), - dest(dest), cmp_a(cmp_a), cmp_b(cmp_b) { - - const int current_cpu = sched_getcpu(); - const int current_node = numa_node_of_cpu(current_cpu); - const int cache_node = CachePlacementPolicy(current_node, current_node, 0); - - chunk_size_w = chunk_size_b / sizeof(base_t); - chunk_cnt = size_b / chunk_size_b; - - thread_count_fi = tc_fi; - thread_count_fc = tc_fc; - thread_count_ag = tc_ag; - - ready_flag_a = (volatile uint8_t *) numa_alloc_onnode( chunk_cnt * thread_count_fi / 8 + ((chunk_cnt * thread_count_fi % 8) != 0), cache_node); - ready_flag_b = (volatile uint8_t *) numa_alloc_onnode( chunk_cnt * thread_count_fc / 8 + ((chunk_cnt * thread_count_fc % 8) != 0), cache_node); - - mask_a = (uint16_t *) numa_alloc_onnode(size_b / sizeof(base_t), cache_node); - mask_b = (uint16_t *) numa_alloc_onnode(size_b / sizeof(base_t), cache_node); - - if constexpr (caching) { - cache_.Init(CachePlacementPolicy, CopyMethodPolicy); - } - - size_t measurement_space = std::max(std::max(tc_fi, tc_fc), tc_ag); - trt = new thread_runtime_timing(3, measurement_space, current_node); - bt = new barrier_timing(3, measurement_space, current_node); - pvc = new pcm_value_collector({"scan_a", "scan_b", "aggr_j"}, measurement_space, current_node); - reset_barriers(); - }; - - void reset_barriers(){ - if(sync_barrier != nullptr) { - for(auto& barrier : *sync_barrier) { - delete barrier; - } - sync_barrier.reset(); - } - - uint32_t thread_count_sum = thread_count_ag + thread_count_fi + thread_count_fc; - sync_barrier = std::make_unique*>>(thread_count_sum); - uint32_t barrier_count = barrier_mode == "global" ? 
1 : thread_count_sum; - - for(uint32_t i = 0; i < barrier_count; ++i) { - (*sync_barrier)[i] = new std::barrier(thread_count_sum); - } - } - - void clear_buffers () { - std::memset((void*)ready_flag_a, 0x00, chunk_cnt * thread_count_fi / 8 + ((chunk_cnt * thread_count_fi % 8) != 0)); - std::memset((void*)ready_flag_b, 0x00, chunk_cnt * thread_count_fc / 8 + ((chunk_cnt * thread_count_fc % 8) != 0)); - - std::memset(mask_a, 0x00, size_b / sizeof(base_t)); - std::memset(mask_b, 0x00, size_b / sizeof(base_t)); - - if constexpr (caching) { - cache_.Clear(); - } - - trt->reset_accumulator(); - bt->reset_accumulator(); - pvc->reset(); - reset_barriers(); - }; - - ~Query_Wrapper() { - numa_free((void*)ready_flag_a, chunk_cnt * thread_count_fi / 8 + ((chunk_cnt * thread_count_fi % 8) != 0)); - numa_free((void*)ready_flag_b, chunk_cnt * thread_count_fc / 8 + ((chunk_cnt * thread_count_fc % 8) != 0)); - - numa_free(mask_a, size_b / sizeof(base_t)); - numa_free(mask_b, size_b / sizeof(base_t)); - - delete trt; - for(auto& barrier : *sync_barrier) { - delete barrier; - } - delete bt; - delete pvc; - }; - - //this can be set without need to change allocations - void set_thread_group_count(uint32_t value) { - this->thread_group = value; - }; - -private: - static inline base_t* get_sub_chunk_ptr(base_t* base_ptr, size_t chunk_id, size_t chunk_size_w, size_t tid, size_t tcnt) { - base_t* chunk_ptr = base_ptr + chunk_id * chunk_size_w; - return chunk_ptr + tid * (chunk_size_w / tcnt); - } - - static inline uint16_t* get_sub_mask_ptr(uint16_t* base_ptr, size_t chunk_id, size_t chunk_size_w, size_t tid, size_t tcnt) { - // 16 integer are addressed with one uint16_t in mask buffer - size_t offset = chunk_id * chunk_size_w + tid * (chunk_size_w / tcnt); - return base_ptr + (offset / 16); - } - - static bool bit_at(volatile uint8_t* bitmap, uint32_t bitpos) { - uint8_t value = bitmap[bitpos / 8]; - switch(bitpos % 8) { - case 0: return value & 0b00000001; - case 1: return value & 0b00000010; - case 2: return value & 0b00000100; - case 3: return value & 0b00001000; - case 4: return value & 0b00010000; - case 5: return value & 0b00100000; - case 6: return value & 0b01000000; - case 7: return value & 0b10000000; - default: return false; - } - } - - static void set_bit_at(volatile uint8_t* bitmap, std::mutex& mutex, uint32_t bitpos) { - mutex.lock(); - switch(bitpos % 8) { - case 0: bitmap[bitpos / 8] |= 0b00000001;break; - case 1: bitmap[bitpos / 8] |= 0b00000010;break; - case 2: bitmap[bitpos / 8] |= 0b00000100;break; - case 3: bitmap[bitpos / 8] |= 0b00001000;break; - case 4: bitmap[bitpos / 8] |= 0b00010000;break; - case 5: bitmap[bitpos / 8] |= 0b00100000;break; - case 6: bitmap[bitpos / 8] |= 0b01000000;break; - case 7: bitmap[bitpos / 8] |= 0b10000000;break; - } - mutex.unlock(); - } - -public: - void scan_b(size_t gid, size_t gcnt, size_t tid) { - size_t tcnt = thread_count_ag; - assert(chunk_size_w % tcnt == 0); - assert(chunk_size_w % 16 == 0); - assert(chunk_size_w % tcnt * 16 == 0); - - // wait till everyone can start - ready_future->wait(); - - // the lower gids run once more if the chunks are not evenly distributable - uint32_t runs = chunk_cnt / gcnt + (chunk_cnt % gcnt > gid); - uint32_t barrier_idx = barrier_mode.compare("global") == 0 ? 
0 : gid; - - std::unique_ptr data; - - for(uint32_t i = 0; i < runs; ++i) { - trt->start_timer(1, tid * gcnt + gid); - - // calculate pointers - size_t chunk_id = gid + gcnt * i; - base_t* chunk_ptr = get_sub_chunk_ptr(data_b, chunk_id, chunk_size_w, tid, tcnt); - - if constexpr (caching) { - data = cache_.Access(reinterpret_cast(chunk_ptr), chunk_size_b / tcnt); - } - - trt->stop_timer(1, tid * gcnt + gid); - } - - if constexpr (caching) { - data->WaitOnCompletion(); - } - - (*(*sync_barrier)[barrier_idx]).arrive_and_drop(); - } - - void scan_a(size_t gid, size_t gcnt, size_t tid) { - size_t tcnt = thread_count_fi; - assert(chunk_size_w % tcnt == 0); - assert(chunk_size_w % 16 == 0); - assert(chunk_size_w % tcnt * 16 == 0); - - // wait till everyone can start - ready_future->wait(); - - // the lower gids run once more if the chunks are not evenly distributable - uint32_t runs = chunk_cnt / gcnt + (chunk_cnt % gcnt > gid); - uint32_t barrier_idx = barrier_mode.compare("global") == 0 ? 0 : gid; - - for(uint32_t i = 0; i < runs; ++i) { - trt->start_timer(0, tid * gcnt + gid); - pvc->start("scan_a", tid * gcnt + gid); - // calculate pointers - size_t chunk_id = gid + gcnt * i; - base_t* chunk_ptr = get_sub_chunk_ptr(data_a, chunk_id, chunk_size_w, tid, tcnt); - uint16_t* mask_ptr = get_sub_mask_ptr (mask_a, chunk_id, chunk_size_w, tid, tcnt); - - filter::apply_same(mask_ptr, nullptr, chunk_ptr, cmp_a, chunk_size_b / tcnt); - - pvc->stop("scan_a", tid * gcnt + gid); - trt->stop_timer(0, tid * gcnt + gid); - bt->timed_wait(*(*sync_barrier)[barrier_idx], 0, tid * gcnt + gid); - } - (*(*sync_barrier)[barrier_idx]).arrive_and_drop(); - } - - void aggr_j(size_t gid, size_t gcnt, size_t tid) { - size_t tcnt = thread_count_ag; - // wait till everyone can start - ready_future->wait(); - - // calculate values - __m512i aggregator = aggregation::OP::zero(); - // the lower gids run once more if the chunks are not evenly distributable - uint32_t runs = chunk_cnt / gcnt + (chunk_cnt % gcnt > gid); - uint32_t barrier_idx = barrier_mode.compare("global") == 0 ? 0 : gid; - - for(uint32_t i = 0; i < runs; ++i) { - bt->timed_wait(*(*sync_barrier)[barrier_idx], 2, tid * gcnt + gid); - trt->start_timer(2, tid * gcnt + gid); - pvc->start("aggr_j", tid * gcnt + gid); - - // calculate pointers - size_t chunk_id = gid + gcnt * i; - base_t* chunk_ptr = get_sub_chunk_ptr(data_b, chunk_id, chunk_size_w, tid, tcnt); - - std::unique_ptr data; - base_t* data_ptr; - - if constexpr (caching) { - // access the cache for the given chunk which will have been accessed in scan_b - - data = cache_.Access(reinterpret_cast(chunk_ptr), chunk_size_b / tcnt); - - // after the copy task has finished we obtain the pointer to the cached - // copy of data_b which is then used from now on - - data_ptr = reinterpret_cast(data->GetDataLocation()); - - // nullptr is still a legal return value for CacheData::GetLocation() - // even after waiting, so this must be checked - - if (data_ptr == nullptr) { - std::cerr << "[x] Cache Miss!" 
<< std::endl; - exit(-1); - } - } - else { - data_ptr = chunk_ptr; - } - - uint16_t* mask_ptr_a = get_sub_mask_ptr (mask_a, chunk_id, chunk_size_w, tid, tcnt); - - base_t tmp = _mm512_reduce_add_epi64(aggregator); - - aggregator = aggregation::apply_masked(aggregator, data_ptr, mask_ptr_a, chunk_size_b / tcnt); - - pvc->stop("aggr_j", tid * gcnt + gid); - trt->stop_timer(2, tid * gcnt + gid); - } - - // so threads with more runs dont wait for alerady finished threads - (*(*sync_barrier)[barrier_idx]).arrive_and_drop(); - - aggregation::happly(dest + (tid * gcnt + gid), aggregator); - } -}; diff --git a/qdp_project/src/utils/BenchmarkHelpers.cpp b/qdp_project/src/utils/BenchmarkHelpers.cpp new file mode 100644 index 0000000..10aa6aa --- /dev/null +++ b/qdp_project/src/utils/BenchmarkHelpers.cpp @@ -0,0 +1,50 @@ +#include + +int CachePlacementPolicy(const int numa_dst_node, const int numa_src_node, const size_t data_size) { + return numa_dst_node < 8 ? numa_dst_node + 8 : numa_dst_node; +} + +std::vector CopyMethodPolicy(const int numa_dst_node, const int numa_src_node, const size_t data_size) { + // for sufficiently large data, smart copy is used which will utilize + // all four engines for intra-socket copy operations and cross copy on + // the source and destination nodes for inter-socket copy + + const bool same_socket = ((numa_dst_node ^ numa_src_node) & 4) == 0; + + if (same_socket) { + const bool socket_number = numa_dst_node >> 2; + if (socket_number == 0) return std::vector{ 0, 1, 2, 3 }; + else return std::vector{ 4, 5, 6, 7 }; + } + else { + return std::vector{ + (numa_src_node >= 8 ? numa_src_node - 8 : numa_src_node), + (numa_dst_node >= 8 ? numa_dst_node - 8 : numa_dst_node) + }; + } +} + +struct NopStruct { + inline void operator() () { + return; + } +}; + +inline uint64_t* get_sub_chunk_ptr(uint64_t* base_ptr, size_t chunk_id, size_t chunk_size_w, size_t tid, size_t tcnt) { + uint64_t* chunk_ptr = base_ptr + chunk_id * chunk_size_w; + return chunk_ptr + tid * (chunk_size_w / tcnt); +} + +inline uint16_t* get_sub_mask_ptr(uint16_t* base_ptr, size_t chunk_id, size_t chunk_size_w, size_t tid, size_t tcnt) { + // 16 integer are addressed with one uint16_t in mask buffer + size_t offset = chunk_id * chunk_size_w + tid * (chunk_size_w / tcnt); + return base_ptr + (offset / 16); +} + +uint64_t sum_check(uint64_t compare_value, uint64_t* row_A, uint64_t* row_B, size_t row_size) { + uint64_t sum = 0; + for(int i = 0; i < row_size / sizeof(uint64_t); ++i) { + sum += (row_A[i] < compare_value) * row_B[i]; + } + return sum; +} \ No newline at end of file diff --git a/qdp_project/src/algorithm/operators/aggregation.h b/qdp_project/src/utils/aggregation.h similarity index 100% rename from qdp_project/src/algorithm/operators/aggregation.h rename to qdp_project/src/utils/aggregation.h diff --git a/qdp_project/src/utils/barrier_utils.h b/qdp_project/src/utils/barrier_utils.h deleted file mode 100644 index a68f801..0000000 --- a/qdp_project/src/utils/barrier_utils.h +++ /dev/null @@ -1,73 +0,0 @@ -#pragma once - -#include -#include -#include -#include - -#define BARRIER_TIMINGS 1 - - -struct barrier_completion_function { - inline void operator() () { - return; - } -}; - -struct barrier_timing { - - uint32_t time_points, time_threads; - double** time_accumulator; - - barrier_timing(uint32_t timing_points, uint32_t timing_threads, uint32_t memory_node) { -#ifdef BARRIER_TIMINGS - time_points = timing_points; - time_threads = timing_threads; - time_accumulator = (double**) 
numa_alloc_onnode(timing_points * sizeof(double*), memory_node); - for(uint32_t i = 0; i < timing_points; ++i) { - time_accumulator[i] = (double*) numa_alloc_onnode(timing_threads * sizeof(double), memory_node); - } -#endif - } - - ~barrier_timing() { -#ifdef BARRIER_TIMINGS - for(uint32_t i = 0; i < time_points; ++i) { - numa_free(time_accumulator[i], time_threads * sizeof(double)); - } - numa_free(time_accumulator, time_points * sizeof(double*)); -#endif - } - - void reset_accumulator() { -#ifdef BARRIER_TIMINGS - for(uint32_t i = 0; i < time_points; ++i){ - for(uint32_t j = 0; j < time_threads; ++j){ - time_accumulator[i][j] = 0.0; - }} -#endif - } - - double summarize_time(uint32_t time_point) { -#ifdef BARRIER_TIMINGS - double sum = 0.0; - for(uint32_t i = 0; i < time_threads; ++i) { - sum += time_accumulator[time_point][i]; - } - return sum; -#endif - } - - void timed_wait(std::barrier& barrier, uint32_t point_id, uint32_t thread_id) { -#ifdef BARRIER_TIMINGS - auto before_barrier = std::chrono::steady_clock::now(); -#endif - barrier.arrive_and_wait(); -#ifdef BARRIER_TIMINGS - auto after_barrier = std::chrono::steady_clock::now(); - uint64_t barrier_wait_time = std::chrono::duration_cast(after_barrier - before_barrier).count(); - double seconds = barrier_wait_time / (1000.0 * 1000.0 * 1000.0); - time_accumulator[point_id][thread_id] += seconds; -#endif - } -}; \ No newline at end of file diff --git a/qdp_project/src/utils/cpu_set_utils.h b/qdp_project/src/utils/cpu_set_utils.h deleted file mode 100644 index ba82604..0000000 --- a/qdp_project/src/utils/cpu_set_utils.h +++ /dev/null @@ -1,82 +0,0 @@ -#pragma once - -#include -#include -#include -#include -#include -#include - -/** Sets all bits in a given cpu_set_t between L and H (condition L <= H)*/ -#define CPU_BETWEEN(L, H, SET) assert(L <= H); for(; L < H; ++L) {CPU_SET(L, SET);} - -/** - * Applies the affinity defined in set to the thread, through pthread library - * calls. If it fails it wites the problem to stderr and terminated the program. -*/ -inline void pin_thread(std::thread& thread, cpu_set_t* set) { - int error_code = pthread_setaffinity_np(thread.native_handle(), sizeof(cpu_set_t), set); - if (error_code != 0) { - std::cerr << "Error calling pthread_setaffinity_np in copy_pool assignment: " << error_code << std::endl; - exit(-1); - } -} - -/** - * Returns the cpu id of the thread_id-th cpu in a given (multi)range. Thread_id - * greater than the number of cpus in the (multi)range are valid. In this case - * the (thread_id % #cpus in the range)-th cpu in the range is returned. -*/ -int get_cpu_id(int thread_id, const std::vector>& range) { - int subrange_size = range[0].second - range[0].first; - - int i = 0; - while(subrange_size <= thread_id) { - thread_id -= subrange_size; - i = (i + 1) % range.size(); - subrange_size = range[i].second - range[i].first; - } - return thread_id + range[i].first; -} - -/*inline void cpu_set_between(cpu_set_t* set, uint32_t low, uint32_t high) { - assert(low != high); - if (low > high) std::swap(low, high); - - for(; low < high; ++low) { - CPU_SET(low, set); - } -}*/ - -/** - * Pins the given thread to the thread_id-th cpu in the given range. -*/ -void pin_thread_in_range(std::thread& thread, int thread_id, std::vector>& range) { - cpu_set_t set; - CPU_ZERO(&set); - CPU_SET(get_cpu_id(thread_id, range), &set); - - pin_thread(thread, &set); -} - -/** - * Pins the given thread to all cpus in the given range. 
-*/ -void pin_thread_in_range(std::thread& thread, std::vector>& range) { - cpu_set_t set; - CPU_ZERO(&set); - for(auto r : range) { CPU_BETWEEN(r.first, r.second, &set); } - - pin_thread(thread, &set); -} - -/** - * Pins the given thread to all cpu ids between low (incl.) and high (excl.). -*/ -inline void pin_thread_between(std::thread& thread, uint32_t low, uint32_t high) { - cpu_set_t set; - CPU_ZERO(&set); - CPU_BETWEEN(low, high, &set); - - pin_thread(thread, &set); -} \ No newline at end of file diff --git a/qdp_project/src/utils/file_output.h b/qdp_project/src/utils/file_output.h deleted file mode 100644 index 1dd85ba..0000000 --- a/qdp_project/src/utils/file_output.h +++ /dev/null @@ -1,76 +0,0 @@ -/** - * @file file_output.h - * @author André Berthold - * @brief Implements a template-function that accepts an arbitrary number of parameters that should be printed - * @version 0.1 - * @date 2023-05-25 - * - * @copyright Copyright (c) 2023 - * - */ -#pragma once - -#include -#include -#include - -#include "iterable_range.h" - -template -inline constexpr bool is_numeric_v = std::disjunction< - std::is_integral, - std::is_floating_point>::value; - -/** - * @brief Converts a parameter to a string by either using it directly or its member current (if it is of type Labeled) - * as parameter to the std::string-Constructor. - * - * @tparam T Type of the parameter - * @param value Parameter to be converted - * @return std::string The converted parameter - */ -template -inline std::string to_string(T value) { - if constexpr(std::is_base_of::value){ - // integrals cannot be use in the string constructor and must be translated by the std::to_string-function - if constexpr (is_numeric_v) { - return std::to_string(value.current); - } else { - return std::string(value.current); - } - } else { - // integrals cannot be use in the string constructor and must be translated by the std::to_string-function - if constexpr (is_numeric_v) { - return std::to_string(value); - } else { - return std::string(value); - } - } -} - -/** - * @brief This function wites the content of *val* to *file*. Terminates terecursive function definition. - * - * @tparam type Type of the paramter *val* (is usually implicitly defeined) - * @param file File that is written to - * @param val Value that is translated to a char stream and written to the file - */ -template -inline void print_to_file(std::ofstream &file, type val) { - file << to_string(val) << std::endl; -} - -/** - * @brief This function wites the content of *val* and that content if *vals* to *file*. - * - * @tparam type Type of the paramter *val* (is usually implicitly defeined) - * @tparam types Parameter pack that describes the types of *vals* - * @param file File that is written to - * @param val Value that is translated to a char stream and written to the file - * @param vals Paramater pack of values that are gonna be printed to the file - */ -template -inline void print_to_file(std::ofstream &file, type val, types ... 
vals) { - file << to_string(val) << ","; - print_to_file(file, vals...); -} \ No newline at end of file diff --git a/qdp_project/src/algorithm/operators/filter.h b/qdp_project/src/utils/filter.h similarity index 100% rename from qdp_project/src/algorithm/operators/filter.h rename to qdp_project/src/utils/filter.h diff --git a/qdp_project/src/utils/iterable_range.h b/qdp_project/src/utils/iterable_range.h deleted file mode 100644 index 95fc57e..0000000 --- a/qdp_project/src/utils/iterable_range.h +++ /dev/null @@ -1,208 +0,0 @@ - #pragma once - -#include -#include -#include - - -constexpr auto NO_NEXT = "false"; - -/** - * @brief Class that adds an label member-parameter to a sub-class - * - */ -class Labeled { -public: - std::string label; -public: - Labeled(std::string str) : label(str) {}; - Labeled(const char* str) { this->label = std::string(str); }; -}; - -/** - * @brief Converts a parameter to a string by either reading the member label (if it is of type Labeled) or using it - * as parameter to the std::string-Constructor. - * - * @tparam T Type of the parameter - * @param value Parameter to be converted - * @return std::string The converted parameter - */ -template -inline std::string generateHead(T value) { - if constexpr(std::is_base_of::value){ - return value.label; - } else { - return std::string(value); - } -} - -/** - * @brief Converts a parameter-pack to a string calling genarateHead(T) on every parameter and concatenatin the results. - * - * @tparam T Type of the first parameter - * @tparam Ts Parameter pack specifying the preceeding parameters' types - * @param value Parameter to be transformed - * @param values Parameter-pack of the next prameters to be transformed - * @return std::string Comma-separated concatenation of all parameters string representation - */ -template -inline std::string generateHead(T value, Ts... values) { - return generateHead(value) + ',' + generateHead(values...); -} - - -/** - * @brief Takes a single Range object and calls its next function. - * - * @tparam T Specific type of the Range object - * @param t Instance of the Range object - * @return std::string Label of the Range object or "false" if the Range reaced its end and was reset - */ -template -std::string IterateOnce(T& t) { - if(t.next()) return t.label; - else t.reset(); - return std::string(NO_NEXT); //the string signalises that the iteration has to be terminiated. -} - -/** - * @brief Takes a number of Range objects and recusively increments them till the first Range does not reach its end - * upon incrementing. It tarts at the first Range object given. Every Range object that reached its end is reset to - * its start value. - * - * @tparam T Specific type of the first Range object - * @tparam Ts Types to the following Range objects - * @param t First instance of the Range object - * @param ts Parameter pack of the following Range objects - * @return std::string Label of the highest index Range object that was altered, or "false" if the last Range object - * reache its end and was reset - */ -template -std::string IterateOnce(T& t , Ts&... ts) { - if(t.next()) return t.label; - else t.reset(); - return IterateOnce(ts...); -} - - -/** - * @brief Class that provides a convenient interface for iteratin throug a parameter range. It stores a public value - * that can be altered by the classes' methods. 
- * - * @tparam T Base type of the parameter - * @tparam INIT Initial value of the current pointer - * @tparam PRED Struct providing an apply function testing if the current value is in range or not - * @tparam INC Struct providing an apply function setting the current value to the value following the current value - */ -template -class Range : public Labeled { -public: - /** - * @brief Current value of the parameter - */ - T current = INIT; - - /** - * @brief Resets current to its initial value - */ - void reset() {current = INIT; }; - - /** - * @brief Sets current to its next value (according to INC::inc) and returns if the range Reached its end - * (accordingt to PRED::pred). - * - * @return true The newly assigned value of current is in the range - * @return false Otherwise - */ - bool next() { - current = INC::inc(current); - return PRED::pred(current); - }; - - /** - * @brief Checks if current is in the Range (according to PRED). - * - * @return true PRED returns true - * @return false Otherwise - */ - bool valid() { return PRED::apply(current); }; -}; - -/** - * @brief Class that is in contrast to Range specialized for integral values. - * - * @tparam T Integral base type of the Range - * @tparam INIT Initial value of the parameter - * @tparam MAX Maximal value of the parameter - * @tparam INC Struct providing an apply function setting the current value to the value following the current value - */ -template -class Int_Range : public Labeled { -static_assert(std::is_integral::value, "Int_Range requires an integral base type"); - -public: - const T max = MAX; - T current = INIT; - - void reset() {current = INIT; }; - - bool next() { - current = INC::inc(current); - return current < MAX; - }; - - bool valid() { return current < MAX; }; - -}; - -/** - * @brief Class that is in contrast to Int_Range specialized for integrals that grow linearly. - * - * @tparam T Integral base type of the Range - * @tparam INIT Initial value of the parameter - * @tparam MAX Maximal value of the parameter - * @tparam STEP Increase of the value per next()-call - */ -template -class Linear_Int_Range : public Labeled { -static_assert(std::is_integral::value, "Linear_Int_Range requires an integral base type"); - -public: - const T max = MAX; - T current = INIT; - - void reset() {current = INIT; }; - - bool next() { - current += STEP; - return current < MAX; - }; - - bool valid() { return current < MAX; }; -}; - -/** - * @brief Class that is in contrast to Int_Range specialized for integrals that grow exponetially. 
- * - * @tparam T Integral base type of the Range - * @tparam INIT Initial value of the parameter - * @tparam MAX Maximal value of the parameter - * @tparam FACTOR Multiplicative Increase of the value per next()-call - */ -template -class Exp_Int_Range : public Labeled { -static_assert(std::is_integral::value, "Exp_Int_Range requires an integral base type"); - -public: - const T max = MAX; - T current = INIT; - - void reset() {current = INIT; }; - - bool next() { - current *= FACTOR; - return current < MAX; - }; - - bool valid() { return current < MAX; }; -}; \ No newline at end of file diff --git a/qdp_project/src/utils/measurement_utils.h b/qdp_project/src/utils/measurement_utils.h deleted file mode 100644 index f403de0..0000000 --- a/qdp_project/src/utils/measurement_utils.h +++ /dev/null @@ -1,152 +0,0 @@ -#pragma once - -#include -#include -#include -#include -#include - -#include - - -#if PCM_M == 1 -#define PCM_MEASURE 1 -#include "pcm.h" -#endif - - - -struct pcm_value_collector { - const uint32_t value_count = 6; - - uint32_t threads; - std::vector points; -#ifdef PCM_MEASURE - pcm::SystemCounterState** states; -#endif - uint64_t** collection; - - pcm_value_collector(const std::vector& in_points, uint32_t threads, uint32_t memory_node) : threads(threads) { -#ifdef PCM_MEASURE - points = std::vector(in_points); - - collection = (uint64_t**) numa_alloc_onnode(threads * sizeof(uint64_t*), memory_node); - states = (pcm::SystemCounterState**) numa_alloc_onnode(threads * sizeof(pcm::SystemCounterState*), memory_node); - for(int i = 0; i < threads; ++i) { - collection[i] = (uint64_t*) numa_alloc_onnode(points.size() * value_count * sizeof(uint64_t), memory_node); - states[i] = (pcm::SystemCounterState*) numa_alloc_onnode(points.size() * sizeof(pcm::SystemCounterState), memory_node); - } -#endif - } - - ~pcm_value_collector() { -#ifdef PCM_MEASURE - for(int i = 0; i < threads; ++i) { - numa_free(collection[threads], points.size() * value_count * sizeof(uint64_t)); - } - numa_free(collection, threads * sizeof(uint64_t*)); - numa_free(states, threads * sizeof(pcm::SystemCounterState)); -#endif - } - - void reset() { -#ifdef PCM_MEASURE - for(int i = 0; i < threads; ++i) - for(uint32_t j = 0; j < points.size() * value_count; ++j){ - collection[i][j] = 0; - } -#endif - } - - int64_t point_index(const std::string& value) { - auto it = std::find(points.begin(), points.end(), value); - - if(it == points.end()) return -1; - else return it - points.begin(); - } - - std::vector summarize(const std::string &point) { -#ifdef PCM_MEASURE - std::vector sums(value_count); - int64_t idx = point_index(point); - if(idx < 0) return sums; - - for(uint32_t v = 0; v < value_count; ++v) { - for(uint32_t i = 0; i < threads; ++i) { - sums[v] += collection[i][static_cast(idx) + points.size() * v]; - } - } - return sums; -#endif - return std::vector {0}; - } - - std::string summarize_as_string(const std::string &point) { -#ifdef PCM_MEASURE - auto summary = summarize(point); - auto it = summary.begin(); - auto end = summary.end(); - - if(it >= end) return ""; - - std::string result(""); - result += std::to_string(*it); - ++it; - - while(it < end) { - result += ","; - result += std::to_string(*it); - ++it; - } - return result; -#endif - return ""; - } - - void start(const std::string& point, uint32_t thread) { -#ifdef PCM_MEASURE - int64_t idx = point_index(point); - if(idx < 0) { - std::cerr << "Invalid 'point' given. Ignored!" 
<< std::endl; - return; - } - - states[thread][static_cast(idx)] = pcm::getSystemCounterState(); -#endif - } - - static std::string getHead(const std::string& point) { - return point + "_l2h," + - point + "_l2m," + - point + "_l3h," + - point + "_l3hns," + - point + "_l3m," + - point + "_mc"; - } - -#ifdef PCM_MEASURE - void read_values(uint32_t point_idx, uint32_t thread, pcm::SystemCounterState& start, pcm::SystemCounterState& end) { - collection[thread][point_idx + points.size() * 0] += getL2CacheHits(start, end); - collection[thread][point_idx + points.size() * 1] += getL2CacheMisses(start, end); - collection[thread][point_idx + points.size() * 2] += getL3CacheHits(start, end); - collection[thread][point_idx + points.size() * 3] += getL3CacheHitsNoSnoop(start, end); - collection[thread][point_idx + points.size() * 4] += getL3CacheMisses(start, end); - collection[thread][point_idx + points.size() * 5] += getBytesReadFromMC(start, end); - } -#endif - - void stop(const std::string& point, uint32_t thread) { -#ifdef PCM_MEASURE - auto state = pcm::getSystemCounterState(); - - int64_t idx = point_index(point); - if(idx < 0) { - std::cerr << "Invalid 'point' given. Ignored!" << std::endl; - return; - } - - auto start = states[thread][static_cast(idx)]; - read_values(static_cast(idx), thread, start, state); -#endif - } -}; diff --git a/qdp_project/src/utils/pcm.h b/qdp_project/src/utils/pcm.h deleted file mode 100644 index 91a19e0..0000000 --- a/qdp_project/src/utils/pcm.h +++ /dev/null @@ -1,6 +0,0 @@ -#pragma once -//this file includes all important header from the pcm repository -#include "cpucounters.h" -#include "msr.h" -#include "pci.h" -#include "mutex.h" diff --git a/qdp_project/src/utils/timer_utils.h b/qdp_project/src/utils/timer_utils.h deleted file mode 100644 index b6ec54f..0000000 --- a/qdp_project/src/utils/timer_utils.h +++ /dev/null @@ -1,80 +0,0 @@ -#pragma once - -#include -#include -#include - -#include - -#define THREAD_TIMINGS 1 - - - -struct thread_runtime_timing { - using time_point_t = std::chrono::time_point; - - uint32_t time_points, time_threads; - time_point_t** start_times; - double** time_accumulator; - - thread_runtime_timing(uint32_t timing_points, uint32_t timing_threads, uint32_t memory_node) { -#ifdef THREAD_TIMINGS - time_points = timing_points; - time_threads = timing_threads; - start_times = (time_point_t**) numa_alloc_onnode(timing_points * sizeof(time_point_t*), memory_node); - time_accumulator = (double**) numa_alloc_onnode(timing_points * sizeof(double*), memory_node); - for(uint32_t i = 0; i < timing_points; ++i) { - start_times[i] = (time_point_t*) numa_alloc_onnode(timing_threads * sizeof(time_point_t), memory_node); - time_accumulator[i] = (double*) numa_alloc_onnode(timing_threads * sizeof(double), memory_node); - } -#endif - } - - ~thread_runtime_timing() { -#ifdef THREAD_TIMINGS - for(uint32_t i = 0; i < time_points; ++i) { - numa_free(start_times[i], time_threads * sizeof(time_point_t)); - numa_free(time_accumulator[i], time_threads * sizeof(double)); - } - numa_free(start_times, time_points * sizeof(time_point_t*)); - numa_free(time_accumulator, time_points * sizeof(double*)); -#endif - } - - void reset_accumulator() { -#ifdef THREAD_TIMINGS - for(uint32_t i = 0; i < time_points; ++i){ - for(uint32_t j = 0; j < time_threads; ++j){ - time_accumulator[i][j] = 0.0; - }} -#endif - } - - double summarize_time(uint32_t time_point) { -#ifdef THREAD_TIMINGS - double sum = 0.0; - for(uint32_t i = 0; i < time_threads; ++i) { - sum += 
time_accumulator[time_point][i]; - } - return sum; -#endif - } - - void stop_timer(uint32_t point_id, uint32_t thread_id) { -#ifdef THREAD_TIMINGS - auto end_time = std::chrono::steady_clock::now(); - auto start_time = start_times[point_id][thread_id]; - - uint64_t time = std::chrono::duration_cast(end_time - start_time).count(); - double seconds = time / (1000.0 * 1000.0 * 1000.0); - time_accumulator[point_id][thread_id] += seconds; -#endif - } - - void start_timer(uint32_t point_id, uint32_t thread_id) { -#ifdef THREAD_TIMINGS - start_times[point_id][thread_id] = std::chrono::steady_clock::now(); -#endif - } - -};
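
The new Benchmark.cpp gates every worker thread on a std::shared_future so the timed region starts for all threads at once, hands chunks out round-robin across GROUP_COUNT thread groups, and splits each chunk between the threads of a group via get_sub_chunk_ptr(). The stand-alone sketch below isolates just that scheduling skeleton so it can be checked independently of the query operators. It is a simplified illustration, not the benchmark itself: the constants are scaled down, sub_chunk_ptr() is a local stand-in for get_sub_chunk_ptr() from BenchmarkHelpers.cpp, and the SIMD filter, aggregation, caching and NUMA placement are replaced by a plain summation.

#include <cinttypes>
#include <cstdint>
#include <cstdio>
#include <future>
#include <numeric>
#include <thread>
#include <vector>

constexpr size_t GROUP_COUNT = 4;            // thread groups, as in Benchmark.cpp
constexpr size_t CHUNK_COUNT = 32;           // scaled down from 4 GiB / 128 MiB
constexpr size_t CHUNK_SIZE_ELEMENTS = 1024; // scaled down chunk size in elements
constexpr size_t THREADS_PER_GROUP = 2;      // stands in for TC_SCANA / TC_SCANB / TC_AGGRJ

// same arithmetic as get_sub_chunk_ptr(): start of the chunk plus this thread's slice of it
static uint64_t* sub_chunk_ptr(uint64_t* base, size_t chunk_id, size_t chunk_size_w, size_t tid, size_t tcnt) {
    return base + chunk_id * chunk_size_w + tid * (chunk_size_w / tcnt);
}

int main() {
    std::vector<uint64_t> data(CHUNK_COUNT * CHUNK_SIZE_ELEMENTS, 1);
    std::vector<uint64_t> partial(GROUP_COUNT * THREADS_PER_GROUP, 0);

    // launch gate: workers block until the main thread fulfills the promise
    std::promise<void> launch;
    std::shared_future<void> launched(launch.get_future());

    std::vector<std::thread> pool;

    for (size_t gid = 0; gid < GROUP_COUNT; ++gid) {
        for (size_t tid = 0; tid < THREADS_PER_GROUP; ++tid) {
            pool.emplace_back([&, gid, tid]() {
                launched.wait();

                // lower gids run once more when CHUNK_COUNT % GROUP_COUNT != 0
                const size_t runs = CHUNK_COUNT / GROUP_COUNT + (CHUNK_COUNT % GROUP_COUNT > gid);

                uint64_t sum = 0;

                for (size_t i = 0; i < runs; ++i) {
                    // chunks are assigned round-robin to the groups
                    const size_t chunk_id = gid + GROUP_COUNT * i;
                    const uint64_t* slice = sub_chunk_ptr(data.data(), chunk_id, CHUNK_SIZE_ELEMENTS, tid, THREADS_PER_GROUP);

                    for (size_t e = 0; e < CHUNK_SIZE_ELEMENTS / THREADS_PER_GROUP; ++e) {
                        sum += slice[e];
                    }
                }

                partial[gid * THREADS_PER_GROUP + tid] = sum;
            });
        }
    }

    launch.set_value(); // start the timed region for all workers simultaneously

    for (std::thread& t : pool) { t.join(); }

    const uint64_t total = std::accumulate(partial.begin(), partial.end(), uint64_t{0});
    std::printf("total = %" PRIu64 " (expected %zu)\n", total, data.size());

    return 0;
}

With 32 chunks of 1024 elements, each initialized to 1, the program prints total = 32768 (expected 32768): every chunk id gid + GROUP_COUNT * i is visited exactly once and every element of each chunk is covered by exactly one thread slice, which is the property the benchmark's partitioning relies on.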