
enable QDP testing for DRAM as baseline, pre-allocated HBM as peak, and the already existing DSA-HBM-prefetch

master
Constantin Fürst 11 months ago
parent
commit af4e3de80c
1. qdp_project/src/benchmark/MAX_benchmark.cpp (156 changes)
2. qdp_project/src/benchmark/pipelines/MAX_scan_filter_pipe.h (73 changes)

qdp_project/src/benchmark/MAX_benchmark.cpp (156 changes)

@@ -21,10 +21,6 @@
 #define THREAD_GROUP_MULTIPLIER 2
 #endif
-#ifndef QUERY
-#define QUERY 1
-#endif
 #ifndef BARRIER_MODE
 #define BARRIER_MODE "global"
 #endif
@@ -33,18 +29,6 @@
 #define BUFFER_LIMIT 1
 #endif
-#ifndef PINNING
-#define PINNING 1
-#endif
-#ifndef PCM_M
-#define PCM_M 0
-#endif
-#if PCM_M == 1
-#include "pcm.h"
-#endif
 #include "const.h"
 #include "file_output.h"
@@ -55,6 +39,7 @@
 #include "cpu_set_utils.h"
 #include "iterable_range.h"
 #include "memory_literals.h"
 #include "pipelines/MAX_scan_filter_pipe.h"
 #include "aggregation.h"
@@ -62,6 +47,10 @@
 using base_t = uint64_t;
 
+static int CachePlacementPolicy(const int numa_dst_node, const int numa_src_node, const size_t data_size) {
+    return numa_dst_node < 8 ? numa_dst_node + 8 : numa_dst_node;
+}
+
 base_t sum_check(base_t compare_value, base_t* row_A, base_t* row_B, size_t row_size) {
     base_t sum = 0;
     for(int i = 0; i < row_size / sizeof(base_t); ++i) {
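
Note: CachePlacementPolicy hard-codes the topology of the Xeon Max box this benchmark targets: assuming flat memory mode with SNC4, HBM shows up as NUMA nodes 8-15, each paired with one of the CPU-carrying DRAM nodes 0-7, so node n maps to HBM node n + 8. That numbering is a property of this particular machine, not a libnuma guarantee. A minimal standalone sketch exercising the same mapping (compile with -lnuma):

    #include <numa.h>     // numa_available, numa_node_of_cpu, numa_alloc_onnode, numa_free
    #include <sched.h>    // sched_getcpu
    #include <cstdio>

    int main() {
        if (numa_available() < 0) return 1;               // libnuma unusable on this system
        const int dram_node = numa_node_of_cpu(sched_getcpu());
        const int hbm_node = dram_node < 8 ? dram_node + 8 : dram_node;
        // place a 1 MiB buffer on the HBM node paired with the executing core
        void* buf = numa_alloc_onnode(1 << 20, hbm_node);
        if (buf == nullptr) return 1;                     // node may be absent or full
        std::printf("dram node %d -> hbm node %d\n", dram_node, hbm_node);
        numa_free(buf, 1 << 20);
        return 0;
    }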
@@ -78,26 +67,18 @@ base_t sum_check_complex(base_t compare_value_a, base_t compare_value_b, base_t*
     return sum;
 }
 
+enum class ExecMode {
+    DramBaseline,
+    HbmPeak,
+    HbmPrefetch
+};
+
 int main(int argc, char** argv) {
-#if PCM == 1
-    pcm::PCM *pcm = pcm::PCM::getInstance();
-    // and check for errors
-    auto error_code = pcm->program();
-    if(error_code != pcm::PCM::Success) {
-        std::cerr << "PCM couldn't start" << std::endl;
-        std::cerr << "Error code: " << error_code << std::endl;
-        std::cerr << "Try to execute 'sudo modprobe msr' and execute this program with root privileges.";
-        return 1;
-    }
-#endif
+    constexpr ExecMode mode = ExecMode::DramBaseline;
+
     // set constants
     constexpr size_t workload_b = 2_GiB;
     constexpr base_t compare_value_a = 50;
     constexpr base_t compare_value_b = 42;
-    constexpr bool simple_query = (QUERY == 1);
-    constexpr bool cache_a = false;
-    constexpr bool wait_b = false;
 
     constexpr size_t chunk_min = 1_MiB;
     constexpr size_t chunk_max = 8_MiB + 1;
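
Note: mode is a constexpr, so producing all three result sets means editing that one line and rebuilding once per DramBaseline, HbmPeak, and HbmPrefetch run. A hedged sketch of an alternative that keeps the same source and picks the mode on the compiler command line (EXEC_MODE is a hypothetical define, not part of this commit):

    // hypothetical: build with g++ ... -DEXEC_MODE=0|1|2
    #ifndef EXEC_MODE
    #define EXEC_MODE 0                                   // default: DramBaseline
    #endif
    static_assert(EXEC_MODE >= 0 && EXEC_MODE <= 2, "EXEC_MODE must name a valid ExecMode");
    constexpr ExecMode mode = static_cast<ExecMode>(EXEC_MODE);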
@@ -107,15 +88,52 @@ int main(int argc, char** argv) {
     // we must restrict the core assignment of these 12 threads to
     // 6 physical cpu cores on the executing node
     constexpr size_t thread_count = 12;
 
-    /*** alloc data and buffers ************************************************/
+    uint8_t tc_filter;
+    uint8_t tc_copy;
+    uint8_t tc_agg;
+
+    if constexpr (mode == ExecMode::HbmPrefetch) {
+        tc_filter = 4;
+        tc_copy = 1;
+        tc_agg = 1;
+    }
+    else {
+        tc_filter = 4;
+        tc_copy = 0;
+        tc_agg = 2;
+    }
+
+    const uint8_t tc_combined = tc_filter + tc_copy + tc_agg;
+
+    base_t* data_a;
+    base_t* data_b;
+    base_t* results;
+
+    const int current_cpu = sched_getcpu();
+    const int current_node = numa_node_of_cpu(current_cpu);
+    const int cache_node = CachePlacementPolicy(current_node, current_node, 0);
+
     std::ofstream out_file;
 
-    const std::string ofname = std::string("results/max_") +
-        "q-" + (std::string)(simple_query == true ? "simple" : "complex") +
-        "_bm-" + (std::string) BARRIER_MODE +
-        "_bl-" + (std::string)(BUFFER_LIMIT == 1 ? "limited" : "unlimited") +
-        "_tc-" + std::to_string(thread_count) + "1MiB-2MiB.csv";
+    std::string mode_string;
+
+    if constexpr (mode == ExecMode::HbmPrefetch) {
+        mode_string = "HbmDsaPrefetch";
+    }
+    else if constexpr (mode == ExecMode::HbmPeak) {
+        mode_string = "HbmAllocPeak";
+    }
+    else if constexpr (mode == ExecMode::DramBaseline) {
+        mode_string = "DramBaseline";
+    }
+    else {
+        mode_string = "Unknown";
+    }
+
+    const std::string ofname = "qdp-xeonmax-simpleq-" + mode_string + "-tca" + std::to_string(tc_filter) + "-tcb" + std::to_string(tc_copy) + "-tcj" + std::to_string(tc_agg) + "-tmul" + std::to_string(THREAD_GROUP_MULTIPLIER);
 
     out_file.open(ofname);
@@ -127,61 +145,37 @@ int main(int argc, char** argv) {
     Linear_Int_Range<uint32_t, 0, 30, 1> run("run");
     Linear_Int_Range<size_t, chunk_min, chunk_max, chunk_incr> chunk_size("chunk_size");
 
-    print_to_file(out_file, generateHead(run, chunk_size, "mode"), "thread_group", "time",
+    print_to_file(out_file, generateHead(run, chunk_size), "thread_group", "time",
 #ifdef THREAD_TIMINGS
         "scan_a", "scan_b", "aggr_j",
 #endif
 #ifdef BARRIER_TIMINGS
         "wait_scan_a", "wait_scan_b", "wait_aggr_j",
 #endif
-#if PCM == 1
-        pcm_value_collector::getHead("scan_a"),
-        pcm_value_collector::getHead("scan_b"),
-        pcm_value_collector::getHead("aggr_j"),
-#endif
         "result");
 
-    base_t* data_a = (base_t*) numa_alloc_local(workload_b);
-    base_t* data_b = (base_t*) numa_alloc_local(workload_b);
-    base_t* results = (base_t*) numa_alloc_local(thread_count * sizeof(base_t));
+    /*** alloc data and buffers ************************************************/
+
+    if constexpr (mode == ExecMode::HbmPeak) {
+        data_a = (base_t*) numa_alloc_onnode(workload_b, cache_node);
+        data_b = (base_t*) numa_alloc_onnode(workload_b, cache_node);
+        results = (base_t*) numa_alloc_onnode(tc_combined * sizeof(base_t), cache_node);
+    }
+    else {
+        data_a = (base_t*) numa_alloc_onnode(workload_b, current_node);
+        data_b = (base_t*) numa_alloc_onnode(workload_b, current_node);
+        results = (base_t*) numa_alloc_onnode(tc_combined * sizeof(base_t), current_node);
+    }
 
     fill_mt<base_t>(data_a, workload_b, 0, 100, 42);
     fill_mt<base_t>(data_b, workload_b, 0, 100, 420);
 
-    const std::string cfname = std::string("results/max_") +
-        "q-" + (std::string)(simple_query == true ? "simple" : "complex") +
-        "_bm-" + (std::string) BARRIER_MODE +
-        "_bl-" + (std::string)(BUFFER_LIMIT == 1 ? "limited" : "unlimited") +
-        "_tc-" + std::to_string(thread_count) + ".checksum";
-    std::ofstream check_file;
-    check_file.open(cfname);
-    if (check_file.bad()) {
-        std::cerr << "Failed to open Checksum File '" << cfname << "'" << std::endl;
-    }
-    if constexpr (QUERY == 1) {
-        // calculate simple checksum if QUERY == 1 -> simple query is applied
-        check_file << sum_check(compare_value_a, data_a, data_b, workload_b);
-    } else {
-        check_file << sum_check_complex(compare_value_a, compare_value_b, data_a, data_b, workload_b);
-    }
-    check_file.close();
 
     for(uint32_t i = 0; i < 15; i++) {
         std::promise<void> p;
         std::shared_future<void> ready_future(p.get_future());
 
-        const uint8_t tc_filter = 6;
-        const uint8_t tc_copy = 2;
-        const uint8_t tc_agg = 4;
-        Query_Wrapper<base_t, simple_query, cache_a, wait_b> qw (
+        Query_Wrapper<base_t, mode == ExecMode::HbmPrefetch> qw (
             &ready_future, workload_b, chunk_size.current,
-            data_a, data_b, results, tc_filter, tc_copy, tc_agg, 50, 42
+            data_a, data_b, results, tc_filter, tc_copy,
+            tc_agg, compare_value_a, compare_value_b
         );
 
         qw.ready_future = &ready_future;
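
Note: numa_alloc_onnode returns nullptr when the requested node cannot satisfy the allocation, which is plausible for two 2 GiB arrays on a single HBM node, and the pointers above are used unchecked. A minimal guard as a sketch (checked_alloc_onnode is a hypothetical helper, not part of this commit):

    #include <numa.h>
    #include <cstdlib>
    #include <iostream>

    // fail fast with a message instead of faulting on first access
    static void* checked_alloc_onnode(size_t size, int node) {
        void* ptr = numa_alloc_onnode(size, node);
        if (ptr == nullptr) {
            std::cerr << "allocation of " << size << " B on node " << node << " failed" << std::endl;
            std::exit(1);
        }
        return ptr;
    }

    // usage: data_a = (base_t*) checked_alloc_onnode(workload_b, cache_node);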
@@ -225,7 +219,7 @@ int main(int argc, char** argv) {
             double seconds = (double)(nanos) / nanos_per_second;
 
             if (i >= 5) {
-                print_to_file(out_file, run, chunk_size, "DSA-HBM-Prefetch", THREAD_GROUP_MULTIPLIER, seconds,
+                print_to_file(out_file, run, chunk_size, THREAD_GROUP_MULTIPLIER, seconds,
 #ifdef THREAD_TIMINGS
                     qw.trt->summarize_time(0), qw.trt->summarize_time(1), qw.trt->summarize_time(2),
 #endif
@@ -239,5 +233,5 @@ int main(int argc, char** argv) {
     numa_free(data_a, workload_b);
     numa_free(data_b, workload_b);
-    numa_free(results, THREAD_GROUP_MULTIPLIER * thread_count * sizeof(base_t));
+    numa_free(results, THREAD_GROUP_MULTIPLIER * tc_combined * sizeof(base_t));
 }

qdp_project/src/benchmark/pipelines/MAX_scan_filter_pipe.h (73 changes)

@@ -16,7 +16,7 @@
 #include "../../../../offloading-cacher/cache.hpp"
 
-template<typename base_t, bool simple, bool cache_a, bool wait_b>
+template<typename base_t, bool caching>
 class Query_Wrapper {
 public:
     // sync
@@ -28,7 +28,6 @@ public:
 private:
     static constexpr size_t COPY_POLICY_MIN_SIZE = 64 * 1024 * 1024;
 
     dsacache::Cache cache_;
 
     // data
@@ -124,7 +123,9 @@ public:
         mask_a = (uint16_t *) numa_alloc_onnode(size_b / sizeof(base_t), cache_node);
         mask_b = (uint16_t *) numa_alloc_onnode(size_b / sizeof(base_t), cache_node);
 
+        if constexpr (caching) {
             cache_.Init(CachePlacementPolicy, CopyMethodPolicy);
+        }
 
         size_t measurement_space = std::max(std::max(tc_fi, tc_fc), tc_ag);
         trt = new thread_runtime_timing(3, measurement_space, current_node);
@@ -157,7 +158,9 @@ public:
         std::memset(mask_a, 0x00, size_b / sizeof(base_t));
         std::memset(mask_b, 0x00, size_b / sizeof(base_t));
 
+        if constexpr (caching) {
             cache_.Clear();
+        }
 
         trt->reset_accumulator();
         bt->reset_accumulator();
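
Note: caching is a template parameter of Query_Wrapper, so the guards above are resolved at compile time; for the DramBaseline and HbmPeak instantiations the cache_ calls are discarded entirely rather than branched over at runtime. A self-contained toy showing the mechanism (names are illustrative, not from the project):

    #include <iostream>

    template<bool caching>
    void run_pass() {
        if constexpr (caching) {
            std::cout << "clearing cache" << std::endl;   // only present in the caching instantiation
        }
        std::cout << "running pass" << std::endl;
    }

    int main() {
        run_pass<true>();    // prints both lines
        run_pass<false>();   // cache branch compiled out
        return 0;
    }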
@@ -248,48 +251,9 @@ public:
             // calculate pointers
             size_t chunk_id = gid + gcnt * i;
             base_t* chunk_ptr = get_sub_chunk_ptr(data_b, chunk_id, chunk_size_w, tid, tcnt);
-            uint16_t* mask_ptr = get_sub_mask_ptr(mask_b, chunk_id, chunk_size_w, tid, tcnt);
 
-            if constexpr(simple){
+            if constexpr (caching) {
                 cache_.Access(reinterpret_cast<uint8_t *>(chunk_ptr), chunk_size_b / tcnt);
-            } else {
-                const auto data = cache_.Access(reinterpret_cast<uint8_t *>(chunk_ptr), chunk_size_b / tcnt);
-
-                if constexpr(wait_b) {
-                    // wait on copy to complete - during this time other threads may
-                    // continue with their calculation which leads to little impact
-                    // and we will be faster if the cache is used
-                    data->WaitOnCompletion();
-
-                    // obtain the data location from the cache entry
-                    base_t* data_ptr = reinterpret_cast<base_t*>(data->GetDataLocation());
-
-                    // nullptr is still a legal return value for CacheData::GetLocation()
-                    // even after waiting, so this must be checked
-                    if (data_ptr == nullptr) {
-                        std::cerr << "[!] Cache Miss in ScanB" << std::endl;
-                        data_ptr = chunk_ptr;
-                    }
-
-                    filterNoCopy::apply_same(mask_ptr, nullptr, data_ptr, cmp_b, chunk_size_b / tcnt);
-                }
-                else {
-                    // obtain the data location from the cache entry
-                    base_t* data_ptr = reinterpret_cast<base_t*>(data->GetDataLocation());
-
-                    // nullptr is still a legal return value for CacheData::GetLocation()
-                    // even after waiting, so this must be checked
-                    if (data_ptr == nullptr) {
-                        data_ptr = chunk_ptr;
-                    }
-
-                    filterNoCopy::apply_same(mask_ptr, nullptr, data_ptr, cmp_b, chunk_size_b / tcnt);
-                }
             }
 
             pvc->stop("scan_b", tid * gcnt + gid);
@@ -321,21 +285,7 @@ public:
             base_t* chunk_ptr = get_sub_chunk_ptr(data_a, chunk_id, chunk_size_w, tid, tcnt);
             uint16_t* mask_ptr = get_sub_mask_ptr (mask_a, chunk_id, chunk_size_w, tid, tcnt);
 
-            if constexpr (cache_a) {
-                const auto data = cache_.Access(reinterpret_cast<uint8_t *>(chunk_ptr), chunk_size_b / tcnt);
-                data->WaitOnCompletion();
-                base_t* data_ptr = reinterpret_cast<base_t*>(data->GetDataLocation());
-
-                if (data_ptr == nullptr) {
-                    std::cerr << "[!] Cache Miss in ScanA" << std::endl;
-                    data_ptr = chunk_ptr;
-                }
-
-                filter::apply_same(mask_ptr, nullptr, data_ptr, cmp_a, chunk_size_b / tcnt);
-            }
-            else {
-                filter::apply_same(mask_ptr, nullptr, chunk_ptr, cmp_a, chunk_size_b / tcnt);
-            }
+            filter::apply_same(mask_ptr, nullptr, chunk_ptr, cmp_a, chunk_size_b / tcnt);
 
             pvc->stop("scan_a", tid * gcnt + gid);
             trt->stop_timer(0, tid * gcnt + gid);
@@ -363,7 +313,9 @@ public:
             // calculate pointers
             size_t chunk_id = gid + gcnt * i;
             base_t* chunk_ptr = get_sub_chunk_ptr(data_b, chunk_id, chunk_size_w, tid, tcnt);
+            base_t* data_ptr;
 
+            if constexpr (caching) {
                 // access the cache for the given chunk which will have been accessed in scan_b
                 const auto data = cache_.Access(reinterpret_cast<uint8_t *>(chunk_ptr), chunk_size_b / tcnt);
@@ -385,17 +337,16 @@ public:
                     data_ptr = chunk_ptr;
                     std::cerr << "[!] Cache Miss in AggrJ" << std::endl;
                 }
+            }
+            else {
+                data_ptr = chunk_ptr;
+            }
 
             uint16_t* mask_ptr_a = get_sub_mask_ptr (mask_a, chunk_id, chunk_size_w, tid, tcnt);
-            uint16_t* mask_ptr_b = get_sub_mask_ptr (mask_b, chunk_id, chunk_size_w, tid, tcnt);
 
             base_t tmp = _mm512_reduce_add_epi64(aggregator);
 
-            if constexpr(simple){
-                aggregator = aggregation::apply_masked(aggregator, data_ptr, mask_ptr_a, chunk_size_b / tcnt);
-            } else {
-                aggregator = aggregation::apply_masked(aggregator, data_ptr, mask_ptr_a, mask_ptr_b, chunk_size_b / tcnt);
-            }
+            aggregator = aggregation::apply_masked(aggregator, data_ptr, mask_ptr_a, chunk_size_b / tcnt);
 
             pvc->stop("aggr_j", tid * gcnt + gid);
             trt->stop_timer(2, tid * gcnt + gid);
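
Note: with the complex-query branch gone, AggrJ always performs the single-mask reduction, and mask_b is still allocated and cleared above but no longer written or read. A scalar sketch of what the simple-path aggregation::apply_masked computes, assuming one filter bit per element packed 16 to a uint16_t (the actual mask layout and the AVX-512 kernel are defined by the project, so this is illustrative only):

    #include <cstdint>
    #include <cstddef>

    // sum every element whose filter bit is set, accumulating into acc
    uint64_t apply_masked_scalar(uint64_t acc, const uint64_t* data,
                                 const uint16_t* mask, size_t size_b) {
        const size_t n = size_b / sizeof(uint64_t);
        for (size_t i = 0; i < n; ++i) {
            const bool pass = (mask[i / 16] >> (i % 16)) & 1;  // assumed bit layout
            if (pass) acc += data[i];
        }
        return acc;
    }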
