|
|
@ -68,6 +68,8 @@ constexpr size_t WL_SIZE_ELEMENTS = WL_SIZE_B / sizeof(uint64_t); |
|
|
|
constexpr size_t CHUNK_COUNT = WL_SIZE_B / CHUNK_SIZE_B; |
|
|
|
constexpr size_t CHUNK_SIZE_ELEMENTS = CHUNK_SIZE_B / sizeof(uint64_t); |
|
|
|
constexpr size_t RUN_COUNT = CHUNK_COUNT / GROUP_COUNT; |
|
|
|
constexpr size_t MASK_ELEMENT_SIZE = 16; |
|
|
|
constexpr size_t MASK_STEP_SIZE = CHUNK_SIZE_ELEMENTS / MASK_ELEMENT_SIZE; |
|
|
|
|
|
|
|
using filter = Filter<uint64_t, LT, load_mode::Stream, false>; |
|
|
|
using aggregation = Aggregation<uint64_t, Sum, load_mode::Stream>; |
|
|
@ -82,6 +84,32 @@ uint64_t* DATA_B_; |
|
|
|
uint16_t* MASK_A_; |
|
|
|
uint64_t* DATA_DST_; |
|
|
|
|
|
|
|
template<size_t TC> |
|
|
|
inline uint64_t get_chunk_index(const size_t gid, const size_t tid, const size_t rid) { |
|
|
|
/*
|
|
|
|
* Calculates Chunk Index as follows: |
|
|
|
* group_start = (chunk_count / group_count) * gid |
|
|
|
* thread_start = (chunk_count / (group_count * thread_count)) * tid |
|
|
|
* run_start = (chunk_count / (group_count * thread_count * run_count)) * rid |
|
|
|
* index = group_start + thread_start + run_start |
|
|
|
*/ |
|
|
|
|
|
|
|
constexpr size_t TC_x_RC = TC * RUN_COUNT; |
|
|
|
constexpr size_t GC_x_TC_x_RC = GROUP_COUNT * TC_x_RC; |
|
|
|
|
|
|
|
const size_t index = (CHUNK_COUNT * (TC_x_RC * gid + RUN_COUNT * tid + rid)) / GC_x_TC_x_RC; |
|
|
|
|
|
|
|
return index; |
|
|
|
} |
|
|
|
|
|
|
|
inline uint64_t* get_chunk(uint64_t* base, const size_t chunk_index) { |
|
|
|
return &base[chunk_index * CHUNK_SIZE_ELEMENTS]; |
|
|
|
} |
|
|
|
|
|
|
|
inline uint16_t* get_mask(uint16_t* base, const size_t chunk_index) { |
|
|
|
return &base[chunk_index * MASK_STEP_SIZE]; |
|
|
|
} |
|
|
|
|
|
|
|
void scan_b(size_t gid, size_t tid) { |
|
|
|
LAUNCH_.wait(); |
|
|
|
|
|
|
@ -89,8 +117,9 @@ void scan_b(size_t gid, size_t tid) { |
|
|
|
std::unique_ptr<dsacache::CacheData> data; |
|
|
|
|
|
|
|
for (size_t i = 0; i < RUN_COUNT; i++) { |
|
|
|
size_t chunk_id = gid * GROUP_COUNT + i; |
|
|
|
uint64_t* chunk_ptr = get_sub_chunk_ptr(DATA_B_, chunk_id, CHUNK_SIZE_ELEMENTS, tid, TC_AGGRJ); |
|
|
|
const size_t chunk_index = get_chunk_index<TC_AGGRJ>(gid, tid, i); |
|
|
|
uint64_t* chunk_ptr = get_chunk(DATA_B_, chunk_index); |
|
|
|
|
|
|
|
data = CACHE_.Access(reinterpret_cast<uint8_t *>(chunk_ptr), CHUNK_SIZE_B / TC_AGGRJ); |
|
|
|
} |
|
|
|
|
|
|
@ -104,9 +133,9 @@ void scan_a(size_t gid, size_t tid) { |
|
|
|
LAUNCH_.wait(); |
|
|
|
|
|
|
|
for (size_t i = 0; i < RUN_COUNT; i++) { |
|
|
|
size_t chunk_id = gid * GROUP_COUNT + i; |
|
|
|
uint64_t* chunk_ptr = get_sub_chunk_ptr(DATA_A_, chunk_id, CHUNK_SIZE_ELEMENTS, tid, TC_SCANA); |
|
|
|
uint16_t* mask_ptr = get_sub_mask_ptr (MASK_A_, chunk_id, CHUNK_SIZE_ELEMENTS, tid, TC_SCANA); |
|
|
|
const size_t chunk_index = get_chunk_index<TC_SCANA>(gid, tid, i); |
|
|
|
uint64_t* chunk_ptr = get_chunk(DATA_A_, chunk_index); |
|
|
|
uint16_t* mask_ptr = get_mask(MASK_A_, chunk_index); |
|
|
|
|
|
|
|
filter::apply_same(mask_ptr, nullptr, chunk_ptr, CMP_A, CHUNK_SIZE_B / TC_SCANA); |
|
|
|
} |
|
|
@ -119,11 +148,12 @@ void aggr_j(size_t gid, size_t tid) { |
|
|
|
|
|
|
|
__m512i aggregator = aggregation::OP::zero(); |
|
|
|
|
|
|
|
for (size_t i = 0; i < RUN_COUNT; i++) { |
|
|
|
BARRIERS_[gid]->arrive_and_wait(); |
|
|
|
|
|
|
|
size_t chunk_id = gid * GROUP_COUNT + i; |
|
|
|
uint64_t* chunk_ptr = get_sub_chunk_ptr(DATA_B_, chunk_id, CHUNK_SIZE_ELEMENTS, tid, TC_AGGRJ); |
|
|
|
for (size_t i = 0; i < RUN_COUNT; i++) { |
|
|
|
const size_t chunk_index = get_chunk_index<TC_SCANA>(gid, tid, i); |
|
|
|
uint64_t* chunk_ptr = get_chunk(DATA_B_, chunk_index); |
|
|
|
uint16_t* mask_ptr = get_mask(MASK_A_, chunk_index); |
|
|
|
|
|
|
|
std::unique_ptr<dsacache::CacheData> data; |
|
|
|
uint64_t* data_ptr; |
|
|
@ -141,9 +171,7 @@ void aggr_j(size_t gid, size_t tid) { |
|
|
|
data_ptr = chunk_ptr; |
|
|
|
} |
|
|
|
|
|
|
|
uint16_t* mask_ptr_a = get_sub_mask_ptr(MASK_A_, chunk_id, CHUNK_SIZE_ELEMENTS, tid, TC_AGGRJ); |
|
|
|
uint64_t tmp = _mm512_reduce_add_epi64(aggregator); |
|
|
|
aggregator = aggregation::apply_masked(aggregator, data_ptr, mask_ptr_a, CHUNK_SIZE_B / TC_AGGRJ); |
|
|
|
aggregator = aggregation::apply_masked(aggregator, data_ptr, mask_ptr, CHUNK_SIZE_B / TC_AGGRJ); |
|
|
|
} |
|
|
|
|
|
|
|
aggregation::happly(DATA_DST_ + (tid * GROUP_COUNT + gid), aggregator); |
|
|
|