@@ -42,6 +42,7 @@ constexpr uint32_t TC_COMBINED = TC_SCANA + TC_SCANB + TC_AGGRJ;
 constexpr size_t WL_SIZE_ELEMENTS = WL_SIZE_B / sizeof(uint64_t);
 constexpr size_t CHUNK_COUNT = WL_SIZE_B / CHUNK_SIZE_B;
 constexpr size_t CHUNK_SIZE_ELEMENTS = CHUNK_SIZE_B / sizeof(uint64_t);
+constexpr size_t RUN_COUNT = CHUNK_COUNT / GROUP_COUNT;
 
 using filter = Filter<uint64_t, LT, load_mode::Stream, false>;
 using aggregation = Aggregation<uint64_t, Sum, load_mode::Stream>;
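Note on the added RUN_COUNT: the integer division drops any remainder, while the per-gid runs computation that the hunks below remove gave the lower group ids one extra chunk when CHUNK_COUNT was not evenly divisible by GROUP_COUNT. If uneven workloads are no longer meant to be supported, a compile-time guard would make that explicit. This is only a suggested sketch, assuming GROUP_COUNT is a compile-time constant (which the constexpr initializer of RUN_COUNT already implies):

    // Hypothetical guard, not part of this patch: with RUN_COUNT = CHUNK_COUNT / GROUP_COUNT,
    // any remainder chunks would silently never be processed.
    static_assert(CHUNK_COUNT % GROUP_COUNT == 0,
                  "CHUNK_COUNT must be a multiple of GROUP_COUNT");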
@@ -60,13 +61,10 @@ void scan_b(size_t gid, size_t tid) {
     LAUNCH_.wait();
 
     if constexpr (PERFORM_CACHING) {
-        uint32_t runs = CHUNK_COUNT / GROUP_COUNT + (CHUNK_COUNT % GROUP_COUNT > gid);
-
         std::unique_ptr<dsacache::CacheData> data;
 
-        for(uint32_t i = 0; i < runs; ++i) {
-            // calculate pointers
-            size_t chunk_id = gid + GROUP_COUNT * i;
+        for (size_t i = 0; i < RUN_COUNT; i++) {
+            size_t chunk_id = gid * GROUP_COUNT + i;
             uint64_t* chunk_ptr = get_sub_chunk_ptr(DATA_B_, chunk_id, CHUNK_SIZE_ELEMENTS, tid, TC_AGGRJ);
             data = CACHE_.Access(reinterpret_cast<uint8_t *>(chunk_ptr), CHUNK_SIZE_B / TC_AGGRJ);
         }
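Besides replacing the runtime runs computation with RUN_COUNT, the chunk assignment switches from a strided to a blocked pattern: old gid + GROUP_COUNT * i, new gid * GROUP_COUNT + i. With illustrative values GROUP_COUNT = RUN_COUNT = 4 (so CHUNK_COUNT = 16, not necessarily this file's configuration), group 1 formerly touched chunks 1, 5, 9, 13 and now touches 4, 5, 6, 7. Note that the new form covers every chunk exactly once only when RUN_COUNT equals GROUP_COUNT; if the two can differ, the blocked formula that tiles the whole range would be gid * RUN_COUNT + i, which may be worth double-checking. A standalone sketch of both mappings:

    #include <cstdio>
    #include <cstddef>

    int main() {
        // Illustrative constants only; the benchmark derives its own values.
        constexpr size_t GROUP_COUNT = 4, RUN_COUNT = 4;
        for (size_t gid = 0; gid < GROUP_COUNT; ++gid) {
            for (size_t i = 0; i < RUN_COUNT; ++i) {
                size_t strided = gid + GROUP_COUNT * i; // removed assignment
                size_t blocked = gid * GROUP_COUNT + i; // new assignment
                std::printf("gid=%zu i=%zu strided=%zu blocked=%zu\n", gid, i, strided, blocked);
            }
        }
    }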
@@ -80,11 +78,8 @@ void scan_b(size_t gid, size_t tid) {
 void scan_a(size_t gid, size_t tid) {
     LAUNCH_.wait();
 
-    uint32_t runs = CHUNK_COUNT / GROUP_COUNT + (CHUNK_COUNT % GROUP_COUNT > gid);
-
-    for(uint32_t i = 0; i < runs; ++i) {
-        // calculate pointers
-        size_t chunk_id = gid + GROUP_COUNT * i;
+    for (size_t i = 0; i < RUN_COUNT; i++) {
+        size_t chunk_id = gid * GROUP_COUNT + i;
         uint64_t* chunk_ptr = get_sub_chunk_ptr(DATA_A_, chunk_id, CHUNK_SIZE_ELEMENTS, tid, TC_SCANA);
         uint16_t* mask_ptr = get_sub_mask_ptr (MASK_A_, chunk_id, CHUNK_SIZE_ELEMENTS, tid, TC_SCANA);
 
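get_sub_chunk_ptr and get_sub_mask_ptr are project helpers whose definitions are outside this diff. Judging only from the call sites, they appear to select chunk chunk_id and then hand each of the TC_SCANA (or TC_AGGRJ) threads its own slice of it; a rough sketch of that reading, under that assumption and with a deliberately different name so it is not mistaken for the real helper:

    #include <cstdint>
    #include <cstddef>

    // Illustrative only: pick chunk `chunk_id` out of `base`, then return the
    // sub-slice owned by thread `tid` when the chunk is split across `tc` threads.
    inline uint64_t* sub_chunk_ptr_sketch(uint64_t* base, size_t chunk_id,
                                          size_t chunk_size_elements,
                                          size_t tid, size_t tc) {
        uint64_t* chunk = base + chunk_id * chunk_size_elements;
        return chunk + tid * (chunk_size_elements / tc);
    }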
@@ -97,34 +92,21 @@ void scan_a(size_t gid, size_t tid) {
 void aggr_j(size_t gid, size_t tid) {
     LAUNCH_.wait();
 
-    // calculate values
     __m512i aggregator = aggregation::OP::zero();
 
-    // the lower gids run once more if the chunks are not evenly distributable
-    uint32_t runs = CHUNK_COUNT / GROUP_COUNT + (CHUNK_COUNT % GROUP_COUNT > gid);
-
-    for(uint32_t i = 0; i < runs; ++i) {
+    for (size_t i = 0; i < RUN_COUNT; i++) {
         BARRIERS_[gid]->arrive_and_wait();
 
-        // calculate pointers
-        size_t chunk_id = gid + GROUP_COUNT * i;
+        size_t chunk_id = gid * GROUP_COUNT + i;
         uint64_t* chunk_ptr = get_sub_chunk_ptr(DATA_B_, chunk_id, CHUNK_SIZE_ELEMENTS, tid, TC_AGGRJ);
 
         std::unique_ptr<dsacache::CacheData> data;
         uint64_t* data_ptr;
 
         if constexpr (PERFORM_CACHING) {
-            // access the cache for the given chunk which will have been accessed in scan_b
             data = CACHE_.Access(reinterpret_cast<uint8_t *>(chunk_ptr), CHUNK_SIZE_B / TC_AGGRJ);
 
-            // after the copy task has finished we obtain the pointer to the cached
-            // copy of data_b which is then used from now on
             data->WaitOnCompletion();
             data_ptr = reinterpret_cast<uint64_t*>(data->GetDataLocation());
 
-            // nullptr is still a legal return value for CacheData::GetLocation()
-            // even after waiting, so this must be checked
             if (data_ptr == nullptr) {
                 std::cerr << "[x] Cache Miss!" << std::endl;
                 exit(-1);
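For context on the accumulator: aggregation::OP::zero() returning an __m512i indicates the Sum operation uses the common AVX-512 accumulate-then-reduce pattern. The Aggregation template itself is not part of this diff; the following is a generic sketch of that pattern using standard intrinsics, not the project's implementation (the benchmark additionally uses streaming loads via load_mode::Stream):

    #include <immintrin.h>
    #include <cstdint>
    #include <cstddef>

    // Generic AVX-512 sum over n uint64_t values: vector accumulate, then
    // horizontal reduce, then a scalar tail for the leftover elements.
    uint64_t sum_u64_avx512(const uint64_t* data, size_t n) {
        __m512i acc = _mm512_setzero_si512();            // analogue of OP::zero()
        size_t i = 0;
        for (; i + 8 <= n; i += 8) {
            __m512i v = _mm512_loadu_si512(data + i);
            acc = _mm512_add_epi64(acc, v);
        }
        uint64_t total = _mm512_reduce_add_epi64(acc);   // horizontal reduction
        for (; i < n; ++i) total += data[i];             // scalar tail
        return total;
    }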
@@ -204,7 +186,7 @@ int main() {
     for(std::thread& t : copy_pool) { t.join(); }
     for(std::thread& t : agg_pool) { t.join(); }
 
-    Aggregation<uint64_t, Sum, load_mode::Aligned>::apply(DATA_nDST_, DATA_DST_, sizeof(uint64_t) * TC_AGGRJ * GROUP_COUNT);
+    Aggregation<uint64_t, Sum, load_mode::Aligned>::apply(DATA_DST_, DATA_DST_, sizeof(uint64_t) * TC_AGGRJ * GROUP_COUNT);
 
     const auto time_end = std::chrono::steady_clock::now();
 
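One arithmetic note on the final reduction: the unchanged size argument sizeof(uint64_t) * TC_AGGRJ * GROUP_COUNT means one partial sum per aggregation thread per group is folded into DATA_DST_ in place. With illustrative values TC_AGGRJ = 4 and GROUP_COUNT = 8 (not necessarily this configuration), that is 32 uint64_t partials, 256 bytes, reduced after both thread pools have joined and before time_end is taken.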