Browse Source

modify benchmarking code to measure time spent loading vectors too

master
Constantin Fürst 11 months ago
parent
commit
122eab35b7
  1. 163
      qdp_project/src/Benchmark.cpp
  2. 13
      qdp_project/src/Configuration.hpp
  3. 18
      qdp_project/src/utils/BenchmarkHelpers.cpp
  4. 309
      qdp_project/src/utils/aggregation.h
  5. 162
      qdp_project/src/utils/filter.h

163
qdp_project/src/Benchmark.cpp

@ -17,12 +17,11 @@
#include "Configuration.hpp"
#include "BenchmarkHelpers.cpp"
using filter = Filter<uint64_t, LT, load_mode::Stream, false>;
using aggregation = Aggregation<uint64_t, Sum, load_mode::Stream>;
using filter = FilterLT<uint64_t, load_mode::Stream>;
using aggregation = AggregationSUM<uint64_t, load_mode::Stream>;
dsacache::Cache CACHE_;
std::array<std::atomic<int32_t>, GROUP_COUNT> PREFETCHED_CHUNKS_;
std::vector<std::barrier<NopStruct>*> BARRIERS_;
std::shared_future<void> LAUNCH_;
@ -46,8 +45,6 @@ void caching(size_t gid, size_t tid) {
constexpr size_t SUBCHUNK_COUNT = SUBCHUNK_THREAD_RATIO > 0 ? SUBCHUNK_THREAD_RATIO : 1;
constexpr size_t SUBCHUNK_SIZE_B = CHUNK_SIZE_B / SUBCHUNK_COUNT;
constexpr size_t SUBCHUNK_SIZE_ELEMENTS = CHUNK_SIZE_ELEMENTS / SUBCHUNK_COUNT;
constexpr size_t LAST_SUBCHUNK_SIZE_B = SUBCHUNK_SIZE_B + (CHUNK_SIZE_B % SUBCHUNK_COUNT);
// TODO: last thread (whether simulated or not) in last group must use last subchunk size for its last subchunk in the last run as size
for (size_t i = 0; i < RUN_COUNT; i++) {
const size_t chunk_index = get_chunk_index(gid, i);
@ -55,42 +52,53 @@ void caching(size_t gid, size_t tid) {
for (size_t j = 0; j < SUBCHUNK_COUNT; j++) {
uint64_t* sub_chunk_ptr = &chunk_ptr[j * SUBCHUNK_SIZE_ELEMENTS];
CACHE_.Access(reinterpret_cast<uint8_t*>(sub_chunk_ptr), SUBCHUNK_SIZE_B);
}
}
PREFETCHED_CHUNKS_[gid]++;
PREFETCHED_CHUNKS_[gid].notify_one();
constexpr size_t LAST_CHUNK_SIZE_B = SUBCHUNK_SIZE_B + (CHUNK_SIZE_B % SUBCHUNK_COUNT);
if constexpr (LAST_CHUNK_SIZE_B > 0) {
if (gid == GROUP_COUNT - 1 && tid == TC_SCANB - 1) {
const size_t chunk_index = get_chunk_index(gid, RUN_COUNT + 1);
uint64_t* chunk_ptr = get_chunk<TC_SCANB>(DATA_B_, chunk_index, tid);
CACHE_.Access(reinterpret_cast<uint8_t*>(chunk_ptr), LAST_CHUNK_SIZE_B);
}
}
}
else if constexpr (CACHE_OVERCHUNKING) {
constexpr size_t LAST_CHUNK_SIZE_B = CHUNK_SIZE_B + (CHUNK_SIZE_B % (TC_AGGRJ * GROUP_COUNT));
// TODO: last thread (whether simulated or not) in last group must use last chunk size for its last run as size
for (size_t tid_virt = tid; tid_virt < TC_AGGRJ; tid_virt += VIRT_TID_INCREMENT) {
for (size_t i = 0; i < RUN_COUNT; i++) {
const size_t chunk_index = get_chunk_index(gid, i);
uint64_t* chunk_ptr = get_chunk<TC_AGGRJ>(DATA_B_, chunk_index, tid_virt);
uint64_t *chunk_ptr = get_chunk<TC_AGGRJ>(DATA_B_, chunk_index, tid_virt);
CACHE_.Access(reinterpret_cast<uint8_t*>(chunk_ptr), CHUNK_SIZE_B);
CACHE_.Access(reinterpret_cast<uint8_t *>(chunk_ptr), CHUNK_SIZE_B);
}
}
PREFETCHED_CHUNKS_[gid]++;
PREFETCHED_CHUNKS_[gid].notify_one();
constexpr size_t LAST_CHUNK_SIZE_B = CHUNK_SIZE_B + (CHUNK_SIZE_B % (TC_AGGRJ * GROUP_COUNT));
if constexpr (LAST_CHUNK_SIZE_B > 0) {
if (gid == GROUP_COUNT - 1 && tid == TC_SCANB - 1) {
const size_t chunk_index = get_chunk_index(gid, RUN_COUNT + 1);
uint64_t *chunk_ptr = get_chunk<TC_AGGRJ>(DATA_B_, chunk_index, tid);
CACHE_.Access(reinterpret_cast<uint8_t *>(chunk_ptr), LAST_CHUNK_SIZE_B);
}
}
}
else {
constexpr size_t LAST_CHUNK_SIZE_B = CHUNK_SIZE_B + (CHUNK_SIZE_B % ((TC_SCANB > 0 ? TC_SCANB : 1) * GROUP_COUNT));
// TODO: last thread in last group must use last chunk size for its last run as size
for (size_t i = 0; i < RUN_COUNT; i++) {
const size_t chunk_index = get_chunk_index(gid, i);
uint64_t* chunk_ptr = get_chunk<TC_SCANB>(DATA_B_, chunk_index, tid);
CACHE_.Access(reinterpret_cast<uint8_t*>(chunk_ptr), CHUNK_SIZE_B);
}
PREFETCHED_CHUNKS_[gid]++;
PREFETCHED_CHUNKS_[gid].notify_one();
constexpr size_t LAST_CHUNK_SIZE_B = CHUNK_SIZE_B + (CHUNK_SIZE_B % ((TC_SCANB > 0 ? TC_SCANB : 1) * GROUP_COUNT));
if constexpr (LAST_CHUNK_SIZE_B > 0) {
if (gid == GROUP_COUNT - 1 && tid == TC_SCANB - 1) {
const size_t chunk_index = get_chunk_index(gid, RUN_COUNT + 1);
uint64_t* chunk_ptr = get_chunk<TC_SCANB>(DATA_B_, chunk_index, tid);
CACHE_.Access(reinterpret_cast<uint8_t*>(chunk_ptr), LAST_CHUNK_SIZE_B);
}
}
}
}
@ -114,42 +122,85 @@ void scan_b(size_t gid, size_t tid) {
void scan_a(size_t gid, size_t tid) {
constexpr size_t LAST_CHUNK_SIZE_B = CHUNK_SIZE_B + (CHUNK_SIZE_B % (TC_SCANA * GROUP_COUNT));
// TODO: last thread in last group must use last chunk size for its last run as size
THREAD_TIMING_[SCANA_TIMING_INDEX][UniqueIndex(gid,tid)].clear();
THREAD_TIMING_[SCANA_TIMING_INDEX][UniqueIndex(gid,tid)].resize(1);
THREAD_TIMING_[SCANA_TIMING_INDEX][UniqueIndex(gid,tid)].resize(RUN_COUNT);
INTERNAL_TIMING_VECTOR_LOAD_[SCANA_TIMING_INDEX].clear();
INTERNAL_TIMING_VECTOR_LOAD_[SCANA_TIMING_INDEX].resize(RUN_COUNT);
LAUNCH_.wait();
THREAD_TIMING_[SCANA_TIMING_INDEX][UniqueIndex(gid,tid)][0][TIME_STAMP_BEGIN] = std::chrono::steady_clock::now();
for (size_t i = 0; i < RUN_COUNT; i++) {
THREAD_TIMING_[SCANA_TIMING_INDEX][UniqueIndex(gid,tid)][0][TIME_STAMP_BEGIN] = std::chrono::steady_clock::now();
const size_t chunk_index = get_chunk_index(gid, i);
uint64_t* chunk_ptr = get_chunk<TC_SCANA>(DATA_A_, chunk_index, tid);
uint16_t* mask_ptr = get_mask<TC_SCANA>(MASK_A_, chunk_index, tid);
filter::apply_same(mask_ptr, nullptr, chunk_ptr, CMP_A, CHUNK_SIZE_B / TC_SCANA);
const auto internal_timing = filter::apply_same<CMP_A, CHUNK_SIZE_B / TC_SCANA>(mask_ptr, chunk_ptr);
INTERNAL_TIMING_VECTOR_LOAD_[SCANA_TIMING_INDEX][i] = internal_timing;
THREAD_TIMING_[SCANA_TIMING_INDEX][UniqueIndex(gid,tid)][0][TIME_STAMP_WAIT] = std::chrono::steady_clock::now();
BARRIERS_[gid]->arrive_and_wait();
THREAD_TIMING_[SCANA_TIMING_INDEX][UniqueIndex(gid,tid)][0][TIME_STAMP_END] = std::chrono::steady_clock::now();
}
THREAD_TIMING_[SCANA_TIMING_INDEX][UniqueIndex(gid,tid)][0][TIME_STAMP_WAIT] = std::chrono::steady_clock::now();
THREAD_TIMING_[SCANA_TIMING_INDEX][UniqueIndex(gid,tid)][0][TIME_STAMP_END] = std::chrono::steady_clock::now();
if constexpr (LAST_CHUNK_SIZE_B > 0) {
if (gid == GROUP_COUNT - 1 && tid == TC_SCANB - 1) {
const size_t chunk_index = get_chunk_index(gid, RUN_COUNT + 1);
uint64_t* chunk_ptr = get_chunk<TC_SCANA>(DATA_A_, chunk_index, tid);
uint16_t* mask_ptr = get_mask<TC_SCANA>(MASK_A_, chunk_index, tid);
filter::apply_same<CMP_A, LAST_CHUNK_SIZE_B>(mask_ptr, chunk_ptr);
}
}
BARRIERS_[gid]->arrive_and_drop();
}
template <size_t size>
uint64_t AggrFn(uint64_t* chunk_ptr, uint16_t* mask_ptr_a, const uint32_t tid, const uint32_t gid, __m512i& aggregator) {
std::unique_ptr<dsacache::CacheData> data;
uint64_t* data_ptr;
if constexpr (PERFORM_CACHING) {
data = CACHE_.Access(reinterpret_cast<uint8_t *>(chunk_ptr), size, dsacache::FLAG_ACCESS_WEAK);
data->WaitOnCompletion();
data_ptr = reinterpret_cast<uint64_t*>(data->GetDataLocation());
if (data_ptr == nullptr) {
data_ptr = chunk_ptr;
}
else if (data_ptr == chunk_ptr) {
// prevent counting weak-accesses
}
else {
CACHE_HITS_[UniqueIndex(gid,tid)]++;
}
}
else {
data_ptr = chunk_ptr;
}
uint64_t tmp = _mm512_reduce_add_epi64(aggregator);
return aggregation::apply_masked<size>(aggregator, data_ptr, mask_ptr_a);
}
void aggr_j(size_t gid, size_t tid) {
constexpr size_t SUBCHUNK_SIZE_B = CHUNK_SIZE_B / TC_AGGRJ;
constexpr size_t LAST_CHUNK_SIZE_B = SUBCHUNK_SIZE_B + (CHUNK_SIZE_B % (TC_AGGRJ * GROUP_COUNT));
// TODO: last thread in last group must use last chunk size for its last run as size
CACHE_HITS_[UniqueIndex(gid,tid)] = 0;
THREAD_TIMING_[AGGRJ_TIMING_INDEX][UniqueIndex(gid,tid)].clear();
THREAD_TIMING_[AGGRJ_TIMING_INDEX][UniqueIndex(gid,tid)].resize(RUN_COUNT);
INTERNAL_TIMING_VECTOR_LOAD_[AGGRJ_TIMING_INDEX].clear();
INTERNAL_TIMING_VECTOR_LOAD_[AGGRJ_TIMING_INDEX].resize(RUN_COUNT);
__m512i aggregator = aggregation::OP::zero();
__m512i aggregator = aggregation::zero();
LAUNCH_.wait();
@ -158,45 +209,25 @@ void aggr_j(size_t gid, size_t tid) {
BARRIERS_[gid]->arrive_and_wait();
while (true) {
const int32_t old = PREFETCHED_CHUNKS_[gid].fetch_sub(1);
if (old > 0) break;
PREFETCHED_CHUNKS_[gid]++;
PREFETCHED_CHUNKS_[gid].wait(0);
}
THREAD_TIMING_[AGGRJ_TIMING_INDEX][UniqueIndex(gid,tid)][i][TIME_STAMP_WAIT] = std::chrono::steady_clock::now();
const size_t chunk_index = get_chunk_index(gid, i);
uint64_t* chunk_ptr = get_chunk<TC_AGGRJ>(DATA_B_, chunk_index, tid);
uint16_t* mask_ptr_a = get_mask<TC_AGGRJ>(MASK_A_, chunk_index, tid);
std::unique_ptr<dsacache::CacheData> data;
uint64_t* data_ptr;
const auto internal_timing = AggrFn<SUBCHUNK_SIZE_B>(chunk_ptr, mask_ptr_a, tid, gid, aggregator);
INTERNAL_TIMING_VECTOR_LOAD_[AGGRJ_TIMING_INDEX][i] = internal_timing;
if constexpr (PERFORM_CACHING) {
data = CACHE_.Access(reinterpret_cast<uint8_t *>(chunk_ptr), SUBCHUNK_SIZE_B, dsacache::FLAG_ACCESS_WEAK);
data->WaitOnCompletion();
data_ptr = reinterpret_cast<uint64_t*>(data->GetDataLocation());
THREAD_TIMING_[AGGRJ_TIMING_INDEX][UniqueIndex(gid,tid)][i][TIME_STAMP_END] = std::chrono::steady_clock::now();
}
if (data_ptr == nullptr) {
data_ptr = chunk_ptr;
}
else if (data_ptr == chunk_ptr) {
// prevent counting weak-accesses
}
else {
CACHE_HITS_[UniqueIndex(gid,tid)]++;
}
if constexpr (LAST_CHUNK_SIZE_B > 0) {
if (gid == GROUP_COUNT - 1 && tid == TC_AGGRJ - 1) {
const size_t chunk_index = get_chunk_index(gid, RUN_COUNT + 1);
uint64_t* chunk_ptr = get_chunk<TC_AGGRJ>(DATA_B_, chunk_index, tid);
uint16_t* mask_ptr_a = get_mask<TC_AGGRJ>(MASK_A_, chunk_index, tid);
AggrFn<SUBCHUNK_SIZE_B>(chunk_ptr, mask_ptr_a, tid, gid, aggregator);
}
else {
data_ptr = chunk_ptr;
}
uint64_t tmp = _mm512_reduce_add_epi64(aggregator);
aggregator = aggregation::apply_masked(aggregator, data_ptr, mask_ptr_a, SUBCHUNK_SIZE_B);
THREAD_TIMING_[AGGRJ_TIMING_INDEX][UniqueIndex(gid,tid)][i][TIME_STAMP_END] = std::chrono::steady_clock::now();
}
BARRIERS_[gid]->arrive_and_drop();
@ -219,7 +250,7 @@ int main() {
const std::string ofname = "results/qdp-xeonmax-" + std::string(MODE_STRING) + "-tca" + std::to_string(TC_SCANA) + "-tcb" + std::to_string(TC_SCANB) + "-tcj" + std::to_string(TC_AGGRJ) + "-tmul" + std::to_string(GROUP_COUNT) + "-wl" + std::to_string(WL_SIZE_B) + "-cs" + std::to_string(CHUNK_SIZE_B) + ".csv";
std::ofstream fout(ofname);
fout << "run;rt-ns;rt-s;result[0];scana-run;scana-wait;scanb-run;scanb-wait;aggrj-run;aggrj-wait;cache-hr;" << std::endl;
fout << "run;rt-ns;rt-s;result[0];scana-run;scana-wait;scana-load;scanb-run;scanb-wait;aggrj-run;aggrj-wait;aggrj-load;cache-hr;" << std::endl;
DATA_A_ = (uint64_t*) numa_alloc_onnode(WL_SIZE_B, MEM_NODE_A);
DATA_B_ = (uint64_t*) numa_alloc_onnode(WL_SIZE_B, MEM_NODE_B);
@ -274,7 +305,7 @@ int main() {
for(std::thread& t : agg_pool) { t.join(); }
uint64_t result_actual = 0;
Aggregation<uint64_t, Sum, load_mode::Aligned>::apply(&result_actual, DATA_DST_, sizeof(uint64_t) * TC_AGGRJ * GROUP_COUNT);
aggregation::apply<sizeof(uint64_t) * TC_AGGRJ * GROUP_COUNT>(&result_actual, DATA_DST_);
const auto time_end = std::chrono::steady_clock::now();
@ -283,18 +314,18 @@ int main() {
std::cout << "Result Expected: " << result_expected << ", Result Actual: " << result_actual << std::endl;
if (i >= WARMUP_ITERATION_COUNT) {
uint64_t scana_run = 0, scana_wait = 0, scanb_run = 0, scanb_wait = 0, aggrj_run = 0, aggrj_wait = 0;
process_timings(&scana_run, &scana_wait, &scanb_run, &scanb_wait, &aggrj_run, &aggrj_wait);
uint64_t scana_run = 0, scana_wait = 0, scanb_run = 0, scanb_wait = 0, aggrj_run = 0, aggrj_wait = 0, scana_load = 0, aggrj_load = 0;
process_timings(&scana_run, &scana_wait, &scanb_run, &scanb_wait, &aggrj_run, &aggrj_wait, &scana_load, &aggrj_load);
constexpr double nanos_per_second = ((double)1000) * 1000 * 1000;
const uint64_t nanos = std::chrono::duration_cast<std::chrono::nanoseconds>(time_end - time_start).count();
const double seconds = (double)(nanos) / nanos_per_second;
const uint64_t nanos = std::chrono::duration_cast<std::chrono::nanoseconds>(time_end - time_start).count();
const double seconds = (double)(nanos) / nanos_per_second;
fout
<< i - WARMUP_ITERATION_COUNT << ";"
<< nanos << ";" << seconds << ";"
<< result_actual << ";"
<< scana_run << ";" << scana_wait << ";" << scanb_run << ";" << scanb_wait << ";" << aggrj_run << ";" << aggrj_wait << ";"
<< scana_run << ";" << scana_wait << ";" << scana_load << ";" << scanb_run << ";" << scanb_wait << ";" << aggrj_run << ";" << aggrj_wait << ";" << aggrj_load << ";"
<< process_cache_hitrate() << ";"
<< std::endl;
}

13
qdp_project/src/Configuration.hpp

@ -3,7 +3,7 @@
#include "utils/memory_literals.h"
#ifndef MODE_SET_BY_CMAKE
#define MODE_SIMPLE_PREFETCH
#define MODE_PREFETCH
#endif
constexpr size_t WL_SIZE_B = 4_GiB;
@ -12,11 +12,11 @@ constexpr uint32_t ITERATION_COUNT = 5;
constexpr int MEM_NODE_HBM = 8;
constexpr int MEM_NODE_DRAM = 0;
#ifdef MODE_SIMPLE_PREFETCH
constexpr uint32_t GROUP_COUNT = 24;
#ifdef MODE_PREFETCH
constexpr uint32_t GROUP_COUNT = 12;
constexpr size_t CHUNK_SIZE_B = 16_MiB;
constexpr uint32_t TC_SCANA = 1;
constexpr uint32_t TC_SCANB = 0;
constexpr uint32_t TC_SCANB = 1;
constexpr uint32_t TC_AGGRJ = 1;
constexpr bool PERFORM_CACHING = true;
constexpr bool PERFORM_CACHING_IN_AGGREGATION = true;
@ -26,7 +26,7 @@ constexpr char MODE_STRING[] = "prefetch";
#endif
#ifdef MODE_DRAM
constexpr size_t CHUNK_SIZE_B = 2_MiB;
constexpr uint32_t GROUP_COUNT = 16;
constexpr uint32_t GROUP_COUNT = 12;
constexpr uint32_t TC_SCANA = 2;
constexpr uint32_t TC_SCANB = 0;
constexpr uint32_t TC_AGGRJ = 1;
@ -38,7 +38,7 @@ constexpr char MODE_STRING[] = "dram";
#endif
#ifdef MODE_HBM
constexpr size_t CHUNK_SIZE_B = 2_MiB;
constexpr uint32_t GROUP_COUNT = 16;
constexpr uint32_t GROUP_COUNT = 12;
constexpr uint32_t TC_SCANA = 2;
constexpr uint32_t TC_SCANB = 0;
constexpr uint32_t TC_AGGRJ = 1;
@ -50,7 +50,6 @@ constexpr char MODE_STRING[] = "hbm";
#endif
constexpr uint64_t CMP_A = 50;
constexpr uint32_t TC_COMBINED = TC_SCANA + TC_SCANB + TC_AGGRJ;
constexpr size_t WL_SIZE_ELEMENTS = WL_SIZE_B / sizeof(uint64_t);
constexpr size_t CHUNK_COUNT = WL_SIZE_B / CHUNK_SIZE_B;
constexpr size_t CHUNK_SIZE_ELEMENTS = CHUNK_SIZE_B / sizeof(uint64_t);

18
qdp_project/src/utils/BenchmarkHelpers.cpp

@ -11,6 +11,7 @@ constexpr size_t TIME_STAMP_BEGIN = 0;
constexpr size_t TIME_STAMP_WAIT = 1;
constexpr size_t TIME_STAMP_END = 2;
std::array<std::vector<uint64_t>, 3> INTERNAL_TIMING_VECTOR_LOAD_;
std::array<std::vector<std::vector<std::array<std::chrono::steady_clock::time_point, 3>>>, 3> THREAD_TIMING_;
std::array<uint32_t, GROUP_COUNT * TC_AGGRJ> CACHE_HITS_;
@ -27,14 +28,6 @@ uint64_t sum_check(uint64_t compare_value, uint64_t* row_A, uint64_t* row_B, siz
return sum;
}
uint64_t sum_check_complex(uint64_t compare_value_a, uint64_t compare_value_b, uint64_t* row_A, uint64_t* row_B, size_t row_size) {
uint64_t sum = 0;
for(int i = 0; i < row_size / sizeof(uint64_t); ++i) {
sum += (row_A[i] < compare_value_a && row_B[i] < compare_value_b) * row_B[i];
}
return sum;
}
int CachePlacementPolicy(const int numa_dst_node, const int numa_src_node, const size_t data_size) {
return numa_dst_node < 8 ? numa_dst_node + 8 : numa_dst_node;
}
@ -102,7 +95,8 @@ double process_cache_hitrate() {
void process_timings(
uint64_t* scana_run, uint64_t* scana_wait,
uint64_t* scanb_run, uint64_t* scanb_wait,
uint64_t* aggrj_run, uint64_t* aggrj_wait
uint64_t* aggrj_run, uint64_t* aggrj_wait,
uint64_t* scana_load, uint64_t* aggrj_load
) {
{
uint64_t scana_rc = 0;
@ -152,4 +146,10 @@ void process_timings(
*aggrj_wait /= aggrj_rc;
}
}
{
for (const auto e : INTERNAL_TIMING_VECTOR_LOAD_[SCANA_TIMING_INDEX]) *scana_load += e;
for (const auto e : INTERNAL_TIMING_VECTOR_LOAD_[AGGRJ_TIMING_INDEX]) *aggrj_load += e;
*scana_load /= INTERNAL_TIMING_VECTOR_LOAD_[SCANA_TIMING_INDEX].size();
*aggrj_load /= INTERNAL_TIMING_VECTOR_LOAD_[AGGRJ_TIMING_INDEX].size();
}
}

309
qdp_project/src/utils/aggregation.h

@ -8,309 +8,102 @@
#include "vector_loader.h"
#include "const.h"
/**
* @brief Super Class for all Aggregation functions. Guards Sub Classes from having an non integral base type.
*
* @tparam T
*/
template <typename T>
class AggFunction {
static_assert(std::is_integral<T>::value, "The base type of an AggFunction must be an integral");
};
/**
* @brief Template class that implements methods used for Summation. It wraps the corresponding vector intrinsics
*
* @tparam T base datatype for the implemented methods
*/
template<typename T>
class Sum : public AggFunction<T> {
template<typename base_t, load_mode load_mode>
class AggregationSUM {
public:
static inline __m512i simd_agg(__m512i aggregator, __m512i vector) {
if constexpr (sizeof(T) == 4) return _mm512_add_epi32(aggregator, vector);
else if constexpr (sizeof(T) == 8) return _mm512_add_epi64(aggregator, vector);
static_assert(sizeof(T) == 4 || sizeof(T) == 8, "Sum is only implemented for 32 and 64 wide integers");
if constexpr (sizeof(base_t) == 4) return _mm512_add_epi32(aggregator, vector);
else if constexpr (sizeof(base_t) == 8) return _mm512_add_epi64(aggregator, vector);
static_assert(sizeof(base_t) == 4 || sizeof(base_t) == 8, "Sum is only implemented for 32 and 64 wide integers");
};
static inline __m512i simd_agg(__m512i aggregator, __mmask16 mask, __m512i vector) {
if constexpr (sizeof(T) == 4) return _mm512_mask_add_epi32(aggregator, mask, aggregator, vector);
else if constexpr (sizeof(T) == 8) return _mm512_mask_add_epi64(aggregator, mask, aggregator, vector);
static_assert(sizeof(T) == 4 || sizeof(T) == 8, "Sum is only implemented for 32 and 64 wide integers");
if constexpr (sizeof(base_t) == 4) return _mm512_mask_add_epi32(aggregator, mask, aggregator, vector);
else if constexpr (sizeof(base_t) == 8) return _mm512_mask_add_epi64(aggregator, mask, aggregator, vector);
static_assert(sizeof(base_t) == 4 || sizeof(base_t) == 8, "Sum is only implemented for 32 and 64 wide integers");
};
static inline T simd_reduce(__m512i vector) {
if constexpr (sizeof(T) == 4) return _mm512_reduce_add_epi32(vector);
else if constexpr (sizeof(T) == 8) return _mm512_reduce_add_epi64(vector);
static_assert(sizeof(T) == 4 || sizeof(T) == 8, "Sum is only implemented for 32 and 64 wide integers");
static inline base_t simd_reduce(__m512i vector) {
if constexpr (sizeof(base_t) == 4) return _mm512_reduce_add_epi32(vector);
else if constexpr (sizeof(base_t) == 8) return _mm512_reduce_add_epi64(vector);
static_assert(sizeof(base_t) == 4 || sizeof(base_t) == 8, "Sum is only implemented for 32 and 64 wide integers");
};
static inline T scalar_agg(T aggregator, T scalar) { return aggregator + scalar; };
static inline base_t scalar_agg(base_t aggregator, base_t scalar) { return aggregator + scalar; };
static inline __m512i zero() { return _mm512_set1_epi32(0); };
};
/**
* @brief Template class that implements methods used for Maximum determination. It wraps the corresponding vector intrinsics
*
* @tparam T base datatype for the implemented methods
*
*/
template<typename T>
class Max : public AggFunction<T> {
public:
static inline __m512i simd_agg(__m512i aggregator, __m512i vector) {
if constexpr (sizeof(T) == 4) return _mm512_max_epi32(aggregator, vector);
else if constexpr (sizeof(T) == 8) return _mm512_max_epi64(aggregator, vector);
static_assert(sizeof(T) == 4 || sizeof(T) == 8, "Max is only implemented for 32 and 64 wide integers");
}
static inline __m512i simd_agg(__m512i aggregator, __mmask16 mask, __m512i vector) {
if constexpr (sizeof(T) == 4) return _mm512_mask_max_epi32(aggregator, mask, aggregator, vector);
else if constexpr (sizeof(T) == 8) return _mm512_mask_max_epi64(aggregator, mask, aggregator, vector);
static_assert(sizeof(T) == 4 || sizeof(T) == 8, "Max is only implemented for 32 and 64 wide integers");
}
static inline T simd_reduce(__m512i vector) {
if constexpr (sizeof(T) == 4) return _mm512_reduce_max_epi32(vector);
else if constexpr (sizeof(T) == 8) return _mm512_reduce_max_epi64(vector);
static_assert(sizeof(T) == 4 || sizeof(T) == 8, "Max is only implemented for 32 and 64 wide integers");
}
static inline T scalar_agg(T aggregator, T scalar) { return std::max(aggregator, scalar); }
static inline __m512i zero() {
if constexpr (sizeof(T) == 4) {
if constexpr (std::is_signed<T>::value) return _mm512_set1_epi32(0xFFFFFFFF);
else return _mm512_set1_epi32(0x0);
}
else if constexpr (sizeof(T) == 8) {
if constexpr (std::is_signed<T>::value) return _mm512_set1_epi32(0xFFFFFFFFFFFFFFFF);
else return _mm512_set1_epi32(0x0);
}
static_assert(sizeof(T) == 4 || sizeof(T) == 8, "Max is only implemented for 32 and 64 wide integers");
}
};
static_assert(std::is_same_v<base_t, uint64_t>, "Enforce unsigned 64 bit ints.");
/**
* @brief Template class that implements methods used for Minimum determination. It wraps the corresponding vector intrinsics
*
* @tparam T base datatype for the implemented methods
*
*/
template<typename T>
class Min : public AggFunction<T> {
public:
static inline __m512i simd_agg(__m512i aggregator, __m512i vector) {
if constexpr (sizeof(T) == 4) return _mm512_min_epi32(aggregator, vector);
else if constexpr (sizeof(T) == 8) return _mm512_min_epi64(aggregator, vector);
static_assert(sizeof(T) == 4 || sizeof(T) == 8, "Min is only implemented for 32 and 64 wide integers");
}
/*
* returns time in ns spent loading vector
*/
template<size_t CHUNK_SIZE_B>
static bool apply(base_t *dest, base_t *src) {
constexpr size_t lanes = VECTOR_SIZE<base_t>();
constexpr size_t value_count = CHUNK_SIZE_B / sizeof(base_t);
constexpr size_t iterations = value_count - lanes + 1;
static inline __m512i simd_agg(__m512i aggregator, __mmask16 mask, __m512i vector) {
if constexpr (sizeof(T) == 4) return _mm512_mask_min_epi32(aggregator, mask, aggregator, vector);
else if constexpr (sizeof(T) == 8) return _mm512_mask_min_epi64(aggregator, mask, aggregator, vector);
static_assert(sizeof(T) == 4 || sizeof(T) == 8, "Min is only implemented for 32 and 64 wide integers");
}
static_assert(value_count >= lanes);
static inline T simd_reduce(__m512i vector) {
if constexpr (sizeof(T) == 4) return _mm512_reduce_min_epi32(vector);
else if constexpr (sizeof(T) == 8) return _mm512_reduce_min_epi64(vector);
static_assert(sizeof(T) == 4 || sizeof(T) == 8, "Min is only implemented for 32 and 64 wide integers");
}
__m512i agg_vec = zero();
size_t i = 0;
static inline T scalar_agg(T aggregator, T scalar) { return std::min(aggregator, scalar); }
for(size_t i = 0; i < iterations; i += lanes) {
__m512i vec = Vector_Loader<base_t, load_mode>::load(src + i);
static inline __m512i zero() {
if constexpr (sizeof(T) == 4) {
if constexpr (std::is_signed<T>::value) return _mm512_set1_epi32(0xEFFFFFFF);
else return _mm512_set1_epi32(0xFFFFFFFF);
agg_vec = simd_agg(agg_vec, vec);
}
else if constexpr (sizeof(T) == 8) {
if constexpr (std::is_signed<T>::value) return _mm512_set1_epi32(0xEFFFFFFFFFFFFFFF);
else return _mm512_set1_epi32(0xFFFFFFFFFFFFFFFF);
}
static_assert(sizeof(T) == 4 || sizeof(T) == 8, "Min is only implemented for 32 and 64 wide integers");
}
};
/**
* @brief Template Class that implements an aggregation operation.
*
* @tparam base_t Base type of the values for aggregation
* @tparam func
* @tparam load_mode
*/
template<typename base_t, template<typename _base_t> class func, load_mode load_mode>
class Aggregation{
public:
static_assert(std::is_same_v<base_t, uint64_t>, "Enforce unsigned 64 bit ints.");
using OP = func<base_t>;
/**
* @brief Calculates the memory maximal needed to store a chunk's processing result.
*
* @param chunk_size_b Size of the chunk in byte
* @return size_t Size of the chunk's processing result in byte
*/
static size_t result_bytes_per_chunk(size_t chunk_size_b) {
// aggregation returns a single value of type base_t
return sizeof(base_t);
}
/**
* @brief Applies the aggregation function on the chunk starting at *src* and spanning *chunk_size_b* bytes.
* The result is written to main memory.
*
* @param dest Pointer to the start of the result chunk
* @param src Pointer to the start of the source chunk
* @param chunk_size_b Size of the source chunk in bytes
* @return true When the aggregation is done
* @return false Never
*/
static bool apply (base_t *dest, base_t *src, size_t chunk_size_b) {
constexpr size_t lanes = VECTOR_SIZE<base_t>();
size_t value_count = chunk_size_b / sizeof(base_t);
__m512i agg_vec = func<base_t>::zero();
size_t i = 0;
base_t result = 0;
// stop before! running out of space
if(value_count >= lanes) {// keep in mind value_count is unsigned so if it becomes negative, it doesn't.
for(; i <= value_count - lanes; i += lanes) {
__m512i vec = Vector_Loader<base_t, load_mode>::load(src + i);
agg_vec = func<base_t>::simd_agg(agg_vec, vec);
}
result = func<base_t>::simd_reduce(agg_vec);
}
base_t result = simd_reduce(agg_vec);
for(; i < value_count; ++i) {
result = func<base_t>::scalar_agg(result, src[i]);
result = scalar_agg(result, src[i]);
}
*dest = result;
return true;
}
/**
* @brief Applies the aggregation function on the chunk starting at *src* and spanning *chunk_size_b* bytes,
* while applying the bit string stored in *masks*. The result is written to main memory.
*
* @param dest Pointer to the start of the result chunk
* @param src Pointer to the start of the source chunk
* @param masks Pointer the bitstring that marks the values that should be aggregated
* @param chunk_size_b Size of the source chunk in bytes
* @return true When the aggregation is done
* @return false Never
*/
static bool apply_masked (base_t *dest, base_t *src, uint16_t* msks, size_t chunk_size_b) {
/*
* returns time in ns spent loading vector
*/
template<size_t CHUNK_SIZE_B>
static uint64_t apply_masked(__m512i& dest, base_t *src, uint16_t* msks) {
constexpr size_t lanes = VECTOR_SIZE<base_t>();
uint8_t* masks = (uint8_t *)msks;
size_t value_count = chunk_size_b / sizeof(base_t);
__m512i agg_vec = func<base_t>::zero();
size_t i = 0;
// stop before! running out of space
if(value_count >= lanes) // keep in mind size_w is unsigned so if it becomes negative, it doesn't.
for(; i <= value_count - lanes; i += lanes) {
__m512i vec = Vector_Loader<base_t, load_mode>::load(src + i);
__mmask8 mask = _mm512_int2mask(masks[i / lanes]);
constexpr size_t value_count = CHUNK_SIZE_B / sizeof(base_t);
constexpr size_t iterations = value_count - lanes + 1;
agg_vec = func<base_t>::simd_mask_agg(agg_vec, mask, vec);
}
*dest = func<base_t>::simd_reduce(agg_vec);
static_assert(value_count >= lanes);
for(; i < value_count; ++i) {
uint8_t mask = masks[i / lanes];
if(mask & (0b1 << (i % lanes))){
*dest = func<base_t>::scalar_agg(*dest, src[i]);
}
}
uint64_t load_time = 0;
return true;
}
auto* masks = reinterpret_cast<uint8_t*>(msks);
for(size_t i = 0; i < iterations; i += lanes) {
auto ts_load = std::chrono::steady_clock::now();
/**
* @brief Applies the aggregation function on the chunk starting at *src* and spanning *chunk_size_b* bytes,
* while applying the bit string stored in *masks*. The values are agggegated in the register *dest* without
* clearing beforehand.
*
* NOTE! This function only works correctly if the the chunk_size_b is a multiple of 64 byte
*
* @param dest Vector register used for storing and passing the result around
* @param src Pointer to the start of the source chunk
* @param masks Pointer the bitstring that marks the values that should be aggregated
* @param chunk_size_b Size of the source chunk in bytes
* @return __m512i Vector register holding the aggregation result
*/
static __m512i apply_masked (__m512i dest, base_t *src, uint16_t* msks, size_t chunk_size_b) {
constexpr size_t lanes = VECTOR_SIZE<base_t>();
uint8_t* masks = (uint8_t*) msks;
//TODO this function does not work if value_count % lanes != 0
size_t value_count = chunk_size_b / sizeof(base_t);
size_t i = 0;
// stop before! running out of space
if(value_count >= lanes) // keep in mind size_w is unsigned so if it becomes negative, it doesn't.
for(; i <= value_count - lanes; i += lanes) {
__m512i vec = Vector_Loader<base_t, load_mode>::load(src + i);
__mmask8 mask = _mm512_int2mask(masks[i / lanes]);
dest = func<base_t>::simd_agg(dest, mask, vec);
}
return dest;
}
auto te_load = std::chrono::steady_clock::now();
load_time += std::chrono::duration_cast<std::chrono::nanoseconds>(te_load - ts_load).count();
/**
* @brief Applies the aggregation function on the chunk starting at *src* and spanning *chunk_size_b* bytes,
* while applying two bit strings stored in *masks_0* and *masks_1*. The values are aggregated in the register
* *dest* without clearing beforehand.
*
* NOTE! This function only works correctly if the the chunk_size_b is a multiple of 64 byte
*
* @param dest Vector register used for storing and passing the result around
* @param src Pointer to the start of the source chunk
* @param masks_0 Pointer the bitstring that marks the values that should be aggregated
* @param masks_1 Pointer the bitstring that marks the values that should be aggregated
* @param chunk_size_b Size of the source chunk in bytes
* @return __m512i Vector register holding the aggregation result
*/
static __m512i apply_masked (__m512i dest, base_t *src, uint16_t* msks0, uint16_t* msks1, size_t chunk_size_b) {
constexpr size_t lanes = VECTOR_SIZE<base_t>();
uint8_t* masks0 = (uint8_t*) msks0;
uint8_t* masks1 = (uint8_t*) msks1;
//TODO this function does not work if value_count % lanes != 0
size_t value_count = chunk_size_b / sizeof(base_t);
size_t i = 0;
// stop before! running out of space
if(value_count >= lanes) // keep in mind value_count is unsigned so if it becomes negative, it doesn't.
for(; i <= value_count - lanes; i += lanes) {
__m512i vec = Vector_Loader<base_t, load_mode>::load(src + i);
__mmask8 mask0 = _mm512_int2mask(masks0[i / lanes]);
__mmask8 mask1 = _mm512_int2mask(masks1[i / lanes]);
__mmask8 mask = _mm512_int2mask(masks[i / lanes]);
mask0 = _kand_mask8(mask0, mask1);
dest = func<base_t>::simd_agg(dest, mask0, vec);
dest = simd_agg(dest, mask, vec);
}
return dest;
return load_time;
}
/**
* @brief Reduces a vector by applying the aggregation function horizontally.
*
* @param dest Result of the horizontal aggregation
* @param src Vector as source for the horizontal aggregation
* @return true When the operation is done
* @return false Never
*/
static bool happly (base_t *dest, __m512i src) {
*dest = func<base_t>::simd_reduce(src);
*dest = simd_reduce(src);
return true;
}
static __m512i get_zero() {
return func<base_t>::zero();
return zero();
}
};

162
qdp_project/src/utils/filter.h

@ -7,164 +7,64 @@
#include "vector_loader.h"
/**
* @brief Super Class for all Aggregation functions. Guards Sub Classes from having an non integral base type.
*
* @tparam T An integral datatype
*/
template<typename T>
class FilterFunction {
static_assert(std::is_integral<T>::value, "The base type of a FilterFunction must be an integeral.");
};
/**
* @brief Template class that implements methods used for finding values that are not equal to the compare value.
* It wraps the corresponding vector intrinsics.
*
* @tparam T base datatype for the implemented methods
*/
template<typename T>
class NEQ : public FilterFunction<T> {
template<typename base_t, load_mode load_mode>
class FilterLT {
public:
static inline __mmask16 simd_filter(__m512i vector, __m512i comp) {
if constexpr (sizeof(T) == 4) return _mm512_cmpneq_epi32_mask(vector, comp);
else if constexpr (sizeof(T) == 8) return _mm512_cmpneq_epi64_mask(vector, comp);
static_assert(sizeof(T) == 4 || sizeof(T) == 8, "NEQ is only implemented for 32 and 64 wide integers");
if constexpr (sizeof(base_t) == 4) return _mm512_cmplt_epi32_mask(vector, comp);
else if constexpr (sizeof(base_t) == 8) return _mm512_cmplt_epi64_mask(vector, comp);
static_assert(sizeof(base_t) == 4 || sizeof(base_t) == 8, "LT is only implemented for 32 and 64 wide integers");
}
static inline bool scalar_filter(T scalar, T comp) { return scalar != comp; }
};
static inline bool scalar_filter(base_t scalar, base_t comp) { return scalar < comp; }
template<typename T>
class EQ : public FilterFunction<T> {
public:
static inline __mmask16 simd_filter(__m512i vector, __m512i comp) {
if constexpr (sizeof(T) == 4) return _mm512_cmpeq_epi32_mask(vector, comp);
else if constexpr (sizeof(T) == 8) return _mm512_cmpeq_epi64_mask(vector, comp);
static_assert(sizeof(T) == 4 || sizeof(T) == 8, "EQ is only implemented for 32 and 64 wide integers");
}
static inline bool scalar_filter(T scalar, T comp) { return scalar == comp; }
};
/*
* returns time in ns spent loading vector
*/
template<base_t CMP_VALUE, size_t CHUNK_SIZE_B>
static uint64_t apply_same(uint16_t *dst, base_t *src) {
constexpr uint32_t lanes = VECTOR_SIZE<base_t>();
constexpr size_t value_count = CHUNK_SIZE_B / sizeof(base_t);
constexpr size_t iterations = value_count - lanes;
template<typename T>
class LT : public FilterFunction<T> {
public:
static inline __mmask16 simd_filter(__m512i vector, __m512i comp) {
if constexpr (sizeof(T) == 4) return _mm512_cmplt_epi32_mask(vector, comp);
else if constexpr (sizeof(T) == 8) return _mm512_cmplt_epi64_mask(vector, comp);
static_assert(sizeof(T) == 4 || sizeof(T) == 8, "LT is only implemented for 32 and 64 wide integers");
}
static_assert(value_count > lanes);
static inline bool scalar_filter(T scalar, T comp) { return scalar < comp; }
};
uint64_t load_time = 0;
template<typename T>
class LEQ : public FilterFunction<T> {
public:
static inline __mmask16 simd_filter(__m512i vector, __m512i comp) {
if constexpr (sizeof(T) == 4) return _mm512_cmple_epi32_mask(vector, comp);
else if constexpr (sizeof(T) == 8) return _mm512_cmple_epi64_mask(vector, comp);
static_assert(sizeof(T) == 4 || sizeof(T) == 8, "LEQ is only implemented for 32 and 64 wide integers");
}
uint8_t* dest = (uint8_t*) dst;
static inline bool scalar_filter(T scalar, T comp) { return scalar <= comp; }
};
__m512i cmp_vec = _mm512_set1_epi64(CMP_VALUE);
template<typename T>
class GT : public FilterFunction<T> {
public:
static inline __mmask16 simd_filter(__m512i vector, __m512i comp) {
if constexpr (sizeof(T) == 4) return _mm512_cmpgt_epi32_mask(vector, comp);
else if constexpr (sizeof(T) == 8) return _mm512_cmpgt_epi64_mask(vector, comp);
static_assert(sizeof(T) == 4 || sizeof(T) == 8, "GT is only implemented for 32 and 64 wide integers");
}
size_t i = 0;
static inline bool scalar_filter(T scalar, T comp) { return scalar > comp; }
};
for(; i < iterations; i += lanes) {
auto ts_load = std::chrono::steady_clock::now();
template<typename T>
class GEQ : public FilterFunction<T> {
public:
static inline __mmask16 simd_filter(__m512i vector, __m512i comp) {
if constexpr (sizeof(T) == 4) return _mm512_cmpge_epi32_mask(vector, comp);
else if constexpr (sizeof(T) == 8) return _mm512_cmpge_epi64_mask(vector, comp);
static_assert(sizeof(T) == 4 || sizeof(T) == 8, "GEQ is only implemented for 32 and 64 wide integers");
}
__m512i vec = Vector_Loader<base_t, load_mode>::load(src + i);
static inline bool scalar_filter(T scalar, T comp) { return scalar >= comp; }
};
auto te_load = std::chrono::steady_clock::now();
load_time += std::chrono::duration_cast<std::chrono::nanoseconds>(te_load - ts_load).count();
__mmask8 bitmask = simd_filter(vec, cmp_vec);
template<typename base_t, template<typename _base_t> class func, load_mode load_mode, bool copy>
class Filter {
public:
uint8_t int_mask = (uint8_t) _mm512_mask2int(bitmask);
static_assert(std::is_same_v<base_t, uint64_t>, "We enforce 64 bit integer");
/**
* @brief Calculates the memory maximal needed to store a chunk's processing result.
*
* @param chunk_size_b Size of the chunk in byte
* @return size_t Size of the chunk's processing result in byte
*/
static size_t result_bytes_per_chunk(size_t chunk_size_b) {
// + 7 to enshure that we have enougth bytes -> / 8 -> rounds down
// if we had 17 / 8 = 2 but (17 + 7) / 8 = 3
// if we hat 16 / 8 = 2 is right, as well as, 16 + 7 / 8 = 2
return (chunk_size_b / sizeof(base_t) + 7) / 8;
}
/**
* @brief Applies the filter function on the chunk starting at *src* and spanning *chunk_size_b* bytes, while comparing with he same value every time.
* The resulting bit string is written to main memory.
*
* @param dest Pointer to the start of the result chunk
* @param src Pointer to the start of the source chunk
* @param cmp_value Comparision value to compare the values from source to
* @param chunk_size_b Size of the source chunk in bytes
* @return true When the filter operation is done
* @return false Never
*/
// we only need this impl. yet, as all filter are at the end of a pipeline
static bool apply_same (uint16_t *dst, base_t *buffer, base_t *src, base_t cmp_value, size_t chunk_size_b) {
constexpr uint32_t lanes = VECTOR_SIZE<base_t>();
uint8_t* dest = (uint8_t*) dst;
size_t value_count = chunk_size_b / sizeof(base_t);
__m512i cmp_vec = _mm512_set1_epi64(cmp_value);
size_t i = 0;
// this weird implementetion is neccessary, see analogous impl in aggregation for explaination
if(value_count > lanes) {
for(; (i < value_count - lanes); i += lanes) {
__m512i vec = Vector_Loader<base_t, load_mode>::load(src + i);
__mmask8 bitmask = func<base_t>::simd_filter(vec, cmp_vec);
uint8_t int_mask = (uint8_t) _mm512_mask2int(bitmask);
dest[i / lanes] = int_mask;
if constexpr(copy){
Vector_Loader<base_t, load_mode>::store(buffer + i, vec);
}
}
dest[i / lanes] = int_mask;
}
auto dest_pos = i / lanes;
uint8_t int_mask = 0;
for(; i < value_count; ++i) {
base_t val = src[i];
uint8_t result = func<base_t>::scalar_filter(val, cmp_value);
uint8_t result = scalar_filter(val, CMP_VALUE);
int_mask |= (result << (i % lanes));
if constexpr(copy){
buffer[i] = val;
}
}
dest[dest_pos] = int_mask;
return true;
return load_time;
}
};
Loading…
Cancel
Save