Browse Source

add barriers to the qdp benchmark

master
Constantin Fürst 11 months ago
parent
commit
457a3b520a
  1. 40
      qdp_project/src/Benchmark.cpp

40
qdp_project/src/Benchmark.cpp

@@ -22,15 +22,15 @@
////////////////////////////////
/// BENCHMARK SETUP
constexpr size_t WL_SIZE_B = 4_GiB;
constexpr size_t CHUNK_SIZE_B = 128_MiB;
constexpr size_t WL_SIZE_B = 64_MiB;
constexpr size_t CHUNK_SIZE_B = 1_MiB;
constexpr uint64_t CMP_A = 50;
constexpr uint32_t WARMUP_ITERATION_COUNT = 5;
constexpr uint32_t ITERATION_COUNT = 10;
constexpr size_t GROUP_COUNT = 4;
constexpr size_t TC_SCANA = 2;
constexpr size_t TC_SCANB = 2;
constexpr size_t TC_AGGRJ = 1;
constexpr uint32_t WARMUP_ITERATION_COUNT = 0;
constexpr uint32_t ITERATION_COUNT = 2;
constexpr uint32_t GROUP_COUNT = 2;
constexpr uint32_t TC_SCANA = 1;
constexpr uint32_t TC_SCANB = 1;
constexpr uint32_t TC_AGGRJ = 1;
constexpr bool PERFORM_CACHING = false;
constexpr bool DATA_IN_HBM = false;
constexpr char MODE_STRING[] = "DramBase";
@@ -38,6 +38,7 @@ constexpr char MODE_STRING[] = "DramBase";
/// DO NOT CONFIGURE BEYOND THIS
////////////////////////////////
constexpr uint32_t TC_COMBINED = TC_SCANA + TC_SCANB + TC_AGGRJ;
constexpr size_t WL_SIZE_ELEMENTS = WL_SIZE_B / sizeof(uint64_t);
constexpr size_t CHUNK_COUNT = WL_SIZE_B / CHUNK_SIZE_B;
constexpr size_t CHUNK_SIZE_ELEMENTS = CHUNK_SIZE_B / sizeof(uint64_t);
@@ -47,7 +48,7 @@ using aggregation = Aggregation<uint64_t, Sum, load_mode::Stream>;
dsacache::Cache CACHE_;
std::vector<std::barrier<NopStruct>> BARRIERS_;
std::vector<std::barrier<NopStruct>*> BARRIERS_;
std::shared_future<void> LAUNCH_;
uint64_t* DATA_A_;
@@ -72,6 +73,8 @@ void scan_b(size_t gid, size_t tid) {
data->WaitOnCompletion();
}
}
BARRIERS_[gid]->arrive_and_drop();
}
void scan_a(size_t gid, size_t tid) {
@@ -82,11 +85,13 @@ void scan_a(size_t gid, size_t tid) {
for(uint32_t i = 0; i < runs; ++i) {
// calculate pointers
size_t chunk_id = gid + GROUP_COUNT * i;
uint64_t* chunk_ptr = get_sub_chunk_ptr(DATA_B_, chunk_id, CHUNK_SIZE_ELEMENTS, tid, TC_SCANA);
uint64_t* chunk_ptr = get_sub_chunk_ptr(DATA_A_, chunk_id, CHUNK_SIZE_ELEMENTS, tid, TC_SCANA);
uint16_t* mask_ptr = get_sub_mask_ptr (MASK_A_, chunk_id, CHUNK_SIZE_ELEMENTS, tid, TC_SCANA);
filter::apply_same(mask_ptr, nullptr, chunk_ptr, CMP_A, CHUNK_SIZE_B / TC_SCANA);
}
BARRIERS_[gid]->arrive_and_drop();
}
void aggr_j(size_t gid, size_t tid) {
@@ -98,9 +103,11 @@ void aggr_j(size_t gid, size_t tid) {
uint32_t runs = CHUNK_COUNT / GROUP_COUNT + (CHUNK_COUNT % GROUP_COUNT > gid);
for(uint32_t i = 0; i < runs; ++i) {
BARRIERS_[gid]->arrive_and_wait();
// calculate pointers
size_t chunk_id = gid + GROUP_COUNT * i;
uint64_t* chunk_ptr = get_sub_chunk_ptr(DATA_A_, chunk_id, CHUNK_SIZE_ELEMENTS, tid, TC_AGGRJ);
uint64_t* chunk_ptr = get_sub_chunk_ptr(DATA_B_, chunk_id, CHUNK_SIZE_ELEMENTS, tid, TC_AGGRJ);
std::unique_ptr<dsacache::CacheData> data;
uint64_t* data_ptr;
@@ -166,8 +173,6 @@ int main() {
fill_mt<uint64_t>(DATA_A_, WL_SIZE_B, 0, 100, 420);
for (uint32_t i = 0; i < ITERATION_COUNT + WARMUP_ITERATION_COUNT; i++) {
CACHE_.Clear();
std::promise<void> launch_promise;
LAUNCH_ = launch_promise.get_future();
@@ -176,6 +181,8 @@ int main() {
std::vector<std::thread> agg_pool;
for(uint32_t gid = 0; gid < GROUP_COUNT; ++gid) {
BARRIERS_.emplace_back(new std::barrier<NopStruct>(TC_COMBINED));
for(uint32_t tid = 0; tid < TC_SCANA; ++tid) {
filter_pool.emplace_back(scan_a, gid, tid);
}
@@ -204,6 +211,13 @@ int main() {
if (i >= WARMUP_ITERATION_COUNT) {
fout << i << ";" << std::chrono::duration_cast<std::chrono::nanoseconds>(time_end - time_start).count() << std::endl;
}
for (std::barrier<NopStruct>* b : BARRIERS_) {
delete b;
}
BARRIERS_.clear();
CACHE_.Clear();
}
numa_free(DATA_A_, WL_SIZE_B);

Loading…
Cancel
Save