Browse Source

add optimal dsa caching mode

master
Constantin Fürst 11 months ago
parent
commit
2e0f637363
  1. 46
      qdp_project/src/benchmark/MAX_benchmark.cpp
  2. 64
      qdp_project/src/benchmark/pipelines/MAX_scan_filter_pipe.h

46
qdp_project/src/benchmark/MAX_benchmark.cpp

@ -70,17 +70,18 @@ base_t sum_check_complex(base_t compare_value_a, base_t compare_value_b, base_t*
enum class ExecMode {
DramBaseline,
HbmPeak,
HbmPrefetch
HbmPrefetch,
HbmPrefetchOpt
};
int main(int argc, char** argv) {
constexpr ExecMode mode = ExecMode::HbmPrefetch;
constexpr ExecMode mode = ExecMode::HbmPrefetchOpt;
constexpr size_t workload_b = 4_GiB;
constexpr base_t compare_value_a = 50;
constexpr base_t compare_value_b = 42;
constexpr size_t chunk_size = 256_MiB;
constexpr size_t chunk_size = 64_MiB;
// thread count is 12 here but as the default measurement uses 6
// we must restrict the core assignment of these 12 threads to
@ -99,6 +100,11 @@ int main(int argc, char** argv) {
tc_copy = 8;
tc_agg = 8;
}
else if constexpr (mode == ExecMode::HbmPrefetchOpt) {
tc_filter = 0;
tc_copy = 0;
tc_agg = 1;
}
else {
tc_filter = 8;
tc_copy = 0;
@ -127,6 +133,9 @@ int main(int argc, char** argv) {
else if constexpr (mode == ExecMode::DramBaseline) {
mode_string = "DramBaseline";
}
else if constexpr (mode == ExecMode::HbmPrefetchOpt) {
mode_string = "HbmDsaPrefetchOpt";
}
else {
mode_string = "Unknown";
}
@ -166,7 +175,7 @@ int main(int argc, char** argv) {
std::promise<void> p;
std::shared_future<void> ready_future(p.get_future());
Query_Wrapper<base_t, mode == ExecMode::HbmPrefetch> qw (
Query_Wrapper<base_t, mode == ExecMode::HbmPrefetch || mode == ExecMode::HbmPrefetchOpt> qw (
&ready_future, workload_b, chunk_size,
data_a, data_b, results, tc_filter, tc_copy,
tc_agg,compare_value_a, compare_value_b
@ -178,23 +187,32 @@ int main(int argc, char** argv) {
auto filter_lambda = [&qw](uint32_t gid, uint32_t gcnt, uint32_t tid) { qw.scan_a(gid, gcnt, tid); };
auto copy_lambda = [&qw](uint32_t gid, uint32_t gcnt, uint32_t tid) { qw.scan_b(gid, gcnt, tid); };
auto aggregation_lambda = [&qw](uint32_t gid, uint32_t gcnt, uint32_t tid) { qw.aggr_j(gid, gcnt, tid); };
auto combined_lambda = [&qw](uint32_t gid, uint32_t gcnt, uint32_t tid) { qw.combined(gid, gcnt, tid); };
std::vector<std::thread> filter_pool;
std::vector<std::thread> copy_pool;
std::vector<std::thread> agg_pool;
std::vector<std::thread> combined_pool;
for(uint32_t gid = 0; gid < THREAD_GROUP_MULTIPLIER; ++gid) {
for(uint32_t tid = 0; tid < tc_filter; ++tid) {
filter_pool.emplace_back(filter_lambda, gid, THREAD_GROUP_MULTIPLIER, tid);
if constexpr (mode == ExecMode::HbmPrefetchOpt) {
for(uint32_t tid = 0; tid < tc_agg; ++tid) {
agg_pool.emplace_back(combined_lambda, gid, THREAD_GROUP_MULTIPLIER, tid);
}
}
// if tc_copy == 0 this loop is skipped
for(uint32_t tid = 0; tid < tc_copy; ++tid) {
copy_pool.emplace_back(copy_lambda, gid, THREAD_GROUP_MULTIPLIER, tid);
}
for(uint32_t tid = 0; tid < tc_agg; ++tid) {
agg_pool.emplace_back(aggregation_lambda, gid, THREAD_GROUP_MULTIPLIER, tid);
else {
for(uint32_t tid = 0; tid < tc_filter; ++tid) {
filter_pool.emplace_back(filter_lambda, gid, THREAD_GROUP_MULTIPLIER, tid);
}
// if tc_copy == 0 this loop is skipped
for(uint32_t tid = 0; tid < tc_copy; ++tid) {
copy_pool.emplace_back(copy_lambda, gid, THREAD_GROUP_MULTIPLIER, tid);
}
for(uint32_t tid = 0; tid < tc_agg; ++tid) {
agg_pool.emplace_back(aggregation_lambda, gid, THREAD_GROUP_MULTIPLIER, tid);
}
}
}

64
qdp_project/src/benchmark/pipelines/MAX_scan_filter_pipe.h

@ -246,26 +246,69 @@ public:
for(uint32_t i = 0; i < runs; ++i) {
trt->start_timer(1, tid * gcnt + gid);
pvc->start("scan_b", tid * gcnt + gid);
// calculate pointers
size_t chunk_id = gid + gcnt * i;
base_t* chunk_ptr = get_sub_chunk_ptr(data_b, chunk_id, chunk_size_w, tid, tcnt);
if constexpr (caching) {
const auto data = cache_.Access(reinterpret_cast<uint8_t *>(chunk_ptr), chunk_size_b / tcnt);
data->WaitOnCompletion();
data->WaitOnCompletion();
}
pvc->stop("scan_b", tid * gcnt + gid);
trt->stop_timer(1, tid * gcnt + gid);
bt->timed_wait(*(*sync_barrier)[barrier_idx], 1, tid * gcnt + gid);
}
(*(*sync_barrier)[barrier_idx]).arrive_and_drop();
}
void combined(size_t gid, size_t gcnt, size_t tid) {
size_t tcnt = thread_count_ag;
assert(chunk_size_w % tcnt == 0);
assert(chunk_size_w % 16 == 0);
assert(chunk_size_w % tcnt * 16 == 0);
if constexpr (caching) (*(*sync_barrier)[barrier_idx]).arrive_and_drop();
__m512i aggregator = aggregation::OP::zero();
uint32_t runs = chunk_cnt / gcnt + (chunk_cnt % gcnt > gid);
// wait till everyone can start
ready_future->wait();
for(uint32_t i = 0; i < runs; ++i) {
// calculate pointers and initialize local storage
size_t chunk_id = gid + gcnt * i;
base_t* chunk_ptr = get_sub_chunk_ptr(data_b, chunk_id, chunk_size_w, tid, tcnt);
std::unique_ptr<dsacache::CacheData> data;
// perform "prefetch_b"
{
data = cache_.Access(reinterpret_cast<uint8_t *>(chunk_ptr), chunk_size_b / tcnt);
}
// perform operation "scan_a"
{
uint16_t* mask_ptr = get_sub_mask_ptr (mask_a, chunk_id, chunk_size_w, tid, tcnt);
filter::apply_same(mask_ptr, nullptr, chunk_ptr, cmp_a, chunk_size_b / tcnt);
}
// wait on caching task
data->WaitOnCompletion();
base_t* data_ptr = reinterpret_cast<base_t*>(data->GetDataLocation());
// continue with operation "aggr_j"
{
uint16_t* mask_ptr_a = get_sub_mask_ptr (mask_a, chunk_id, chunk_size_w, tid, tcnt);
base_t tmp = _mm512_reduce_add_epi64(aggregator);
aggregator = aggregation::apply_masked(aggregator, data_ptr, mask_ptr_a, chunk_size_b / tcnt);
}
}
if constexpr (!caching) (*(*sync_barrier)[barrier_idx]).arrive_and_drop();
aggregation::happly(dest + (tid * gcnt + gid), aggregator);
}
void scan_a(size_t gid, size_t gcnt, size_t tid) {
@ -294,11 +337,8 @@ public:
pvc->stop("scan_a", tid * gcnt + gid);
trt->stop_timer(0, tid * gcnt + gid);
bt->timed_wait(*(*sync_barrier)[barrier_idx], 0, tid * gcnt + gid);
if constexpr (caching) (*(*sync_barrier)[barrier_idx]).arrive_and_drop();
}
if constexpr (!caching) (*(*sync_barrier)[barrier_idx]).arrive_and_drop();
(*(*sync_barrier)[barrier_idx]).arrive_and_drop();
}
void aggr_j(size_t gid, size_t gcnt, size_t tid) {

Loading…
Cancel
Save