diff --git a/qdp_project/src/Benchmark.cpp b/qdp_project/src/Benchmark.cpp
index c55543c..aa0a256 100644
--- a/qdp_project/src/Benchmark.cpp
+++ b/qdp_project/src/Benchmark.cpp
@@ -17,12 +17,11 @@
 #include "Configuration.hpp"
 #include "BenchmarkHelpers.cpp"
 
-using filter = Filter;
-using aggregation = Aggregation;
+using filter = FilterLT;
+using aggregation = AggregationSUM;
 
 dsacache::Cache CACHE_;
 
-std::array, GROUP_COUNT> PREFETCHED_CHUNKS_;
 std::vector*> BARRIERS_;
 std::shared_future LAUNCH_;
 
@@ -46,8 +45,6 @@ void caching(size_t gid, size_t tid) {
     constexpr size_t SUBCHUNK_COUNT = SUBCHUNK_THREAD_RATIO > 0 ? SUBCHUNK_THREAD_RATIO : 1;
     constexpr size_t SUBCHUNK_SIZE_B = CHUNK_SIZE_B / SUBCHUNK_COUNT;
     constexpr size_t SUBCHUNK_SIZE_ELEMENTS = CHUNK_SIZE_ELEMENTS / SUBCHUNK_COUNT;
-    constexpr size_t LAST_SUBCHUNK_SIZE_B = SUBCHUNK_SIZE_B + (CHUNK_SIZE_B % SUBCHUNK_COUNT);
-    // TODO: last thread (whether simulated or not) in last group must use last subchunk size for its last subchunk in the last run as size
 
     for (size_t i = 0; i < RUN_COUNT; i++) {
         const size_t chunk_index = get_chunk_index(gid, i);
@@ -55,42 +52,53 @@ void caching(size_t gid, size_t tid) {
             for (size_t j = 0; j < SUBCHUNK_COUNT; j++) {
                 uint64_t* sub_chunk_ptr = &chunk_ptr[j * SUBCHUNK_SIZE_ELEMENTS];
 
-                CACHE_.Access(reinterpret_cast(sub_chunk_ptr), SUBCHUNK_SIZE_B);
+            }
+        }
 
-            PREFETCHED_CHUNKS_[gid]++;
-            PREFETCHED_CHUNKS_[gid].notify_one();
+        constexpr size_t LAST_CHUNK_SIZE_B = SUBCHUNK_SIZE_B + (CHUNK_SIZE_B % SUBCHUNK_COUNT);
+        if constexpr (LAST_CHUNK_SIZE_B > 0) {
+            if (gid == GROUP_COUNT - 1 && tid == TC_SCANB - 1) {
+                const size_t chunk_index = get_chunk_index(gid, RUN_COUNT + 1);
+                uint64_t* chunk_ptr = get_chunk(DATA_B_, chunk_index, tid);
+                CACHE_.Access(reinterpret_cast(chunk_ptr), LAST_CHUNK_SIZE_B);
+            }
         }
     }
     else if constexpr (CACHE_OVERCHUNKING) {
-        constexpr size_t LAST_CHUNK_SIZE_B = CHUNK_SIZE_B + (CHUNK_SIZE_B % (TC_AGGRJ * GROUP_COUNT));
-        // TODO: last thread (whether simulated or not) in last group must use last chunk size for its last run as size
-
         for (size_t tid_virt = tid; tid_virt < TC_AGGRJ; tid_virt += VIRT_TID_INCREMENT) {
             for (size_t i = 0; i < RUN_COUNT; i++) {
                 const size_t chunk_index = get_chunk_index(gid, i);
-                uint64_t* chunk_ptr = get_chunk(DATA_B_, chunk_index, tid_virt);
+                uint64_t *chunk_ptr = get_chunk(DATA_B_, chunk_index, tid_virt);
 
-                CACHE_.Access(reinterpret_cast(chunk_ptr), CHUNK_SIZE_B);
+                CACHE_.Access(reinterpret_cast(chunk_ptr), CHUNK_SIZE_B);
+            }
+        }
 
-            PREFETCHED_CHUNKS_[gid]++;
-            PREFETCHED_CHUNKS_[gid].notify_one();
+        constexpr size_t LAST_CHUNK_SIZE_B = CHUNK_SIZE_B + (CHUNK_SIZE_B % (TC_AGGRJ * GROUP_COUNT));
+        if constexpr (LAST_CHUNK_SIZE_B > 0) {
+            if (gid == GROUP_COUNT - 1 && tid == TC_SCANB - 1) {
+                const size_t chunk_index = get_chunk_index(gid, RUN_COUNT + 1);
+                uint64_t *chunk_ptr = get_chunk(DATA_B_, chunk_index, tid);
+                CACHE_.Access(reinterpret_cast(chunk_ptr), LAST_CHUNK_SIZE_B);
+            }
         }
     }
     else {
-        constexpr size_t LAST_CHUNK_SIZE_B = CHUNK_SIZE_B + (CHUNK_SIZE_B % ((TC_SCANB > 0 ? TC_SCANB : 1) * GROUP_COUNT));
-        // TODO: last thread in last group must use last chunk size for its last run as size
-
         for (size_t i = 0; i < RUN_COUNT; i++) {
             const size_t chunk_index = get_chunk_index(gid, i);
             uint64_t* chunk_ptr = get_chunk(DATA_B_, chunk_index, tid);
 
             CACHE_.Access(reinterpret_cast(chunk_ptr), CHUNK_SIZE_B);
+        }
 
-            PREFETCHED_CHUNKS_[gid]++;
-            PREFETCHED_CHUNKS_[gid].notify_one();
+        constexpr size_t LAST_CHUNK_SIZE_B = CHUNK_SIZE_B + (CHUNK_SIZE_B % ((TC_SCANB > 0 ? TC_SCANB : 1) * GROUP_COUNT));
+        if constexpr (LAST_CHUNK_SIZE_B > 0) {
+            if (gid == GROUP_COUNT - 1 && tid == TC_SCANB - 1) {
+                const size_t chunk_index = get_chunk_index(gid, RUN_COUNT + 1);
+                uint64_t* chunk_ptr = get_chunk(DATA_B_, chunk_index, tid);
+                CACHE_.Access(reinterpret_cast(chunk_ptr), LAST_CHUNK_SIZE_B);
+            }
         }
     }
 }
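Note on the hunks above: with the per-group `PREFETCHED_CHUNKS_` counter gone, `caching()` no longer signals `aggr_j()` chunk by chunk; ordering between prefetch and aggregation now rests on the group barriers alone. For reference, this is the C++20 handshake that was removed, reduced to a self-contained sketch (function names are mine, not project code; build with `-std=c++20`):

```cpp
#include <atomic>
#include <cstdint>
#include <thread>

std::atomic<int32_t> prefetched_chunks{0};

void producer() {
    // ... prefetch one chunk, then publish it:
    prefetched_chunks++;
    prefetched_chunks.notify_one();
}

void consumer() {
    while (true) {
        const int32_t old = prefetched_chunks.fetch_sub(1);
        if (old > 0) break;         // a chunk was available; consume it
        prefetched_chunks++;        // undo the decrement ...
        prefetched_chunks.wait(0);  // ... and sleep while the counter stays zero
    }
    // ... process the chunk
}

int main() {
    std::thread c(consumer), p(producer);
    p.join();
    c.join();
}
```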
@@ -114,42 +122,85 @@ void scan_b(size_t gid, size_t tid) {
 
 void scan_a(size_t gid, size_t tid) {
     constexpr size_t LAST_CHUNK_SIZE_B = CHUNK_SIZE_B + (CHUNK_SIZE_B % (TC_SCANA * GROUP_COUNT));
-    // TODO: last thread in last group must use last chunk size for its last run as size
 
     THREAD_TIMING_[SCANA_TIMING_INDEX][UniqueIndex(gid,tid)].clear();
-    THREAD_TIMING_[SCANA_TIMING_INDEX][UniqueIndex(gid,tid)].resize(1);
+    THREAD_TIMING_[SCANA_TIMING_INDEX][UniqueIndex(gid,tid)].resize(RUN_COUNT);
+    INTERNAL_TIMING_VECTOR_LOAD_[SCANA_TIMING_INDEX].clear();
+    INTERNAL_TIMING_VECTOR_LOAD_[SCANA_TIMING_INDEX].resize(RUN_COUNT);
 
     LAUNCH_.wait();
 
-    THREAD_TIMING_[SCANA_TIMING_INDEX][UniqueIndex(gid,tid)][0][TIME_STAMP_BEGIN] = std::chrono::steady_clock::now();
-
     for (size_t i = 0; i < RUN_COUNT; i++) {
+        THREAD_TIMING_[SCANA_TIMING_INDEX][UniqueIndex(gid,tid)][0][TIME_STAMP_BEGIN] = std::chrono::steady_clock::now();
+
         const size_t chunk_index = get_chunk_index(gid, i);
         uint64_t* chunk_ptr = get_chunk(DATA_A_, chunk_index, tid);
         uint16_t* mask_ptr = get_mask(MASK_A_, chunk_index, tid);
 
-        filter::apply_same(mask_ptr, nullptr, chunk_ptr, CMP_A, CHUNK_SIZE_B / TC_SCANA);
+        const auto internal_timing = filter::apply_same(mask_ptr, chunk_ptr);
+        INTERNAL_TIMING_VECTOR_LOAD_[SCANA_TIMING_INDEX][i] = internal_timing;
+
+        THREAD_TIMING_[SCANA_TIMING_INDEX][UniqueIndex(gid,tid)][0][TIME_STAMP_WAIT] = std::chrono::steady_clock::now();
 
         BARRIERS_[gid]->arrive_and_wait();
+
+        THREAD_TIMING_[SCANA_TIMING_INDEX][UniqueIndex(gid,tid)][0][TIME_STAMP_END] = std::chrono::steady_clock::now();
     }
 
-    THREAD_TIMING_[SCANA_TIMING_INDEX][UniqueIndex(gid,tid)][0][TIME_STAMP_WAIT] = std::chrono::steady_clock::now();
-    THREAD_TIMING_[SCANA_TIMING_INDEX][UniqueIndex(gid,tid)][0][TIME_STAMP_END] = std::chrono::steady_clock::now();
+    if constexpr (LAST_CHUNK_SIZE_B > 0) {
+        if (gid == GROUP_COUNT - 1 && tid == TC_SCANA - 1) {
+            const size_t chunk_index = get_chunk_index(gid, RUN_COUNT + 1);
+            uint64_t* chunk_ptr = get_chunk(DATA_A_, chunk_index, tid);
+            uint16_t* mask_ptr = get_mask(MASK_A_, chunk_index, tid);
+            filter::apply_same(mask_ptr, chunk_ptr);
+        }
+    }
 
     BARRIERS_[gid]->arrive_and_drop();
 }
 
+template
+uint64_t AggrFn(uint64_t* chunk_ptr, uint16_t* mask_ptr_a, const uint32_t tid, const uint32_t gid, __m512i& aggregator) {
+    std::unique_ptr data;
+    uint64_t* data_ptr;
+
+    if constexpr (PERFORM_CACHING) {
+        data = CACHE_.Access(reinterpret_cast(chunk_ptr), size, dsacache::FLAG_ACCESS_WEAK);
+        data->WaitOnCompletion();
+
+        data_ptr = reinterpret_cast(data->GetDataLocation());
+
+        if (data_ptr == nullptr) {
+            // cache holds no copy, fall back to the source data
+            data_ptr = chunk_ptr;
+        }
+        else if (data_ptr == chunk_ptr) {
+            // prevent counting weak-accesses
+        }
+        else {
+            CACHE_HITS_[UniqueIndex(gid,tid)]++;
+        }
+    }
+    else {
+        data_ptr = chunk_ptr;
+    }
+
+    return aggregation::apply_masked(aggregator, data_ptr, mask_ptr_a);
+}
+
 void aggr_j(size_t gid, size_t tid) {
     constexpr size_t SUBCHUNK_SIZE_B = CHUNK_SIZE_B / TC_AGGRJ;
     constexpr size_t LAST_CHUNK_SIZE_B = SUBCHUNK_SIZE_B + (CHUNK_SIZE_B % (TC_AGGRJ * GROUP_COUNT));
-    // TODO: last thread in last group must use last chunk size for its last run as size
 
     CACHE_HITS_[UniqueIndex(gid,tid)] = 0;
 
     THREAD_TIMING_[AGGRJ_TIMING_INDEX][UniqueIndex(gid,tid)].clear();
     THREAD_TIMING_[AGGRJ_TIMING_INDEX][UniqueIndex(gid,tid)].resize(RUN_COUNT);
+    INTERNAL_TIMING_VECTOR_LOAD_[AGGRJ_TIMING_INDEX].clear();
+    INTERNAL_TIMING_VECTOR_LOAD_[AGGRJ_TIMING_INDEX].resize(RUN_COUNT);
 
-    __m512i aggregator = aggregation::OP::zero();
+    __m512i aggregator = aggregation::zero();
 
     LAUNCH_.wait();
 
@@ -158,45 +209,25 @@ void aggr_j(size_t gid, size_t tid) {
 
         BARRIERS_[gid]->arrive_and_wait();
 
-        while (true) {
-            const int32_t old = PREFETCHED_CHUNKS_[gid].fetch_sub(1);
-            if (old > 0) break;
-            PREFETCHED_CHUNKS_[gid]++;
-            PREFETCHED_CHUNKS_[gid].wait(0);
-        }
-
         THREAD_TIMING_[AGGRJ_TIMING_INDEX][UniqueIndex(gid,tid)][i][TIME_STAMP_WAIT] = std::chrono::steady_clock::now();
 
         const size_t chunk_index = get_chunk_index(gid, i);
         uint64_t* chunk_ptr = get_chunk(DATA_B_, chunk_index, tid);
         uint16_t* mask_ptr_a = get_mask(MASK_A_, chunk_index, tid);
 
-        std::unique_ptr data;
-        uint64_t* data_ptr;
-
-        if constexpr (PERFORM_CACHING) {
-            data = CACHE_.Access(reinterpret_cast(chunk_ptr), SUBCHUNK_SIZE_B, dsacache::FLAG_ACCESS_WEAK);
-            data->WaitOnCompletion();
-            data_ptr = reinterpret_cast(data->GetDataLocation());
-
-            if (data_ptr == nullptr) {
-                data_ptr = chunk_ptr;
-            }
-            else if (data_ptr == chunk_ptr) {
-                // prevent counting weak-accesses
-            }
-            else {
-                CACHE_HITS_[UniqueIndex(gid,tid)]++;
-            }
-        }
-        else {
-            data_ptr = chunk_ptr;
-        }
-
-        uint64_t tmp = _mm512_reduce_add_epi64(aggregator);
-        aggregator = aggregation::apply_masked(aggregator, data_ptr, mask_ptr_a, SUBCHUNK_SIZE_B);
+        const auto internal_timing = AggrFn(chunk_ptr, mask_ptr_a, tid, gid, aggregator);
+        INTERNAL_TIMING_VECTOR_LOAD_[AGGRJ_TIMING_INDEX][i] = internal_timing;
 
         THREAD_TIMING_[AGGRJ_TIMING_INDEX][UniqueIndex(gid,tid)][i][TIME_STAMP_END] = std::chrono::steady_clock::now();
     }
 
+    if constexpr (LAST_CHUNK_SIZE_B > 0) {
+        if (gid == GROUP_COUNT - 1 && tid == TC_AGGRJ - 1) {
+            const size_t chunk_index = get_chunk_index(gid, RUN_COUNT + 1);
+            uint64_t* chunk_ptr = get_chunk(DATA_B_, chunk_index, tid);
+            uint16_t* mask_ptr_a = get_mask(MASK_A_, chunk_index, tid);
+            AggrFn(chunk_ptr, mask_ptr_a, tid, gid, aggregator);
+        }
+    }
 
     BARRIERS_[gid]->arrive_and_drop();
 
@@ -219,7 +250,7 @@ int main() {
     const std::string ofname = "results/qdp-xeonmax-" + std::string(MODE_STRING) + "-tca" + std::to_string(TC_SCANA) + "-tcb" + std::to_string(TC_SCANB) + "-tcj" + std::to_string(TC_AGGRJ) + "-tmul" + std::to_string(GROUP_COUNT) + "-wl" + std::to_string(WL_SIZE_B) + "-cs" + std::to_string(CHUNK_SIZE_B) + ".csv";
     std::ofstream fout(ofname);
 
-    fout << "run;rt-ns;rt-s;result[0];scana-run;scana-wait;scanb-run;scanb-wait;aggrj-run;aggrj-wait;cache-hr;" << std::endl;
+    fout << "run;rt-ns;rt-s;result[0];scana-run;scana-wait;scana-load;scanb-run;scanb-wait;aggrj-run;aggrj-wait;aggrj-load;cache-hr;" << std::endl;
 
     DATA_A_ = (uint64_t*) numa_alloc_onnode(WL_SIZE_B, MEM_NODE_A);
     DATA_B_ = (uint64_t*) numa_alloc_onnode(WL_SIZE_B, MEM_NODE_B);
@@ -274,7 +305,7 @@ int main() {
         for(std::thread& t : agg_pool) { t.join(); }
 
         uint64_t result_actual = 0;
-        Aggregation::apply(&result_actual, DATA_DST_, sizeof(uint64_t) * TC_AGGRJ * GROUP_COUNT);
+        aggregation::apply(&result_actual, DATA_DST_);
 
         const auto time_end = std::chrono::steady_clock::now();
 
@@ -283,18 +314,18 @@ int main() {
         std::cout << "Result Expected: " << result_expected << ", Result Actual: " << result_actual << std::endl;
 
         if (i >= WARMUP_ITERATION_COUNT) {
-            uint64_t scana_run = 0, scana_wait = 0, scanb_run = 0, scanb_wait = 0, aggrj_run = 0, aggrj_wait = 0;
-            process_timings(&scana_run, &scana_wait, &scanb_run, &scanb_wait, &aggrj_run, &aggrj_wait);
+            uint64_t scana_run = 0, scana_wait = 0, scanb_run = 0, scanb_wait = 0, aggrj_run = 0, aggrj_wait = 0, scana_load = 0, aggrj_load = 0;
+            process_timings(&scana_run, &scana_wait, &scanb_run, &scanb_wait, &aggrj_run, &aggrj_wait, &scana_load, &aggrj_load);
 
             constexpr double nanos_per_second = ((double)1000) * 1000 * 1000;
-            const uint64_t nanos = std::chrono::duration_cast(time_end - time_start).count();
-            const double seconds = (double)(nanos) / nanos_per_second;
+            const uint64_t nanos = std::chrono::duration_cast(time_end - time_start).count();
+            const double seconds = (double)(nanos) / nanos_per_second;
 
             fout
                 << i - WARMUP_ITERATION_COUNT << ";"
                 << nanos << ";"
                 << seconds << ";"
                 << result_actual << ";"
-                << scana_run << ";" << scana_wait << ";" << scanb_run << ";" << scanb_wait << ";" << aggrj_run << ";" << aggrj_wait << ";"
+                << scana_run << ";" << scana_wait << ";" << scana_load << ";" << scanb_run << ";" << scanb_wait << ";" << aggrj_run << ";" << aggrj_wait << ";" << aggrj_load << ";"
                 << process_cache_hitrate() << ";"
                 << std::endl;
         }
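The hunks above repeat one pattern in `caching()`, `scan_a()` and `aggr_j()`: the designated last thread of the last group reprocesses a trailing chunk whose size is padded by the division remainder. A standalone sketch of the same formula, with hypothetical sizes instead of the project constants:

```cpp
// Minimal sketch (hypothetical sizes, not the project's 16_MiB chunks) of the
// trailing-remainder rule the diff introduces: every worker processes
// CHUNK_SIZE_B per run, and the last worker additionally handles whatever
// did not divide evenly across all workers.
#include <cstddef>
#include <cstdio>

int main() {
    constexpr size_t CHUNK_SIZE_B = 100;  // hypothetical
    constexpr size_t TC_AGGRJ     = 3;    // hypothetical thread count
    constexpr size_t GROUP_COUNT  = 2;    // hypothetical group count

    // Same formula as the diff: regular chunk size plus the remainder.
    constexpr size_t LAST_CHUNK_SIZE_B =
        CHUNK_SIZE_B + (CHUNK_SIZE_B % (TC_AGGRJ * GROUP_COUNT));

    std::printf("last worker processes %zu bytes instead of %zu\n",
                LAST_CHUNK_SIZE_B, CHUNK_SIZE_B);  // 104 instead of 100
}
```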
diff --git a/qdp_project/src/Configuration.hpp b/qdp_project/src/Configuration.hpp
index b247252..0e3588f 100644
--- a/qdp_project/src/Configuration.hpp
+++ b/qdp_project/src/Configuration.hpp
@@ -3,7 +3,7 @@
 #include "utils/memory_literals.h"
 
 #ifndef MODE_SET_BY_CMAKE
-#define MODE_SIMPLE_PREFETCH
+#define MODE_PREFETCH
 #endif
 
 constexpr size_t WL_SIZE_B = 4_GiB;
@@ -12,11 +12,11 @@ constexpr uint32_t ITERATION_COUNT = 5;
 constexpr int MEM_NODE_HBM = 8;
 constexpr int MEM_NODE_DRAM = 0;
 
-#ifdef MODE_SIMPLE_PREFETCH
-constexpr uint32_t GROUP_COUNT = 24;
+#ifdef MODE_PREFETCH
+constexpr uint32_t GROUP_COUNT = 12;
 constexpr size_t CHUNK_SIZE_B = 16_MiB;
 constexpr uint32_t TC_SCANA = 1;
-constexpr uint32_t TC_SCANB = 0;
+constexpr uint32_t TC_SCANB = 1;
 constexpr uint32_t TC_AGGRJ = 1;
 constexpr bool PERFORM_CACHING = true;
 constexpr bool PERFORM_CACHING_IN_AGGREGATION = true;
@@ -26,7 +26,7 @@ constexpr char MODE_STRING[] = "prefetch";
 #endif
 #ifdef MODE_DRAM
 constexpr size_t CHUNK_SIZE_B = 2_MiB;
-constexpr uint32_t GROUP_COUNT = 16;
+constexpr uint32_t GROUP_COUNT = 12;
 constexpr uint32_t TC_SCANA = 2;
 constexpr uint32_t TC_SCANB = 0;
 constexpr uint32_t TC_AGGRJ = 1;
@@ -38,7 +38,7 @@ constexpr char MODE_STRING[] = "dram";
 #endif
 #ifdef MODE_HBM
 constexpr size_t CHUNK_SIZE_B = 2_MiB;
-constexpr uint32_t GROUP_COUNT = 16;
+constexpr uint32_t GROUP_COUNT = 12;
 constexpr uint32_t TC_SCANA = 2;
 constexpr uint32_t TC_SCANB = 0;
 constexpr uint32_t TC_AGGRJ = 1;
@@ -50,7 +50,6 @@ constexpr char MODE_STRING[] = "hbm";
 #endif
 
 constexpr uint64_t CMP_A = 50;
-constexpr uint32_t TC_COMBINED = TC_SCANA + TC_SCANB + TC_AGGRJ;
 constexpr size_t WL_SIZE_ELEMENTS = WL_SIZE_B / sizeof(uint64_t);
 constexpr size_t CHUNK_COUNT = WL_SIZE_B / CHUNK_SIZE_B;
 constexpr size_t CHUNK_SIZE_ELEMENTS = CHUNK_SIZE_B / sizeof(uint64_t);
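On the configuration changes: with WL_SIZE_B = 4_GiB and CHUNK_SIZE_B = 16_MiB the workload is 256 chunks, which does not divide evenly by the new GROUP_COUNT = 12, presumably one motivation for the remainder handling added in Benchmark.cpp. A quick check (plain C++, literals substituted for the `_GiB`/`_MiB` user-defined literals from memory_literals.h):

```cpp
#include <cstddef>
#include <cstdio>

int main() {
    constexpr size_t WL_SIZE_B    = 4ull << 30;   // 4_GiB
    constexpr size_t CHUNK_SIZE_B = 16ull << 20;  // 16_MiB
    constexpr size_t GROUP_COUNT  = 12;           // was 24 before this diff

    constexpr size_t CHUNK_COUNT = WL_SIZE_B / CHUNK_SIZE_B;  // 256
    std::printf("chunks: %zu, per group: %zu, leftover: %zu\n",
                CHUNK_COUNT, CHUNK_COUNT / GROUP_COUNT, CHUNK_COUNT % GROUP_COUNT);
}
```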
diff --git a/qdp_project/src/utils/BenchmarkHelpers.cpp b/qdp_project/src/utils/BenchmarkHelpers.cpp
index 4d000c7..551ce81 100644
--- a/qdp_project/src/utils/BenchmarkHelpers.cpp
+++ b/qdp_project/src/utils/BenchmarkHelpers.cpp
@@ -11,6 +11,7 @@ constexpr size_t TIME_STAMP_BEGIN = 0;
 constexpr size_t TIME_STAMP_WAIT = 1;
 constexpr size_t TIME_STAMP_END = 2;
 
+std::array, 3> INTERNAL_TIMING_VECTOR_LOAD_;
 std::array>>, 3> THREAD_TIMING_;
 
 std::array CACHE_HITS_;
@@ -27,14 +28,6 @@ uint64_t sum_check(uint64_t compare_value, uint64_t* row_A, uint64_t* row_B, siz
     return sum;
 }
 
-uint64_t sum_check_complex(uint64_t compare_value_a, uint64_t compare_value_b, uint64_t* row_A, uint64_t* row_B, size_t row_size) {
-    uint64_t sum = 0;
-    for(int i = 0; i < row_size / sizeof(uint64_t); ++i) {
-        sum += (row_A[i] < compare_value_a && row_B[i] < compare_value_b) * row_B[i];
-    }
-    return sum;
-}
-
 int CachePlacementPolicy(const int numa_dst_node, const int numa_src_node, const size_t data_size) {
     return numa_dst_node < 8 ? numa_dst_node + 8 : numa_dst_node;
 }
@@ -102,7 +95,8 @@ double process_cache_hitrate() {
 void process_timings(
     uint64_t* scana_run, uint64_t* scana_wait,
     uint64_t* scanb_run, uint64_t* scanb_wait,
-    uint64_t* aggrj_run, uint64_t* aggrj_wait
+    uint64_t* aggrj_run, uint64_t* aggrj_wait,
+    uint64_t* scana_load, uint64_t* aggrj_load
 ) {
     {
         uint64_t scana_rc = 0;
@@ -152,4 +146,10 @@ void process_timings(
         *aggrj_wait /= aggrj_rc;
     }
+    {
+        for (const auto e : INTERNAL_TIMING_VECTOR_LOAD_[SCANA_TIMING_INDEX]) *scana_load += e;
+        for (const auto e : INTERNAL_TIMING_VECTOR_LOAD_[AGGRJ_TIMING_INDEX]) *aggrj_load += e;
+        *scana_load /= INTERNAL_TIMING_VECTOR_LOAD_[SCANA_TIMING_INDEX].size();
+        *aggrj_load /= INTERNAL_TIMING_VECTOR_LOAD_[AGGRJ_TIMING_INDEX].size();
+    }
 }
\ No newline at end of file
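The new `scana-load`/`aggrj-load` columns are plain averages over the per-run samples collected in `INTERNAL_TIMING_VECTOR_LOAD_`. A minimal standalone equivalent of that aggregation (`average_load_ns` is a hypothetical name; the diff itself divides without the empty-vector guard). One reading of the diff, stated as an assumption: the arrays are indexed only by run, so if several scan or aggregation threads run per group, each run slot holds whichever thread wrote it last.

```cpp
#include <cstdint>
#include <numeric>
#include <vector>

// Sum the per-run nanosecond samples and divide by the run count,
// mirroring the new block at the end of process_timings().
uint64_t average_load_ns(const std::vector<uint64_t>& samples) {
    if (samples.empty()) return 0;
    const uint64_t sum = std::accumulate(samples.begin(), samples.end(), uint64_t{0});
    return sum / samples.size();
}
```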
diff --git a/qdp_project/src/utils/aggregation.h b/qdp_project/src/utils/aggregation.h
index 119ab14..8dbacdc 100644
--- a/qdp_project/src/utils/aggregation.h
+++ b/qdp_project/src/utils/aggregation.h
@@ -8,309 +8,102 @@
 #include "vector_loader.h"
 #include "const.h"
 
+#include <chrono>
+
-/**
- * @brief Super Class for all Aggregation functions. Guards Sub Classes from having an non integral base type.
- *
- * @tparam T
- */
-template
-class AggFunction {
-    static_assert(std::is_integral::value, "The base type of an AggFunction must be an integral");
-};
-
-/**
- * @brief Template class that implements methods used for Summation. It wraps the corresponding vector intrinsics
- *
- * @tparam T base datatype for the implemented methods
- */
-template
-class Sum : public AggFunction {
+template
+class AggregationSUM {
 public:
     static inline __m512i simd_agg(__m512i aggregator, __m512i vector) {
-        if constexpr (sizeof(T) == 4) return _mm512_add_epi32(aggregator, vector);
-        else if constexpr (sizeof(T) == 8) return _mm512_add_epi64(aggregator, vector);
-        static_assert(sizeof(T) == 4 || sizeof(T) == 8, "Sum is only implemented for 32 and 64 wide integers");
+        if constexpr (sizeof(base_t) == 4) return _mm512_add_epi32(aggregator, vector);
+        else if constexpr (sizeof(base_t) == 8) return _mm512_add_epi64(aggregator, vector);
+        static_assert(sizeof(base_t) == 4 || sizeof(base_t) == 8, "Sum is only implemented for 32 and 64 wide integers");
     };
 
     static inline __m512i simd_agg(__m512i aggregator, __mmask16 mask, __m512i vector) {
-        if constexpr (sizeof(T) == 4) return _mm512_mask_add_epi32(aggregator, mask, aggregator, vector);
-        else if constexpr (sizeof(T) == 8) return _mm512_mask_add_epi64(aggregator, mask, aggregator, vector);
-        static_assert(sizeof(T) == 4 || sizeof(T) == 8, "Sum is only implemented for 32 and 64 wide integers");
+        if constexpr (sizeof(base_t) == 4) return _mm512_mask_add_epi32(aggregator, mask, aggregator, vector);
+        else if constexpr (sizeof(base_t) == 8) return _mm512_mask_add_epi64(aggregator, mask, aggregator, vector);
+        static_assert(sizeof(base_t) == 4 || sizeof(base_t) == 8, "Sum is only implemented for 32 and 64 wide integers");
     };
 
-    static inline T simd_reduce(__m512i vector) {
-        if constexpr (sizeof(T) == 4) return _mm512_reduce_add_epi32(vector);
-        else if constexpr (sizeof(T) == 8) return _mm512_reduce_add_epi64(vector);
-        static_assert(sizeof(T) == 4 || sizeof(T) == 8, "Sum is only implemented for 32 and 64 wide integers");
+    static inline base_t simd_reduce(__m512i vector) {
+        if constexpr (sizeof(base_t) == 4) return _mm512_reduce_add_epi32(vector);
+        else if constexpr (sizeof(base_t) == 8) return _mm512_reduce_add_epi64(vector);
+        static_assert(sizeof(base_t) == 4 || sizeof(base_t) == 8, "Sum is only implemented for 32 and 64 wide integers");
     };
 
-    static inline T scalar_agg(T aggregator, T scalar) { return aggregator + scalar; };
+    static inline base_t scalar_agg(base_t aggregator, base_t scalar) { return aggregator + scalar; };
 
     static inline __m512i zero() { return _mm512_set1_epi32(0); };
-};
-
-
-/**
- * @brief Template class that implements methods used for Maximum determination. It wraps the corresponding vector intrinsics
- *
- * @tparam T base datatype for the implemented methods
- *
- */
-template
-class Max : public AggFunction {
-public:
-    static inline __m512i simd_agg(__m512i aggregator, __m512i vector) {
-        if constexpr (sizeof(T) == 4) return _mm512_max_epi32(aggregator, vector);
-        else if constexpr (sizeof(T) == 8) return _mm512_max_epi64(aggregator, vector);
-        static_assert(sizeof(T) == 4 || sizeof(T) == 8, "Max is only implemented for 32 and 64 wide integers");
-    }
-
-    static inline __m512i simd_agg(__m512i aggregator, __mmask16 mask, __m512i vector) {
-        if constexpr (sizeof(T) == 4) return _mm512_mask_max_epi32(aggregator, mask, aggregator, vector);
-        else if constexpr (sizeof(T) == 8) return _mm512_mask_max_epi64(aggregator, mask, aggregator, vector);
-        static_assert(sizeof(T) == 4 || sizeof(T) == 8, "Max is only implemented for 32 and 64 wide integers");
-    }
-
-    static inline T simd_reduce(__m512i vector) {
-        if constexpr (sizeof(T) == 4) return _mm512_reduce_max_epi32(vector);
-        else if constexpr (sizeof(T) == 8) return _mm512_reduce_max_epi64(vector);
-        static_assert(sizeof(T) == 4 || sizeof(T) == 8, "Max is only implemented for 32 and 64 wide integers");
-    }
-
-    static inline T scalar_agg(T aggregator, T scalar) { return std::max(aggregator, scalar); }
-
-    static inline __m512i zero() {
-        if constexpr (sizeof(T) == 4) {
-            if constexpr (std::is_signed::value) return _mm512_set1_epi32(0xFFFFFFFF);
-            else return _mm512_set1_epi32(0x0);
-        }
-        else if constexpr (sizeof(T) == 8) {
-            if constexpr (std::is_signed::value) return _mm512_set1_epi32(0xFFFFFFFFFFFFFFFF);
-            else return _mm512_set1_epi32(0x0);
-        }
-        static_assert(sizeof(T) == 4 || sizeof(T) == 8, "Max is only implemented for 32 and 64 wide integers");
-    }
-};
 
+    static_assert(std::is_same_v, "Enforce unsigned 64 bit ints.");
 
-/**
- * @brief Template class that implements methods used for Minimum determination. It wraps the corresponding vector intrinsics
- *
- * @tparam T base datatype for the implemented methods
- *
- */
-template
-class Min : public AggFunction {
-public:
-    static inline __m512i simd_agg(__m512i aggregator, __m512i vector) {
-        if constexpr (sizeof(T) == 4) return _mm512_min_epi32(aggregator, vector);
-        else if constexpr (sizeof(T) == 8) return _mm512_min_epi64(aggregator, vector);
-        static_assert(sizeof(T) == 4 || sizeof(T) == 8, "Min is only implemented for 32 and 64 wide integers");
-    }
+    /*
+     * aggregates the chunk at *src* into *dest*; returns true when done
+     */
+    template
+    static bool apply(base_t *dest, base_t *src) {
+        constexpr size_t lanes = VECTOR_SIZE();
+        constexpr size_t value_count = CHUNK_SIZE_B / sizeof(base_t);
+        constexpr size_t iterations = value_count - lanes + 1;
 
-    static inline __m512i simd_agg(__m512i aggregator, __mmask16 mask, __m512i vector) {
-        if constexpr (sizeof(T) == 4) return _mm512_mask_min_epi32(aggregator, mask, aggregator, vector);
-        else if constexpr (sizeof(T) == 8) return _mm512_mask_min_epi64(aggregator, mask, aggregator, vector);
-        static_assert(sizeof(T) == 4 || sizeof(T) == 8, "Min is only implemented for 32 and 64 wide integers");
-    }
+        static_assert(value_count >= lanes);
 
-    static inline T simd_reduce(__m512i vector) {
-        if constexpr (sizeof(T) == 4) return _mm512_reduce_min_epi32(vector);
-        else if constexpr (sizeof(T) == 8) return _mm512_reduce_min_epi64(vector);
-        static_assert(sizeof(T) == 4 || sizeof(T) == 8, "Min is only implemented for 32 and 64 wide integers");
-    }
+        __m512i agg_vec = zero();
+        size_t i = 0;
 
-    static inline T scalar_agg(T aggregator, T scalar) { return std::min(aggregator, scalar); }
+        for(; i < iterations; i += lanes) {
+            __m512i vec = Vector_Loader::load(src + i);
 
-    static inline __m512i zero() {
-        if constexpr (sizeof(T) == 4) {
-            if constexpr (std::is_signed::value) return _mm512_set1_epi32(0xEFFFFFFF);
-            else return _mm512_set1_epi32(0xFFFFFFFF);
+            agg_vec = simd_agg(agg_vec, vec);
         }
-        else if constexpr (sizeof(T) == 8) {
-            if constexpr (std::is_signed::value) return _mm512_set1_epi32(0xEFFFFFFFFFFFFFFF);
-            else return _mm512_set1_epi32(0xFFFFFFFFFFFFFFFF);
-        }
-        static_assert(sizeof(T) == 4 || sizeof(T) == 8, "Min is only implemented for 32 and 64 wide integers");
-    }
-};
 
-/**
- * @brief Template Class that implements an aggregation operation.
- *
- * @tparam base_t Base type of the values for aggregation
- * @tparam func
- * @tparam load_mode
- */
-template class func, load_mode load_mode>
-class Aggregation{
-public:
-
-    static_assert(std::is_same_v, "Enforce unsigned 64 bit ints.");
-
-    using OP = func;
-
-    /**
-     * @brief Calculates the memory maximal needed to store a chunk's processing result.
-     *
-     * @param chunk_size_b Size of the chunk in byte
-     * @return size_t Size of the chunk's processing result in byte
-     */
-    static size_t result_bytes_per_chunk(size_t chunk_size_b) {
-        // aggregation returns a single value of type base_t
-        return sizeof(base_t);
-    }
-
-    /**
-     * @brief Applies the aggregation function on the chunk starting at *src* and spanning *chunk_size_b* bytes.
-     * The result is written to main memory.
-     *
-     * @param dest Pointer to the start of the result chunk
-     * @param src Pointer to the start of the source chunk
-     * @param chunk_size_b Size of the source chunk in bytes
-     * @return true When the aggregation is done
-     * @return false Never
-     */
-    static bool apply (base_t *dest, base_t *src, size_t chunk_size_b) {
-        constexpr size_t lanes = VECTOR_SIZE();
-        size_t value_count = chunk_size_b / sizeof(base_t);
-        __m512i agg_vec = func::zero();
-        size_t i = 0;
-        base_t result = 0;
-        // stop before! running out of space
-        if(value_count >= lanes) { // keep in mind value_count is unsigned so if it becomes negative, it doesn't.
-            for(; i <= value_count - lanes; i += lanes) {
-                __m512i vec = Vector_Loader::load(src + i);
-
-                agg_vec = func::simd_agg(agg_vec, vec);
-            }
-            result = func::simd_reduce(agg_vec);
-        }
+        base_t result = simd_reduce(agg_vec);
 
         for(; i < value_count; ++i) {
-            result = func::scalar_agg(result, src[i]);
+            result = scalar_agg(result, src[i]);
         }
+
         *dest = result;
 
         return true;
     }
 
-    /**
-     * @brief Applies the aggregation function on the chunk starting at *src* and spanning *chunk_size_b* bytes,
-     * while applying the bit string stored in *masks*. The result is written to main memory.
-     *
-     * @param dest Pointer to the start of the result chunk
-     * @param src Pointer to the start of the source chunk
-     * @param masks Pointer the bitstring that marks the values that should be aggregated
-     * @param chunk_size_b Size of the source chunk in bytes
-     * @return true When the aggregation is done
-     * @return false Never
-     */
-    static bool apply_masked (base_t *dest, base_t *src, uint16_t* msks, size_t chunk_size_b) {
-        constexpr size_t lanes = VECTOR_SIZE();
-        uint8_t* masks = (uint8_t *)msks;
-        size_t value_count = chunk_size_b / sizeof(base_t);
-        __m512i agg_vec = func::zero();
-        size_t i = 0;
-        // stop before! running out of space
-        if(value_count >= lanes) // keep in mind size_w is unsigned so if it becomes negative, it doesn't.
-            for(; i <= value_count - lanes; i += lanes) {
-                __m512i vec = Vector_Loader::load(src + i);
-                __mmask8 mask = _mm512_int2mask(masks[i / lanes]);
-
-                agg_vec = func::simd_mask_agg(agg_vec, mask, vec);
-            }
-        *dest = func::simd_reduce(agg_vec);
-
-        for(; i < value_count; ++i) {
-            uint8_t mask = masks[i / lanes];
-            if(mask & (0b1 << (i % lanes))){
-                *dest = func::scalar_agg(*dest, src[i]);
-            }
-        }
-
-        return true;
-    }
-
-    /**
-     * @brief Applies the aggregation function on the chunk starting at *src* and spanning *chunk_size_b* bytes,
-     * while applying the bit string stored in *masks*. The values are agggegated in the register *dest* without
-     * clearing beforehand.
-     *
-     * NOTE! This function only works correctly if the the chunk_size_b is a multiple of 64 byte
-     *
-     * @param dest Vector register used for storing and passing the result around
-     * @param src Pointer to the start of the source chunk
-     * @param masks Pointer the bitstring that marks the values that should be aggregated
-     * @param chunk_size_b Size of the source chunk in bytes
-     * @return __m512i Vector register holding the aggregation result
-     */
-    static __m512i apply_masked (__m512i dest, base_t *src, uint16_t* msks, size_t chunk_size_b) {
-        constexpr size_t lanes = VECTOR_SIZE();
-        uint8_t* masks = (uint8_t*) msks;
-        //TODO this function does not work if value_count % lanes != 0
-        size_t value_count = chunk_size_b / sizeof(base_t);
-        size_t i = 0;
-        // stop before! running out of space
-        if(value_count >= lanes) // keep in mind size_w is unsigned so if it becomes negative, it doesn't.
-            for(; i <= value_count - lanes; i += lanes) {
-                __m512i vec = Vector_Loader::load(src + i);
-                __mmask8 mask = _mm512_int2mask(masks[i / lanes]);
-
-                dest = func::simd_agg(dest, mask, vec);
-            }
-
-        return dest;
-    }
-
-    /**
-     * @brief Applies the aggregation function on the chunk starting at *src* and spanning *chunk_size_b* bytes,
-     * while applying two bit strings stored in *masks_0* and *masks_1*. The values are aggregated in the register
-     * *dest* without clearing beforehand.
-     *
-     * NOTE! This function only works correctly if the the chunk_size_b is a multiple of 64 byte
-     *
-     * @param dest Vector register used for storing and passing the result around
-     * @param src Pointer to the start of the source chunk
-     * @param masks_0 Pointer the bitstring that marks the values that should be aggregated
-     * @param masks_1 Pointer the bitstring that marks the values that should be aggregated
-     * @param chunk_size_b Size of the source chunk in bytes
-     * @return __m512i Vector register holding the aggregation result
-     */
-    static __m512i apply_masked (__m512i dest, base_t *src, uint16_t* msks0, uint16_t* msks1, size_t chunk_size_b) {
-        constexpr size_t lanes = VECTOR_SIZE();
-        uint8_t* masks0 = (uint8_t*) msks0;
-        uint8_t* masks1 = (uint8_t*) msks1;
-        //TODO this function does not work if value_count % lanes != 0
-        size_t value_count = chunk_size_b / sizeof(base_t);
-        size_t i = 0;
-        // stop before! running out of space
-        if(value_count >= lanes) // keep in mind value_count is unsigned so if it becomes negative, it doesn't.
-            for(; i <= value_count - lanes; i += lanes) {
-                __m512i vec = Vector_Loader::load(src + i);
-                __mmask8 mask0 = _mm512_int2mask(masks0[i / lanes]);
-                __mmask8 mask1 = _mm512_int2mask(masks1[i / lanes]);
-
-                mask0 = _kand_mask8(mask0, mask1);
-                dest = func::simd_agg(dest, mask0, vec);
-            }
-
-        return dest;
-    }
-
-    /**
-     * @brief Reduces a vector by applying the aggregation function horizontally.
-     *
-     * @param dest Result of the horizontal aggregation
-     * @param src Vector as source for the horizontal aggregation
-     * @return true When the operation is done
-     * @return false Never
-     */
+    /*
+     * returns time in ns spent loading vector
+     */
+    template
+    static uint64_t apply_masked(__m512i& dest, base_t *src, uint16_t* msks) {
+        constexpr size_t lanes = VECTOR_SIZE();
+        constexpr size_t value_count = CHUNK_SIZE_B / sizeof(base_t);
+        constexpr size_t iterations = value_count - lanes + 1;
+
+        static_assert(value_count >= lanes);
+
+        uint64_t load_time = 0;
+
+        auto* masks = reinterpret_cast(msks);
+
+        for(size_t i = 0; i < iterations; i += lanes) {
+            auto ts_load = std::chrono::steady_clock::now();
+
+            __m512i vec = Vector_Loader::load(src + i);
+
+            auto te_load = std::chrono::steady_clock::now();
+            load_time += std::chrono::duration_cast(te_load - ts_load).count();
+
+            __mmask8 mask = _mm512_int2mask(masks[i / lanes]);
+
+            dest = simd_agg(dest, mask, vec);
+        }
+
+        return load_time;
+    }
+
     static bool happly (base_t *dest, __m512i src) {
-        *dest = func::simd_reduce(src);
+        *dest = simd_reduce(src);
         return true;
     }
 
     static __m512i get_zero() {
-        return func::zero();
+        return zero();
     }
 };
\ No newline at end of file
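For readers without the surrounding headers, this is the masked-sum kernel that AggregationSUM now hard-codes, reduced to one self-contained function (`masked_sum8` is an illustrative name; `Vector_Loader` is replaced by an unaligned load; compile with `-mavx512f`):

```cpp
#include <cstdint>
#include <immintrin.h>

// Sum the eight 64-bit values selected by mask_bits, as one AVX-512 step:
// accumulate under the lane mask, then reduce horizontally.
uint64_t masked_sum8(const uint64_t* src, uint8_t mask_bits) {
    __m512i acc = _mm512_set1_epi32(0);                // zero(), as in the diff
    __m512i vec = _mm512_loadu_si512(src);             // stand-in for Vector_Loader
    __mmask8 mask = _mm512_int2mask(mask_bits);        // one bit per 64-bit lane
    acc = _mm512_mask_add_epi64(acc, mask, acc, vec);  // simd_agg(acc, mask, vec)
    return _mm512_reduce_add_epi64(acc);               // simd_reduce(acc)
}
```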
diff --git a/qdp_project/src/utils/filter.h b/qdp_project/src/utils/filter.h
index a58a761..b0c7231 100644
--- a/qdp_project/src/utils/filter.h
+++ b/qdp_project/src/utils/filter.h
@@ -7,164 +7,64 @@
 
 #include "vector_loader.h"
 
+#include <chrono>
+
-/**
- * @brief Super Class for all Aggregation functions. Guards Sub Classes from having an non integral base type.
- *
- * @tparam T An integral datatype
- */
-template
-class FilterFunction {
-    static_assert(std::is_integral::value, "The base type of a FilterFunction must be an integeral.");
-};
-
-/**
- * @brief Template class that implements methods used for finding values that are not equal to the compare value.
- * It wraps the corresponding vector intrinsics.
- *
- * @tparam T base datatype for the implemented methods
- */
-template
-class NEQ : public FilterFunction {
+template
+class FilterLT {
 public:
     static inline __mmask16 simd_filter(__m512i vector, __m512i comp) {
-        if constexpr (sizeof(T) == 4) return _mm512_cmpneq_epi32_mask(vector, comp);
-        else if constexpr (sizeof(T) == 8) return _mm512_cmpneq_epi64_mask(vector, comp);
-        static_assert(sizeof(T) == 4 || sizeof(T) == 8, "NEQ is only implemented for 32 and 64 wide integers");
+        if constexpr (sizeof(base_t) == 4) return _mm512_cmplt_epi32_mask(vector, comp);
+        else if constexpr (sizeof(base_t) == 8) return _mm512_cmplt_epi64_mask(vector, comp);
+        static_assert(sizeof(base_t) == 4 || sizeof(base_t) == 8, "LT is only implemented for 32 and 64 wide integers");
     }
 
-    static inline bool scalar_filter(T scalar, T comp) { return scalar != comp; }
-};
+    static inline bool scalar_filter(base_t scalar, base_t comp) { return scalar < comp; }
 
-template
-class EQ : public FilterFunction {
-public:
-    static inline __mmask16 simd_filter(__m512i vector, __m512i comp) {
-        if constexpr (sizeof(T) == 4) return _mm512_cmpeq_epi32_mask(vector, comp);
-        else if constexpr (sizeof(T) == 8) return _mm512_cmpeq_epi64_mask(vector, comp);
-        static_assert(sizeof(T) == 4 || sizeof(T) == 8, "EQ is only implemented for 32 and 64 wide integers");
-    }
-
-    static inline bool scalar_filter(T scalar, T comp) { return scalar == comp; }
-};
-
-template
-class LT : public FilterFunction {
-public:
-    static inline __mmask16 simd_filter(__m512i vector, __m512i comp) {
-        if constexpr (sizeof(T) == 4) return _mm512_cmplt_epi32_mask(vector, comp);
-        else if constexpr (sizeof(T) == 8) return _mm512_cmplt_epi64_mask(vector, comp);
-        static_assert(sizeof(T) == 4 || sizeof(T) == 8, "LT is only implemented for 32 and 64 wide integers");
-    }
-
-    static inline bool scalar_filter(T scalar, T comp) { return scalar < comp; }
-};
-
-template
-class LEQ : public FilterFunction {
-public:
-    static inline __mmask16 simd_filter(__m512i vector, __m512i comp) {
-        if constexpr (sizeof(T) == 4) return _mm512_cmple_epi32_mask(vector, comp);
-        else if constexpr (sizeof(T) == 8) return _mm512_cmple_epi64_mask(vector, comp);
-        static_assert(sizeof(T) == 4 || sizeof(T) == 8, "LEQ is only implemented for 32 and 64 wide integers");
-    }
-
-    static inline bool scalar_filter(T scalar, T comp) { return scalar <= comp; }
-};
-
-template
-class GT : public FilterFunction {
-public:
-    static inline __mmask16 simd_filter(__m512i vector, __m512i comp) {
-        if constexpr (sizeof(T) == 4) return _mm512_cmpgt_epi32_mask(vector, comp);
-        else if constexpr (sizeof(T) == 8) return _mm512_cmpgt_epi64_mask(vector, comp);
-        static_assert(sizeof(T) == 4 || sizeof(T) == 8, "GT is only implemented for 32 and 64 wide integers");
-    }
-
-    static inline bool scalar_filter(T scalar, T comp) { return scalar > comp; }
-};
-
-template
-class GEQ : public FilterFunction {
-public:
-    static inline __mmask16 simd_filter(__m512i vector, __m512i comp) {
-        if constexpr (sizeof(T) == 4) return _mm512_cmpge_epi32_mask(vector, comp);
-        else if constexpr (sizeof(T) == 8) return _mm512_cmpge_epi64_mask(vector, comp);
-        static_assert(sizeof(T) == 4 || sizeof(T) == 8, "GEQ is only implemented for 32 and 64 wide integers");
-    }
-
-    static inline bool scalar_filter(T scalar, T comp) { return scalar >= comp; }
-};
-
-template class func, load_mode load_mode, bool copy>
-class Filter {
-public:
-
-    static_assert(std::is_same_v, "We enforce 64 bit integer");
-
-    /**
-     * @brief Calculates the memory maximal needed to store a chunk's processing result.
-     *
-     * @param chunk_size_b Size of the chunk in byte
-     * @return size_t Size of the chunk's processing result in byte
-     */
-    static size_t result_bytes_per_chunk(size_t chunk_size_b) {
-        // + 7 to enshure that we have enougth bytes -> / 8 -> rounds down
-        // if we had 17 / 8 = 2 but (17 + 7) / 8 = 3
-        // if we hat 16 / 8 = 2 is right, as well as, 16 + 7 / 8 = 2
-        return (chunk_size_b / sizeof(base_t) + 7) / 8;
-    }
-
-    /**
-     * @brief Applies the filter function on the chunk starting at *src* and spanning *chunk_size_b* bytes, while comparing with he same value every time.
-     * The resulting bit string is written to main memory.
-     *
-     * @param dest Pointer to the start of the result chunk
-     * @param src Pointer to the start of the source chunk
-     * @param cmp_value Comparision value to compare the values from source to
-     * @param chunk_size_b Size of the source chunk in bytes
-     * @return true When the filter operation is done
-     * @return false Never
-     */
-    // we only need this impl. yet, as all filter are at the end of a pipeline
-    static bool apply_same (uint16_t *dst, base_t *buffer, base_t *src, base_t cmp_value, size_t chunk_size_b) {
-        constexpr uint32_t lanes = VECTOR_SIZE();
-        uint8_t* dest = (uint8_t*) dst;
-        size_t value_count = chunk_size_b / sizeof(base_t);
-        __m512i cmp_vec = _mm512_set1_epi64(cmp_value);
-        size_t i = 0;
-        // this weird implementetion is neccessary, see analogous impl in aggregation for explaination
-        if(value_count > lanes) {
-            for(; (i < value_count - lanes); i += lanes) {
-                __m512i vec = Vector_Loader::load(src + i);
-                __mmask8 bitmask = func::simd_filter(vec, cmp_vec);
-
-                uint8_t int_mask = (uint8_t) _mm512_mask2int(bitmask);
-
-                dest[i / lanes] = int_mask;
-                if constexpr(copy){
-                    Vector_Loader::store(buffer + i, vec);
-                }
-            }
-        }
+    /*
+     * returns time in ns spent loading vector
+     */
+    template
+    static uint64_t apply_same(uint16_t *dst, base_t *src) {
+        constexpr uint32_t lanes = VECTOR_SIZE();
+        constexpr size_t value_count = CHUNK_SIZE_B / sizeof(base_t);
+        constexpr size_t iterations = value_count - lanes;
+
+        static_assert(value_count > lanes);
+
+        uint64_t load_time = 0;
+
+        uint8_t* dest = (uint8_t*) dst;
+
+        __m512i cmp_vec = _mm512_set1_epi64(CMP_VALUE);
+
+        size_t i = 0;
+
+        for(; i < iterations; i += lanes) {
+            auto ts_load = std::chrono::steady_clock::now();
+
+            __m512i vec = Vector_Loader::load(src + i);
+
+            auto te_load = std::chrono::steady_clock::now();
+            load_time += std::chrono::duration_cast(te_load - ts_load).count();
+
+            __mmask8 bitmask = simd_filter(vec, cmp_vec);
+
+            uint8_t int_mask = (uint8_t) _mm512_mask2int(bitmask);
+
+            dest[i / lanes] = int_mask;
+        }
 
         auto dest_pos = i / lanes;
         uint8_t int_mask = 0;
+
         for(; i < value_count; ++i) {
             base_t val = src[i];
-            uint8_t result = func::scalar_filter(val, cmp_value);
+            uint8_t result = scalar_filter(val, CMP_VALUE);
 
             int_mask |= (result << (i % lanes));
-
-            if constexpr(copy){
-                buffer[i] = val;
-            }
         }
+
         dest[dest_pos] = int_mask;
 
-        return true;
+        return load_time;
     }
 };
\ No newline at end of file
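Likewise, the core of the new FilterLT::apply_same in isolation: compare eight 64-bit values against a broadcast constant (CMP_A = 50 in Configuration.hpp) and pack the result into one mask byte, the format scan_a writes into MASK_A_. `lt_mask8` is an illustrative name, not project code; compile with `-mavx512f`:

```cpp
#include <cstdint>
#include <immintrin.h>

// Produce one bit per 64-bit lane: bit set where src[lane] < cmp.
uint8_t lt_mask8(const uint64_t* src, uint64_t cmp /* e.g. CMP_A = 50 */) {
    __m512i cmp_vec = _mm512_set1_epi64(static_cast<long long>(cmp));
    __m512i vec = _mm512_loadu_si512(src);                  // stand-in for Vector_Loader
    __mmask8 bits = _mm512_cmplt_epi64_mask(vec, cmp_vec);  // simd_filter(vec, cmp_vec)
    return static_cast<uint8_t>(_mm512_mask2int(bits));     // the stored mask byte
}
```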