diff --git a/qdp_project/src/Benchmark.cpp b/qdp_project/src/Benchmark.cpp index 7c2f76b..586eb63 100644 --- a/qdp_project/src/Benchmark.cpp +++ b/qdp_project/src/Benchmark.cpp @@ -8,6 +8,7 @@ #include #include #include +#include #include "const.h" #include "filter.h" @@ -29,11 +30,11 @@ constexpr uint32_t WARMUP_ITERATION_COUNT = 5; constexpr uint32_t ITERATION_COUNT = 5; #ifdef MODE_PREFETCH -constexpr size_t CHUNK_SIZE_B = 64_MiB; -constexpr uint32_t GROUP_COUNT = 32; +constexpr size_t CHUNK_SIZE_B = 256_MiB; +constexpr uint32_t GROUP_COUNT = 8; constexpr uint32_t TC_SCANA = 1; -constexpr uint32_t TC_SCANB = 1; -constexpr uint32_t TC_AGGRJ = 1; +constexpr uint32_t TC_SCANB = 2; +constexpr uint32_t TC_AGGRJ = 2; constexpr bool PERFORM_CACHING = true; constexpr bool DATA_IN_HBM = false; constexpr char MODE_STRING[] = "prefetch"; @@ -82,6 +83,16 @@ using aggregation = Aggregation; dsacache::Cache CACHE_; +constexpr size_t SCANA_TIMING_INDEX = 0; +constexpr size_t SCANB_TIMING_INDEX = 1; +constexpr size_t AGGRJ_TIMING_INDEX = 2; +constexpr size_t TIME_STAMP_BEGIN = 0; +constexpr size_t TIME_STAMP_WAIT = 1; +constexpr size_t TIME_STAMP_END = 2; + +// THREAD_TIMING_[TYPE][TID][ITERATION][STAMP] = TIMEPOINT +std::array>>, 3> THREAD_TIMING_; + std::vector*> BARRIERS_; std::shared_future LAUNCH_; @@ -107,25 +118,81 @@ inline uint16_t* get_mask(uint16_t* base, const size_t chunk_index, const size_t } +void process_timings( + uint64_t* scana_run, uint64_t* scana_wait, + uint64_t* scanb_run, uint64_t* scanb_wait, + uint64_t* aggrj_run, uint64_t* aggrj_wait +) { + { + uint64_t scana_rc = 0; + + for (const auto& e : THREAD_TIMING_[SCANA_TIMING_INDEX]) { + for (const auto& m : e) { + *scana_run += std::chrono::duration_cast(m[TIME_STAMP_WAIT] - m[TIME_STAMP_BEGIN]).count(); + *scana_wait += std::chrono::duration_cast(m[TIME_STAMP_END] - m[TIME_STAMP_WAIT]).count(); + scana_rc++; + } + } + + *scana_run /= scana_rc; + *scana_wait /= scana_rc; + } + { + uint64_t scanb_rc = 0; + + for (const auto& e : THREAD_TIMING_[SCANB_TIMING_INDEX]) { + for (const auto& m : e) { + *scanb_run += std::chrono::duration_cast(m[TIME_STAMP_WAIT] - m[TIME_STAMP_BEGIN]).count(); + *scanb_wait += std::chrono::duration_cast(m[TIME_STAMP_END] - m[TIME_STAMP_WAIT]).count(); + scanb_rc++; + } + } + + *scana_run /= scanb_rc; + *scana_wait /= scanb_rc; + } + { + uint64_t aggrj_rc = 0; + + for (const auto& e : THREAD_TIMING_[SCANB_TIMING_INDEX]) { + for (const auto& m : e) { + *aggrj_wait += std::chrono::duration_cast(m[TIME_STAMP_WAIT] - m[TIME_STAMP_BEGIN]).count(); + *aggrj_run += std::chrono::duration_cast(m[TIME_STAMP_END] - m[TIME_STAMP_WAIT]).count(); + aggrj_rc++; + } + } + + *aggrj_run /= aggrj_rc; + *aggrj_wait /= aggrj_rc; + } +} + void scan_b(size_t gid, size_t tid) { constexpr size_t split = TC_AGGRJ / TC_SCANB; const size_t start = tid * split; const size_t end = start + split; + THREAD_TIMING_[SCANB_TIMING_INDEX][tid * gid].clear(); + THREAD_TIMING_[SCANB_TIMING_INDEX][tid * gid].resize(split); + LAUNCH_.wait(); if constexpr (PERFORM_CACHING) { - std::vector> data; - for (size_t i = start; i < end; i++) { + THREAD_TIMING_[AGGRJ_TIMING_INDEX][tid * gid][i][TIME_STAMP_BEGIN] = std::chrono::steady_clock::now(); + const size_t chunk_index = get_chunk_index(gid, 0); uint64_t* chunk_ptr = get_chunk(DATA_B_, chunk_index, i); - data.emplace_back(std::move(CACHE_.Access(reinterpret_cast(chunk_ptr), CHUNK_SIZE_B / TC_AGGRJ))); - } + const auto data = CACHE_.Access(reinterpret_cast(chunk_ptr), CHUNK_SIZE_B / TC_AGGRJ); + + THREAD_TIMING_[AGGRJ_TIMING_INDEX][tid * gid][i][TIME_STAMP_WAIT] = std::chrono::steady_clock::now(); + + BARRIERS_[gid]->arrive_and_wait(); + + data->WaitOnCompletion(); - for (auto& e : data) { - e->WaitOnCompletion(); + THREAD_TIMING_[AGGRJ_TIMING_INDEX][tid * gid][i][TIME_STAMP_END] = std::chrono::steady_clock::now(); } } @@ -133,27 +200,45 @@ void scan_b(size_t gid, size_t tid) { } void scan_a(size_t gid, size_t tid) { + THREAD_TIMING_[SCANA_TIMING_INDEX][tid * gid].clear(); + THREAD_TIMING_[SCANA_TIMING_INDEX][tid * gid].resize(RUN_COUNT); + LAUNCH_.wait(); for (size_t i = 0; i < RUN_COUNT; i++) { + THREAD_TIMING_[AGGRJ_TIMING_INDEX][tid * gid][i][TIME_STAMP_BEGIN] = std::chrono::steady_clock::now(); + const size_t chunk_index = get_chunk_index(gid, i); uint64_t* chunk_ptr = get_chunk(DATA_A_, chunk_index, tid); uint16_t* mask_ptr = get_mask(MASK_A_, chunk_index, tid); filter::apply_same(mask_ptr, nullptr, chunk_ptr, CMP_A, CHUNK_SIZE_B / TC_SCANA); + + THREAD_TIMING_[AGGRJ_TIMING_INDEX][tid * gid][i][TIME_STAMP_WAIT] = std::chrono::steady_clock::now(); + + BARRIERS_[gid]->arrive_and_wait(); + + THREAD_TIMING_[AGGRJ_TIMING_INDEX][tid * gid][i][TIME_STAMP_END] = std::chrono::steady_clock::now(); } BARRIERS_[gid]->arrive_and_drop(); } void aggr_j(size_t gid, size_t tid) { + THREAD_TIMING_[AGGRJ_TIMING_INDEX][tid * gid].clear(); + THREAD_TIMING_[AGGRJ_TIMING_INDEX][tid * gid].resize(RUN_COUNT); + LAUNCH_.wait(); __m512i aggregator = aggregation::OP::zero(); - BARRIERS_[gid]->arrive_and_wait(); - for (size_t i = 0; i < RUN_COUNT; i++) { + THREAD_TIMING_[AGGRJ_TIMING_INDEX][tid * gid][i][TIME_STAMP_BEGIN] = std::chrono::steady_clock::now(); + + BARRIERS_[gid]->arrive_and_wait(); + + THREAD_TIMING_[AGGRJ_TIMING_INDEX][tid * gid][i][TIME_STAMP_WAIT] = std::chrono::steady_clock::now(); + const size_t chunk_index = get_chunk_index(gid, i); uint64_t* chunk_ptr = get_chunk(DATA_B_, chunk_index, tid); uint16_t* mask_ptr = get_mask(MASK_A_, chunk_index, tid); @@ -174,13 +259,22 @@ void aggr_j(size_t gid, size_t tid) { data_ptr = chunk_ptr; } + uint64_t tmp = _mm512_reduce_add_epi64(aggregator); aggregator = aggregation::apply_masked(aggregator, data_ptr, mask_ptr, CHUNK_SIZE_B / TC_AGGRJ); + + THREAD_TIMING_[AGGRJ_TIMING_INDEX][tid * gid][i][TIME_STAMP_END] = std::chrono::steady_clock::now(); } + BARRIERS_[gid]->arrive_and_drop(); + aggregation::happly(DATA_DST_ + (tid * GROUP_COUNT + gid), aggregator); } int main() { + THREAD_TIMING_[AGGRJ_TIMING_INDEX].resize(TC_AGGRJ * GROUP_COUNT); + THREAD_TIMING_[SCANA_TIMING_INDEX].resize(TC_SCANA * GROUP_COUNT); + THREAD_TIMING_[SCANB_TIMING_INDEX].resize(TC_SCANB * GROUP_COUNT); + const int current_cpu = sched_getcpu(); const int current_node = numa_node_of_cpu(current_cpu); const int cache_node = CachePlacementPolicy(current_node, current_node, 0); @@ -188,7 +282,7 @@ int main() { const std::string ofname = "results/qdp-xeonmax-simpleq-" + std::string(MODE_STRING) + "-tca" + std::to_string(TC_SCANA) + "-tcb" + std::to_string(TC_SCANB) + "-tcj" + std::to_string(TC_AGGRJ) + "-tmul" + std::to_string(GROUP_COUNT) + "-wl" + std::to_string(WL_SIZE_B) + "-cs" + std::to_string(CHUNK_SIZE_B) + ".csv"; std::ofstream fout(ofname); - fout << "run;time;result[0];" << std::endl; + fout << "run;rt-ns;rt-s;result[0];scana-run;scana-wait;scanb-run;scanb-wait;aggrj-run;aggrj-wait;" << std::endl; if constexpr (DATA_IN_HBM) { DATA_A_ = (uint64_t*) numa_alloc_onnode(WL_SIZE_B, cache_node); @@ -247,7 +341,19 @@ int main() { const auto time_end = std::chrono::steady_clock::now(); if (i >= WARMUP_ITERATION_COUNT) { - fout << i - WARMUP_ITERATION_COUNT << ";" << std::chrono::duration_cast(time_end - time_start).count() << ";" << std::hex << DATA_DST_[0] << std::dec << ";" << std::endl; + uint64_t scana_run = 0, scana_wait = 0, scanb_run = 0, scanb_wait = 0, aggrj_run = 0, aggrj_wait = 0; + process_timings(&scana_run, &scana_wait, &scanb_run, &scanb_wait, &aggrj_run, &aggrj_wait); + + constexpr double nanos_per_second = ((double)1000) * 1000 * 1000; + const uint64_t nanos = std::chrono::duration_cast(time_end - time_end).count(); + const double seconds = (double)(nanos) / nanos_per_second; + + fout + << i - WARMUP_ITERATION_COUNT << ";" + << nanos << ";" << seconds << ";" + << std::hex << DATA_DST_[0] << std::dec << ";" + << scana_run << ";" << scana_wait << ";" << scanb_run << ";" << scanb_wait << ";" << aggrj_run << ";" << aggrj_wait << ";" + << std::endl; } for (std::barrier* b : BARRIERS_) {