From 6cc49daf893c349d00583c203154f1eb48fa9dc7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Constantin=20F=C3=BCrst?= Date: Wed, 17 Jan 2024 13:45:24 +0100 Subject: [PATCH] remove all unused files and benchmark methods, adapt the MAX-Benchmark to use the cacher, remove manually-set numa configuration and replace it with dynamically adapting to the configured affinity, add two more template-options to the worker that control whether a is cached as well and whether scanb waits on the caching --- qdp_project/CMakeLists.txt | 26 +- qdp_project/bench_all_dimes.sh | 10 - qdp_project/bench_max.sh | 7 +- qdp_project/cmake_all_dimes.sh | 33 -- qdp_project/src/benchmark/DIMES_benchmark.cpp | 240 ----------- .../src/benchmark/DIMES_cores_benchmark.cpp | 260 ------------ qdp_project/src/benchmark/MAX_benchmark.cpp | 81 ++-- qdp_project/src/benchmark/QDP_minimal.h | 147 ------- .../src/benchmark/doubly_filtered_agg.cpp | 149 ------- .../benchmark/filter_aggregate_pipeline.cpp | 184 --------- qdp_project/src/benchmark/latency.cpp | 188 --------- .../src/benchmark/micro_benchmarks.cpp | 271 ------------ .../pipelines/DIMES_scan_filter_pipe.h | 391 ------------------ .../pipelines/MAX_scan_filter_pipe.h | 199 +++++---- .../benchmark/pipelines/scan_filter_pipe.h | 387 ----------------- qdp_project/src/utils/execution_modes.h | 41 +- 16 files changed, 181 insertions(+), 2433 deletions(-) delete mode 100644 qdp_project/bench_all_dimes.sh delete mode 100644 qdp_project/cmake_all_dimes.sh delete mode 100644 qdp_project/src/benchmark/DIMES_benchmark.cpp delete mode 100644 qdp_project/src/benchmark/DIMES_cores_benchmark.cpp delete mode 100644 qdp_project/src/benchmark/QDP_minimal.h delete mode 100644 qdp_project/src/benchmark/doubly_filtered_agg.cpp delete mode 100644 qdp_project/src/benchmark/filter_aggregate_pipeline.cpp delete mode 100644 qdp_project/src/benchmark/latency.cpp delete mode 100644 qdp_project/src/benchmark/micro_benchmarks.cpp delete mode 100644 
qdp_project/src/benchmark/pipelines/DIMES_scan_filter_pipe.h delete mode 100644 qdp_project/src/benchmark/pipelines/scan_filter_pipe.h diff --git a/qdp_project/CMakeLists.txt b/qdp_project/CMakeLists.txt index 71c8452..97c1915 100644 --- a/qdp_project/CMakeLists.txt +++ b/qdp_project/CMakeLists.txt @@ -20,12 +20,6 @@ set(SUPPRESS_WARNINGS "-Wno-literal-suffix -Wno-volatile") set(DEBUG_FLAGS "-g3" "-ggdb") set(RELEASE_FLAGS "-O3") -#set pcm location -set(PCM_LOCATION ./thirdParty/pcm) -set(PCM_LINKS -lpcm -L${CMAKE_CURRENT_LIST_DIR}/${PCM_LOCATION}/build/lib) -# pass the in formation about the shared library location to the linker -link_directories(${CMAKE_CURRENT_LIST_DIR}/${PCM_LOCATION}/build/lib) - #set flags used for Release and Debug build type add_compile_options( "$<$:${RELEASE_FLAGS}>" @@ -71,34 +65,18 @@ add_definitions(-DTHREAD_GROUP_MULTIPLIER=${THREAD_FACTOR}) eval(PINNING "cpu;numa" "cpu") add_definitions(-DPINNING=$) -eval(PCM_M "true;false" "false") -add_definitions(-DPCM_M=$) -add_definitions(${PCM_LINKS}) - # build directory set(CMAKE_BINARY_DIR "../bin") #relative to inside build set(EXECUTABLE_OUTPUT_PATH ${CMAKE_BINARY_DIR}) - - # include directories include_directories(src/utils) include_directories(src/algorithm) include_directories(src/algorithm/operators) -include_directories(thirdParty/pcm/src) # link libraries -link_libraries(-lnuma -lpthread) +link_libraries(-lnuma -lpthread -l:libdml.a) # Add targets only below # specify build targets -add_executable(FilterAggregatePipeline src/benchmark/filter_aggregate_pipeline.cpp) -add_executable(DoublyFiltered src/benchmark/doubly_filtered_agg.cpp) -add_executable(DIMESBench src/benchmark/DIMES_benchmark.cpp) -add_executable(DIMESCoreBench src/benchmark/DIMES_cores_benchmark.cpp) -add_executable(MicroBench src/benchmark/micro_benchmarks.cpp) -add_executable(MAXBench src/benchmark/MAX_benchmark.cpp - src/benchmark/QDP_minimal.h) -target_link_libraries(MAXBench libpcm.so) -add_executable(LatencyBench 
src/benchmark/latency.cpp) - +add_executable(MAXBench src/benchmark/MAX_benchmark.cpp) \ No newline at end of file diff --git a/qdp_project/bench_all_dimes.sh b/qdp_project/bench_all_dimes.sh deleted file mode 100644 index 9c05e62..0000000 --- a/qdp_project/bench_all_dimes.sh +++ /dev/null @@ -1,10 +0,0 @@ -#!bin/bash - -../bin/DIMESBench_gus -../bin/DIMESBench_guc -../bin/DIMESBench_gls -../bin/DIMESBench_glc -../bin/DIMESBench_lus -../bin/DIMESBench_luc -../bin/DIMESBench_lls -../bin/DIMESBench_llc \ No newline at end of file diff --git a/qdp_project/bench_max.sh b/qdp_project/bench_max.sh index fb08bd8..b7e0168 100644 --- a/qdp_project/bench_max.sh +++ b/qdp_project/bench_max.sh @@ -3,13 +3,8 @@ current_date_time=$(date) echo "Benchmark start at: $current_date_time" -../bin/MAXBench_gcc -cp ../results/max_q-complex_bm-global_bl-unlimited_tc-121MiB-2MiB.csv ../results/max_q-complex_bm-global_bl-unlimited_tc-121MiB-2MiB_pin_c_HBM.csv - -../bin/MAXBench_gcn - -cp ../results/max_q-complex_bm-global_bl-unlimited_tc-121MiB-2MiB.csv ../results/max_q-complex_bm-global_bl-unlimited_tc-121MiB-2MiB_pin_n_HBM.csv +../bin/MAXBench current_date_time=$(date) echo "Benchmark end at: $current_date_time" \ No newline at end of file diff --git a/qdp_project/cmake_all_dimes.sh b/qdp_project/cmake_all_dimes.sh deleted file mode 100644 index 9ce3a96..0000000 --- a/qdp_project/cmake_all_dimes.sh +++ /dev/null @@ -1,33 +0,0 @@ -#!bin/bash - -cmake -DCMAKE_BUILD_TYPE=Release -DWSUPPRESS=suppress -DBARRIER_MODE=global -DBUFFER_LIMIT=unlimited -DQUERY=simple .. -cmake --build . --target DIMESBench -mv ../bin/DIMESBench ../bin/DIMESBench_gus - -cmake -DCMAKE_BUILD_TYPE=Release -DWSUPPRESS=suppress -DBARRIER_MODE=global -DBUFFER_LIMIT=unlimited -DQUERY=complex .. -cmake --build . --target DIMESBench -mv ../bin/DIMESBench ../bin/DIMESBench_guc - -cmake -DCMAKE_BUILD_TYPE=Release -DWSUPPRESS=suppress -DBARRIER_MODE=global -DBUFFER_LIMIT=limited -DQUERY=simple .. -cmake --build . 
--target DIMESBench -mv ../bin/DIMESBench ../bin/DIMESBench_gls - -cmake -DCMAKE_BUILD_TYPE=Release -DWSUPPRESS=suppress -DBARRIER_MODE=global -DBUFFER_LIMIT=limited -DQUERY=complex .. -cmake --build . --target DIMESBench -mv ../bin/DIMESBench ../bin/DIMESBench_glc - -cmake -DCMAKE_BUILD_TYPE=Release -DWSUPPRESS=suppress -DBARRIER_MODE=local -DBUFFER_LIMIT=unlimited -DQUERY=simple .. -cmake --build . --target DIMESBench -mv ../bin/DIMESBench ../bin/DIMESBench_lus - -cmake -DCMAKE_BUILD_TYPE=Release -DWSUPPRESS=suppress -DBARRIER_MODE=local -DBUFFER_LIMIT=unlimited -DQUERY=complex .. -cmake --build . --target DIMESBench -mv ../bin/DIMESBench ../bin/DIMESBench_luc - -cmake -DCMAKE_BUILD_TYPE=Release -DWSUPPRESS=suppress -DBARRIER_MODE=local -DBUFFER_LIMIT=limited -DQUERY=simple .. -cmake --build . --target DIMESBench -mv ../bin/DIMESBench ../bin/DIMESBench_lls - -cmake -DCMAKE_BUILD_TYPE=Release -DWSUPPRESS=suppress -DBARRIER_MODE=local -DBUFFER_LIMIT=limited -DQUERY=complex .. -cmake --build . 
--target DIMESBench -mv ../bin/DIMESBench ../bin/DIMESBench_llc \ No newline at end of file diff --git a/qdp_project/src/benchmark/DIMES_benchmark.cpp b/qdp_project/src/benchmark/DIMES_benchmark.cpp deleted file mode 100644 index 2ca9705..0000000 --- a/qdp_project/src/benchmark/DIMES_benchmark.cpp +++ /dev/null @@ -1,240 +0,0 @@ -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include - -#ifndef THREAD_GROUP_MULTIPLIER -#define THREAD_GROUP_MULTIPLIER 8 -#endif - -#ifndef QUERY -#define QUERY 1 -#endif - -#ifndef BARRIER_MODE -#define BARRIER_MODE "global" -#endif - -#ifndef BUFFER_LIMIT -#define BUFFER_LIMIT 1 -#endif - -#include "const.h" - -#include "file_output.h" -#include "array_utils.h" -#include "timer_utils.h" -#include "barrier_utils.h" -#include "cpu_set_utils.h" -#include "iterable_range.h" -#include "memory_literals.h" -#include "pipelines/DIMES_scan_filter_pipe.h" - -#include "aggregation.h" -#include "filter.h" - -using base_t = uint64_t; - -base_t sum_check(base_t compare_value, base_t* row_A, base_t* row_B, size_t row_size) { - base_t sum = 0; - for(int i = 0; i < row_size / sizeof(base_t); ++i) { - sum += (row_A[i] < compare_value) * row_B[i]; - } - return sum; -} - -base_t sum_check_complex(base_t compare_value_a, base_t compare_value_b, base_t* row_A, base_t* row_B, size_t row_size) { - base_t sum = 0; - for(int i = 0; i < row_size / sizeof(base_t); ++i) { - sum += (row_A[i] < compare_value_a && row_B[i] < compare_value_b) * row_B[i]; - } - return sum; -} - -int main(int argc, char** argv) { - // set constants - const size_t workload_b = 4_GiB; - const base_t compare_value_a = 50; - const base_t compare_value_b = 42; - constexpr bool simple_query = (QUERY == 1); - - const size_t thread_count = 6; - std::ofstream out_file; - out_file.open("../results/dimes_" - "q-" + (std::string)(simple_query == true ? 
"simple" : "complex") + - "_bm-" + (std::string) BARRIER_MODE + - "_bl-" + (std::string)(BUFFER_LIMIT == 1 ? "limited" : "unlimited") + - "_tc-" + std::to_string(thread_count * THREAD_GROUP_MULTIPLIER) + ".csv"); - - // set benchmark parameter - Linear_Int_Range run("run"); - Exp_Int_Range chunk_size("chunk_size"); - Range mode("mode"); - - uint32_t remote_node = 3; - uint32_t remote_node_2 = 2; - uint32_t local_node = 10; - - print_to_file(out_file, generateHead(run, chunk_size, mode), "thread_group", "time", - #ifdef THREAD_TIMINGS - "scan_a", "scan_b", "aggr_j", - #endif - #ifdef BARRIER_TIMINGS - "wait_scan_a", "wait_scan_b", "wait_aggr_j", - #endif - "result"); - - - /*** alloc data and buffers ************************************************/ - base_t* data_a = (base_t*) numa_alloc_onnode(workload_b, remote_node); - base_t* data_b = (base_t*) numa_alloc_onnode(workload_b, remote_node_2); - base_t* data_a_hbm = (base_t*) numa_alloc_onnode(workload_b, local_node); - base_t* data_b_hbm = (base_t*) numa_alloc_onnode(workload_b, local_node); - fill_mt(data_a, workload_b, 0, 100, 42); - fill_mt(data_b, workload_b, 0, 100, 420); - std::memcpy(data_a_hbm, data_a, workload_b); - std::memcpy(data_b_hbm, data_b, workload_b); - base_t* results = (base_t*) numa_alloc_onnode(THREAD_GROUP_MULTIPLIER * thread_count * sizeof(base_t), remote_node); - - std::ofstream check_file; - check_file.open("../results/dimes_" - "q-" + (std::string)(simple_query == true ? "simple" : "complex") + - "_bm-" + (std::string) BARRIER_MODE + - "_bl-" + (std::string)(BUFFER_LIMIT == 1 ? 
"limited" : "unlimited") + - "_tc-" + std::to_string(thread_count * THREAD_GROUP_MULTIPLIER) + ".checksum"); - if constexpr (QUERY == 1) { - //calculate simple checksum if QUERY == 1 -> simple query is applied - check_file << sum_check(compare_value_a, data_a, data_b, workload_b); - } else { - check_file << sum_check_complex(compare_value_a, compare_value_b, data_a, data_b, workload_b); - } - check_file.close(); - - std::string iteration("init"); - Query_Wrapper* qw = nullptr; - while(iteration != "false") { - - std::promise p; - std::shared_future ready_future(p.get_future()); - - if(iteration != "run") { - - if(qw != nullptr) { - delete qw; - } - - std::cout << "Changing to mode " << mode.current << " chunksize " << chunk_size.current << std::endl; - - uint8_t tc_filter = new_mode_manager::thread_count(simple_query ? SIMPLE_Q : COMPLEX_Q, mode.current, SCAN_A); - uint8_t tc_copy = new_mode_manager::thread_count(simple_query ? SIMPLE_Q : COMPLEX_Q, mode.current, SCAN_B); - uint8_t tc_agg = new_mode_manager::thread_count(simple_query ? 
SIMPLE_Q : COMPLEX_Q, mode.current, AGGR_J); - switch(mode.current) { - case NewPMode::DRAM_base: - qw = new Query_Wrapper(&ready_future, workload_b, chunk_size.current, data_a, data_b, results, local_node, remote_node, - tc_filter, tc_copy, tc_agg, mode.current, THREAD_GROUP_MULTIPLIER, (base_t) 50, (base_t) 42, true); - break; - case NewPMode::HBM_base: - qw = new Query_Wrapper(&ready_future, workload_b, chunk_size.current, data_a_hbm, data_b_hbm, results, local_node, remote_node, - tc_filter, tc_copy, tc_agg, mode.current, THREAD_GROUP_MULTIPLIER, (base_t) 50, (base_t) 42, true); - break; - case NewPMode::Mixed_base: - qw = new Query_Wrapper(&ready_future, workload_b, chunk_size.current, data_a, data_b_hbm, results, local_node, remote_node, - tc_filter, tc_copy, tc_agg, mode.current, THREAD_GROUP_MULTIPLIER, (base_t) 50, (base_t) 42, true); - break; - case NewPMode::Prefetch: - qw = new Query_Wrapper(&ready_future, workload_b, chunk_size.current, data_a, data_b, results, local_node, remote_node, - tc_filter, tc_copy, tc_agg, mode.current, THREAD_GROUP_MULTIPLIER, (base_t) 50, (base_t) 42, false); - break; - } - } - - qw->ready_future = &ready_future; - qw->clear_buffers(); - - auto filter_lambda = [&qw](uint32_t gid, uint32_t gcnt, uint32_t tid) { qw->scan_a(gid, gcnt, tid); }; - auto copy_lambda = [&qw](uint32_t gid, uint32_t gcnt, uint32_t tid) { qw->scan_b(gid, gcnt, tid); }; - auto aggregation_lambda = [&qw](uint32_t gid, uint32_t gcnt, uint32_t tid) { qw->aggr_j(gid, gcnt, tid); }; - - std::vector filter_pool; - std::vector copy_pool; - std::vector agg_pool; - - uint8_t tc_filter = new_mode_manager::thread_count(simple_query ? SIMPLE_Q : COMPLEX_Q, mode.current, SCAN_A); - uint8_t tc_copy = new_mode_manager::thread_count(simple_query ? SIMPLE_Q : COMPLEX_Q, mode.current, SCAN_B); - uint8_t tc_agg = new_mode_manager::thread_count(simple_query ? 
SIMPLE_Q : COMPLEX_Q, mode.current, AGGR_J); - - int thread_id = 0; - // std::vector> pinning_ranges {std::make_pair(28, 42), std::make_pair(84, 98)}; // node 2 heacboehm II - //std::vector> pinning_ranges {std::make_pair(32, 48), std::make_pair(96, 112)}; // node 2 heacboehm - //std::vector> pinning_ranges {std::make_pair(24, 36), std::make_pair(120, 132)}; // node 2 sapphire rapids - //std::vector> pinning_ranges {std::make_pair(24, 48)}; // node 2+3 sapphire rapids - std::vector> pinning_ranges {std::make_pair(0, 48)}; // node 0-3 sapphire rapids - - for(uint32_t gid = 0; gid < THREAD_GROUP_MULTIPLIER; ++gid) { - - for(uint32_t tid = 0; tid < tc_filter; ++tid) { - filter_pool.emplace_back(filter_lambda, gid, THREAD_GROUP_MULTIPLIER, tid); - pin_thread_in_range(filter_pool.back(), thread_id++, pinning_ranges); - } - - // if tc_copy == 0 this loop is skipped - for(uint32_t tid = 0; tid < tc_copy; ++tid) { - copy_pool.emplace_back(copy_lambda, gid, THREAD_GROUP_MULTIPLIER, tid); - pin_thread_in_range(copy_pool.back(), thread_id++, pinning_ranges); - } - - for(uint32_t tid = 0; tid < tc_agg; ++tid) { - agg_pool.emplace_back(aggregation_lambda, gid, THREAD_GROUP_MULTIPLIER, tid); - pin_thread_in_range(agg_pool.back(), thread_id++, pinning_ranges); - } - } - - auto start = std::chrono::steady_clock::now(); - p.set_value(); - - for(std::thread& t : filter_pool) { t.join(); } - for(std::thread& t : copy_pool) { t.join(); } - for(std::thread& t : agg_pool) { t.join(); } - - Aggregation::apply(results, results, sizeof(base_t) * tc_agg * THREAD_GROUP_MULTIPLIER); - auto end = std::chrono::steady_clock::now(); - - constexpr double nanos_per_second = ((double)1000) * 1000 * 1000; - uint64_t nanos = std::chrono::duration_cast(end - start).count(); - double seconds = (double)(nanos) / nanos_per_second; - - - print_to_file(out_file, run, chunk_size, new_mode_manager::string(mode.current), THREAD_GROUP_MULTIPLIER, seconds, - #ifdef THREAD_TIMINGS - qw->trt->summarize_time(0), 
qw->trt->summarize_time(1), qw->trt->summarize_time(2), - #endif - #ifdef BARRIER_TIMINGS - qw->bt->summarize_time(0), qw->bt->summarize_time(1), qw->bt->summarize_time(2), - #endif - results[0]); - - - iteration = IterateOnce(run, chunk_size, mode); - } - - numa_free(data_b_hbm, workload_b); - numa_free(data_a, workload_b); - numa_free(data_b, workload_b); - - numa_free(results, THREAD_GROUP_MULTIPLIER * thread_count * sizeof(base_t)); - -} \ No newline at end of file diff --git a/qdp_project/src/benchmark/DIMES_cores_benchmark.cpp b/qdp_project/src/benchmark/DIMES_cores_benchmark.cpp deleted file mode 100644 index 93c6b1b..0000000 --- a/qdp_project/src/benchmark/DIMES_cores_benchmark.cpp +++ /dev/null @@ -1,260 +0,0 @@ -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include - -#ifndef QUERY -#define QUERY 1 -#endif - -#ifndef BARRIER_MODE -#define BARRIER_MODE "global" -#endif - -#define BUFFER_LIMIT 0 - -#include "const.h" - -#include "file_output.h" -#include "array_utils.h" -#include "timer_utils.h" -#include "barrier_utils.h" -#include "cpu_set_utils.h" -#include "iterable_range.h" -#include "memory_literals.h" -#include "pipelines/DIMES_scan_filter_pipe.h" - -#include "aggregation.h" -#include "filter.h" - -using base_t = uint64_t; - -base_t sum_check(base_t compare_value, base_t* row_A, base_t* row_B, size_t row_size) { - base_t sum = 0; - for(int i = 0; i < row_size / sizeof(base_t); ++i) { - sum += (row_A[i] < compare_value) * row_B[i]; - } - return sum; -} - -base_t sum_check_complex(base_t compare_value_a, base_t compare_value_b, base_t* row_A, base_t* row_B, size_t row_size) { - base_t sum = 0; - for(int i = 0; i < row_size / sizeof(base_t); ++i) { - sum += (row_A[i] < compare_value_a && row_B[i] < compare_value_b) * row_B[i]; - } - return sum; -} - - -int main(int argc, char** argv) { - // set constants - const size_t workload_b = 4_GiB; 
- const size_t chunk_size = 2_MiB; - const base_t compare_value_a = 50; - const base_t compare_value_b = 42; - constexpr bool simple_query = (QUERY == 1); - - - std::ofstream out_file; - out_file.open("../results/dimes_cores_" - "q-" + (std::string)(simple_query == true ? "simple" : "complex") + - "_bm-" + (std::string) BARRIER_MODE + - "_bl-" + (std::string)(BUFFER_LIMIT == 1 ? "limited" : "unlimited") + - ".csv"); - - // set benchmark parameter - Linear_Int_Range run("run"); - - Exp_Int_Range scan_a_thread("scan_a_tc"); - Exp_Int_Range scan_b_thread("scan_b_tc"); - Exp_Int_Range aggr_j_thread("aggr_j_tc"); - Linear_Int_Range thread_group_count("thread_group_c"); - Range mode("mode"); - - uint32_t remote_node = 1; - uint32_t remote_node_2 = 0;//on heacboehm II: node 0 is two hops away from node 2 -> prefetching is more beneficial - uint32_t local_node = 2; - - print_to_file(out_file, generateHead(run, thread_group_count, mode, scan_a_thread, scan_b_thread, aggr_j_thread), - "time", - #ifdef THREAD_TIMINGS - "scan_a", "scan_b", "aggr_j", - #endif - #ifdef BARRIER_TIMINGS - "wait_scan_a", "wait_scan_b", "wait_aggr_j", - #endif - "result"); - - - /*** alloc data and buffers ************************************************/ - base_t* data_a = (base_t*) numa_alloc_onnode(workload_b, remote_node); - base_t* data_b = (base_t*) numa_alloc_onnode(workload_b, remote_node_2); - base_t* data_a_hbm = (base_t*) numa_alloc_onnode(workload_b, local_node); - base_t* data_b_hbm = (base_t*) numa_alloc_onnode(workload_b, local_node); - fill_mt(data_a, workload_b, 0, 100, 42); - fill_mt(data_b, workload_b, 0, 100, 420); - std::memcpy(data_a_hbm, data_a, workload_b); - std::memcpy(data_b_hbm, data_b, workload_b); - base_t* results = (base_t*) numa_alloc_onnode(thread_group_count.max * aggr_j_thread.max * sizeof(base_t), remote_node); - - std::ofstream check_file; - check_file.open("../results/dimes_cores_" - "q-" + (std::string)(simple_query == true ? 
"simple" : "complex") + - "_bm-" + (std::string) BARRIER_MODE + - "_bl-" + (std::string)(BUFFER_LIMIT == 1 ? "limited" : "unlimited") + - ".checksum"); - if constexpr (QUERY == 1) { - //calculate simple checksum if QUERY == 1 -> simple query is applied - check_file << sum_check(compare_value_a, data_a, data_b, workload_b); - } else { - check_file << sum_check_complex(compare_value_a, compare_value_b, data_a, data_b, workload_b); - } - check_file.close(); - - std::string iteration("init"); - Query_Wrapper* qw = nullptr; - while(iteration != "false") { - - std::promise p; - std::shared_future ready_future(p.get_future()); - - // skipping iteration through scan_b_thread while not used - while(simple_query && mode.current != NewPMode::Prefetch && scan_b_thread.current != 1) { - iteration = IterateOnce(run, thread_group_count, mode, scan_a_thread, scan_b_thread, aggr_j_thread); - } - - if(iteration != "run") { - std::cout << "Changing to mode " << mode.current - << " thread_group_count " << thread_group_count.current - << " thread_ratio " << scan_a_thread.current <<":"<< scan_b_thread.current <<":"<< aggr_j_thread.current - << std::endl; - - if(qw != nullptr) { - if (iteration == thread_group_count.label) { - - } else { - delete qw; - - uint32_t sat = scan_a_thread.current; - uint32_t sbt = simple_query && mode.current != NewPMode::Prefetch ? 
0 : scan_b_thread.current; - uint32_t ajt = aggr_j_thread.current; - - switch(mode.current) { - case NewPMode::DRAM_base: - qw = new Query_Wrapper(&ready_future, workload_b, chunk_size, data_a, data_b, results, local_node, remote_node, - sat, sbt, ajt, mode.current, thread_group_count.current, (base_t) 50, (base_t) 42, true); - break; - case NewPMode::HBM_base: - qw = new Query_Wrapper(&ready_future, workload_b, chunk_size, data_a_hbm, data_b_hbm, results, local_node, remote_node, - sat, sbt, ajt, mode.current, thread_group_count.current, (base_t) 50, (base_t) 42, true); - break; - case NewPMode::Mixed_base: - qw = new Query_Wrapper(&ready_future, workload_b, chunk_size, data_a, data_b_hbm, results, local_node, remote_node, - sat, sbt, ajt, mode.current, thread_group_count.current, (base_t) 50, (base_t) 42, true); - break; - case NewPMode::Prefetch: - qw = new Query_Wrapper(&ready_future, workload_b, chunk_size, data_a, data_b, results, local_node, remote_node, - sat, sbt, ajt, mode.current, thread_group_count.current, (base_t) 50, (base_t) 42, false); - break; - } - } - } - } - - qw->ready_future = &ready_future; - qw->clear_buffers(); - - auto filter_lambda = [&qw](uint32_t gid, uint32_t gcnt, uint32_t tid) { qw->scan_a(gid, gcnt, tid); }; - auto copy_lambda = [&qw](uint32_t gid, uint32_t gcnt, uint32_t tid) { qw->scan_b(gid, gcnt, tid); }; - auto aggregation_lambda = [&qw](uint32_t gid, uint32_t gcnt, uint32_t tid) { qw->aggr_j(gid, gcnt, tid); }; - - std::vector filter_pool; - std::vector copy_pool; - std::vector agg_pool; - - uint8_t tc_filter = new_mode_manager::thread_count(simple_query ? SIMPLE_Q : COMPLEX_Q, mode.current, SCAN_A); - uint8_t tc_copy = new_mode_manager::thread_count(simple_query ? SIMPLE_Q : COMPLEX_Q, mode.current, SCAN_B); - uint8_t tc_agg = new_mode_manager::thread_count(simple_query ? 
SIMPLE_Q : COMPLEX_Q, mode.current, AGGR_J); - - int thread_id = 0; - // std::vector> pinning_ranges {std::make_pair(28, 42), std::make_pair(84, 98)}; // node 2 heacboehm II - std::vector> pinning_ranges {std::make_pair(32, 48), std::make_pair(96, 112)}; // node 2 heacboehm - - for(uint32_t gid = 0; gid < thread_group_count.current; ++gid) { - - for(uint32_t tid = 0; tid < tc_filter; ++tid) { - filter_pool.emplace_back(filter_lambda, gid, thread_group_count.current, tid); - pin_thread_in_range(filter_pool.back(), thread_id++, pinning_ranges); - } - - // if tc_copy == 0 this loop is skipped - for(uint32_t tid = 0; tid < tc_copy; ++tid) { - copy_pool.emplace_back(copy_lambda, gid, thread_group_count.current, tid); - pin_thread_in_range(copy_pool.back(), thread_id++, pinning_ranges); - } - - for(uint32_t tid = 0; tid < tc_agg; ++tid) { - agg_pool.emplace_back(aggregation_lambda, gid, thread_group_count.current, tid); - pin_thread_in_range(agg_pool.back(), thread_id++, pinning_ranges); - } - } - - auto start = std::chrono::steady_clock::now(); - p.set_value(); - - for(std::thread& t : filter_pool) { t.join(); } - for(std::thread& t : copy_pool) { t.join(); } - for(std::thread& t : agg_pool) { t.join(); } - - Aggregation::apply(results, results, sizeof(base_t) * tc_agg * thread_group_count.current); - auto end = std::chrono::steady_clock::now(); - - constexpr double nanos_per_second = ((double)1000) * 1000 * 1000; - uint64_t nanos = std::chrono::duration_cast(end - start).count(); - double seconds = (double)(nanos) / nanos_per_second; - -print_to_file(out_file, generateHead(run, thread_group_count, mode, scan_a_thread, scan_b_thread, aggr_j_thread), - "time", - #ifdef THREAD_TIMINGS - "scan_a", "scan_b", "aggr_j", - #endif - #ifdef BARRIER_TIMINGS - "wait_scan_a", "wait_scan_b", "wait_aggr_j", - #endif - "result"); - - print_to_file(out_file, run, thread_group_count.current, new_mode_manager::string(mode.current), scan_a_thread, - (simple_query && mode.current != 
NewPMode::Prefetch ? 0 : scan_b_thread.current), - aggr_j_thread, seconds, - #ifdef THREAD_TIMINGS - qw->trt->summarize_time(0), qw->trt->summarize_time(1), qw->trt->summarize_time(2), - #endif - #ifdef BARRIER_TIMINGS - qw->bt->summarize_time(0), qw->bt->summarize_time(1), qw->bt->summarize_time(2), - #endif - results[0]); - - iteration = IterateOnce(run, thread_group_count, mode, scan_a_thread, scan_b_thread, aggr_j_thread); - } - - numa_free(data_b_hbm, workload_b); - numa_free(data_a, workload_b); - numa_free(data_b, workload_b); - - numa_free(results, thread_group_count.max * aggr_j_thread.max * sizeof(base_t)); - -} \ No newline at end of file diff --git a/qdp_project/src/benchmark/MAX_benchmark.cpp b/qdp_project/src/benchmark/MAX_benchmark.cpp index fb50f5a..0414e29 100644 --- a/qdp_project/src/benchmark/MAX_benchmark.cpp +++ b/qdp_project/src/benchmark/MAX_benchmark.cpp @@ -92,33 +92,36 @@ int main(int argc, char** argv) { #endif // set constants - const size_t workload_b = 2_GiB; - const base_t compare_value_a = 50; - const base_t compare_value_b = 42; + constexpr size_t workload_b = 2_GiB; + constexpr base_t compare_value_a = 50; + constexpr base_t compare_value_b = 42; constexpr bool simple_query = (QUERY == 1); + constexpr bool cache_a = false; + constexpr bool wait_b = false; + + constexpr size_t chunk_min = 1_MiB; + constexpr size_t chunk_max = 8_MiB + 1; + constexpr size_t chunk_incr = 128_kiB; + + // thread count is 12 here but as the default measurement uses 6 + // we must restrict the core assignment of these 12 threads to + // 6 physical cpu cores on the executing node + + constexpr size_t thread_count = 12; - const size_t thread_count = 6; std::ofstream out_file; + out_file.open("../results/max_" "q-" + (std::string)(simple_query == true ? "simple" : "complex") + "_bm-" + (std::string) BARRIER_MODE + "_bl-" + (std::string)(BUFFER_LIMIT == 1 ? 
"limited" : "unlimited") + - "_tc-" + std::to_string(thread_count * THREAD_GROUP_MULTIPLIER) + "1MiB-2MiB.csv"); + "_tc-" + std::to_string(thread_count) + "1MiB-2MiB.csv"); // set benchmark parameter Linear_Int_Range run("run"); - constexpr size_t chunk_min = 1_MiB; constexpr size_t chunk_max = 8_MiB + 1; constexpr size_t chunk_incr = 128_kiB; Linear_Int_Range chunk_size("chunk_size"); Range mode("mode"); - uint32_t remote_node = 2; - uint32_t remote_node_2 = 2; - uint32_t local_node = 10; - - /*uint32_t remote_node = 6; - uint32_t remote_node_2 = 6; - uint32_t local_node = 2;*/ - print_to_file(out_file, generateHead(run, chunk_size, mode), "thread_group", "time", #ifdef THREAD_TIMINGS "scan_a", "scan_b", "aggr_j", @@ -133,24 +136,22 @@ int main(int argc, char** argv) { #endif "result"); - /*** alloc data and buffers ************************************************/ - base_t* data_a = (base_t*) numa_alloc_onnode(workload_b, remote_node); - base_t* data_b = (base_t*) numa_alloc_onnode(workload_b, remote_node_2); - base_t* data_a_hbm = (base_t*) numa_alloc_onnode(workload_b, local_node); - base_t* data_b_hbm = (base_t*) numa_alloc_onnode(workload_b, local_node); + + base_t* data_a = (base_t*) numa_alloc_local(workload_b); + base_t* data_b = (base_t*) numa_alloc_local(workload_b); + base_t* results = (base_t*) numa_alloc_local(thread_count * sizeof(base_t)); + fill_mt(data_a, workload_b, 0, 100, 42); fill_mt(data_b, workload_b, 0, 100, 420); - std::memcpy(data_a_hbm, data_a, workload_b); - std::memcpy(data_b_hbm, data_b, workload_b); - base_t* results = (base_t*) numa_alloc_onnode(THREAD_GROUP_MULTIPLIER * thread_count * sizeof(base_t), remote_node); + std::ofstream check_file; check_file.open("../results/max_" "q-" + (std::string)(simple_query == true ? "simple" : "complex") + "_bm-" + (std::string) BARRIER_MODE + "_bl-" + (std::string)(BUFFER_LIMIT == 1 ? 
"limited" : "unlimited") + - "_tc-" + std::to_string(thread_count * THREAD_GROUP_MULTIPLIER) + ".checksum"); + "_tc-" + std::to_string(thread_count) + ".checksum"); if constexpr (QUERY == 1) { //calculate simple checksum if QUERY == 1 -> simple query is applied check_file << sum_check(compare_value_a, data_a, data_b, workload_b); @@ -160,37 +161,34 @@ int main(int argc, char** argv) { check_file.close(); std::string iteration("init"); - Query_Wrapper* qw = nullptr; + Query_Wrapper* qw = nullptr; + while(iteration != "false") { std::promise p; std::shared_future ready_future(p.get_future()); if(iteration != "run") { - if(qw != nullptr) { delete qw; } + uint8_t tc_filter = new_mode_manager::thread_count(simple_query ? SIMPLE_Q : COMPLEX_Q, mode.current, SCAN_A); uint8_t tc_copy = new_mode_manager::thread_count(simple_query ? SIMPLE_Q : COMPLEX_Q, mode.current, SCAN_B); uint8_t tc_agg = new_mode_manager::thread_count(simple_query ? SIMPLE_Q : COMPLEX_Q, mode.current, AGGR_J); + switch(mode.current) { - case NewPMode::DRAM_base: - qw = new Query_Wrapper(&ready_future, workload_b, chunk_size.current, data_a, data_b, results, local_node, remote_node, - tc_filter, tc_copy, tc_agg, mode.current, THREAD_GROUP_MULTIPLIER, (base_t) 50, (base_t) 42, true); - break; - case NewPMode::HBM_base: - qw = new Query_Wrapper(&ready_future, workload_b, chunk_size.current, data_a_hbm, data_b_hbm, results, local_node, remote_node, - tc_filter, tc_copy, tc_agg, mode.current, THREAD_GROUP_MULTIPLIER, (base_t) 50, (base_t) 42, true); - break; - case NewPMode::Mixed_base: - qw = new Query_Wrapper(&ready_future, workload_b, chunk_size.current, data_a, data_b_hbm, results, local_node, remote_node, - tc_filter, tc_copy, tc_agg, mode.current, THREAD_GROUP_MULTIPLIER, (base_t) 50, (base_t) 42, true); - break; - case NewPMode::Prefetch: - qw = new Query_Wrapper(&ready_future, workload_b, chunk_size.current, data_a, data_b, results, local_node, remote_node, - tc_filter, tc_copy, tc_agg, 
mode.current, THREAD_GROUP_MULTIPLIER, (base_t) 50, (base_t) 42, false); + case NewPMode::Prefetch: + qw = new Query_Wrapper( + &ready_future, workload_b, chunk_size.current, + data_a, data_b, results, tc_filter, tc_copy, tc_agg, + mode.current, 50, 42 + ); + break; + default: + std::cerr << "[x] Unsupported Execution Mode by this build." << std::endl; + exit(-1); } } @@ -280,10 +278,7 @@ int main(int argc, char** argv) { iteration = IterateOnce(run, chunk_size, mode); } - numa_free(data_b_hbm, workload_b); numa_free(data_a, workload_b); numa_free(data_b, workload_b); - numa_free(results, THREAD_GROUP_MULTIPLIER * thread_count * sizeof(base_t)); - } \ No newline at end of file diff --git a/qdp_project/src/benchmark/QDP_minimal.h b/qdp_project/src/benchmark/QDP_minimal.h deleted file mode 100644 index 007d0d9..0000000 --- a/qdp_project/src/benchmark/QDP_minimal.h +++ /dev/null @@ -1,147 +0,0 @@ -#include -#include -#include -#include -#include - -#include "const.h" -#include "array_utils.h" -#include "cpu_set_utils.h" -#include "iterable_range.h" -#include "memory_literals.h" -#include "pipelines/MAX_scan_filter_pipe.h" -#include "aggregation.h" - -using base_t = uint64_t; - -// calculate the checksum for the simple query -base_t sum_check(base_t compare_value, base_t* row_A, base_t* row_B, size_t row_size) { - base_t sum = 0; - for(int i = 0; i < row_size / sizeof(base_t); ++i) { - sum += (row_A[i] < compare_value) * row_B[i]; - } - return sum; -} - -// calculate the checksum for the complex query -base_t sum_check_complex(base_t compare_value_a, base_t compare_value_b, base_t* row_A, base_t* row_B, size_t row_size) { - base_t sum = 0; - for(int i = 0; i < row_size / sizeof(base_t); ++i) { - sum += (row_A[i] < compare_value_a && row_B[i] < compare_value_b) * row_B[i]; - } - return sum; -} - -class QDP_minimal { -private: - // values used for comparisons in the filter operations - const base_t compare_value_a = 50; - const base_t compare_value_b = 42; - // define, 
which numa nodes to use - // Xeon Max: node 0-7 DRAM and 8-15 HBM - // if the nodes are changed, the pinning ranges in run should be adjusted accordingly too - uint32_t dram_node = 2; - uint32_t dram_node_2 = 2; - uint32_t hbm_node = 10; - -public: - // results of running qdp, set by run() - base_t result; - base_t checksum; - double exec_time; - - // run qdp - void run(const size_t workload_b, size_t chunk_size, uint8_t tc_filter, uint8_t tc_copy, uint8_t tc_agg){ - // allocate data - base_t* data_a = (base_t*) numa_alloc_onnode(workload_b, dram_node); - base_t* data_b = (base_t*) numa_alloc_onnode(workload_b, dram_node_2); - base_t* results = (base_t*) numa_alloc_onnode(THREAD_GROUP_MULTIPLIER * tc_agg * sizeof(base_t), dram_node); - - // fill the memory with acutal values - fill_mt(data_a, workload_b, 0, 100, 42); - fill_mt(data_b, workload_b, 0, 100, 420); - - // run qdp - run(data_a, data_b, results, workload_b, chunk_size, tc_filter, tc_copy, tc_agg); - - // free the allocated memory - numa_free(data_a, workload_b); - numa_free(data_b, workload_b); - numa_free(results, THREAD_GROUP_MULTIPLIER * tc_agg * sizeof(base_t)); - } - - // run qdp, work on provided memory pointers to enable memory reuse across multiple runs - void run(base_t* data_a, base_t* data_b, base_t* results, const size_t workload_b, size_t chunk_size, uint8_t tc_filter, uint8_t tc_copy, uint8_t tc_agg){ - constexpr bool simple_query = (QUERY == 1); - // sync objects - std::promise p; - std::shared_future ready_future(p.get_future()); - - // create the query wrapper, that is managing the to-be-used threads - Query_Wrapper* qw = new Query_Wrapper(&ready_future, workload_b, chunk_size, data_a, data_b, results, hbm_node, dram_node, - tc_filter, tc_copy, tc_agg, NewPMode::Prefetch, THREAD_GROUP_MULTIPLIER, compare_value_a, compare_value_b, false); - - // clear buffers to make sure, that they have been written and are fully mapped before running qdp - qw->clear_buffers(); - - // creating lambdas for 
executing filter (scan_a), copy (scan_b), and aggregation tasks on the query wrapper - // passing gid (group id), gcnt (group count) and tid (thread id) - auto filter_lambda = [&qw](uint32_t gid, uint32_t gcnt, uint32_t tid) { qw->scan_a(gid, gcnt, tid); }; - auto copy_lambda = [&qw](uint32_t gid, uint32_t gcnt, uint32_t tid) { qw->scan_b(gid, gcnt, tid); }; - auto aggregation_lambda = [&qw](uint32_t gid, uint32_t gcnt, uint32_t tid) { qw->aggr_j(gid, gcnt, tid); }; - - // creating thread pools, holding all used threads - std::vector filter_pool; - std::vector copy_pool; - std::vector agg_pool; - - int thread_id = 0; - // cpus on node 2 (for sapphire rapids), that the threads should be executed on - std::vector> pinning_ranges {std::make_pair(24, 36), std::make_pair(120, 132)}; - - // create all threads for all thread groups and for every task (copy, filter, aggregation), according their specific theadcount - for(uint32_t gid = 0; gid < THREAD_GROUP_MULTIPLIER; ++gid) { - for(uint32_t tid = 0; tid < tc_filter; ++tid) { - filter_pool.emplace_back(filter_lambda, gid, THREAD_GROUP_MULTIPLIER, tid); - pin_thread_in_range(filter_pool.back(), thread_id++, pinning_ranges); - } - for(uint32_t tid = 0; tid < tc_copy; ++tid) { - copy_pool.emplace_back(copy_lambda, gid, THREAD_GROUP_MULTIPLIER, tid); - pin_thread_in_range(copy_pool.back(), thread_id++, pinning_ranges); - } - for(uint32_t tid = 0; tid < tc_agg; ++tid) { - agg_pool.emplace_back(aggregation_lambda, gid, THREAD_GROUP_MULTIPLIER, tid); - pin_thread_in_range(agg_pool.back(), thread_id++, pinning_ranges); - } - } - - // start the clock - auto start = std::chrono::steady_clock::now(); - // set value to the promise, to signal the waiting threads, that they can start now - p.set_value(); - - // wait for all thread to be finished - for(std::thread& t : filter_pool) { t.join(); } - for(std::thread& t : copy_pool) { t.join(); } - for(std::thread& t : agg_pool) { t.join(); } - - // sum up the results of all the aggregation 
threads to get a final result - Aggregation::apply(&result, results, sizeof(base_t) * tc_agg * THREAD_GROUP_MULTIPLIER); - auto end = std::chrono::steady_clock::now(); - - // get the overall execution time in seconds - constexpr double nanos_per_second = ((double)1000) * 1000 * 1000; - uint64_t nanos = std::chrono::duration_cast(end - start).count(); - exec_time = (double)(nanos) / nanos_per_second; - - // calculate the checksum according to the used query - if constexpr (QUERY == 1) { - // QUERY == 1 -> simple query is applied - checksum = sum_check(compare_value_a, data_a, data_b, workload_b); - } else { - checksum = sum_check_complex(compare_value_a, compare_value_b, data_a, data_b, workload_b); - } - - delete qw; - } -}; diff --git a/qdp_project/src/benchmark/doubly_filtered_agg.cpp b/qdp_project/src/benchmark/doubly_filtered_agg.cpp deleted file mode 100644 index eaee93d..0000000 --- a/qdp_project/src/benchmark/doubly_filtered_agg.cpp +++ /dev/null @@ -1,149 +0,0 @@ - -#include -#include -#include -#include -#include -#include -#include - -#include - -#include "aggregation.h" -#include "array_utils.h" -#include "cpu_set_utils.h" -#include "file_output.h" -#include "iterable_range.h" -#include "memory_literals.h" -#include "pipelines/scan_filter_pipe.h" - -int main () { - - using base_t = uint64_t; - - - const size_t workload = 2_GiB; - const char filename[256] = "../results/doubly_filtered_results_stronger_affinity_.csv"; - const uint32_t numa_local = 2; - const uint32_t numa_remote = 3; - - - Linear_Int_Range thread_group("thread_groups"); - Exp_Int_Range thread_count_filter("thread_cnt_filter"); - Exp_Int_Range thread_count_filter_copy("thread_cnt_filter_copy"); - Exp_Int_Range thread_count_aggregation("thread_cnt_agg"); - Linear_Int_Range run("run"); - Range mode("mode"); - Exp_Int_Range chunk_size("chunk_size"); - - std::ofstream out_file; - out_file.open(filename); - print_to_file(out_file, generateHead(run, chunk_size, mode, thread_count_filter, 
thread_count_filter_copy, - thread_count_aggregation, thread_group), "time", "scan_a", "scan_b", "aggr_j", "wait_aggr", "results"); - - base_t* data_a = (base_t*) numa_alloc_onnode(workload, numa_remote); - base_t* data_b = (base_t*) numa_alloc_onnode(workload, numa_remote); - base_t* data_b_hbm = (base_t*) numa_alloc_onnode(workload, numa_local); - fill_mt(data_a, workload, 0, 100, 42); - fill_mt(data_b, workload, 0, 100, 420); - std::memcpy(data_b_hbm, data_b, workload); - base_t* result = (base_t*) numa_alloc_onnode(thread_group.max * thread_count_aggregation.max * sizeof(base_t), - numa_remote); - - std::string iteration("init"); - Query_Wrapper* qw = nullptr; - - while(iteration != "false") { - - std::promise p; - std::shared_future ready_future(p.get_future()); - - if(iteration != "run") { - if(qw != nullptr) { - delete qw; - } - - switch(mode.current) { - case PMode::expl_copy: - qw = new Query_Wrapper(&ready_future, workload, chunk_size.current, data_a, data_b, result, numa_local, numa_remote, - thread_count_filter.current, thread_count_filter_copy.current, thread_count_aggregation.current, - mode.current, thread_group.current, (base_t) 50, (base_t) 42, false); - break; - case PMode::no_copy: - qw = new Query_Wrapper(&ready_future, workload, chunk_size.current, data_a, data_b, result, numa_local, numa_remote, - thread_count_filter.current, thread_count_filter_copy.current, thread_count_aggregation.current, - mode.current, thread_group.current, (base_t) 50, (base_t) 42, true); - break; - case PMode::hbm: - qw = new Query_Wrapper(&ready_future, workload, chunk_size.current, data_a, data_b_hbm, result, numa_local, numa_remote, - thread_count_filter.current, thread_count_filter_copy.current, thread_count_aggregation.current, - mode.current, thread_group.current, (base_t) 50, (base_t) 42, true); - break; - } - } - qw->ready_future = &ready_future; - qw->clear_buffers(); - - - // todo create threads depending on mode - std::vector thread_pool; - auto 
filter_lambda = [&qw](uint32_t gid, uint32_t gcnt, uint32_t tid) { qw->scan_a(gid, gcnt, tid); }; - auto filter_copy_lambda = [&qw](uint32_t gid, uint32_t gcnt, uint32_t tid) { qw->scan_b(gid, gcnt, tid); }; - auto aggregation_lambda = [&qw](uint32_t gid, uint32_t gcnt, uint32_t tid) { qw->aggr_j(gid, gcnt, tid); }; - - - /* Intel Xeon Gold 6130 // todo implement different for 5120 -> fewer cpus - node 0 cpus: 0-15 64- 79 - node 1 cpus: 16-31 80- 95 - node 2 cpus: 32-47 96-111 - node 3 cpus: 48-63 112-127 - */ - int thread_id = 0; - std::vector> range {std::make_pair(0, 16), std::make_pair(64, 80)}; - for(uint32_t gid = 0; gid < thread_group.current; ++gid) { - - - for(uint32_t tid = 0; tid < thread_count_filter.current; ++tid) { - thread_pool.emplace_back(filter_lambda, gid, thread_group.current, tid); - pin_thread_in_range(thread_pool.back(), thread_id++, range); - } - - for(uint32_t tid = 0; tid < thread_count_filter_copy.current; ++tid) { - thread_pool.emplace_back(filter_copy_lambda, gid, thread_group.current, tid); - pin_thread_in_range(thread_pool.back(), thread_id++, range); - } - - for(uint32_t tid = 0; tid < thread_count_aggregation.current; ++tid) { - thread_pool.emplace_back(aggregation_lambda, gid, thread_group.current, tid); - pin_thread_in_range(thread_pool.back(), thread_id++, range); - } - } - - auto start = std::chrono::steady_clock::now(); - p.set_value(); - - // wait for every thread to join - for(std::thread& t : thread_pool) t.join(); - // aggregate all partial results - Aggregation::apply(result, result, - sizeof(base_t) * thread_count_aggregation.current * thread_group.current); - - auto end = std::chrono::steady_clock::now(); - - double duration = std::chrono::duration_cast(end-start).count() / (double)1000000000; - - - //TODO add mode - print_to_file(out_file, run, chunk_size, mode_manager::string(mode.current), thread_count_filter, - thread_count_filter_copy, thread_count_aggregation, thread_group, duration, - qw->trt->summarize_time(0), 
qw->trt->summarize_time(1), - qw->trt->summarize_time(2), qw->trt->summarize_time(3), *result); - iteration = IterateOnce(run, chunk_size, mode, thread_count_filter, thread_count_filter_copy, thread_count_aggregation, thread_group); - } - - auto end = std::chrono::system_clock::now(); - std::time_t end_time = std::chrono::system_clock::to_time_t(end); - std::cout << "finished computation at " << std::ctime(&end_time) << std::endl; - - print_to_file(out_file, std::ctime(&end_time)); -} \ No newline at end of file diff --git a/qdp_project/src/benchmark/filter_aggregate_pipeline.cpp b/qdp_project/src/benchmark/filter_aggregate_pipeline.cpp deleted file mode 100644 index b4a6753..0000000 --- a/qdp_project/src/benchmark/filter_aggregate_pipeline.cpp +++ /dev/null @@ -1,184 +0,0 @@ -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include - -#include "const.h" - -#include "file_output.h" -#include "array_utils.h" -#include "timer_utils.h" -#include "barrier_utils.h" -#include "cpu_set_utils.h" -#include "iterable_range.h" -#include "memory_literals.h" -#include "pipelines/scan_filter_pipe.h" - -#include "aggregation.h" -#include "filter.h" - -using base_t = uint64_t; - -base_t sum_check(base_t compare_value, base_t* row_A, base_t* row_B, size_t row_size) { - base_t sum = 0; - for(int i = 0; i < row_size / sizeof(base_t); ++i) { - sum += (row_A[i] < compare_value) * row_B[i]; - } - return sum; -} - - -int main(int argc, char** argv) { - size_t workload_b = 2_GiB; - std::ofstream out_file; - out_file.open("filter_aggreagate_pipe_bm_" + (std::string) BARRIER_MODE + ".csv"); - - Linear_Int_Range thread_group("thread_groups"); - Linear_Int_Range run("run"); - Exp_Int_Range chunk_size("chunk_size"); - Linear_Int_Range thread_count_filter("thread_cnt_filter"); - Linear_Int_Range thread_count_copy("thread_cnt_copy"); - Linear_Int_Range 
thread_count_aggregation("thread_cnt_agg"); - Range mode("mode"); - - uint32_t remote_node = 2; - uint32_t remote_node_2 = 2; - uint32_t local_node = 10; - - print_to_file(out_file, generateHead(run, chunk_size, mode, thread_count_filter, thread_count_copy, - thread_count_aggregation, thread_group), "time", - #ifdef THREAD_TIMINGS - "scan_a", "scan_b", "aggr_j", - #endif - #ifdef BARRIER_TIMINGS - "wait_scan_a", "wait_scan_b", "wait_aggr_j", - #endif - "result"); - - - /*** alloc data and buffers ************************************************/ - base_t* data_a = (base_t*) numa_alloc_onnode(workload_b, remote_node); - base_t* data_b = (base_t*) numa_alloc_onnode(workload_b, remote_node_2); - base_t* data_b_hbm = (base_t *) numa_alloc_onnode(workload_b, local_node); - fill_mt(data_a, workload_b, 0, 100, 42); - fill_mt(data_b, workload_b, 0, 100, 420); - std::memcpy(data_b_hbm, data_b, workload_b); - base_t* results = (base_t*) numa_alloc_onnode(thread_group.max * thread_count_aggregation.max * sizeof(base_t), remote_node); - - std::string iteration("init"); - const bool simple_query = true; - Query_Wrapper* qw = nullptr; - while(iteration != "false") { - base_t compare_value = 50; - std::promise p; - std::shared_future ready_future(p.get_future()); - - if(iteration != "run") { - - if(qw != nullptr) { - delete qw; - } - - std::cout << "Changing to mode " << mode.current << " chunksize " << chunk_size.current << " thread_group " << thread_group.current << std::endl; - switch(mode.current) { - case PMode::expl_copy: - qw = new Query_Wrapper(&ready_future, workload_b, chunk_size.current, data_a, data_b, results, local_node, remote_node, - thread_count_filter.current, thread_count_copy.current, thread_count_aggregation.current, mode.current, thread_group.current, (base_t) 50, (base_t) 42, false); - break; - case PMode::no_copy: - qw = new Query_Wrapper(&ready_future, workload_b, chunk_size.current, data_a, data_b, results, local_node, remote_node, - 
thread_count_filter.current, thread_count_copy.current, thread_count_aggregation.current, mode.current, thread_group.current, (base_t) 50, (base_t) 42, true); - break; - case PMode::hbm: - qw = new Query_Wrapper(&ready_future, workload_b, chunk_size.current, data_a, data_b_hbm, results, local_node, remote_node, - thread_count_filter.current, thread_count_copy.current, thread_count_aggregation.current, mode.current, thread_group.current, (base_t) 50, (base_t) 42, true); - break; - } - } - - qw->ready_future = &ready_future; - qw->clear_buffers(); - - auto filter_lambda = [&qw](uint32_t gid, uint32_t gcnt, uint32_t tid) { qw->scan_a(gid, gcnt, tid); }; - auto copy_lambda = [&qw](uint32_t gid, uint32_t gcnt, uint32_t tid) { qw->scan_b(gid, gcnt, tid); }; - auto aggregation_lambda = [&qw](uint32_t gid, uint32_t gcnt, uint32_t tid) { qw->aggr_j(gid, gcnt, tid); }; - - std::vector filter_pool; - std::vector copy_pool; - std::vector agg_pool; - - int thread_id = 0; - // std::vector> pinning_ranges {std::make_pair(28, 42), std::make_pair(84, 98)}; // node 2 heacboehm2 - std::vector> pinning_ranges {std::make_pair(32, 48), std::make_pair(96, 112)}; // node 2 heacboehm - - for(uint32_t gid = 0; gid < thread_group.current; ++gid) { - - for(uint32_t tid = 0; tid < thread_count_filter.current; ++tid) { - filter_pool.emplace_back(filter_lambda, gid, thread_group.current, tid); - pin_thread_in_range(filter_pool.back(), thread_id++, pinning_ranges); - } - - if(mode.current == PMode::expl_copy){ - for(uint32_t tid = 0; tid < thread_count_copy.current; ++tid) { - copy_pool.emplace_back(copy_lambda, gid, thread_group.current, tid); - pin_thread_in_range(copy_pool.back(), thread_id++, pinning_ranges); - } - } - - for(uint32_t tid = 0; tid < thread_count_aggregation.current; ++tid) { - agg_pool.emplace_back(aggregation_lambda, gid, thread_group.current, tid); - pin_thread_in_range(agg_pool.back(), thread_id++, pinning_ranges); - } - } - - auto start = std::chrono::steady_clock::now(); 
- p.set_value(); - - for(std::thread& t : filter_pool) { t.join(); } - for(std::thread& t : copy_pool) { t.join(); } - for(std::thread& t : agg_pool) { t.join(); } - - Aggregation::apply(results, results, sizeof(base_t) * thread_count_aggregation.current * thread_group.current); - auto end = std::chrono::steady_clock::now(); - - constexpr double nanos_per_second = ((double)1000) * 1000 * 1000; - uint64_t nanos = std::chrono::duration_cast(end - start).count(); - double seconds = (double)(nanos) / nanos_per_second; - - - - print_to_file(out_file, run, chunk_size, mode_manager::string(mode.current), thread_count_filter, - thread_count_copy, thread_count_aggregation, thread_group, seconds, - #ifdef THREAD_TIMINGS - qw->trt->summarize_time(0), qw->trt->summarize_time(1), qw->trt->summarize_time(2), - #endif - #ifdef BARRIER_TIMINGS - qw->bt->summarize_time(0), qw->bt->summarize_time(1), qw->bt->summarize_time(2), - #endif - results[0]); - - - iteration = IterateOnce(run, chunk_size, mode, thread_count_filter, thread_count_copy, thread_count_aggregation, thread_group); - - } - - numa_free(data_b_hbm, workload_b); - numa_free(data_a, workload_b); - numa_free(data_b, workload_b); - numa_free(results, thread_group.max * sizeof(base_t)); - -} \ No newline at end of file diff --git a/qdp_project/src/benchmark/latency.cpp b/qdp_project/src/benchmark/latency.cpp deleted file mode 100644 index 011066a..0000000 --- a/qdp_project/src/benchmark/latency.cpp +++ /dev/null @@ -1,188 +0,0 @@ -/* - * numa_memory_latency - * Copyright (c) 2017 UMEZAWA Takeshi - * This software is licensed under GNU GPL version 2 or later. 
- * - * This file has been modified - */ - -#include -#include -#include -#include -#include -#include -#include -#include "file_output.h" -#include -#include -#include -#include - -#ifndef VOLATILE -#define VOLATILE 0 -#endif - -#define cachelinesize 64 -union CACHELINE { - char cacheline[cachelinesize]; - #if VOLATILE - volatile CACHELINE* next; - #else - CACHELINE* next; - #endif /*VOLATILE*/ -}; - -#define REPT4(x) do { x; x; x; x; } while(0) -#define REPT16(x) do { REPT4(x); REPT4(x); REPT4(x); REPT4(x); } while(0); -#define REPT64(x) do { REPT16(x); REPT16(x); REPT16(x); REPT16(x); } while(0); -#define REPT256(x) do { REPT64(x); REPT64(x); REPT64(x); REPT64(x); } while(0); -#define REPT1024(x) do { REPT256(x); REPT256(x); REPT256(x); REPT256(x); } while(0); - -size_t bufsize = 1 * 1024 * 1024 * 1024; -size_t nloop = 128 * 1024; -std::vector offsets; - -#if VOLATILE - -volatile CACHELINE* walk(volatile CACHELINE* start) -{ - volatile CACHELINE* p = start; - for (size_t i = 0; i < nloop; ++i) { - REPT1024(p = p->next); - } - return p; -} - -#else - -CACHELINE* walk(CACHELINE* start, uint64_t* sum) -{ - CACHELINE* p = start; - for (size_t i = 0; i < nloop; ++i) { - REPT1024( - *sum += static_cast(p->cacheline[cachelinesize-1]); - p = p->next; - ); - } - return p; -} - -#endif /*VOLATILE*/ - -void bench(int tasknode, int memnode, std::ofstream* out_file) -{ - struct timespec ts_begin, ts_end, ts_elapsed; - - printf("bench(task=%d, mem=%d)\n", tasknode, memnode); - - if (numa_run_on_node(tasknode) != 0) { - printf("failed to run on node: %s\n", strerror(errno)); - return; - } - - CACHELINE* const buf = (CACHELINE*)numa_alloc_onnode(bufsize, memnode); - if (buf == NULL) { - printf("failed to allocate memory\n"); - return; - } - - for (size_t i = 0; i < offsets.size() - 1; ++i) { - // assuming that next-pointer never overwrites last Byte of the cacheline/union - buf[offsets[i]].cacheline[cachelinesize-1] = offsets[i] % 128; - buf[offsets[i]].next = buf + 
offsets[i+1]; - } - buf[offsets[offsets.size() - 1]].next = buf; - buf[offsets[offsets.size() - 1]].cacheline[cachelinesize-1] = offsets[offsets.size() - 1] % 128; - - uint64_t value = 0; - uint64_t* sum = &value; - - clock_gettime(CLOCK_MONOTONIC, &ts_begin); - - #if VOLATILE - walk(buf); - #else - walk(buf, sum); - #endif /*VOLATILE*/ - - clock_gettime(CLOCK_MONOTONIC, &ts_end); - - ts_elapsed.tv_nsec = ts_end.tv_nsec - ts_begin.tv_nsec; - ts_elapsed.tv_sec = ts_end.tv_sec - ts_begin.tv_sec; - if (ts_elapsed.tv_nsec < 0) { - --ts_elapsed.tv_sec; - ts_elapsed.tv_nsec += 1000*1000*1000; - } - double elapsed = ts_elapsed.tv_sec + 0.000000001 * ts_elapsed.tv_nsec; - printf("took %fsec. %fns/load\n", elapsed, elapsed/(1024*nloop)*(1000*1000*1000)); - print_to_file(*out_file, tasknode, memnode, elapsed/(1024*nloop)*(1000*1000*1000), *sum); - numa_free(buf, bufsize); -} - -struct RND { - std::mt19937 mt; - RND() : mt(time(NULL)) {} - std::mt19937::result_type operator()(std::mt19937::result_type n) { return mt() % n; } -} r; - -void usage(const char* prog) -{ - printf("usage: %s [-h] [bufsize] [nloop]\n", prog); -} - -int main(int argc, char* argv[]) -{ - int ch; - - while ((ch = getopt(argc, argv, "h")) != -1) { - switch (ch) { - case 'h': - default: - usage(argv[0]); - exit(1); - } - } - - argc -= optind; - argv += optind; - - if (argc > 1) { - // 1048576 KiB = 1 GiB - bufsize = atoi(argv[0]) * 1024; // in KiB - nloop = atoi(argv[1]) * 1024; - } - - offsets.resize(bufsize / cachelinesize); - - for (size_t i = 0; i < offsets.size(); ++i) - offsets[i] = i; - std::random_shuffle(offsets.begin() + 1, offsets.end(), r); - - uint64_t expected_checksum = 0; - #if VOLATILE == 0 - for (size_t i = 0; i < nloop * 1024; ++i) { - expected_checksum += offsets[i % offsets.size()] % 128; - } - #endif - - std::ofstream check_file; - check_file.open("../results/micro_bench/latency/micro_bench_latency_" + (std::string)(VOLATILE == 1 ? 
"volatile" : "sum") + ".checksum"); - check_file << expected_checksum; - check_file.close(); - - - printf("benchmark bufsize=%zuKiB, nloop=%zuKi\n", bufsize/1024, nloop/1024); - - std::ofstream out_file; - out_file.open("../results/micro_bench/latency/micro_bench_latency_"+ (std::string)(VOLATILE == 1 ? "volatile" : "sum") + ".csv"); - print_to_file(out_file, "tasknode", "memnode", "latency", "checksum"); - - for (int tasknode = 0; tasknode < 8; tasknode++) { - for (int memnode = 0; memnode < 16; memnode++) { - bench(tasknode, memnode, &out_file); - } - } - - return 0; -} \ No newline at end of file diff --git a/qdp_project/src/benchmark/micro_benchmarks.cpp b/qdp_project/src/benchmark/micro_benchmarks.cpp deleted file mode 100644 index 4e63f82..0000000 --- a/qdp_project/src/benchmark/micro_benchmarks.cpp +++ /dev/null @@ -1,271 +0,0 @@ -#include -#include -#include -#include -#include -#include -#include "memory_literals.h" -#include "array_utils.h" -#include "file_output.h" -#include "aggregation.h" - - -using base_t = uint64_t; - -size_t thread_cnt_memcpy = 128; -size_t thread_cnt_read = 128; -size_t runs = 10; - - -base_t sum_up(base_t* data, size_t workload){ - base_t sum = 0; - for(int i = 0; i < workload/sizeof(base_t); i++){ - sum += data[i]; - } - return sum; -} - -int reverse_bits(int number, size_t bit_count) { - int result = 0; - for(int i = 0; i < bit_count; i++) { - result <<= 1; - result |= (number & 1); - number >>= 1; - } - return result; -} - - -double measure_memcpy_bw(base_t* src, base_t* dest, size_t workload, base_t* result){ - std::promise p; - std::shared_future ready_future(p.get_future()); - - auto thread_lambda = [&](base_t* source, base_t* destination, size_t count) { - ready_future.wait(); - memcpy(destination, source, count); - }; - - std::vector thread_pool; - size_t total_elements = workload / sizeof(base_t); - size_t elements_per_thread = total_elements / thread_cnt_memcpy; - size_t remainder = total_elements % thread_cnt_memcpy; - 
- for(size_t tid = 0; tid < thread_cnt_memcpy; tid++) { - size_t elements_to_process = elements_per_thread + (tid < remainder ? 1 : 0); - size_t byte_offset = (elements_per_thread * tid + std::min(tid, remainder)) * sizeof(base_t); - - thread_pool.emplace_back(thread_lambda, src + byte_offset / sizeof(base_t), dest + byte_offset / sizeof(base_t), elements_to_process * sizeof(base_t)); - } - - auto start = std::chrono::steady_clock::now(); - p.set_value(); - for(std::thread& t : thread_pool) { t.join(); } - auto stop = std::chrono::steady_clock::now(); - - auto duration = std::chrono::duration_cast(stop - start); - double seconds = duration.count() / 1e9; - double throughput = (workload / seconds) / (1024 * 1024 * 1024); - *result = sum_up(dest, workload); - return throughput; -} - -double measure_read_bw(base_t* data, size_t workload, base_t* results){ - const size_t chunk_size = sizeof(__m512i); - const size_t num_chunks = (workload) / chunk_size; - __m512i* src = reinterpret_cast<__m512i*>(data); - std::promise p; - std::shared_future ready_future(p.get_future()); - size_t num_chunks_per_thread = num_chunks / thread_cnt_read; - size_t num_chunks_remainder = num_chunks % thread_cnt_read; - - auto thread_lambda = [&](__m512i* src, int tid, int num_chunks) { - __m512i accumulator = _mm512_setzero_si512(); - ready_future.wait(); - for (int i = 0; i < num_chunks; i++) { - __m512i chunk = _mm512_load_si512(&src[i]); - accumulator = _mm512_add_epi64(accumulator, chunk); - } - results[tid] = _mm512_reduce_add_epi64(accumulator); - }; - - std::vector thread_pool; - int offset; - for(int tid = 0; tid < thread_cnt_read; tid++){ - if(tid < num_chunks_remainder){ - offset = tid * (num_chunks_per_thread + 1); - thread_pool.emplace_back(thread_lambda, &src[offset], tid, (num_chunks_per_thread + 1)); - } else { - offset = tid*num_chunks_per_thread + num_chunks_remainder; - thread_pool.emplace_back(thread_lambda, &src[offset], tid, num_chunks_per_thread); - } - - } - - auto start 
= std::chrono::steady_clock::now(); - p.set_value(); - for(std::thread& t : thread_pool) { t.join(); } - auto stop = std::chrono::steady_clock::now(); - - Aggregation::apply(results, results, sizeof(base_t) * thread_cnt_read); - auto duration = std::chrono::duration_cast(stop - start); - double seconds = duration.count() / 1e9; - double throughput = (workload / seconds) / (1024 * 1024 * 1024); - return throughput; -} - -void exec_multiple_runs_memcpy(size_t workload, int exec_node, int src_node, int dest_node, std::ofstream* out_file, std::string iteration_type){ - base_t value; - base_t* result = &value; - base_t* src = (base_t*) numa_alloc_onnode(workload, src_node); - base_t* dest = (base_t*) numa_alloc_onnode(workload, dest_node); - fill_mt(src, workload, 0, 100, 42); - fill_mt(dest, workload, 0, 100, 12); - numa_run_on_node(exec_node); - - if(dest_node == 0 && src_node == 0){ - std::ofstream check_file; - check_file.open("../results/micro_bench/micro_bench_bw_memcpy_execnode_" + std::to_string(exec_node) - + "_threadcnt_" + std::to_string(thread_cnt_memcpy) + "_" + iteration_type + ".checksum"); - check_file << sum_up(src, workload); - check_file.close(); - } - - for(size_t run = 0; run < runs; run++){ - double bw = measure_memcpy_bw(src, dest, workload, result); - std::cout << "Copy throughput executed on node " << exec_node << " form node " << src_node << " to node " - << dest_node << ": " << bw << " GiB/s" << std::endl; - print_to_file(*out_file, run, src_node, dest_node, bw, *result); - std::memset(dest, 0x00, workload); - *result = 0; - } - numa_free(src, workload); - numa_free(dest, workload); -} - -void measure_all_memcpy_bw_for_chosen_execnode(int exec_node){ - std::ofstream out_file; - out_file.open("../results/micro_bench/micro_bench_bw_memcpy_execnode_" + std::to_string(exec_node) - + "_threadcnt_" + std::to_string(thread_cnt_memcpy) + ".csv"); - print_to_file(out_file, "run", "src_node", "dest_node", "bw", "result"); - const size_t workload = 
4_GiB; - - for(int src_node = 0; src_node < 16; src_node++){ - for(int dest_node = 0; dest_node < 16; dest_node++){ - exec_multiple_runs_memcpy(workload, exec_node, src_node, dest_node, &out_file, ""); - } - } - out_file.close(); -} - -void measure_all_memcpy_bw_for_chosen_execnode_reversed(int exec_node){ - std::ofstream out_file; - out_file.open("../results/micro_bench/micro_bench_bw_memcpy_execnode_" + std::to_string(exec_node) - + "_threadcnt_" + std::to_string(thread_cnt_memcpy) + "_reversed.csv"); - print_to_file(out_file, "run", "src_node", "dest_node", "bw", "result"); - const size_t workload = 4_GiB; - - for(int src_node = 15; src_node >= 0; src_node--){ - for(int dest_node = 15; dest_node >= 0; dest_node--){ - exec_multiple_runs_memcpy(workload, exec_node, src_node, dest_node, &out_file, "reversed"); - } - } - out_file.close(); -} - - - -void measure_all_memcpy_bw_for_chosen_execnode_reversed_bitwise(int exec_node){ - std::ofstream out_file; - out_file.open("../results/micro_bench/micro_bench_bw_memcpy_execnode_" + std::to_string(exec_node) - + "_threadcnt_" + std::to_string(thread_cnt_memcpy) + "_reversed_bitwise.csv"); - print_to_file(out_file, "run", "src_node", "dest_node", "bw", "result"); - const size_t workload = 4_GiB; - - for(int src_node = 0; src_node < 16; src_node++){ - for(int dest_node = 0; dest_node < 16; dest_node++){ - int reversed_src_node = reverse_bits(src_node, 4); - int reversed_dest_node = reverse_bits(dest_node, 4); - exec_multiple_runs_memcpy(workload, exec_node, reversed_src_node, reversed_dest_node, &out_file, "reversed_bitwise"); - } - } - out_file.close(); -} - - -void exec_multiple_runs_read(size_t workload, int mem_node, int exec_node, std::ofstream *out_file, std::string iteration_type){ - base_t* data = (base_t*) numa_alloc_onnode(workload, mem_node); - fill_mt(data, workload, 0, 100, 42); - base_t* results = (base_t*) numa_alloc_onnode(thread_cnt_read * sizeof(base_t), exec_node); - numa_run_on_node(exec_node); - - 
if(mem_node == 0 && exec_node == 0){ - std::ofstream check_file; - check_file.open("../results/micro_bench/micro_bench_bw_read_threadcnt_" + std::to_string(thread_cnt_read) + "_" + iteration_type + ".checksum"); - check_file << sum_up(data, workload); - check_file.close(); - } - - for(size_t run = 0; run < runs; run++){ - double bw = measure_read_bw(data, workload, results); - std::cout << "Read throughput executed on node " << exec_node << " for node " << mem_node << ": " << bw << " GiB/s" << std::endl; - print_to_file(*out_file, run, exec_node, mem_node, bw, results[0]); - std::memset(results, 0x00, thread_cnt_read * sizeof(base_t)); - } - numa_free(data, workload); - numa_free(results, thread_cnt_read * sizeof(base_t)); -} - -void measure_all_read_bw(){ - std::ofstream out_file; - out_file.open("../results/micro_bench/micro_bench_bw_read_threadcnt_" + std::to_string(thread_cnt_read) + ".csv"); - print_to_file(out_file, "run", "exec_node", "mem_node", "bw", "result"); - const size_t workload = 8_GiB; - - for(int exec_node = 0; exec_node < 8; exec_node++){ - for(int mem_node = 0; mem_node < 16; mem_node++){ - exec_multiple_runs_read(workload, mem_node, exec_node, &out_file, ""); - } - } - out_file.close(); -} - -void measure_all_read_bw_reversed(){ - std::ofstream out_file; - out_file.open("../results/micro_bench/micro_bench_bw_read_threadcnt_" + std::to_string(thread_cnt_read) + "_reversed.csv"); - print_to_file(out_file, "run", "exec_node", "mem_node", "bw", "result"); - const size_t workload = 8_GiB; - - for(int exec_node = 7; exec_node >= 0; exec_node--){ - for(int mem_node = 15; mem_node >= 0; mem_node--){ - exec_multiple_runs_read(workload, mem_node, exec_node, &out_file, "reversed"); - } - } - out_file.close(); -} - -void measure_all_read_bw_reversed_bitwise(){ - std::ofstream out_file; - out_file.open("../results/micro_bench/micro_bench_bw_read_threadcnt_" + std::to_string(thread_cnt_read) + "_reversed_bitwise.csv"); - print_to_file(out_file, "run", 
"exec_node", "mem_node", "bw", "result"); - const size_t workload = 8_GiB; - - for(int exec_node0 = 0; exec_node0 < 8; exec_node0++){ - for(int mem_node0 = 0; mem_node0 < 16; mem_node0++){ - int mem_node = reverse_bits(mem_node0, 4); - int exec_node = reverse_bits(exec_node0, 3); - exec_multiple_runs_read(workload, mem_node, exec_node, &out_file, "reversed_bitwise"); - } - } - out_file.close(); -} - - - -int main() { - // nodes 0-7 hold cores and DRAM, nodes 8-15 only HBM - - measure_all_read_bw_reversed_bitwise(); - measure_all_memcpy_bw_for_chosen_execnode_reversed_bitwise(0); - - return 0; -} \ No newline at end of file diff --git a/qdp_project/src/benchmark/pipelines/DIMES_scan_filter_pipe.h b/qdp_project/src/benchmark/pipelines/DIMES_scan_filter_pipe.h deleted file mode 100644 index 6dbc652..0000000 --- a/qdp_project/src/benchmark/pipelines/DIMES_scan_filter_pipe.h +++ /dev/null @@ -1,391 +0,0 @@ - -#include -#include -#include -#include - -#include - -#include "filter.h" -#include "aggregation.h" -#include "vector_loader.h" -#include "timer_utils.h" -#include "barrier_utils.h" -#include "execution_modes.h" - - -template -class Query_Wrapper { -public: - // sync - std::shared_future* ready_future; - - thread_runtime_timing* trt; - barrier_timing* bt; - -private: - // numa - uint32_t close_mem; - uint32_t far_mem; - - // data - size_t size_b; - size_t chunk_size_b; - size_t chunk_size_w; - size_t chunk_cnt; - base_t* data_a; - base_t* data_b; - base_t* dest; - - // ratios - uint32_t thread_count_fc; - uint32_t thread_count_fi; - uint32_t thread_count_ag; - uint32_t thread_group; - - // done bits - volatile uint8_t* ready_flag_a; - volatile uint8_t* ready_flag_b; - std::mutex ready_a_m; - std::mutex ready_b_m; - - // buffer - uint16_t* mask_a; - uint16_t* mask_b; - base_t** buffer_b; - - // params - base_t cmp_a; - base_t cmp_b; - bool no_copy; - NewPMode mode; - - // sync - std::unique_ptr*>> sync_barrier; - std::string barrier_mode = BARRIER_MODE; - - using 
filterCopy = Filter; - using filterNoCopy = Filter; - using filter = Filter; - using aggregation = Aggregation; - -public: - - - Query_Wrapper(std::shared_future* rdy_fut, size_t workload_b, size_t chunk_size_b, base_t* data_a, - base_t* data_b, base_t* dest, uint32_t numa_close, uint32_t numa_far, uint32_t tc_fi, uint32_t tc_fc, uint32_t tc_ag, - NewPMode mode, uint32_t thread_group, base_t cmp_a = 50, base_t cmp_b = 42, bool no_copy = false) : - ready_future(rdy_fut), size_b(workload_b), chunk_size_b(chunk_size_b), data_a(data_a), data_b(data_b), - dest(dest), close_mem(numa_close), far_mem(numa_far), mode(mode), thread_group(thread_group), cmp_a(cmp_a), cmp_b(cmp_b), no_copy(no_copy){ - - chunk_size_w = chunk_size_b / sizeof(base_t); - chunk_cnt = size_b / chunk_size_b; - thread_count_fi = tc_fi; - thread_count_fc = tc_fc; - thread_count_ag = tc_ag; - - ready_flag_a = (volatile uint8_t *) numa_alloc_onnode( - chunk_cnt * thread_count_fi / 8 + ((chunk_cnt * thread_count_fi % 8) != 0), close_mem); - ready_flag_b = (volatile uint8_t *) numa_alloc_onnode( - chunk_cnt * thread_count_fc / 8 + ((chunk_cnt * thread_count_fc % 8) != 0), close_mem); - - mask_a = (uint16_t *) numa_alloc_onnode(size_b / sizeof(base_t), close_mem); - mask_b = (uint16_t *) numa_alloc_onnode(size_b / sizeof(base_t), close_mem); - - trt = new thread_runtime_timing(4, 16*4*4*4, close_mem); - bt = new barrier_timing(4, 16*4*4*4, close_mem); - reset_barriers(); - - if constexpr(BUFFER_LIMIT==1) { - // TODO size ok like that? 
- buffer_b = (base_t**) numa_alloc_onnode(size_b * sizeof(base_t*), close_mem); - buffer_b[0] = (base_t*) numa_alloc_onnode(thread_group * chunk_size_b, close_mem); - buffer_b[1] = (base_t*) numa_alloc_onnode(thread_group * chunk_size_b, close_mem); - } else { - buffer_b = (base_t **) numa_alloc_onnode(sizeof(base_t*), close_mem); - base_t* buffer_tmp = (base_t *) numa_alloc_onnode(size_b, close_mem); - *buffer_b = buffer_tmp; - } - }; - - void reset_barriers(){ - if(sync_barrier != nullptr) { - for(auto& barrier : *sync_barrier) { - delete barrier; - } - sync_barrier.reset(); - } - - sync_barrier = std::make_unique*>>(thread_group); - uint32_t thread_count_sum = thread_count_ag + thread_count_fi + thread_count_fc; - uint32_t barrier_count = barrier_mode.compare("global") == 0 ? 1 : thread_group; - uint32_t barrier_thread_count; - - if constexpr(simple){ - barrier_thread_count = (thread_group / barrier_count) * - (mode == NewPMode::Prefetch ? thread_count_sum : (thread_count_ag + thread_count_fi)); - } else { - barrier_thread_count = (thread_group / barrier_count) * thread_count_sum; - } - for(uint32_t i = 0; i < barrier_count; ++i) { - (*sync_barrier)[i] = new std::barrier(barrier_thread_count); - } - } - - void clear_buffers () { - std::memset((void*)ready_flag_a, 0x00, chunk_cnt * thread_count_fi / 8 + ((chunk_cnt * thread_count_fi % 8) != 0)); - std::memset((void*)ready_flag_b, 0x00, chunk_cnt * thread_count_fc / 8 + ((chunk_cnt * thread_count_fc % 8) != 0)); - - std::memset(mask_a, 0x00, size_b / sizeof(base_t)); - std::memset(mask_b, 0x00, size_b / sizeof(base_t)); - if constexpr(BUFFER_LIMIT==1) { - std::memset(buffer_b[0], 0x00, thread_group * chunk_size_b); - std::memset(buffer_b[1], 0x00, thread_group * chunk_size_b); - } else { - std::memset(*buffer_b, 0x00, size_b); - } - - trt->reset_accumulator(); - bt->reset_accumulator(); - reset_barriers(); - }; - - ~Query_Wrapper() { - numa_free((void*)ready_flag_a, - chunk_cnt * thread_count_fi / 8 + ((chunk_cnt 
* thread_count_fi % 8) != 0)); - numa_free((void*)ready_flag_b, - chunk_cnt * thread_count_fc / 8 + ((chunk_cnt * thread_count_fc % 8) != 0)); - - numa_free(mask_a, size_b / sizeof(base_t)); - numa_free(mask_b, size_b / sizeof(base_t)); - if constexpr(BUFFER_LIMIT==1) { - numa_free(buffer_b[0], thread_group * chunk_size_b); - numa_free(buffer_b[1], thread_group * chunk_size_b); - numa_free(buffer_b, size_b * sizeof(base_t*)); - } else { - numa_free(*buffer_b, size_b); - } - - delete trt; - for(auto& barrier : *sync_barrier) { - delete barrier; - } - delete bt; - - }; - - //this can be set without need to change allocations - void set_thread_group_count(uint32_t value) { - this->thread_group = value; - }; - -private: - static inline base_t* get_sub_chunk_ptr(base_t* base_ptr, size_t chunk_id, size_t chunk_size_w, size_t tid, - size_t tcnt) { - base_t* chunk_ptr = base_ptr + chunk_id * chunk_size_w; - return chunk_ptr + tid * (chunk_size_w / tcnt); - } - - static inline uint16_t* get_sub_mask_ptr(uint16_t* base_ptr, size_t chunk_id, size_t chunk_size_w, size_t tid, - size_t tcnt) { - // 16 integer are addressed with one uint16_t in mask buffer - size_t offset = chunk_id * chunk_size_w + tid * (chunk_size_w / tcnt); - return base_ptr + (offset / 16); - } - - static bool bit_at(volatile uint8_t* bitmap, uint32_t bitpos) { - uint8_t value = bitmap[bitpos / 8]; - switch(bitpos % 8) { - case 0: return value & 0b00000001; - case 1: return value & 0b00000010; - case 2: return value & 0b00000100; - case 3: return value & 0b00001000; - case 4: return value & 0b00010000; - case 5: return value & 0b00100000; - case 6: return value & 0b01000000; - case 7: return value & 0b10000000; - default: return false; - } - } - - static void set_bit_at(volatile uint8_t* bitmap, std::mutex& mutex, uint32_t bitpos) { - mutex.lock(); - switch(bitpos % 8) { - case 0: bitmap[bitpos / 8] |= 0b00000001;break; - case 1: bitmap[bitpos / 8] |= 0b00000010;break; - case 2: bitmap[bitpos / 8] |= 
0b00000100;break; - case 3: bitmap[bitpos / 8] |= 0b00001000;break; - case 4: bitmap[bitpos / 8] |= 0b00010000;break; - case 5: bitmap[bitpos / 8] |= 0b00100000;break; - case 6: bitmap[bitpos / 8] |= 0b01000000;break; - case 7: bitmap[bitpos / 8] |= 0b10000000;break; - } - mutex.unlock(); - } - -public: - - static base_t checksum(base_t* a, base_t* b, base_t cmp_a, base_t cmp_b, size_t size_b) { - base_t sum = 0; - for(int i = 0; i < size_b / sizeof(base_t); ++i) { - if(a[i] >= cmp_a && b[i] <= cmp_b) { - sum += b[i]; - } - } - return sum; - } - - static void checkmask(uint16_t* mask, base_t cmp, base_t* data, size_t size_b, bool leq) { - uint32_t cnt = 0; - for(int i = 0; i < size_b / sizeof(base_t); ++i) { - if(leq) { - if(((data[i] <= cmp) != bit_at((uint8_t*)mask, i))) { - ++cnt; - } - } else { - if(((data[i] >= cmp) != bit_at((uint8_t*)mask, i))) { - ++cnt; - } - } - } - } - - static void checkmask_16(uint16_t* mask, base_t cmp, base_t* data, size_t size_b, bool leq) { - for(int i = 0; i < size_b / sizeof(base_t) / 16 ; ++i) { - std::bitset<16> m(mask[i]); - uint16_t ch = 0; - for(int j = 0; j < 16; ++j) { - if(data[i*16 + j] <= cmp) { - ch |= 0x1 << j; - } - } - std::bitset<16> c(ch); - - std::cout << "act " << m << std::endl; - std::cout << "rea " << c << std::endl << std::endl; - } - } - - - void scan_b(size_t gid, size_t gcnt, size_t tid) { - size_t tcnt = thread_count_fc; - assert(chunk_size_w % tcnt == 0); - assert(chunk_size_w % 16 == 0); - assert(chunk_size_w % tcnt * 16 == 0); - - // wait till everyone can start - ready_future->wait(); - - // the lower gids run once more if the chunks are not evenly distributable - uint32_t runs = chunk_cnt / gcnt + (chunk_cnt % gcnt > gid); - uint32_t barrier_idx = barrier_mode.compare("global") == 0 ? 
0 : gid; - for(uint32_t i = 0; i < runs; ++i) { - trt->start_timer(1, tid * gcnt + gid); - - // calculate pointers - size_t chunk_id = gid + gcnt * i; - base_t* chunk_ptr = get_sub_chunk_ptr(data_b , chunk_id, chunk_size_w, tid, tcnt); - uint16_t* mask_ptr = get_sub_mask_ptr (mask_b , chunk_id, chunk_size_w, tid, tcnt); - - if constexpr(simple){ - base_t* buffer_ptr; - if constexpr(BUFFER_LIMIT==1) { - buffer_ptr = get_sub_chunk_ptr(buffer_b[i % 2], gid, chunk_size_w, tid, tcnt); - } else { - buffer_ptr = get_sub_chunk_ptr(*buffer_b, chunk_id, chunk_size_w, tid, tcnt); - } - std::memcpy(buffer_ptr, chunk_ptr, chunk_size_b / tcnt); - } else { - if(no_copy) { - filterNoCopy::apply_same(mask_ptr, nullptr, chunk_ptr, cmp_b, chunk_size_b / tcnt); - } else { - base_t* buffer_ptr; - if constexpr(BUFFER_LIMIT==1) { - buffer_ptr = get_sub_chunk_ptr(buffer_b[i % 2], gid, chunk_size_w, tid, tcnt); - } else { - buffer_ptr = get_sub_chunk_ptr(*buffer_b, chunk_id, chunk_size_w, tid, tcnt); - } - filterCopy::apply_same(mask_ptr, buffer_ptr, chunk_ptr, cmp_b, chunk_size_b / tcnt); - } - } - - trt->stop_timer(1, tid * gcnt + gid); - bt->timed_wait(*(*sync_barrier)[barrier_idx], 1, tid * gcnt + gid); - - } - (*(*sync_barrier)[barrier_idx]).arrive_and_drop(); - - } - - void scan_a(size_t gid, size_t gcnt, size_t tid) { - size_t tcnt = thread_count_fi; - assert(chunk_size_w % tcnt == 0); - assert(chunk_size_w % 16 == 0); - assert(chunk_size_w % tcnt * 16 == 0); - - // wait till everyone can start - ready_future->wait(); - - // the lower gids run once more if the chunks are not evenly distributable - uint32_t runs = chunk_cnt / gcnt + (chunk_cnt % gcnt > gid); - uint32_t barrier_idx = barrier_mode.compare("global") == 0 ? 
0 : gid; - for(uint32_t i = 0; i < runs; ++i) { - trt->start_timer(0, tid * gcnt + gid); - // calculate pointers - size_t chunk_id = gid + gcnt * i; - base_t* chunk_ptr = get_sub_chunk_ptr(data_a, chunk_id, chunk_size_w, tid, tcnt); - uint16_t* mask_ptr = get_sub_mask_ptr (mask_a, chunk_id, chunk_size_w, tid, tcnt); - - filter::apply_same(mask_ptr, nullptr, chunk_ptr, cmp_a, chunk_size_b / tcnt); - - trt->stop_timer(0, tid * gcnt + gid); - bt->timed_wait(*(*sync_barrier)[barrier_idx], 0, tid * gcnt + gid); - } - (*(*sync_barrier)[barrier_idx]).arrive_and_drop(); - } - - void aggr_j(size_t gid, size_t gcnt, size_t tid) { - size_t tcnt = thread_count_ag; - // wait till everyone can start - ready_future->wait(); - - // calculate values - __m512i aggregator = aggregation::OP::zero(); - // the lower gids run once more if the chunks are not evenly distributable - uint32_t runs = chunk_cnt / gcnt + (chunk_cnt % gcnt > gid); - uint32_t barrier_idx = barrier_mode.compare("global") == 0 ? 0 : gid; - for(uint32_t i = 0; i < runs; ++i) { - - bt->timed_wait(*(*sync_barrier)[barrier_idx], 2, tid * gcnt + gid); - trt->start_timer(2, tid * gcnt + gid); - - // calculate pointers - size_t chunk_id = gid + gcnt * i; - base_t* chunk_ptr; - if(no_copy) { - chunk_ptr = get_sub_chunk_ptr(data_b, chunk_id, chunk_size_w, tid, tcnt); - } else { - if constexpr(BUFFER_LIMIT==1) { - chunk_ptr = get_sub_chunk_ptr(buffer_b[i % 2], gid, chunk_size_w, tid, tcnt); - } else { - chunk_ptr = get_sub_chunk_ptr(*buffer_b, chunk_id, chunk_size_w, tid, tcnt); - } - } - uint16_t* mask_ptr_a = get_sub_mask_ptr (mask_a, chunk_id, chunk_size_w, tid, tcnt); - uint16_t* mask_ptr_b = get_sub_mask_ptr (mask_b, chunk_id, chunk_size_w, tid, tcnt); - - base_t tmp = _mm512_reduce_add_epi64(aggregator); - if constexpr(simple){ - aggregator = aggregation::apply_masked(aggregator, chunk_ptr, mask_ptr_a, chunk_size_b / tcnt); - } else { - aggregator = aggregation::apply_masked(aggregator, chunk_ptr, mask_ptr_a, 
mask_ptr_b, chunk_size_b / tcnt); - } - trt->stop_timer(2, tid * gcnt + gid); - } - - // so threads with more runs dont wait for finished threads - (*(*sync_barrier)[barrier_idx]).arrive_and_drop(); - - aggregation::happly(dest + (tid * gcnt + gid), aggregator); - } -}; \ No newline at end of file diff --git a/qdp_project/src/benchmark/pipelines/MAX_scan_filter_pipe.h b/qdp_project/src/benchmark/pipelines/MAX_scan_filter_pipe.h index 3b1d861..e224391 100644 --- a/qdp_project/src/benchmark/pipelines/MAX_scan_filter_pipe.h +++ b/qdp_project/src/benchmark/pipelines/MAX_scan_filter_pipe.h @@ -15,9 +15,9 @@ #include "measurement_utils.h" #include "execution_modes.h" -#include "../../../thirdParty/dsa_offload/offloading-cacher/cache.hpp" +#include "../../../../offloading-cacher/cache.hpp" -template +template class Query_Wrapper { public: // sync @@ -28,11 +28,9 @@ public: pcm_value_collector* pvc; private: - dsacache::Cache cache_; + static constexpr size_t COPY_POLICY_MIN_SIZE = 64 * 1024 * 1024; - // numa - uint32_t close_mem; - uint32_t far_mem; + dsacache::Cache cache_; // data size_t size_b; @@ -47,13 +45,11 @@ private: uint32_t thread_count_fc; uint32_t thread_count_fi; uint32_t thread_count_ag; - uint32_t thread_group; + uint32_t thread_count; // done bits volatile uint8_t* ready_flag_a; volatile uint8_t* ready_flag_b; - std::mutex ready_a_m; - std::mutex ready_b_m; // buffer uint16_t* mask_a; @@ -73,70 +69,72 @@ private: using filter = Filter; using aggregation = Aggregation; - void InitCache(const std::string& device) { - if (device == "default") { - static const auto cache_policy = [](const int numa_dst_node, const int numa_src_node, const size_t data_size) { - return numa_dst_node; - }; + static int CachePlacementPolicy(const int numa_dst_node, const int numa_src_node, const size_t data_size) { + return numa_dst_node < 8 ? 
numa_dst_node + 8 : numa_dst_node; + } - static const auto copy_policy = [](const int numa_dst_node, const int numa_src_node) { - return std::vector{ numa_src_node, numa_dst_node }; - }; + static std::vector CopyMethodPolicy(const int numa_dst_node, const int numa_src_node, const size_t data_size) { + if (data_size < COPY_POLICY_MIN_SIZE) { + // if the data size is small then the copy will just be carried + // out by the destination node which does not require setting numa + // thread affinity as the selected dsa engine is already the one + // present on the calling thread - cache_.Init(cache_policy,copy_policy); - } - else if (device == "xeonmax") { - static const auto cache_policy = [](const int numa_dst_node, const int numa_src_node, const size_t data_size) { - return numa_dst_node < 8 ? numa_dst_node + 8 : numa_dst_node; - }; - - static const auto copy_policy = [](const int numa_dst_node, const int numa_src_node) { - const bool same_socket = ((numa_dst_node ^ numa_src_node) & 4) == 0; - if (same_socket) { - const bool socket_number = numa_dst_node >> 2; - if (socket_number == 0) return std::vector{ 0, 1, 2, 3 }; - else return std::vector{ 4, 5, 6, 7 }; - } - else return std::vector{ numa_src_node, numa_dst_node }; - }; - - cache_.Init(cache_policy,copy_policy); + return std::vector{ (numa_dst_node >= 8 ? numa_dst_node - 8 : numa_dst_node) }; } else { - std::cerr << "Given device '" << device << "' not supported!" << std::endl; - exit(-1); + // for sufficiently large data, smart copy is used which will utilize + // all four engines for intra-socket copy operations and cross copy on + // the source and destination nodes for inter-socket copy + + const bool same_socket = ((numa_dst_node ^ numa_src_node) & 4) == 0; + + if (same_socket) { + const bool socket_number = numa_dst_node >> 2; + if (socket_number == 0) return std::vector{ 0, 1, 2, 3 }; + else return std::vector{ 4, 5, 6, 7 }; + } + else { + return std::vector{ + (numa_src_node >= 8 ? 
numa_src_node - 8 : numa_src_node), + (numa_dst_node >= 8 ? numa_dst_node - 8 : numa_dst_node) + }; + } } } public: - - - Query_Wrapper(std::shared_future* rdy_fut, size_t workload_b, size_t chunk_size_b, base_t* data_a, - base_t* data_b, base_t* dest, uint32_t numa_close, uint32_t numa_far, uint32_t tc_fi, uint32_t tc_fc, uint32_t tc_ag, - NewPMode mode, uint32_t thread_group, base_t cmp_a = 50, base_t cmp_b = 42) : + Query_Wrapper(std::shared_future* rdy_fut, size_t workload_b, size_t chunk_size_b, base_t* data_a, + base_t* data_b, base_t* dest, uint32_t tc_fi, uint32_t tc_fc, uint32_t tc_ag, + NewPMode mode, base_t cmp_a = 50, base_t cmp_b = 42) : ready_future(rdy_fut), size_b(workload_b), chunk_size_b(chunk_size_b), data_a(data_a), data_b(data_b), - dest(dest), close_mem(numa_close), far_mem(numa_far), mode(mode), thread_group(thread_group), cmp_a(cmp_a), cmp_b(cmp_b){ - + dest(dest), mode(mode), cmp_a(cmp_a), cmp_b(cmp_b) { + + const int current_cpu = sched_getcpu(); + const int current_node = numa_node_of_cpu(current_cpu); + const int cache_node = CachePlacementPolicy(current_node, current_node, 0); + chunk_size_w = chunk_size_b / sizeof(base_t); chunk_cnt = size_b / chunk_size_b; + thread_count_fi = tc_fi; thread_count_fc = tc_fc; thread_count_ag = tc_ag; - ready_flag_a = (volatile uint8_t *) numa_alloc_onnode( - chunk_cnt * thread_count_fi / 8 + ((chunk_cnt * thread_count_fi % 8) != 0), close_mem); - ready_flag_b = (volatile uint8_t *) numa_alloc_onnode( - chunk_cnt * thread_count_fc / 8 + ((chunk_cnt * thread_count_fc % 8) != 0), close_mem); + thread_count = tc_fi + tc_fc + tc_ag; + + ready_flag_a = (volatile uint8_t *) numa_alloc_onnode( chunk_cnt * thread_count_fi / 8 + ((chunk_cnt * thread_count_fi % 8) != 0), cache_node); + ready_flag_b = (volatile uint8_t *) numa_alloc_onnode( chunk_cnt * thread_count_fc / 8 + ((chunk_cnt * thread_count_fc % 8) != 0), cache_node); - mask_a = (uint16_t *) numa_alloc_onnode(size_b / sizeof(base_t), close_mem); - mask_b 
= (uint16_t *) numa_alloc_onnode(size_b / sizeof(base_t), close_mem); + mask_a = (uint16_t *) numa_alloc_onnode(size_b / sizeof(base_t), cache_node); + mask_b = (uint16_t *) numa_alloc_onnode(size_b / sizeof(base_t), cache_node); - InitCache("xeonmax"); + cache_.Init(CachePlacementPolicy, CopyMethodPolicy); - size_t measurement_space = THREAD_GROUP_MULTIPLIER * std::max(std::max(tc_fi, tc_fc), tc_ag); - trt = new thread_runtime_timing(3, measurement_space, far_mem); - bt = new barrier_timing(3, measurement_space, far_mem); - pvc = new pcm_value_collector({"scan_a", "scan_b", "aggr_j"}, measurement_space, far_mem); + size_t measurement_space = std::max(std::max(tc_fi, tc_fc), tc_ag); + trt = new thread_runtime_timing(3, measurement_space, current_node); + bt = new barrier_timing(3, measurement_space, current_node); + pvc = new pcm_value_collector({"scan_a", "scan_b", "aggr_j"}, measurement_space, current_node); reset_barriers(); }; @@ -148,16 +146,15 @@ public: sync_barrier.reset(); } - sync_barrier = std::make_unique*>>(thread_group); + sync_barrier = std::make_unique*>>(thread_count); uint32_t thread_count_sum = thread_count_ag + thread_count_fi + thread_count_fc; - uint32_t barrier_count = barrier_mode.compare("global") == 0 ? 1 : thread_group; + uint32_t barrier_count = barrier_mode.compare("global") == 0 ? 1 : thread_count; uint32_t barrier_thread_count; if constexpr(simple){ - barrier_thread_count = (thread_group / barrier_count) * - (mode == NewPMode::Prefetch ? thread_count_sum : (thread_count_ag + thread_count_fi)); + barrier_thread_count = (thread_count / barrier_count) * (mode == NewPMode::Prefetch ? 
thread_count_sum : (thread_count_ag + thread_count_fi)); } else { - barrier_thread_count = (thread_group / barrier_count) * thread_count_sum; + barrier_thread_count = (thread_count / barrier_count) * thread_count_sum; } for(uint32_t i = 0; i < barrier_count; ++i) { (*sync_barrier)[i] = new std::barrier(barrier_thread_count); @@ -180,10 +177,8 @@ public: }; ~Query_Wrapper() { - numa_free((void*)ready_flag_a, - chunk_cnt * thread_count_fi / 8 + ((chunk_cnt * thread_count_fi % 8) != 0)); - numa_free((void*)ready_flag_b, - chunk_cnt * thread_count_fc / 8 + ((chunk_cnt * thread_count_fc % 8) != 0)); + numa_free((void*)ready_flag_a, chunk_cnt * thread_count_fi / 8 + ((chunk_cnt * thread_count_fi % 8) != 0)); + numa_free((void*)ready_flag_b, chunk_cnt * thread_count_fc / 8 + ((chunk_cnt * thread_count_fc % 8) != 0)); numa_free(mask_a, size_b / sizeof(base_t)); numa_free(mask_b, size_b / sizeof(base_t)); @@ -202,14 +197,12 @@ public: }; private: - static inline base_t* get_sub_chunk_ptr(base_t* base_ptr, size_t chunk_id, size_t chunk_size_w, size_t tid, - size_t tcnt) { + static inline base_t* get_sub_chunk_ptr(base_t* base_ptr, size_t chunk_id, size_t chunk_size_w, size_t tid, size_t tcnt) { base_t* chunk_ptr = base_ptr + chunk_id * chunk_size_w; return chunk_ptr + tid * (chunk_size_w / tcnt); } - static inline uint16_t* get_sub_mask_ptr(uint16_t* base_ptr, size_t chunk_id, size_t chunk_size_w, size_t tid, - size_t tcnt) { + static inline uint16_t* get_sub_mask_ptr(uint16_t* base_ptr, size_t chunk_id, size_t chunk_size_w, size_t tid, size_t tcnt) { // 16 integer are addressed with one uint16_t in mask buffer size_t offset = chunk_id * chunk_size_w + tid * (chunk_size_w / tcnt); return base_ptr + (offset / 16); @@ -258,6 +251,7 @@ public: // the lower gids run once more if the chunks are not evenly distributable uint32_t runs = chunk_cnt / gcnt + (chunk_cnt % gcnt > gid); uint32_t barrier_idx = barrier_mode.compare("global") == 0 ? 
0 : gid; + for(uint32_t i = 0; i < runs; ++i) { trt->start_timer(1, tid * gcnt + gid); pvc->start("scan_b", tid * gcnt + gid); @@ -268,28 +262,45 @@ public: uint16_t* mask_ptr = get_sub_mask_ptr(mask_b, chunk_id, chunk_size_w, tid, tcnt); if constexpr(simple){ - cache_.Access(chunk_ptr, chunk_size_b / tcnt); + cache_.Access(reinterpret_cast(chunk_ptr), chunk_size_b / tcnt); } else { - const auto data = cache_.Access(chunk_ptr, chunk_size_b / tcnt); + const auto data = cache_.Access(reinterpret_cast(chunk_ptr), chunk_size_b / tcnt); - // wait on copy to complete - during this time other threads may - // continue with their calculation which leads to little impact - // and we will be faster if the cache is used + if constexpr(wait_b) { + // wait on copy to complete - during this time other threads may + // continue with their calculation which leads to little impact + // and we will be faster if the cache is used - data->WaitOnCompletion(); + data->WaitOnCompletion(); - // obtain the data location from the cache entry + // obtain the data location from the cache entry - base_t* data_ptr = data->GetDataLocation(); + base_t* data_ptr = reinterpret_cast(data->GetDataLocation()); - // nullptr is still a legal return value for CacheData::GetLocation() - // even after waiting, so this must be checked + // nullptr is still a legal return value for CacheData::GetLocation() + // even after waiting, so this must be checked - if (data_ptr == nullptr) { - data_ptr = chunk_ptr; + if (data_ptr == nullptr) { + std::cerr << "[!] 
Cache Miss in ScanB" << std::endl; + data_ptr = chunk_ptr; + } + + filterNoCopy::apply_same(mask_ptr, nullptr, data_ptr, cmp_b, chunk_size_b / tcnt); } + else { + // obtain the data location from the cache entry + + base_t* data_ptr = reinterpret_cast(data->GetDataLocation()); + + // nullptr is still a legal return value for CacheData::GetLocation() + // even after waiting, so this must be checked - filterNoCopy::apply_same(mask_ptr, nullptr, data_ptr, cmp_b, chunk_size_b / tcnt); + if (data_ptr == nullptr) { + data_ptr = chunk_ptr; + } + + filterNoCopy::apply_same(mask_ptr, nullptr, data_ptr, cmp_b, chunk_size_b / tcnt); + } } pvc->stop("scan_b", tid * gcnt + gid); @@ -321,7 +332,21 @@ public: base_t* chunk_ptr = get_sub_chunk_ptr(data_a, chunk_id, chunk_size_w, tid, tcnt); uint16_t* mask_ptr = get_sub_mask_ptr (mask_a, chunk_id, chunk_size_w, tid, tcnt); - filter::apply_same(mask_ptr, nullptr, chunk_ptr, cmp_a, chunk_size_b / tcnt); + if constexpr (cache_a) { + const auto data = cache_.Access(reinterpret_cast(chunk_ptr), chunk_size_b / tcnt); + data->WaitOnCompletion(); + base_t* data_ptr = reinterpret_cast(data->GetDataLocation()); + + if (data_ptr == nullptr) { + std::cerr << "[!] Cache Miss in ScanA" << std::endl; + data_ptr = chunk_ptr; + } + + filter::apply_same(mask_ptr, nullptr, data_ptr, cmp_a, chunk_size_b / tcnt); + } + else { + filter::apply_same(mask_ptr, nullptr, chunk_ptr, cmp_a, chunk_size_b / tcnt); + } pvc->stop("scan_a", tid * gcnt + gid); trt->stop_timer(0, tid * gcnt + gid); @@ -340,19 +365,19 @@ public: // the lower gids run once more if the chunks are not evenly distributable uint32_t runs = chunk_cnt / gcnt + (chunk_cnt % gcnt > gid); uint32_t barrier_idx = barrier_mode.compare("global") == 0 ? 
0 : gid; + for(uint32_t i = 0; i < runs; ++i) { - bt->timed_wait(*(*sync_barrier)[barrier_idx], 2, tid * gcnt + gid); trt->start_timer(2, tid * gcnt + gid); pvc->start("aggr_j", tid * gcnt + gid); // calculate pointers size_t chunk_id = gid + gcnt * i; - const base_t* chunk_ptr = get_sub_chunk_ptr(data_b, chunk_id, chunk_size_w, tid, tcnt); + base_t* chunk_ptr = get_sub_chunk_ptr(data_b, chunk_id, chunk_size_w, tid, tcnt); // access the cache for the given chunk which will have been accessed in scan_b - const auto data = cache_.Access(chunk_ptr, chunk_size_b / tcnt); + const auto data = cache_.Access(reinterpret_cast(chunk_ptr), chunk_size_b / tcnt); // wait on the caching task to complete, this will give time for other processes // to make progress here which will therefore not hurt performance @@ -362,14 +387,14 @@ public: // after the copy task has finished we obtain the pointer to the cached // copy of data_b which is then used from now on - const base_t* data_ptr = data->GetDataLocation(); + base_t* data_ptr = reinterpret_cast(data->GetDataLocation()); // nullptr is still a legal return value for CacheData::GetLocation() // even after waiting, so this must be checked if (data_ptr == nullptr) { data_ptr = chunk_ptr; - std::cerr << "Cache Miss" << std::endl; + std::cerr << "[!] 
Cache Miss in AggrJ" << std::endl; } uint16_t* mask_ptr_a = get_sub_mask_ptr (mask_a, chunk_id, chunk_size_w, tid, tcnt); diff --git a/qdp_project/src/benchmark/pipelines/scan_filter_pipe.h b/qdp_project/src/benchmark/pipelines/scan_filter_pipe.h deleted file mode 100644 index 2b10b06..0000000 --- a/qdp_project/src/benchmark/pipelines/scan_filter_pipe.h +++ /dev/null @@ -1,387 +0,0 @@ - -#include -#include -#include -#include - -#include - -#include "filter.h" -#include "aggregation.h" -#include "vector_loader.h" -#include "timer_utils.h" -#include "barrier_utils.h" -#include "execution_modes.h" - - -template -class Query_Wrapper { -public: - // sync - std::shared_future* ready_future; - - thread_runtime_timing* trt; - barrier_timing* bt; - -private: - // numa - uint32_t close_mem; - uint32_t far_mem; - - // data - size_t size_b; - size_t chunk_size_b; - size_t chunk_size_w; - size_t chunk_cnt; - base_t* data_a; - base_t* data_b; - base_t* dest; - - // ratios - uint32_t thread_count_fc; - uint32_t thread_count_fi; - uint32_t thread_count_ag; - uint32_t thread_group; - - // done bits - volatile uint8_t* ready_flag_a; - volatile uint8_t* ready_flag_b; - std::mutex ready_a_m; - std::mutex ready_b_m; - - // buffer - uint16_t* mask_a; - uint16_t* mask_b; - base_t** buffer_b; - - // params - base_t cmp_a; - base_t cmp_b; - bool no_copy; - PMode mode; - - // sync - std::unique_ptr*>> sync_barrier; - std::string barrier_mode = BARRIER_MODE; - - using filterCopy = Filter; - using filterNoCopy = Filter; - using filter = Filter; - using aggregation = Aggregation; - -public: - - - Query_Wrapper(std::shared_future* rdy_fut, size_t workload_b, size_t chunk_size_b, base_t* data_a, - base_t* data_b, base_t* dest, uint32_t numa_close, uint32_t numa_far, uint32_t tc_fi, uint32_t tc_fc, uint32_t tc_ag, - PMode mode, uint32_t thread_group, base_t cmp_a = 50, base_t cmp_b = 42, bool no_copy = false) : - ready_future(rdy_fut), size_b(workload_b), chunk_size_b(chunk_size_b), 
data_a(data_a), data_b(data_b), - dest(dest), close_mem(numa_close), far_mem(numa_far), mode(mode), thread_group(thread_group), cmp_a(cmp_a), cmp_b(cmp_b), no_copy(no_copy){ - - chunk_size_w = chunk_size_b / sizeof(base_t); - chunk_cnt = size_b / chunk_size_b; - thread_count_fi = tc_fi; - thread_count_fc = tc_fc; - thread_count_ag = tc_ag; - - ready_flag_a = (volatile uint8_t *) numa_alloc_onnode( - chunk_cnt * thread_count_fi / 8 + ((chunk_cnt * thread_count_fi % 8) != 0), close_mem); - ready_flag_b = (volatile uint8_t *) numa_alloc_onnode( - chunk_cnt * thread_count_fc / 8 + ((chunk_cnt * thread_count_fc % 8) != 0), close_mem); - - mask_a = (uint16_t *) numa_alloc_onnode(size_b / sizeof(base_t), close_mem); - mask_b = (uint16_t *) numa_alloc_onnode(size_b / sizeof(base_t), close_mem); - - trt = new thread_runtime_timing(4, 20, close_mem); - bt = new barrier_timing(4, 20, close_mem); - reset_barriers(); - - if constexpr(BUFFER_LIMIT==1) { - // TODO size ok like that? - buffer_b = (base_t**) numa_alloc_onnode(size_b * sizeof(base_t*), close_mem); - buffer_b[0] = (base_t*) numa_alloc_onnode(thread_group * chunk_size_b, close_mem); - buffer_b[1] = (base_t*) numa_alloc_onnode(thread_group * chunk_size_b, close_mem); - } else { - buffer_b = (base_t **) numa_alloc_onnode(sizeof(base_t*), close_mem); - base_t* buffer_tmp = (base_t *) numa_alloc_onnode(size_b, close_mem); - *buffer_b = buffer_tmp; - } - }; - - void reset_barriers(){ - if(sync_barrier != nullptr) { - for(auto& barrier : *sync_barrier) { - delete barrier; - } - sync_barrier.reset(); - } - - sync_barrier = std::make_unique*>>(thread_group); - uint32_t thread_count_sum = thread_count_ag + thread_count_fi + thread_count_fc; - uint32_t barrier_count = barrier_mode.compare("global") == 0 ? 1 : thread_group; - uint32_t barrier_thread_count; - - if constexpr(simple){ - barrier_thread_count = (thread_group / barrier_count) * - (mode == PMode::expl_copy ? 
thread_count_sum : (thread_count_ag + thread_count_fi)); - } else { - barrier_thread_count = (thread_group / barrier_count) * thread_count_sum; - } - for(uint32_t i = 0; i < barrier_count; ++i) { - (*sync_barrier)[i] = new std::barrier(barrier_thread_count); - } - } - - - void clear_buffers () { - std::memset((void*)ready_flag_a, 0x00, chunk_cnt * thread_count_fi / 8 + ((chunk_cnt * thread_count_fi % 8) != 0)); - std::memset((void*)ready_flag_b, 0x00, chunk_cnt * thread_count_fc / 8 + ((chunk_cnt * thread_count_fc % 8) != 0)); - - std::memset(mask_a, 0x00, size_b / sizeof(base_t)); - std::memset(mask_b, 0x00, size_b / sizeof(base_t)); - if constexpr(BUFFER_LIMIT==1) { - std::memset(buffer_b[0], 0x00, thread_group * chunk_size_b); - std::memset(buffer_b[1], 0x00, thread_group * chunk_size_b); - } else { - std::memset(*buffer_b, 0x00, size_b); - } - - trt->reset_accumulator(); - bt->reset_accumulator(); - reset_barriers(); - }; - - ~Query_Wrapper() { - numa_free((void*)ready_flag_a, - chunk_cnt * thread_count_fi / 8 + ((chunk_cnt * thread_count_fi % 8) != 0)); - numa_free((void*)ready_flag_b, - chunk_cnt * thread_count_fc / 8 + ((chunk_cnt * thread_count_fc % 8) != 0)); - - numa_free(mask_a, size_b / sizeof(base_t)); - numa_free(mask_b, size_b / sizeof(base_t)); - if constexpr(BUFFER_LIMIT==1) { - numa_free(buffer_b[0], thread_group * chunk_size_b); - numa_free(buffer_b[1], thread_group * chunk_size_b); - numa_free(buffer_b, size_b * sizeof(base_t*)); - } else { - numa_free(*buffer_b, size_b); - } - - delete trt; - for(auto& barrier : *sync_barrier) { - delete barrier; - } - delete bt; - - }; - -private: - static inline base_t* get_sub_chunk_ptr(base_t* base_ptr, size_t chunk_id, size_t chunk_size_w, size_t tid, - size_t tcnt) { - base_t* chunk_ptr = base_ptr + chunk_id * chunk_size_w; - return chunk_ptr + tid * (chunk_size_w / tcnt); - } - - static inline uint16_t* get_sub_mask_ptr(uint16_t* base_ptr, size_t chunk_id, size_t chunk_size_w, size_t tid, - size_t tcnt) 
{ - // 16 integer are addressed with one uint16_t in mask buffer - size_t offset = chunk_id * chunk_size_w + tid * (chunk_size_w / tcnt); - return base_ptr + (offset / 16); - } - - static bool bit_at(volatile uint8_t* bitmap, uint32_t bitpos) { - uint8_t value = bitmap[bitpos / 8]; - switch(bitpos % 8) { - case 0: return value & 0b00000001; - case 1: return value & 0b00000010; - case 2: return value & 0b00000100; - case 3: return value & 0b00001000; - case 4: return value & 0b00010000; - case 5: return value & 0b00100000; - case 6: return value & 0b01000000; - case 7: return value & 0b10000000; - default: return false; - } - } - - static void set_bit_at(volatile uint8_t* bitmap, std::mutex& mutex, uint32_t bitpos) { - mutex.lock(); - switch(bitpos % 8) { - case 0: bitmap[bitpos / 8] |= 0b00000001;break; - case 1: bitmap[bitpos / 8] |= 0b00000010;break; - case 2: bitmap[bitpos / 8] |= 0b00000100;break; - case 3: bitmap[bitpos / 8] |= 0b00001000;break; - case 4: bitmap[bitpos / 8] |= 0b00010000;break; - case 5: bitmap[bitpos / 8] |= 0b00100000;break; - case 6: bitmap[bitpos / 8] |= 0b01000000;break; - case 7: bitmap[bitpos / 8] |= 0b10000000;break; - } - mutex.unlock(); - } - -public: - - static base_t checksum(base_t* a, base_t* b, base_t cmp_a, base_t cmp_b, size_t size_b) { - base_t sum = 0; - for(int i = 0; i < size_b / sizeof(base_t); ++i) { - if(a[i] >= cmp_a && b[i] <= cmp_b) { - sum += b[i]; - } - } - return sum; - } - - static void checkmask(uint16_t* mask, base_t cmp, base_t* data, size_t size_b, bool leq) { - uint32_t cnt = 0; - for(int i = 0; i < size_b / sizeof(base_t); ++i) { - if(leq) { - if(((data[i] <= cmp) != bit_at((uint8_t*)mask, i))) { - ++cnt; - } - } else { - if(((data[i] >= cmp) != bit_at((uint8_t*)mask, i))) { - ++cnt; - } - } - } - } - - static void checkmask_16(uint16_t* mask, base_t cmp, base_t* data, size_t size_b, bool leq) { - for(int i = 0; i < size_b / sizeof(base_t) / 16 ; ++i) { - std::bitset<16> m(mask[i]); - uint16_t ch = 0; - 
for(int j = 0; j < 16; ++j) { - if(data[i*16 + j] <= cmp) { - ch |= 0x1 << j; - } - } - std::bitset<16> c(ch); - - std::cout << "act " << m << std::endl; - std::cout << "rea " << c << std::endl << std::endl; - } - } - - - void scan_b(size_t gid, size_t gcnt, size_t tid) { - size_t tcnt = thread_count_fc; - assert(chunk_size_w % tcnt == 0); - assert(chunk_size_w % 16 == 0); - assert(chunk_size_w % tcnt * 16 == 0); - - // wait till everyone can start - ready_future->wait(); - - // the lower gids run once more if the chunks are not evenly distributable - uint32_t runs = chunk_cnt / gcnt + (chunk_cnt % gcnt > gid); - uint32_t barrier_idx = barrier_mode.compare("global") == 0 ? 0 : gid; - for(uint32_t i = 0; i < runs; ++i) { - trt->start_timer(1, tid * gcnt + gid); - - // calculate pointers - size_t chunk_id = gid + gcnt * i; - base_t* chunk_ptr = get_sub_chunk_ptr(data_b , chunk_id, chunk_size_w, tid, tcnt); - uint16_t* mask_ptr = get_sub_mask_ptr (mask_b , chunk_id, chunk_size_w, tid, tcnt); - - if constexpr(simple){ - base_t* buffer_ptr; - if constexpr(BUFFER_LIMIT==1) { - buffer_ptr = get_sub_chunk_ptr(buffer_b[i % 2], gid, chunk_size_w, tid, tcnt); - } else { - buffer_ptr = get_sub_chunk_ptr(*buffer_b, chunk_id, chunk_size_w, tid, tcnt); - } - std::memcpy(buffer_ptr, chunk_ptr, chunk_size_b / tcnt); - } else { - if(no_copy) { - filterNoCopy::apply_same(mask_ptr, nullptr, chunk_ptr, cmp_b, chunk_size_b / tcnt); - } else { - base_t* buffer_ptr; - if constexpr(BUFFER_LIMIT==1) { - buffer_ptr = get_sub_chunk_ptr(buffer_b[i % 2], gid, chunk_size_w, tid, tcnt); - } else { - buffer_ptr = get_sub_chunk_ptr(*buffer_b, chunk_id, chunk_size_w, tid, tcnt); - } - filterCopy::apply_same(mask_ptr, buffer_ptr, chunk_ptr, cmp_b, chunk_size_b / tcnt); - } - } - - trt->stop_timer(1, tid * gcnt + gid); - bt->timed_wait(*(*sync_barrier)[barrier_idx], 1, tid * gcnt + gid); - - } - (*(*sync_barrier)[barrier_idx]).arrive_and_drop(); - - } - - void scan_a(size_t gid, size_t gcnt, size_t 
tid) { - size_t tcnt = thread_count_fi; - assert(chunk_size_w % tcnt == 0); - assert(chunk_size_w % 16 == 0); - assert(chunk_size_w % tcnt * 16 == 0); - - // wait till everyone can start - ready_future->wait(); - - // the lower gids run once more if the chunks are not evenly distributable - uint32_t runs = chunk_cnt / gcnt + (chunk_cnt % gcnt > gid); - uint32_t barrier_idx = barrier_mode.compare("global") == 0 ? 0 : gid; - for(uint32_t i = 0; i < runs; ++i) { - trt->start_timer(0, tid * gcnt + gid); - // calculate pointers - size_t chunk_id = gid + gcnt * i; - base_t* chunk_ptr = get_sub_chunk_ptr(data_a, chunk_id, chunk_size_w, tid, tcnt); - uint16_t* mask_ptr = get_sub_mask_ptr (mask_a, chunk_id, chunk_size_w, tid, tcnt); - - filter::apply_same(mask_ptr, nullptr, chunk_ptr, cmp_a, chunk_size_b / tcnt); - - trt->stop_timer(0, tid * gcnt + gid); - bt->timed_wait(*(*sync_barrier)[barrier_idx], 0, tid * gcnt + gid); - } - (*(*sync_barrier)[barrier_idx]).arrive_and_drop(); - } - - void aggr_j(size_t gid, size_t gcnt, size_t tid) { - size_t tcnt = thread_count_ag; - // wait till everyone can start - ready_future->wait(); - - // calculate values - __m512i aggregator = aggregation::OP::zero(); - // the lower gids run once more if the chunks are not evenly distributable - uint32_t runs = chunk_cnt / gcnt + (chunk_cnt % gcnt > gid); - uint32_t barrier_idx = barrier_mode.compare("global") == 0 ? 
0 : gid; - for(uint32_t i = 0; i < runs; ++i) { - - bt->timed_wait(*(*sync_barrier)[barrier_idx], 2, tid * gcnt + gid); - trt->start_timer(2, tid * gcnt + gid); - - // calculate pointers - size_t chunk_id = gid + gcnt * i; - base_t* chunk_ptr; - if(no_copy) { - chunk_ptr = get_sub_chunk_ptr(data_b, chunk_id, chunk_size_w, tid, tcnt); - } else { - if constexpr(BUFFER_LIMIT==1) { - chunk_ptr = get_sub_chunk_ptr(buffer_b[i%2], gid, chunk_size_w, tid, tcnt); - } else { - chunk_ptr = get_sub_chunk_ptr(*buffer_b, chunk_id, chunk_size_w, tid, tcnt); - } - } - uint16_t* mask_ptr_a = get_sub_mask_ptr (mask_a, chunk_id, chunk_size_w, tid, tcnt); - uint16_t* mask_ptr_b = get_sub_mask_ptr (mask_b, chunk_id, chunk_size_w, tid, tcnt); - - base_t tmp = _mm512_reduce_add_epi64(aggregator); - if constexpr(simple){ - aggregator = aggregation::apply_masked(aggregator, chunk_ptr, mask_ptr_a, chunk_size_b / tcnt); - } else { - aggregator = aggregation::apply_masked(aggregator, chunk_ptr, mask_ptr_a, mask_ptr_b, chunk_size_b / tcnt); - } - trt->stop_timer(2, tid * gcnt + gid); - } - - // so threads with more runs dont wait for finished threads - (*(*sync_barrier)[barrier_idx]).arrive_and_drop(); - - aggregation::happly(dest + (tid * gcnt + gid), aggregator); - } -}; \ No newline at end of file diff --git a/qdp_project/src/utils/execution_modes.h b/qdp_project/src/utils/execution_modes.h index ca04b4f..b494fab 100644 --- a/qdp_project/src/utils/execution_modes.h +++ b/qdp_project/src/utils/execution_modes.h @@ -55,17 +55,24 @@ struct new_mode_manager { };*/ constexpr static int thread_counts[2][4][3] = { + // thread counts for both simple and complex queries + // inner layout: { scan_a, scan_b, aggr_j } + //simple query - //scan_a, scan_b, aggr_j - {{4, 0, 2}, // DRAM_base - {4, 0, 2}, // HBM_base - {4, 0, 2}, // Mixed_base - {1, 4, 1}},// Prefetching + { + {4, 0, 2}, // DRAM_base + {4, 0, 2}, // HBM_base + {4, 0, 2}, // Mixed_base + {4, 4, 4} // Prefetching + }, + //complex query - {{1,
4, 1}, // DRAM_base - {1, 4, 1}, // HBM_base - {1, 4, 1}, // Mixed_base - {1, 4, 1}},// Prefetching + { + {1, 4, 1}, // DRAM_base + {1, 4, 1}, // HBM_base + {1, 4, 1}, // Mixed_base + {4, 4, 4} // Prefetching + } }; static inline NewPMode inc(NewPMode value) { @@ -81,9 +88,17 @@ struct new_mode_manager { }; static std::string string(NewPMode value) { switch(value) { - case DRAM_base: return "DRAM_Baseline"; - case HBM_base: return "HBM_Baseline"; - case Mixed_base: return "DRAM_HBM_Baseline"; - } return "Q-d_Prefetching"; + case DRAM_base: + return "DRAM_Baseline"; + case HBM_base: + return "HBM_Baseline"; + case Mixed_base: + return "DRAM_HBM_Baseline"; + case Prefetch: + return "Q-d_Prefetching"; + default: + std::cerr << "[x] Unknown Processing Mode" << std::endl; + exit(-1); + } }; }; \ No newline at end of file