From 2e0f637363cbbdc7ff0e0b1691135e2327fc7f5d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Constantin=20F=C3=BCrst?=
Date: Tue, 23 Jan 2024 20:06:21 +0100
Subject: [PATCH] add optimal dsa caching mode

---
 qdp_project/src/benchmark/MAX_benchmark.cpp  | 46 +++++++++----
 .../pipelines/MAX_scan_filter_pipe.h         | 64 +++++++++++++++----
 2 files changed, 84 insertions(+), 26 deletions(-)

diff --git a/qdp_project/src/benchmark/MAX_benchmark.cpp b/qdp_project/src/benchmark/MAX_benchmark.cpp
index 57e6c18..c421449 100644
--- a/qdp_project/src/benchmark/MAX_benchmark.cpp
+++ b/qdp_project/src/benchmark/MAX_benchmark.cpp
@@ -70,17 +70,18 @@ base_t sum_check_complex(base_t compare_value_a, base_t compare_value_b, base_t*
 enum class ExecMode {
     DramBaseline,
     HbmPeak,
-    HbmPrefetch
+    HbmPrefetch,
+    HbmPrefetchOpt
 };
 
 int main(int argc, char** argv) {
 
-    constexpr ExecMode mode = ExecMode::HbmPrefetch;
+    constexpr ExecMode mode = ExecMode::HbmPrefetchOpt;
 
     constexpr size_t workload_b = 4_GiB;
     constexpr base_t compare_value_a = 50;
     constexpr base_t compare_value_b = 42;
-    constexpr size_t chunk_size = 256_MiB;
+    constexpr size_t chunk_size = 64_MiB;
 
     // thread count is 12 here but as the default measurement uses 6
     // we must restrict the core assignment of these 12 threads to
@@ -99,6 +100,11 @@ int main(int argc, char** argv) {
         tc_copy = 8;
         tc_agg = 8;
     }
+    else if constexpr (mode == ExecMode::HbmPrefetchOpt) {
+        tc_filter = 0;
+        tc_copy = 0;
+        tc_agg = 1;
+    }
     else {
         tc_filter = 8;
         tc_copy = 0;
@@ -127,6 +133,9 @@ int main(int argc, char** argv) {
     else if constexpr (mode == ExecMode::DramBaseline) {
         mode_string = "DramBaseline";
     }
+    else if constexpr (mode == ExecMode::HbmPrefetchOpt) {
+        mode_string = "HbmDsaPrefetchOpt";
+    }
     else {
         mode_string = "Unknown";
     }
@@ -166,7 +175,7 @@ int main(int argc, char** argv) {
     std::promise<void> p;
     std::shared_future<void> ready_future(p.get_future());
 
-    Query_Wrapper qw (
+    Query_Wrapper qw (
         &ready_future, workload_b, chunk_size,
         data_a, data_b, results, tc_filter, tc_copy, tc_agg,
         compare_value_a, compare_value_b
@@ -178,23 +187,32 @@ int main(int argc, char** argv) {
     auto filter_lambda = [&qw](uint32_t gid, uint32_t gcnt, uint32_t tid) { qw.scan_a(gid, gcnt, tid); };
     auto copy_lambda = [&qw](uint32_t gid, uint32_t gcnt, uint32_t tid) { qw.scan_b(gid, gcnt, tid); };
     auto aggregation_lambda = [&qw](uint32_t gid, uint32_t gcnt, uint32_t tid) { qw.aggr_j(gid, gcnt, tid); };
+    auto combined_lambda = [&qw](uint32_t gid, uint32_t gcnt, uint32_t tid) { qw.combined(gid, gcnt, tid); };
 
     std::vector<std::thread> filter_pool;
     std::vector<std::thread> copy_pool;
     std::vector<std::thread> agg_pool;
+    std::vector<std::thread> combined_pool;
 
     for(uint32_t gid = 0; gid < THREAD_GROUP_MULTIPLIER; ++gid) {
-        for(uint32_t tid = 0; tid < tc_filter; ++tid) {
-            filter_pool.emplace_back(filter_lambda, gid, THREAD_GROUP_MULTIPLIER, tid);
+        if constexpr (mode == ExecMode::HbmPrefetchOpt) {
+            for(uint32_t tid = 0; tid < tc_agg; ++tid) {
+                agg_pool.emplace_back(combined_lambda, gid, THREAD_GROUP_MULTIPLIER, tid);
+            }
         }
-
-        // if tc_copy == 0 this loop is skipped
-        for(uint32_t tid = 0; tid < tc_copy; ++tid) {
-            copy_pool.emplace_back(copy_lambda, gid, THREAD_GROUP_MULTIPLIER, tid);
-        }
-
-        for(uint32_t tid = 0; tid < tc_agg; ++tid) {
-            agg_pool.emplace_back(aggregation_lambda, gid, THREAD_GROUP_MULTIPLIER, tid);
+        else {
+            for(uint32_t tid = 0; tid < tc_filter; ++tid) {
+                filter_pool.emplace_back(filter_lambda, gid, THREAD_GROUP_MULTIPLIER, tid);
+            }
+
+            // if tc_copy == 0 this loop is skipped
+            for(uint32_t tid = 0; tid < tc_copy; ++tid) {
+                copy_pool.emplace_back(copy_lambda, gid, THREAD_GROUP_MULTIPLIER, tid);
+            }
+
+            for(uint32_t tid = 0; tid < tc_agg; ++tid) {
+                agg_pool.emplace_back(aggregation_lambda, gid, THREAD_GROUP_MULTIPLIER, tid);
+            }
         }
     }
 
diff --git a/qdp_project/src/benchmark/pipelines/MAX_scan_filter_pipe.h b/qdp_project/src/benchmark/pipelines/MAX_scan_filter_pipe.h
index 4658c67..567beb0 100755
--- a/qdp_project/src/benchmark/pipelines/MAX_scan_filter_pipe.h
+++ b/qdp_project/src/benchmark/pipelines/MAX_scan_filter_pipe.h
@@ -246,26 +246,69 @@ public:
 
         for(uint32_t i = 0; i < runs; ++i) {
             trt->start_timer(1, tid * gcnt + gid);
-            pvc->start("scan_b", tid * gcnt + gid);
-
+
             // calculate pointers
             size_t chunk_id = gid + gcnt * i;
             base_t* chunk_ptr = get_sub_chunk_ptr(data_b, chunk_id, chunk_size_w, tid, tcnt);
 
             if constexpr (caching) {
                 const auto data = cache_.Access(reinterpret_cast<uint8_t*>(chunk_ptr), chunk_size_b / tcnt);
-                data->WaitOnCompletion();
+                data->WaitOnCompletion();
             }
 
-            pvc->stop("scan_b", tid * gcnt + gid);
             trt->stop_timer(1, tid * gcnt + gid);
-
-            bt->timed_wait(*(*sync_barrier)[barrier_idx], 1, tid * gcnt + gid);
+        }
+
+        (*(*sync_barrier)[barrier_idx]).arrive_and_drop();
+    }
+
+    void combined(size_t gid, size_t gcnt, size_t tid) {
+        size_t tcnt = thread_count_ag;
+        assert(chunk_size_w % tcnt == 0);
+        assert(chunk_size_w % 16 == 0);
+        assert(chunk_size_w % tcnt * 16 == 0);
 
-            if constexpr (caching) (*(*sync_barrier)[barrier_idx]).arrive_and_drop();
+        __m512i aggregator = aggregation::OP::zero();
+        uint32_t runs = chunk_cnt / gcnt + (chunk_cnt % gcnt > gid);
+
+        // wait till everyone can start
+        ready_future->wait();
+
+        for(uint32_t i = 0; i < runs; ++i) {
+            // calculate pointers and initialize local storage
+
+            size_t chunk_id = gid + gcnt * i;
+            base_t* chunk_ptr = get_sub_chunk_ptr(data_b, chunk_id, chunk_size_w, tid, tcnt);
+            std::unique_ptr<dsacache::CacheData> data;
+
+            // perform "prefetch_b"
+
+            {
+                data = cache_.Access(reinterpret_cast<uint8_t*>(chunk_ptr), chunk_size_b / tcnt);
+            }
+
+            // perform operation "scan_a"
+
+            {
+                uint16_t* mask_ptr = get_sub_mask_ptr (mask_a, chunk_id, chunk_size_w, tid, tcnt);
+                filter::apply_same(mask_ptr, nullptr, chunk_ptr, cmp_a, chunk_size_b / tcnt);
+            }
+
+            // wait on caching task
+
+            data->WaitOnCompletion();
+            base_t* data_ptr = reinterpret_cast<base_t*>(data->GetDataLocation());
+
+            // continue with operation "aggr_j"
+
+            {
+                uint16_t* mask_ptr_a = get_sub_mask_ptr (mask_a, chunk_id, chunk_size_w, tid, tcnt);
+                base_t tmp = _mm512_reduce_add_epi64(aggregator);
+                aggregator = aggregation::apply_masked(aggregator, data_ptr, mask_ptr_a, chunk_size_b / tcnt);
+            }
         }
 
-        if constexpr (!caching) (*(*sync_barrier)[barrier_idx]).arrive_and_drop();
+        aggregation::happly(dest + (tid * gcnt + gid), aggregator);
     }
 
     void scan_a(size_t gid, size_t gcnt, size_t tid) {
@@ -294,11 +337,8 @@ public:
             pvc->stop("scan_a", tid * gcnt + gid);
             trt->stop_timer(0, tid * gcnt + gid);
             bt->timed_wait(*(*sync_barrier)[barrier_idx], 0, tid * gcnt + gid);
-
-            if constexpr (caching) (*(*sync_barrier)[barrier_idx]).arrive_and_drop();
         }
-
-        if constexpr (!caching) (*(*sync_barrier)[barrier_idx]).arrive_and_drop();
+        (*(*sync_barrier)[barrier_idx]).arrive_and_drop();
     }
 
     void aggr_j(size_t gid, size_t gcnt, size_t tid) {