
remove vector-load timing as it's too expensive

branch: master
Constantin Fürst, 11 months ago
commit 6a4eec37ca
Changed files:
1. qdp_project/plotter.py (14 lines changed)
2. qdp_project/src/Benchmark.cpp (31 lines changed)
3. qdp_project/src/utils/BenchmarkHelpers.cpp (12 lines changed)
4. qdp_project/src/utils/aggregation.h (7 lines changed)
5. qdp_project/src/utils/filter.h (11 lines changed)
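
For context on the commit message: the instrumentation removed here wrapped every single AVX-512 load in filter.h and aggregation.h in a pair of std::chrono::steady_clock::now() calls and accumulated the elapsed nanoseconds. A clock read typically costs on the order of tens of nanoseconds, while a 64-byte register load served from cache takes only a few cycles, so the measurement overhead dominates the very thing it is supposed to measure. The snippet below is a minimal stand-alone sketch of that removed pattern, not repository code; the function name and the plain _mm512_loadu_si512 / _mm512_add_epi64 loop are illustrative stand-ins for the Vector_Loader, filter and aggregation templates:

#include <chrono>
#include <cstddef>
#include <cstdint>
#include <immintrin.h>

// Sketch of the per-load timing pattern this commit removes
// (illustrative names; requires AVX-512F, e.g. compile with -mavx512f).
uint64_t sum_with_per_load_timing(const uint64_t* src, size_t value_count, uint64_t* time_load) {
    __m512i acc = _mm512_setzero_si512();
    *time_load = 0;
    for (size_t i = 0; i + 8 <= value_count; i += 8) {
        const auto ts = std::chrono::steady_clock::now();
        __m512i vec = _mm512_loadu_si512(src + i);   // the few-cycle operation being measured
        const auto te = std::chrono::steady_clock::now();
        // two clock reads per 64 B load; their cost exceeds that of the load itself
        *time_load += std::chrono::duration_cast<std::chrono::nanoseconds>(te - ts).count();
        acc = _mm512_add_epi64(acc, vec);
    }
    return _mm512_reduce_add_epi64(acc);
}

With the per-load timing gone, the commit also drops everything that consumed it: the INTERNAL_TIMING_VECTOR_LOAD_ arrays, the extra time_load parameters of filter::apply_same and aggregation::apply_masked, the scana-load / aggrj-load CSV columns, and the corresponding entries in plotter.py. Only the per-phase BEGIN/WAIT/END timestamps, taken once per iteration, remain.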

qdp_project/plotter.py (14 lines changed)

@@ -5,12 +5,12 @@ import seaborn as sns
 import matplotlib.pyplot as plt
 output_path = "./plots"
-prefetch_result = "./evaluation-results/qdp-xeonmax-simple-prefetch-tca4-tcb1-tcj2-tmul8-wl4294967296-cs8388608.csv"
-dram_result = "./evaluation-results/qdp-xeonmax-simple-dram-tca2-tcb0-tcj1-tmul16-wl4294967296-cs2097152.csv"
+prefetch_result = "./evaluation-results/qdp-xeonmax-prefetch-tca2-tcb1-tcj1-tmul8-wl4294967296-cs16777216.csv"
+dram_result = "./evaluation-results/qdp-xeonmax-dram-tca2-tcb0-tcj1-tmul8-wl4294967296-cs2097152.csv"
 tt_name = "rt-ns"
-function_names = [ "scana-run", "scana-load", "scanb-run", "aggrj-run", "aggrj-load" ]
-fn_nice = [ "Scan A, Filter", "Scan A, Load", "Scan B", "Aggregate, Project + Sum", "Aggregate, Load" ]
+function_names = [ "scana-run", "scanb-run", "aggrj-run" ]
+fn_nice = [ "Scan A, Filter", "Scan B, Prefetch", "Aggregate, Project + Sum" ]
 def read_timings_from_csv(fname) -> tuple[list[float], list[str]]:
     t = {}

@@ -34,10 +34,10 @@ def read_timings_from_csv(fname) -> tuple[list[float], list[str]]:
 def get_data_prefetch_cache_access() -> tuple[list[float], list[str]]:
-    total = 1.14
-    data = [ 0.05, 0.02, 0.17, 0.40, 0.36, 0.13 ]
+    total = 0.3
+    data = [ 0.07, 0.19, 0.04 ]
     data = [ x * 100 / total for x in data ]
-    keys = ["Cache::GetCacheNode", "Cache::GetFromCache", "dml::handler::constructor", "Cache::AllocOnNode", "dml::make_task", "dml::submit"]
+    keys = ["numa_alloc_onnode", "dml::make_mem_move_task", "dml::hardware_device::submit"]
     return data,keys

qdp_project/src/Benchmark.cpp (31 lines changed)

@@ -5,7 +5,6 @@
 #include <fstream>
 #include <future>
 #include <array>
-#include <atomic>
 #include "const.h"
 #include "filter.h"

@@ -83,7 +82,7 @@ void scan_b(size_t gid, size_t tid) {
     THREAD_TIMING_[SCANB_TIMING_INDEX][UniqueIndex(gid,tid)][0][TIME_STAMP_BEGIN] = std::chrono::steady_clock::now();
-    if constexpr (PERFORM_CACHING && !PERFORM_CACHING_IN_AGGREGATION) {
+    if constexpr (PERFORM_CACHING) {
         caching<TC_SCANB>(gid, tid);
     }

@@ -94,8 +93,6 @@ void scan_b(size_t gid, size_t tid) {
 void scan_a(size_t gid, size_t tid) {
     THREAD_TIMING_[SCANA_TIMING_INDEX][UniqueIndex(gid,tid)].clear();
     THREAD_TIMING_[SCANA_TIMING_INDEX][UniqueIndex(gid,tid)].resize(RUN_COUNT);
-    INTERNAL_TIMING_VECTOR_LOAD_[SCANA_TIMING_INDEX].clear();
-    INTERNAL_TIMING_VECTOR_LOAD_[SCANA_TIMING_INDEX].resize(RUN_COUNT);
     LAUNCH_.wait();

@@ -106,12 +103,9 @@ void scan_a(size_t gid, size_t tid) {
         uint64_t* chunk_ptr = get_chunk<TC_SCANA>(DATA_A_, chunk_index, tid);
         uint16_t* mask_ptr = get_mask<TC_SCANA>(MASK_A_, chunk_index, tid);
-        filter::apply_same(mask_ptr, nullptr, chunk_ptr, CMP_A, CHUNK_SIZE_B / TC_SCANA, &INTERNAL_TIMING_VECTOR_LOAD_[SCANA_TIMING_INDEX][i]);
+        filter::apply_same(mask_ptr, nullptr, chunk_ptr, CMP_A, CHUNK_SIZE_B / TC_SCANA);
         THREAD_TIMING_[SCANA_TIMING_INDEX][UniqueIndex(gid,tid)][0][TIME_STAMP_WAIT] = std::chrono::steady_clock::now();
-        BARRIERS_[gid]->arrive_and_wait();
         THREAD_TIMING_[SCANA_TIMING_INDEX][UniqueIndex(gid,tid)][0][TIME_STAMP_END] = std::chrono::steady_clock::now();
     }
@@ -126,22 +120,15 @@ void aggr_j(size_t gid, size_t tid) {
     THREAD_TIMING_[AGGRJ_TIMING_INDEX][UniqueIndex(gid,tid)].clear();
     THREAD_TIMING_[AGGRJ_TIMING_INDEX][UniqueIndex(gid,tid)].resize(RUN_COUNT);
-    INTERNAL_TIMING_VECTOR_LOAD_[AGGRJ_TIMING_INDEX].clear();
-    INTERNAL_TIMING_VECTOR_LOAD_[AGGRJ_TIMING_INDEX].resize(RUN_COUNT);
     __m512i aggregator = aggregation::OP::zero();
     LAUNCH_.wait();
-    if constexpr (PERFORM_CACHING && PERFORM_CACHING_IN_AGGREGATION) {
-        caching<TC_AGGRJ>(gid, tid);
-    }
-    BARRIERS_[gid]->arrive_and_drop();
     for (size_t i = 0; i < RUN_COUNT; i++) {
         THREAD_TIMING_[AGGRJ_TIMING_INDEX][UniqueIndex(gid,tid)][i][TIME_STAMP_BEGIN] = std::chrono::steady_clock::now();
-        BARRIERS_[gid]->arrive_and_wait();
         THREAD_TIMING_[AGGRJ_TIMING_INDEX][UniqueIndex(gid,tid)][i][TIME_STAMP_WAIT] = std::chrono::steady_clock::now();
         const size_t chunk_index = get_chunk_index(gid, i);

@@ -172,13 +159,11 @@ void aggr_j(size_t gid, size_t tid) {
         }
         uint64_t tmp = _mm512_reduce_add_epi64(aggregator);
-        aggregator = aggregation::apply_masked(aggregator, data_ptr, mask_ptr_a, SUBCHUNK_SIZE_B, &INTERNAL_TIMING_VECTOR_LOAD_[AGGRJ_TIMING_INDEX][i]);
+        aggregator = aggregation::apply_masked(aggregator, data_ptr, mask_ptr_a, SUBCHUNK_SIZE_B);
         THREAD_TIMING_[AGGRJ_TIMING_INDEX][UniqueIndex(gid,tid)][i][TIME_STAMP_END] = std::chrono::steady_clock::now();
     }
-    BARRIERS_[gid]->arrive_and_drop();
     aggregation::happly(&DATA_DST_[UniqueIndex(gid,tid)], aggregator);
 }
@@ -197,7 +182,7 @@ int main() {
     const std::string ofname = "results/qdp-xeonmax-" + std::string(MODE_STRING) + "-tca" + std::to_string(TC_SCANA) + "-tcb" + std::to_string(TC_SCANB) + "-tcj" + std::to_string(TC_AGGRJ) + "-tmul" + std::to_string(GROUP_COUNT) + "-wl" + std::to_string(WL_SIZE_B) + "-cs" + std::to_string(CHUNK_SIZE_B) + ".csv";
     std::ofstream fout(ofname);
-    fout << "run;rt-ns;rt-s;result[0];scana-run;scana-wait;scana-load;scanb-run;scanb-wait;aggrj-run;aggrj-wait;aggrj-load;cache-hr;" << std::endl;
+    fout << "run;rt-ns;rt-s;result[0];scana-run;scana-wait;scanb-run;scanb-wait;aggrj-run;aggrj-wait;cache-hr;" << std::endl;
     DATA_A_ = (uint64_t*) numa_alloc_onnode(WL_SIZE_B, MEM_NODE_A);
     DATA_B_ = (uint64_t*) numa_alloc_onnode(WL_SIZE_B, MEM_NODE_B);

@@ -261,8 +246,8 @@ int main() {
         std::cout << "Result Expected: " << result_expected << ", Result Actual: " << result_actual << std::endl;
         if (i >= WARMUP_ITERATION_COUNT) {
-            uint64_t scana_run = 0, scana_wait = 0, scanb_run = 0, scanb_wait = 0, aggrj_run = 0, aggrj_wait = 0, scana_load = 0, aggrj_load = 0;
-            process_timings(&scana_run, &scana_wait, &scanb_run, &scanb_wait, &aggrj_run, &aggrj_wait, &scana_load, &aggrj_load);
+            uint64_t scana_run = 0, scana_wait = 0, scanb_run = 0, scanb_wait = 0, aggrj_run = 0, aggrj_wait = 0;
+            process_timings(&scana_run, &scana_wait, &scanb_run, &scanb_wait, &aggrj_run, &aggrj_wait);
             constexpr double nanos_per_second = ((double)1000) * 1000 * 1000;
             const uint64_t nanos = std::chrono::duration_cast<std::chrono::nanoseconds>(time_end - time_start).count();

@@ -272,7 +257,7 @@ int main() {
                 << i - WARMUP_ITERATION_COUNT << ";"
                 << nanos << ";" << seconds << ";"
                 << result_actual << ";"
-                << scana_run << ";" << scana_wait << ";" << scana_load << ";" << scanb_run << ";" << scanb_wait << ";" << aggrj_run << ";" << aggrj_wait << ";" << aggrj_load << ";"
+                << scana_run << ";" << scana_wait << ";" << scanb_run << ";" << scanb_wait << ";" << aggrj_run << ";" << aggrj_wait << ";"
                 << process_cache_hitrate() << ";"
                 << std::endl;
         }

qdp_project/src/utils/BenchmarkHelpers.cpp (12 lines changed)

@@ -11,7 +11,6 @@ constexpr size_t TIME_STAMP_BEGIN = 0;
 constexpr size_t TIME_STAMP_WAIT = 1;
 constexpr size_t TIME_STAMP_END = 2;
-std::array<std::vector<uint64_t>, 3> INTERNAL_TIMING_VECTOR_LOAD_;
 std::array<std::vector<std::vector<std::array<std::chrono::steady_clock::time_point, 3>>>, 3> THREAD_TIMING_;
 std::array<uint32_t, GROUP_COUNT * TC_AGGRJ> CACHE_HITS_;

@@ -95,8 +94,7 @@ double process_cache_hitrate() {
 void process_timings(
     uint64_t* scana_run, uint64_t* scana_wait,
     uint64_t* scanb_run, uint64_t* scanb_wait,
-    uint64_t* aggrj_run, uint64_t* aggrj_wait,
-    uint64_t* scana_load, uint64_t* aggrj_load
+    uint64_t* aggrj_run, uint64_t* aggrj_wait
 ) {
     {
         uint64_t scana_rc = 0;

@@ -146,12 +144,4 @@ void process_timings(
             *aggrj_wait /= aggrj_rc;
         }
     }
-    {
-        for (const auto e : INTERNAL_TIMING_VECTOR_LOAD_[SCANA_TIMING_INDEX]) *scana_load += e;
-        for (const auto e : INTERNAL_TIMING_VECTOR_LOAD_[AGGRJ_TIMING_INDEX]) *aggrj_load += e;
-        *scana_load /= INTERNAL_TIMING_VECTOR_LOAD_[SCANA_TIMING_INDEX].size();
-        *aggrj_load /= INTERNAL_TIMING_VECTOR_LOAD_[AGGRJ_TIMING_INDEX].size();
-        *scana_run -= *scana_load;
-        *aggrj_run -= *aggrj_run;
-    }
 }

qdp_project/src/utils/aggregation.h (7 lines changed)

@@ -245,7 +245,7 @@ public:
     * @param chunk_size_b Size of the source chunk in bytes
     * @return __m512i Vector register holding the aggregation result
     */
-    static __m512i apply_masked (__m512i dest, base_t *src, uint16_t* msks, size_t chunk_size_b, uint64_t* time_load) {
+    static __m512i apply_masked (__m512i dest, base_t *src, uint16_t* msks, size_t chunk_size_b) {
         constexpr size_t lanes = VECTOR_SIZE<base_t>();
         uint8_t* masks = (uint8_t*) msks;
         //TODO this function does not work if value_count % lanes != 0

@@ -257,12 +257,7 @@ public:
         // stop before! running out of space
         if(value_count >= lanes) // keep in mind size_w is unsigned so if it becomes negative, it doesn't.
         for(; i <= value_count - lanes; i += lanes) {
-            const auto ts = std::chrono::steady_clock::now();
             __m512i vec = Vector_Loader<base_t, load_mode>::load(src + i);
-            const auto te = std::chrono::steady_clock::now();
-            *time_load += std::chrono::duration_cast<std::chrono::nanoseconds>(te - ts).count();
             __mmask8 mask = _mm512_int2mask(masks[i / lanes]);
             dest = func<base_t>::simd_agg(dest, mask, vec);
         }

qdp_project/src/utils/filter.h (11 lines changed)

@@ -128,27 +128,18 @@
     * @return false Never
     */
    // we only need this impl. yet, as all filter are at the end of a pipeline
-    static bool apply_same (uint16_t *dst, base_t *buffer, base_t *src, base_t cmp_value, size_t chunk_size_b, uint64_t* time_load) {
+    static bool apply_same (uint16_t *dst, base_t *buffer, base_t *src, base_t cmp_value, size_t chunk_size_b) {
         constexpr uint32_t lanes = VECTOR_SIZE<base_t>();
         uint8_t* dest = (uint8_t*) dst;
         size_t value_count = chunk_size_b / sizeof(base_t);
         __m512i cmp_vec = _mm512_set1_epi64(cmp_value);
         size_t i = 0;
-        *time_load = 0;
         // this weird implementetion is neccessary, see analogous impl in aggregation for explaination
         if(value_count > lanes) {
             for(; (i < value_count - lanes); i += lanes) {
-                const auto ts = std::chrono::steady_clock::now();
                 __m512i vec = Vector_Loader<base_t, load_mode>::load(src + i);
-                const auto te = std::chrono::steady_clock::now();
-                *time_load += std::chrono::duration_cast<std::chrono::nanoseconds>(te - ts).count();
                 __mmask8 bitmask = func<base_t>::simd_filter(vec, cmp_vec);
                 uint8_t int_mask = (uint8_t) _mm512_mask2int(bitmask);
                 dest[i / lanes] = int_mask;
