Browse Source

reset some changes to the aggregation and filter functions not quite needed

master
Constantin Fürst 11 months ago
parent
commit
a3a8dff1aa
  1. 25
      qdp_project/src/Benchmark.cpp
  2. 314
      qdp_project/src/utils/aggregation.h
  3. 169
      qdp_project/src/utils/filter.h

25
qdp_project/src/Benchmark.cpp

@ -17,8 +17,8 @@
#include "Configuration.hpp"
#include "BenchmarkHelpers.cpp"
using filter = FilterLT<uint64_t, load_mode::Stream>;
using aggregation = AggregationSUM<uint64_t, load_mode::Stream>;
using filter = Filter<uint64_t, LT, load_mode::Stream, false>;
using aggregation = Aggregation<uint64_t, Sum, load_mode::Stream>;
dsacache::Cache CACHE_;
@ -137,8 +137,7 @@ void scan_a(size_t gid, size_t tid) {
uint64_t* chunk_ptr = get_chunk<TC_SCANA>(DATA_A_, chunk_index, tid);
uint16_t* mask_ptr = get_mask<TC_SCANA>(MASK_A_, chunk_index, tid);
const auto internal_timing = filter::apply_same<CMP_A, CHUNK_SIZE_B / TC_SCANA>(mask_ptr, chunk_ptr);
INTERNAL_TIMING_VECTOR_LOAD_[SCANA_TIMING_INDEX][i] = internal_timing;
filter::apply_same(mask_ptr, nullptr, chunk_ptr, CMP_A, CHUNK_SIZE_B / TC_SCANA, &INTERNAL_TIMING_VECTOR_LOAD_[SCANA_TIMING_INDEX][i]);
THREAD_TIMING_[SCANA_TIMING_INDEX][UniqueIndex(gid,tid)][0][TIME_STAMP_WAIT] = std::chrono::steady_clock::now();
@ -152,7 +151,8 @@ void scan_a(size_t gid, size_t tid) {
const size_t chunk_index = get_chunk_index(gid, RUN_COUNT + 1);
uint64_t* chunk_ptr = get_chunk<TC_SCANA>(DATA_A_, chunk_index, tid);
uint16_t* mask_ptr = get_mask<TC_SCANA>(MASK_A_, chunk_index, tid);
filter::apply_same<CMP_A, LAST_CHUNK_SIZE_B>(mask_ptr, chunk_ptr);
uint64_t t_unused;
filter::apply_same(mask_ptr, nullptr, chunk_ptr, CMP_A, LAST_CHUNK_SIZE_B, &t_unused);
}
}
@ -160,8 +160,7 @@ void scan_a(size_t gid, size_t tid) {
}
template <size_t size>
uint64_t AggrFn(uint64_t* chunk_ptr, uint16_t* mask_ptr_a, const uint32_t tid, const uint32_t gid, __m512i& aggregator) {
__m512i AggrFn(uint64_t* chunk_ptr, uint16_t* mask_ptr_a, const uint32_t tid, const uint32_t gid, __m512i aggregator, uint64_t* load_time) {
std::unique_ptr<dsacache::CacheData> data;
uint64_t* data_ptr;
@ -186,7 +185,7 @@ uint64_t AggrFn(uint64_t* chunk_ptr, uint16_t* mask_ptr_a, const uint32_t tid, c
}
uint64_t tmp = _mm512_reduce_add_epi64(aggregator);
return aggregation::apply_masked<size>(aggregator, data_ptr, mask_ptr_a);
return aggregation::apply_masked(aggregator, data_ptr, mask_ptr_a, size, load_time);
}
void aggr_j(size_t gid, size_t tid) {
@ -200,7 +199,7 @@ void aggr_j(size_t gid, size_t tid) {
INTERNAL_TIMING_VECTOR_LOAD_[AGGRJ_TIMING_INDEX].clear();
INTERNAL_TIMING_VECTOR_LOAD_[AGGRJ_TIMING_INDEX].resize(RUN_COUNT);
__m512i aggregator = aggregation::zero();
__m512i aggregator = aggregation::OP::zero();
LAUNCH_.wait();
@ -215,8 +214,7 @@ void aggr_j(size_t gid, size_t tid) {
uint64_t* chunk_ptr = get_chunk<TC_AGGRJ>(DATA_B_, chunk_index, tid);
uint16_t* mask_ptr_a = get_mask<TC_AGGRJ>(MASK_A_, chunk_index, tid);
const auto internal_timing = AggrFn<SUBCHUNK_SIZE_B>(chunk_ptr, mask_ptr_a, tid, gid, aggregator);
INTERNAL_TIMING_VECTOR_LOAD_[AGGRJ_TIMING_INDEX][i] = internal_timing;
aggregator = AggrFn<SUBCHUNK_SIZE_B>(chunk_ptr, mask_ptr_a, tid, gid, aggregator, &INTERNAL_TIMING_VECTOR_LOAD_[AGGRJ_TIMING_INDEX][i]);
THREAD_TIMING_[AGGRJ_TIMING_INDEX][UniqueIndex(gid,tid)][i][TIME_STAMP_END] = std::chrono::steady_clock::now();
}
@ -226,7 +224,8 @@ void aggr_j(size_t gid, size_t tid) {
const size_t chunk_index = get_chunk_index(gid, RUN_COUNT + 1);
uint64_t* chunk_ptr = get_chunk<TC_AGGRJ>(DATA_B_, chunk_index, tid);
uint16_t* mask_ptr_a = get_mask<TC_AGGRJ>(MASK_A_, chunk_index, tid);
AggrFn<SUBCHUNK_SIZE_B>(chunk_ptr, mask_ptr_a, tid, gid, aggregator);
uint64_t t_unused;
AggrFn<SUBCHUNK_SIZE_B>(chunk_ptr, mask_ptr_a, tid, gid, aggregator, &t_unused);
}
}
@ -305,7 +304,7 @@ int main() {
for(std::thread& t : agg_pool) { t.join(); }
uint64_t result_actual = 0;
aggregation::apply<sizeof(uint64_t) * TC_AGGRJ * GROUP_COUNT>(&result_actual, DATA_DST_);
aggregation::apply(&result_actual, DATA_DST_, sizeof(uint64_t) * TC_AGGRJ * GROUP_COUNT);
const auto time_end = std::chrono::steady_clock::now();

314
qdp_project/src/utils/aggregation.h

@ -8,102 +8,318 @@
#include "vector_loader.h"
#include "const.h"
template<typename base_t, load_mode load_mode>
class AggregationSUM {
/**
* @brief Super Class for all Aggregation functions. Guards Sub Classes from having an non integral base type.
*
* @tparam T
*/
template <typename T>
class AggFunction {
static_assert(std::is_integral<T>::value, "The base type of an AggFunction must be an integral");
};
/**
* @brief Template class that implements methods used for Summation. It wraps the corresponding vector intrinsics
*
* @tparam T base datatype for the implemented methods
*/
template<typename T>
class Sum : public AggFunction<T> {
public:
static inline __m512i simd_agg(__m512i aggregator, __m512i vector) {
if constexpr (sizeof(base_t) == 4) return _mm512_add_epi32(aggregator, vector);
else if constexpr (sizeof(base_t) == 8) return _mm512_add_epi64(aggregator, vector);
static_assert(sizeof(base_t) == 4 || sizeof(base_t) == 8, "Sum is only implemented for 32 and 64 wide integers");
if constexpr (sizeof(T) == 4) return _mm512_add_epi32(aggregator, vector);
else if constexpr (sizeof(T) == 8) return _mm512_add_epi64(aggregator, vector);
static_assert(sizeof(T) == 4 || sizeof(T) == 8, "Sum is only implemented for 32 and 64 wide integers");
};
static inline __m512i simd_agg(__m512i aggregator, __mmask16 mask, __m512i vector) {
if constexpr (sizeof(base_t) == 4) return _mm512_mask_add_epi32(aggregator, mask, aggregator, vector);
else if constexpr (sizeof(base_t) == 8) return _mm512_mask_add_epi64(aggregator, mask, aggregator, vector);
static_assert(sizeof(base_t) == 4 || sizeof(base_t) == 8, "Sum is only implemented for 32 and 64 wide integers");
if constexpr (sizeof(T) == 4) return _mm512_mask_add_epi32(aggregator, mask, aggregator, vector);
else if constexpr (sizeof(T) == 8) return _mm512_mask_add_epi64(aggregator, mask, aggregator, vector);
static_assert(sizeof(T) == 4 || sizeof(T) == 8, "Sum is only implemented for 32 and 64 wide integers");
};
static inline base_t simd_reduce(__m512i vector) {
if constexpr (sizeof(base_t) == 4) return _mm512_reduce_add_epi32(vector);
else if constexpr (sizeof(base_t) == 8) return _mm512_reduce_add_epi64(vector);
static_assert(sizeof(base_t) == 4 || sizeof(base_t) == 8, "Sum is only implemented for 32 and 64 wide integers");
static inline T simd_reduce(__m512i vector) {
if constexpr (sizeof(T) == 4) return _mm512_reduce_add_epi32(vector);
else if constexpr (sizeof(T) == 8) return _mm512_reduce_add_epi64(vector);
static_assert(sizeof(T) == 4 || sizeof(T) == 8, "Sum is only implemented for 32 and 64 wide integers");
};
static inline base_t scalar_agg(base_t aggregator, base_t scalar) { return aggregator + scalar; };
static inline T scalar_agg(T aggregator, T scalar) { return aggregator + scalar; };
static inline __m512i zero() { return _mm512_set1_epi32(0); };
};
static_assert(std::is_same_v<base_t, uint64_t>, "Enforce unsigned 64 bit ints.");
/*
* returns time in ns spent loading vector
*/
template<size_t CHUNK_SIZE_B>
static bool apply(base_t *dest, base_t *src) {
constexpr size_t lanes = VECTOR_SIZE<base_t>();
constexpr size_t value_count = CHUNK_SIZE_B / sizeof(base_t);
constexpr size_t iterations = value_count - lanes + 1;
/**
* @brief Template class that implements methods used for Maximum determination. It wraps the corresponding vector intrinsics
*
* @tparam T base datatype for the implemented methods
*
*/
template<typename T>
class Max : public AggFunction<T> {
public:
static inline __m512i simd_agg(__m512i aggregator, __m512i vector) {
if constexpr (sizeof(T) == 4) return _mm512_max_epi32(aggregator, vector);
else if constexpr (sizeof(T) == 8) return _mm512_max_epi64(aggregator, vector);
static_assert(sizeof(T) == 4 || sizeof(T) == 8, "Max is only implemented for 32 and 64 wide integers");
}
static_assert(value_count >= lanes);
static inline __m512i simd_agg(__m512i aggregator, __mmask16 mask, __m512i vector) {
if constexpr (sizeof(T) == 4) return _mm512_mask_max_epi32(aggregator, mask, aggregator, vector);
else if constexpr (sizeof(T) == 8) return _mm512_mask_max_epi64(aggregator, mask, aggregator, vector);
static_assert(sizeof(T) == 4 || sizeof(T) == 8, "Max is only implemented for 32 and 64 wide integers");
}
__m512i agg_vec = zero();
size_t i = 0;
static inline T simd_reduce(__m512i vector) {
if constexpr (sizeof(T) == 4) return _mm512_reduce_max_epi32(vector);
else if constexpr (sizeof(T) == 8) return _mm512_reduce_max_epi64(vector);
static_assert(sizeof(T) == 4 || sizeof(T) == 8, "Max is only implemented for 32 and 64 wide integers");
}
for(size_t i = 0; i < iterations; i += lanes) {
__m512i vec = Vector_Loader<base_t, load_mode>::load(src + i);
static inline T scalar_agg(T aggregator, T scalar) { return std::max(aggregator, scalar); }
agg_vec = simd_agg(agg_vec, vec);
static inline __m512i zero() {
if constexpr (sizeof(T) == 4) {
if constexpr (std::is_signed<T>::value) return _mm512_set1_epi32(0xFFFFFFFF);
else return _mm512_set1_epi32(0x0);
}
else if constexpr (sizeof(T) == 8) {
if constexpr (std::is_signed<T>::value) return _mm512_set1_epi32(0xFFFFFFFFFFFFFFFF);
else return _mm512_set1_epi32(0x0);
}
static_assert(sizeof(T) == 4 || sizeof(T) == 8, "Max is only implemented for 32 and 64 wide integers");
}
};
/**
* @brief Template class that implements methods used for Minimum determination. It wraps the corresponding vector intrinsics
*
* @tparam T base datatype for the implemented methods
*
*/
template<typename T>
class Min : public AggFunction<T> {
public:
static inline __m512i simd_agg(__m512i aggregator, __m512i vector) {
if constexpr (sizeof(T) == 4) return _mm512_min_epi32(aggregator, vector);
else if constexpr (sizeof(T) == 8) return _mm512_min_epi64(aggregator, vector);
static_assert(sizeof(T) == 4 || sizeof(T) == 8, "Min is only implemented for 32 and 64 wide integers");
}
base_t result = simd_reduce(agg_vec);
static inline __m512i simd_agg(__m512i aggregator, __mmask16 mask, __m512i vector) {
if constexpr (sizeof(T) == 4) return _mm512_mask_min_epi32(aggregator, mask, aggregator, vector);
else if constexpr (sizeof(T) == 8) return _mm512_mask_min_epi64(aggregator, mask, aggregator, vector);
static_assert(sizeof(T) == 4 || sizeof(T) == 8, "Min is only implemented for 32 and 64 wide integers");
}
for(; i < value_count; ++i) {
result = scalar_agg(result, src[i]);
static inline T simd_reduce(__m512i vector) {
if constexpr (sizeof(T) == 4) return _mm512_reduce_min_epi32(vector);
else if constexpr (sizeof(T) == 8) return _mm512_reduce_min_epi64(vector);
static_assert(sizeof(T) == 4 || sizeof(T) == 8, "Min is only implemented for 32 and 64 wide integers");
}
static inline T scalar_agg(T aggregator, T scalar) { return std::min(aggregator, scalar); }
static inline __m512i zero() {
if constexpr (sizeof(T) == 4) {
if constexpr (std::is_signed<T>::value) return _mm512_set1_epi32(0xEFFFFFFF);
else return _mm512_set1_epi32(0xFFFFFFFF);
}
else if constexpr (sizeof(T) == 8) {
if constexpr (std::is_signed<T>::value) return _mm512_set1_epi32(0xEFFFFFFFFFFFFFFF);
else return _mm512_set1_epi32(0xFFFFFFFFFFFFFFFF);
}
static_assert(sizeof(T) == 4 || sizeof(T) == 8, "Min is only implemented for 32 and 64 wide integers");
}
};
/**
* @brief Template Class that implements an aggregation operation.
*
* @tparam base_t Base type of the values for aggregation
* @tparam func
* @tparam load_mode
*/
template<typename base_t, template<typename _base_t> class func, load_mode load_mode>
class Aggregation{
public:
static_assert(std::is_same_v<base_t, uint64_t>, "Enforce unsigned 64 bit ints.");
using OP = func<base_t>;
/**
* @brief Calculates the memory maximal needed to store a chunk's processing result.
*
* @param chunk_size_b Size of the chunk in byte
* @return size_t Size of the chunk's processing result in byte
*/
static size_t result_bytes_per_chunk(size_t chunk_size_b) {
// aggregation returns a single value of type base_t
return sizeof(base_t);
}
/**
* @brief Applies the aggregation function on the chunk starting at *src* and spanning *chunk_size_b* bytes.
* The result is written to main memory.
*
* @param dest Pointer to the start of the result chunk
* @param src Pointer to the start of the source chunk
* @param chunk_size_b Size of the source chunk in bytes
* @return true When the aggregation is done
* @return false Never
*/
static bool apply (base_t *dest, base_t *src, size_t chunk_size_b) {
constexpr size_t lanes = VECTOR_SIZE<base_t>();
size_t value_count = chunk_size_b / sizeof(base_t);
__m512i agg_vec = func<base_t>::zero();
size_t i = 0;
base_t result = 0;
// stop before! running out of space
if(value_count >= lanes) {// keep in mind value_count is unsigned so if it becomes negative, it doesn't.
for(; i <= value_count - lanes; i += lanes) {
__m512i vec = Vector_Loader<base_t, load_mode>::load(src + i);
agg_vec = func<base_t>::simd_agg(agg_vec, vec);
}
result = func<base_t>::simd_reduce(agg_vec);
}
for(; i < value_count; ++i) {
result = func<base_t>::scalar_agg(result, src[i]);
}
*dest = result;
return true;
}
/*
* returns time in ns spent loading vector
*/
template<size_t CHUNK_SIZE_B>
static uint64_t apply_masked(__m512i& dest, base_t *src, uint16_t* msks) {
/**
* @brief Applies the aggregation function on the chunk starting at *src* and spanning *chunk_size_b* bytes,
* while applying the bit string stored in *masks*. The result is written to main memory.
*
* @param dest Pointer to the start of the result chunk
* @param src Pointer to the start of the source chunk
* @param masks Pointer the bitstring that marks the values that should be aggregated
* @param chunk_size_b Size of the source chunk in bytes
* @return true When the aggregation is done
* @return false Never
*/
static bool apply_masked (base_t *dest, base_t *src, uint16_t* msks, size_t chunk_size_b) {
constexpr size_t lanes = VECTOR_SIZE<base_t>();
constexpr size_t value_count = CHUNK_SIZE_B / sizeof(base_t);
constexpr size_t iterations = value_count - lanes + 1;
uint8_t* masks = (uint8_t *)msks;
size_t value_count = chunk_size_b / sizeof(base_t);
__m512i agg_vec = func<base_t>::zero();
size_t i = 0;
static_assert(value_count >= lanes);
// stop before! running out of space
if(value_count >= lanes) // keep in mind size_w is unsigned so if it becomes negative, it doesn't.
for(; i <= value_count - lanes; i += lanes) {
__m512i vec = Vector_Loader<base_t, load_mode>::load(src + i);
__mmask8 mask = _mm512_int2mask(masks[i / lanes]);
uint64_t load_time = 0;
agg_vec = func<base_t>::simd_mask_agg(agg_vec, mask, vec);
}
*dest = func<base_t>::simd_reduce(agg_vec);
auto* masks = reinterpret_cast<uint8_t*>(msks);
for(; i < value_count; ++i) {
uint8_t mask = masks[i / lanes];
if(mask & (0b1 << (i % lanes))){
*dest = func<base_t>::scalar_agg(*dest, src[i]);
}
}
return true;
}
/**
* @brief Applies the aggregation function on the chunk starting at *src* and spanning *chunk_size_b* bytes,
* while applying the bit string stored in *masks*. The values are agggegated in the register *dest* without
* clearing beforehand.
*
* NOTE! This function only works correctly if the the chunk_size_b is a multiple of 64 byte
*
* @param dest Vector register used for storing and passing the result around
* @param src Pointer to the start of the source chunk
* @param masks Pointer the bitstring that marks the values that should be aggregated
* @param chunk_size_b Size of the source chunk in bytes
* @return __m512i Vector register holding the aggregation result
*/
static __m512i apply_masked (__m512i dest, base_t *src, uint16_t* msks, size_t chunk_size_b, uint64_t* time_load) {
constexpr size_t lanes = VECTOR_SIZE<base_t>();
uint8_t* masks = (uint8_t*) msks;
//TODO this function does not work if value_count % lanes != 0
size_t value_count = chunk_size_b / sizeof(base_t);
size_t i = 0;
for(size_t i = 0; i < iterations; i += lanes) {
auto ts_load = std::chrono::steady_clock::now();
*time_load = 0;
// stop before! running out of space
if(value_count >= lanes) // keep in mind size_w is unsigned so if it becomes negative, it doesn't.
for(; i <= value_count - lanes; i += lanes) {
const auto ts = std::chrono::steady_clock::now();
__m512i vec = Vector_Loader<base_t, load_mode>::load(src + i);
auto te_load = std::chrono::steady_clock::now();
load_time += std::chrono::duration_cast<std::chrono::nanoseconds>(te_load - ts_load).count();
const auto te = std::chrono::steady_clock::now();
*time_load += std::chrono::duration_cast<std::chrono::nanoseconds>(te - ts).count();
__mmask8 mask = _mm512_int2mask(masks[i / lanes]);
dest = func<base_t>::simd_agg(dest, mask, vec);
}
return dest;
}
/**
* @brief Applies the aggregation function on the chunk starting at *src* and spanning *chunk_size_b* bytes,
* while applying two bit strings stored in *masks_0* and *masks_1*. The values are aggregated in the register
* *dest* without clearing beforehand.
*
* NOTE! This function only works correctly if the the chunk_size_b is a multiple of 64 byte
*
* @param dest Vector register used for storing and passing the result around
* @param src Pointer to the start of the source chunk
* @param masks_0 Pointer the bitstring that marks the values that should be aggregated
* @param masks_1 Pointer the bitstring that marks the values that should be aggregated
* @param chunk_size_b Size of the source chunk in bytes
* @return __m512i Vector register holding the aggregation result
*/
static __m512i apply_masked (__m512i dest, base_t *src, uint16_t* msks0, uint16_t* msks1, size_t chunk_size_b) {
constexpr size_t lanes = VECTOR_SIZE<base_t>();
uint8_t* masks0 = (uint8_t*) msks0;
uint8_t* masks1 = (uint8_t*) msks1;
//TODO this function does not work if value_count % lanes != 0
size_t value_count = chunk_size_b / sizeof(base_t);
size_t i = 0;
// stop before! running out of space
if(value_count >= lanes) // keep in mind value_count is unsigned so if it becomes negative, it doesn't.
for(; i <= value_count - lanes; i += lanes) {
__m512i vec = Vector_Loader<base_t, load_mode>::load(src + i);
__mmask8 mask0 = _mm512_int2mask(masks0[i / lanes]);
__mmask8 mask1 = _mm512_int2mask(masks1[i / lanes]);
dest = simd_agg(dest, mask, vec);
mask0 = _kand_mask8(mask0, mask1);
dest = func<base_t>::simd_agg(dest, mask0, vec);
}
return load_time;
return dest;
}
/**
* @brief Reduces a vector by applying the aggregation function horizontally.
*
* @param dest Result of the horizontal aggregation
* @param src Vector as source for the horizontal aggregation
* @return true When the operation is done
* @return false Never
*/
static bool happly (base_t *dest, __m512i src) {
*dest = simd_reduce(src);
*dest = func<base_t>::simd_reduce(src);
return true;
}
static __m512i get_zero() {
return zero();
return func<base_t>::zero();
}
};

169
qdp_project/src/utils/filter.h

@ -7,64 +7,173 @@
#include "vector_loader.h"
template<typename base_t, load_mode load_mode>
class FilterLT {
/**
* @brief Super Class for all Aggregation functions. Guards Sub Classes from having an non integral base type.
*
* @tparam T An integral datatype
*/
template<typename T>
class FilterFunction {
static_assert(std::is_integral<T>::value, "The base type of a FilterFunction must be an integeral.");
};
/**
* @brief Template class that implements methods used for finding values that are not equal to the compare value.
* It wraps the corresponding vector intrinsics.
*
* @tparam T base datatype for the implemented methods
*/
template<typename T>
class NEQ : public FilterFunction<T> {
public:
static inline __mmask16 simd_filter(__m512i vector, __m512i comp) {
if constexpr (sizeof(base_t) == 4) return _mm512_cmplt_epi32_mask(vector, comp);
else if constexpr (sizeof(base_t) == 8) return _mm512_cmplt_epi64_mask(vector, comp);
static_assert(sizeof(base_t) == 4 || sizeof(base_t) == 8, "LT is only implemented for 32 and 64 wide integers");
if constexpr (sizeof(T) == 4) return _mm512_cmpneq_epi32_mask(vector, comp);
else if constexpr (sizeof(T) == 8) return _mm512_cmpneq_epi64_mask(vector, comp);
static_assert(sizeof(T) == 4 || sizeof(T) == 8, "NEQ is only implemented for 32 and 64 wide integers");
}
static inline bool scalar_filter(base_t scalar, base_t comp) { return scalar < comp; }
static inline bool scalar_filter(T scalar, T comp) { return scalar != comp; }
};
/*
* returns time in ns spent loading vector
*/
template<base_t CMP_VALUE, size_t CHUNK_SIZE_B>
static uint64_t apply_same(uint16_t *dst, base_t *src) {
constexpr uint32_t lanes = VECTOR_SIZE<base_t>();
constexpr size_t value_count = CHUNK_SIZE_B / sizeof(base_t);
constexpr size_t iterations = value_count - lanes;
template<typename T>
class EQ : public FilterFunction<T> {
public:
static inline __mmask16 simd_filter(__m512i vector, __m512i comp) {
if constexpr (sizeof(T) == 4) return _mm512_cmpeq_epi32_mask(vector, comp);
else if constexpr (sizeof(T) == 8) return _mm512_cmpeq_epi64_mask(vector, comp);
static_assert(sizeof(T) == 4 || sizeof(T) == 8, "EQ is only implemented for 32 and 64 wide integers");
}
static inline bool scalar_filter(T scalar, T comp) { return scalar == comp; }
};
template<typename T>
class LT : public FilterFunction<T> {
public:
static inline __mmask16 simd_filter(__m512i vector, __m512i comp) {
if constexpr (sizeof(T) == 4) return _mm512_cmplt_epi32_mask(vector, comp);
else if constexpr (sizeof(T) == 8) return _mm512_cmplt_epi64_mask(vector, comp);
static_assert(sizeof(T) == 4 || sizeof(T) == 8, "LT is only implemented for 32 and 64 wide integers");
}
static_assert(value_count > lanes);
static inline bool scalar_filter(T scalar, T comp) { return scalar < comp; }
};
uint64_t load_time = 0;
template<typename T>
class LEQ : public FilterFunction<T> {
public:
static inline __mmask16 simd_filter(__m512i vector, __m512i comp) {
if constexpr (sizeof(T) == 4) return _mm512_cmple_epi32_mask(vector, comp);
else if constexpr (sizeof(T) == 8) return _mm512_cmple_epi64_mask(vector, comp);
static_assert(sizeof(T) == 4 || sizeof(T) == 8, "LEQ is only implemented for 32 and 64 wide integers");
}
uint8_t* dest = (uint8_t*) dst;
static inline bool scalar_filter(T scalar, T comp) { return scalar <= comp; }
};
__m512i cmp_vec = _mm512_set1_epi64(CMP_VALUE);
template<typename T>
class GT : public FilterFunction<T> {
public:
static inline __mmask16 simd_filter(__m512i vector, __m512i comp) {
if constexpr (sizeof(T) == 4) return _mm512_cmpgt_epi32_mask(vector, comp);
else if constexpr (sizeof(T) == 8) return _mm512_cmpgt_epi64_mask(vector, comp);
static_assert(sizeof(T) == 4 || sizeof(T) == 8, "GT is only implemented for 32 and 64 wide integers");
}
static inline bool scalar_filter(T scalar, T comp) { return scalar > comp; }
};
template<typename T>
class GEQ : public FilterFunction<T> {
public:
static inline __mmask16 simd_filter(__m512i vector, __m512i comp) {
if constexpr (sizeof(T) == 4) return _mm512_cmpge_epi32_mask(vector, comp);
else if constexpr (sizeof(T) == 8) return _mm512_cmpge_epi64_mask(vector, comp);
static_assert(sizeof(T) == 4 || sizeof(T) == 8, "GEQ is only implemented for 32 and 64 wide integers");
}
static inline bool scalar_filter(T scalar, T comp) { return scalar >= comp; }
};
template<typename base_t, template<typename _base_t> class func, load_mode load_mode, bool copy>
class Filter {
public:
static_assert(std::is_same_v<base_t, uint64_t>, "We enforce 64 bit integer");
/**
* @brief Calculates the memory maximal needed to store a chunk's processing result.
*
* @param chunk_size_b Size of the chunk in byte
* @return size_t Size of the chunk's processing result in byte
*/
static size_t result_bytes_per_chunk(size_t chunk_size_b) {
// + 7 to enshure that we have enougth bytes -> / 8 -> rounds down
// if we had 17 / 8 = 2 but (17 + 7) / 8 = 3
// if we hat 16 / 8 = 2 is right, as well as, 16 + 7 / 8 = 2
return (chunk_size_b / sizeof(base_t) + 7) / 8;
}
/**
* @brief Applies the filter function on the chunk starting at *src* and spanning *chunk_size_b* bytes, while comparing with he same value every time.
* The resulting bit string is written to main memory.
*
* @param dest Pointer to the start of the result chunk
* @param src Pointer to the start of the source chunk
* @param cmp_value Comparision value to compare the values from source to
* @param chunk_size_b Size of the source chunk in bytes
* @return true When the filter operation is done
* @return false Never
*/
// we only need this impl. yet, as all filter are at the end of a pipeline
static bool apply_same (uint16_t *dst, base_t *buffer, base_t *src, base_t cmp_value, size_t chunk_size_b, uint64_t* time_load) {
constexpr uint32_t lanes = VECTOR_SIZE<base_t>();
uint8_t* dest = (uint8_t*) dst;
size_t value_count = chunk_size_b / sizeof(base_t);
__m512i cmp_vec = _mm512_set1_epi64(cmp_value);
size_t i = 0;
for(; i < iterations; i += lanes) {
auto ts_load = std::chrono::steady_clock::now();
*time_load = 0;
__m512i vec = Vector_Loader<base_t, load_mode>::load(src + i);
// this weird implementetion is neccessary, see analogous impl in aggregation for explaination
if(value_count > lanes) {
for(; (i < value_count - lanes); i += lanes) {
const auto ts = std::chrono::steady_clock::now();
auto te_load = std::chrono::steady_clock::now();
load_time += std::chrono::duration_cast<std::chrono::nanoseconds>(te_load - ts_load).count();
__m512i vec = Vector_Loader<base_t, load_mode>::load(src + i);
__mmask8 bitmask = simd_filter(vec, cmp_vec);
const auto te = std::chrono::steady_clock::now();
*time_load += std::chrono::duration_cast<std::chrono::nanoseconds>(te - ts).count();
uint8_t int_mask = (uint8_t) _mm512_mask2int(bitmask);
__mmask8 bitmask = func<base_t>::simd_filter(vec, cmp_vec);
dest[i / lanes] = int_mask;
uint8_t int_mask = (uint8_t) _mm512_mask2int(bitmask);
dest[i / lanes] = int_mask;
if constexpr(copy){
Vector_Loader<base_t, load_mode>::store(buffer + i, vec);
}
}
}
auto dest_pos = i / lanes;
uint8_t int_mask = 0;
for(; i < value_count; ++i) {
base_t val = src[i];
uint8_t result = scalar_filter(val, CMP_VALUE);
uint8_t result = func<base_t>::scalar_filter(val, cmp_value);
int_mask |= (result << (i % lanes));
}
if constexpr(copy){
buffer[i] = val;
}
}
dest[dest_pos] = int_mask;
return load_time;
return true;
}
};
Loading…
Cancel
Save