Browse Source

remove bad opt mode from last commit and instead try to improve default prefetching

master
Constantin Fürst 11 months ago
parent
commit
ab217cb080
  1. 27
      qdp_project/src/benchmark/MAX_benchmark.cpp
  2. 54
      qdp_project/src/benchmark/pipelines/MAX_scan_filter_pipe.h

27
qdp_project/src/benchmark/MAX_benchmark.cpp

@ -70,12 +70,11 @@ base_t sum_check_complex(base_t compare_value_a, base_t compare_value_b, base_t*
enum class ExecMode {
DramBaseline,
HbmPeak,
HbmPrefetch,
HbmPrefetchOpt
HbmPrefetch
};
int main(int argc, char** argv) {
constexpr ExecMode mode = ExecMode::HbmPrefetchOpt;
constexpr ExecMode mode = ExecMode::HbmPrefetch;
constexpr size_t workload_b = 4_GiB;
constexpr base_t compare_value_a = 50;
@ -97,13 +96,8 @@ int main(int argc, char** argv) {
if constexpr (mode == ExecMode::HbmPrefetch) {
tc_filter = 8;
tc_copy = 8;
tc_agg = 8;
}
else if constexpr (mode == ExecMode::HbmPrefetchOpt) {
tc_filter = 0;
tc_copy = 0;
tc_agg = 1;
tc_copy = 1;
tc_agg = 4;
}
else {
tc_filter = 8;
@ -133,9 +127,6 @@ int main(int argc, char** argv) {
else if constexpr (mode == ExecMode::DramBaseline) {
mode_string = "DramBaseline";
}
else if constexpr (mode == ExecMode::HbmPrefetchOpt) {
mode_string = "HbmDsaPrefetchOpt";
}
else {
mode_string = "Unknown";
}
@ -175,7 +166,7 @@ int main(int argc, char** argv) {
std::promise<void> p;
std::shared_future<void> ready_future(p.get_future());
Query_Wrapper<base_t, mode == ExecMode::HbmPrefetch || mode == ExecMode::HbmPrefetchOpt> qw (
Query_Wrapper<base_t, mode == ExecMode::HbmPrefetch> qw (
&ready_future, workload_b, chunk_size,
data_a, data_b, results, tc_filter, tc_copy,
tc_agg,compare_value_a, compare_value_b
@ -187,7 +178,6 @@ int main(int argc, char** argv) {
auto filter_lambda = [&qw](uint32_t gid, uint32_t gcnt, uint32_t tid) { qw.scan_a(gid, gcnt, tid); };
auto copy_lambda = [&qw](uint32_t gid, uint32_t gcnt, uint32_t tid) { qw.scan_b(gid, gcnt, tid); };
auto aggregation_lambda = [&qw](uint32_t gid, uint32_t gcnt, uint32_t tid) { qw.aggr_j(gid, gcnt, tid); };
auto combined_lambda = [&qw](uint32_t gid, uint32_t gcnt, uint32_t tid) { qw.combined(gid, gcnt, tid); };
std::vector<std::thread> filter_pool;
std::vector<std::thread> copy_pool;
@ -195,12 +185,6 @@ int main(int argc, char** argv) {
std::vector<std::thread> combined_pool;
for(uint32_t gid = 0; gid < THREAD_GROUP_MULTIPLIER; ++gid) {
if constexpr (mode == ExecMode::HbmPrefetchOpt) {
for(uint32_t tid = 0; tid < tc_agg; ++tid) {
agg_pool.emplace_back(combined_lambda, gid, THREAD_GROUP_MULTIPLIER, tid);
}
}
else {
for(uint32_t tid = 0; tid < tc_filter; ++tid) {
filter_pool.emplace_back(filter_lambda, gid, THREAD_GROUP_MULTIPLIER, tid);
}
@ -214,7 +198,6 @@ int main(int argc, char** argv) {
agg_pool.emplace_back(aggregation_lambda, gid, THREAD_GROUP_MULTIPLIER, tid);
}
}
}
auto start = std::chrono::steady_clock::now();
p.set_value();

54
qdp_project/src/benchmark/pipelines/MAX_scan_filter_pipe.h

@ -244,6 +244,8 @@ public:
uint32_t runs = chunk_cnt / gcnt + (chunk_cnt % gcnt > gid);
uint32_t barrier_idx = barrier_mode.compare("global") == 0 ? 0 : gid;
std::unique_ptr<dsacache::CacheData> data;
for(uint32_t i = 0; i < runs; ++i) {
trt->start_timer(1, tid * gcnt + gid);
@ -252,63 +254,17 @@ public:
base_t* chunk_ptr = get_sub_chunk_ptr(data_b, chunk_id, chunk_size_w, tid, tcnt);
if constexpr (caching) {
const auto data = cache_.Access(reinterpret_cast<uint8_t *>(chunk_ptr), chunk_size_b / tcnt);
data->WaitOnCompletion();
}
trt->stop_timer(1, tid * gcnt + gid);
}
(*(*sync_barrier)[barrier_idx]).arrive_and_drop();
}
void combined(size_t gid, size_t gcnt, size_t tid) {
size_t tcnt = thread_count_ag;
assert(chunk_size_w % tcnt == 0);
assert(chunk_size_w % 16 == 0);
assert(chunk_size_w % tcnt * 16 == 0);
__m512i aggregator = aggregation::OP::zero();
uint32_t runs = chunk_cnt / gcnt + (chunk_cnt % gcnt > gid);
// wait till everyone can start
ready_future->wait();
for(uint32_t i = 0; i < runs; ++i) {
// calculate pointers and initialize local storage
size_t chunk_id = gid + gcnt * i;
base_t* chunk_ptr = get_sub_chunk_ptr(data_b, chunk_id, chunk_size_w, tid, tcnt);
std::unique_ptr<dsacache::CacheData> data;
// perform "prefetch_b"
{
data = cache_.Access(reinterpret_cast<uint8_t *>(chunk_ptr), chunk_size_b / tcnt);
}
// perform operation "scan_a"
{
uint16_t* mask_ptr = get_sub_mask_ptr (mask_a, chunk_id, chunk_size_w, tid, tcnt);
filter::apply_same(mask_ptr, nullptr, chunk_ptr, cmp_a, chunk_size_b / tcnt);
trt->stop_timer(1, tid * gcnt + gid);
}
// wait on caching task
if constexpr (caching) {
data->WaitOnCompletion();
base_t* data_ptr = reinterpret_cast<base_t*>(data->GetDataLocation());
// continue with operation "aggr_j"
{
uint16_t* mask_ptr_a = get_sub_mask_ptr (mask_a, chunk_id, chunk_size_w, tid, tcnt);
base_t tmp = _mm512_reduce_add_epi64(aggregator);
aggregator = aggregation::apply_masked(aggregator, data_ptr, mask_ptr_a, chunk_size_b / tcnt);
}
}
aggregation::happly(dest + (tid * gcnt + gid), aggregator);
(*(*sync_barrier)[barrier_idx]).arrive_and_drop();
}
void scan_a(size_t gid, size_t gcnt, size_t tid) {

Loading…
Cancel
Save