From af4e3de80c495e2d23c10ea2406cca826510cec6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Constantin=20F=C3=BCrst?= Date: Tue, 23 Jan 2024 17:19:36 +0100 Subject: [PATCH] enable qdp testing for dram as baseline, pre-allocated hbm as peak and the already existing dsa-hbm-prefetch --- qdp_project/src/benchmark/MAX_benchmark.cpp | 164 +++++++++--------- .../pipelines/MAX_scan_filter_pipe.h | 105 +++-------- 2 files changed, 107 insertions(+), 162 deletions(-) mode change 100644 => 100755 qdp_project/src/benchmark/pipelines/MAX_scan_filter_pipe.h diff --git a/qdp_project/src/benchmark/MAX_benchmark.cpp b/qdp_project/src/benchmark/MAX_benchmark.cpp index 640792e..c015666 100644 --- a/qdp_project/src/benchmark/MAX_benchmark.cpp +++ b/qdp_project/src/benchmark/MAX_benchmark.cpp @@ -21,10 +21,6 @@ #define THREAD_GROUP_MULTIPLIER 2 #endif -#ifndef QUERY -#define QUERY 1 -#endif - #ifndef BARRIER_MODE #define BARRIER_MODE "global" #endif @@ -33,18 +29,6 @@ #define BUFFER_LIMIT 1 #endif -#ifndef PINNING -#define PINNING 1 -#endif - -#ifndef PCM_M -#define PCM_M 0 -#endif - -#if PCM_M == 1 -#include "pcm.h" -#endif - #include "const.h" #include "file_output.h" @@ -55,6 +39,7 @@ #include "cpu_set_utils.h" #include "iterable_range.h" #include "memory_literals.h" + #include "pipelines/MAX_scan_filter_pipe.h" #include "aggregation.h" @@ -62,6 +47,10 @@ using base_t = uint64_t; +static int CachePlacementPolicy(const int numa_dst_node, const int numa_src_node, const size_t data_size) { + return numa_dst_node < 8 ? numa_dst_node + 8 : numa_dst_node; +} + base_t sum_check(base_t compare_value, base_t* row_A, base_t* row_B, size_t row_size) { base_t sum = 0; for(int i = 0; i < row_size / sizeof(base_t); ++i) { @@ -78,26 +67,18 @@ base_t sum_check_complex(base_t compare_value_a, base_t compare_value_b, base_t* return sum; } +enum class ExecMode { + DramBaseline, + HbmPeak, + HbmPrefetch +}; + int main(int argc, char** argv) { -#if PCM == 1 - pcm::PCM *pcm = pcm::PCM::getInstance(); - //and check for errors - auto error_code = pcm->program(); - if(error_code != pcm::PCM::Success) { - std::cerr << "PCM couldn't start" << std::endl; - std::cerr << "Error code: " << error_code << std::endl; - std::cerr << "Try to execute 'sudo modprobe msr' and execute this program with root privigeges."; - return 1; - } -#endif + constexpr ExecMode mode = ExecMode::DramBaseline; - // set constants constexpr size_t workload_b = 2_GiB; constexpr base_t compare_value_a = 50; constexpr base_t compare_value_b = 42; - constexpr bool simple_query = (QUERY == 1); - constexpr bool cache_a = false; - constexpr bool wait_b = false; constexpr size_t chunk_min = 1_MiB; constexpr size_t chunk_max = 8_MiB + 1; @@ -107,15 +88,52 @@ int main(int argc, char** argv) { // we must restrict the core assignment of these 12 threads to // 6 physical cpu cores on the executing node - constexpr size_t thread_count = 12; + + + /*** alloc data and buffers ************************************************/ + + uint8_t tc_filter; + uint8_t tc_copy; + uint8_t tc_agg; + + if constexpr (mode == ExecMode::HbmPrefetch) { + tc_filter = 4; + tc_copy = 1; + tc_agg = 1; + } + else { + tc_filter = 4; + tc_copy = 0; + tc_agg = 2; + } + + const uint8_t tc_combined = tc_filter + tc_copy + tc_agg; + + base_t* data_a; + base_t* data_b; + base_t* results; + + const int current_cpu = sched_getcpu(); + const int current_node = numa_node_of_cpu(current_cpu); + const int cache_node = CachePlacementPolicy(current_node, current_node, 0); std::ofstream out_file; - const std::string ofname = std::string("results/max_") + - "q-" + (std::string)(simple_query == true ? "simple" : "complex") + - "_bm-" + (std::string) BARRIER_MODE + - "_bl-" + (std::string)(BUFFER_LIMIT == 1 ? "limited" : "unlimited") + - "_tc-" + std::to_string(thread_count) + "1MiB-2MiB.csv"; + std::string mode_string; + if constexpr (mode == ExecMode::HbmPrefetch) { + mode_string = "HbmDsaPrefetch"; + } + else if constexpr (mode == ExecMode::HbmPeak) { + mode_string = "HbmAllocPeak"; + } + else if constexpr (mode == ExecMode::DramBaseline) { + mode_string = "DramBaseline"; + } + else { + mode_string = "Unknown"; + } + + const std::string ofname = "qdp-xeonmax-simpleq-" + mode_string + "-tca" + std::to_string(tc_filter) + "-tcb" + std::to_string(tc_copy) + "-tcj" + std::to_string(tc_agg) + "-tmul" + std::to_string(THREAD_GROUP_MULTIPLIER) ; out_file.open(ofname); @@ -123,65 +141,41 @@ int main(int argc, char** argv) { std::cerr << "Failed to open Output File '" << ofname << "'" << std::endl; } - // set benchmark parameter + // set benchmark parameter Linear_Int_Range run("run"); Linear_Int_Range chunk_size("chunk_size"); - print_to_file(out_file, generateHead(run, chunk_size, "mode"), "thread_group", "time", - #ifdef THREAD_TIMINGS - "scan_a", "scan_b", "aggr_j", - #endif - #ifdef BARRIER_TIMINGS - "wait_scan_a", "wait_scan_b", "wait_aggr_j", - #endif - #if PCM == 1 - pcm_value_collector::getHead("scan_a"), - pcm_value_collector::getHead("scan_b"), - pcm_value_collector::getHead("aggr_j"), - #endif - "result"); - - /*** alloc data and buffers ************************************************/ + print_to_file(out_file, generateHead(run, chunk_size), "thread_group", "time", +#ifdef THREAD_TIMINGS + "scan_a", "scan_b", "aggr_j", +#endif +#ifdef BARRIER_TIMINGS + "wait_scan_a", "wait_scan_b", "wait_aggr_j", +#endif + "result"); - base_t* data_a = (base_t*) numa_alloc_local(workload_b); - base_t* data_b = (base_t*) numa_alloc_local(workload_b); - base_t* results = (base_t*) numa_alloc_local(thread_count * sizeof(base_t)); + if constexpr (mode == ExecMode::HbmPeak) { + data_a = (base_t*) numa_alloc_onnode(workload_b, cache_node); + data_b = (base_t*) numa_alloc_onnode(workload_b, cache_node); + results = (base_t*) numa_alloc_onnode(tc_combined * sizeof(base_t), cache_node); + } + else { + data_a = (base_t*) numa_alloc_onnode(workload_b, current_node); + data_b = (base_t*) numa_alloc_onnode(workload_b, current_node); + results = (base_t*) numa_alloc_onnode(tc_combined * sizeof(base_t), current_node); + } fill_mt(data_a, workload_b, 0, 100, 42); fill_mt(data_b, workload_b, 0, 100, 420); - const std::string cfname = std::string("results/max_") + - "q-" + (std::string)(simple_query == true ? "simple" : "complex") + - "_bm-" + (std::string) BARRIER_MODE + - "_bl-" + (std::string)(BUFFER_LIMIT == 1 ? "limited" : "unlimited") + - "_tc-" + std::to_string(thread_count) + ".checksum"; - - std::ofstream check_file; - check_file.open(cfname); - - if (check_file.bad()) { - std::cerr << "Failed to open Checksum File '" << cfname << "'" << std::endl; - } - - if constexpr (QUERY == 1) { - //calculate simple checksum if QUERY == 1 -> simple query is applied - check_file << sum_check(compare_value_a, data_a, data_b, workload_b); - } else { - check_file << sum_check_complex(compare_value_a, compare_value_b, data_a, data_b, workload_b); - } - check_file.close(); - for(uint32_t i = 0; i < 15; i++) { std::promise p; std::shared_future ready_future(p.get_future()); - const uint8_t tc_filter = 6; - const uint8_t tc_copy = 2; - const uint8_t tc_agg = 4; - - Query_Wrapper qw ( + Query_Wrapper qw ( &ready_future, workload_b, chunk_size.current, - data_a, data_b, results, tc_filter, tc_copy, tc_agg, 50, 42 + data_a, data_b, results, tc_filter, tc_copy, + tc_agg,compare_value_a, compare_value_b ); qw.ready_future = &ready_future; @@ -225,7 +219,7 @@ int main(int argc, char** argv) { double seconds = (double)(nanos) / nanos_per_second; if (i >= 5) { - print_to_file(out_file, run, chunk_size, "DSA-HBM-Prefetch", THREAD_GROUP_MULTIPLIER, seconds, + print_to_file(out_file, run, chunk_size, THREAD_GROUP_MULTIPLIER, seconds, #ifdef THREAD_TIMINGS qw.trt->summarize_time(0), qw.trt->summarize_time(1), qw.trt->summarize_time(2), #endif @@ -239,5 +233,5 @@ int main(int argc, char** argv) { numa_free(data_a, workload_b); numa_free(data_b, workload_b); - numa_free(results, THREAD_GROUP_MULTIPLIER * thread_count * sizeof(base_t)); + numa_free(results, THREAD_GROUP_MULTIPLIER * tc_combined * sizeof(base_t)); } \ No newline at end of file diff --git a/qdp_project/src/benchmark/pipelines/MAX_scan_filter_pipe.h b/qdp_project/src/benchmark/pipelines/MAX_scan_filter_pipe.h old mode 100644 new mode 100755 index 06b7488..a913ac3 --- a/qdp_project/src/benchmark/pipelines/MAX_scan_filter_pipe.h +++ b/qdp_project/src/benchmark/pipelines/MAX_scan_filter_pipe.h @@ -16,7 +16,7 @@ #include "../../../../offloading-cacher/cache.hpp" -template +template class Query_Wrapper { public: // sync @@ -28,7 +28,6 @@ public: private: static constexpr size_t COPY_POLICY_MIN_SIZE = 64 * 1024 * 1024; - dsacache::Cache cache_; // data @@ -124,7 +123,9 @@ public: mask_a = (uint16_t *) numa_alloc_onnode(size_b / sizeof(base_t), cache_node); mask_b = (uint16_t *) numa_alloc_onnode(size_b / sizeof(base_t), cache_node); - cache_.Init(CachePlacementPolicy, CopyMethodPolicy); + if constexpr (caching) { + cache_.Init(CachePlacementPolicy, CopyMethodPolicy); + } size_t measurement_space = std::max(std::max(tc_fi, tc_fc), tc_ag); trt = new thread_runtime_timing(3, measurement_space, current_node); @@ -157,7 +158,9 @@ public: std::memset(mask_a, 0x00, size_b / sizeof(base_t)); std::memset(mask_b, 0x00, size_b / sizeof(base_t)); - cache_.Clear(); + if constexpr (caching) { + cache_.Clear(); + } trt->reset_accumulator(); bt->reset_accumulator(); @@ -248,48 +251,9 @@ public: // calculate pointers size_t chunk_id = gid + gcnt * i; base_t* chunk_ptr = get_sub_chunk_ptr(data_b, chunk_id, chunk_size_w, tid, tcnt); - uint16_t* mask_ptr = get_sub_mask_ptr(mask_b, chunk_id, chunk_size_w, tid, tcnt); - if constexpr(simple){ + if constexpr (caching) { cache_.Access(reinterpret_cast(chunk_ptr), chunk_size_b / tcnt); - } else { - const auto data = cache_.Access(reinterpret_cast(chunk_ptr), chunk_size_b / tcnt); - - if constexpr(wait_b) { - // wait on copy to complete - during this time other threads may - // continue with their calculation which leads to little impact - // and we will be faster if the cache is used - - data->WaitOnCompletion(); - - // obtain the data location from the cache entry - - base_t* data_ptr = reinterpret_cast(data->GetDataLocation()); - - // nullptr is still a legal return value for CacheData::GetLocation() - // even after waiting, so this must be checked - - if (data_ptr == nullptr) { - std::cerr << "[!] Cache Miss in ScanB" << std::endl; - data_ptr = chunk_ptr; - } - - filterNoCopy::apply_same(mask_ptr, nullptr, data_ptr, cmp_b, chunk_size_b / tcnt); - } - else { - // obtain the data location from the cache entry - - base_t* data_ptr = reinterpret_cast(data->GetDataLocation()); - - // nullptr is still a legal return value for CacheData::GetLocation() - // even after waiting, so this must be checked - - if (data_ptr == nullptr) { - data_ptr = chunk_ptr; - } - - filterNoCopy::apply_same(mask_ptr, nullptr, data_ptr, cmp_b, chunk_size_b / tcnt); - } } pvc->stop("scan_b", tid * gcnt + gid); @@ -321,21 +285,7 @@ public: base_t* chunk_ptr = get_sub_chunk_ptr(data_a, chunk_id, chunk_size_w, tid, tcnt); uint16_t* mask_ptr = get_sub_mask_ptr (mask_a, chunk_id, chunk_size_w, tid, tcnt); - if constexpr (cache_a) { - const auto data = cache_.Access(reinterpret_cast(chunk_ptr), chunk_size_b / tcnt); - data->WaitOnCompletion(); - base_t* data_ptr = reinterpret_cast(data->GetDataLocation()); - - if (data_ptr == nullptr) { - std::cerr << "[!] Cache Miss in ScanA" << std::endl; - data_ptr = chunk_ptr; - } - - filter::apply_same(mask_ptr, nullptr, data_ptr, cmp_a, chunk_size_b / tcnt); - } - else { - filter::apply_same(mask_ptr, nullptr, chunk_ptr, cmp_a, chunk_size_b / tcnt); - } + filter::apply_same(mask_ptr, nullptr, chunk_ptr, cmp_a, chunk_size_b / tcnt); pvc->stop("scan_a", tid * gcnt + gid); trt->stop_timer(0, tid * gcnt + gid); @@ -363,39 +313,40 @@ public: // calculate pointers size_t chunk_id = gid + gcnt * i; base_t* chunk_ptr = get_sub_chunk_ptr(data_b, chunk_id, chunk_size_w, tid, tcnt); + base_t* data_ptr; - // access the cache for the given chunk which will have been accessed in scan_b + if constexpr (caching) { + // access the cache for the given chunk which will have been accessed in scan_b - const auto data = cache_.Access(reinterpret_cast(chunk_ptr), chunk_size_b / tcnt); + const auto data = cache_.Access(reinterpret_cast(chunk_ptr), chunk_size_b / tcnt); - // wait on the caching task to complete, this will give time for other processes - // to make progress here which will therefore not hurt performance + // wait on the caching task to complete, this will give time for other processes + // to make progress here which will therefore not hurt performance - data->WaitOnCompletion(); + data->WaitOnCompletion(); - // after the copy task has finished we obtain the pointer to the cached - // copy of data_b which is then used from now on + // after the copy task has finished we obtain the pointer to the cached + // copy of data_b which is then used from now on - base_t* data_ptr = reinterpret_cast(data->GetDataLocation()); + base_t* data_ptr = reinterpret_cast(data->GetDataLocation()); - // nullptr is still a legal return value for CacheData::GetLocation() - // even after waiting, so this must be checked + // nullptr is still a legal return value for CacheData::GetLocation() + // even after waiting, so this must be checked - if (data_ptr == nullptr) { + if (data_ptr == nullptr) { + data_ptr = chunk_ptr; + std::cerr << "[!] Cache Miss in AggrJ" << std::endl; + } + } + else { data_ptr = chunk_ptr; - std::cerr << "[!] Cache Miss in AggrJ" << std::endl; } uint16_t* mask_ptr_a = get_sub_mask_ptr (mask_a, chunk_id, chunk_size_w, tid, tcnt); - uint16_t* mask_ptr_b = get_sub_mask_ptr (mask_b, chunk_id, chunk_size_w, tid, tcnt); base_t tmp = _mm512_reduce_add_epi64(aggregator); - if constexpr(simple){ - aggregator = aggregation::apply_masked(aggregator, data_ptr, mask_ptr_a, chunk_size_b / tcnt); - } else { - aggregator = aggregation::apply_masked(aggregator, data_ptr, mask_ptr_a, mask_ptr_b, chunk_size_b / tcnt); - } + aggregator = aggregation::apply_masked(aggregator, data_ptr, mask_ptr_a, chunk_size_b / tcnt); pvc->stop("aggr_j", tid * gcnt + gid); trt->stop_timer(2, tid * gcnt + gid);