
resolve issues from the recent reset of qdp benchmark

Branch: master
Author: Constantin Fürst, 11 months ago
Commit: 006b856c44
Changed files:
  1. qdp_project/src/Benchmark.cpp (53 lines changed)
  2. qdp_project/src/Configuration.hpp (7 lines changed)

qdp_project/src/Benchmark.cpp (53 lines changed)

@@ -22,7 +22,6 @@ using aggregation = Aggregation<uint64_t, Sum, load_mode::Stream>;
dsacache::Cache CACHE_;
std::array<std::atomic<int32_t>, GROUP_COUNT> PREFETCHED_CHUNKS_;
std::vector<std::barrier<NopStruct>*> BARRIERS_;
std::shared_future<void> LAUNCH_;
@@ -31,13 +30,16 @@ uint64_t* DATA_B_;
uint16_t* MASK_A_;
uint64_t* DATA_DST_;
// if more b than j -> perform b normal, subsplit j
// if more j than b -> subsplit b like it is now
void scan_b(size_t gid, size_t tid) {
    THREAD_TIMING_[SCANB_TIMING_INDEX][UniqueIndex(gid,tid)].clear();
    THREAD_TIMING_[SCANB_TIMING_INDEX][UniqueIndex(gid,tid)].resize(1);
    LAUNCH_.wait();
    THREAD_TIMING_[SCANB_TIMING_INDEX][UniqueIndex(gid,tid)][0][TIME_STAMP_BEGIN] = std::chrono::steady_clock::now();
template<size_t TC_CACHING>
void caching(size_t gid, size_t tid) {
    constexpr size_t VIRT_TID_INCREMENT = TC_CACHING / TC_AGGRJ;
    constexpr size_t SUBCHUNK_THREAD_RATIO = TC_AGGRJ / (TC_CACHING == 0 ? 1 : TC_CACHING);
    constexpr size_t VIRT_TID_INCREMENT = TC_SCANB / TC_AGGRJ;
    constexpr size_t SUBCHUNK_THREAD_RATIO = TC_AGGRJ / (TC_SCANB == 0 ? 1 : TC_SCANB);
    constexpr bool CACHE_SUBCHUNKING = SUBCHUNK_THREAD_RATIO > 1;
    constexpr bool CACHE_OVERCHUNKING = VIRT_TID_INCREMENT > 1;
@@ -55,9 +57,6 @@ void caching(size_t gid, size_t tid) {
                uint64_t* sub_chunk_ptr = &chunk_ptr[j * SUBCHUNK_SIZE_ELEMENTS];
                CACHE_.Access(reinterpret_cast<uint8_t*>(sub_chunk_ptr), SUBCHUNK_SIZE_B);
                PREFETCHED_CHUNKS_[gid]++;
                PREFETCHED_CHUNKS_[gid].notify_one();
            }
        }
    }
@@ -68,9 +67,6 @@ void caching(size_t gid, size_t tid) {
                uint64_t* chunk_ptr = get_chunk<TC_AGGRJ>(DATA_B_, chunk_index, tid_virt);
                CACHE_.Access(reinterpret_cast<uint8_t*>(chunk_ptr), CHUNK_SIZE_B);
                PREFETCHED_CHUNKS_[gid]++;
                PREFETCHED_CHUNKS_[gid].notify_one();
            }
        }
    }
@@ -80,23 +76,9 @@ void caching(size_t gid, size_t tid) {
                uint64_t* chunk_ptr = get_chunk<TC_SCANB>(DATA_B_, chunk_index, tid);
                CACHE_.Access(reinterpret_cast<uint8_t*>(chunk_ptr), CHUNK_SIZE_B);
                PREFETCHED_CHUNKS_[gid]++;
                PREFETCHED_CHUNKS_[gid].notify_one();
            }
        }
    }
}
void scan_b(size_t gid, size_t tid) {
    THREAD_TIMING_[SCANB_TIMING_INDEX][UniqueIndex(gid,tid)].clear();
    THREAD_TIMING_[SCANB_TIMING_INDEX][UniqueIndex(gid,tid)].resize(1);
    LAUNCH_.wait();
    THREAD_TIMING_[SCANB_TIMING_INDEX][UniqueIndex(gid,tid)][0][TIME_STAMP_BEGIN] = std::chrono::steady_clock::now();
    caching<TC_SCANB>(gid, tid);
    THREAD_TIMING_[SCANB_TIMING_INDEX][UniqueIndex(gid,tid)][0][TIME_STAMP_WAIT] = std::chrono::steady_clock::now();
    THREAD_TIMING_[SCANB_TIMING_INDEX][UniqueIndex(gid,tid)][0][TIME_STAMP_END] = std::chrono::steady_clock::now();
@@ -117,13 +99,13 @@ void scan_a(size_t gid, size_t tid) {
        filter::apply_same(mask_ptr, nullptr, chunk_ptr, CMP_A, CHUNK_SIZE_B / TC_SCANA);
        BARRIERS_[gid]->arrive_and_wait();
    }
    THREAD_TIMING_[SCANA_TIMING_INDEX][UniqueIndex(gid,tid)][0][TIME_STAMP_WAIT] = std::chrono::steady_clock::now();
    THREAD_TIMING_[SCANA_TIMING_INDEX][UniqueIndex(gid,tid)][0][TIME_STAMP_END] = std::chrono::steady_clock::now();
    BARRIERS_[gid]->arrive_and_drop();
    THREAD_TIMING_[SCANA_TIMING_INDEX][UniqueIndex(gid,tid)][0][TIME_STAMP_END] = std::chrono::steady_clock::now();
}
void aggr_j(size_t gid, size_t tid) {
@@ -132,19 +114,19 @@ void aggr_j(size_t gid, size_t tid) {
    CACHE_HITS_[UniqueIndex(gid,tid)] = 0;
    THREAD_TIMING_[AGGRJ_TIMING_INDEX][UniqueIndex(gid,tid)].clear();
    THREAD_TIMING_[AGGRJ_TIMING_INDEX][UniqueIndex(gid,tid)].resize(RUN_COUNT);
    THREAD_TIMING_[AGGRJ_TIMING_INDEX][UniqueIndex(gid,tid)].resize(1);
    __m512i aggregator = aggregation::OP::zero();
    LAUNCH_.wait();
    BARRIERS_[gid]->arrive_and_wait();
    for (size_t i = 0; i < RUN_COUNT; i++) {
        THREAD_TIMING_[AGGRJ_TIMING_INDEX][UniqueIndex(gid,tid)][i][TIME_STAMP_BEGIN] = std::chrono::steady_clock::now();
    THREAD_TIMING_[AGGRJ_TIMING_INDEX][UniqueIndex(gid,tid)][0][TIME_STAMP_BEGIN] = std::chrono::steady_clock::now();
    BARRIERS_[gid]->arrive_and_drop();
        THREAD_TIMING_[AGGRJ_TIMING_INDEX][UniqueIndex(gid,tid)][i][TIME_STAMP_WAIT] = std::chrono::steady_clock::now();
    THREAD_TIMING_[AGGRJ_TIMING_INDEX][UniqueIndex(gid,tid)][0][TIME_STAMP_WAIT] = std::chrono::steady_clock::now();
    for (size_t i = 0; i < RUN_COUNT; i++) {
        const size_t chunk_index = get_chunk_index(gid, i);
        uint64_t* chunk_ptr = get_chunk<TC_AGGRJ>(DATA_B_, chunk_index, tid);
        uint16_t* mask_ptr_a = get_mask<TC_AGGRJ>(MASK_A_, chunk_index, tid);
@@ -174,10 +156,9 @@ BARRIERS_[gid]->arrive_and_wait();
        uint64_t tmp = _mm512_reduce_add_epi64(aggregator);
        aggregator = aggregation::apply_masked(aggregator, data_ptr, mask_ptr_a, SUBCHUNK_SIZE_B);
        THREAD_TIMING_[AGGRJ_TIMING_INDEX][UniqueIndex(gid,tid)][i][TIME_STAMP_END] = std::chrono::steady_clock::now();
    }
    BARRIERS_[gid]->arrive_and_drop();
    THREAD_TIMING_[AGGRJ_TIMING_INDEX][UniqueIndex(gid,tid)][0][TIME_STAMP_END] = std::chrono::steady_clock::now();
    aggregation::happly(&DATA_DST_[UniqueIndex(gid,tid)], aggregator);
}
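
Note on the Benchmark.cpp changes: the main mechanical edit is the removal of the PREFETCHED_CHUNKS_ signalling. Hunk 1 deletes the std::array<std::atomic<int32_t>, GROUP_COUNT> declaration, and each caching branch drops the matching increment-and-notify pair; with the declaration gone, every remaining use would fail to compile, which is why all three branches lose the same two lines. Those lines implemented a C++20 atomic wait/notify handshake. The sketch below is illustrative only: the waiting side is not visible in this diff, and all names apart from the counter's role are invented here, not taken from the repository.

#include <atomic>
#include <cstdint>

// Illustrative reconstruction of the removed signalling pattern: a
// prefetch thread publishes "one more chunk is cached" by bumping an
// atomic counter and waking one waiter (C++20 atomic notify), while a
// consumer blocks until enough chunks have been published.
std::atomic<int32_t> prefetched_chunks{0};

void publish_chunk() {
    // ... CACHE_.Access(...) has been issued for the chunk ...
    prefetched_chunks.fetch_add(1, std::memory_order_release);
    prefetched_chunks.notify_one();  // wake one thread blocked in wait()
}

void await_chunk(int32_t chunk_index) {
    // Block until at least chunk_index + 1 chunks are published.
    int32_t seen = prefetched_chunks.load(std::memory_order_acquire);
    while (seen <= chunk_index) {
        prefetched_chunks.wait(seen);  // sleep until the value changes
        seen = prefetched_chunks.load(std::memory_order_acquire);
    }
}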

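The scan_a and aggr_j hunks make two related adjustments: the per-run timing vectors shrink from resize(RUN_COUNT) with an [i] index to resize(1) with a fixed [0] index, so each thread now records a single begin/wait/end triple per run, and the TIME_STAMP_END capture moves after BARRIERS_[gid]->arrive_and_drop(), so the final barrier arrival is included in the measured span. A minimal sketch of that barrier protocol, with invented thread and chunk counts standing in for TC_COMBINED and RUN_COUNT:

#include <barrier>
#include <cstddef>
#include <thread>
#include <vector>

int main() {
    constexpr std::size_t kThreads = 4;  // illustrative, not TC_COMBINED
    constexpr std::size_t kRuns = 3;     // illustrative, not RUN_COUNT
    std::barrier sync(kThreads);

    auto worker = [&](std::size_t tid) {
        for (std::size_t i = 0; i < kRuns; i++) {
            // ... process chunk i (filter in scan_a, reduce in aggr_j) ...
            if (i + 1 < kRuns) {
                sync.arrive_and_wait();  // rendezvous before the next chunk
            }
        }
        // Arrive one last time and leave the barrier so later phases no
        // longer count this thread; only then take the end stamp, which
        // matches the reordering in this commit.
        sync.arrive_and_drop();
        // end = std::chrono::steady_clock::now();
    };

    std::vector<std::jthread> pool;
    for (std::size_t t = 0; t < kThreads; t++) {
        pool.emplace_back(worker, t);
    }
}
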
qdp_project/src/Configuration.hpp (7 lines changed)

@@ -3,7 +3,7 @@
#include "utils/memory_literals.h"
#ifndef MODE_SET_BY_CMAKE
#define MODE_SIMPLE_PREFETCH
#define MODE_PREFETCH
#endif
constexpr size_t WL_SIZE_B = 4_GiB;
@@ -15,8 +15,8 @@ constexpr int MEM_NODE_DRAM = 0;
#ifdef MODE_PREFETCH
constexpr uint32_t GROUP_COUNT = 16;
constexpr size_t CHUNK_SIZE_B = 16_MiB;
constexpr uint32_t TC_SCANA = 1;
constexpr uint32_t TC_SCANB = 0;
constexpr uint32_t TC_SCANA = 2;
constexpr uint32_t TC_SCANB = 1;
constexpr uint32_t TC_AGGRJ = 1;
constexpr bool PERFORM_CACHING = true;
constexpr int MEM_NODE_A = 0;
@@ -47,7 +47,6 @@ constexpr char MODE_STRING[] = "hbm";
#endif
constexpr uint64_t CMP_A = 50;
constexpr uint32_t TC_COMBINED = TC_SCANA + TC_SCANB + TC_AGGRJ;
constexpr size_t WL_SIZE_ELEMENTS = WL_SIZE_B / sizeof(uint64_t);
constexpr size_t CHUNK_COUNT = WL_SIZE_B / CHUNK_SIZE_B;
constexpr size_t CHUNK_SIZE_ELEMENTS = CHUNK_SIZE_B / sizeof(uint64_t);
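
As a sanity check on the new thread counts, the ratios that drive caching()'s branch selection can be evaluated at compile time. The definitions below are copied from this diff (using the TC_SCANB form of the ratios shown in Benchmark.cpp); the static_asserts merely restate the arithmetic and are not repository code. Notably, the old configuration had TC_SCANB = 0, so no scan-b thread existed to drive the caching path at all, which is plausibly among the issues this commit resolves.

#include <cstddef>
#include <cstdint>

// Values from the MODE_PREFETCH block after this commit.
constexpr uint32_t TC_SCANA = 2;
constexpr uint32_t TC_SCANB = 1;
constexpr uint32_t TC_AGGRJ = 1;
constexpr uint32_t TC_COMBINED = TC_SCANA + TC_SCANB + TC_AGGRJ;

// Ratios as computed in caching() in Benchmark.cpp (TC_SCANB variant).
constexpr size_t VIRT_TID_INCREMENT = TC_SCANB / TC_AGGRJ;
constexpr size_t SUBCHUNK_THREAD_RATIO = TC_AGGRJ / (TC_SCANB == 0 ? 1 : TC_SCANB);

static_assert(TC_COMBINED == 4);            // 2 + 1 + 1 threads per group
static_assert(VIRT_TID_INCREMENT == 1);     // 1 / 1
static_assert(SUBCHUNK_THREAD_RATIO == 1);  // 1 / 1
// Neither ratio exceeds 1, so CACHE_SUBCHUNKING and CACHE_OVERCHUNKING
// are both false and the plain one-chunk-per-thread branch runs.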
