Browse Source

reset benchmark

master
Constantin Fürst 11 months ago
parent
commit
de1de9134b
  1. 106
      qdp_project/src/Benchmark.cpp
  2. 29
      qdp_project/src/Configuration.hpp
  3. 4
      qdp_project/src/utils/aggregation.h
  4. 4
      qdp_project/src/utils/filter.h

106
qdp_project/src/Benchmark.cpp

@ -5,6 +5,7 @@
#include <fstream>
#include <future>
#include <array>
#include <atomic>
#include "const.h"
#include "filter.h"
@ -21,6 +22,7 @@ using aggregation = Aggregation<uint64_t, Sum, load_mode::Stream>;
dsacache::Cache CACHE_;
std::array<std::atomic<int32_t>, GROUP_COUNT> PREFETCHED_CHUNKS_;
std::vector<std::barrier<NopStruct>*> BARRIERS_;
std::shared_future<void> LAUNCH_;
@ -29,6 +31,63 @@ uint64_t* DATA_B_;
uint16_t* MASK_A_;
uint64_t* DATA_DST_;
// if more b than j -> perform b normal, subsplit j
// if more j than b -> subsplit b like it is now
template<size_t TC_CACHING>
void caching(size_t gid, size_t tid) {
constexpr size_t VIRT_TID_INCREMENT = TC_CACHING / TC_AGGRJ;
constexpr size_t SUBCHUNK_THREAD_RATIO = TC_AGGRJ / (TC_CACHING == 0 ? 1 : TC_CACHING);
constexpr bool CACHE_SUBCHUNKING = SUBCHUNK_THREAD_RATIO > 1;
constexpr bool CACHE_OVERCHUNKING = VIRT_TID_INCREMENT > 1;
if constexpr (PERFORM_CACHING) {
if constexpr (CACHE_SUBCHUNKING) {
constexpr size_t SUBCHUNK_COUNT = SUBCHUNK_THREAD_RATIO > 0 ? SUBCHUNK_THREAD_RATIO : 1;
constexpr size_t SUBCHUNK_SIZE_B = CHUNK_SIZE_B / SUBCHUNK_COUNT;
constexpr size_t SUBCHUNK_SIZE_ELEMENTS = CHUNK_SIZE_ELEMENTS / SUBCHUNK_COUNT;
for (size_t i = 0; i < RUN_COUNT; i++) {
const size_t chunk_index = get_chunk_index(gid, i);
uint64_t* chunk_ptr = get_chunk<TC_SCANB>(DATA_B_, chunk_index, tid);
for (size_t j = 0; j < SUBCHUNK_COUNT; j++) {
uint64_t* sub_chunk_ptr = &chunk_ptr[j * SUBCHUNK_SIZE_ELEMENTS];
CACHE_.Access(reinterpret_cast<uint8_t*>(sub_chunk_ptr), SUBCHUNK_SIZE_B);
PREFETCHED_CHUNKS_[gid]++;
PREFETCHED_CHUNKS_[gid].notify_one();
}
}
}
else if constexpr (CACHE_OVERCHUNKING) {
for (size_t tid_virt = tid; tid_virt < TC_AGGRJ; tid_virt += VIRT_TID_INCREMENT) {
for (size_t i = 0; i < RUN_COUNT; i++) {
const size_t chunk_index = get_chunk_index(gid, i);
uint64_t* chunk_ptr = get_chunk<TC_AGGRJ>(DATA_B_, chunk_index, tid_virt);
CACHE_.Access(reinterpret_cast<uint8_t*>(chunk_ptr), CHUNK_SIZE_B);
PREFETCHED_CHUNKS_[gid]++;
PREFETCHED_CHUNKS_[gid].notify_one();
}
}
}
else {
for (size_t i = 0; i < RUN_COUNT; i++) {
const size_t chunk_index = get_chunk_index(gid, i);
uint64_t* chunk_ptr = get_chunk<TC_SCANB>(DATA_B_, chunk_index, tid);
CACHE_.Access(reinterpret_cast<uint8_t*>(chunk_ptr), CHUNK_SIZE_B);
PREFETCHED_CHUNKS_[gid]++;
PREFETCHED_CHUNKS_[gid].notify_one();
}
}
}
}
void scan_b(size_t gid, size_t tid) {
THREAD_TIMING_[SCANB_TIMING_INDEX][UniqueIndex(gid,tid)].clear();
THREAD_TIMING_[SCANB_TIMING_INDEX][UniqueIndex(gid,tid)].resize(1);
@ -37,16 +96,7 @@ void scan_b(size_t gid, size_t tid) {
THREAD_TIMING_[SCANB_TIMING_INDEX][UniqueIndex(gid,tid)][0][TIME_STAMP_BEGIN] = std::chrono::steady_clock::now();
if constexpr (PERFORM_CACHING) {
static_assert(TC_AGGRJ == TC_SCANB);
for (size_t i = 0; i < RUN_COUNT; i++) {
const size_t chunk_index = get_chunk_index(gid, i);
uint64_t* chunk_ptr = get_chunk<TC_SCANB>(DATA_B_, chunk_index, tid);
CACHE_.Access(reinterpret_cast<uint8_t*>(chunk_ptr), SUBCHUNK_SIZE_B_AGGRJ);
}
}
caching<TC_SCANB>(gid, tid);
THREAD_TIMING_[SCANB_TIMING_INDEX][UniqueIndex(gid,tid)][0][TIME_STAMP_WAIT] = std::chrono::steady_clock::now();
THREAD_TIMING_[SCANB_TIMING_INDEX][UniqueIndex(gid,tid)][0][TIME_STAMP_END] = std::chrono::steady_clock::now();
@ -65,33 +115,36 @@ void scan_a(size_t gid, size_t tid) {
uint64_t* chunk_ptr = get_chunk<TC_SCANA>(DATA_A_, chunk_index, tid);
uint16_t* mask_ptr = get_mask<TC_SCANA>(MASK_A_, chunk_index, tid);
filter::apply_same(mask_ptr, nullptr, chunk_ptr, CMP_A, SUBCHUNK_SIZE_B_SCANA);
filter::apply_same(mask_ptr, nullptr, chunk_ptr, CMP_A, CHUNK_SIZE_B / TC_SCANA);
BARRIERS_[gid]->arrive_and_wait();
}
THREAD_TIMING_[SCANA_TIMING_INDEX][UniqueIndex(gid,tid)][0][TIME_STAMP_WAIT] = std::chrono::steady_clock::now();
THREAD_TIMING_[SCANA_TIMING_INDEX][UniqueIndex(gid,tid)][0][TIME_STAMP_END] = std::chrono::steady_clock::now();
BARRIERS_[gid]->arrive_and_drop();
THREAD_TIMING_[SCANA_TIMING_INDEX][UniqueIndex(gid,tid)][0][TIME_STAMP_END] = std::chrono::steady_clock::now();
}
void aggr_j(size_t gid, size_t tid) {
constexpr size_t SUBCHUNK_SIZE_B = CHUNK_SIZE_B / TC_AGGRJ;
CACHE_HITS_[UniqueIndex(gid,tid)] = 0;
THREAD_TIMING_[AGGRJ_TIMING_INDEX][UniqueIndex(gid,tid)].clear();
THREAD_TIMING_[AGGRJ_TIMING_INDEX][UniqueIndex(gid,tid)].resize(1);
THREAD_TIMING_[AGGRJ_TIMING_INDEX][UniqueIndex(gid,tid)].resize(RUN_COUNT);
__m512i aggregator = aggregation::OP::zero();
LAUNCH_.wait();
THREAD_TIMING_[AGGRJ_TIMING_INDEX][UniqueIndex(gid,tid)][0][TIME_STAMP_BEGIN] = std::chrono::steady_clock::now();
BARRIERS_[gid]->arrive_and_wait();
for (size_t i = 0; i < RUN_COUNT; i++) {
THREAD_TIMING_[AGGRJ_TIMING_INDEX][UniqueIndex(gid,tid)][i][TIME_STAMP_BEGIN] = std::chrono::steady_clock::now();
BARRIERS_[gid]->arrive_and_drop();
THREAD_TIMING_[AGGRJ_TIMING_INDEX][UniqueIndex(gid,tid)][0][TIME_STAMP_WAIT] = std::chrono::steady_clock::now();
THREAD_TIMING_[AGGRJ_TIMING_INDEX][UniqueIndex(gid,tid)][i][TIME_STAMP_WAIT] = std::chrono::steady_clock::now();
for (size_t i = 0; i < RUN_COUNT; i++) {
const size_t chunk_index = get_chunk_index(gid, i);
uint64_t* chunk_ptr = get_chunk<TC_AGGRJ>(DATA_B_, chunk_index, tid);
uint16_t* mask_ptr_a = get_mask<TC_AGGRJ>(MASK_A_, chunk_index, tid);
@ -100,9 +153,8 @@ void aggr_j(size_t gid, size_t tid) {
uint64_t* data_ptr;
if constexpr (PERFORM_CACHING) {
data = CACHE_.Access(reinterpret_cast<uint8_t *>(chunk_ptr), SUBCHUNK_SIZE_B_AGGRJ, dsacache::FLAG_ACCESS_WEAK);
data = CACHE_.Access(reinterpret_cast<uint8_t *>(chunk_ptr), SUBCHUNK_SIZE_B, dsacache::FLAG_ACCESS_WEAK);
data->WaitOnCompletion();
data_ptr = reinterpret_cast<uint64_t*>(data->GetDataLocation());
if (data_ptr == nullptr) {
@ -120,13 +172,14 @@ void aggr_j(size_t gid, size_t tid) {
}
uint64_t tmp = _mm512_reduce_add_epi64(aggregator);
aggregator = aggregation::apply_masked(aggregator, data_ptr, mask_ptr_a, SUBCHUNK_SIZE_B_AGGRJ);
aggregator = aggregation::apply_masked(aggregator, data_ptr, mask_ptr_a, SUBCHUNK_SIZE_B);
THREAD_TIMING_[AGGRJ_TIMING_INDEX][UniqueIndex(gid,tid)][i][TIME_STAMP_END] = std::chrono::steady_clock::now();
}
aggregation::happly(&DATA_DST_[UniqueIndex(gid,tid)], aggregator);
BARRIERS_[gid]->arrive_and_drop();
THREAD_TIMING_[AGGRJ_TIMING_INDEX][UniqueIndex(gid,tid)][0][TIME_STAMP_END] = std::chrono::steady_clock::now();
aggregation::happly(&DATA_DST_[UniqueIndex(gid,tid)], aggregator);
}
int main() {
@ -159,7 +212,6 @@ int main() {
// which is configured for xeonmax with smart assignment
uint64_t cache_flags = 0;
cache_flags |= dsacache::FLAG_WAIT_WEAK;
cache_flags |= dsacache::FLAG_HANDLE_PF;
CACHE_.SetFlags(cache_flags);
CACHE_.Init(CachePlacementPolicy, CopyMethodPolicy);
}
@ -200,7 +252,7 @@ int main() {
for(std::thread& t : agg_pool) { t.join(); }
uint64_t result_actual = 0;
aggregation::apply(&result_actual, DATA_DST_, sizeof(uint64_t) * TC_AGGRJ * GROUP_COUNT);
Aggregation<uint64_t, Sum, load_mode::Aligned>::apply(&result_actual, DATA_DST_, sizeof(uint64_t) * TC_AGGRJ * GROUP_COUNT);
const auto time_end = std::chrono::steady_clock::now();
@ -213,8 +265,8 @@ int main() {
process_timings(&scana_run, &scana_wait, &scanb_run, &scanb_wait, &aggrj_run, &aggrj_wait);
constexpr double nanos_per_second = ((double)1000) * 1000 * 1000;
const uint64_t nanos = std::chrono::duration_cast<std::chrono::nanoseconds>(time_end - time_start).count();
const double seconds = (double)(nanos) / nanos_per_second;
const uint64_t nanos = std::chrono::duration_cast<std::chrono::nanoseconds>(time_end - time_start).count();
const double seconds = (double)(nanos) / nanos_per_second;
fout
<< i - WARMUP_ITERATION_COUNT << ";"

29
qdp_project/src/Configuration.hpp

@ -3,7 +3,7 @@
#include "utils/memory_literals.h"
#ifndef MODE_SET_BY_CMAKE
#define MODE_PREFETCH
#define MODE_SIMPLE_PREFETCH
#endif
constexpr size_t WL_SIZE_B = 4_GiB;
@ -13,20 +13,20 @@ constexpr int MEM_NODE_HBM = 8;
constexpr int MEM_NODE_DRAM = 0;
#ifdef MODE_PREFETCH
constexpr uint32_t GROUP_COUNT = 4;
constexpr size_t CHUNK_SIZE_B = 8_MiB;
constexpr uint32_t TC_SCANA = 4;
constexpr uint32_t TC_SCANB = 1;
constexpr uint32_t GROUP_COUNT = 16;
constexpr size_t CHUNK_SIZE_B = 16_MiB;
constexpr uint32_t TC_SCANA = 1;
constexpr uint32_t TC_SCANB = 0;
constexpr uint32_t TC_AGGRJ = 1;
constexpr bool PERFORM_CACHING = true;
constexpr int MEM_NODE_A = 0;
constexpr int MEM_NODE_B = 0;
constexpr int MEM_NODE_B = 1;
constexpr char MODE_STRING[] = "prefetch";
#endif
#ifdef MODE_DRAM
constexpr size_t CHUNK_SIZE_B = 2_MiB;
constexpr uint32_t GROUP_COUNT = 4;
constexpr uint32_t TC_SCANA = 4;
constexpr uint32_t GROUP_COUNT = 16;
constexpr uint32_t TC_SCANA = 2;
constexpr uint32_t TC_SCANB = 0;
constexpr uint32_t TC_AGGRJ = 1;
constexpr bool PERFORM_CACHING = false;
@ -36,8 +36,8 @@ constexpr char MODE_STRING[] = "dram";
#endif
#ifdef MODE_HBM
constexpr size_t CHUNK_SIZE_B = 2_MiB;
constexpr uint32_t GROUP_COUNT = 4;
constexpr uint32_t TC_SCANA = 4;
constexpr uint32_t GROUP_COUNT = 16;
constexpr uint32_t TC_SCANA = 2;
constexpr uint32_t TC_SCANB = 0;
constexpr uint32_t TC_AGGRJ = 1;
constexpr bool PERFORM_CACHING = false;
@ -47,19 +47,12 @@ constexpr char MODE_STRING[] = "hbm";
#endif
constexpr uint64_t CMP_A = 50;
constexpr uint32_t TC_COMBINED = TC_SCANA + TC_SCANB + TC_AGGRJ;
constexpr size_t WL_SIZE_ELEMENTS = WL_SIZE_B / sizeof(uint64_t);
constexpr size_t CHUNK_COUNT = WL_SIZE_B / CHUNK_SIZE_B;
constexpr size_t CHUNK_SIZE_ELEMENTS = CHUNK_SIZE_B / sizeof(uint64_t);
constexpr size_t RUN_COUNT = CHUNK_COUNT / GROUP_COUNT;
constexpr size_t SUBCHUNK_SIZE_B_SCANA = CHUNK_SIZE_B / TC_SCANA;
constexpr size_t SUBCHUNK_SIZE_B_AGGRJ = CHUNK_SIZE_B / TC_AGGRJ;
static_assert(RUN_COUNT > 0);
static_assert(WL_SIZE_B % 16 == 0);
static_assert(CHUNK_SIZE_B % 16 == 0);
static_assert(CHUNK_SIZE_B % GROUP_COUNT == 0);
static_assert(CHUNK_SIZE_B % TC_AGGRJ == 0);
static_assert(CHUNK_SIZE_B % TC_SCANB == 0);
static_assert(CHUNK_SIZE_B % TC_SCANA == 0);

4
qdp_project/src/utils/aggregation.h

@ -211,7 +211,6 @@ public:
size_t value_count = chunk_size_b / sizeof(base_t);
__m512i agg_vec = func<base_t>::zero();
size_t i = 0;
// stop before! running out of space
if(value_count >= lanes) // keep in mind size_w is unsigned so if it becomes negative, it doesn't.
for(; i <= value_count - lanes; i += lanes) {
@ -251,7 +250,6 @@ public:
//TODO this function does not work if value_count % lanes != 0
size_t value_count = chunk_size_b / sizeof(base_t);
size_t i = 0;
// stop before! running out of space
if(value_count >= lanes) // keep in mind size_w is unsigned so if it becomes negative, it doesn't.
for(; i <= value_count - lanes; i += lanes) {
@ -315,4 +313,4 @@ public:
static __m512i get_zero() {
return func<base_t>::zero();
}
};
};

4
qdp_project/src/utils/filter.h

@ -134,12 +134,12 @@ public:
size_t value_count = chunk_size_b / sizeof(base_t);
__m512i cmp_vec = _mm512_set1_epi64(cmp_value);
size_t i = 0;
// this weird implementetion is neccessary, see analogous impl in aggregation for explaination
if(value_count > lanes) {
for(; (i < value_count - lanes); i += lanes) {
__m512i vec = Vector_Loader<base_t, load_mode>::load(src + i);
__mmask8 bitmask = func<base_t>::simd_filter(vec, cmp_vec);
uint8_t int_mask = (uint8_t) _mm512_mask2int(bitmask);
dest[i / lanes] = int_mask;
@ -167,4 +167,4 @@ public:
return true;
}
};
};
Loading…
Cancel
Save