#include #include #include #include #include #include #include #include #include "const.h" #include "filter.h" #include "aggregation.h" #include "array_utils.h" #include "../../offloading-cacher/cache.hpp" #include "Configuration.hpp" #include "BenchmarkHelpers.cpp" using filter = Filter; using aggregation = Aggregation; dsacache::Cache CACHE_; std::array, GROUP_COUNT> PREFETCHED_CHUNKS_; std::vector*> BARRIERS_; std::shared_future LAUNCH_; uint64_t* DATA_A_; uint64_t* DATA_B_; uint16_t* MASK_A_; uint64_t* DATA_DST_; // if more b than j -> perform b normal, subsplit j // if more j than b -> subsplit b like it is now template void caching(size_t gid, size_t tid) { constexpr size_t VIRT_TID_INCREMENT = TC_CACHING / TC_AGGRJ; constexpr size_t SUBCHUNK_THREAD_RATIO = TC_AGGRJ / (TC_CACHING == 0 ? 1 : TC_CACHING); constexpr bool CACHE_SUBCHUNKING = SUBCHUNK_THREAD_RATIO > 1; constexpr bool CACHE_OVERCHUNKING = VIRT_TID_INCREMENT > 1; if constexpr (PERFORM_CACHING) { if constexpr (CACHE_SUBCHUNKING) { constexpr size_t SUBCHUNK_COUNT = SUBCHUNK_THREAD_RATIO > 0 ? SUBCHUNK_THREAD_RATIO : 1; constexpr size_t SUBCHUNK_SIZE_B = CHUNK_SIZE_B / SUBCHUNK_COUNT; constexpr size_t SUBCHUNK_SIZE_ELEMENTS = CHUNK_SIZE_ELEMENTS / SUBCHUNK_COUNT; constexpr size_t LAST_SUBCHUNK_SIZE_B = SUBCHUNK_SIZE_B + (CHUNK_SIZE_B % SUBCHUNK_COUNT); // TODO: last thread (whether simulated or not) in last group must use last subchunk size for its last subchunk in the last run as size for (size_t i = 0; i < RUN_COUNT; i++) { const size_t chunk_index = get_chunk_index(gid, i); uint64_t* chunk_ptr = get_chunk(DATA_B_, chunk_index, tid); for (size_t j = 0; j < SUBCHUNK_COUNT; j++) { uint64_t* sub_chunk_ptr = &chunk_ptr[j * SUBCHUNK_SIZE_ELEMENTS]; CACHE_.Access(reinterpret_cast(sub_chunk_ptr), SUBCHUNK_SIZE_B); PREFETCHED_CHUNKS_[gid]++; PREFETCHED_CHUNKS_[gid].notify_one(); } } } else if constexpr (CACHE_OVERCHUNKING) { constexpr size_t LAST_CHUNK_SIZE_B = CHUNK_SIZE_B + (CHUNK_SIZE_B % (TC_AGGRJ * GROUP_COUNT)); // TODO: last thread (whether simulated or not) in last group must use last chunk size for its last run as size for (size_t tid_virt = tid; tid_virt < TC_AGGRJ; tid_virt += VIRT_TID_INCREMENT) { for (size_t i = 0; i < RUN_COUNT; i++) { const size_t chunk_index = get_chunk_index(gid, i); uint64_t* chunk_ptr = get_chunk(DATA_B_, chunk_index, tid_virt); CACHE_.Access(reinterpret_cast(chunk_ptr), CHUNK_SIZE_B); PREFETCHED_CHUNKS_[gid]++; PREFETCHED_CHUNKS_[gid].notify_one(); } } } else { constexpr size_t LAST_CHUNK_SIZE_B = CHUNK_SIZE_B + (CHUNK_SIZE_B % ((TC_SCANB > 0 ? TC_SCANB : 1) * GROUP_COUNT)); // TODO: last thread in last group must use last chunk size for its last run as size for (size_t i = 0; i < RUN_COUNT; i++) { const size_t chunk_index = get_chunk_index(gid, i); uint64_t* chunk_ptr = get_chunk(DATA_B_, chunk_index, tid); CACHE_.Access(reinterpret_cast(chunk_ptr), CHUNK_SIZE_B); PREFETCHED_CHUNKS_[gid]++; PREFETCHED_CHUNKS_[gid].notify_one(); } } } } void scan_b(size_t gid, size_t tid) { THREAD_TIMING_[SCANB_TIMING_INDEX][UniqueIndex(gid,tid)].clear(); THREAD_TIMING_[SCANB_TIMING_INDEX][UniqueIndex(gid,tid)].resize(1); LAUNCH_.wait(); THREAD_TIMING_[SCANB_TIMING_INDEX][UniqueIndex(gid,tid)][0][TIME_STAMP_BEGIN] = std::chrono::steady_clock::now(); if constexpr (!PERFORM_CACHING_IN_AGGREGATION) { caching(gid, tid); } THREAD_TIMING_[SCANB_TIMING_INDEX][UniqueIndex(gid,tid)][0][TIME_STAMP_WAIT] = std::chrono::steady_clock::now(); THREAD_TIMING_[SCANB_TIMING_INDEX][UniqueIndex(gid,tid)][0][TIME_STAMP_END] = std::chrono::steady_clock::now(); } void scan_a(size_t gid, size_t tid) { constexpr size_t LAST_CHUNK_SIZE_B = CHUNK_SIZE_B + (CHUNK_SIZE_B % (TC_SCANA * GROUP_COUNT)); // TODO: last thread in last group must use last chunk size for its last run as size THREAD_TIMING_[SCANA_TIMING_INDEX][UniqueIndex(gid,tid)].clear(); THREAD_TIMING_[SCANA_TIMING_INDEX][UniqueIndex(gid,tid)].resize(1); LAUNCH_.wait(); THREAD_TIMING_[SCANA_TIMING_INDEX][UniqueIndex(gid,tid)][0][TIME_STAMP_BEGIN] = std::chrono::steady_clock::now(); for (size_t i = 0; i < RUN_COUNT; i++) { const size_t chunk_index = get_chunk_index(gid, i); uint64_t* chunk_ptr = get_chunk(DATA_A_, chunk_index, tid); uint16_t* mask_ptr = get_mask(MASK_A_, chunk_index, tid); filter::apply_same(mask_ptr, nullptr, chunk_ptr, CMP_A, CHUNK_SIZE_B / TC_SCANA); BARRIERS_[gid]->arrive_and_wait(); } THREAD_TIMING_[SCANA_TIMING_INDEX][UniqueIndex(gid,tid)][0][TIME_STAMP_WAIT] = std::chrono::steady_clock::now(); THREAD_TIMING_[SCANA_TIMING_INDEX][UniqueIndex(gid,tid)][0][TIME_STAMP_END] = std::chrono::steady_clock::now(); BARRIERS_[gid]->arrive_and_drop(); } void aggr_j(size_t gid, size_t tid) { constexpr size_t SUBCHUNK_SIZE_B = CHUNK_SIZE_B / TC_AGGRJ; constexpr size_t LAST_CHUNK_SIZE_B = SUBCHUNK_SIZE_B + (CHUNK_SIZE_B % (TC_AGGRJ * GROUP_COUNT)); // TODO: last thread in last group must use last chunk size for its last run as size CACHE_HITS_[UniqueIndex(gid,tid)] = 0; THREAD_TIMING_[AGGRJ_TIMING_INDEX][UniqueIndex(gid,tid)].clear(); THREAD_TIMING_[AGGRJ_TIMING_INDEX][UniqueIndex(gid,tid)].resize(RUN_COUNT); __m512i aggregator = aggregation::OP::zero(); LAUNCH_.wait(); for (size_t i = 0; i < RUN_COUNT; i++) { THREAD_TIMING_[AGGRJ_TIMING_INDEX][UniqueIndex(gid,tid)][i][TIME_STAMP_BEGIN] = std::chrono::steady_clock::now(); BARRIERS_[gid]->arrive_and_wait(); while (true) { const int32_t old = PREFETCHED_CHUNKS_[gid].fetch_sub(1); if (old > 0) break; PREFETCHED_CHUNKS_[gid]++; PREFETCHED_CHUNKS_[gid].wait(0); } THREAD_TIMING_[AGGRJ_TIMING_INDEX][UniqueIndex(gid,tid)][i][TIME_STAMP_WAIT] = std::chrono::steady_clock::now(); const size_t chunk_index = get_chunk_index(gid, i); uint64_t* chunk_ptr = get_chunk(DATA_B_, chunk_index, tid); uint16_t* mask_ptr_a = get_mask(MASK_A_, chunk_index, tid); std::unique_ptr data; uint64_t* data_ptr; if constexpr (PERFORM_CACHING) { data = CACHE_.Access(reinterpret_cast(chunk_ptr), SUBCHUNK_SIZE_B, dsacache::FLAG_ACCESS_WEAK); data->WaitOnCompletion(); data_ptr = reinterpret_cast(data->GetDataLocation()); if (data_ptr == nullptr) { data_ptr = chunk_ptr; } else if (data_ptr == chunk_ptr) { // prevent counting weak-accesses } else { CACHE_HITS_[UniqueIndex(gid,tid)]++; } } else { data_ptr = chunk_ptr; } uint64_t tmp = _mm512_reduce_add_epi64(aggregator); aggregator = aggregation::apply_masked(aggregator, data_ptr, mask_ptr_a, SUBCHUNK_SIZE_B); THREAD_TIMING_[AGGRJ_TIMING_INDEX][UniqueIndex(gid,tid)][i][TIME_STAMP_END] = std::chrono::steady_clock::now(); } BARRIERS_[gid]->arrive_and_drop(); aggregation::happly(&DATA_DST_[UniqueIndex(gid,tid)], aggregator); } int main() { THREAD_TIMING_[AGGRJ_TIMING_INDEX].resize(TC_AGGRJ * GROUP_COUNT); THREAD_TIMING_[SCANA_TIMING_INDEX].resize(TC_SCANA * GROUP_COUNT); THREAD_TIMING_[SCANB_TIMING_INDEX].resize(TC_SCANB * GROUP_COUNT); const int current_cpu = sched_getcpu(); const int current_node = numa_node_of_cpu(current_cpu); if (current_node != MEM_NODE_DRAM) { std::cerr << "Application is not running on pre-programmed Node!" << std::endl; } const std::string ofname = "results/qdp-xeonmax-" + std::string(MODE_STRING) + "-tca" + std::to_string(TC_SCANA) + "-tcb" + std::to_string(TC_SCANB) + "-tcj" + std::to_string(TC_AGGRJ) + "-tmul" + std::to_string(GROUP_COUNT) + "-wl" + std::to_string(WL_SIZE_B) + "-cs" + std::to_string(CHUNK_SIZE_B) + ".csv"; std::ofstream fout(ofname); fout << "run;rt-ns;rt-s;result[0];scana-run;scana-wait;scanb-run;scanb-wait;aggrj-run;aggrj-wait;cache-hr;" << std::endl; DATA_A_ = (uint64_t*) numa_alloc_onnode(WL_SIZE_B, MEM_NODE_A); DATA_B_ = (uint64_t*) numa_alloc_onnode(WL_SIZE_B, MEM_NODE_B); MASK_A_ = (uint16_t*) numa_alloc_onnode(WL_SIZE_ELEMENTS, MEM_NODE_HBM); DATA_DST_ = (uint64_t*) numa_alloc_onnode(TC_AGGRJ * GROUP_COUNT * sizeof(uint64_t), MEM_NODE_HBM); if constexpr (PERFORM_CACHING) { // cache will be configured to wait weak by default // it will also not handle page faults which cause delay // it will use the copy and caching policy from BenchmarkHelpers.cpp // which is configured for xeonmax with smart assignment uint64_t cache_flags = 0; cache_flags |= dsacache::FLAG_WAIT_WEAK; CACHE_.SetFlags(cache_flags); CACHE_.Init(CachePlacementPolicy, CopyMethodPolicy); } fill_mt(DATA_A_, WL_SIZE_B, 0, 100, 42); fill_mt(DATA_B_, WL_SIZE_B, 0, 100, 420); for (uint32_t i = 0; i < ITERATION_COUNT + WARMUP_ITERATION_COUNT; i++) { std::promise launch_promise; LAUNCH_ = launch_promise.get_future(); std::vector filter_pool; std::vector copy_pool; std::vector agg_pool; for(uint32_t gid = 0; gid < GROUP_COUNT; ++gid) { BARRIERS_.emplace_back(new std::barrier(TC_SCANA + TC_AGGRJ)); for(uint32_t tid = 0; tid < TC_SCANA; ++tid) { filter_pool.emplace_back(scan_a, gid, tid); } for(uint32_t tid = 0; tid < TC_SCANB; ++tid) { copy_pool.emplace_back(scan_b, gid, tid); } for(uint32_t tid = 0; tid < TC_AGGRJ; ++tid) { agg_pool.emplace_back(aggr_j, gid, tid); } } const auto time_start = std::chrono::steady_clock::now(); launch_promise.set_value(); for(std::thread& t : filter_pool) { t.join(); } for(std::thread& t : copy_pool) { t.join(); } for(std::thread& t : agg_pool) { t.join(); } uint64_t result_actual = 0; Aggregation::apply(&result_actual, DATA_DST_, sizeof(uint64_t) * TC_AGGRJ * GROUP_COUNT); const auto time_end = std::chrono::steady_clock::now(); const uint64_t result_expected = sum_check(CMP_A, DATA_A_, DATA_B_, WL_SIZE_B); std::cout << "Result Expected: " << result_expected << ", Result Actual: " << result_actual << std::endl; if (i >= WARMUP_ITERATION_COUNT) { uint64_t scana_run = 0, scana_wait = 0, scanb_run = 0, scanb_wait = 0, aggrj_run = 0, aggrj_wait = 0; process_timings(&scana_run, &scana_wait, &scanb_run, &scanb_wait, &aggrj_run, &aggrj_wait); constexpr double nanos_per_second = ((double)1000) * 1000 * 1000; const uint64_t nanos = std::chrono::duration_cast(time_end - time_start).count(); const double seconds = (double)(nanos) / nanos_per_second; fout << i - WARMUP_ITERATION_COUNT << ";" << nanos << ";" << seconds << ";" << result_actual << ";" << scana_run << ";" << scana_wait << ";" << scanb_run << ";" << scanb_wait << ";" << aggrj_run << ";" << aggrj_wait << ";" << process_cache_hitrate() << ";" << std::endl; } for (std::barrier* b : BARRIERS_) { delete b; } BARRIERS_.clear(); CACHE_.Clear(); } numa_free(DATA_A_, WL_SIZE_B); numa_free(DATA_B_, WL_SIZE_B); numa_free(MASK_A_, WL_SIZE_ELEMENTS); numa_free(DATA_DST_, TC_AGGRJ * GROUP_COUNT * sizeof(uint64_t)); return 0; }