
reworking the qdp benchmark

master
Constantin Fürst 11 months ago
parent
commit aa0867aa3a
  1. qdp_project/CMakeLists.txt (39 changed lines)
  2. qdp_project/src/.gitkeep (0 changed lines)
  3. qdp_project/src/Benchmark.cpp (215 changed lines)
  4. qdp_project/src/benchmark/MAX_benchmark.cpp (232 changed lines)
  5. qdp_project/src/benchmark/pipelines/MAX_scan_filter_pipe.h (360 changed lines)
  6. qdp_project/src/utils/BenchmarkHelpers.cpp (50 changed lines)
  7. qdp_project/src/utils/aggregation.h (0 changed lines)
  8. qdp_project/src/utils/barrier_utils.h (73 changed lines)
  9. qdp_project/src/utils/cpu_set_utils.h (82 changed lines)
  10. qdp_project/src/utils/file_output.h (76 changed lines)
  11. qdp_project/src/utils/filter.h (0 changed lines)
  12. qdp_project/src/utils/iterable_range.h (208 changed lines)
  13. qdp_project/src/utils/measurement_utils.h (152 changed lines)
  14. qdp_project/src/utils/pcm.h (6 changed lines)
  15. qdp_project/src/utils/timer_utils.h (80 changed lines)

qdp_project/CMakeLists.txt (39 changed lines)

@@ -1,7 +1,7 @@
cmake_minimum_required(VERSION 3.18)
# set the project name
project(NUMA_Slow_Fast_Datamigration_Test VERSION 0.1)
project(QDPBench VERSION 0.1)
# specify the C standard
set(CMAKE_CXX_STANDARD 20)
@@ -26,47 +26,12 @@ add_compile_options(
"$<$<CONFIG:Debug>:${DEBUG_FLAGS}>"
)
# evaluate custom variables
function(eval vname vvalid vdefault)
# the variable is set to the below value if it is not already defined from the command line
set(VALID ${vvalid} CACHE INTERNAL "Possible values for ${vname}")
set(${vname} ${vdefault} CACHE STRING "The barrier mode")
# command for GUI shenanigans
set_property(CACHE ${vname} PROPERTY STRINGS VALID)
if(${vname} IN_LIST VALID)
message(STATUS "Variable ${vname} = ${${vname}}")
else()
message(STATUS "Variable ${vname} has invalid value ${${vname}}")
# set the fallback value for use in parent function
unset(${vname} CACHE)
message(STATUS "Fallback to default: ${vname} = ${vdefault}")
set(${vname} ${vdefault} PARENT_SCOPE)
endif()
endfunction()
eval(WSUPPRESS "suppress;show" "show")
if($<STREQUAL:${BUFFER_LIMIT},suppress> EQUAL 1)
add_compile_options("${SUPPRESS_WARNINGS}")
endif()
eval(BARRIER_MODE "global;local" "global")
add_definitions(-DBARRIER_MODE="${BARRIER_MODE}")
eval(BUFFER_LIMIT "unlimited;limited" "unlimited")
add_definitions(-DBUFFER_LIMIT=$<STREQUAL:${BUFFER_LIMIT},limited>)
eval(THREAD_FACTOR "1;2;3;4;5;6;7;8;9;10" "1")
add_definitions(-DTHREAD_GROUP_MULTIPLIER=${THREAD_FACTOR})
# include directories
include_directories(src/utils)
include_directories(src/algorithm)
include_directories(src/algorithm/operators)
# link libraries
link_libraries(-lnuma -lpthread -l:libdml.a)
# Add targets only below
# specify build targets
add_executable(MAXBench src/benchmark/MAX_benchmark.cpp)
add_executable(QDPBench src/Benchmark.cpp)

qdp_project/src/.gitkeep (0 changed lines)

qdp_project/src/Benchmark.cpp (215 changed lines)

@@ -0,0 +1,215 @@
#include <memory>
#include <cassert>
#include <mutex>
#include <cstring>
#include <bitset>
#include <algorithm>
#include <barrier>
#include <vector>
#include <fstream>
#include <future>
#include "const.h"
#include "filter.h"
#include "aggregation.h"
#include "array_utils.h"
#include "memory_literals.h"
#include "../../offloading-cacher/cache.hpp"
#include "BenchmarkHelpers.cpp"
////////////////////////////////
/// BENCHMARK SETUP
constexpr size_t WL_SIZE_B = 4_GiB;
constexpr size_t CHUNK_SIZE_B = 128_MiB;
constexpr uint64_t CMP_A = 50;
constexpr uint32_t WARMUP_ITERATION_COUNT = 5;
constexpr uint32_t ITERATION_COUNT = 10;
constexpr size_t GROUP_COUNT = 4;
constexpr size_t TC_SCANA = 2;
constexpr size_t TC_SCANB = 2;
constexpr size_t TC_AGGRJ = 1;
constexpr bool PERFORM_CACHING = false;
constexpr bool DATA_IN_HBM = false;
constexpr char MODE_STRING[] = "DramBase";
/// DO NOT CONFIGURE BEYOND THIS
////////////////////////////////
constexpr size_t WL_SIZE_ELEMENTS = WL_SIZE_B / sizeof(uint64_t);
constexpr size_t CHUNK_COUNT = WL_SIZE_B / CHUNK_SIZE_B;
constexpr size_t CHUNK_SIZE_ELEMENTS = CHUNK_SIZE_B / sizeof(uint64_t);
using filter = Filter<uint64_t, LT, load_mode::Stream, false>;
using aggregation = Aggregation<uint64_t, Sum, load_mode::Stream>;
dsacache::Cache CACHE_;
std::vector<std::barrier<NopStruct>> BARRIERS_;
std::shared_future<void> LAUNCH_;
uint64_t* DATA_A_;
uint64_t* DATA_B_;
uint16_t* MASK_A_;
uint64_t* DATA_DST_;
void scan_b(size_t gid, size_t tid) {
LAUNCH_.wait();
uint32_t runs = CHUNK_COUNT / GROUP_COUNT + (CHUNK_COUNT % GROUP_COUNT > gid);
std::unique_ptr<dsacache::CacheData> data;
for(uint32_t i = 0; i < runs; ++i) {
// calculate pointers
size_t chunk_id = gid + GROUP_COUNT * i;
uint64_t* chunk_ptr = get_sub_chunk_ptr(DATA_B_, chunk_id, CHUNK_SIZE_ELEMENTS, tid, TC_SCANB);
if constexpr (PERFORM_CACHING) {
data = CACHE_.Access(reinterpret_cast<uint8_t *>(chunk_ptr), CHUNK_SIZE_B / TC_SCANB);
data->WaitOnCompletion();
}
}
}
void scan_a(size_t gid, size_t tid) {
LAUNCH_.wait();
uint32_t runs = CHUNK_COUNT / GROUP_COUNT + (CHUNK_COUNT % GROUP_COUNT > gid);
for(uint32_t i = 0; i < runs; ++i) {
// calculate pointers
size_t chunk_id = gid + GROUP_COUNT * i;
uint64_t* chunk_ptr = get_sub_chunk_ptr(DATA_A_, chunk_id, CHUNK_SIZE_ELEMENTS, tid, TC_SCANA);
uint16_t* mask_ptr = get_sub_mask_ptr (MASK_A_, chunk_id, CHUNK_SIZE_ELEMENTS, tid, TC_SCANA);
filter::apply_same(mask_ptr, nullptr, chunk_ptr, CMP_A, CHUNK_SIZE_B / TC_SCANA);
}
}
void aggr_j(size_t gid, size_t tid) {
LAUNCH_.wait();
// calculate values
__m512i aggregator = aggregation::OP::zero();
// the lower gids run once more if the chunks are not evenly distributable
uint32_t runs = CHUNK_COUNT / GROUP_COUNT + (CHUNK_COUNT % GROUP_COUNT > gid);
for(uint32_t i = 0; i < runs; ++i) {
// calculate pointers
size_t chunk_id = gid + GROUP_COUNT * i;
uint64_t* chunk_ptr = get_sub_chunk_ptr(DATA_B_, chunk_id, CHUNK_SIZE_ELEMENTS, tid, TC_AGGRJ);
std::unique_ptr<dsacache::CacheData> data;
uint64_t* data_ptr;
if constexpr (PERFORM_CACHING) {
// access the cache for the given chunk which will have been accessed in scan_b
data = CACHE_.Access(reinterpret_cast<uint8_t *>(chunk_ptr), CHUNK_SIZE_B / TC_AGGRJ);
// after the copy task has finished we obtain the pointer to the cached
// copy of data_b which is then used from now on
data_ptr = reinterpret_cast<uint64_t*>(data->GetDataLocation());
// nullptr is still a legal return value for CacheData::GetLocation()
// even after waiting, so this must be checked
if (data_ptr == nullptr) {
std::cerr << "[x] Cache Miss!" << std::endl;
exit(-1);
}
}
else {
data_ptr = chunk_ptr;
}
uint16_t* mask_ptr_a = get_sub_mask_ptr(MASK_A_, chunk_id, CHUNK_SIZE_ELEMENTS, tid, TC_AGGRJ);
uint64_t tmp = _mm512_reduce_add_epi64(aggregator);
aggregator = aggregation::apply_masked(aggregator, data_ptr, mask_ptr_a, CHUNK_SIZE_B / TC_AGGRJ);
}
aggregation::happly(DATA_DST_ + (tid * GROUP_COUNT + gid), aggregator);
}
int main() {
const int current_cpu = sched_getcpu();
const int current_node = numa_node_of_cpu(current_cpu);
const int cache_node = CachePlacementPolicy(current_node, current_node, 0);
const std::string ofname = "results/qdp-xeonmax-simpleq-" + std::string(MODE_STRING) + "-tca" + std::to_string(TC_SCANA) + "-tcb" + std::to_string(TC_SCANB) + "-tcj" + std::to_string(TC_AGGRJ) + "-tmul" + std::to_string(GROUP_COUNT) + "-wl" + std::to_string(WL_SIZE_B) + "-cs" + std::to_string(CHUNK_SIZE_B) + ".csv";
std::ofstream fout(ofname);
fout << "run;time;" << std::endl;
if constexpr (DATA_IN_HBM) {
DATA_A_ = (uint64_t*) numa_alloc_onnode(WL_SIZE_B, cache_node);
DATA_B_ = (uint64_t*) numa_alloc_onnode(WL_SIZE_B, cache_node);
MASK_A_ = (uint16_t*) numa_alloc_onnode(WL_SIZE_ELEMENTS, cache_node);
DATA_DST_ = (uint64_t*) numa_alloc_onnode(TC_AGGRJ * GROUP_COUNT * sizeof(uint64_t), cache_node);
}
else {
DATA_A_ = (uint64_t*) numa_alloc_local(WL_SIZE_B);
DATA_B_ = (uint64_t*) numa_alloc_local(WL_SIZE_B);
MASK_A_ = (uint16_t*) numa_alloc_local(WL_SIZE_ELEMENTS);
DATA_DST_ = (uint64_t*) numa_alloc_local(TC_AGGRJ * GROUP_COUNT * sizeof(uint64_t));
}
if constexpr (PERFORM_CACHING) {
CACHE_.Init(CachePlacementPolicy, CopyMethodPolicy);
}
fill_mt<uint64_t>(DATA_A_, WL_SIZE_B, 0, 100, 42);
fill_mt<uint64_t>(DATA_B_, WL_SIZE_B, 0, 100, 420);
for (uint32_t i = 0; i < ITERATION_COUNT + WARMUP_ITERATION_COUNT; i++) {
CACHE_.Clear();
std::promise<void> launch_promise;
LAUNCH_ = launch_promise.get_future();
std::vector<std::thread> filter_pool;
std::vector<std::thread> copy_pool;
std::vector<std::thread> agg_pool;
for(uint32_t gid = 0; gid < GROUP_COUNT; ++gid) {
for(uint32_t tid = 0; tid < TC_SCANA; ++tid) {
filter_pool.emplace_back(scan_a, gid, tid);
}
for(uint32_t tid = 0; tid < TC_SCANB; ++tid) {
copy_pool.emplace_back(scan_b, gid, tid);
}
for(uint32_t tid = 0; tid < TC_AGGRJ; ++tid) {
agg_pool.emplace_back(aggr_j, gid, tid);
}
}
const auto time_start = std::chrono::steady_clock::now();
launch_promise.set_value();
for(std::thread& t : filter_pool) { t.join(); }
for(std::thread& t : copy_pool) { t.join(); }
for(std::thread& t : agg_pool) { t.join(); }
Aggregation<uint64_t, Sum, load_mode::Aligned>::apply(DATA_DST_, DATA_DST_, sizeof(uint64_t) * TC_AGGRJ * GROUP_COUNT);
const auto time_end = std::chrono::steady_clock::now();
if (i >= WARMUP_ITERATION_COUNT) {
fout << i << ";" << std::chrono::duration_cast<std::chrono::nanoseconds>(time_end - time_start).count() << std::endl;
}
}
numa_free(DATA_A_, WL_SIZE_B);
numa_free(DATA_B_, WL_SIZE_B);
numa_free(MASK_A_, WL_SIZE_ELEMENTS);
numa_free(DATA_DST_, TC_AGGRJ * GROUP_COUNT * sizeof(uint64_t));
return 0;
}
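With the constants above, WL_SIZE_B = 4 GiB and CHUNK_SIZE_B = 128 MiB give CHUNK_COUNT = 32, and the output file should resolve to results/qdp-xeonmax-simpleq-DramBase-tca2-tcb2-tcj1-tmul4-wl4294967296-cs134217728.csv. The expression runs = CHUNK_COUNT / GROUP_COUNT + (CHUNK_COUNT % GROUP_COUNT > gid) then assigns every group exactly 8 chunks, and group gid walks the chunk ids gid, gid + 4, gid + 8, and so on. The following standalone sketch only illustrates that distribution and is not part of the benchmark:

#include <cstdio>
#include <cstddef>

int main() {
    constexpr size_t CHUNK_COUNT = 32;   // 4 GiB / 128 MiB, as derived above
    constexpr size_t GROUP_COUNT = 4;
    for (size_t gid = 0; gid < GROUP_COUNT; ++gid) {
        // same distribution formula as in scan_a, scan_b and aggr_j
        const size_t runs = CHUNK_COUNT / GROUP_COUNT + (CHUNK_COUNT % GROUP_COUNT > gid);
        std::printf("group %zu gets %zu chunks:", gid, runs);
        for (size_t i = 0; i < runs; ++i) {
            std::printf(" %zu", gid + GROUP_COUNT * i);
        }
        std::printf("\n");
    }
    return 0;
}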

qdp_project/src/benchmark/MAX_benchmark.cpp (232 changed lines)

@@ -1,232 +0,0 @@
#include <atomic>
#include <barrier>
#include <chrono>
#include <condition_variable>
#include <cstdlib>
#include <cstring>
#include <fstream>
#include <future>
#include <iostream>
#include <limits>
#include <list>
#include <mutex>
#include <queue>
#include <thread>
#include <tuple>
#include <utility>
#include <numa.h>
#ifndef THREAD_GROUP_MULTIPLIER
#define THREAD_GROUP_MULTIPLIER 2
#endif
#ifndef BARRIER_MODE
#define BARRIER_MODE "global"
#endif
#ifndef BUFFER_LIMIT
#define BUFFER_LIMIT 1
#endif
#include "const.h"
#include "file_output.h"
#include "array_utils.h"
#include "timer_utils.h"
#include "barrier_utils.h"
#include "measurement_utils.h"
#include "cpu_set_utils.h"
#include "iterable_range.h"
#include "memory_literals.h"
#include "pipelines/MAX_scan_filter_pipe.h"
#include "aggregation.h"
#include "filter.h"
using base_t = uint64_t;
static int CachePlacementPolicy(const int numa_dst_node, const int numa_src_node, const size_t data_size) {
return numa_dst_node < 8 ? numa_dst_node + 8 : numa_dst_node;
}
base_t sum_check(base_t compare_value, base_t* row_A, base_t* row_B, size_t row_size) {
base_t sum = 0;
for(int i = 0; i < row_size / sizeof(base_t); ++i) {
sum += (row_A[i] < compare_value) * row_B[i];
}
return sum;
}
base_t sum_check_complex(base_t compare_value_a, base_t compare_value_b, base_t* row_A, base_t* row_B, size_t row_size) {
base_t sum = 0;
for(int i = 0; i < row_size / sizeof(base_t); ++i) {
sum += (row_A[i] < compare_value_a && row_B[i] < compare_value_b) * row_B[i];
}
return sum;
}
enum class ExecMode {
DramBaseline,
HbmPeak,
HbmPrefetch
};
int main(int argc, char** argv) {
constexpr ExecMode mode = ExecMode::HbmPrefetch;
constexpr size_t workload_b = 4_GiB;
constexpr base_t compare_value_a = 50;
constexpr base_t compare_value_b = 42;
constexpr size_t chunk_size = 64_MiB;
// thread count is 12 here but as the default measurement uses 6
// we must restrict the core assignment of these 12 threads to
// 6 physical cpu cores on the executing node
/*** alloc data and buffers ************************************************/
uint8_t tc_filter;
uint8_t tc_copy;
uint8_t tc_agg;
if constexpr (mode == ExecMode::HbmPrefetch) {
tc_filter = 8;
tc_copy = 1;
tc_agg = 4;
}
else {
tc_filter = 8;
tc_copy = 0;
tc_agg = 4;
}
const uint8_t tc_combined = tc_filter + tc_copy + tc_agg;
base_t* data_a;
base_t* data_b;
base_t* results;
const int current_cpu = sched_getcpu();
const int current_node = numa_node_of_cpu(current_cpu);
const int cache_node = CachePlacementPolicy(current_node, current_node, 0);
std::ofstream out_file;
std::string mode_string;
if constexpr (mode == ExecMode::HbmPrefetch) {
mode_string = "HbmDsaPrefetch";
}
else if constexpr (mode == ExecMode::HbmPeak) {
mode_string = "HbmAllocPeak";
}
else if constexpr (mode == ExecMode::DramBaseline) {
mode_string = "DramBaseline";
}
else {
mode_string = "Unknown";
}
const std::string ofname = "results/qdp-xeonmax-simpleq-" + mode_string + "-tca" + std::to_string(tc_filter) + "-tcb" + std::to_string(tc_copy) + "-tcj" + std::to_string(tc_agg) + "-tmul" + std::to_string(THREAD_GROUP_MULTIPLIER) + "-wl" + std::to_string(workload_b) + "-cs" + std::to_string(chunk_size) + ".csv";
out_file.open(ofname);
if (!out_file.is_open()) {
std::cerr << "Failed to open Output File '" << ofname << "'" << std::endl;
}
print_to_file(out_file, "thread_group", "time",
#ifdef THREAD_TIMINGS
"scan_a", "scan_b", "aggr_j",
#endif
#ifdef BARRIER_TIMINGS
"wait_scan_a", "wait_scan_b", "wait_aggr_j",
#endif
"result");
if constexpr (mode == ExecMode::HbmPeak) {
data_a = (base_t*) numa_alloc_onnode(workload_b, cache_node);
data_b = (base_t*) numa_alloc_onnode(workload_b, cache_node);
results = (base_t*) numa_alloc_onnode(tc_combined * sizeof(base_t), cache_node);
}
else {
data_a = (base_t*) numa_alloc_onnode(workload_b, current_node);
data_b = (base_t*) numa_alloc_onnode(workload_b, current_node);
results = (base_t*) numa_alloc_onnode(tc_combined * sizeof(base_t), current_node);
}
fill_mt<base_t>(data_a, workload_b, 0, 100, 42);
fill_mt<base_t>(data_b, workload_b, 0, 100, 420);
for(uint32_t i = 0; i < 15; i++) {
std::promise<void> p;
std::shared_future<void> ready_future(p.get_future());
Query_Wrapper<base_t, mode == ExecMode::HbmPrefetch> qw (
&ready_future, workload_b, chunk_size,
data_a, data_b, results, tc_filter, tc_copy,
tc_agg,compare_value_a, compare_value_b
);
qw.ready_future = &ready_future;
qw.clear_buffers();
auto filter_lambda = [&qw](uint32_t gid, uint32_t gcnt, uint32_t tid) { qw.scan_a(gid, gcnt, tid); };
auto copy_lambda = [&qw](uint32_t gid, uint32_t gcnt, uint32_t tid) { qw.scan_b(gid, gcnt, tid); };
auto aggregation_lambda = [&qw](uint32_t gid, uint32_t gcnt, uint32_t tid) { qw.aggr_j(gid, gcnt, tid); };
std::vector<std::thread> filter_pool;
std::vector<std::thread> copy_pool;
std::vector<std::thread> agg_pool;
std::vector<std::thread> combined_pool;
for(uint32_t gid = 0; gid < THREAD_GROUP_MULTIPLIER; ++gid) {
for(uint32_t tid = 0; tid < tc_filter; ++tid) {
filter_pool.emplace_back(filter_lambda, gid, THREAD_GROUP_MULTIPLIER, tid);
}
// if tc_copy == 0 this loop is skipped
for(uint32_t tid = 0; tid < tc_copy; ++tid) {
copy_pool.emplace_back(copy_lambda, gid, THREAD_GROUP_MULTIPLIER, tid);
}
for(uint32_t tid = 0; tid < tc_agg; ++tid) {
agg_pool.emplace_back(aggregation_lambda, gid, THREAD_GROUP_MULTIPLIER, tid);
}
}
auto start = std::chrono::steady_clock::now();
p.set_value();
for(std::thread& t : filter_pool) { t.join(); }
for(std::thread& t : copy_pool) { t.join(); }
for(std::thread& t : agg_pool) { t.join(); }
Aggregation<base_t, Sum, load_mode::Aligned>::apply(results, results, sizeof(base_t) * tc_agg * THREAD_GROUP_MULTIPLIER);
auto end = std::chrono::steady_clock::now();
constexpr double nanos_per_second = ((double)1000) * 1000 * 1000;
uint64_t nanos = std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count();
double seconds = (double)(nanos) / nanos_per_second;
if (i >= 5) {
print_to_file(out_file, THREAD_GROUP_MULTIPLIER, seconds,
#ifdef THREAD_TIMINGS
qw.trt->summarize_time(0), qw.trt->summarize_time(1), qw.trt->summarize_time(2),
#endif
#ifdef BARRIER_TIMINGS
qw.bt->summarize_time(0), qw.bt->summarize_time(1), qw.bt->summarize_time(2),
#endif
results[0]);
out_file << std::endl;
}
}
numa_free(data_a, workload_b);
numa_free(data_b, workload_b);
numa_free(results, THREAD_GROUP_MULTIPLIER * tc_combined * sizeof(base_t));
}
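sum_check and sum_check_complex appear to serve as scalar reference checks for the vectorized pipeline (the sum of row_B entries whose row_A counterpart passes the filter): with compare_value = 50, row_A = {10, 60, 49, 50} and row_B = {1, 2, 3, 4}, only the first and third elements qualify and sum_check returns 1 + 3 = 4.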

qdp_project/src/benchmark/pipelines/MAX_scan_filter_pipe.h (360 changed lines)

@@ -1,360 +0,0 @@
#include <cassert>
#include <mutex>
#include <cstring>
#include <bitset>
#include <algorithm>
#include <numa.h>
#include "filter.h"
#include "aggregation.h"
#include "vector_loader.h"
#include "timer_utils.h"
#include "barrier_utils.h"
#include "measurement_utils.h"
#include "../../../../offloading-cacher/cache.hpp"
template<typename base_t, bool caching>
class Query_Wrapper {
public:
// sync
std::shared_future<void>* ready_future;
thread_runtime_timing* trt;
barrier_timing* bt;
pcm_value_collector* pvc;
private:
static constexpr size_t COPY_POLICY_MIN_SIZE = 64 * 1024 * 1024;
dsacache::Cache cache_;
// data
size_t size_b;
size_t chunk_size_b;
size_t chunk_size_w;
size_t chunk_cnt;
base_t* data_a;
base_t* data_b;
base_t* dest;
// ratios
uint32_t thread_count_fc;
uint32_t thread_count_fi;
uint32_t thread_count_ag;
// done bits
volatile uint8_t* ready_flag_a;
volatile uint8_t* ready_flag_b;
// buffer
uint16_t* mask_a;
uint16_t* mask_b;
// params
base_t cmp_a;
base_t cmp_b;
// sync
std::unique_ptr<std::vector<std::barrier<barrier_completion_function>*>> sync_barrier;
std::string barrier_mode = BARRIER_MODE;
using filterCopy = Filter<base_t, LT, load_mode::Stream, true>;
using filterNoCopy = Filter<base_t, LT, load_mode::Stream, false>;
using filter = Filter<base_t, LT, load_mode::Stream, false>;
using aggregation = Aggregation<base_t, Sum, load_mode::Stream>;
static int CachePlacementPolicy(const int numa_dst_node, const int numa_src_node, const size_t data_size) {
return numa_dst_node < 8 ? numa_dst_node + 8 : numa_dst_node;
}
static std::vector<int> CopyMethodPolicy(const int numa_dst_node, const int numa_src_node, const size_t data_size) {
if (data_size < COPY_POLICY_MIN_SIZE) {
// if the data size is small then the copy will just be carried
// out by the destination node which does not require setting numa
// thread affinity as the selected dsa engine is already the one
// present on the calling thread
return std::vector<int>{ (numa_dst_node >= 8 ? numa_dst_node - 8 : numa_dst_node) };
}
else {
// for sufficiently large data, smart copy is used which will utilize
// all four engines for intra-socket copy operations and cross copy on
// the source and destination nodes for inter-socket copy
const bool same_socket = ((numa_dst_node ^ numa_src_node) & 4) == 0;
if (same_socket) {
const bool socket_number = numa_dst_node >> 2;
if (socket_number == 0) return std::vector<int>{ 0, 1, 2, 3 };
else return std::vector<int>{ 4, 5, 6, 7 };
}
else {
return std::vector<int>{
(numa_src_node >= 8 ? numa_src_node - 8 : numa_src_node),
(numa_dst_node >= 8 ? numa_dst_node - 8 : numa_dst_node)
};
}
}
}
public:
Query_Wrapper(std::shared_future<void>* rdy_fut, size_t workload_b, size_t chunk_size_b, base_t* data_a,
base_t* data_b, base_t* dest, uint32_t tc_fi, uint32_t tc_fc, uint32_t tc_ag,
base_t cmp_a = 50, base_t cmp_b = 42) :
ready_future(rdy_fut), size_b(workload_b), chunk_size_b(chunk_size_b), data_a(data_a), data_b(data_b),
dest(dest), cmp_a(cmp_a), cmp_b(cmp_b) {
const int current_cpu = sched_getcpu();
const int current_node = numa_node_of_cpu(current_cpu);
const int cache_node = CachePlacementPolicy(current_node, current_node, 0);
chunk_size_w = chunk_size_b / sizeof(base_t);
chunk_cnt = size_b / chunk_size_b;
thread_count_fi = tc_fi;
thread_count_fc = tc_fc;
thread_count_ag = tc_ag;
ready_flag_a = (volatile uint8_t *) numa_alloc_onnode( chunk_cnt * thread_count_fi / 8 + ((chunk_cnt * thread_count_fi % 8) != 0), cache_node);
ready_flag_b = (volatile uint8_t *) numa_alloc_onnode( chunk_cnt * thread_count_fc / 8 + ((chunk_cnt * thread_count_fc % 8) != 0), cache_node);
mask_a = (uint16_t *) numa_alloc_onnode(size_b / sizeof(base_t), cache_node);
mask_b = (uint16_t *) numa_alloc_onnode(size_b / sizeof(base_t), cache_node);
if constexpr (caching) {
cache_.Init(CachePlacementPolicy, CopyMethodPolicy);
}
size_t measurement_space = std::max(std::max(tc_fi, tc_fc), tc_ag);
trt = new thread_runtime_timing(3, measurement_space, current_node);
bt = new barrier_timing(3, measurement_space, current_node);
pvc = new pcm_value_collector({"scan_a", "scan_b", "aggr_j"}, measurement_space, current_node);
reset_barriers();
};
void reset_barriers(){
if(sync_barrier != nullptr) {
for(auto& barrier : *sync_barrier) {
delete barrier;
}
sync_barrier.reset();
}
uint32_t thread_count_sum = thread_count_ag + thread_count_fi + thread_count_fc;
sync_barrier = std::make_unique<std::vector<std::barrier<barrier_completion_function>*>>(thread_count_sum);
uint32_t barrier_count = barrier_mode == "global" ? 1 : thread_count_sum;
for(uint32_t i = 0; i < barrier_count; ++i) {
(*sync_barrier)[i] = new std::barrier<barrier_completion_function>(thread_count_sum);
}
}
void clear_buffers () {
std::memset((void*)ready_flag_a, 0x00, chunk_cnt * thread_count_fi / 8 + ((chunk_cnt * thread_count_fi % 8) != 0));
std::memset((void*)ready_flag_b, 0x00, chunk_cnt * thread_count_fc / 8 + ((chunk_cnt * thread_count_fc % 8) != 0));
std::memset(mask_a, 0x00, size_b / sizeof(base_t));
std::memset(mask_b, 0x00, size_b / sizeof(base_t));
if constexpr (caching) {
cache_.Clear();
}
trt->reset_accumulator();
bt->reset_accumulator();
pvc->reset();
reset_barriers();
};
~Query_Wrapper() {
numa_free((void*)ready_flag_a, chunk_cnt * thread_count_fi / 8 + ((chunk_cnt * thread_count_fi % 8) != 0));
numa_free((void*)ready_flag_b, chunk_cnt * thread_count_fc / 8 + ((chunk_cnt * thread_count_fc % 8) != 0));
numa_free(mask_a, size_b / sizeof(base_t));
numa_free(mask_b, size_b / sizeof(base_t));
delete trt;
for(auto& barrier : *sync_barrier) {
delete barrier;
}
delete bt;
delete pvc;
};
// this can be set without needing to change allocations
void set_thread_group_count(uint32_t value) {
this->thread_group = value;
};
private:
static inline base_t* get_sub_chunk_ptr(base_t* base_ptr, size_t chunk_id, size_t chunk_size_w, size_t tid, size_t tcnt) {
base_t* chunk_ptr = base_ptr + chunk_id * chunk_size_w;
return chunk_ptr + tid * (chunk_size_w / tcnt);
}
static inline uint16_t* get_sub_mask_ptr(uint16_t* base_ptr, size_t chunk_id, size_t chunk_size_w, size_t tid, size_t tcnt) {
// 16 integers are addressed with one uint16_t in the mask buffer
size_t offset = chunk_id * chunk_size_w + tid * (chunk_size_w / tcnt);
return base_ptr + (offset / 16);
}
static bool bit_at(volatile uint8_t* bitmap, uint32_t bitpos) {
uint8_t value = bitmap[bitpos / 8];
switch(bitpos % 8) {
case 0: return value & 0b00000001;
case 1: return value & 0b00000010;
case 2: return value & 0b00000100;
case 3: return value & 0b00001000;
case 4: return value & 0b00010000;
case 5: return value & 0b00100000;
case 6: return value & 0b01000000;
case 7: return value & 0b10000000;
default: return false;
}
}
static void set_bit_at(volatile uint8_t* bitmap, std::mutex& mutex, uint32_t bitpos) {
mutex.lock();
switch(bitpos % 8) {
case 0: bitmap[bitpos / 8] |= 0b00000001;break;
case 1: bitmap[bitpos / 8] |= 0b00000010;break;
case 2: bitmap[bitpos / 8] |= 0b00000100;break;
case 3: bitmap[bitpos / 8] |= 0b00001000;break;
case 4: bitmap[bitpos / 8] |= 0b00010000;break;
case 5: bitmap[bitpos / 8] |= 0b00100000;break;
case 6: bitmap[bitpos / 8] |= 0b01000000;break;
case 7: bitmap[bitpos / 8] |= 0b10000000;break;
}
mutex.unlock();
}
public:
void scan_b(size_t gid, size_t gcnt, size_t tid) {
size_t tcnt = thread_count_ag;
assert(chunk_size_w % tcnt == 0);
assert(chunk_size_w % 16 == 0);
assert(chunk_size_w % tcnt * 16 == 0);
// wait till everyone can start
ready_future->wait();
// the lower gids run once more if the chunks are not evenly distributable
uint32_t runs = chunk_cnt / gcnt + (chunk_cnt % gcnt > gid);
uint32_t barrier_idx = barrier_mode.compare("global") == 0 ? 0 : gid;
std::unique_ptr<dsacache::CacheData> data;
for(uint32_t i = 0; i < runs; ++i) {
trt->start_timer(1, tid * gcnt + gid);
// calculate pointers
size_t chunk_id = gid + gcnt * i;
base_t* chunk_ptr = get_sub_chunk_ptr(data_b, chunk_id, chunk_size_w, tid, tcnt);
if constexpr (caching) {
data = cache_.Access(reinterpret_cast<uint8_t *>(chunk_ptr), chunk_size_b / tcnt);
}
trt->stop_timer(1, tid * gcnt + gid);
}
if constexpr (caching) {
data->WaitOnCompletion();
}
(*(*sync_barrier)[barrier_idx]).arrive_and_drop();
}
void scan_a(size_t gid, size_t gcnt, size_t tid) {
size_t tcnt = thread_count_fi;
assert(chunk_size_w % tcnt == 0);
assert(chunk_size_w % 16 == 0);
assert(chunk_size_w % tcnt * 16 == 0);
// wait till everyone can start
ready_future->wait();
// the lower gids run once more if the chunks are not evenly distributable
uint32_t runs = chunk_cnt / gcnt + (chunk_cnt % gcnt > gid);
uint32_t barrier_idx = barrier_mode.compare("global") == 0 ? 0 : gid;
for(uint32_t i = 0; i < runs; ++i) {
trt->start_timer(0, tid * gcnt + gid);
pvc->start("scan_a", tid * gcnt + gid);
// calculate pointers
size_t chunk_id = gid + gcnt * i;
base_t* chunk_ptr = get_sub_chunk_ptr(data_a, chunk_id, chunk_size_w, tid, tcnt);
uint16_t* mask_ptr = get_sub_mask_ptr (mask_a, chunk_id, chunk_size_w, tid, tcnt);
filter::apply_same(mask_ptr, nullptr, chunk_ptr, cmp_a, chunk_size_b / tcnt);
pvc->stop("scan_a", tid * gcnt + gid);
trt->stop_timer(0, tid * gcnt + gid);
bt->timed_wait(*(*sync_barrier)[barrier_idx], 0, tid * gcnt + gid);
}
(*(*sync_barrier)[barrier_idx]).arrive_and_drop();
}
void aggr_j(size_t gid, size_t gcnt, size_t tid) {
size_t tcnt = thread_count_ag;
// wait till everyone can start
ready_future->wait();
// calculate values
__m512i aggregator = aggregation::OP::zero();
// the lower gids run once more if the chunks are not evenly distributable
uint32_t runs = chunk_cnt / gcnt + (chunk_cnt % gcnt > gid);
uint32_t barrier_idx = barrier_mode.compare("global") == 0 ? 0 : gid;
for(uint32_t i = 0; i < runs; ++i) {
bt->timed_wait(*(*sync_barrier)[barrier_idx], 2, tid * gcnt + gid);
trt->start_timer(2, tid * gcnt + gid);
pvc->start("aggr_j", tid * gcnt + gid);
// calculate pointers
size_t chunk_id = gid + gcnt * i;
base_t* chunk_ptr = get_sub_chunk_ptr(data_b, chunk_id, chunk_size_w, tid, tcnt);
std::unique_ptr<dsacache::CacheData> data;
base_t* data_ptr;
if constexpr (caching) {
// access the cache for the given chunk which will have been accessed in scan_b
data = cache_.Access(reinterpret_cast<uint8_t *>(chunk_ptr), chunk_size_b / tcnt);
// after the copy task has finished we obtain the pointer to the cached
// copy of data_b which is then used from now on
data_ptr = reinterpret_cast<base_t*>(data->GetDataLocation());
// nullptr is still a legal return value for CacheData::GetLocation()
// even after waiting, so this must be checked
if (data_ptr == nullptr) {
std::cerr << "[x] Cache Miss!" << std::endl;
exit(-1);
}
}
else {
data_ptr = chunk_ptr;
}
uint16_t* mask_ptr_a = get_sub_mask_ptr (mask_a, chunk_id, chunk_size_w, tid, tcnt);
base_t tmp = _mm512_reduce_add_epi64(aggregator);
aggregator = aggregation::apply_masked(aggregator, data_ptr, mask_ptr_a, chunk_size_b / tcnt);
pvc->stop("aggr_j", tid * gcnt + gid);
trt->stop_timer(2, tid * gcnt + gid);
}
// so threads with more runs don't wait for already finished threads
(*(*sync_barrier)[barrier_idx]).arrive_and_drop();
aggregation::happly(dest + (tid * gcnt + gid), aggregator);
}
};
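The cache and copy policies above encode the node layout of the test machine: NUMA nodes 0 through 7 hold the CPUs, DRAM and DSA engines, nodes 8 through 15 are the corresponding HBM nodes, and bit 2 of the node id selects the socket. A small standalone sketch, restating the policies under that assumption to show what they return for a few example inputs (not part of the benchmark):

#include <cstddef>
#include <cstdio>
#include <vector>

// illustrative restatement of the two policies defined above
static int cache_placement(int dst_node, int /*src_node*/, size_t /*size*/) {
    return dst_node < 8 ? dst_node + 8 : dst_node;   // DRAM node n caches onto HBM node n + 8
}

static std::vector<int> copy_method(int dst_node, int src_node, size_t size) {
    if (size < 64ull * 1024 * 1024) {
        return { dst_node >= 8 ? dst_node - 8 : dst_node };   // small copy: one local engine
    }
    if (((dst_node ^ src_node) & 4) == 0) {                   // same socket: all four engines
        return (dst_node >> 2) == 0 ? std::vector<int>{0, 1, 2, 3} : std::vector<int>{4, 5, 6, 7};
    }
    return { src_node >= 8 ? src_node - 8 : src_node,         // cross copy: one engine per side
             dst_node >= 8 ? dst_node - 8 : dst_node };
}

int main() {
    std::printf("%d\n", cache_placement(0, 0, 0));                // 8: data from node 0 is cached on HBM node 8
    std::printf("%zu\n", copy_method(0, 0, 1u << 20).size());     // 1: a 1 MiB copy uses only engine 0
    std::printf("%zu\n", copy_method(0, 0, 128u << 20).size());   // 4: a 128 MiB intra-socket copy uses engines 0-3
    std::printf("%d %d\n", copy_method(6, 2, 128u << 20)[0],
                           copy_method(6, 2, 128u << 20)[1]);     // 2 6: inter-socket copy uses an engine on each side
    return 0;
}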

qdp_project/src/utils/BenchmarkHelpers.cpp (50 changed lines)

@@ -0,0 +1,50 @@
#include <vector>
int CachePlacementPolicy(const int numa_dst_node, const int numa_src_node, const size_t data_size) {
return numa_dst_node < 8 ? numa_dst_node + 8 : numa_dst_node;
}
std::vector<int> CopyMethodPolicy(const int numa_dst_node, const int numa_src_node, const size_t data_size) {
// for sufficiently large data, smart copy is used which will utilize
// all four engines for intra-socket copy operations and cross copy on
// the source and destination nodes for inter-socket copy
const bool same_socket = ((numa_dst_node ^ numa_src_node) & 4) == 0;
if (same_socket) {
const bool socket_number = numa_dst_node >> 2;
if (socket_number == 0) return std::vector<int>{ 0, 1, 2, 3 };
else return std::vector<int>{ 4, 5, 6, 7 };
}
else {
return std::vector<int>{
(numa_src_node >= 8 ? numa_src_node - 8 : numa_src_node),
(numa_dst_node >= 8 ? numa_dst_node - 8 : numa_dst_node)
};
}
}
struct NopStruct {
inline void operator() () {
return;
}
};
inline uint64_t* get_sub_chunk_ptr(uint64_t* base_ptr, size_t chunk_id, size_t chunk_size_w, size_t tid, size_t tcnt) {
uint64_t* chunk_ptr = base_ptr + chunk_id * chunk_size_w;
return chunk_ptr + tid * (chunk_size_w / tcnt);
}
inline uint16_t* get_sub_mask_ptr(uint16_t* base_ptr, size_t chunk_id, size_t chunk_size_w, size_t tid, size_t tcnt) {
// 16 integers are addressed with one uint16_t in the mask buffer
size_t offset = chunk_id * chunk_size_w + tid * (chunk_size_w / tcnt);
return base_ptr + (offset / 16);
}
uint64_t sum_check(uint64_t compare_value, uint64_t* row_A, uint64_t* row_B, size_t row_size) {
uint64_t sum = 0;
for(int i = 0; i < row_size / sizeof(uint64_t); ++i) {
sum += (row_A[i] < compare_value) * row_B[i];
}
return sum;
}
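A worked example of the pointer arithmetic in get_sub_chunk_ptr and get_sub_mask_ptr, using the constants from Benchmark.cpp (16,777,216 uint64_t elements per 128 MiB chunk); the chunk, thread and count values are only for illustration:

#include <cstddef>
#include <cstdint>

// element offset a worker sees for chunk_id = 5, tid = 1 of tcnt = 2 threads
constexpr size_t chunk_size_w = (128ull << 20) / sizeof(uint64_t);   // 16'777'216
constexpr size_t elem_offset  = 5 * chunk_size_w + 1 * (chunk_size_w / 2);
// one uint16_t in the mask buffer covers 16 elements
constexpr size_t mask_offset  = elem_offset / 16;

static_assert(elem_offset == 92'274'688);
static_assert(mask_offset == 5'767'168);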

qdp_project/src/algorithm/operators/aggregation.h → qdp_project/src/utils/aggregation.h (moved, 0 changed lines)

qdp_project/src/utils/barrier_utils.h (73 changed lines)

@@ -1,73 +0,0 @@
#pragma once
#include <cstdint>
#include <numa.h>
#include <barrier>
#include <chrono>
#define BARRIER_TIMINGS 1
struct barrier_completion_function {
inline void operator() () {
return;
}
};
struct barrier_timing {
uint32_t time_points, time_threads;
double** time_accumulator;
barrier_timing(uint32_t timing_points, uint32_t timing_threads, uint32_t memory_node) {
#ifdef BARRIER_TIMINGS
time_points = timing_points;
time_threads = timing_threads;
time_accumulator = (double**) numa_alloc_onnode(timing_points * sizeof(double*), memory_node);
for(uint32_t i = 0; i < timing_points; ++i) {
time_accumulator[i] = (double*) numa_alloc_onnode(timing_threads * sizeof(double), memory_node);
}
#endif
}
~barrier_timing() {
#ifdef BARRIER_TIMINGS
for(uint32_t i = 0; i < time_points; ++i) {
numa_free(time_accumulator[i], time_threads * sizeof(double));
}
numa_free(time_accumulator, time_points * sizeof(double*));
#endif
}
void reset_accumulator() {
#ifdef BARRIER_TIMINGS
for(uint32_t i = 0; i < time_points; ++i){
for(uint32_t j = 0; j < time_threads; ++j){
time_accumulator[i][j] = 0.0;
}}
#endif
}
double summarize_time(uint32_t time_point) {
#ifdef BARRIER_TIMINGS
double sum = 0.0;
for(uint32_t i = 0; i < time_threads; ++i) {
sum += time_accumulator[time_point][i];
}
return sum;
#endif
}
void timed_wait(std::barrier<struct barrier_completion_function>& barrier, uint32_t point_id, uint32_t thread_id) {
#ifdef BARRIER_TIMINGS
auto before_barrier = std::chrono::steady_clock::now();
#endif
barrier.arrive_and_wait();
#ifdef BARRIER_TIMINGS
auto after_barrier = std::chrono::steady_clock::now();
uint64_t barrier_wait_time = std::chrono::duration_cast<std::chrono::nanoseconds>(after_barrier - before_barrier).count();
double seconds = barrier_wait_time / (1000.0 * 1000.0 * 1000.0);
time_accumulator[point_id][thread_id] += seconds;
#endif
}
};

qdp_project/src/utils/cpu_set_utils.h (82 changed lines)

@@ -1,82 +0,0 @@
#pragma once
#include <cstdint>
#include <thread>
#include <cassert>
#include <iostream>
#include <vector>
#include <utility>
/** Sets all bits in a given cpu_set_t between L and H (condition L <= H)*/
#define CPU_BETWEEN(L, H, SET) assert(L <= H); for(; L < H; ++L) {CPU_SET(L, SET);}
/**
* Applies the affinity defined in set to the thread, through pthread library
* calls. If it fails, it writes the problem to stderr and terminates the program.
*/
inline void pin_thread(std::thread& thread, cpu_set_t* set) {
int error_code = pthread_setaffinity_np(thread.native_handle(), sizeof(cpu_set_t), set);
if (error_code != 0) {
std::cerr << "Error calling pthread_setaffinity_np in copy_pool assignment: " << error_code << std::endl;
exit(-1);
}
}
/**
* Returns the cpu id of the thread_id-th cpu in a given (multi)range. Thread_ids
* greater than the number of cpus in the (multi)range are valid. In this case
* the (thread_id % #cpus in the range)-th cpu in the range is returned.
*/
int get_cpu_id(int thread_id, const std::vector<std::pair<int, int>>& range) {
int subrange_size = range[0].second - range[0].first;
int i = 0;
while(subrange_size <= thread_id) {
thread_id -= subrange_size;
i = (i + 1) % range.size();
subrange_size = range[i].second - range[i].first;
}
return thread_id + range[i].first;
}
/*inline void cpu_set_between(cpu_set_t* set, uint32_t low, uint32_t high) {
assert(low != high);
if (low > high) std::swap(low, high);
for(; low < high; ++low) {
CPU_SET(low, set);
}
}*/
/**
* Pins the given thread to the thread_id-th cpu in the given range.
*/
void pin_thread_in_range(std::thread& thread, int thread_id, std::vector<std::pair<int, int>>& range) {
cpu_set_t set;
CPU_ZERO(&set);
CPU_SET(get_cpu_id(thread_id, range), &set);
pin_thread(thread, &set);
}
/**
* Pins the given thread to all cpus in the given range.
*/
void pin_thread_in_range(std::thread& thread, std::vector<std::pair<int, int>>& range) {
cpu_set_t set;
CPU_ZERO(&set);
for(auto r : range) { CPU_BETWEEN(r.first, r.second, &set); }
pin_thread(thread, &set);
}
/**
* Pins the given thread to all cpu ids between low (incl.) and high (excl.).
*/
inline void pin_thread_between(std::thread& thread, uint32_t low, uint32_t high) {
cpu_set_t set;
CPU_ZERO(&set);
CPU_BETWEEN(low, high, &set);
pin_thread(thread, &set);
}
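The removed MAX benchmark used these helpers to spread its worker threads over a multi-range of cores; a short usage sketch, assuming the header above is included (the core ranges are made up for illustration):

#include <cstdio>
#include <thread>
#include <utility>
#include <vector>

int main() {
    // two disjoint subranges: cpus 0-3 and 8-11
    std::vector<std::pair<int, int>> range{{0, 4}, {8, 12}};

    std::printf("%d\n", get_cpu_id(5, range));   // 9: thread 5 wraps into the second subrange
    std::printf("%d\n", get_cpu_id(9, range));   // 1: ids beyond the 8 available cpus wrap around

    std::thread worker([] { /* ... */ });
    pin_thread_in_range(worker, 5, range);       // pins the worker to cpu 9
    worker.join();
    return 0;
}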

qdp_project/src/utils/file_output.h (76 changed lines)

@@ -1,76 +0,0 @@
/**
* @file file_output.h
* @author André Berthold
* @brief Implements a template-function that accepts an arbitrary number of parameters that should be printed
* @version 0.1
* @date 2023-05-25
*
* @copyright Copyright (c) 2023
*
*/
#pragma once
#include <fstream>
#include <string>
#include <type_traits>
#include "iterable_range.h"
template<class T>
inline constexpr bool is_numeric_v = std::disjunction<
std::is_integral<T>,
std::is_floating_point<T>>::value;
/**
* @brief Converts a parameter to a string by either using it directly or its member current (if it is of type Labeled)
* as parameter to the std::string-Constructor.
*
* @tparam T Type of the parameter
* @param value Parameter to be converted
* @return std::string The converted parameter
*/
template<typename T>
inline std::string to_string(T value) {
if constexpr(std::is_base_of<Labeled, T>::value){
// integrals cannot be used in the string constructor and must be translated by the std::to_string-function
if constexpr (is_numeric_v<decltype(value.current)>) {
return std::to_string(value.current);
} else {
return std::string(value.current);
}
} else {
// integrals cannot be used in the string constructor and must be translated by the std::to_string-function
if constexpr (is_numeric_v<decltype(value)>) {
return std::to_string(value);
} else {
return std::string(value);
}
}
}
/**
* @brief This function writes the content of *val* to *file*. It terminates the recursive function definition.
*
* @tparam type Type of the parameter *val* (is usually implicitly defined)
* @param file File that is written to
* @param val Value that is translated to a char stream and written to the file
*/
template<typename type>
inline void print_to_file(std::ofstream &file, type val) {
file << to_string(val) << std::endl;
}
/**
* @brief This function writes the content of *val* and the content of *vals* to *file*.
*
* @tparam type Type of the parameter *val* (is usually implicitly defined)
* @tparam types Parameter pack that describes the types of *vals*
* @param file File that is written to
* @param val Value that is translated to a char stream and written to the file
* @param vals Parameter pack of values that are printed to the file
*/
template<typename type, typename... types>
inline void print_to_file(std::ofstream &file, type val, types ... vals) {
file << to_string(val) << ",";
print_to_file(file, vals...);
}
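A brief usage sketch of the variadic writer, assuming file_output.h is included; the file name and values are illustrative:

#include <fstream>

int main() {
    std::ofstream out("example.csv");
    print_to_file(out, "thread_group", "time", "result");   // writes "thread_group,time,result"
    print_to_file(out, 2, 0.0345, 12345ull);                 // writes "2,0.034500,12345"
    return 0;
}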

qdp_project/src/algorithm/operators/filter.h → qdp_project/src/utils/filter.h (moved, 0 changed lines)

qdp_project/src/utils/iterable_range.h (208 changed lines)

@@ -1,208 +0,0 @@
#pragma once
#include <cstdint>
#include <type_traits>
#include <string>
constexpr auto NO_NEXT = "false";
/**
* @brief Class that adds a label member to a sub-class
*
*/
class Labeled {
public:
std::string label;
public:
Labeled(std::string str) : label(str) {};
Labeled(const char* str) { this->label = std::string(str); };
};
/**
* @brief Converts a parameter to a string by either reading the member label (if it is of type Labeled) or using it
* as parameter to the std::string-Constructor.
*
* @tparam T Type of the parameter
* @param value Parameter to be converted
* @return std::string The converted parameter
*/
template<typename T>
inline std::string generateHead(T value) {
if constexpr(std::is_base_of<Labeled, T>::value){
return value.label;
} else {
return std::string(value);
}
}
/**
* @brief Converts a parameter-pack to a string, calling generateHead(T) on every parameter and concatenating the results.
*
* @tparam T Type of the first parameter
* @tparam Ts Parameter pack specifying the following parameters' types
* @param value Parameter to be transformed
* @param values Parameter-pack of the next parameters to be transformed
* @return std::string Comma-separated concatenation of all parameters' string representations
*/
template<typename T, typename... Ts>
inline std::string generateHead(T value, Ts... values) {
return generateHead(value) + ',' + generateHead(values...);
}
/**
* @brief Takes a single Range object and calls its next function.
*
* @tparam T Specific type of the Range object
* @param t Instance of the Range object
* @return std::string Label of the Range object or "false" if the Range reached its end and was reset
*/
template<typename T>
std::string IterateOnce(T& t) {
if(t.next()) return t.label;
else t.reset();
return std::string(NO_NEXT); // the string signals that the iteration has to be terminated.
}
/**
* @brief Takes a number of Range objects and recursively increments them till the first Range does not reach its end
* upon incrementing. It starts at the first Range object given. Every Range object that reached its end is reset to
* its start value.
*
* @tparam T Specific type of the first Range object
* @tparam Ts Types to the following Range objects
* @param t First instance of the Range object
* @param ts Parameter pack of the following Range objects
* @return std::string Label of the highest index Range object that was altered, or "false" if the last Range object
* reached its end and was reset
*/
template<typename T, typename... Ts>
std::string IterateOnce(T& t , Ts&... ts) {
if(t.next()) return t.label;
else t.reset();
return IterateOnce<Ts...>(ts...);
}
/**
* @brief Class that provides a convenient interface for iterating through a parameter range. It stores a public value
* that can be altered by the classes' methods.
*
* @tparam T Base type of the parameter
* @tparam INIT Initial value of the current pointer
* @tparam PRED Struct providing an apply function testing if the current value is in range or not
* @tparam INC Struct providing an apply function setting the current value to the value following the current value
*/
template<typename T, T INIT, typename PRED, typename INC>
class Range : public Labeled {
public:
/**
* @brief Current value of the parameter
*/
T current = INIT;
/**
* @brief Resets current to its initial value
*/
void reset() {current = INIT; };
/**
* @brief Sets current to its next value (according to INC::inc) and returns whether the range reached its end
* (according to PRED::pred).
*
* @return true The newly assigned value of current is in the range
* @return false Otherwise
*/
bool next() {
current = INC::inc(current);
return PRED::pred(current);
};
/**
* @brief Checks if current is in the Range (according to PRED).
*
* @return true PRED returns true
* @return false Otherwise
*/
bool valid() { return PRED::apply(current); };
};
/**
* @brief Class that is in contrast to Range specialized for integral values.
*
* @tparam T Integral base type of the Range
* @tparam INIT Initial value of the parameter
* @tparam MAX Maximal value of the parameter
* @tparam INC Struct providing an apply function setting the current value to the value following the current value
*/
template<typename T, T INIT, T MAX, typename INC>
class Int_Range : public Labeled {
static_assert(std::is_integral<T>::value, "Int_Range requires an integral base type");
public:
const T max = MAX;
T current = INIT;
void reset() {current = INIT; };
bool next() {
current = INC::inc(current);
return current < MAX;
};
bool valid() { return current < MAX; };
};
/**
* @brief Class that is in contrast to Int_Range specialized for integrals that grow linearly.
*
* @tparam T Integral base type of the Range
* @tparam INIT Initial value of the parameter
* @tparam MAX Maximal value of the parameter
* @tparam STEP Increase of the value per next()-call
*/
template<typename T, T INIT, T MAX, T STEP = 1>
class Linear_Int_Range : public Labeled {
static_assert(std::is_integral<T>::value, "Linear_Int_Range requires an integral base type");
public:
const T max = MAX;
T current = INIT;
void reset() {current = INIT; };
bool next() {
current += STEP;
return current < MAX;
};
bool valid() { return current < MAX; };
};
/**
* @brief Class that is in contrast to Int_Range specialized for integrals that grow exponentially.
*
* @tparam T Integral base type of the Range
* @tparam INIT Initial value of the parameter
* @tparam MAX Maximal value of the parameter
* @tparam FACTOR Multiplicative Increase of the value per next()-call
*/
template<typename T, T INIT, T MAX, T FACTOR = 2>
class Exp_Int_Range : public Labeled {
static_assert(std::is_integral<T>::value, "Exp_Int_Range requires an integral base type");
public:
const T max = MAX;
T current = INIT;
void reset() {current = INIT; };
bool next() {
current *= FACTOR;
return current < MAX;
};
bool valid() { return current < MAX; };
};
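The Range classes are meant to be driven by IterateOnce to sweep benchmark parameters; a minimal sketch, assuming the header above is included and relying on C++17 aggregate initialisation of the Labeled base (names and bounds are made up):

#include <cstdint>
#include <cstdio>
#include <string>

int main() {
    Linear_Int_Range<uint32_t, 1, 5, 1> threads{{"threads"}};       // visits 1, 2, 3, 4
    Exp_Int_Range<uint32_t, 64, 513, 2> chunk_mib{{"chunk_mib"}};   // visits 64, 128, 256, 512

    std::string changed;
    do {
        std::printf("threads=%u chunk=%u MiB\n", threads.current, chunk_mib.current);
        // increments the first range; on wrap-around it resets it and advances the next one
        changed = IterateOnce(threads, chunk_mib);
    } while (changed != NO_NEXT);
    return 0;
}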

qdp_project/src/utils/measurement_utils.h (152 changed lines)

@@ -1,152 +0,0 @@
#pragma once
#include <cstdint>
#include <chrono>
#include <vector>
#include <string>
#include <algorithm>
#include <numa.h>
#if PCM_M == 1
#define PCM_MEASURE 1
#include "pcm.h"
#endif
struct pcm_value_collector {
const uint32_t value_count = 6;
uint32_t threads;
std::vector<std::string> points;
#ifdef PCM_MEASURE
pcm::SystemCounterState** states;
#endif
uint64_t** collection;
pcm_value_collector(const std::vector<std::string>& in_points, uint32_t threads, uint32_t memory_node) : threads(threads) {
#ifdef PCM_MEASURE
points = std::vector(in_points);
collection = (uint64_t**) numa_alloc_onnode(threads * sizeof(uint64_t*), memory_node);
states = (pcm::SystemCounterState**) numa_alloc_onnode(threads * sizeof(pcm::SystemCounterState*), memory_node);
for(int i = 0; i < threads; ++i) {
collection[i] = (uint64_t*) numa_alloc_onnode(points.size() * value_count * sizeof(uint64_t), memory_node);
states[i] = (pcm::SystemCounterState*) numa_alloc_onnode(points.size() * sizeof(pcm::SystemCounterState), memory_node);
}
#endif
}
~pcm_value_collector() {
#ifdef PCM_MEASURE
for(int i = 0; i < threads; ++i) {
numa_free(collection[threads], points.size() * value_count * sizeof(uint64_t));
}
numa_free(collection, threads * sizeof(uint64_t*));
numa_free(states, threads * sizeof(pcm::SystemCounterState));
#endif
}
void reset() {
#ifdef PCM_MEASURE
for(int i = 0; i < threads; ++i)
for(uint32_t j = 0; j < points.size() * value_count; ++j){
collection[i][j] = 0;
}
#endif
}
int64_t point_index(const std::string& value) {
auto it = std::find(points.begin(), points.end(), value);
if(it == points.end()) return -1;
else return it - points.begin();
}
std::vector<uint64_t> summarize(const std::string &point) {
#ifdef PCM_MEASURE
std::vector<uint64_t> sums(value_count);
int64_t idx = point_index(point);
if(idx < 0) return sums;
for(uint32_t v = 0; v < value_count; ++v) {
for(uint32_t i = 0; i < threads; ++i) {
sums[v] += collection[i][static_cast<uint32_t>(idx) + points.size() * v];
}
}
return sums;
#endif
return std::vector<uint64_t> {0};
}
std::string summarize_as_string(const std::string &point) {
#ifdef PCM_MEASURE
auto summary = summarize(point);
auto it = summary.begin();
auto end = summary.end();
if(it >= end) return "";
std::string result("");
result += std::to_string(*it);
++it;
while(it < end) {
result += ",";
result += std::to_string(*it);
++it;
}
return result;
#endif
return "";
}
void start(const std::string& point, uint32_t thread) {
#ifdef PCM_MEASURE
int64_t idx = point_index(point);
if(idx < 0) {
std::cerr << "Invalid 'point' given. Ignored!" << std::endl;
return;
}
states[thread][static_cast<uint32_t>(idx)] = pcm::getSystemCounterState();
#endif
}
static std::string getHead(const std::string& point) {
return point + "_l2h," +
point + "_l2m," +
point + "_l3h," +
point + "_l3hns," +
point + "_l3m," +
point + "_mc";
}
#ifdef PCM_MEASURE
void read_values(uint32_t point_idx, uint32_t thread, pcm::SystemCounterState& start, pcm::SystemCounterState& end) {
collection[thread][point_idx + points.size() * 0] += getL2CacheHits(start, end);
collection[thread][point_idx + points.size() * 1] += getL2CacheMisses(start, end);
collection[thread][point_idx + points.size() * 2] += getL3CacheHits(start, end);
collection[thread][point_idx + points.size() * 3] += getL3CacheHitsNoSnoop(start, end);
collection[thread][point_idx + points.size() * 4] += getL3CacheMisses(start, end);
collection[thread][point_idx + points.size() * 5] += getBytesReadFromMC(start, end);
}
#endif
void stop(const std::string& point, uint32_t thread) {
#ifdef PCM_MEASURE
auto state = pcm::getSystemCounterState();
int64_t idx = point_index(point);
if(idx < 0) {
std::cerr << "Invalid 'point' given. Ignored!" << std::endl;
return;
}
auto start = states[thread][static_cast<uint32_t>(idx)];
read_values(static_cast<uint32_t>(idx), thread, start, state);
#endif
}
};
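The counters are stored flat per thread as collection[thread][point_idx + points.size() * value_idx], with the six value columns in the order given by getHead(). For the point list {"scan_a", "scan_b", "aggr_j"} used by the removed pipeline, the L3-miss counter of aggr_j therefore lands at flat index 14; a tiny check of that index math:

#include <cstddef>

constexpr size_t point_count = 3;   // {"scan_a", "scan_b", "aggr_j"}
constexpr size_t point_idx   = 2;   // "aggr_j"
constexpr size_t value_idx   = 4;   // l3m, the fifth column of getHead()
static_assert(point_idx + point_count * value_idx == 14);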

qdp_project/src/utils/pcm.h (6 changed lines)

@@ -1,6 +0,0 @@
#pragma once
// this file includes all important headers from the pcm repository
#include "cpucounters.h"
#include "msr.h"
#include "pci.h"
#include "mutex.h"

qdp_project/src/utils/timer_utils.h (80 changed lines)

@@ -1,80 +0,0 @@
#pragma once
#include <cstdint>
#include <chrono>
#include <barrier>
#include <numa.h>
#define THREAD_TIMINGS 1
struct thread_runtime_timing {
using time_point_t = std::chrono::time_point<std::chrono::steady_clock>;
uint32_t time_points, time_threads;
time_point_t** start_times;
double** time_accumulator;
thread_runtime_timing(uint32_t timing_points, uint32_t timing_threads, uint32_t memory_node) {
#ifdef THREAD_TIMINGS
time_points = timing_points;
time_threads = timing_threads;
start_times = (time_point_t**) numa_alloc_onnode(timing_points * sizeof(time_point_t*), memory_node);
time_accumulator = (double**) numa_alloc_onnode(timing_points * sizeof(double*), memory_node);
for(uint32_t i = 0; i < timing_points; ++i) {
start_times[i] = (time_point_t*) numa_alloc_onnode(timing_threads * sizeof(time_point_t), memory_node);
time_accumulator[i] = (double*) numa_alloc_onnode(timing_threads * sizeof(double), memory_node);
}
#endif
}
~thread_runtime_timing() {
#ifdef THREAD_TIMINGS
for(uint32_t i = 0; i < time_points; ++i) {
numa_free(start_times[i], time_threads * sizeof(time_point_t));
numa_free(time_accumulator[i], time_threads * sizeof(double));
}
numa_free(start_times, time_points * sizeof(time_point_t*));
numa_free(time_accumulator, time_points * sizeof(double*));
#endif
}
void reset_accumulator() {
#ifdef THREAD_TIMINGS
for(uint32_t i = 0; i < time_points; ++i){
for(uint32_t j = 0; j < time_threads; ++j){
time_accumulator[i][j] = 0.0;
}}
#endif
}
double summarize_time(uint32_t time_point) {
#ifdef THREAD_TIMINGS
double sum = 0.0;
for(uint32_t i = 0; i < time_threads; ++i) {
sum += time_accumulator[time_point][i];
}
return sum;
#endif
}
void stop_timer(uint32_t point_id, uint32_t thread_id) {
#ifdef THREAD_TIMINGS
auto end_time = std::chrono::steady_clock::now();
auto start_time = start_times[point_id][thread_id];
uint64_t time = std::chrono::duration_cast<std::chrono::nanoseconds>(end_time - start_time).count();
double seconds = time / (1000.0 * 1000.0 * 1000.0);
time_accumulator[point_id][thread_id] += seconds;
#endif
}
void start_timer(uint32_t point_id, uint32_t thread_id) {
#ifdef THREAD_TIMINGS
start_times[point_id][thread_id] = std::chrono::steady_clock::now();
#endif
}
};