From d1cc3e3b0c68c9666272520f77d51bd8570bd574 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Constantin=20F=C3=BCrst?=
Date: Wed, 31 Jan 2024 23:59:59 +0100
Subject: [PATCH] modification to qdp benchmark: return to per-chunk barrier
 wait; use a userspace semaphore as one-way barrier from scan_b to aggr_j,
 since scan_b should submit as early as possible while aggr_j must wait on
 submission from scan_b; contains TODOs for supporting chunk counts that are
 not divisible by 2

---
 qdp_project/src/Benchmark.cpp     | 131 ++++++++++++++++--------
 qdp_project/src/Configuration.hpp |  67 +++++----------
 2 files changed, 89 insertions(+), 109 deletions(-)

diff --git a/qdp_project/src/Benchmark.cpp b/qdp_project/src/Benchmark.cpp
index de2de99..c55543c 100644
--- a/qdp_project/src/Benchmark.cpp
+++ b/qdp_project/src/Benchmark.cpp
@@ -5,12 +5,12 @@
 #include
 #include
 #include
+#include <atomic>

 #include "const.h"

 #include "filter.h"
 #include "aggregation.h"
 #include "array_utils.h"
-#include "memory_literals.h"

 #include "../../offloading-cacher/cache.hpp"
@@ -22,36 +22,32 @@
 using aggregation = Aggregation;

 dsacache::Cache CACHE_;

+std::array<std::atomic<int32_t>, GROUP_COUNT> PREFETCHED_CHUNKS_;
 std::vector<std::barrier<>*> BARRIERS_;
 std::shared_future<void> LAUNCH_;

 uint64_t* DATA_A_;
 uint64_t* DATA_B_;
 uint16_t* MASK_A_;
-uint16_t* MASK_B_;
 uint64_t* DATA_DST_;

 // if more b than j -> perform b normal, subsplit j
 // if more j than b -> subsplit b like it is now

-void scan_b(size_t gid, size_t tid) {
-    constexpr size_t VIRT_TID_INCREMENT = TC_SCANB / TC_AGGRJ;
-    constexpr size_t SUBCHUNK_THREAD_RATIO = TC_AGGRJ / (TC_SCANB == 0 ? 1 : TC_SCANB);
+template <size_t TC_CACHING>
+void caching(size_t gid, size_t tid) {
+    constexpr size_t VIRT_TID_INCREMENT = TC_CACHING / TC_AGGRJ;
+    constexpr size_t SUBCHUNK_THREAD_RATIO = TC_AGGRJ / (TC_CACHING == 0 ? 1 : TC_CACHING);
     constexpr bool CACHE_SUBCHUNKING = SUBCHUNK_THREAD_RATIO > 1;
     constexpr bool CACHE_OVERCHUNKING = VIRT_TID_INCREMENT > 1;

-    THREAD_TIMING_[SCANB_TIMING_INDEX][UniqueIndex(gid,tid)].clear();
-    THREAD_TIMING_[SCANB_TIMING_INDEX][UniqueIndex(gid,tid)].resize(1);
-
-    LAUNCH_.wait();
-
-    THREAD_TIMING_[SCANB_TIMING_INDEX][UniqueIndex(gid,tid)][0][TIME_STAMP_BEGIN] = std::chrono::steady_clock::now();
-
     if constexpr (PERFORM_CACHING) {
         if constexpr (CACHE_SUBCHUNKING) {
             constexpr size_t SUBCHUNK_COUNT = SUBCHUNK_THREAD_RATIO > 0 ? SUBCHUNK_THREAD_RATIO : 1;
             constexpr size_t SUBCHUNK_SIZE_B = CHUNK_SIZE_B / SUBCHUNK_COUNT;
             constexpr size_t SUBCHUNK_SIZE_ELEMENTS = CHUNK_SIZE_ELEMENTS / SUBCHUNK_COUNT;
+            constexpr size_t LAST_SUBCHUNK_SIZE_B = SUBCHUNK_SIZE_B + (CHUNK_SIZE_B % SUBCHUNK_COUNT);
+            // TODO: last thread (whether simulated or not) in last group must use last subchunk size for its last subchunk in the last run as size

             for (size_t i = 0; i < RUN_COUNT; i++) {
                 const size_t chunk_index = get_chunk_index(gid, i);
@@ -59,46 +55,67 @@ void scan_b(size_t gid, size_t tid) {
                 for (size_t j = 0; j < SUBCHUNK_COUNT; j++) {
                     uint64_t* sub_chunk_ptr = &chunk_ptr[j * SUBCHUNK_SIZE_ELEMENTS];

+                    CACHE_.Access(reinterpret_cast<uint8_t*>(sub_chunk_ptr), SUBCHUNK_SIZE_B);
+
+                    PREFETCHED_CHUNKS_[gid]++;
+                    PREFETCHED_CHUNKS_[gid].notify_one();
                 }
             }
         }
         else if constexpr (CACHE_OVERCHUNKING) {
+            constexpr size_t LAST_CHUNK_SIZE_B = CHUNK_SIZE_B + (CHUNK_SIZE_B % (TC_AGGRJ * GROUP_COUNT));
+            // TODO: last thread (whether simulated or not) in last group must use last chunk size for its last run as size
+
             for (size_t tid_virt = tid; tid_virt < TC_AGGRJ; tid_virt += VIRT_TID_INCREMENT) {
                 for (size_t i = 0; i < RUN_COUNT; i++) {
                     const size_t chunk_index = get_chunk_index(gid, i);
                     uint64_t* chunk_ptr = get_chunk(DATA_B_, chunk_index, tid_virt);

+                    CACHE_.Access(reinterpret_cast<uint8_t*>(chunk_ptr), CHUNK_SIZE_B);
+
+                    PREFETCHED_CHUNKS_[gid]++;
+                    PREFETCHED_CHUNKS_[gid].notify_one();
                 }
             }
         }
         else {
+            constexpr size_t LAST_CHUNK_SIZE_B = CHUNK_SIZE_B + (CHUNK_SIZE_B % ((TC_SCANB > 0 ? TC_SCANB : 1) * GROUP_COUNT));
+            // TODO: last thread in last group must use last chunk size for its last run as size
+
             for (size_t i = 0; i < RUN_COUNT; i++) {
                 const size_t chunk_index = get_chunk_index(gid, i);
                 uint64_t* chunk_ptr = get_chunk(DATA_B_, chunk_index, tid);

+                CACHE_.Access(reinterpret_cast<uint8_t*>(chunk_ptr), CHUNK_SIZE_B);
+
+                PREFETCHED_CHUNKS_[gid]++;
+                PREFETCHED_CHUNKS_[gid].notify_one();
             }
         }
-    }
+}

-    if constexpr (COMPLEX_QUERY) {
-        for (size_t i = 0; i < RUN_COUNT; i++) {
-            const size_t chunk_index = get_chunk_index(gid, i);
-            uint64_t* chunk_ptr = get_chunk(DATA_B_, chunk_index, tid);
-            uint16_t* mask_ptr = get_mask(MASK_B_, chunk_index, tid);
+void scan_b(size_t gid, size_t tid) {
+    THREAD_TIMING_[SCANB_TIMING_INDEX][UniqueIndex(gid,tid)].clear();
+    THREAD_TIMING_[SCANB_TIMING_INDEX][UniqueIndex(gid,tid)].resize(1);

-        filter::apply_same(mask_ptr, nullptr, chunk_ptr, CMP_B, CHUNK_SIZE_B / TC_SCANB);
-    }
+    LAUNCH_.wait();
+
+    THREAD_TIMING_[SCANB_TIMING_INDEX][UniqueIndex(gid,tid)][0][TIME_STAMP_BEGIN] = std::chrono::steady_clock::now();
+
+    if constexpr (!PERFORM_CACHING_IN_AGGREGATION) {
+        caching<TC_SCANB>(gid, tid);
     }

     THREAD_TIMING_[SCANB_TIMING_INDEX][UniqueIndex(gid,tid)][0][TIME_STAMP_WAIT] = std::chrono::steady_clock::now();
     THREAD_TIMING_[SCANB_TIMING_INDEX][UniqueIndex(gid,tid)][0][TIME_STAMP_END] = std::chrono::steady_clock::now();
-
-    BARRIERS_[gid]->arrive_and_drop();
 }

 void scan_a(size_t gid, size_t tid) {
+    constexpr size_t LAST_CHUNK_SIZE_B = CHUNK_SIZE_B + (CHUNK_SIZE_B % (TC_SCANA * GROUP_COUNT));
+    // TODO: last thread in last group must use last chunk size for its last run as size
+
     THREAD_TIMING_[SCANA_TIMING_INDEX][UniqueIndex(gid,tid)].clear();
     THREAD_TIMING_[SCANA_TIMING_INDEX][UniqueIndex(gid,tid)].resize(1);

@@ -112,6 +129,8 @@ void scan_a(size_t gid, size_t tid) {
         uint16_t* mask_ptr = get_mask(MASK_A_, chunk_index, tid);

         filter::apply_same(mask_ptr, nullptr, chunk_ptr, CMP_A, CHUNK_SIZE_B / TC_SCANA);
+
+        BARRIERS_[gid]->arrive_and_wait();
     }

     THREAD_TIMING_[SCANA_TIMING_INDEX][UniqueIndex(gid,tid)][0][TIME_STAMP_WAIT] = std::chrono::steady_clock::now();
@@ -121,32 +140,42 @@ void scan_a(size_t gid, size_t tid) {
 }

 void aggr_j(size_t gid, size_t tid) {
+    constexpr size_t SUBCHUNK_SIZE_B = CHUNK_SIZE_B / TC_AGGRJ;
+    constexpr size_t LAST_CHUNK_SIZE_B = SUBCHUNK_SIZE_B + (CHUNK_SIZE_B % (TC_AGGRJ * GROUP_COUNT));
+    // TODO: last thread in last group must use last chunk size for its last run as size
+
     CACHE_HITS_[UniqueIndex(gid,tid)] = 0;

     THREAD_TIMING_[AGGRJ_TIMING_INDEX][UniqueIndex(gid,tid)].clear();
-    THREAD_TIMING_[AGGRJ_TIMING_INDEX][UniqueIndex(gid,tid)].resize(1);
+    THREAD_TIMING_[AGGRJ_TIMING_INDEX][UniqueIndex(gid,tid)].resize(RUN_COUNT);

     __m512i aggregator = aggregation::OP::zero();

     LAUNCH_.wait();

-    THREAD_TIMING_[AGGRJ_TIMING_INDEX][UniqueIndex(gid,tid)][0][TIME_STAMP_BEGIN] = std::chrono::steady_clock::now();
+    for (size_t i = 0; i < RUN_COUNT; i++) {
+        THREAD_TIMING_[AGGRJ_TIMING_INDEX][UniqueIndex(gid,tid)][i][TIME_STAMP_BEGIN] = std::chrono::steady_clock::now();
+
+        BARRIERS_[gid]->arrive_and_wait();

-    BARRIERS_[gid]->arrive_and_wait();
+        while (true) {
+            const int32_t old = PREFETCHED_CHUNKS_[gid].fetch_sub(1);
+            if (old > 0) break;
+            PREFETCHED_CHUNKS_[gid]++;
+            PREFETCHED_CHUNKS_[gid].wait(0);
+        }

-    THREAD_TIMING_[AGGRJ_TIMING_INDEX][UniqueIndex(gid,tid)][0][TIME_STAMP_WAIT] = std::chrono::steady_clock::now();
+        THREAD_TIMING_[AGGRJ_TIMING_INDEX][UniqueIndex(gid,tid)][i][TIME_STAMP_WAIT] = std::chrono::steady_clock::now();

-    for (size_t i = 0; i < RUN_COUNT; i++) {
         const size_t chunk_index = get_chunk_index(gid, i);
         uint64_t* chunk_ptr = get_chunk(DATA_B_, chunk_index, tid);
         uint16_t* mask_ptr_a = get_mask(MASK_A_, chunk_index, tid);
-        uint16_t* mask_ptr_b = get_mask(MASK_B_, chunk_index, tid);

         std::unique_ptr<dsacache::CacheData> data;
         uint64_t* data_ptr;

         if constexpr (PERFORM_CACHING) {
-            data = CACHE_.Access(reinterpret_cast<uint8_t*>(chunk_ptr), CHUNK_SIZE_B / TC_AGGRJ, dsacache::FLAG_ACCESS_WEAK);
+            data = CACHE_.Access(reinterpret_cast<uint8_t*>(chunk_ptr), SUBCHUNK_SIZE_B, dsacache::FLAG_ACCESS_WEAK);
             data->WaitOnCompletion();

             data_ptr = reinterpret_cast<uint64_t*>(data->GetDataLocation());
@@ -165,17 +194,11 @@ void aggr_j(size_t gid, size_t tid) {
         }

         uint64_t tmp = _mm512_reduce_add_epi64(aggregator);
+        aggregator = aggregation::apply_masked(aggregator, data_ptr, mask_ptr_a, SUBCHUNK_SIZE_B);

-        if constexpr (COMPLEX_QUERY) {
-            aggregator = aggregation::apply_masked(aggregator, data_ptr, mask_ptr_a, mask_ptr_b, CHUNK_SIZE_B / TC_AGGRJ);
-        }
-        else {
-            aggregator = aggregation::apply_masked(aggregator, data_ptr, mask_ptr_a, CHUNK_SIZE_B / TC_AGGRJ);
-        }
+        THREAD_TIMING_[AGGRJ_TIMING_INDEX][UniqueIndex(gid,tid)][i][TIME_STAMP_END] = std::chrono::steady_clock::now();
     }

-    THREAD_TIMING_[AGGRJ_TIMING_INDEX][UniqueIndex(gid,tid)][0][TIME_STAMP_END] = std::chrono::steady_clock::now();
-
     BARRIERS_[gid]->arrive_and_drop();

     aggregation::happly(&DATA_DST_[UniqueIndex(gid,tid)], aggregator);
@@ -188,35 +211,21 @@ int main() {
     const int current_cpu = sched_getcpu();
     const int current_node = numa_node_of_cpu(current_cpu);
-    const int cache_node = CachePlacementPolicy(current_node, current_node, 0);
+
+    if (current_node != MEM_NODE_DRAM) {
+        std::cerr << "Application is not running on pre-programmed Node!" << std::endl;
<< std::endl; + } const std::string ofname = "results/qdp-xeonmax-" + std::string(MODE_STRING) + "-tca" + std::to_string(TC_SCANA) + "-tcb" + std::to_string(TC_SCANB) + "-tcj" + std::to_string(TC_AGGRJ) + "-tmul" + std::to_string(GROUP_COUNT) + "-wl" + std::to_string(WL_SIZE_B) + "-cs" + std::to_string(CHUNK_SIZE_B) + ".csv"; std::ofstream fout(ofname); fout << "run;rt-ns;rt-s;result[0];scana-run;scana-wait;scanb-run;scanb-wait;aggrj-run;aggrj-wait;cache-hr;" << std::endl; - // a is allways allocated in DRAM - - DATA_A_ = (uint64_t*) numa_alloc_local(WL_SIZE_B); - - // resulting masks for a and b and total result will allways reside in HBM - - MASK_A_ = (uint16_t*) numa_alloc_onnode(WL_SIZE_ELEMENTS, cache_node); - MASK_B_ = (uint16_t*) numa_alloc_onnode(WL_SIZE_ELEMENTS, cache_node); - DATA_DST_ = (uint64_t*) numa_alloc_onnode(TC_AGGRJ * GROUP_COUNT * sizeof(uint64_t), cache_node); - - // location of b depends on configuration - - if constexpr (STORE_B_IN_HBM) { - DATA_B_ = (uint64_t*) numa_alloc_onnode(WL_SIZE_B, cache_node); - } - else { - DATA_B_ = (uint64_t*) numa_alloc_local(WL_SIZE_B); - } + DATA_A_ = (uint64_t*) numa_alloc_onnode(WL_SIZE_B, MEM_NODE_A); + DATA_B_ = (uint64_t*) numa_alloc_onnode(WL_SIZE_B, MEM_NODE_B); - DATA_A_ = (uint64_t*) numa_alloc_local(WL_SIZE_B); - MASK_A_ = (uint16_t*) numa_alloc_local(WL_SIZE_ELEMENTS); - DATA_DST_ = (uint64_t*) numa_alloc_local(TC_AGGRJ * GROUP_COUNT * sizeof(uint64_t)); + MASK_A_ = (uint16_t*) numa_alloc_onnode(WL_SIZE_ELEMENTS, MEM_NODE_HBM); + DATA_DST_ = (uint64_t*) numa_alloc_onnode(TC_AGGRJ * GROUP_COUNT * sizeof(uint64_t), MEM_NODE_HBM); if constexpr (PERFORM_CACHING) { // cache will be configured to wait weak by default @@ -241,7 +250,7 @@ int main() { std::vector agg_pool; for(uint32_t gid = 0; gid < GROUP_COUNT; ++gid) { - BARRIERS_.emplace_back(new std::barrier(TC_COMBINED)); + BARRIERS_.emplace_back(new std::barrier(TC_SCANA + TC_AGGRJ)); for(uint32_t tid = 0; tid < TC_SCANA; ++tid) { filter_pool.emplace_back(scan_a, gid, tid); @@ -269,7 +278,7 @@ int main() { const auto time_end = std::chrono::steady_clock::now(); - const uint64_t result_expected = COMPLEX_QUERY ? 
+    const uint64_t result_expected = sum_check(CMP_A, DATA_A_, DATA_B_, WL_SIZE_B);

     std::cout << "Result Expected: " << result_expected << ", Result Actual: " << result_actual << std::endl;

diff --git a/qdp_project/src/Configuration.hpp b/qdp_project/src/Configuration.hpp
index 7462cf1..b247252 100644
--- a/qdp_project/src/Configuration.hpp
+++ b/qdp_project/src/Configuration.hpp
@@ -9,76 +9,47 @@ constexpr size_t WL_SIZE_B = 4_GiB;
 constexpr uint32_t WARMUP_ITERATION_COUNT = 5;
 constexpr uint32_t ITERATION_COUNT = 5;

+constexpr int MEM_NODE_HBM = 8;
+constexpr int MEM_NODE_DRAM = 0;
+
 #ifdef MODE_SIMPLE_PREFETCH
-constexpr uint32_t GROUP_COUNT = 8;
-constexpr size_t CHUNK_SIZE_B = 64_MiB;
+constexpr uint32_t GROUP_COUNT = 24;
+constexpr size_t CHUNK_SIZE_B = 16_MiB;
 constexpr uint32_t TC_SCANA = 1;
-constexpr uint32_t TC_SCANB = 1;
+constexpr uint32_t TC_SCANB = 0;
 constexpr uint32_t TC_AGGRJ = 1;
 constexpr bool PERFORM_CACHING = true;
-constexpr bool STORE_B_IN_HBM = false;
-constexpr char MODE_STRING[] = "simple-prefetch";
-constexpr bool COMPLEX_QUERY = false;
+constexpr bool PERFORM_CACHING_IN_AGGREGATION = true;
+constexpr int MEM_NODE_A = 1;
+constexpr int MEM_NODE_B = 2;
+constexpr char MODE_STRING[] = "prefetch";
 #endif
-#ifdef MODE_SIMPLE_DRAM
+#ifdef MODE_DRAM
 constexpr size_t CHUNK_SIZE_B = 2_MiB;
 constexpr uint32_t GROUP_COUNT = 16;
 constexpr uint32_t TC_SCANA = 2;
 constexpr uint32_t TC_SCANB = 0;
 constexpr uint32_t TC_AGGRJ = 1;
 constexpr bool PERFORM_CACHING = false;
-constexpr bool STORE_B_IN_HBM = false;
-constexpr char MODE_STRING[] = "simple-dram";
-constexpr bool COMPLEX_QUERY = false;
+constexpr bool PERFORM_CACHING_IN_AGGREGATION = false;
+constexpr int MEM_NODE_A = 0;
+constexpr int MEM_NODE_B = 0;
+constexpr char MODE_STRING[] = "dram";
 #endif
-#ifdef MODE_SIMPLE_HBM
+#ifdef MODE_HBM
 constexpr size_t CHUNK_SIZE_B = 2_MiB;
 constexpr uint32_t GROUP_COUNT = 16;
 constexpr uint32_t TC_SCANA = 2;
 constexpr uint32_t TC_SCANB = 0;
 constexpr uint32_t TC_AGGRJ = 1;
 constexpr bool PERFORM_CACHING = false;
-constexpr bool STORE_B_IN_HBM = true;
-constexpr char MODE_STRING[] = "simple-hbm";
-constexpr bool COMPLEX_QUERY = false;
-#endif
-#ifdef MODE_COMPLEX_PREFETCH
-constexpr uint32_t GROUP_COUNT = 8;
-constexpr size_t CHUNK_SIZE_B = 8_MiB;
-constexpr uint32_t TC_SCANA = 4;
-constexpr uint32_t TC_SCANB = 1;
-constexpr uint32_t TC_AGGRJ = 4;
-constexpr bool PERFORM_CACHING = true;
-constexpr bool STORE_B_IN_HBM = false;
-constexpr char MODE_STRING[] = "complex-prefetch";
-constexpr bool COMPLEX_QUERY = true;
-#endif
-#ifdef MODE_COMPLEX_DRAM
-constexpr size_t CHUNK_SIZE_B = 2_MiB;
-constexpr uint32_t GROUP_COUNT = 16;
-constexpr uint32_t TC_SCANA = 1;
-constexpr uint32_t TC_SCANB = 1;
-constexpr uint32_t TC_AGGRJ = 2;
-constexpr bool PERFORM_CACHING = false;
-constexpr bool STORE_B_IN_HBM = false;
-constexpr char MODE_STRING[] = "complex-dram";
-constexpr bool COMPLEX_QUERY = true;
-#endif
-#ifdef MODE_COMPLEX_HBM
-constexpr size_t CHUNK_SIZE_B = 2_MiB;
-constexpr uint32_t GROUP_COUNT = 16;
-constexpr uint32_t TC_SCANA = 1;
-constexpr uint32_t TC_SCANB = 1;
-constexpr uint32_t TC_AGGRJ = 2;
-constexpr bool PERFORM_CACHING = false;
-constexpr bool STORE_B_IN_HBM = true;
-constexpr char MODE_STRING[] = "complex-hbm";
-constexpr bool COMPLEX_QUERY = true;
+constexpr bool PERFORM_CACHING_IN_AGGREGATION = false;
+constexpr int MEM_NODE_A = 0;
+constexpr int MEM_NODE_B = 8;
+constexpr char MODE_STRING[] = "hbm";
 #endif

 constexpr uint64_t CMP_A = 50;
-constexpr uint64_t CMP_B = 42;

 constexpr uint32_t TC_COMBINED = TC_SCANA + TC_SCANB + TC_AGGRJ;

 constexpr size_t WL_SIZE_ELEMENTS = WL_SIZE_B / sizeof(uint64_t);
 constexpr size_t CHUNK_COUNT = WL_SIZE_B / CHUNK_SIZE_B;
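
Note on the synchronization scheme above: the PREFETCHED_CHUNKS_ counter is a
userspace semaphore built from C++20 atomic wait/notify. scan_b releases one
unit per submitted chunk (increment plus notify_one) and aggr_j acquires one
unit per chunk it consumes, blocking while none is available, which gives the
one-way barrier described in the subject. Below is a minimal standalone
sketch of that protocol; it is not part of the patch, and the names counter,
release_one and acquire_one are illustrative only.

#include <atomic>
#include <cstdint>
#include <thread>

std::atomic_int32_t counter{0};

// producer side (scan_b in the patch): publish one available chunk
void release_one() {
    counter++;             // make one unit available
    counter.notify_one();  // wake at most one blocked consumer
}

// consumer side (aggr_j in the patch): take one unit or block until one exists
void acquire_one() {
    while (true) {
        const int32_t old = counter.fetch_sub(1);
        if (old > 0) break;  // the unit we took was actually available
        counter++;           // undo the failed take
        counter.wait(0);     // sleep while the counter reads zero
    }
}

int main() {
    std::thread producer([]() { for (int i = 0; i < 4; i++) release_one(); });
    std::thread consumer([]() { for (int i = 0; i < 4; i++) acquire_one(); });
    producer.join();
    consumer.join();
    return 0;
}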
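Note on the TODOs above: they all describe the same missing piece, namely
that when the workload does not divide evenly, the last worker must use the
enlarged size that the new LAST_*_SIZE_B constants compute. For example,
splitting a 10 B chunk into 4 subchunks gives SUBCHUNK_SIZE_B = 10 / 4 = 2
with remainder 10 % 4 = 2, so LAST_SUBCHUNK_SIZE_B = 2 + 2 = 4 and the sizes
2 + 2 + 2 + 4 cover all 10 bytes. A hedged sketch of that size selection
follows; the helper name subchunk_size is hypothetical and not in the patch.

#include <cstddef>
#include <cstdio>

// hypothetical helper: size of subchunk j when chunk_size_b bytes are split
// into subchunk_count parts; only the last part absorbs the remainder
constexpr size_t subchunk_size(size_t chunk_size_b, size_t subchunk_count, size_t j) {
    const size_t base = chunk_size_b / subchunk_count;
    const size_t last = base + (chunk_size_b % subchunk_count);
    return (j == subchunk_count - 1) ? last : base;
}

int main() {
    size_t total = 0;
    for (size_t j = 0; j < 4; j++) {
        total += subchunk_size(10, 4, j);  // yields 2, 2, 2, 4
    }
    std::printf("total=%zu\n", total);  // prints total=10
    return 0;
}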