Browse Source

fix chunk selection in scanb, use the dataptr in aggrj complex mode, export some functions to src/utils/BenchmarkHelpers.cpp

master
Constantin Fürst 11 months ago
parent
commit
c2b9e6656d
  1. 116
      qdp_project/src/Benchmark.cpp
  2. 14
      qdp_project/src/Configuration.hpp
  3. 94
      qdp_project/src/utils/BenchmarkHelpers.cpp

116
qdp_project/src/Benchmark.cpp

@ -18,40 +18,14 @@
#define MODE_COMPLEX_PREFETCH #define MODE_COMPLEX_PREFETCH
#endif #endif
#include "BenchmarkModes.hpp"
#include "Configuration.hpp"
#include "BenchmarkHelpers.cpp" #include "BenchmarkHelpers.cpp"
constexpr uint64_t CMP_A = 50;
constexpr uint64_t CMP_B = 42;
constexpr uint32_t TC_COMBINED = TC_SCANA + TC_SCANB + TC_AGGRJ;
constexpr size_t WL_SIZE_ELEMENTS = WL_SIZE_B / sizeof(uint64_t);
constexpr size_t CHUNK_COUNT = WL_SIZE_B / CHUNK_SIZE_B;
constexpr size_t CHUNK_SIZE_ELEMENTS = CHUNK_SIZE_B / sizeof(uint64_t);
constexpr size_t RUN_COUNT = CHUNK_COUNT / GROUP_COUNT;
static_assert(TC_AGGRJ % (TC_SCANB > 0 ? TC_SCANB : TC_AGGRJ) == 0);
static_assert(TC_AGGRJ >= TC_SCANB);
static_assert(RUN_COUNT > 0);
static_assert(WL_SIZE_B % 16 == 0);
static_assert(CHUNK_SIZE_B % 16 == 0);
using filter = Filter<uint64_t, LT, load_mode::Stream, false>; using filter = Filter<uint64_t, LT, load_mode::Stream, false>;
using aggregation = Aggregation<uint64_t, Sum, load_mode::Stream>; using aggregation = Aggregation<uint64_t, Sum, load_mode::Stream>;
dsacache::Cache CACHE_; dsacache::Cache CACHE_;
constexpr size_t SCANA_TIMING_INDEX = 0;
constexpr size_t SCANB_TIMING_INDEX = 1;
constexpr size_t AGGRJ_TIMING_INDEX = 2;
constexpr size_t TIME_STAMP_BEGIN = 0;
constexpr size_t TIME_STAMP_WAIT = 1;
constexpr size_t TIME_STAMP_END = 2;
// THREAD_TIMING_[TYPE][TID][ITERATION][STAMP] = TIMEPOINT
std::array<std::vector<std::vector<std::array<std::chrono::steady_clock::time_point, 3>>>, 3> THREAD_TIMING_;
std::array<uint32_t, GROUP_COUNT * TC_AGGRJ> CACHE_HITS_;
std::vector<std::barrier<NopStruct>*> BARRIERS_; std::vector<std::barrier<NopStruct>*> BARRIERS_;
std::shared_future<void> LAUNCH_; std::shared_future<void> LAUNCH_;
@ -61,92 +35,10 @@ uint16_t* MASK_A_;
uint16_t* MASK_B_; uint16_t* MASK_B_;
uint64_t* DATA_DST_; uint64_t* DATA_DST_;
inline uint64_t get_chunk_index(const size_t gid, const size_t rid) {
return gid + GROUP_COUNT * rid;
}
template<size_t TC>
inline uint64_t* get_chunk(uint64_t* base, const size_t chunk_index, const size_t tid) {
uint64_t* chunk_ptr = base + chunk_index * CHUNK_SIZE_ELEMENTS;
return chunk_ptr + tid * (CHUNK_SIZE_ELEMENTS / TC);
}
template<size_t TC>
inline uint16_t* get_mask(uint16_t* base, const size_t chunk_index, const size_t tid) {
size_t offset = chunk_index * CHUNK_SIZE_ELEMENTS + tid * (CHUNK_SIZE_ELEMENTS / TC);
return base + (offset / 16);
}
double process_cache_hitrate() {
double hr = 0.0;
for (const uint32_t& e : CACHE_HITS_) {
hr += e;
}
return hr / (double)(TC_AGGRJ * GROUP_COUNT * RUN_COUNT);
}
void process_timings(
uint64_t* scana_run, uint64_t* scana_wait,
uint64_t* scanb_run, uint64_t* scanb_wait,
uint64_t* aggrj_run, uint64_t* aggrj_wait
) {
{
uint64_t scana_rc = 0;
for (const auto& e : THREAD_TIMING_[SCANA_TIMING_INDEX]) {
for (const auto& m : e) {
*scana_run += std::chrono::duration_cast<std::chrono::nanoseconds>(m[TIME_STAMP_WAIT] - m[TIME_STAMP_BEGIN]).count();
*scana_wait += std::chrono::duration_cast<std::chrono::nanoseconds>(m[TIME_STAMP_END] - m[TIME_STAMP_WAIT]).count();
scana_rc++;
}
}
if (scana_rc != 0) {
*scana_run /= scana_rc;
*scana_wait /= scana_rc;
}
}
{
uint64_t scanb_rc = 0;
for (const auto& e : THREAD_TIMING_[SCANB_TIMING_INDEX]) {
for (const auto& m : e) {
*scanb_run += std::chrono::duration_cast<std::chrono::nanoseconds>(m[TIME_STAMP_WAIT] - m[TIME_STAMP_BEGIN]).count();
*scanb_wait += std::chrono::duration_cast<std::chrono::nanoseconds>(m[TIME_STAMP_END] - m[TIME_STAMP_WAIT]).count();
scanb_rc++;
}
}
if (scanb_rc != 0) {
*scana_run /= scanb_rc;
*scana_wait /= scanb_rc;
}
}
{
uint64_t aggrj_rc = 0;
for (const auto& e : THREAD_TIMING_[AGGRJ_TIMING_INDEX]) {
for (const auto& m : e) {
*aggrj_wait += std::chrono::duration_cast<std::chrono::nanoseconds>(m[TIME_STAMP_WAIT] - m[TIME_STAMP_BEGIN]).count();
*aggrj_run += std::chrono::duration_cast<std::chrono::nanoseconds>(m[TIME_STAMP_END] - m[TIME_STAMP_WAIT]).count();
aggrj_rc++;
}
}
if (aggrj_rc != 0) {
*aggrj_run /= aggrj_rc;
*aggrj_wait /= aggrj_rc;
}
}
}
// if more b than j -> perform b normal, subsplit j // if more b than j -> perform b normal, subsplit j
// if more j than b -> subsplit b like it is now // if more j than b -> subsplit b like it is now
void scan_b(size_t gid, size_t tid) { void scan_b(size_t gid, size_t tid) {
constexpr bool SUBSPLIT_SCANB = TC_AGGRJ > TC_SCANB;
constexpr size_t SUBCHUNK_COUNT = TC_AGGRJ / (TC_SCANB == 0 ? 1 : TC_SCANB); constexpr size_t SUBCHUNK_COUNT = TC_AGGRJ / (TC_SCANB == 0 ? 1 : TC_SCANB);
constexpr size_t SUBCHUNK_SIZE_B = CHUNK_SIZE_B / SUBCHUNK_COUNT; constexpr size_t SUBCHUNK_SIZE_B = CHUNK_SIZE_B / SUBCHUNK_COUNT;
@ -159,7 +51,7 @@ void scan_b(size_t gid, size_t tid) {
if constexpr (PERFORM_CACHING) { if constexpr (PERFORM_CACHING) {
for (size_t i = 0; i < RUN_COUNT; i++) { for (size_t i = 0; i < RUN_COUNT; i++) {
const size_t chunk_index = get_chunk_index(gid, 0);
const size_t chunk_index = get_chunk_index(gid, i);
uint64_t* chunk_ptr = get_chunk<TC_SCANB>(DATA_B_, chunk_index, i); uint64_t* chunk_ptr = get_chunk<TC_SCANB>(DATA_B_, chunk_index, i);
for (size_t j = 0; j < SUBCHUNK_COUNT; j++) { for (size_t j = 0; j < SUBCHUNK_COUNT; j++) {
@ -171,7 +63,7 @@ void scan_b(size_t gid, size_t tid) {
if constexpr (COMPLEX_QUERY) { if constexpr (COMPLEX_QUERY) {
for (size_t i = 0; i < RUN_COUNT; i++) { for (size_t i = 0; i < RUN_COUNT; i++) {
const size_t chunk_index = get_chunk_index(gid, 0);
const size_t chunk_index = get_chunk_index(gid, i);
uint64_t* chunk_ptr = get_chunk<TC_SCANB>(DATA_B_, chunk_index, i); uint64_t* chunk_ptr = get_chunk<TC_SCANB>(DATA_B_, chunk_index, i);
uint16_t* mask_ptr = get_mask<TC_SCANB>(MASK_B_, chunk_index, i); uint16_t* mask_ptr = get_mask<TC_SCANB>(MASK_B_, chunk_index, i);
@ -251,7 +143,7 @@ void aggr_j(size_t gid, size_t tid) {
uint64_t tmp = _mm512_reduce_add_epi64(aggregator); uint64_t tmp = _mm512_reduce_add_epi64(aggregator);
if constexpr (COMPLEX_QUERY) { if constexpr (COMPLEX_QUERY) {
aggregator = aggregation::apply_masked(aggregator, chunk_ptr, mask_ptr_a, mask_ptr_b, CHUNK_SIZE_B / TC_AGGRJ);
aggregator = aggregation::apply_masked(aggregator, data_ptr, mask_ptr_a, mask_ptr_b, CHUNK_SIZE_B / TC_AGGRJ);
} }
else { else {
aggregator = aggregation::apply_masked(aggregator, data_ptr, mask_ptr_a, CHUNK_SIZE_B / TC_AGGRJ); aggregator = aggregation::apply_masked(aggregator, data_ptr, mask_ptr_a, CHUNK_SIZE_B / TC_AGGRJ);

14
qdp_project/src/BenchmarkModes.hpp → qdp_project/src/Configuration.hpp

@ -70,3 +70,17 @@ constexpr bool STORE_B_IN_HBM = true;
constexpr char MODE_STRING[] = "complex-hbm"; constexpr char MODE_STRING[] = "complex-hbm";
constexpr bool COMPLEX_QUERY = true; constexpr bool COMPLEX_QUERY = true;
#endif #endif
constexpr uint64_t CMP_A = 50;
constexpr uint64_t CMP_B = 42;
constexpr uint32_t TC_COMBINED = TC_SCANA + TC_SCANB + TC_AGGRJ;
constexpr size_t WL_SIZE_ELEMENTS = WL_SIZE_B / sizeof(uint64_t);
constexpr size_t CHUNK_COUNT = WL_SIZE_B / CHUNK_SIZE_B;
constexpr size_t CHUNK_SIZE_ELEMENTS = CHUNK_SIZE_B / sizeof(uint64_t);
constexpr size_t RUN_COUNT = CHUNK_COUNT / GROUP_COUNT;
static_assert(TC_AGGRJ % (TC_SCANB > 0 ? TC_SCANB : TC_AGGRJ) == 0);
static_assert(TC_AGGRJ >= TC_SCANB);
static_assert(RUN_COUNT > 0);
static_assert(WL_SIZE_B % 16 == 0);
static_assert(CHUNK_SIZE_B % 16 == 0);

94
qdp_project/src/utils/BenchmarkHelpers.cpp

@ -1,5 +1,18 @@
#include <vector> #include <vector>
#include "../Configuration.hpp"
constexpr size_t SCANA_TIMING_INDEX = 0;
constexpr size_t SCANB_TIMING_INDEX = 1;
constexpr size_t AGGRJ_TIMING_INDEX = 2;
constexpr size_t TIME_STAMP_BEGIN = 0;
constexpr size_t TIME_STAMP_WAIT = 1;
constexpr size_t TIME_STAMP_END = 2;
std::array<std::vector<std::vector<std::array<std::chrono::steady_clock::time_point, 3>>>, 3> THREAD_TIMING_;
std::array<uint32_t, GROUP_COUNT * TC_AGGRJ> CACHE_HITS_;
uint64_t sum_check(uint64_t compare_value, uint64_t* row_A, uint64_t* row_B, size_t row_size) { uint64_t sum_check(uint64_t compare_value, uint64_t* row_A, uint64_t* row_B, size_t row_size) {
uint64_t sum = 0; uint64_t sum = 0;
for(int i = 0; i < row_size / sizeof(uint64_t); ++i) { for(int i = 0; i < row_size / sizeof(uint64_t); ++i) {
@ -45,3 +58,84 @@ struct NopStruct {
return; return;
} }
}; };
inline uint64_t get_chunk_index(const size_t gid, const size_t rid) {
return gid + GROUP_COUNT * rid;
}
template<size_t TC>
inline uint64_t* get_chunk(uint64_t* base, const size_t chunk_index, const size_t tid) {
uint64_t* chunk_ptr = base + chunk_index * CHUNK_SIZE_ELEMENTS;
return chunk_ptr + tid * (CHUNK_SIZE_ELEMENTS / TC);
}
template<size_t TC>
inline uint16_t* get_mask(uint16_t* base, const size_t chunk_index, const size_t tid) {
size_t offset = chunk_index * CHUNK_SIZE_ELEMENTS + tid * (CHUNK_SIZE_ELEMENTS / TC);
return base + (offset / 16);
}
double process_cache_hitrate() {
double hr = 0.0;
for (const uint32_t& e : CACHE_HITS_) {
hr += e;
}
return hr / (double)(TC_AGGRJ * GROUP_COUNT * RUN_COUNT);
}
void process_timings(
uint64_t* scana_run, uint64_t* scana_wait,
uint64_t* scanb_run, uint64_t* scanb_wait,
uint64_t* aggrj_run, uint64_t* aggrj_wait
) {
{
uint64_t scana_rc = 0;
for (const auto& e : THREAD_TIMING_[SCANA_TIMING_INDEX]) {
for (const auto& m : e) {
*scana_run += std::chrono::duration_cast<std::chrono::nanoseconds>(m[TIME_STAMP_WAIT] - m[TIME_STAMP_BEGIN]).count();
*scana_wait += std::chrono::duration_cast<std::chrono::nanoseconds>(m[TIME_STAMP_END] - m[TIME_STAMP_WAIT]).count();
scana_rc++;
}
}
if (scana_rc != 0) {
*scana_run /= scana_rc;
*scana_wait /= scana_rc;
}
}
{
uint64_t scanb_rc = 0;
for (const auto& e : THREAD_TIMING_[SCANB_TIMING_INDEX]) {
for (const auto& m : e) {
*scanb_run += std::chrono::duration_cast<std::chrono::nanoseconds>(m[TIME_STAMP_WAIT] - m[TIME_STAMP_BEGIN]).count();
*scanb_wait += std::chrono::duration_cast<std::chrono::nanoseconds>(m[TIME_STAMP_END] - m[TIME_STAMP_WAIT]).count();
scanb_rc++;
}
}
if (scanb_rc != 0) {
*scana_run /= scanb_rc;
*scana_wait /= scanb_rc;
}
}
{
uint64_t aggrj_rc = 0;
for (const auto& e : THREAD_TIMING_[AGGRJ_TIMING_INDEX]) {
for (const auto& m : e) {
*aggrj_wait += std::chrono::duration_cast<std::chrono::nanoseconds>(m[TIME_STAMP_WAIT] - m[TIME_STAMP_BEGIN]).count();
*aggrj_run += std::chrono::duration_cast<std::chrono::nanoseconds>(m[TIME_STAMP_END] - m[TIME_STAMP_WAIT]).count();
aggrj_rc++;
}
}
if (aggrj_rc != 0) {
*aggrj_run /= aggrj_rc;
*aggrj_wait /= aggrj_rc;
}
}
}
Loading…
Cancel
Save