diff --git a/qdp_project/src/Benchmark.cpp b/qdp_project/src/Benchmark.cpp index d11e403..cd435a3 100644 --- a/qdp_project/src/Benchmark.cpp +++ b/qdp_project/src/Benchmark.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include "const.h" #include "filter.h" @@ -21,6 +22,7 @@ using aggregation = Aggregation; dsacache::Cache CACHE_; +std::array, GROUP_COUNT> PREFETCHED_CHUNKS_; std::vector*> BARRIERS_; std::shared_future LAUNCH_; @@ -29,6 +31,63 @@ uint64_t* DATA_B_; uint16_t* MASK_A_; uint64_t* DATA_DST_; +// if more b than j -> perform b normal, subsplit j +// if more j than b -> subsplit b like it is now + +template +void caching(size_t gid, size_t tid) { + constexpr size_t VIRT_TID_INCREMENT = TC_CACHING / TC_AGGRJ; + constexpr size_t SUBCHUNK_THREAD_RATIO = TC_AGGRJ / (TC_CACHING == 0 ? 1 : TC_CACHING); + constexpr bool CACHE_SUBCHUNKING = SUBCHUNK_THREAD_RATIO > 1; + constexpr bool CACHE_OVERCHUNKING = VIRT_TID_INCREMENT > 1; + + if constexpr (PERFORM_CACHING) { + if constexpr (CACHE_SUBCHUNKING) { + constexpr size_t SUBCHUNK_COUNT = SUBCHUNK_THREAD_RATIO > 0 ? SUBCHUNK_THREAD_RATIO : 1; + constexpr size_t SUBCHUNK_SIZE_B = CHUNK_SIZE_B / SUBCHUNK_COUNT; + constexpr size_t SUBCHUNK_SIZE_ELEMENTS = CHUNK_SIZE_ELEMENTS / SUBCHUNK_COUNT; + + for (size_t i = 0; i < RUN_COUNT; i++) { + const size_t chunk_index = get_chunk_index(gid, i); + uint64_t* chunk_ptr = get_chunk(DATA_B_, chunk_index, tid); + + for (size_t j = 0; j < SUBCHUNK_COUNT; j++) { + uint64_t* sub_chunk_ptr = &chunk_ptr[j * SUBCHUNK_SIZE_ELEMENTS]; + + CACHE_.Access(reinterpret_cast(sub_chunk_ptr), SUBCHUNK_SIZE_B); + + PREFETCHED_CHUNKS_[gid]++; + PREFETCHED_CHUNKS_[gid].notify_one(); + } + } + } + else if constexpr (CACHE_OVERCHUNKING) { + for (size_t tid_virt = tid; tid_virt < TC_AGGRJ; tid_virt += VIRT_TID_INCREMENT) { + for (size_t i = 0; i < RUN_COUNT; i++) { + const size_t chunk_index = get_chunk_index(gid, i); + uint64_t* chunk_ptr = get_chunk(DATA_B_, chunk_index, tid_virt); + + CACHE_.Access(reinterpret_cast(chunk_ptr), CHUNK_SIZE_B); + + PREFETCHED_CHUNKS_[gid]++; + PREFETCHED_CHUNKS_[gid].notify_one(); + } + } + } + else { + for (size_t i = 0; i < RUN_COUNT; i++) { + const size_t chunk_index = get_chunk_index(gid, i); + uint64_t* chunk_ptr = get_chunk(DATA_B_, chunk_index, tid); + + CACHE_.Access(reinterpret_cast(chunk_ptr), CHUNK_SIZE_B); + + PREFETCHED_CHUNKS_[gid]++; + PREFETCHED_CHUNKS_[gid].notify_one(); + } + } + } +} + void scan_b(size_t gid, size_t tid) { THREAD_TIMING_[SCANB_TIMING_INDEX][UniqueIndex(gid,tid)].clear(); THREAD_TIMING_[SCANB_TIMING_INDEX][UniqueIndex(gid,tid)].resize(1); @@ -37,16 +96,7 @@ void scan_b(size_t gid, size_t tid) { THREAD_TIMING_[SCANB_TIMING_INDEX][UniqueIndex(gid,tid)][0][TIME_STAMP_BEGIN] = std::chrono::steady_clock::now(); - if constexpr (PERFORM_CACHING) { - static_assert(TC_AGGRJ == TC_SCANB); - - for (size_t i = 0; i < RUN_COUNT; i++) { - const size_t chunk_index = get_chunk_index(gid, i); - uint64_t* chunk_ptr = get_chunk(DATA_B_, chunk_index, tid); - - CACHE_.Access(reinterpret_cast(chunk_ptr), SUBCHUNK_SIZE_B_AGGRJ); - } - } + caching(gid, tid); THREAD_TIMING_[SCANB_TIMING_INDEX][UniqueIndex(gid,tid)][0][TIME_STAMP_WAIT] = std::chrono::steady_clock::now(); THREAD_TIMING_[SCANB_TIMING_INDEX][UniqueIndex(gid,tid)][0][TIME_STAMP_END] = std::chrono::steady_clock::now(); @@ -65,33 +115,36 @@ void scan_a(size_t gid, size_t tid) { uint64_t* chunk_ptr = get_chunk(DATA_A_, chunk_index, tid); uint16_t* mask_ptr = get_mask(MASK_A_, chunk_index, tid); - filter::apply_same(mask_ptr, nullptr, chunk_ptr, CMP_A, SUBCHUNK_SIZE_B_SCANA); + filter::apply_same(mask_ptr, nullptr, chunk_ptr, CMP_A, CHUNK_SIZE_B / TC_SCANA); + + BARRIERS_[gid]->arrive_and_wait(); } THREAD_TIMING_[SCANA_TIMING_INDEX][UniqueIndex(gid,tid)][0][TIME_STAMP_WAIT] = std::chrono::steady_clock::now(); + THREAD_TIMING_[SCANA_TIMING_INDEX][UniqueIndex(gid,tid)][0][TIME_STAMP_END] = std::chrono::steady_clock::now(); BARRIERS_[gid]->arrive_and_drop(); - - THREAD_TIMING_[SCANA_TIMING_INDEX][UniqueIndex(gid,tid)][0][TIME_STAMP_END] = std::chrono::steady_clock::now(); } void aggr_j(size_t gid, size_t tid) { + constexpr size_t SUBCHUNK_SIZE_B = CHUNK_SIZE_B / TC_AGGRJ; + CACHE_HITS_[UniqueIndex(gid,tid)] = 0; THREAD_TIMING_[AGGRJ_TIMING_INDEX][UniqueIndex(gid,tid)].clear(); - THREAD_TIMING_[AGGRJ_TIMING_INDEX][UniqueIndex(gid,tid)].resize(1); + THREAD_TIMING_[AGGRJ_TIMING_INDEX][UniqueIndex(gid,tid)].resize(RUN_COUNT); __m512i aggregator = aggregation::OP::zero(); LAUNCH_.wait(); - THREAD_TIMING_[AGGRJ_TIMING_INDEX][UniqueIndex(gid,tid)][0][TIME_STAMP_BEGIN] = std::chrono::steady_clock::now(); +BARRIERS_[gid]->arrive_and_wait(); + for (size_t i = 0; i < RUN_COUNT; i++) { + THREAD_TIMING_[AGGRJ_TIMING_INDEX][UniqueIndex(gid,tid)][i][TIME_STAMP_BEGIN] = std::chrono::steady_clock::now(); - BARRIERS_[gid]->arrive_and_drop(); - THREAD_TIMING_[AGGRJ_TIMING_INDEX][UniqueIndex(gid,tid)][0][TIME_STAMP_WAIT] = std::chrono::steady_clock::now(); + THREAD_TIMING_[AGGRJ_TIMING_INDEX][UniqueIndex(gid,tid)][i][TIME_STAMP_WAIT] = std::chrono::steady_clock::now(); - for (size_t i = 0; i < RUN_COUNT; i++) { const size_t chunk_index = get_chunk_index(gid, i); uint64_t* chunk_ptr = get_chunk(DATA_B_, chunk_index, tid); uint16_t* mask_ptr_a = get_mask(MASK_A_, chunk_index, tid); @@ -100,9 +153,8 @@ void aggr_j(size_t gid, size_t tid) { uint64_t* data_ptr; if constexpr (PERFORM_CACHING) { - data = CACHE_.Access(reinterpret_cast(chunk_ptr), SUBCHUNK_SIZE_B_AGGRJ, dsacache::FLAG_ACCESS_WEAK); + data = CACHE_.Access(reinterpret_cast(chunk_ptr), SUBCHUNK_SIZE_B, dsacache::FLAG_ACCESS_WEAK); data->WaitOnCompletion(); - data_ptr = reinterpret_cast(data->GetDataLocation()); if (data_ptr == nullptr) { @@ -120,13 +172,14 @@ void aggr_j(size_t gid, size_t tid) { } uint64_t tmp = _mm512_reduce_add_epi64(aggregator); - aggregator = aggregation::apply_masked(aggregator, data_ptr, mask_ptr_a, SUBCHUNK_SIZE_B_AGGRJ); + aggregator = aggregation::apply_masked(aggregator, data_ptr, mask_ptr_a, SUBCHUNK_SIZE_B); + THREAD_TIMING_[AGGRJ_TIMING_INDEX][UniqueIndex(gid,tid)][i][TIME_STAMP_END] = std::chrono::steady_clock::now(); } - aggregation::happly(&DATA_DST_[UniqueIndex(gid,tid)], aggregator); + BARRIERS_[gid]->arrive_and_drop(); - THREAD_TIMING_[AGGRJ_TIMING_INDEX][UniqueIndex(gid,tid)][0][TIME_STAMP_END] = std::chrono::steady_clock::now(); + aggregation::happly(&DATA_DST_[UniqueIndex(gid,tid)], aggregator); } int main() { @@ -159,7 +212,6 @@ int main() { // which is configured for xeonmax with smart assignment uint64_t cache_flags = 0; cache_flags |= dsacache::FLAG_WAIT_WEAK; - cache_flags |= dsacache::FLAG_HANDLE_PF; CACHE_.SetFlags(cache_flags); CACHE_.Init(CachePlacementPolicy, CopyMethodPolicy); } @@ -200,7 +252,7 @@ int main() { for(std::thread& t : agg_pool) { t.join(); } uint64_t result_actual = 0; - aggregation::apply(&result_actual, DATA_DST_, sizeof(uint64_t) * TC_AGGRJ * GROUP_COUNT); + Aggregation::apply(&result_actual, DATA_DST_, sizeof(uint64_t) * TC_AGGRJ * GROUP_COUNT); const auto time_end = std::chrono::steady_clock::now(); @@ -213,8 +265,8 @@ int main() { process_timings(&scana_run, &scana_wait, &scanb_run, &scanb_wait, &aggrj_run, &aggrj_wait); constexpr double nanos_per_second = ((double)1000) * 1000 * 1000; - const uint64_t nanos = std::chrono::duration_cast(time_end - time_start).count(); - const double seconds = (double)(nanos) / nanos_per_second; + const uint64_t nanos = std::chrono::duration_cast(time_end - time_start).count(); + const double seconds = (double)(nanos) / nanos_per_second; fout << i - WARMUP_ITERATION_COUNT << ";" diff --git a/qdp_project/src/Configuration.hpp b/qdp_project/src/Configuration.hpp index ca0891e..c297c25 100644 --- a/qdp_project/src/Configuration.hpp +++ b/qdp_project/src/Configuration.hpp @@ -3,7 +3,7 @@ #include "utils/memory_literals.h" #ifndef MODE_SET_BY_CMAKE -#define MODE_PREFETCH +#define MODE_SIMPLE_PREFETCH #endif constexpr size_t WL_SIZE_B = 4_GiB; @@ -13,20 +13,20 @@ constexpr int MEM_NODE_HBM = 8; constexpr int MEM_NODE_DRAM = 0; #ifdef MODE_PREFETCH -constexpr uint32_t GROUP_COUNT = 4; -constexpr size_t CHUNK_SIZE_B = 8_MiB; -constexpr uint32_t TC_SCANA = 4; -constexpr uint32_t TC_SCANB = 1; +constexpr uint32_t GROUP_COUNT = 16; +constexpr size_t CHUNK_SIZE_B = 16_MiB; +constexpr uint32_t TC_SCANA = 1; +constexpr uint32_t TC_SCANB = 0; constexpr uint32_t TC_AGGRJ = 1; constexpr bool PERFORM_CACHING = true; constexpr int MEM_NODE_A = 0; -constexpr int MEM_NODE_B = 0; +constexpr int MEM_NODE_B = 1; constexpr char MODE_STRING[] = "prefetch"; #endif #ifdef MODE_DRAM constexpr size_t CHUNK_SIZE_B = 2_MiB; -constexpr uint32_t GROUP_COUNT = 4; -constexpr uint32_t TC_SCANA = 4; +constexpr uint32_t GROUP_COUNT = 16; +constexpr uint32_t TC_SCANA = 2; constexpr uint32_t TC_SCANB = 0; constexpr uint32_t TC_AGGRJ = 1; constexpr bool PERFORM_CACHING = false; @@ -36,8 +36,8 @@ constexpr char MODE_STRING[] = "dram"; #endif #ifdef MODE_HBM constexpr size_t CHUNK_SIZE_B = 2_MiB; -constexpr uint32_t GROUP_COUNT = 4; -constexpr uint32_t TC_SCANA = 4; +constexpr uint32_t GROUP_COUNT = 16; +constexpr uint32_t TC_SCANA = 2; constexpr uint32_t TC_SCANB = 0; constexpr uint32_t TC_AGGRJ = 1; constexpr bool PERFORM_CACHING = false; @@ -47,19 +47,12 @@ constexpr char MODE_STRING[] = "hbm"; #endif constexpr uint64_t CMP_A = 50; +constexpr uint32_t TC_COMBINED = TC_SCANA + TC_SCANB + TC_AGGRJ; constexpr size_t WL_SIZE_ELEMENTS = WL_SIZE_B / sizeof(uint64_t); constexpr size_t CHUNK_COUNT = WL_SIZE_B / CHUNK_SIZE_B; constexpr size_t CHUNK_SIZE_ELEMENTS = CHUNK_SIZE_B / sizeof(uint64_t); constexpr size_t RUN_COUNT = CHUNK_COUNT / GROUP_COUNT; -constexpr size_t SUBCHUNK_SIZE_B_SCANA = CHUNK_SIZE_B / TC_SCANA; -constexpr size_t SUBCHUNK_SIZE_B_AGGRJ = CHUNK_SIZE_B / TC_AGGRJ; - static_assert(RUN_COUNT > 0); static_assert(WL_SIZE_B % 16 == 0); static_assert(CHUNK_SIZE_B % 16 == 0); -static_assert(CHUNK_SIZE_B % GROUP_COUNT == 0); -static_assert(CHUNK_SIZE_B % TC_AGGRJ == 0); -static_assert(CHUNK_SIZE_B % TC_SCANB == 0); -static_assert(CHUNK_SIZE_B % TC_SCANA == 0); - diff --git a/qdp_project/src/utils/aggregation.h b/qdp_project/src/utils/aggregation.h index 648be48..119ab14 100644 --- a/qdp_project/src/utils/aggregation.h +++ b/qdp_project/src/utils/aggregation.h @@ -211,7 +211,6 @@ public: size_t value_count = chunk_size_b / sizeof(base_t); __m512i agg_vec = func::zero(); size_t i = 0; - // stop before! running out of space if(value_count >= lanes) // keep in mind size_w is unsigned so if it becomes negative, it doesn't. for(; i <= value_count - lanes; i += lanes) { @@ -251,7 +250,6 @@ public: //TODO this function does not work if value_count % lanes != 0 size_t value_count = chunk_size_b / sizeof(base_t); size_t i = 0; - // stop before! running out of space if(value_count >= lanes) // keep in mind size_w is unsigned so if it becomes negative, it doesn't. for(; i <= value_count - lanes; i += lanes) { @@ -315,4 +313,4 @@ public: static __m512i get_zero() { return func::zero(); } -}; +}; \ No newline at end of file diff --git a/qdp_project/src/utils/filter.h b/qdp_project/src/utils/filter.h index 4ee64d1..a58a761 100644 --- a/qdp_project/src/utils/filter.h +++ b/qdp_project/src/utils/filter.h @@ -134,12 +134,12 @@ public: size_t value_count = chunk_size_b / sizeof(base_t); __m512i cmp_vec = _mm512_set1_epi64(cmp_value); size_t i = 0; - // this weird implementetion is neccessary, see analogous impl in aggregation for explaination if(value_count > lanes) { for(; (i < value_count - lanes); i += lanes) { __m512i vec = Vector_Loader::load(src + i); __mmask8 bitmask = func::simd_filter(vec, cmp_vec); + uint8_t int_mask = (uint8_t) _mm512_mask2int(bitmask); dest[i / lanes] = int_mask; @@ -167,4 +167,4 @@ public: return true; } -}; +}; \ No newline at end of file