Browse Source

adapt barrier waiting points, add timings to thread execution

master
Constantin Fürst 11 months ago
parent
commit
e429e8fd40
  1. 132
      qdp_project/src/Benchmark.cpp

132
qdp_project/src/Benchmark.cpp

@ -8,6 +8,7 @@
#include <vector> #include <vector>
#include <fstream> #include <fstream>
#include <future> #include <future>
#include <array>
#include "const.h" #include "const.h"
#include "filter.h" #include "filter.h"
@ -29,11 +30,11 @@ constexpr uint32_t WARMUP_ITERATION_COUNT = 5;
constexpr uint32_t ITERATION_COUNT = 5; constexpr uint32_t ITERATION_COUNT = 5;
#ifdef MODE_PREFETCH #ifdef MODE_PREFETCH
constexpr size_t CHUNK_SIZE_B = 64_MiB;
constexpr uint32_t GROUP_COUNT = 32;
constexpr size_t CHUNK_SIZE_B = 256_MiB;
constexpr uint32_t GROUP_COUNT = 8;
constexpr uint32_t TC_SCANA = 1; constexpr uint32_t TC_SCANA = 1;
constexpr uint32_t TC_SCANB = 1;
constexpr uint32_t TC_AGGRJ = 1;
constexpr uint32_t TC_SCANB = 2;
constexpr uint32_t TC_AGGRJ = 2;
constexpr bool PERFORM_CACHING = true; constexpr bool PERFORM_CACHING = true;
constexpr bool DATA_IN_HBM = false; constexpr bool DATA_IN_HBM = false;
constexpr char MODE_STRING[] = "prefetch"; constexpr char MODE_STRING[] = "prefetch";
@ -82,6 +83,16 @@ using aggregation = Aggregation<uint64_t, Sum, load_mode::Stream>;
dsacache::Cache CACHE_; dsacache::Cache CACHE_;
constexpr size_t SCANA_TIMING_INDEX = 0;
constexpr size_t SCANB_TIMING_INDEX = 1;
constexpr size_t AGGRJ_TIMING_INDEX = 2;
constexpr size_t TIME_STAMP_BEGIN = 0;
constexpr size_t TIME_STAMP_WAIT = 1;
constexpr size_t TIME_STAMP_END = 2;
// THREAD_TIMING_[TYPE][TID][ITERATION][STAMP] = TIMEPOINT
std::array<std::vector<std::vector<std::array<std::chrono::steady_clock::time_point, 3>>>, 3> THREAD_TIMING_;
std::vector<std::barrier<NopStruct>*> BARRIERS_; std::vector<std::barrier<NopStruct>*> BARRIERS_;
std::shared_future<void> LAUNCH_; std::shared_future<void> LAUNCH_;
@ -107,25 +118,81 @@ inline uint16_t* get_mask(uint16_t* base, const size_t chunk_index, const size_t
} }
void process_timings(
uint64_t* scana_run, uint64_t* scana_wait,
uint64_t* scanb_run, uint64_t* scanb_wait,
uint64_t* aggrj_run, uint64_t* aggrj_wait
) {
{
uint64_t scana_rc = 0;
for (const auto& e : THREAD_TIMING_[SCANA_TIMING_INDEX]) {
for (const auto& m : e) {
*scana_run += std::chrono::duration_cast<std::chrono::nanoseconds>(m[TIME_STAMP_WAIT] - m[TIME_STAMP_BEGIN]).count();
*scana_wait += std::chrono::duration_cast<std::chrono::nanoseconds>(m[TIME_STAMP_END] - m[TIME_STAMP_WAIT]).count();
scana_rc++;
}
}
*scana_run /= scana_rc;
*scana_wait /= scana_rc;
}
{
uint64_t scanb_rc = 0;
for (const auto& e : THREAD_TIMING_[SCANB_TIMING_INDEX]) {
for (const auto& m : e) {
*scanb_run += std::chrono::duration_cast<std::chrono::nanoseconds>(m[TIME_STAMP_WAIT] - m[TIME_STAMP_BEGIN]).count();
*scanb_wait += std::chrono::duration_cast<std::chrono::nanoseconds>(m[TIME_STAMP_END] - m[TIME_STAMP_WAIT]).count();
scanb_rc++;
}
}
*scana_run /= scanb_rc;
*scana_wait /= scanb_rc;
}
{
uint64_t aggrj_rc = 0;
for (const auto& e : THREAD_TIMING_[SCANB_TIMING_INDEX]) {
for (const auto& m : e) {
*aggrj_wait += std::chrono::duration_cast<std::chrono::nanoseconds>(m[TIME_STAMP_WAIT] - m[TIME_STAMP_BEGIN]).count();
*aggrj_run += std::chrono::duration_cast<std::chrono::nanoseconds>(m[TIME_STAMP_END] - m[TIME_STAMP_WAIT]).count();
aggrj_rc++;
}
}
*aggrj_run /= aggrj_rc;
*aggrj_wait /= aggrj_rc;
}
}
void scan_b(size_t gid, size_t tid) { void scan_b(size_t gid, size_t tid) {
constexpr size_t split = TC_AGGRJ / TC_SCANB; constexpr size_t split = TC_AGGRJ / TC_SCANB;
const size_t start = tid * split; const size_t start = tid * split;
const size_t end = start + split; const size_t end = start + split;
THREAD_TIMING_[SCANB_TIMING_INDEX][tid * gid].clear();
THREAD_TIMING_[SCANB_TIMING_INDEX][tid * gid].resize(split);
LAUNCH_.wait(); LAUNCH_.wait();
if constexpr (PERFORM_CACHING) { if constexpr (PERFORM_CACHING) {
std::vector<std::unique_ptr<dsacache::CacheData>> data;
for (size_t i = start; i < end; i++) { for (size_t i = start; i < end; i++) {
THREAD_TIMING_[AGGRJ_TIMING_INDEX][tid * gid][i][TIME_STAMP_BEGIN] = std::chrono::steady_clock::now();
const size_t chunk_index = get_chunk_index(gid, 0); const size_t chunk_index = get_chunk_index(gid, 0);
uint64_t* chunk_ptr = get_chunk<TC_AGGRJ>(DATA_B_, chunk_index, i); uint64_t* chunk_ptr = get_chunk<TC_AGGRJ>(DATA_B_, chunk_index, i);
data.emplace_back(std::move(CACHE_.Access(reinterpret_cast<uint8_t *>(chunk_ptr), CHUNK_SIZE_B / TC_AGGRJ)));
}
const auto data = CACHE_.Access(reinterpret_cast<uint8_t*>(chunk_ptr), CHUNK_SIZE_B / TC_AGGRJ);
for (auto& e : data) {
e->WaitOnCompletion();
THREAD_TIMING_[AGGRJ_TIMING_INDEX][tid * gid][i][TIME_STAMP_WAIT] = std::chrono::steady_clock::now();
BARRIERS_[gid]->arrive_and_wait();
data->WaitOnCompletion();
THREAD_TIMING_[AGGRJ_TIMING_INDEX][tid * gid][i][TIME_STAMP_END] = std::chrono::steady_clock::now();
} }
} }
@ -133,27 +200,45 @@ void scan_b(size_t gid, size_t tid) {
} }
void scan_a(size_t gid, size_t tid) { void scan_a(size_t gid, size_t tid) {
THREAD_TIMING_[SCANA_TIMING_INDEX][tid * gid].clear();
THREAD_TIMING_[SCANA_TIMING_INDEX][tid * gid].resize(RUN_COUNT);
LAUNCH_.wait(); LAUNCH_.wait();
for (size_t i = 0; i < RUN_COUNT; i++) { for (size_t i = 0; i < RUN_COUNT; i++) {
THREAD_TIMING_[AGGRJ_TIMING_INDEX][tid * gid][i][TIME_STAMP_BEGIN] = std::chrono::steady_clock::now();
const size_t chunk_index = get_chunk_index(gid, i); const size_t chunk_index = get_chunk_index(gid, i);
uint64_t* chunk_ptr = get_chunk<TC_SCANA>(DATA_A_, chunk_index, tid); uint64_t* chunk_ptr = get_chunk<TC_SCANA>(DATA_A_, chunk_index, tid);
uint16_t* mask_ptr = get_mask<TC_SCANA>(MASK_A_, chunk_index, tid); uint16_t* mask_ptr = get_mask<TC_SCANA>(MASK_A_, chunk_index, tid);
filter::apply_same(mask_ptr, nullptr, chunk_ptr, CMP_A, CHUNK_SIZE_B / TC_SCANA); filter::apply_same(mask_ptr, nullptr, chunk_ptr, CMP_A, CHUNK_SIZE_B / TC_SCANA);
THREAD_TIMING_[AGGRJ_TIMING_INDEX][tid * gid][i][TIME_STAMP_WAIT] = std::chrono::steady_clock::now();
BARRIERS_[gid]->arrive_and_wait();
THREAD_TIMING_[AGGRJ_TIMING_INDEX][tid * gid][i][TIME_STAMP_END] = std::chrono::steady_clock::now();
} }
BARRIERS_[gid]->arrive_and_drop(); BARRIERS_[gid]->arrive_and_drop();
} }
void aggr_j(size_t gid, size_t tid) { void aggr_j(size_t gid, size_t tid) {
THREAD_TIMING_[AGGRJ_TIMING_INDEX][tid * gid].clear();
THREAD_TIMING_[AGGRJ_TIMING_INDEX][tid * gid].resize(RUN_COUNT);
LAUNCH_.wait(); LAUNCH_.wait();
__m512i aggregator = aggregation::OP::zero(); __m512i aggregator = aggregation::OP::zero();
for (size_t i = 0; i < RUN_COUNT; i++) {
THREAD_TIMING_[AGGRJ_TIMING_INDEX][tid * gid][i][TIME_STAMP_BEGIN] = std::chrono::steady_clock::now();
BARRIERS_[gid]->arrive_and_wait(); BARRIERS_[gid]->arrive_and_wait();
for (size_t i = 0; i < RUN_COUNT; i++) {
THREAD_TIMING_[AGGRJ_TIMING_INDEX][tid * gid][i][TIME_STAMP_WAIT] = std::chrono::steady_clock::now();
const size_t chunk_index = get_chunk_index(gid, i); const size_t chunk_index = get_chunk_index(gid, i);
uint64_t* chunk_ptr = get_chunk<TC_AGGRJ>(DATA_B_, chunk_index, tid); uint64_t* chunk_ptr = get_chunk<TC_AGGRJ>(DATA_B_, chunk_index, tid);
uint16_t* mask_ptr = get_mask<TC_AGGRJ>(MASK_A_, chunk_index, tid); uint16_t* mask_ptr = get_mask<TC_AGGRJ>(MASK_A_, chunk_index, tid);
@ -174,13 +259,22 @@ void aggr_j(size_t gid, size_t tid) {
data_ptr = chunk_ptr; data_ptr = chunk_ptr;
} }
uint64_t tmp = _mm512_reduce_add_epi64(aggregator);
aggregator = aggregation::apply_masked(aggregator, data_ptr, mask_ptr, CHUNK_SIZE_B / TC_AGGRJ); aggregator = aggregation::apply_masked(aggregator, data_ptr, mask_ptr, CHUNK_SIZE_B / TC_AGGRJ);
THREAD_TIMING_[AGGRJ_TIMING_INDEX][tid * gid][i][TIME_STAMP_END] = std::chrono::steady_clock::now();
} }
BARRIERS_[gid]->arrive_and_drop();
aggregation::happly(DATA_DST_ + (tid * GROUP_COUNT + gid), aggregator); aggregation::happly(DATA_DST_ + (tid * GROUP_COUNT + gid), aggregator);
} }
int main() { int main() {
THREAD_TIMING_[AGGRJ_TIMING_INDEX].resize(TC_AGGRJ * GROUP_COUNT);
THREAD_TIMING_[SCANA_TIMING_INDEX].resize(TC_SCANA * GROUP_COUNT);
THREAD_TIMING_[SCANB_TIMING_INDEX].resize(TC_SCANB * GROUP_COUNT);
const int current_cpu = sched_getcpu(); const int current_cpu = sched_getcpu();
const int current_node = numa_node_of_cpu(current_cpu); const int current_node = numa_node_of_cpu(current_cpu);
const int cache_node = CachePlacementPolicy(current_node, current_node, 0); const int cache_node = CachePlacementPolicy(current_node, current_node, 0);
@ -188,7 +282,7 @@ int main() {
const std::string ofname = "results/qdp-xeonmax-simpleq-" + std::string(MODE_STRING) + "-tca" + std::to_string(TC_SCANA) + "-tcb" + std::to_string(TC_SCANB) + "-tcj" + std::to_string(TC_AGGRJ) + "-tmul" + std::to_string(GROUP_COUNT) + "-wl" + std::to_string(WL_SIZE_B) + "-cs" + std::to_string(CHUNK_SIZE_B) + ".csv"; const std::string ofname = "results/qdp-xeonmax-simpleq-" + std::string(MODE_STRING) + "-tca" + std::to_string(TC_SCANA) + "-tcb" + std::to_string(TC_SCANB) + "-tcj" + std::to_string(TC_AGGRJ) + "-tmul" + std::to_string(GROUP_COUNT) + "-wl" + std::to_string(WL_SIZE_B) + "-cs" + std::to_string(CHUNK_SIZE_B) + ".csv";
std::ofstream fout(ofname); std::ofstream fout(ofname);
fout << "run;time;result[0];" << std::endl;
fout << "run;rt-ns;rt-s;result[0];scana-run;scana-wait;scanb-run;scanb-wait;aggrj-run;aggrj-wait;" << std::endl;
if constexpr (DATA_IN_HBM) { if constexpr (DATA_IN_HBM) {
DATA_A_ = (uint64_t*) numa_alloc_onnode(WL_SIZE_B, cache_node); DATA_A_ = (uint64_t*) numa_alloc_onnode(WL_SIZE_B, cache_node);
@ -247,7 +341,19 @@ int main() {
const auto time_end = std::chrono::steady_clock::now(); const auto time_end = std::chrono::steady_clock::now();
if (i >= WARMUP_ITERATION_COUNT) { if (i >= WARMUP_ITERATION_COUNT) {
fout << i - WARMUP_ITERATION_COUNT << ";" << std::chrono::duration_cast<std::chrono::nanoseconds>(time_end - time_start).count() << ";" << std::hex << DATA_DST_[0] << std::dec << ";" << std::endl;
uint64_t scana_run = 0, scana_wait = 0, scanb_run = 0, scanb_wait = 0, aggrj_run = 0, aggrj_wait = 0;
process_timings(&scana_run, &scana_wait, &scanb_run, &scanb_wait, &aggrj_run, &aggrj_wait);
constexpr double nanos_per_second = ((double)1000) * 1000 * 1000;
const uint64_t nanos = std::chrono::duration_cast<std::chrono::nanoseconds>(time_end - time_end).count();
const double seconds = (double)(nanos) / nanos_per_second;
fout
<< i - WARMUP_ITERATION_COUNT << ";"
<< nanos << ";" << seconds << ";"
<< std::hex << DATA_DST_[0] << std::dec << ";"
<< scana_run << ";" << scana_wait << ";" << scanb_run << ";" << scanb_wait << ";" << aggrj_run << ";" << aggrj_wait << ";"
<< std::endl;
} }
for (std::barrier<NopStruct>* b : BARRIERS_) { for (std::barrier<NopStruct>* b : BARRIERS_) {

Loading…
Cancel
Save